mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-13 10:55:59 +00:00
@@ -135,6 +135,28 @@ class ChannelService:
|
|||||||
self._running = False
|
self._running = False
|
||||||
logger.info("ChannelService stopped")
|
logger.info("ChannelService stopped")
|
||||||
|
|
||||||
|
def _load_channel_config(self, name: str) -> dict[str, Any] | None:
|
||||||
|
"""Load the latest config for a specific channel from disk.
|
||||||
|
|
||||||
|
Uses ``get_app_config()`` which detects file changes via mtime,
|
||||||
|
so edits to ``config.yaml`` are picked up without a process restart.
|
||||||
|
Falls back to the cached ``self._config`` when config loading fails.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from deerflow.config.app_config import get_app_config
|
||||||
|
|
||||||
|
app_config = get_app_config()
|
||||||
|
extra = app_config.model_extra or {}
|
||||||
|
channels_config = extra.get("channels", {})
|
||||||
|
channel_config = channels_config.get(name)
|
||||||
|
if isinstance(channel_config, dict):
|
||||||
|
# Update the cached config so get_status() stays consistent.
|
||||||
|
self._config[name] = channel_config
|
||||||
|
return channel_config
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Failed to reload config for channel %s, using cached version", name)
|
||||||
|
return self._config.get(name)
|
||||||
|
|
||||||
async def restart_channel(self, name: str) -> bool:
|
async def restart_channel(self, name: str) -> bool:
|
||||||
"""Restart a specific channel. Returns True if successful."""
|
"""Restart a specific channel. Returns True if successful."""
|
||||||
if name in self._channels:
|
if name in self._channels:
|
||||||
@@ -144,11 +166,15 @@ class ChannelService:
|
|||||||
logger.exception("Error stopping channel %s for restart", name)
|
logger.exception("Error stopping channel %s for restart", name)
|
||||||
del self._channels[name]
|
del self._channels[name]
|
||||||
|
|
||||||
config = self._config.get(name)
|
config = self._load_channel_config(name)
|
||||||
if not config or not isinstance(config, dict):
|
if not config or not isinstance(config, dict):
|
||||||
logger.warning("No config for channel %s", name)
|
logger.warning("No config for channel %s", name)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
if not config.get("enabled", False):
|
||||||
|
logger.info("Channel %s is disabled, skipping restart", name)
|
||||||
|
return True
|
||||||
|
|
||||||
return await self._start_channel(name, config)
|
return await self._start_channel(name, config)
|
||||||
|
|
||||||
async def _start_channel(self, name: str, config: dict[str, Any]) -> bool:
|
async def _start_channel(self, name: str, config: dict[str, Any]) -> bool:
|
||||||
|
|||||||
@@ -3234,6 +3234,151 @@ class TestChannelService:
|
|||||||
warning_records = [r for r in caplog.records if "telegram" in r.message and r.levelno == logging.WARNING]
|
warning_records = [r for r in caplog.records if "telegram" in r.message and r.levelno == logging.WARNING]
|
||||||
assert not warning_records
|
assert not warning_records
|
||||||
|
|
||||||
|
# -- restart_channel config reload tests (issue #3497) --
|
||||||
|
|
||||||
|
def test_restart_channel_reloads_config_from_disk(self, monkeypatch):
|
||||||
|
"""restart_channel reads the latest config via get_app_config()."""
|
||||||
|
from app.channels.service import ChannelService
|
||||||
|
|
||||||
|
initial_config = {"feishu": {"enabled": True, "app_id": "old_id", "app_secret": "old_secret"}}
|
||||||
|
updated_config = {"feishu": {"enabled": True, "app_id": "new_id", "app_secret": "new_secret"}}
|
||||||
|
|
||||||
|
service = ChannelService(channels_config=initial_config)
|
||||||
|
|
||||||
|
def mock_get_app_config():
|
||||||
|
return SimpleNamespace(model_extra={"channels": updated_config})
|
||||||
|
|
||||||
|
monkeypatch.setattr("deerflow.config.app_config.get_app_config", mock_get_app_config)
|
||||||
|
|
||||||
|
started_configs = {}
|
||||||
|
|
||||||
|
async def mock_start_channel(name, config):
|
||||||
|
started_configs[name] = config
|
||||||
|
return True
|
||||||
|
|
||||||
|
service._start_channel = mock_start_channel
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
await service.restart_channel("feishu")
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
assert started_configs["feishu"]["app_id"] == "new_id"
|
||||||
|
assert started_configs["feishu"]["app_secret"] == "new_secret"
|
||||||
|
assert service._config["feishu"]["app_id"] == "new_id"
|
||||||
|
|
||||||
|
def test_restart_channel_falls_back_to_cached_config_on_error(self, monkeypatch):
|
||||||
|
"""When get_app_config() fails, restart_channel uses cached config."""
|
||||||
|
from app.channels.service import ChannelService
|
||||||
|
|
||||||
|
cached_config = {"feishu": {"enabled": True, "app_id": "cached_id", "app_secret": "cached_secret"}}
|
||||||
|
service = ChannelService(channels_config=cached_config)
|
||||||
|
|
||||||
|
def _raise():
|
||||||
|
raise RuntimeError("config missing")
|
||||||
|
|
||||||
|
monkeypatch.setattr("deerflow.config.app_config.get_app_config", _raise)
|
||||||
|
|
||||||
|
started_configs = {}
|
||||||
|
|
||||||
|
async def mock_start_channel(name, config):
|
||||||
|
started_configs[name] = config
|
||||||
|
return True
|
||||||
|
|
||||||
|
service._start_channel = mock_start_channel
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
await service.restart_channel("feishu")
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
assert started_configs["feishu"]["app_id"] == "cached_id"
|
||||||
|
|
||||||
|
def test_restart_channel_returns_false_for_unknown_channel(self):
|
||||||
|
"""restart_channel returns False when the channel has no config."""
|
||||||
|
from app.channels.service import ChannelService
|
||||||
|
|
||||||
|
service = ChannelService(channels_config={})
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
result = await service.restart_channel("nonexistent")
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_restart_channel_stops_existing_channel_before_restart(self):
|
||||||
|
"""restart_channel stops the running channel instance before restarting."""
|
||||||
|
from app.channels.service import ChannelService
|
||||||
|
|
||||||
|
service = ChannelService(channels_config={"feishu": {"enabled": True, "app_id": "x", "app_secret": "y"}})
|
||||||
|
|
||||||
|
stopped = []
|
||||||
|
|
||||||
|
class FakeChannel:
|
||||||
|
is_running = True
|
||||||
|
|
||||||
|
async def stop(self):
|
||||||
|
stopped.append(True)
|
||||||
|
|
||||||
|
service._channels["feishu"] = FakeChannel()
|
||||||
|
|
||||||
|
started_configs = {}
|
||||||
|
|
||||||
|
async def mock_start_channel(name, config):
|
||||||
|
started_configs[name] = config
|
||||||
|
return True
|
||||||
|
|
||||||
|
service._start_channel = mock_start_channel
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
await service.restart_channel("feishu")
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
assert stopped
|
||||||
|
assert "feishu" in started_configs
|
||||||
|
|
||||||
|
def test_restart_channel_skips_disabled_channel(self, monkeypatch):
|
||||||
|
"""restart_channel stops the channel and returns True when config has enabled: false."""
|
||||||
|
from app.channels.service import ChannelService
|
||||||
|
|
||||||
|
service = ChannelService(channels_config={"feishu": {"enabled": True, "app_id": "x", "app_secret": "y"}})
|
||||||
|
|
||||||
|
stopped = []
|
||||||
|
|
||||||
|
class FakeChannel:
|
||||||
|
is_running = True
|
||||||
|
|
||||||
|
async def stop(self):
|
||||||
|
stopped.append(True)
|
||||||
|
|
||||||
|
service._channels["feishu"] = FakeChannel()
|
||||||
|
|
||||||
|
# Simulate config.yaml updated to enabled: false
|
||||||
|
disabled_config = {"feishu": {"enabled": False, "app_id": "x", "app_secret": "y"}}
|
||||||
|
|
||||||
|
def mock_get_app_config():
|
||||||
|
return SimpleNamespace(model_extra={"channels": disabled_config})
|
||||||
|
|
||||||
|
monkeypatch.setattr("deerflow.config.app_config.get_app_config", mock_get_app_config)
|
||||||
|
|
||||||
|
started = []
|
||||||
|
|
||||||
|
async def mock_start_channel(name, config):
|
||||||
|
started.append(name)
|
||||||
|
return True
|
||||||
|
|
||||||
|
service._start_channel = mock_start_channel
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
result = await service.restart_channel("feishu")
|
||||||
|
assert result is True # successfully stopped (no restart needed)
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
assert stopped # old channel was stopped
|
||||||
|
assert not started # _start_channel was NOT called
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Slack send retry tests
|
# Slack send retry tests
|
||||||
|
|||||||
Reference in New Issue
Block a user