mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-11 18:05:58 +00:00
Add runtime setup for enabled IM channels
This commit is contained in:
@@ -179,6 +179,13 @@ class ChannelService:
|
||||
|
||||
return await self._start_channel(name, config)
|
||||
|
||||
async def configure_channel(self, name: str, config: dict[str, Any]) -> bool:
|
||||
"""Apply runtime config for a channel and restart it if the service is running."""
|
||||
self._config[name] = dict(config)
|
||||
if not self._running:
|
||||
return True
|
||||
return await self.restart_channel(name)
|
||||
|
||||
async def _start_channel(self, name: str, config: dict[str, Any]) -> bool:
|
||||
"""Instantiate and start a single channel."""
|
||||
import_path = _CHANNEL_REGISTRY.get(name)
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import secrets
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
@@ -14,10 +15,18 @@ from deerflow.persistence.channel_connections import ChannelConnectionRepository
|
||||
from deerflow.persistence.engine import get_session_factory
|
||||
|
||||
router = APIRouter(prefix="/api/channels", tags=["channel-connections"])
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_STATE_TTL_SECONDS = 600
|
||||
|
||||
|
||||
class ChannelCredentialFieldResponse(BaseModel):
|
||||
name: str
|
||||
label: str
|
||||
type: str = "text"
|
||||
required: bool = True
|
||||
|
||||
|
||||
class ChannelProviderResponse(BaseModel):
|
||||
provider: str
|
||||
display_name: str
|
||||
@@ -27,6 +36,7 @@ class ChannelProviderResponse(BaseModel):
|
||||
unavailable_reason: str | None = None
|
||||
auth_mode: str
|
||||
connection_status: str
|
||||
credential_fields: list[ChannelCredentialFieldResponse] = Field(default_factory=list)
|
||||
|
||||
|
||||
class ChannelProvidersResponse(BaseModel):
|
||||
@@ -59,6 +69,10 @@ class ChannelConnectResponse(BaseModel):
|
||||
expires_in: int
|
||||
|
||||
|
||||
class ChannelRuntimeConfigRequest(BaseModel):
|
||||
values: dict[str, str] = Field(default_factory=dict)
|
||||
|
||||
|
||||
_PROVIDER_META: dict[str, dict[str, str]] = {
|
||||
"telegram": {"display_name": "Telegram", "auth_mode": "deep_link"},
|
||||
"slack": {"display_name": "Slack", "auth_mode": "binding_code"},
|
||||
@@ -69,6 +83,31 @@ _PROVIDER_META: dict[str, dict[str, str]] = {
|
||||
"wecom": {"display_name": "WeCom", "auth_mode": "binding_code"},
|
||||
}
|
||||
|
||||
_CREDENTIAL_FIELDS: dict[str, tuple[dict[str, str], ...]] = {
|
||||
"telegram": (
|
||||
{"name": "bot_token", "label": "Bot token", "type": "password"},
|
||||
{"name": "bot_username", "label": "Bot username", "type": "text"},
|
||||
),
|
||||
"slack": (
|
||||
{"name": "bot_token", "label": "Bot token", "type": "password"},
|
||||
{"name": "app_token", "label": "App token", "type": "password"},
|
||||
),
|
||||
"discord": ({"name": "bot_token", "label": "Bot token", "type": "password"},),
|
||||
"feishu": (
|
||||
{"name": "app_id", "label": "App ID", "type": "text"},
|
||||
{"name": "app_secret", "label": "App secret", "type": "password"},
|
||||
),
|
||||
"dingtalk": (
|
||||
{"name": "client_id", "label": "Client ID", "type": "text"},
|
||||
{"name": "client_secret", "label": "Client secret", "type": "password"},
|
||||
),
|
||||
"wechat": ({"name": "bot_token", "label": "Bot token", "type": "password"},),
|
||||
"wecom": (
|
||||
{"name": "bot_id", "label": "Bot ID", "type": "text"},
|
||||
{"name": "bot_secret", "label": "Bot secret", "type": "password"},
|
||||
),
|
||||
}
|
||||
|
||||
_RUNTIME_REQUIREMENTS: dict[str, tuple[str, ...]] = {
|
||||
"telegram": ("bot_token",),
|
||||
"slack": ("bot_token", "app_token"),
|
||||
@@ -140,8 +179,9 @@ def _runtime_channel_configured(provider: str, channels_config: dict[str, Any])
|
||||
|
||||
|
||||
def _runtime_unavailable_reason(provider: str) -> str:
|
||||
keys = " and ".join(f"channels.{provider}.{key}" for key in _RUNTIME_REQUIREMENTS[provider])
|
||||
return f"Enable and configure channels.{provider} with {keys}."
|
||||
meta = _PROVIDER_META.get(provider)
|
||||
display_name = meta["display_name"] if meta else provider
|
||||
return f"Enter the required {display_name} credentials to connect this channel."
|
||||
|
||||
|
||||
def _provider_unavailable_reason(
|
||||
@@ -153,9 +193,7 @@ def _provider_unavailable_reason(
|
||||
if not provider_config.enabled:
|
||||
return None
|
||||
if not provider_config.configured:
|
||||
if provider == "telegram":
|
||||
return "Configure channel_connections.telegram.bot_username for Telegram deep links."
|
||||
return f"Configure channel_connections.{provider}."
|
||||
return _runtime_unavailable_reason(provider)
|
||||
if not _runtime_channel_configured(provider, channels_config):
|
||||
return _runtime_unavailable_reason(provider)
|
||||
return None
|
||||
@@ -231,6 +269,62 @@ def _newest_connection_by_provider(connections: list[dict[str, Any]]) -> dict[st
|
||||
return by_provider
|
||||
|
||||
|
||||
def _credential_fields(provider: str) -> list[ChannelCredentialFieldResponse]:
|
||||
fields = _CREDENTIAL_FIELDS.get(provider)
|
||||
if fields is None:
|
||||
raise HTTPException(status_code=404, detail="Unknown channel provider")
|
||||
return [ChannelCredentialFieldResponse(**field) for field in fields]
|
||||
|
||||
|
||||
def _provider_response(
|
||||
config: ChannelConnectionsConfig,
|
||||
channels_config: dict[str, Any],
|
||||
provider: str,
|
||||
meta: dict[str, str],
|
||||
connection: dict[str, Any] | None = None,
|
||||
) -> ChannelProviderResponse:
|
||||
status, unavailable_reason = _provider_status(config, channels_config, provider)
|
||||
return ChannelProviderResponse(
|
||||
provider=provider,
|
||||
display_name=meta["display_name"],
|
||||
enabled=status["enabled"],
|
||||
configured=status["configured"],
|
||||
connectable=status["enabled"] and status["configured"] and unavailable_reason is None,
|
||||
unavailable_reason=unavailable_reason,
|
||||
auth_mode=meta["auth_mode"],
|
||||
connection_status=connection["status"] if connection else "not_connected",
|
||||
credential_fields=_credential_fields(provider),
|
||||
)
|
||||
|
||||
|
||||
def _required_runtime_values(provider: str, values: dict[str, str]) -> dict[str, str]:
|
||||
fields = _credential_fields(provider)
|
||||
cleaned: dict[str, str] = {}
|
||||
missing: list[str] = []
|
||||
for field in fields:
|
||||
raw_value = values.get(field.name, "")
|
||||
value = raw_value.strip() if isinstance(raw_value, str) else str(raw_value or "").strip()
|
||||
if field.required and not value:
|
||||
missing.append(field.label)
|
||||
cleaned[field.name] = value
|
||||
if missing:
|
||||
raise HTTPException(status_code=400, detail=f"Missing required channel configuration: {', '.join(missing)}")
|
||||
return cleaned
|
||||
|
||||
|
||||
async def _restart_runtime_channel_if_available(provider: str, runtime_config: dict[str, Any]) -> bool | None:
|
||||
try:
|
||||
from app.channels.service import get_channel_service
|
||||
except Exception:
|
||||
logger.exception("Failed to import channel service while configuring %s", provider)
|
||||
return None
|
||||
|
||||
service = get_channel_service()
|
||||
if service is None:
|
||||
return None
|
||||
return await service.configure_channel(provider, runtime_config)
|
||||
|
||||
|
||||
@router.get("/providers", response_model=ChannelProvidersResponse)
|
||||
async def get_channel_providers(request: Request) -> ChannelProvidersResponse:
|
||||
config = _get_channel_connections_config(request)
|
||||
@@ -248,20 +342,10 @@ async def get_channel_providers(request: Request) -> ChannelProvidersResponse:
|
||||
|
||||
providers: list[ChannelProviderResponse] = []
|
||||
for provider, meta in _PROVIDER_META.items():
|
||||
status, unavailable_reason = _provider_status(config, channels_config, provider)
|
||||
if not config.provider_status(provider)["enabled"]:
|
||||
continue
|
||||
connection = by_provider.get(provider)
|
||||
providers.append(
|
||||
ChannelProviderResponse(
|
||||
provider=provider,
|
||||
display_name=meta["display_name"],
|
||||
enabled=status["enabled"],
|
||||
configured=status["configured"],
|
||||
connectable=status["enabled"] and status["configured"] and unavailable_reason is None,
|
||||
unavailable_reason=unavailable_reason,
|
||||
auth_mode=meta["auth_mode"],
|
||||
connection_status=connection["status"] if connection else "not_connected",
|
||||
)
|
||||
)
|
||||
providers.append(_provider_response(config, channels_config, provider, meta, connection))
|
||||
return ChannelProvidersResponse(enabled=config.enabled, providers=providers)
|
||||
|
||||
|
||||
@@ -320,3 +404,41 @@ async def connect_channel_provider(provider: str, request: Request) -> ChannelCo
|
||||
instruction=_connect_instruction(provider, code),
|
||||
expires_in=_STATE_TTL_SECONDS,
|
||||
)
|
||||
|
||||
|
||||
@router.post("/{provider}/runtime-config", response_model=ChannelProviderResponse)
|
||||
async def configure_channel_provider_runtime(
|
||||
provider: str,
|
||||
body: ChannelRuntimeConfigRequest,
|
||||
request: Request,
|
||||
) -> ChannelProviderResponse:
|
||||
config = _get_channel_connections_config(request)
|
||||
if not config.enabled:
|
||||
raise HTTPException(status_code=400, detail="Channel connections are disabled")
|
||||
|
||||
provider_config = _provider_config(config, provider)
|
||||
if not provider_config.enabled:
|
||||
raise HTTPException(status_code=400, detail="Channel provider is not enabled")
|
||||
|
||||
values = _required_runtime_values(provider, body.values)
|
||||
channels_config = _get_channels_config(request)
|
||||
existing = channels_config.get(provider)
|
||||
runtime_config = dict(existing) if isinstance(existing, dict) else {}
|
||||
runtime_config["enabled"] = True
|
||||
|
||||
for key in _RUNTIME_REQUIREMENTS[provider]:
|
||||
runtime_config[key] = values[key]
|
||||
|
||||
if provider == "telegram":
|
||||
provider_config.bot_username = values["bot_username"]
|
||||
request.app.state.channel_connections_config = config
|
||||
|
||||
channels_config[provider] = runtime_config
|
||||
request.app.state.channels_config = channels_config
|
||||
|
||||
started = await _restart_runtime_channel_if_available(provider, runtime_config)
|
||||
if started is False:
|
||||
display_name = _PROVIDER_META[provider]["display_name"]
|
||||
raise HTTPException(status_code=400, detail=f"Failed to start {display_name} channel. Check the values and try again.")
|
||||
|
||||
return _provider_response(config, channels_config, provider, _PROVIDER_META[provider])
|
||||
|
||||
Reference in New Issue
Block a user