diff --git a/README.md b/README.md index 1dcea8cfd..7015cb159 100644 --- a/README.md +++ b/README.md @@ -343,7 +343,7 @@ See the [MCP Server Guide](backend/docs/MCP_SERVER.md) for detailed instructions DeerFlow supports receiving tasks from messaging apps. Channels auto-start when configured — no public IP required for any of them. -DeerFlow can also expose user-owned IM channel connections in the workspace UI. When `channel_connections` is enabled, logged-in users can bind Telegram, Slack, or Discord from the sidebar / Settings > Channels. It reuses the existing outbound `channels.*` transports, so no public IP or provider callback URL is required. Incoming IM messages then run under the connected DeerFlow user account. See [IM Channel Connections](backend/docs/IM_CHANNEL_CONNECTIONS.md) for setup and security notes. +DeerFlow can also expose user-owned IM channel connections in the workspace UI. When `channel_connections` is enabled, logged-in users can bind Telegram, Slack, Discord, Feishu/Lark, DingTalk, WeChat, or WeCom from the sidebar / Settings > Channels. It reuses the existing outbound `channels.*` transports, so no public IP or provider callback URL is required. Incoming IM messages then run under the connected DeerFlow user account. See [IM Channel Connections](backend/docs/IM_CHANNEL_CONNECTIONS.md) for setup and security notes. | Channel | Transport | Difficulty | |---------|-----------|------------| diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 607ebfafe..938d97b90 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -404,11 +404,11 @@ Bridges external messaging platforms (Feishu, Slack, Telegram, Discord, DingTalk **User-owned channel connections** (`config.yaml` -> `channel_connections`): - Disabled by default. It is a user-binding layer on top of the existing `channels.*` runtime config, not a replacement for provider bot credentials. - No public IP, OAuth callback URL, or provider webhook route is required by the current implementation. -- Telegram uses a deep-link `/start ` flow over the existing long-polling worker. Slack uses `/connect ` over the existing Socket Mode worker. Discord uses `/connect ` over the existing Gateway worker. +- Telegram uses a deep-link `/start ` flow over the existing long-polling worker. Slack, Discord, Feishu/Lark, DingTalk, WeChat, and WeCom use `/connect ` over their existing outbound channel workers. - Frontend APIs: `GET /api/channels/providers`, `GET /api/channels/connections`, `POST /api/channels/{provider}/connect`, and `DELETE /api/channels/connections/{connection_id}`. - Browser APIs remain protected by normal Gateway auth/CSRF. Provider messages arrive through the already-configured channel workers. - Slack replies use the configured operator bot token from `channels.slack` unless a future provider-token flow stores per-connection credentials. -- Telegram, Slack, and Discord workers resolve incoming platform identities to connection records before reaching `ChannelManager`. +- Telegram, Slack, Discord, Feishu/Lark, DingTalk, WeChat, and WeCom workers resolve incoming platform identities to connection records before reaching `ChannelManager`. - See `backend/docs/IM_CHANNEL_CONNECTIONS.md` for provider setup and operational notes. diff --git a/backend/app/channels/dingtalk.py b/backend/app/channels/dingtalk.py index fb53ce272..85bbc30e5 100644 --- a/backend/app/channels/dingtalk.py +++ b/backend/app/channels/dingtalk.py @@ -14,7 +14,8 @@ from typing import Any import httpx from app.channels.base import Channel -from app.channels.commands import is_known_channel_command +from app.channels.commands import extract_connect_code, is_known_channel_command +from app.channels.connection_identity import attach_connection_identity from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment logger = logging.getLogger(__name__) @@ -136,6 +137,7 @@ class DingTalkChannel(Channel): self._incoming_messages: dict[str, Any] = {} self._incoming_messages_lock = threading.Lock() self._card_repliers: dict[str, Any] = {} + self._connection_repo = config.get("connection_repo") @property def supports_streaming(self) -> bool: @@ -395,6 +397,24 @@ class DingTalkChannel(Channel): text[:100], ) + connect_code = extract_connect_code(text) + if connect_code and self._connection_repo is not None: + if self._main_loop and self._main_loop.is_running(): + fut = asyncio.run_coroutine_threadsafe( + self._bind_connection_from_connect_code( + conversation_type=conversation_type, + sender_staff_id=sender_staff_id, + sender_nick=sender_nick, + conversation_id=conversation_id, + code=connect_code, + ), + self._main_loop, + ) + fut.add_done_callback(lambda f, mid=msg_id: self._log_future_error(f, "bind_connection", mid)) + else: + logger.warning("[DingTalk] main loop not running, cannot bind channel connection") + return + if _is_dingtalk_command(text): msg_type = InboundMessageType.COMMAND else: @@ -450,11 +470,95 @@ class DingTalkChannel(Channel): return "" async def _prepare_inbound(self, chat_id: str, inbound: InboundMessage) -> None: + inbound = await self._attach_connection_identity(inbound) # Running reply must finish before publish_inbound so AI card tracks are # registered before the manager emits streaming outbounds. await self._send_running_reply(chat_id, inbound) await self.bus.publish_inbound(inbound) + @staticmethod + def _connection_workspace_id(conversation_type: str, conversation_id: str) -> str | None: + if conversation_type == _CONVERSATION_TYPE_GROUP and conversation_id: + return conversation_id + return None + + async def _attach_connection_identity(self, inbound: InboundMessage) -> InboundMessage: + conversation_type = str(inbound.metadata.get("conversation_type") or _CONVERSATION_TYPE_P2P) + conversation_id = str(inbound.metadata.get("conversation_id") or "") + return await attach_connection_identity( + inbound, + repo=self._connection_repo, + provider="dingtalk", + workspace_id=self._connection_workspace_id(conversation_type, conversation_id), + fallback_without_workspace=True, + ) + + async def _bind_connection_from_connect_code( + self, + *, + conversation_type: str, + sender_staff_id: str, + sender_nick: str, + conversation_id: str, + code: str, + ) -> bool: + if self._connection_repo is None or not code: + return False + + state = await self._connection_repo.consume_oauth_state(provider="dingtalk", state=code) + if state is None: + await self._send_connection_reply( + conversation_type, + sender_staff_id, + conversation_id, + "DingTalk connection code is invalid or expired.", + ) + return True + + if not sender_staff_id: + await self._send_connection_reply( + conversation_type, + sender_staff_id, + conversation_id, + "DingTalk connection could not be completed from this message.", + ) + return True + + await self._connection_repo.upsert_connection( + owner_user_id=state["owner_user_id"], + provider="dingtalk", + external_account_id=sender_staff_id, + external_account_name=sender_nick or None, + workspace_id=self._connection_workspace_id(conversation_type, conversation_id), + metadata={ + "conversation_type": conversation_type, + "conversation_id": conversation_id, + }, + status="connected", + ) + await self._send_connection_reply( + conversation_type, + sender_staff_id, + conversation_id, + "DingTalk connected to DeerFlow.", + ) + return True + + async def _send_connection_reply( + self, + conversation_type: str, + sender_staff_id: str, + conversation_id: str, + text: str, + ) -> None: + robot_code = self._client_id + if conversation_type == _CONVERSATION_TYPE_GROUP: + if conversation_id: + await self._send_text_message_to_group(robot_code, conversation_id, text) + return + if sender_staff_id: + await self._send_text_message_to_user(robot_code, sender_staff_id, text) + async def _send_running_reply(self, chat_id: str, inbound: InboundMessage) -> None: conversation_type = inbound.metadata.get("conversation_type", _CONVERSATION_TYPE_P2P) sender_staff_id = inbound.metadata.get("sender_staff_id", "") diff --git a/backend/app/channels/feishu.py b/backend/app/channels/feishu.py index fddbc7186..094d24f58 100644 --- a/backend/app/channels/feishu.py +++ b/backend/app/channels/feishu.py @@ -11,7 +11,8 @@ import time from typing import Any, Literal from app.channels.base import Channel -from app.channels.commands import is_known_channel_command +from app.channels.commands import extract_connect_code, is_known_channel_command +from app.channels.connection_identity import attach_connection_identity from app.channels.message_bus import ( PENDING_CLARIFICATION_METADATA_KEY, RESOLVED_FROM_PENDING_CLARIFICATION_METADATA_KEY, @@ -71,6 +72,7 @@ class FeishuChannel(Channel): self._CreateImageRequestBody = None self._GetMessageResourceRequest = None self._thread_lock = threading.Lock() + self._connection_repo = config.get("connection_repo") @staticmethod def _non_empty_str(value: Any) -> str | None: @@ -726,11 +728,47 @@ class FeishuChannel(Channel): async def _prepare_inbound(self, msg_id: str, inbound) -> None: """Kick off Feishu side effects without delaying inbound dispatch.""" + inbound = await self._attach_connection_identity(inbound) reaction_task = asyncio.create_task(self._add_reaction(msg_id, "OK")) self._track_background_task(reaction_task, name="add_reaction", msg_id=msg_id) self._ensure_running_card_started(msg_id) await self.bus.publish_inbound(inbound) + async def _attach_connection_identity(self, inbound: InboundMessage) -> InboundMessage: + return await attach_connection_identity( + inbound, + repo=self._connection_repo, + provider="feishu", + workspace_id=inbound.chat_id, + ) + + async def _bind_connection_from_connect_code(self, *, message_id: str, chat_id: str, user_id: str, code: str) -> bool: + if self._connection_repo is None or not code: + return False + + state = await self._connection_repo.consume_oauth_state(provider="feishu", state=code) + if state is None: + await self._reply_card(message_id, "Feishu connection code is invalid or expired.") + return True + + if not user_id or not chat_id: + await self._reply_card(message_id, "Feishu connection could not be completed from this message.") + return True + + await self._connection_repo.upsert_connection( + owner_user_id=state["owner_user_id"], + provider="feishu", + external_account_id=user_id, + workspace_id=chat_id, + metadata={ + "chat_id": chat_id, + "message_id": message_id, + }, + status="connected", + ) + await self._reply_card(message_id, "Feishu connected to DeerFlow.") + return True + def _on_message(self, event) -> None: """Called by lark-oapi when a message is received (runs in lark thread).""" try: @@ -819,6 +857,23 @@ class FeishuChannel(Channel): logger.info("[Feishu] empty text, ignoring message") return + connect_code = extract_connect_code(text) + if connect_code and self._connection_repo is not None: + if self._main_loop and self._main_loop.is_running(): + fut = asyncio.run_coroutine_threadsafe( + self._bind_connection_from_connect_code( + message_id=msg_id, + chat_id=chat_id, + user_id=sender_id, + code=connect_code, + ), + self._main_loop, + ) + fut.add_done_callback(lambda f, mid=msg_id: self._log_future_error(f, "bind_connection", mid)) + else: + logger.warning("[Feishu] main loop not running, cannot bind channel connection") + return + # Only treat known slash commands as commands; absolute paths and # other slash-prefixed text should be handled as normal chat. if _is_feishu_command(text): diff --git a/backend/app/channels/wechat.py b/backend/app/channels/wechat.py index 9a9ddf1a6..a605a8d2f 100644 --- a/backend/app/channels/wechat.py +++ b/backend/app/channels/wechat.py @@ -22,8 +22,9 @@ from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from app.channels.base import Channel -from app.channels.commands import is_known_channel_command -from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment +from app.channels.commands import extract_connect_code, is_known_channel_command +from app.channels.connection_identity import attach_connection_identity +from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment logger = logging.getLogger(__name__) @@ -253,6 +254,7 @@ class WechatChannel(Channel): self._state_dir = self._resolve_state_dir(config.get("state_dir")) self._cursor_path = self._state_dir / "wechat-getupdates.json" if self._state_dir else None self._auth_path = self._state_dir / "wechat-auth.json" if self._state_dir else None + self._connection_repo = config.get("connection_repo") self._load_state() async def start(self) -> None: @@ -617,6 +619,16 @@ class WechatChannel(Channel): if thread_ts: self._context_tokens_by_thread[thread_ts] = context_token + connect_code = extract_connect_code(text) + if connect_code and self._connection_repo is not None: + handled = await self._bind_connection_from_connect_code( + chat_id=chat_id, + context_token=context_token, + code=connect_code, + ) + if handled: + return + inbound = self._make_inbound( chat_id=chat_id, user_id=chat_id, @@ -632,8 +644,54 @@ class WechatChannel(Channel): }, ) inbound.topic_id = None + inbound = await self._attach_connection_identity(inbound) await self.bus.publish_inbound(inbound) + async def _attach_connection_identity(self, inbound: InboundMessage) -> InboundMessage: + return await attach_connection_identity( + inbound, + repo=self._connection_repo, + provider="wechat", + workspace_id=inbound.chat_id, + ) + + async def _bind_connection_from_connect_code(self, *, chat_id: str, context_token: str, code: str) -> bool: + if self._connection_repo is None or not code: + return False + + state = await self._connection_repo.consume_oauth_state(provider="wechat", state=code) + if state is None: + await self._send_connection_reply(chat_id, context_token, "WeChat connection code is invalid or expired.") + return True + + if not chat_id: + await self._send_connection_reply(chat_id, context_token, "WeChat connection could not be completed from this message.") + return True + + await self._connection_repo.upsert_connection( + owner_user_id=state["owner_user_id"], + provider="wechat", + external_account_id=chat_id, + workspace_id=chat_id, + metadata={ + "context_token": context_token, + }, + status="connected", + ) + await self._send_connection_reply(chat_id, context_token, "WeChat connected to DeerFlow.") + return True + + async def _send_connection_reply(self, chat_id: str, context_token: str, text: str) -> None: + if not context_token: + return + await self._send_text_message( + chat_id=chat_id, + context_token=context_token, + text=text, + client_id_prefix="deerflow-connect", + max_retries=1, + ) + async def _ensure_authenticated(self) -> bool: async with self._auth_lock: if self._bot_token: diff --git a/backend/app/channels/wecom.py b/backend/app/channels/wecom.py index 33d3cf1bb..badb0b525 100644 --- a/backend/app/channels/wecom.py +++ b/backend/app/channels/wecom.py @@ -8,8 +8,10 @@ from collections.abc import Awaitable, Callable from typing import Any, cast from app.channels.base import Channel -from app.channels.commands import is_known_channel_command +from app.channels.commands import extract_connect_code, is_known_channel_command +from app.channels.connection_identity import attach_connection_identity from app.channels.message_bus import ( + InboundMessage, InboundMessageType, MessageBus, OutboundMessage, @@ -29,6 +31,7 @@ class WeComChannel(Channel): self._ws_frames: dict[str, dict[str, Any]] = {} self._ws_stream_ids: dict[str, str] = {} self._working_message = "Working on it..." + self._connection_repo = config.get("connection_repo") @property def supports_streaming(self) -> bool: @@ -271,6 +274,16 @@ class WeComChannel(Channel): user_id = (body.get("from") or {}).get("userid") + connect_code = extract_connect_code(text) + if connect_code and self._connection_repo is not None: + handled = await self._bind_connection_from_connect_code( + frame=frame, + user_id=str(user_id or ""), + code=connect_code, + ) + if handled: + return + inbound_type = InboundMessageType.COMMAND if is_known_channel_command(text) else InboundMessageType.CHAT inbound = self._make_inbound( chat_id=user_id, # keep user's conversation in memory @@ -292,8 +305,52 @@ class WeComChannel(Channel): except Exception: pass + inbound = await self._attach_connection_identity(inbound) await self.bus.publish_inbound(inbound) + async def _attach_connection_identity(self, inbound: InboundMessage) -> InboundMessage: + return await attach_connection_identity( + inbound, + repo=self._connection_repo, + provider="wecom", + workspace_id=str(inbound.metadata.get("aibotid") or "") or None, + fallback_without_workspace=True, + ) + + async def _bind_connection_from_connect_code(self, *, frame: dict[str, Any], user_id: str, code: str) -> bool: + if self._connection_repo is None or not code: + return False + + state = await self._connection_repo.consume_oauth_state(provider="wecom", state=code) + if state is None: + await self._send_connection_reply(frame, "WeCom connection code is invalid or expired.") + return True + + if not user_id: + await self._send_connection_reply(frame, "WeCom connection could not be completed from this message.") + return True + + body = frame.get("body", {}) or {} + workspace_id = str(body.get("aibotid") or "") or None + await self._connection_repo.upsert_connection( + owner_user_id=state["owner_user_id"], + provider="wecom", + external_account_id=user_id, + workspace_id=workspace_id, + metadata={ + "aibotid": workspace_id, + "chattype": body.get("chattype"), + }, + status="connected", + ) + await self._send_connection_reply(frame, "WeCom connected to DeerFlow.") + return True + + async def _send_connection_reply(self, frame: dict[str, Any], text: str) -> None: + if not self._ws_client: + return + await self._ws_client.reply(frame, {"msgtype": "text", "text": {"content": text}}) + async def _send_ws(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None: if not self._ws_client: return diff --git a/backend/app/gateway/routers/channel_connections.py b/backend/app/gateway/routers/channel_connections.py index 8981b1b86..59b6af620 100644 --- a/backend/app/gateway/routers/channel_connections.py +++ b/backend/app/gateway/routers/channel_connections.py @@ -63,12 +63,20 @@ _PROVIDER_META: dict[str, dict[str, str]] = { "telegram": {"display_name": "Telegram", "auth_mode": "deep_link"}, "slack": {"display_name": "Slack", "auth_mode": "binding_code"}, "discord": {"display_name": "Discord", "auth_mode": "binding_code"}, + "feishu": {"display_name": "Feishu", "auth_mode": "binding_code"}, + "dingtalk": {"display_name": "DingTalk", "auth_mode": "binding_code"}, + "wechat": {"display_name": "WeChat", "auth_mode": "binding_code"}, + "wecom": {"display_name": "WeCom", "auth_mode": "binding_code"}, } _RUNTIME_REQUIREMENTS: dict[str, tuple[str, ...]] = { "telegram": ("bot_token",), "slack": ("bot_token", "app_token"), "discord": ("bot_token",), + "feishu": ("app_id", "app_secret"), + "dingtalk": ("client_id", "client_secret"), + "wechat": ("bot_token",), + "wecom": ("bot_id", "bot_secret"), } @@ -187,18 +195,17 @@ async def _create_state( def _connect_instruction(provider: str, code: str) -> str: if provider == "telegram": return f"Send /start {code} to the DeerFlow Telegram bot." - if provider == "slack": - return f"Send /connect {code} to the DeerFlow Slack bot." - if provider == "discord": - return f"Send /connect {code} to the DeerFlow Discord bot." - raise HTTPException(status_code=404, detail="Unknown channel provider") + meta = _PROVIDER_META.get(provider) + if meta is None: + raise HTTPException(status_code=404, detail="Unknown channel provider") + return f"Send /connect {code} to the DeerFlow {meta['display_name']} bot." def _connect_url(config: ChannelConnectionsConfig, provider: str, code: str) -> str | None: if provider == "telegram": provider_config = _provider_config(config, provider) return f"https://t.me/{provider_config.bot_username}?start={code}" - if provider in {"slack", "discord"}: + if _PROVIDER_META.get(provider, {}).get("auth_mode") == "binding_code": return None raise HTTPException(status_code=404, detail="Unknown channel provider") diff --git a/backend/docs/IM_CHANNEL_CONNECTIONS.md b/backend/docs/IM_CHANNEL_CONNECTIONS.md index c83eb9d14..0face8c03 100644 --- a/backend/docs/IM_CHANNEL_CONNECTIONS.md +++ b/backend/docs/IM_CHANNEL_CONNECTIONS.md @@ -1,6 +1,6 @@ # IM Channel Connections -DeerFlow supports user-owned IM channel bindings for Telegram, Slack, and Discord. The feature reuses the existing `channels.*` runtime configuration, so it works in local and private deployments with the same outbound transports already supported by DeerFlow. +DeerFlow supports user-owned IM channel bindings for Telegram, Slack, Discord, Feishu/Lark, DingTalk, WeChat, and WeCom. The feature reuses the existing `channels.*` runtime configuration, so it works in local and private deployments with the same outbound transports already supported by DeerFlow. No public IP, OAuth callback URL, or provider webhook is required in this implementation. @@ -22,6 +22,25 @@ channels: discord: enabled: true bot_token: $DISCORD_BOT_TOKEN + + feishu: + enabled: true + app_id: $FEISHU_APP_ID + app_secret: $FEISHU_APP_SECRET + + dingtalk: + enabled: true + client_id: $DINGTALK_CLIENT_ID + client_secret: $DINGTALK_CLIENT_SECRET + + wechat: + enabled: true + bot_token: $WECHAT_BOT_TOKEN + + wecom: + enabled: true + bot_id: $WECOM_BOT_ID + bot_secret: $WECOM_BOT_SECRET ``` Then enable user bindings in `channel_connections`: @@ -39,6 +58,18 @@ channel_connections: discord: enabled: true + + feishu: + enabled: true + + dingtalk: + enabled: true + + wechat: + enabled: true + + wecom: + enabled: true ``` `channel_connections` does not duplicate provider secrets. It only controls the browser-facing connect UI and stores per-user binding records. Telegram needs `bot_username` only so the frontend can open a deep link. @@ -63,6 +94,12 @@ Discord: - The UI shows `Send /connect to the DeerFlow Discord bot.` - The existing Discord Gateway worker receives the message and binds the Discord user/guild to the current DeerFlow user. +Feishu/Lark, DingTalk, WeChat, and WeCom: + +- The frontend creates a short one-time code. +- The UI shows `Send /connect to the DeerFlow bot.` +- The already-running long-connection or polling worker receives the message and binds the platform user/workspace identity to the current DeerFlow user. + Codes expire after 10 minutes and are single-use. ## Runtime Model diff --git a/backend/packages/harness/deerflow/config/channel_connections_config.py b/backend/packages/harness/deerflow/config/channel_connections_config.py index 093aa7ccc..4092d5863 100644 --- a/backend/packages/harness/deerflow/config/channel_connections_config.py +++ b/backend/packages/harness/deerflow/config/channel_connections_config.py @@ -30,6 +30,14 @@ class DiscordChannelConnectionConfig(BaseModel): return True +class BindingCodeChannelConnectionConfig(BaseModel): + enabled: bool = False + + @property + def configured(self) -> bool: + return True + + class ChannelConnectionsConfig(BaseModel): """Top-level config for browser-connectable IM channels.""" @@ -37,6 +45,10 @@ class ChannelConnectionsConfig(BaseModel): slack: SlackChannelConnectionConfig = Field(default_factory=SlackChannelConnectionConfig) telegram: TelegramChannelConnectionConfig = Field(default_factory=TelegramChannelConnectionConfig) discord: DiscordChannelConnectionConfig = Field(default_factory=DiscordChannelConnectionConfig) + feishu: BindingCodeChannelConnectionConfig = Field(default_factory=BindingCodeChannelConnectionConfig) + dingtalk: BindingCodeChannelConnectionConfig = Field(default_factory=BindingCodeChannelConnectionConfig) + wechat: BindingCodeChannelConnectionConfig = Field(default_factory=BindingCodeChannelConnectionConfig) + wecom: BindingCodeChannelConnectionConfig = Field(default_factory=BindingCodeChannelConnectionConfig) def provider_status(self, provider: str) -> dict[str, bool]: config = getattr(self, provider, None) diff --git a/backend/tests/test_additional_channel_connections.py b/backend/tests/test_additional_channel_connections.py new file mode 100644 index 000000000..c4b4fdb0f --- /dev/null +++ b/backend/tests/test_additional_channel_connections.py @@ -0,0 +1,248 @@ +"""Connection binding tests for browser-connectable IM channels beyond Telegram/Slack/Discord.""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from unittest.mock import AsyncMock, MagicMock + +from app.channels.message_bus import InboundMessage, MessageBus + + +async def _make_repo(tmp_path, name: str): + from deerflow.persistence.channel_connections import ChannelConnectionRepository + from deerflow.persistence.engine import get_session_factory, init_engine + + await init_engine("sqlite", url=f"sqlite+aiosqlite:///{tmp_path / f'{name}.db'}", sqlite_dir=str(tmp_path)) + return ChannelConnectionRepository(get_session_factory()) + + +async def _seed_state(repo, provider: str, state: str, owner_user_id: str = "deerflow-user-1") -> None: + await repo.create_oauth_state( + owner_user_id=owner_user_id, + provider=provider, + state=state, + expires_at=datetime.now(UTC) + timedelta(minutes=5), + ) + + +def test_feishu_connect_command_binds_identity(tmp_path): + import anyio + + from app.channels.feishu import FeishuChannel + + async def go(): + repo = await _make_repo(tmp_path, "feishu") + state = "feishu-bind-code" + await _seed_state(repo, "feishu", state) + channel = FeishuChannel( + bus=MessageBus(), + config={"app_id": "app", "app_secret": "secret", "connection_repo": repo}, + ) + channel._reply_card = AsyncMock() + + handled = await channel._bind_connection_from_connect_code( + message_id="om-message-1", + chat_id="oc-chat-1", + user_id="ou-user-1", + code=state, + ) + + connections = await repo.list_connections("deerflow-user-1") + assert handled is True + assert len(connections) == 1 + assert connections[0]["provider"] == "feishu" + assert connections[0]["external_account_id"] == "ou-user-1" + assert connections[0]["workspace_id"] == "oc-chat-1" + channel._reply_card.assert_awaited_once_with("om-message-1", "Feishu connected to DeerFlow.") + await repo.close() + + anyio.run(go) + + +def test_dingtalk_connect_command_binds_identity(tmp_path): + import anyio + + from app.channels.dingtalk import _CONVERSATION_TYPE_GROUP, DingTalkChannel + + async def go(): + repo = await _make_repo(tmp_path, "dingtalk") + state = "dingtalk-bind-code" + await _seed_state(repo, "dingtalk", state) + channel = DingTalkChannel( + bus=MessageBus(), + config={"client_id": "client", "client_secret": "secret", "connection_repo": repo}, + ) + channel._send_connection_reply = AsyncMock() + + handled = await channel._bind_connection_from_connect_code( + conversation_type=_CONVERSATION_TYPE_GROUP, + sender_staff_id="staff-user-1", + sender_nick="Alice", + conversation_id="cid-group-1", + code=state, + ) + + connections = await repo.list_connections("deerflow-user-1") + assert handled is True + assert len(connections) == 1 + assert connections[0]["provider"] == "dingtalk" + assert connections[0]["external_account_id"] == "staff-user-1" + assert connections[0]["external_account_name"] == "Alice" + assert connections[0]["workspace_id"] == "cid-group-1" + channel._send_connection_reply.assert_awaited_once() + await repo.close() + + anyio.run(go) + + +def test_wechat_connect_command_binds_identity(tmp_path): + import anyio + + from app.channels.wechat import WechatChannel + + async def go(): + repo = await _make_repo(tmp_path, "wechat") + state = "wechat-bind-code" + await _seed_state(repo, "wechat", state) + channel = WechatChannel( + bus=MessageBus(), + config={"bot_token": "token", "connection_repo": repo}, + ) + channel._send_connection_reply = AsyncMock() + + handled = await channel._bind_connection_from_connect_code( + chat_id="wx-user-1", + context_token="ctx-1", + code=state, + ) + + connections = await repo.list_connections("deerflow-user-1") + assert handled is True + assert len(connections) == 1 + assert connections[0]["provider"] == "wechat" + assert connections[0]["external_account_id"] == "wx-user-1" + assert connections[0]["workspace_id"] == "wx-user-1" + channel._send_connection_reply.assert_awaited_once_with("wx-user-1", "ctx-1", "WeChat connected to DeerFlow.") + await repo.close() + + anyio.run(go) + + +def test_wecom_connect_command_binds_identity(tmp_path): + import anyio + + from app.channels.wecom import WeComChannel + + async def go(): + repo = await _make_repo(tmp_path, "wecom") + state = "wecom-bind-code" + await _seed_state(repo, "wecom", state) + channel = WeComChannel( + bus=MessageBus(), + config={"bot_id": "bot", "bot_secret": "secret", "connection_repo": repo}, + ) + channel._ws_client = MagicMock() + channel._ws_client.reply = AsyncMock() + frame = {"body": {"aibotid": "bot-1", "chattype": "single"}} + + handled = await channel._bind_connection_from_connect_code( + frame=frame, + user_id="wecom-user-1", + code=state, + ) + + connections = await repo.list_connections("deerflow-user-1") + assert handled is True + assert len(connections) == 1 + assert connections[0]["provider"] == "wecom" + assert connections[0]["external_account_id"] == "wecom-user-1" + assert connections[0]["workspace_id"] == "bot-1" + channel._ws_client.reply.assert_awaited_once_with(frame, {"msgtype": "text", "text": {"content": "WeCom connected to DeerFlow."}}) + await repo.close() + + anyio.run(go) + + +def test_additional_channels_attach_owner_identity(tmp_path): + import anyio + + from app.channels.dingtalk import _CONVERSATION_TYPE_GROUP, DingTalkChannel + from app.channels.feishu import FeishuChannel + from app.channels.wechat import WechatChannel + from app.channels.wecom import WeComChannel + + async def go(): + repo = await _make_repo(tmp_path, "additional-identity") + await repo.upsert_connection( + owner_user_id="deerflow-user-1", + provider="feishu", + external_account_id="ou-user-1", + workspace_id="oc-chat-1", + ) + await repo.upsert_connection( + owner_user_id="deerflow-user-1", + provider="dingtalk", + external_account_id="staff-user-1", + workspace_id="cid-group-1", + ) + await repo.upsert_connection( + owner_user_id="deerflow-user-1", + provider="wechat", + external_account_id="wx-user-1", + workspace_id="wx-user-1", + ) + await repo.upsert_connection( + owner_user_id="deerflow-user-1", + provider="wecom", + external_account_id="wecom-user-1", + workspace_id="bot-1", + ) + + cases = [ + ( + FeishuChannel(bus=MessageBus(), config={"connection_repo": repo}), + InboundMessage(channel_name="feishu", chat_id="oc-chat-1", user_id="ou-user-1", text="hello"), + ), + ( + DingTalkChannel(bus=MessageBus(), config={"connection_repo": repo}), + InboundMessage( + channel_name="dingtalk", + chat_id="cid-group-1", + user_id="staff-user-1", + text="hello", + metadata={ + "conversation_type": _CONVERSATION_TYPE_GROUP, + "conversation_id": "cid-group-1", + }, + ), + ), + ( + WechatChannel(bus=MessageBus(), config={"connection_repo": repo}), + InboundMessage(channel_name="wechat", chat_id="wx-user-1", user_id="wx-user-1", text="hello"), + ), + ( + WeComChannel(bus=MessageBus(), config={"connection_repo": repo}), + InboundMessage( + channel_name="wecom", + chat_id="wecom-user-1", + user_id="wecom-user-1", + text="hello", + metadata={"aibotid": "bot-1"}, + ), + ), + ] + + for channel, inbound in cases: + attached = await channel._attach_connection_identity(inbound) + assert attached.owner_user_id == "deerflow-user-1" + assert attached.connection_id + assert attached.workspace_id == { + "feishu": "oc-chat-1", + "dingtalk": "cid-group-1", + "wechat": "wx-user-1", + "wecom": "bot-1", + }[channel.name] + + await repo.close() + + anyio.run(go) diff --git a/backend/tests/test_channel_connections_config.py b/backend/tests/test_channel_connections_config.py index 5ee900a00..8a14878c0 100644 --- a/backend/tests/test_channel_connections_config.py +++ b/backend/tests/test_channel_connections_config.py @@ -10,6 +10,10 @@ def test_channel_connections_disabled_by_default(): assert config.slack.enabled is False assert config.telegram.enabled is False assert config.discord.enabled is False + assert config.feishu.enabled is False + assert config.dingtalk.enabled is False + assert config.wechat.enabled is False + assert config.wecom.enabled is False def test_enabled_channel_connections_do_not_require_public_url_or_encryption_key(): @@ -22,6 +26,10 @@ def test_enabled_channel_connections_do_not_require_public_url_or_encryption_key }, "slack": {"enabled": True}, "discord": {"enabled": True}, + "feishu": {"enabled": True}, + "dingtalk": {"enabled": True}, + "wechat": {"enabled": True}, + "wecom": {"enabled": True}, } ) @@ -29,6 +37,10 @@ def test_enabled_channel_connections_do_not_require_public_url_or_encryption_key assert config.provider_status("telegram") == {"enabled": True, "configured": True} assert config.provider_status("slack") == {"enabled": True, "configured": True} assert config.provider_status("discord") == {"enabled": True, "configured": True} + assert config.provider_status("feishu") == {"enabled": True, "configured": True} + assert config.provider_status("dingtalk") == {"enabled": True, "configured": True} + assert config.provider_status("wechat") == {"enabled": True, "configured": True} + assert config.provider_status("wecom") == {"enabled": True, "configured": True} def test_provider_status_reports_disabled_and_unknown_providers(): @@ -37,4 +49,8 @@ def test_provider_status_reports_disabled_and_unknown_providers(): assert config.provider_status("slack") == {"enabled": False, "configured": False} assert config.provider_status("telegram") == {"enabled": False, "configured": False} assert config.provider_status("discord") == {"enabled": False, "configured": False} + assert config.provider_status("feishu") == {"enabled": False, "configured": False} + assert config.provider_status("dingtalk") == {"enabled": False, "configured": False} + assert config.provider_status("wechat") == {"enabled": False, "configured": False} + assert config.provider_status("wecom") == {"enabled": False, "configured": False} assert config.provider_status("unknown") == {"enabled": False, "configured": False} diff --git a/backend/tests/test_channel_connections_router.py b/backend/tests/test_channel_connections_router.py index f2768a911..42f7fc0f3 100644 --- a/backend/tests/test_channel_connections_router.py +++ b/backend/tests/test_channel_connections_router.py @@ -45,6 +45,10 @@ def _enabled_connections_config() -> ChannelConnectionsConfig: "telegram": {"enabled": True, "bot_username": "deerflow_bot"}, "slack": {"enabled": True}, "discord": {"enabled": True}, + "feishu": {"enabled": True}, + "dingtalk": {"enabled": True}, + "wechat": {"enabled": True}, + "wecom": {"enabled": True}, } ) @@ -54,6 +58,10 @@ def _channels_config() -> dict: "telegram": {"enabled": True, "bot_token": "telegram-token"}, "slack": {"enabled": True, "bot_token": "xoxb-operator", "app_token": "xapp-operator"}, "discord": {"enabled": True, "bot_token": "discord-bot"}, + "feishu": {"enabled": True, "app_id": "feishu-app", "app_secret": "feishu-secret"}, + "dingtalk": {"enabled": True, "client_id": "dingtalk-client", "client_secret": "dingtalk-secret"}, + "wechat": {"enabled": True, "bot_token": "wechat-token"}, + "wecom": {"enabled": True, "bot_id": "wecom-bot", "bot_secret": "wecom-secret"}, } @@ -70,12 +78,21 @@ def test_get_providers_uses_existing_channels_config(tmp_path): body = response.json() assert body["enabled"] is True by_provider = {item["provider"]: item for item in body["providers"]} + assert set(by_provider) == {"telegram", "slack", "discord", "feishu", "dingtalk", "wechat", "wecom"} assert by_provider["telegram"]["configured"] is True assert by_provider["telegram"]["auth_mode"] == "deep_link" assert by_provider["slack"]["configured"] is True assert by_provider["slack"]["auth_mode"] == "binding_code" assert by_provider["discord"]["configured"] is True assert by_provider["discord"]["auth_mode"] == "binding_code" + assert by_provider["feishu"]["configured"] is True + assert by_provider["feishu"]["auth_mode"] == "binding_code" + assert by_provider["dingtalk"]["configured"] is True + assert by_provider["dingtalk"]["auth_mode"] == "binding_code" + assert by_provider["wechat"]["configured"] is True + assert by_provider["wechat"]["auth_mode"] == "binding_code" + assert by_provider["wecom"]["configured"] is True + assert by_provider["wecom"]["auth_mode"] == "binding_code" anyio.run(repo.close) @@ -97,6 +114,14 @@ def test_get_providers_reports_unconfigured_when_runtime_channel_is_missing(tmp_ assert "channels.slack" in by_provider["slack"]["unavailable_reason"] assert by_provider["discord"]["configured"] is False assert "channels.discord" in by_provider["discord"]["unavailable_reason"] + assert by_provider["feishu"]["configured"] is False + assert "channels.feishu" in by_provider["feishu"]["unavailable_reason"] + assert by_provider["dingtalk"]["configured"] is False + assert "channels.dingtalk" in by_provider["dingtalk"]["unavailable_reason"] + assert by_provider["wechat"]["configured"] is False + assert "channels.wechat" in by_provider["wechat"]["unavailable_reason"] + assert by_provider["wecom"]["configured"] is False + assert "channels.wecom" in by_provider["wecom"]["unavailable_reason"] anyio.run(repo.close) @@ -247,6 +272,39 @@ def test_connect_discord_returns_binding_command_and_persists_state(tmp_path): anyio.run(repo.close) +def test_connect_existing_binding_code_channels_return_command_and_persist_state(tmp_path): + import anyio + + repo = anyio.run(_make_repo, tmp_path) + app = _make_app(_enabled_connections_config(), repo, _channels_config()) + + providers = ["feishu", "dingtalk", "wechat", "wecom"] + with TestClient(app) as client: + responses = {provider: client.post(f"/api/channels/{provider}/connect") for provider in providers} + + for provider, response in responses.items(): + expected_display_name = { + "feishu": "Feishu", + "dingtalk": "DingTalk", + "wechat": "WeChat", + "wecom": "WeCom", + }[provider] + assert response.status_code == 200 + body = response.json() + assert body["provider"] == provider + assert body["mode"] == "binding_code" + assert body["url"] is None + assert len(body["code"]) >= 22 + assert body["instruction"] == f"Send /connect {body['code']} to the DeerFlow {expected_display_name} bot." + + async def count_states(provider=provider): + return await repo.count_oauth_states(owner_user_id=str(_user().id), provider=provider) + + assert anyio.run(count_states) == 1 + + anyio.run(repo.close) + + def test_connect_unconfigured_runtime_channel_returns_400(tmp_path): import anyio diff --git a/config.example.yaml b/config.example.yaml index c58cc28fd..1a20e23fd 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -1134,16 +1134,16 @@ run_events: # ============================================================================ # User-Owned IM Channel Connections # ============================================================================ -# Lets logged-in users connect their own Telegram, Slack, and Discord accounts -# from the DeerFlow frontend while reusing the existing `channels` runtime -# configuration below. +# Lets logged-in users connect their own IM accounts from the DeerFlow frontend +# while reusing the existing `channels` runtime configuration below. # # Security notes: # - No public IP, OAuth callback URL, or provider webhook is required. # - Provider bot/app credentials stay under `channels.*`. # - `channel_connections` stores per-user bindings and one-time connect codes. # - Telegram uses a deep link when `bot_username` is configured. -# - Slack and Discord use `/connect ` through the already-running bot. +# - Slack, Discord, Feishu, DingTalk, WeChat, and WeCom use `/connect ` +# through the already-running bot/app. # # channel_connections: # enabled: false @@ -1157,6 +1157,18 @@ run_events: # # discord: # enabled: false +# +# feishu: +# enabled: false +# +# dingtalk: +# enabled: false +# +# wechat: +# enabled: false +# +# wecom: +# enabled: false # ============================================================================ # IM Channels Configuration diff --git a/frontend/src/components/workspace/channels/channel-provider-icon.tsx b/frontend/src/components/workspace/channels/channel-provider-icon.tsx index 4193231f6..1e48e65d8 100644 --- a/frontend/src/components/workspace/channels/channel-provider-icon.tsx +++ b/frontend/src/components/workspace/channels/channel-provider-icon.tsx @@ -96,6 +96,96 @@ export function ChannelProviderIcon({ ); } + if (normalizedProvider === "feishu") { + return ( + + ); + } + + if (normalizedProvider === "dingtalk") { + return ( + + ); + } + + if (normalizedProvider === "wechat") { + return ( + + ); + } + + if (normalizedProvider === "wecom") { + return ( + + ); + } + return (