diff --git a/README.md b/README.md index a093b6f10..3c36a8952 100644 --- a/README.md +++ b/README.md @@ -340,6 +340,8 @@ See the [MCP Server Guide](backend/docs/MCP_SERVER.md) for detailed instructions DeerFlow supports receiving tasks from messaging apps. Channels auto-start when configured — no public IP required for any of them. +DeerFlow can also expose user-owned IM channel connections in the workspace UI. When `channel_connections` is enabled, logged-in users can connect Telegram, Slack, or Discord from the sidebar / Settings > Channels. Provider credentials are encrypted at rest, and incoming IM messages run under the connected DeerFlow user account. See [IM Channel Connections](backend/docs/IM_CHANNEL_CONNECTIONS.md) for OAuth callback URLs, webhook setup, and security notes. + | Channel | Transport | Difficulty | |---------|-----------|------------| | Telegram | Bot API (long-polling) | Easy | diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 29a776217..088380a26 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -367,8 +367,7 @@ Proxied through nginx: `/api/langgraph/*` → Gateway LangGraph-compatible runti ### IM Channels System (`app/channels/`) -Bridges external messaging platforms (Feishu, Slack, Telegram, DingTalk) to the DeerFlow agent via Gateway's LangGraph-compatible API. - +Bridges external messaging platforms (Feishu, Slack, Telegram, Discord, DingTalk) to the DeerFlow agent via Gateway's LangGraph-compatible API. **Architecture**: Channels communicate with Gateway through the `langgraph-sdk` HTTP client (same as the frontend), ensuring threads are created and managed server-side. The internal SDK client injects process-local internal auth plus a matching CSRF cookie/header pair so Gateway accepts state-changing thread/run requests from channel workers without relying on browser session cookies. 
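The README paragraph added in this diff states that incoming IM messages run under the connected DeerFlow user account. A minimal sketch of that precedence rule follows, assuming the behaviour described in this change: the connection owner wins, the platform user is the fallback, and the raw platform id is always preserved. `InboundSketch`, `safe_user_id`, and `resolve_run_identity` are hypothetical names used only for illustration; in particular, `safe_user_id` is a guess at what a normalizer restricted to `[A-Za-z0-9_-]` might do and is not the project's `make_safe_user_id`.

```python
import re
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class InboundSketch:
    """Hypothetical stand-in for the two InboundMessage fields used below."""

    user_id: Optional[str] = None  # raw platform user id (Slack, Telegram, Discord)
    owner_user_id: Optional[str] = None  # DeerFlow account that owns the connection


def safe_user_id(raw: str) -> str:
    """Assumed normalizer: replace anything outside [A-Za-z0-9_-] with '_'."""
    return re.sub(r"[^A-Za-z0-9_-]", "_", raw)


def resolve_run_identity(msg: InboundSketch, thread_id: str) -> dict[str, Any]:
    """Pick the run's user_id, preferring the connection owner."""
    identity: dict[str, Any] = {"thread_id": thread_id}
    if msg.owner_user_id:
        # Browser-connected channel: run as the owning DeerFlow user.
        identity["user_id"] = safe_user_id(msg.owner_user_id)
    elif msg.user_id:
        # Legacy channel: fall back to the normalized platform user id.
        identity["user_id"] = safe_user_id(msg.user_id)
    if msg.user_id:
        # Keep the raw platform id for platform-facing lookups and audits.
        identity["channel_user_id"] = msg.user_id
    return identity
```

With a connection present, a message from platform user `U123:abc` owned by `df-user-1` would run as `df-user-1` while still carrying `channel_user_id="U123:abc"`.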
@@ -378,18 +377,21 @@ Bridges external messaging platforms (Feishu, Slack, Telegram, DingTalk) to the - `manager.py` - Core dispatcher: creates threads via `client.threads.create()`, routes commands, keeps Slack/Telegram on `client.runs.wait()`, and uses `client.runs.stream(["messages-tuple", "values"])` for Feishu incremental outbound updates - `base.py` - Abstract `Channel` base class (start/stop/send lifecycle) - `service.py` - Manages lifecycle of all configured channels from `config.yaml` -- `slack.py` / `feishu.py` / `telegram.py` / `dingtalk.py` - Platform-specific implementations (`feishu.py` tracks the running card `message_id` in memory and patches the same card in place; `dingtalk.py` optionally uses AI Card streaming for in-place updates when `card_template_id` is configured) +- `slack.py` / `feishu.py` / `telegram.py` / `discord.py` / `dingtalk.py` - Platform-specific implementations (`feishu.py` tracks the running card `message_id` in memory and patches the same card in place; `dingtalk.py` optionally uses AI Card streaming for in-place updates when `card_template_id` is configured) +- `app/gateway/routers/channel_connections.py` - Browser-facing user connection APIs plus provider callbacks/webhooks +- `deerflow.persistence.channel_connections` - SQL-backed user-owned connection, credential, OAuth state, conversation, and webhook delivery store **Message Flow**: 1. External platform -> Channel impl -> `MessageBus.publish_inbound()` 2. `ChannelManager._dispatch_loop()` consumes from queue -3. For chat: look up/create thread through Gateway's LangGraph-compatible API -4. Feishu chat: `runs.stream()` → accumulate AI text → publish multiple outbound updates (`is_final=False`) → publish final outbound (`is_final=True`) -5. Slack/Telegram chat: `runs.wait()` → extract final response → publish outbound -6. 
Feishu channel sends one running reply card up front, then patches the same card for each outbound update (card JSON sets `config.update_multi=true` for Feishu's patch API requirement) -7. DingTalk AI Card mode (when `card_template_id` configured): `runs.stream()` → create card with initial text → stream updates via `PUT /v1.0/card/streaming` → finalize on `is_final=True`. Falls back to `sampleMarkdown` if card creation or streaming fails -8. For commands (`/new`, `/status`, `/models`, `/memory`, `/help`): handle locally or query Gateway API -9. Outbound → channel callbacks → platform reply +3. For user-owned channel connections, incoming messages carry `connection_id`, `owner_user_id`, and `workspace_id`; `owner_user_id` becomes the DeerFlow run `user_id`, while the raw platform user id remains `channel_user_id` +4. For chat: look up/create thread through Gateway's LangGraph-compatible API +5. Feishu chat: `runs.stream()` → accumulate AI text → publish multiple outbound updates (`is_final=False`) → publish final outbound (`is_final=True`) +6. Slack/Telegram chat: `runs.wait()` → extract final response → publish outbound +7. Feishu channel sends one running reply card up front, then patches the same card for each outbound update (card JSON sets `config.update_multi=true` for Feishu's patch API requirement) +8. DingTalk AI Card mode (when `card_template_id` configured): `runs.stream()` → create card with initial text → stream updates via `PUT /v1.0/card/streaming` → finalize on `is_final=True`. Falls back to `sampleMarkdown` if card creation or streaming fails +9. For commands (`/new`, `/status`, `/models`, `/memory`, `/help`): handle locally or query Gateway API +10. 
Outbound → channel callbacks → platform reply **Configuration** (`config.yaml` -> `channels`): - `langgraph_url` - LangGraph-compatible Gateway API base URL (default: `http://localhost:8001/api`) @@ -397,6 +399,15 @@ Bridges external messaging platforms (Feishu, Slack, Telegram, DingTalk) to the - In Docker Compose, IM channels run inside the `gateway` container, so `localhost` points back to that container. Use `http://gateway:8001/api` for `langgraph_url` and `http://gateway:8001` for `gateway_url`, or set `DEER_FLOW_CHANNELS_LANGGRAPH_URL` / `DEER_FLOW_CHANNELS_GATEWAY_URL`. - Per-channel configs: `feishu` (app_id, app_secret), `slack` (bot_token, app_token), `telegram` (bot_token), `dingtalk` (client_id, client_secret, optional `card_template_id` for AI Card streaming) +**User-owned channel connections** (`config.yaml` -> `channel_connections`): +- Disabled by default. When enabled, `public_base_url` and `encryption_key` are required. +- Frontend APIs: `GET /api/channels/providers`, `GET /api/channels/connections`, `POST /api/channels/{provider}/connect`, and `DELETE /api/channels/connections/{connection_id}`. +- Public provider routes: Slack/Discord OAuth callbacks and Slack/Telegram webhooks are explicitly allowed through AuthMiddleware; webhooks are exempt from CSRF because they are provider-to-server calls and validate Slack signatures or Telegram secret tokens. +- Slack HTTP Events mode uses per-connection encrypted bot tokens for replies. Legacy Slack Socket Mode remains available through the `channels.slack` config. +- Telegram supports frontend deep-link binding and can process signed webhook updates; long polling remains the local/self-host fallback. +- Discord OAuth stores the user identity and guild metadata; Gateway messages from `discord.py` resolve to connection identity before reaching `ChannelManager`. +- See `backend/docs/IM_CHANNEL_CONNECTIONS.md` for provider setup and operational notes. 
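The public-routes bullet above notes that Slack webhooks are exempt from CSRF and are authenticated by their signature instead. A minimal sketch of that check follows, assuming Slack's `v0` signing scheme (HMAC-SHA256 over `v0:{timestamp}:{body}`) and a five-minute replay window. `sign_slack_request` and `is_valid_slack_request` are illustrative names and are not part of this change; the helper this PR adds is `verify_slack_signature` in `slack_connect.py`.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign_slack_request(signing_secret: str, timestamp: int, body: bytes) -> str:
    """Build the value Slack would place in the X-Slack-Signature header."""
    base = f"v0:{timestamp}:".encode() + body
    digest = hmac.new(signing_secret.encode("utf-8"), base, hashlib.sha256).hexdigest()
    return f"v0={digest}"


def is_valid_slack_request(
    signing_secret: str,
    timestamp: Optional[str],
    body: bytes,
    signature: Optional[str],
    now: Optional[int] = None,
    tolerance_seconds: int = 300,
) -> bool:
    """Return True only for a fresh request whose signature matches the raw body."""
    if not signing_secret or not timestamp or not signature:
        return False
    try:
        timestamp_int = int(timestamp)
    except ValueError:
        return False

    # Reject stale or future-dated requests to limit replay attacks.
    current = int(time.time()) if now is None else now
    if abs(current - timestamp_int) > tolerance_seconds:
        return False

    expected = sign_slack_request(signing_secret, timestamp_int, body)
    # Compare as bytes in constant time; str inputs with non-ASCII
    # characters would make hmac.compare_digest raise TypeError.
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```

The signature must be computed over the raw request bytes, so a handler built on this sketch would need to read the body before any JSON parsing.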
+ ### Memory System (`packages/harness/deerflow/agents/memory/`) diff --git a/backend/app/channels/discord.py b/backend/app/channels/discord.py index 3b113c28d..651b5e83a 100644 --- a/backend/app/channels/discord.py +++ b/backend/app/channels/discord.py @@ -10,7 +10,7 @@ from pathlib import Path from typing import Any from app.channels.base import Channel -from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment +from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment logger = logging.getLogger(__name__) @@ -69,6 +69,7 @@ class DiscordChannel(Channel): self._discord_loop: asyncio.AbstractEventLoop | None = None self._main_loop: asyncio.AbstractEventLoop | None = None self._discord_module = None + self._connection_repo = config.get("connection_repo") async def start(self) -> None: if self._running: @@ -314,6 +315,7 @@ class DiscordChannel(Channel): }, ) inbound.topic_id = thread_id + inbound = await self._attach_connection_identity(inbound, guild_id=str(guild.id) if guild else None) self._publish(inbound) # Start typing indicator in the thread if typing_target: @@ -421,6 +423,7 @@ class DiscordChannel(Channel): }, ) inbound.topic_id = thread_id + inbound = await self._attach_connection_identity(inbound, guild_id=str(guild.id) if guild else None) # Start typing indicator in the correct target (thread or channel) if typing_target: @@ -435,6 +438,31 @@ class DiscordChannel(Channel): future = asyncio.run_coroutine_threadsafe(self.bus.publish_inbound(inbound), self._main_loop) future.add_done_callback(lambda f: logger.exception("[Discord] publish_inbound failed", exc_info=f.exception()) if f.exception() else None) + async def _attach_connection_identity(self, inbound: InboundMessage, guild_id: str | None = None) -> InboundMessage: + if self._connection_repo is None: + return inbound + + connection = None + if guild_id: + connection = await 
self._connection_repo.find_connection_by_external_identity( + provider="discord", + external_account_id=inbound.user_id, + workspace_id=guild_id, + ) + if connection is None: + connection = await self._connection_repo.find_connection_by_external_identity( + provider="discord", + external_account_id=inbound.user_id, + workspace_id=None, + ) + if connection is None: + return inbound + + inbound.connection_id = connection["id"] + inbound.owner_user_id = connection["owner_user_id"] + inbound.workspace_id = connection.get("workspace_id") + return inbound + def _run_client(self) -> None: self._discord_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._discord_loop) diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index 9beceeb3a..d0f72b5b9 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -614,6 +614,7 @@ class ChannelManager: assistant_id: str = DEFAULT_ASSISTANT_ID, default_session: dict[str, Any] | None = None, channel_sessions: dict[str, Any] | None = None, + connection_repo: Any | None = None, ) -> None: self.bus = bus self.store = store @@ -623,6 +624,7 @@ class ChannelManager: self._assistant_id = assistant_id self._default_session = _as_dict(default_session) self._channel_sessions = dict(channel_sessions or {}) + self._connection_repo = connection_repo self._client = None # lazy init — langgraph_sdk async client self._csrf_token = generate_csrf_token() self._semaphore: asyncio.Semaphore | None = None @@ -671,12 +673,16 @@ class ChannelManager: configurable["checkpoint_ns"] = "" configurable["thread_id"] = thread_id - # ``user_id`` drives user-scoped filesystem buckets that only accept - # ``[A-Za-z0-9_-]``, so normalize the channel id and keep the raw value - # under ``channel_user_id`` for platform-facing lookups. + # ``user_id`` drives DeerFlow-owned memory, files, and thread buckets. + # For browser-connected IM channels, prefer the DeerFlow account that + # owns the connection. 
Preserve the raw platform user under + # ``channel_user_id`` for platform-facing lookups and audits. run_context_identity: dict[str, Any] = {"thread_id": thread_id} - if msg.user_id: + if msg.owner_user_id: + run_context_identity["user_id"] = make_safe_user_id(msg.owner_user_id) + elif msg.user_id: run_context_identity["user_id"] = make_safe_user_id(msg.user_id) + if msg.user_id: run_context_identity["channel_user_id"] = msg.user_id run_context = _merge_dicts( @@ -792,10 +798,27 @@ class ChannelManager: # -- chat handling ----------------------------------------------------- - async def _create_thread(self, client, msg: InboundMessage) -> str: - """Create a new thread through Gateway and store the mapping.""" - thread = await client.threads.create() - thread_id = thread["thread_id"] + async def _lookup_thread_id(self, msg: InboundMessage) -> str | None: + if msg.connection_id and self._connection_repo is not None: + return await self._connection_repo.get_thread_id( + msg.connection_id, + msg.chat_id, + msg.topic_id, + ) + return self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id) + + async def _store_thread_id(self, msg: InboundMessage, thread_id: str) -> None: + if msg.connection_id and msg.owner_user_id and self._connection_repo is not None: + await self._connection_repo.set_thread_id( + connection_id=msg.connection_id, + owner_user_id=msg.owner_user_id, + provider=msg.channel_name, + external_conversation_id=msg.chat_id, + external_topic_id=msg.topic_id, + thread_id=thread_id, + ) + return + self.store.set_thread_id( msg.channel_name, msg.chat_id, @@ -803,6 +826,12 @@ class ChannelManager: topic_id=msg.topic_id, user_id=msg.user_id, ) + + async def _create_thread(self, client, msg: InboundMessage) -> str: + """Create a new thread through Gateway and store the mapping.""" + thread = await client.threads.create() + thread_id = thread["thread_id"] + await self._store_thread_id(msg, thread_id) logger.info("[Manager] new thread created 
through Gateway: thread_id=%s for chat_id=%s topic_id=%s", thread_id, msg.chat_id, msg.topic_id) return thread_id @@ -812,7 +841,7 @@ class ChannelManager: # Look up existing DeerFlow thread. # topic_id may be None (e.g. Telegram private chats) — the store # handles this by using the "channel:chat_id" key without a topic suffix. - thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id) + thread_id = await self._lookup_thread_id(msg) if thread_id: logger.info("[Manager] reusing thread: thread_id=%s for topic_id=%s", thread_id, msg.topic_id) @@ -896,6 +925,8 @@ class ChannelManager: artifacts=artifacts, attachments=attachments, thread_ts=msg.thread_ts, + connection_id=msg.connection_id, + owner_user_id=msg.owner_user_id, metadata=_response_metadata(msg.metadata, pending_clarification=pending_clarification), ) logger.info("[Manager] publishing outbound message to bus: channel=%s, chat_id=%s", msg.channel_name, msg.chat_id) @@ -958,6 +989,8 @@ class ChannelManager: text=latest_text, is_final=False, thread_ts=msg.thread_ts, + connection_id=msg.connection_id, + owner_user_id=msg.owner_user_id, metadata=_response_metadata(msg.metadata), ) ) @@ -1004,6 +1037,8 @@ class ChannelManager: attachments=attachments, is_final=True, thread_ts=msg.thread_ts, + connection_id=msg.connection_id, + owner_user_id=msg.owner_user_id, metadata=_response_metadata(msg.metadata, pending_clarification=pending_clarification), ) ) @@ -1028,16 +1063,10 @@ class ChannelManager: client = self._get_client() thread = await client.threads.create() new_thread_id = thread["thread_id"] - self.store.set_thread_id( - msg.channel_name, - msg.chat_id, - new_thread_id, - topic_id=msg.topic_id, - user_id=msg.user_id, - ) + await self._store_thread_id(msg, new_thread_id) reply = "New conversation started." 
elif command == "status": - thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id) + thread_id = await self._lookup_thread_id(msg) reply = f"Active thread: {thread_id}" if thread_id else "No active conversation." elif command == "models": reply = await self._fetch_gateway("/api/models", "models") @@ -1060,9 +1089,11 @@ class ChannelManager: outbound = OutboundMessage( channel_name=msg.channel_name, chat_id=msg.chat_id, - thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "", + thread_id=await self._lookup_thread_id(msg) or "", text=reply, thread_ts=msg.thread_ts, + connection_id=msg.connection_id, + owner_user_id=msg.owner_user_id, metadata=_slim_metadata(msg.metadata), ) await self.bus.publish_outbound(outbound) @@ -1098,9 +1129,11 @@ class ChannelManager: outbound = OutboundMessage( channel_name=msg.channel_name, chat_id=msg.chat_id, - thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "", + thread_id=await self._lookup_thread_id(msg) or "", text=error_text, thread_ts=msg.thread_ts, + connection_id=msg.connection_id, + owner_user_id=msg.owner_user_id, metadata=_slim_metadata(msg.metadata), ) await self.bus.publish_outbound(outbound) diff --git a/backend/app/channels/message_bus.py b/backend/app/channels/message_bus.py index 4e847cca0..64a3c2271 100644 --- a/backend/app/channels/message_bus.py +++ b/backend/app/channels/message_bus.py @@ -44,6 +44,12 @@ class InboundMessage: Messages sharing the same ``topic_id`` within a ``chat_id`` will reuse the same DeerFlow thread. When ``None``, each message creates a new thread (one-shot Q&A). + connection_id: Optional DeerFlow channel connection id. When present, + conversation mapping is scoped by the connection instead of the + legacy global ``channel_name:chat_id[:topic_id]`` key. + owner_user_id: DeerFlow user id that owns the channel connection. + Platform user ids stay in ``user_id``. + workspace_id: Optional external workspace/guild/team id. 
files: Optional list of file attachments (platform-specific dicts). metadata: Arbitrary extra data from the channel. created_at: Unix timestamp when the message was created. @@ -56,6 +62,9 @@ class InboundMessage: msg_type: InboundMessageType = InboundMessageType.CHAT thread_ts: str | None = None topic_id: str | None = None + connection_id: str | None = None + owner_user_id: str | None = None + workspace_id: str | None = None files: list[dict[str, Any]] = field(default_factory=list) metadata: dict[str, Any] = field(default_factory=dict) created_at: float = field(default_factory=time.time) @@ -95,6 +104,9 @@ class OutboundMessage: is_final: Whether this is the final message in the response stream. thread_ts: Optional platform thread identifier for threaded replies. metadata: Arbitrary extra data. + connection_id: Optional DeerFlow channel connection id used for + connection-specific outbound credentials. + owner_user_id: DeerFlow user id that owns the channel connection. created_at: Unix timestamp. 
""" @@ -106,6 +118,8 @@ class OutboundMessage: attachments: list[ResolvedAttachment] = field(default_factory=list) is_final: bool = True thread_ts: str | None = None + connection_id: str | None = None + owner_user_id: str | None = None metadata: dict[str, Any] = field(default_factory=dict) created_at: float = field(default_factory=time.time) diff --git a/backend/app/channels/providers/__init__.py b/backend/app/channels/providers/__init__.py new file mode 100644 index 000000000..e8dde0764 --- /dev/null +++ b/backend/app/channels/providers/__init__.py @@ -0,0 +1 @@ +"""Provider-specific helpers for user-owned IM channel connections.""" diff --git a/backend/app/channels/providers/discord_connect.py b/backend/app/channels/providers/discord_connect.py new file mode 100644 index 000000000..5f0ad458e --- /dev/null +++ b/backend/app/channels/providers/discord_connect.py @@ -0,0 +1,110 @@ +"""Discord OAuth helpers for user-owned channel connections.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from typing import Any + +import httpx + +DISCORD_API_BASE_URL = "https://discord.com/api/v10" +DISCORD_TOKEN_URL = f"{DISCORD_API_BASE_URL}/oauth2/token" +DISCORD_CURRENT_USER_URL = f"{DISCORD_API_BASE_URL}/users/@me" +DISCORD_CURRENT_USER_GUILDS_URL = f"{DISCORD_API_BASE_URL}/users/@me/guilds" + + +class DiscordConnectError(RuntimeError): + """Raised when Discord OAuth fails.""" + + +@dataclass(frozen=True) +class DiscordIdentity: + user_id: str + display_name: str | None + username: str | None + guilds: list[dict[str, Any]] + access_token: str + refresh_token: str | None + token_type: str | None + scopes: list[str] + expires_at: datetime | None + raw_token: dict[str, Any] + + +def _split_scopes(value: str | None) -> list[str]: + if not value: + return [] + return [scope.strip() for scope in value.replace(",", " ").split() if scope.strip()] + + +def _display_name(user: dict[str, Any]) -> str | None: + 
global_name = user.get("global_name") + if isinstance(global_name, str) and global_name: + return global_name + username = user.get("username") + return str(username) if username else None + + +async def complete_discord_oauth( + *, + client_id: str, + client_secret: str, + code: str, + redirect_uri: str, + http_client: httpx.AsyncClient | None = None, +) -> DiscordIdentity: + async def _complete(client: httpx.AsyncClient) -> DiscordIdentity: + token_response = await client.post( + DISCORD_TOKEN_URL, + data={ + "client_id": client_id, + "client_secret": client_secret, + "grant_type": "authorization_code", + "code": code, + "redirect_uri": redirect_uri, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=10, + ) + token_response.raise_for_status() + token = token_response.json() + access_token = token.get("access_token") + if not access_token: + raise DiscordConnectError("Discord OAuth response did not include an access token") + + auth_headers = {"Authorization": f"Bearer {access_token}"} + user_response = await client.get(DISCORD_CURRENT_USER_URL, headers=auth_headers, timeout=10) + user_response.raise_for_status() + user = user_response.json() + user_id = user.get("id") + if not user_id: + raise DiscordConnectError("Discord user response did not include a user id") + + guilds_response = await client.get(DISCORD_CURRENT_USER_GUILDS_URL, headers=auth_headers, timeout=10) + guilds: list[dict[str, Any]] = [] + if guilds_response.status_code == 200: + guilds = guilds_response.json() + + expires_at = None + expires_in = token.get("expires_in") + if isinstance(expires_in, int | float): + expires_at = datetime.now(UTC) + timedelta(seconds=float(expires_in)) + + return DiscordIdentity( + user_id=str(user_id), + display_name=_display_name(user), + username=user.get("username"), + guilds=guilds, + access_token=str(access_token), + refresh_token=token.get("refresh_token"), + token_type=token.get("token_type"), + 
scopes=_split_scopes(token.get("scope")), + expires_at=expires_at, + raw_token=token, + ) + + if http_client is None: + async with httpx.AsyncClient() as client: + return await _complete(client) + return await _complete(http_client) diff --git a/backend/app/channels/providers/slack_connect.py b/backend/app/channels/providers/slack_connect.py new file mode 100644 index 000000000..7a093bc78 --- /dev/null +++ b/backend/app/channels/providers/slack_connect.py @@ -0,0 +1,110 @@ +"""Slack OAuth and Events helpers for user-owned channel connections.""" + +from __future__ import annotations + +import hashlib +import hmac +import time +from dataclasses import dataclass +from typing import Any + +import httpx + +SLACK_OAUTH_ACCESS_URL = "https://slack.com/api/oauth.v2.access" +SLACK_SIGNATURE_VERSION = "v0" +SLACK_SIGNATURE_TOLERANCE_SECONDS = 60 * 5 + + +class SlackConnectError(RuntimeError): + """Raised when Slack OAuth or request verification fails.""" + + +@dataclass(frozen=True) +class SlackInstall: + team_id: str + team_name: str | None + authed_user_id: str + bot_user_id: str | None + bot_access_token: str + scopes: list[str] + raw: dict[str, Any] + + +def verify_slack_signature( + *, + signing_secret: str, + timestamp: str | None, + body: bytes, + signature: str | None, + now: int | None = None, +) -> bool: + if not signing_secret or not timestamp or not signature: + return False + + try: + timestamp_int = int(timestamp) + except (TypeError, ValueError): + return False + + current_time = int(time.time()) if now is None else now + if abs(current_time - timestamp_int) > SLACK_SIGNATURE_TOLERANCE_SECONDS: + return False + + base = f"{SLACK_SIGNATURE_VERSION}:{timestamp}:".encode() + body + digest = hmac.new(signing_secret.encode("utf-8"), base, hashlib.sha256).hexdigest() + expected = f"{SLACK_SIGNATURE_VERSION}={digest}" + return hmac.compare_digest(expected, signature) + + +def _split_scopes(value: str | None) -> list[str]: + if not value: + return [] + return 
[scope.strip() for scope in value.split(",") if scope.strip()] + + +async def exchange_slack_oauth_code( + *, + client_id: str, + client_secret: str, + code: str, + redirect_uri: str, + http_client: httpx.AsyncClient | None = None, +) -> SlackInstall: + async def _post(client: httpx.AsyncClient) -> dict[str, Any]: + response = await client.post( + SLACK_OAUTH_ACCESS_URL, + data={ + "client_id": client_id, + "client_secret": client_secret, + "code": code, + "redirect_uri": redirect_uri, + }, + timeout=10, + ) + response.raise_for_status() + return response.json() + + if http_client is None: + async with httpx.AsyncClient() as client: + payload = await _post(client) + else: + payload = await _post(http_client) + + if not payload.get("ok"): + raise SlackConnectError(str(payload.get("error") or "Slack OAuth exchange failed")) + + access_token = payload.get("access_token") + team = payload.get("team") or {} + authed_user = payload.get("authed_user") or {} + if not access_token or not team.get("id") or not authed_user.get("id"): + raise SlackConnectError("Slack OAuth response did not include required installation fields") + + return SlackInstall( + team_id=str(team["id"]), + team_name=team.get("name"), + authed_user_id=str(authed_user["id"]), + bot_user_id=payload.get("bot_user_id"), + bot_access_token=str(access_token), + scopes=_split_scopes(payload.get("scope")), + raw=payload, + ) diff --git a/backend/app/channels/service.py b/backend/app/channels/service.py index 1b9526297..5e95b48f0 100644 --- a/backend/app/channels/service.py +++ b/backend/app/channels/service.py @@ -52,6 +52,56 @@ def _resolve_service_url(config: dict[str, Any], config_key: str, env_key: str, return default +def _merge_channel_connection_runtime_config(channels_config: dict[str, Any], app_config: AppConfig) -> None: + connection_config = getattr(app_config, "channel_connections", None) + if connection_config is None or not getattr(connection_config, "enabled", False): + return + + telegram = 
getattr(connection_config, "telegram", None) + if telegram is not None and getattr(telegram, "enabled", False) and getattr(telegram, "configured", False): + telegram_config = dict(channels_config.get("telegram", {})) if isinstance(channels_config.get("telegram"), dict) else {} + telegram_config.setdefault("enabled", True) + telegram_config.setdefault("bot_token", telegram.bot_token) + channels_config["telegram"] = telegram_config + + slack = getattr(connection_config, "slack", None) + if slack is not None and getattr(slack, "enabled", False) and getattr(slack, "configured", False): + slack_config = dict(channels_config.get("slack", {})) if isinstance(channels_config.get("slack"), dict) else {} + slack_config.setdefault("enabled", True) + slack_config.setdefault("event_delivery", slack.event_delivery) + slack_config.setdefault("signing_secret", slack.signing_secret) + channels_config["slack"] = slack_config + + discord = getattr(connection_config, "discord", None) + if discord is not None and getattr(discord, "enabled", False) and getattr(discord, "configured", False): + discord_config = dict(channels_config.get("discord", {})) if isinstance(channels_config.get("discord"), dict) else {} + discord_config.setdefault("enabled", True) + discord_config.setdefault("bot_token", discord.bot_token) + channels_config["discord"] = discord_config + + +def _make_connection_repo(app_config: AppConfig): + connection_config = getattr(app_config, "channel_connections", None) + if connection_config is None or not getattr(connection_config, "enabled", False): + return None + encryption_key = getattr(connection_config, "encryption_key", "") + if not encryption_key: + return None + + try: + from deerflow.persistence.channel_connections import ChannelConnectionRepository, ChannelCredentialCipher + from deerflow.persistence.engine import get_session_factory + except Exception: + logger.exception("Failed to import channel connection repository") + return None + + session_factory = 
get_session_factory() + if session_factory is None: + logger.warning("Channel connections are enabled but database persistence is not available") + return None + return ChannelConnectionRepository(session_factory, cipher=ChannelCredentialCipher.from_key(encryption_key)) + + class ChannelService: """Manages the lifecycle of all configured IM channels. @@ -59,9 +109,10 @@ class ChannelService: instantiates enabled channels, and starts the ChannelManager dispatcher. """ - def __init__(self, channels_config: dict[str, Any] | None = None) -> None: + def __init__(self, channels_config: dict[str, Any] | None = None, *, connection_repo: Any | None = None) -> None: self.bus = MessageBus() self.store = ChannelStore() + self._connection_repo = connection_repo config = dict(channels_config or {}) langgraph_url = _resolve_service_url(config, "langgraph_url", _CHANNELS_LANGGRAPH_URL_ENV, DEFAULT_LANGGRAPH_URL) gateway_url = _resolve_service_url(config, "gateway_url", _CHANNELS_GATEWAY_URL_ENV, DEFAULT_GATEWAY_URL) @@ -74,6 +125,7 @@ class ChannelService: gateway_url=gateway_url, default_session=default_session if isinstance(default_session, dict) else None, channel_sessions=channel_sessions, + connection_repo=connection_repo, ) self._channels: dict[str, Any] = {} # name -> Channel instance self._config = config @@ -90,8 +142,9 @@ class ChannelService: # extra fields are allowed by AppConfig (extra="allow") extra = app_config.model_extra or {} if "channels" in extra: - channels_config = extra["channels"] - return cls(channels_config=channels_config) + channels_config = dict(extra["channels"] or {}) + _merge_channel_connection_runtime_config(channels_config, app_config) + return cls(channels_config=channels_config, connection_repo=_make_connection_repo(app_config)) async def start(self) -> None: """Start the manager and all enabled channels.""" @@ -169,6 +222,8 @@ class ChannelService: try: config = dict(config) config["channel_store"] = self.store + if self._connection_repo is 
not None: + config["connection_repo"] = self._connection_repo channel = channel_cls(bus=self.bus, config=config) self._channels[name] = channel await channel.start() diff --git a/backend/app/channels/slack.py b/backend/app/channels/slack.py index 65cb36cf5..f701e1ea4 100644 --- a/backend/app/channels/slack.py +++ b/backend/app/channels/slack.py @@ -49,6 +49,8 @@ class SlackChannel(Channel): self._web_client = None self._loop: asyncio.AbstractEventLoop | None = None self._allowed_users = _normalize_allowed_users(config.get("allowed_users", [])) + self._connection_repo = config.get("connection_repo") + self._web_client_factory = config.get("web_client_factory") async def start(self) -> None: if self._running: @@ -63,15 +65,24 @@ class SlackChannel(Channel): return self._SocketModeResponse = SocketModeResponse + if self._web_client_factory is None: + self._web_client_factory = WebClient bot_token = self.config.get("bot_token", "") app_token = self.config.get("app_token", "") + if self._connection_repo is not None and self.config.get("event_delivery") == "http": + self._loop = asyncio.get_event_loop() + self._running = True + self.bus.subscribe_outbound(self._on_outbound) + logger.info("Slack channel started in HTTP Events mode") + return + if not bot_token or not app_token: logger.error("Slack channel requires bot_token and app_token") return - self._web_client = WebClient(token=bot_token) + self._web_client = self._web_client_factory(token=bot_token) self._socket_client = SocketModeClient( app_token=app_token, web_client=self._web_client, @@ -96,7 +107,8 @@ class SlackChannel(Channel): logger.info("Slack channel stopped") async def send(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None: - if not self._web_client: + web_client = await self._get_web_client_for_message(msg) + if not web_client: return kwargs: dict[str, Any] = { @@ -109,11 +121,12 @@ class SlackChannel(Channel): last_exc: Exception | None = None for attempt in range(_max_retries): try: - 
await asyncio.to_thread(self._web_client.chat_postMessage, **kwargs) + await asyncio.to_thread(web_client.chat_postMessage, **kwargs) # Add a completion reaction to the thread root if msg.thread_ts: await asyncio.to_thread( - self._add_reaction, + self._add_reaction_with_client, + web_client, msg.chat_id, msg.thread_ts, "white_check_mark", @@ -137,7 +150,8 @@ class SlackChannel(Channel): if msg.thread_ts: try: await asyncio.to_thread( - self._add_reaction, + self._add_reaction_with_client, + web_client, msg.chat_id, msg.thread_ts, "x", @@ -149,7 +163,8 @@ class SlackChannel(Channel): raise last_exc async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: - if not self._web_client: + web_client = await self._get_web_client_for_message(msg) + if not web_client: return False try: @@ -162,7 +177,7 @@ class SlackChannel(Channel): if msg.thread_ts: kwargs["thread_ts"] = msg.thread_ts - await asyncio.to_thread(self._web_client.files_upload_v2, **kwargs) + await asyncio.to_thread(web_client.files_upload_v2, **kwargs) logger.info("[Slack] file uploaded: %s to channel=%s", attachment.filename, msg.chat_id) return True except Exception: @@ -171,12 +186,24 @@ class SlackChannel(Channel): # -- internal ---------------------------------------------------------- - def _add_reaction(self, channel_id: str, timestamp: str, emoji: str) -> None: - """Add an emoji reaction to a message (best-effort, non-blocking).""" - if not self._web_client: - return + async def _get_web_client_for_message(self, msg: OutboundMessage): + if msg.connection_id and self._connection_repo is not None: + credentials = await self._connection_repo.get_credentials(msg.connection_id) + access_token = credentials.get("access_token") if credentials else None + if not access_token: + logger.warning("[Slack] no bot token found for connection=%s", msg.connection_id) + return None + if self._web_client_factory is None: + from slack_sdk import WebClient + + self._web_client_factory = 
WebClient + return self._web_client_factory(token=access_token) + return self._web_client + + @staticmethod + def _add_reaction_with_client(web_client, channel_id: str, timestamp: str, emoji: str) -> None: try: - self._web_client.reactions_add( + web_client.reactions_add( channel=channel_id, timestamp=timestamp, name=emoji, @@ -185,6 +212,12 @@ class SlackChannel(Channel): if "already_reacted" not in str(exc): logger.warning("[Slack] failed to add reaction %s: %s", emoji, exc) + def _add_reaction(self, channel_id: str, timestamp: str, emoji: str) -> None: + """Add an emoji reaction to a message (best-effort, non-blocking).""" + if not self._web_client: + return + self._add_reaction_with_client(self._web_client, channel_id, timestamp, emoji) + def _send_running_reply(self, channel_id: str, thread_ts: str) -> None: """Send a 'Working on it......' reply in the thread (called from SDK thread).""" if not self._web_client: diff --git a/backend/app/channels/telegram.py b/backend/app/channels/telegram.py index 9985fd43f..9ed663858 100644 --- a/backend/app/channels/telegram.py +++ b/backend/app/channels/telegram.py @@ -35,6 +35,7 @@ class TelegramChannel(Channel): pass # chat_id -> last sent message_id for threaded replies self._last_bot_message: dict[str, int] = {} + self._connection_repo = config.get("connection_repo") async def start(self) -> None: if self._running: @@ -171,6 +172,26 @@ class TelegramChannel(Channel): logger.exception("[Telegram] failed to send file: %s", attachment.filename) return False + async def process_webhook_update(self, payload: dict[str, Any]) -> bool: + if not self._application: + return False + try: + from telegram import Update + except ImportError: + logger.error("python-telegram-bot is not installed. 
Install it with: uv add python-telegram-bot") + return False + + update = Update.de_json(payload, self._application.bot) + if update is None: + return False + + if self._tg_loop and self._tg_loop.is_running(): + future = asyncio.run_coroutine_threadsafe(self._application.process_update(update), self._tg_loop) + await asyncio.wrap_future(future) + else: + await self._application.process_update(update) + return True + # -- helpers ----------------------------------------------------------- async def _send_running_reply(self, chat_id: str, reply_to_message_id: int) -> None: @@ -228,10 +249,72 @@ class TelegramChannel(Channel): return True return user_id in self._allowed_users + @staticmethod + def _telegram_display_name(user) -> str: + full_name = getattr(user, "full_name", None) + if isinstance(full_name, str) and full_name: + return full_name + username = getattr(user, "username", None) + if isinstance(username, str) and username: + return username + return str(getattr(user, "id", "")) + + async def _bind_connection_from_start_token(self, update, state_token: str) -> bool: + if self._connection_repo is None or not state_token: + return False + + state = await self._connection_repo.consume_oauth_state(provider="telegram", state=state_token) + if state is None: + await update.message.reply_text("Telegram connection link is invalid or expired.") + return True + + owner_user_id = state["owner_user_id"] + user_id = str(update.effective_user.id) + chat_id = str(update.effective_chat.id) + connection = await self._connection_repo.upsert_connection( + owner_user_id=owner_user_id, + provider="telegram", + external_account_id=user_id, + external_account_name=self._telegram_display_name(update.effective_user), + workspace_id=chat_id, + workspace_name=None, + metadata={ + "chat_id": chat_id, + "chat_type": update.effective_chat.type, + "telegram_username": getattr(update.effective_user, "username", None), + }, + status="connected", + ) + logger.info("[Telegram] bound chat=%s 
user=%s to DeerFlow user=%s connection=%s", chat_id, user_id, owner_user_id, connection["id"]) + await update.message.reply_text("Telegram connected to DeerFlow.") + return True + + async def _attach_connection_identity(self, inbound: InboundMessage) -> InboundMessage: + if self._connection_repo is None: + return inbound + + connection = await self._connection_repo.find_connection_by_external_identity( + provider="telegram", + external_account_id=inbound.user_id, + workspace_id=inbound.chat_id, + ) + if connection is None: + return inbound + + inbound.connection_id = connection["id"] + inbound.owner_user_id = connection["owner_user_id"] + inbound.workspace_id = connection.get("workspace_id") + return inbound + async def _cmd_start(self, update, context) -> None: """Handle /start command.""" if not self._check_user(update.effective_user.id): return + args = getattr(context, "args", []) if context is not None else [] + if args: + handled = await self._bind_connection_from_start_token(update, str(args[0])) + if handled: + return await update.message.reply_text("Welcome to DeerFlow! 
Send me a message to start a conversation.\nType /help for available commands.") async def _process_incoming_with_reply(self, chat_id: str, msg_id: int, inbound: InboundMessage) -> None: @@ -267,6 +350,7 @@ class TelegramChannel(Channel): thread_ts=msg_id, ) inbound.topic_id = topic_id + inbound = await self._attach_connection_identity(inbound) if self._main_loop and self._main_loop.is_running(): fut = asyncio.run_coroutine_threadsafe(self._process_incoming_with_reply(chat_id, update.message.message_id, inbound), self._main_loop) @@ -309,6 +393,7 @@ class TelegramChannel(Channel): thread_ts=msg_id, ) inbound.topic_id = topic_id + inbound = await self._attach_connection_identity(inbound) if self._main_loop and self._main_loop.is_running(): fut = asyncio.run_coroutine_threadsafe(self._process_incoming_with_reply(chat_id, update.message.message_id, inbound), self._main_loop) diff --git a/backend/app/gateway/app.py b/backend/app/gateway/app.py index dd5701083..b8ffc468f 100644 --- a/backend/app/gateway/app.py +++ b/backend/app/gateway/app.py @@ -15,6 +15,7 @@ from app.gateway.routers import ( artifacts, assistants_compat, auth, + channel_connections, channels, feedback, mcp, @@ -376,6 +377,9 @@ This gateway provides runtime endpoints for agent runs plus custom endpoints for # Suggestions API is mounted at /api/threads/{thread_id}/suggestions app.include_router(suggestions.router) + # User-facing IM channel connection API is mounted at /api/channels + app.include_router(channel_connections.router) + # Channels API is mounted at /api/channels app.include_router(channels.router) diff --git a/backend/app/gateway/auth_middleware.py b/backend/app/gateway/auth_middleware.py index 6b6452264..89cb624c4 100644 --- a/backend/app/gateway/auth_middleware.py +++ b/backend/app/gateway/auth_middleware.py @@ -27,6 +27,7 @@ _PUBLIC_PATH_PREFIXES: tuple[str, ...] 
= ( "/docs", "/redoc", "/openapi.json", + "/api/channels/webhooks/", ) # Exact auth paths that are public (login/register/status check). @@ -38,6 +39,8 @@ _PUBLIC_EXACT_PATHS: frozenset[str] = frozenset( "/api/v1/auth/logout", "/api/v1/auth/setup-status", "/api/v1/auth/initialize", + "/api/channels/slack/callback", + "/api/channels/discord/callback", } ) diff --git a/backend/app/gateway/csrf_middleware.py b/backend/app/gateway/csrf_middleware.py index f34882032..dde7bf563 100644 --- a/backend/app/gateway/csrf_middleware.py +++ b/backend/app/gateway/csrf_middleware.py @@ -39,6 +39,8 @@ def should_check_csrf(request: Request) -> bool: return False path = request.url.path.rstrip("/") + if path.startswith("/api/channels/webhooks/"): + return False # Exempt /api/v1/auth/me endpoint if path == "/api/v1/auth/me": return False diff --git a/backend/app/gateway/routers/channel_connections.py b/backend/app/gateway/routers/channel_connections.py new file mode 100644 index 000000000..e0b70defc --- /dev/null +++ b/backend/app/gateway/routers/channel_connections.py @@ -0,0 +1,487 @@ +"""Browser-facing APIs for user-owned IM channel connections.""" + +from __future__ import annotations + +import hashlib +import json +import secrets +from datetime import UTC, datetime, timedelta +from typing import Any +from urllib.parse import urlencode + +from fastapi import APIRouter, HTTPException, Request, Response +from pydantic import BaseModel, Field +from starlette.responses import PlainTextResponse, RedirectResponse + +from app.channels.message_bus import InboundMessage, InboundMessageType +from app.channels.providers import discord_connect, slack_connect +from deerflow.config.channel_connections_config import ChannelConnectionsConfig +from deerflow.persistence.channel_connections import ChannelConnectionRepository, ChannelCredentialCipher +from deerflow.persistence.engine import get_session_factory + +router = APIRouter(prefix="/api/channels", tags=["channel-connections"]) + 
+_STATE_TTL_SECONDS = 600 + + +class ChannelProviderResponse(BaseModel): + provider: str + display_name: str + enabled: bool + configured: bool + auth_mode: str + connection_status: str + + +class ChannelProvidersResponse(BaseModel): + enabled: bool + providers: list[ChannelProviderResponse] + + +class ChannelConnectionResponse(BaseModel): + id: str + provider: str + status: str + external_account_id: str | None = None + external_account_name: str | None = None + workspace_id: str | None = None + workspace_name: str | None = None + scopes: list[str] = Field(default_factory=list) + metadata: dict[str, Any] = Field(default_factory=dict) + + +class ChannelConnectionsResponse(BaseModel): + connections: list[ChannelConnectionResponse] + + +class ChannelConnectResponse(BaseModel): + provider: str + mode: str + url: str + expires_in: int + + +_PROVIDER_META: dict[str, dict[str, str]] = { + "telegram": {"display_name": "Telegram", "auth_mode": "deep_link"}, + "slack": {"display_name": "Slack", "auth_mode": "oauth"}, + "discord": {"display_name": "Discord", "auth_mode": "oauth_and_bot_install"}, +} + + +def _get_user_id(request: Request) -> str: + user = getattr(request.state, "user", None) + if user is None: + raise HTTPException(status_code=401, detail="Authentication required") + return str(user.id) + + +def _get_channel_connections_config(request: Request) -> ChannelConnectionsConfig: + config = getattr(request.app.state, "channel_connections_config", None) + if isinstance(config, ChannelConnectionsConfig): + return config + + from deerflow.config.app_config import get_app_config + + return get_app_config().channel_connections + + +def _get_repository(request: Request, config: ChannelConnectionsConfig) -> ChannelConnectionRepository: + repo = getattr(request.app.state, "channel_connection_repo", None) + if isinstance(repo, ChannelConnectionRepository): + return repo + + sf = get_session_factory() + if sf is None: + raise HTTPException(status_code=503, detail="Channel 
connection persistence is not available") + if not config.encryption_key: + raise HTTPException(status_code=503, detail="Channel connection encryption key is not configured") + + repo = ChannelConnectionRepository(sf, cipher=ChannelCredentialCipher.from_key(config.encryption_key)) + request.app.state.channel_connection_repo = repo + return repo + + +def _provider_config(config: ChannelConnectionsConfig, provider: str): + provider_config = getattr(config, provider, None) + if provider_config is None: + raise HTTPException(status_code=404, detail="Unknown channel provider") + return provider_config + + +async def _create_state( + repo: ChannelConnectionRepository, + *, + owner_user_id: str, + provider: str, + requested_scopes: list[str] | None = None, + metadata: dict[str, Any] | None = None, +) -> str: + state = secrets.token_urlsafe(32) + await repo.create_oauth_state( + owner_user_id=owner_user_id, + provider=provider, + state=state, + requested_scopes=requested_scopes, + metadata=metadata, + expires_at=datetime.now(UTC) + timedelta(seconds=_STATE_TTL_SECONDS), + ) + return state + + +def _build_connect_url(config: ChannelConnectionsConfig, provider: str, state: str) -> str: + provider_config = _provider_config(config, provider) + if provider == "telegram": + return f"https://t.me/{provider_config.bot_username}?start={state}" + + redirect_uri = f"{config.public_base_url.rstrip('/')}/api/channels/{provider}/callback" + if provider == "slack": + query = urlencode( + { + "client_id": provider_config.client_id, + "scope": ",".join(provider_config.scopes), + "redirect_uri": redirect_uri, + "state": state, + } + ) + return f"https://slack.com/oauth/v2/authorize?{query}" + + if provider == "discord": + scopes = "identify guilds bot applications.commands" + query = urlencode( + { + "client_id": provider_config.client_id, + "response_type": "code", + "redirect_uri": redirect_uri, + "scope": scopes, + "state": state, + "permissions": provider_config.permissions, + } + ) + 
return f"https://discord.com/oauth2/authorize?{query}" + + raise HTTPException(status_code=404, detail="Unknown channel provider") + + +def _callback_redirect(provider: str, state_data: dict[str, Any]) -> RedirectResponse: + redirect_after = state_data.get("redirect_after") + if isinstance(redirect_after, str) and redirect_after: + return RedirectResponse(redirect_after) + return RedirectResponse(f"/workspace?channel_connected={provider}") + + +def _get_message_bus(request: Request): + bus = getattr(request.app.state, "channel_message_bus", None) + if bus is not None: + return bus + try: + from app.channels.service import get_channel_service + except Exception: + return None + service = get_channel_service() + return service.bus if service is not None else None + + +def _get_channel_instance(request: Request, name: str): + channel_instances = getattr(request.app.state, "channel_instances", None) + if isinstance(channel_instances, dict) and name in channel_instances: + return channel_instances[name] + try: + from app.channels.service import get_channel_service + except Exception: + return None + service = get_channel_service() + return service.get_channel(name) if service is not None else None + + +async def _publish_slack_event( + *, + repo: ChannelConnectionRepository, + bus: Any, + payload: dict[str, Any], +) -> bool: + event = payload.get("event") or {} + event_type = event.get("type") + if event_type not in {"message", "app_mention"}: + return False + if event.get("bot_id") or event.get("subtype"): + return False + + text = str(event.get("text") or "").strip() + user_id = str(event.get("user") or "") + channel_id = str(event.get("channel") or "") + team_id = str(payload.get("team_id") or event.get("team") or event.get("team_id") or "") + if not text or not user_id or not channel_id or not team_id: + return False + + connection = await repo.find_connection_by_external_identity( + provider="slack", + external_account_id=user_id, + workspace_id=team_id, + ) + if 
connection is None: + return False + + thread_ts = str(event.get("thread_ts") or event.get("ts") or "") + inbound = InboundMessage( + channel_name="slack", + chat_id=channel_id, + user_id=user_id, + text=text, + msg_type=InboundMessageType.COMMAND if text.startswith("/") else InboundMessageType.CHAT, + thread_ts=thread_ts, + metadata={"team_id": team_id, "event_id": payload.get("event_id")}, + connection_id=connection["id"], + owner_user_id=connection["owner_user_id"], + workspace_id=team_id, + ) + inbound.topic_id = thread_ts or None + await bus.publish_inbound(inbound) + return True + + +@router.get("/providers", response_model=ChannelProvidersResponse) +async def get_channel_providers(request: Request) -> ChannelProvidersResponse: + config = _get_channel_connections_config(request) + repo = _get_repository(request, config) if config.enabled and config.encryption_key else None + owner_user_id = _get_user_id(request) + connections = await repo.list_connections(owner_user_id) if repo is not None else [] + by_provider = {item["provider"]: item for item in connections} + + providers: list[ChannelProviderResponse] = [] + for provider, meta in _PROVIDER_META.items(): + status = config.provider_status(provider) + connection = by_provider.get(provider) + providers.append( + ChannelProviderResponse( + provider=provider, + display_name=meta["display_name"], + enabled=status["enabled"], + configured=status["configured"], + auth_mode=meta["auth_mode"], + connection_status=connection["status"] if connection else "not_connected", + ) + ) + return ChannelProvidersResponse(enabled=config.enabled, providers=providers) + + +@router.get("/connections", response_model=ChannelConnectionsResponse) +async def get_channel_connections(request: Request) -> ChannelConnectionsResponse: + config = _get_channel_connections_config(request) + if not config.enabled: + return ChannelConnectionsResponse(connections=[]) + repo = _get_repository(request, config) + rows = await 
repo.list_connections(_get_user_id(request)) + return ChannelConnectionsResponse(connections=[ChannelConnectionResponse(**row) for row in rows]) + + +@router.delete("/connections/{connection_id}", status_code=204) +async def disconnect_channel_connection(connection_id: str, request: Request) -> Response: + config = _get_channel_connections_config(request) + if not config.enabled: + raise HTTPException(status_code=400, detail="Channel connections are disabled") + + repo = _get_repository(request, config) + disconnected = await repo.disconnect_connection( + connection_id=connection_id, + owner_user_id=_get_user_id(request), + ) + if not disconnected: + raise HTTPException(status_code=404, detail="Channel connection not found") + return Response(status_code=204) + + +@router.get("/slack/callback") +async def slack_oauth_callback(request: Request, code: str | None = None, state: str | None = None, error: str | None = None): + if error: + raise HTTPException(status_code=400, detail=f"Slack OAuth failed: {error}") + if not code or not state: + raise HTTPException(status_code=400, detail="Slack OAuth callback is missing code or state") + + config = _get_channel_connections_config(request) + provider_config = _provider_config(config, "slack") + if not config.enabled or not provider_config.enabled or not provider_config.configured: + raise HTTPException(status_code=400, detail="Channel provider is not configured") + + repo = _get_repository(request, config) + state_data = await repo.consume_oauth_state(provider="slack", state=state) + if state_data is None: + raise HTTPException(status_code=400, detail="Invalid or expired OAuth state") + + redirect_uri = f"{config.public_base_url.rstrip('/')}/api/channels/slack/callback" + install = await slack_connect.exchange_slack_oauth_code( + client_id=provider_config.client_id, + client_secret=provider_config.client_secret, + code=code, + redirect_uri=redirect_uri, + ) + connection = await repo.upsert_connection( + 
owner_user_id=state_data["owner_user_id"], + provider="slack", + external_account_id=install.authed_user_id, + workspace_id=install.team_id, + workspace_name=install.team_name, + bot_user_id=install.bot_user_id, + scopes=install.scopes or state_data.get("requested_scopes", []), + metadata={"team_id": install.team_id, "team_name": install.team_name}, + status="connected", + ) + await repo.store_credentials( + connection["id"], + access_token=install.bot_access_token, + token_type="Bearer", + extra={"bot_user_id": install.bot_user_id, "team_id": install.team_id}, + ) + return _callback_redirect("slack", state_data) + + +@router.get("/discord/callback") +async def discord_oauth_callback(request: Request, code: str | None = None, state: str | None = None, error: str | None = None): + if error: + raise HTTPException(status_code=400, detail=f"Discord OAuth failed: {error}") + if not code or not state: + raise HTTPException(status_code=400, detail="Discord OAuth callback is missing code or state") + + config = _get_channel_connections_config(request) + provider_config = _provider_config(config, "discord") + if not config.enabled or not provider_config.enabled or not provider_config.configured: + raise HTTPException(status_code=400, detail="Channel provider is not configured") + + repo = _get_repository(request, config) + state_data = await repo.consume_oauth_state(provider="discord", state=state) + if state_data is None: + raise HTTPException(status_code=400, detail="Invalid or expired OAuth state") + + redirect_uri = f"{config.public_base_url.rstrip('/')}/api/channels/discord/callback" + identity = await discord_connect.complete_discord_oauth( + client_id=provider_config.client_id, + client_secret=provider_config.client_secret, + code=code, + redirect_uri=redirect_uri, + ) + connection = await repo.upsert_connection( + owner_user_id=state_data["owner_user_id"], + provider="discord", + external_account_id=identity.user_id, + external_account_name=identity.display_name or 
identity.username, + scopes=identity.scopes or state_data.get("requested_scopes", []), + capabilities={"message_content_intent_required": provider_config.require_message_content_intent}, + metadata={"username": identity.username, "guilds": identity.guilds}, + status="connected", + ) + await repo.store_credentials( + connection["id"], + access_token=identity.access_token, + refresh_token=identity.refresh_token, + token_type=identity.token_type, + expires_at=identity.expires_at, + extra={"guilds": identity.guilds}, + ) + return _callback_redirect("discord", state_data) + + +@router.post("/webhooks/slack/events") +async def slack_events_webhook(request: Request): + config = _get_channel_connections_config(request) + provider_config = _provider_config(config, "slack") + if not config.enabled or not provider_config.enabled or not provider_config.configured: + raise HTTPException(status_code=400, detail="Channel provider is not configured") + + body = await request.body() + if not slack_connect.verify_slack_signature( + signing_secret=provider_config.signing_secret, + timestamp=request.headers.get("X-Slack-Request-Timestamp"), + body=body, + signature=request.headers.get("X-Slack-Signature"), + ): + raise HTTPException(status_code=401, detail="Invalid Slack signature") + + try: + payload = json.loads(body.decode("utf-8")) + except json.JSONDecodeError as exc: + raise HTTPException(status_code=400, detail="Invalid Slack payload") from exc + + if payload.get("type") == "url_verification": + challenge = payload.get("challenge") + if not isinstance(challenge, str): + raise HTTPException(status_code=400, detail="Slack challenge is missing") + return PlainTextResponse(challenge) + + repo = _get_repository(request, config) + delivery_id = str(payload.get("event_id") or hashlib.sha256(body).hexdigest()) + payload_hash = hashlib.sha256(body).hexdigest() + event = payload.get("event") or {} + is_new = await repo.record_webhook_delivery( + provider="slack", + 
delivery_id=delivery_id, + payload_sha256=payload_hash, + event_type=event.get("type"), + ) + if not is_new: + return {"ok": True, "duplicate": True, "processed": False} + + bus = _get_message_bus(request) + processed = False + if bus is not None: + processed = await _publish_slack_event(repo=repo, bus=bus, payload=payload) + return {"ok": True, "processed": processed} + + +@router.post("/webhooks/telegram") +async def telegram_webhook(request: Request): + config = _get_channel_connections_config(request) + provider_config = _provider_config(config, "telegram") + if not config.enabled or not provider_config.enabled or not provider_config.configured: + raise HTTPException(status_code=400, detail="Channel provider is not configured") + + secret_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token") + if not secret_header or not secrets.compare_digest(secret_header, provider_config.webhook_secret): + raise HTTPException(status_code=401, detail="Invalid Telegram webhook secret") + + body = await request.body() + try: + payload = json.loads(body.decode("utf-8")) + except json.JSONDecodeError as exc: + raise HTTPException(status_code=400, detail="Invalid Telegram payload") from exc + + repo = _get_repository(request, config) + delivery_id = str(payload.get("update_id") or hashlib.sha256(body).hexdigest()) + is_new = await repo.record_webhook_delivery( + provider="telegram", + delivery_id=delivery_id, + payload_sha256=hashlib.sha256(body).hexdigest(), + event_type="update", + ) + if not is_new: + return {"ok": True, "duplicate": True, "processed": False} + + processed = False + channel = _get_channel_instance(request, "telegram") + process_update = getattr(channel, "process_webhook_update", None) + if process_update is not None: + processed = bool(await process_update(payload)) + return {"ok": True, "processed": processed} + + +@router.post("/{provider}/connect", response_model=ChannelConnectResponse) +async def connect_channel_provider(provider: str, request: 
Request) -> ChannelConnectResponse: + config = _get_channel_connections_config(request) + if not config.enabled: + raise HTTPException(status_code=400, detail="Channel connections are disabled") + + provider_config = _provider_config(config, provider) + if not provider_config.enabled or not provider_config.configured: + raise HTTPException(status_code=400, detail="Channel provider is not configured") + + repo = _get_repository(request, config) + state = await _create_state( + repo, + owner_user_id=_get_user_id(request), + provider=provider, + requested_scopes=getattr(provider_config, "scopes", []), + ) + return ChannelConnectResponse( + provider=provider, + mode=_PROVIDER_META[provider]["auth_mode"], + url=_build_connect_url(config, provider, state), + expires_in=_STATE_TTL_SECONDS, + ) diff --git a/backend/docs/IM_CHANNEL_CONNECTIONS.md b/backend/docs/IM_CHANNEL_CONNECTIONS.md new file mode 100644 index 000000000..313b8f40c --- /dev/null +++ b/backend/docs/IM_CHANNEL_CONNECTIONS.md @@ -0,0 +1,86 @@ +# IM Channel Connections + +DeerFlow supports user-owned IM channel connections for Telegram, Slack, and Discord. A logged-in user connects a provider from the frontend, and incoming IM messages run under that DeerFlow user account instead of the raw platform user id. 
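The identity switch described above can be illustrated with a minimal sketch. It assumes only the three inbound fields that appear elsewhere in this change (`user_id`, `connection_id`, `owner_user_id`); the `InboundSketch` class and `resolve_run_user` helper are hypothetical names for illustration and are not part of DeerFlow.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class InboundSketch:
    """Hypothetical stand-in for an inbound IM message (not the real InboundMessage)."""

    user_id: str  # raw platform user id, e.g. a Telegram or Slack user id
    connection_id: str | None = None  # set when a user-owned connection matched
    owner_user_id: str | None = None  # DeerFlow account that owns the connection


def resolve_run_user(msg: InboundSketch) -> tuple[str | None, str]:
    """Return (deerflow_user_id, channel_user_id) for a message.

    When a connection matched, the run is attributed to the DeerFlow owner and
    the platform id is kept alongside it. When nothing matched, this sketch
    returns None for the DeerFlow user; how that case is handled is not
    modeled here.
    """
    if msg.connection_id and msg.owner_user_id:
        return msg.owner_user_id, msg.user_id
    return None, msg.user_id
```

A message from a connected Telegram user would resolve to the owning DeerFlow account, while a message with no matching connection keeps only its platform id.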
+ +## Configuration + +Enable the top-level `channel_connections` block in `config.yaml`: + +```yaml +channel_connections: + enabled: true + public_base_url: https://deerflow.example.com + encryption_key: $DEER_FLOW_CHANNEL_CONNECTIONS_KEY + + telegram: + enabled: true + bot_token: $TELEGRAM_BOT_TOKEN + bot_username: $TELEGRAM_BOT_USERNAME + webhook_secret: $TELEGRAM_WEBHOOK_SECRET + + slack: + enabled: true + client_id: $SLACK_CLIENT_ID + client_secret: $SLACK_CLIENT_SECRET + signing_secret: $SLACK_SIGNING_SECRET + event_delivery: http + + discord: + enabled: true + client_id: $DISCORD_CLIENT_ID + client_secret: $DISCORD_CLIENT_SECRET + bot_token: $DISCORD_BOT_TOKEN + permissions: "274877975552" +``` + +`public_base_url` must be the externally reachable HTTPS origin used by provider callbacks and webhooks. `encryption_key` encrypts provider tokens at rest with Fernet. Keep it stable; v1 does not support transparent key rotation, so changing it requires users to reconnect. + +## Frontend Flow + +The workspace sidebar shows a Channels group with Telegram, Slack, and Discord. Settings > Channels exposes the management surface for connect, disconnect, and reconnect. Browser state-changing calls use the existing CSRF-aware frontend fetch wrapper. + +## Provider Setup + +Telegram: + +- Register a bot with BotFather. +- Configure the bot username, bot token, and a random webhook secret. +- Users connect with a deep link: `https://t.me/?start=`. +- Production webhook path: `POST /api/channels/webhooks/telegram`, protected by `X-Telegram-Bot-Api-Secret-Token`. +- Local/self-hosted long polling still works through the existing Telegram channel worker. + +Slack: + +- Create a Slack app with OAuth V2. +- Redirect URL: `https:///api/channels/slack/callback`. +- Event request URL: `https:///api/channels/webhooks/slack/events`. +- Required signing secret: Slack's request signing secret, not the deprecated verification token. 
+- Suggested MVP bot scopes: `app_mentions:read`, `chat:write`, `channels:history`, `channels:read`. +- Slack events are signature-verified, deduplicated by `event_id`, and then routed to a matching user connection. + +Discord: + +- Create a Discord application and bot. +- Redirect URL: `https:///api/channels/discord/callback`. +- DeerFlow starts OAuth with `identify guilds bot applications.commands` and the configured bot permissions. +- The Discord Gateway is still handled by `discord.py`; message content may require the privileged Message Content Intent depending on your bot setup. + +## Runtime Model + +Connection records live in SQL tables under `deerflow.persistence.channel_connections`: + +- `channel_connections`: owner user, provider identity, workspace/guild/team, status, metadata. +- `channel_credentials`: encrypted access/refresh/bot tokens. +- `channel_oauth_states`: one-time OAuth/deep-link states. +- `channel_conversations`: connection-scoped IM conversation to DeerFlow thread mapping. +- `channel_webhook_deliveries`: provider webhook dedupe records. + +Incoming messages that resolve to a connection carry `connection_id`, `owner_user_id`, and `workspace_id`. `ChannelManager` uses `owner_user_id` as the DeerFlow run user id and preserves the platform user id as `channel_user_id`. Legacy operator-owned channels keep the existing JSON `ChannelStore` behavior when no `connection_id` is present. + +## Security Notes + +- OAuth state tokens are one-time and short-lived. +- Provider tokens are never returned from browser APIs. +- Public callback/webhook routes bypass cookie auth only because they validate provider state/signatures/secrets themselves. +- Slack and Telegram webhooks skip CSRF because they are called by providers, not browsers. +- Logs should never include access tokens, refresh tokens, bot tokens, OAuth codes, or raw signed webhook bodies. 
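
The Slack signature check that the webhook route relies on can be sketched as follows. The real `slack_connect.verify_slack_signature` is not shown in this change, so this is only an illustration of Slack's published `v0` signing scheme (HMAC-SHA256 over `v0:<timestamp>:<raw body>`). The keyword names mirror the call site in the router; the function name, the `max_age_seconds` replay window, and the `now` parameter are assumptions added for the example.

```python
from __future__ import annotations

import hashlib
import hmac
import time


def verify_slack_signature_sketch(
    *,
    signing_secret: str,
    timestamp: str | None,
    body: bytes,
    signature: str | None,
    max_age_seconds: int = 300,  # assumed replay window, not taken from the source
    now: float | None = None,  # injectable clock, added here for testability
) -> bool:
    """Check an X-Slack-Signature header against the raw request body."""
    if not signing_secret or not timestamp or not signature:
        return False
    try:
        sent_at = int(timestamp)
    except ValueError:
        return False

    # Reject stale or future-dated requests to limit replay of captured payloads.
    current = time.time() if now is None else now
    if abs(current - sent_at) > max_age_seconds:
        return False

    # Slack signs the string "v0:<timestamp>:<raw body>" with the signing secret.
    base = b"v0:" + timestamp.encode("utf-8") + b":" + body
    digest = hmac.new(signing_secret.encode("utf-8"), base, hashlib.sha256).hexdigest()
    expected = "v0=" + digest

    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```

The check has to run on the raw bytes before any JSON parsing, which matches the order used in `slack_events_webhook`: read the body, verify, then decode.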
diff --git a/backend/packages/harness/deerflow/config/app_config.py b/backend/packages/harness/deerflow/config/app_config.py index 842b49d7a..95448ed55 100644 --- a/backend/packages/harness/deerflow/config/app_config.py +++ b/backend/packages/harness/deerflow/config/app_config.py @@ -11,6 +11,7 @@ from pydantic import BaseModel, ConfigDict, Field from deerflow.config.acp_config import ACPAgentConfig, load_acp_config_from_dict from deerflow.config.agents_api_config import AgentsApiConfig, load_agents_api_config_from_dict +from deerflow.config.channel_connections_config import ChannelConnectionsConfig from deerflow.config.checkpointer_config import CheckpointerConfig, load_checkpointer_config_from_dict from deerflow.config.database_config import DatabaseConfig from deerflow.config.extensions_config import ExtensionsConfig @@ -116,6 +117,7 @@ class AppConfig(BaseModel): subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration") guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration") circuit_breaker: CircuitBreakerConfig = Field(default_factory=CircuitBreakerConfig, description="LLM circuit breaker configuration") + channel_connections: ChannelConnectionsConfig = Field(default_factory=ChannelConnectionsConfig, description="User-facing IM channel connection configuration") loop_detection: LoopDetectionConfig = Field(default_factory=LoopDetectionConfig, description="Loop detection middleware configuration") safety_finish_reason: SafetyFinishReasonConfig = Field(default_factory=SafetyFinishReasonConfig, description="Provider safety-filter finish_reason interception middleware configuration") model_config = ConfigDict(extra="allow") diff --git a/backend/packages/harness/deerflow/config/channel_connections_config.py b/backend/packages/harness/deerflow/config/channel_connections_config.py new file mode 100644 index 000000000..bca41838b --- /dev/null 
+++ b/backend/packages/harness/deerflow/config/channel_connections_config.py @@ -0,0 +1,82 @@ +"""Configuration for user-owned IM channel connections.""" + +from __future__ import annotations + +from pydantic import BaseModel, Field, model_validator + + +class SlackChannelConnectionConfig(BaseModel): + enabled: bool = False + client_id: str = "" + client_secret: str = "" + signing_secret: str = "" + scopes: list[str] = Field( + default_factory=lambda: [ + "app_mentions:read", + "chat:write", + "channels:history", + "channels:read", + ] + ) + event_delivery: str = "http" + + @property + def configured(self) -> bool: + return bool(self.client_id and self.client_secret and self.signing_secret) + + +class TelegramChannelConnectionConfig(BaseModel): + enabled: bool = False + bot_token: str = "" + bot_username: str = "" + webhook_secret: str = "" + oidc_client_id: str = "" + oidc_client_secret: str = "" + + @property + def configured(self) -> bool: + return bool(self.bot_token and self.bot_username and self.webhook_secret) + + +class DiscordChannelConnectionConfig(BaseModel): + enabled: bool = False + client_id: str = "" + client_secret: str = "" + bot_token: str = "" + permissions: str = "" + require_message_content_intent: bool = True + + @property + def configured(self) -> bool: + return bool(self.client_id and self.client_secret and self.bot_token) + + +class ChannelConnectionsConfig(BaseModel): + """Top-level config for browser-connectable IM channels.""" + + enabled: bool = False + public_base_url: str = "" + encryption_key: str = "" + slack: SlackChannelConnectionConfig = Field(default_factory=SlackChannelConnectionConfig) + telegram: TelegramChannelConnectionConfig = Field(default_factory=TelegramChannelConnectionConfig) + discord: DiscordChannelConnectionConfig = Field(default_factory=DiscordChannelConnectionConfig) + + @model_validator(mode="after") + def _require_shared_config_when_enabled(self) -> ChannelConnectionsConfig: + missing: list[str] = [] + if 
self.enabled and not self.public_base_url: + missing.append("public_base_url is required when channel_connections.enabled is true") + if self.enabled and not self.encryption_key: + missing.append("encryption_key is required when channel_connections.enabled is true") + if missing: + raise ValueError("; ".join(missing)) + return self + + def provider_status(self, provider: str) -> dict[str, bool]: + config = getattr(self, provider, None) + if config is None: + return {"enabled": False, "configured": False} + return { + "enabled": bool(config.enabled), + "configured": bool(config.configured), + } diff --git a/backend/packages/harness/deerflow/persistence/channel_connections/__init__.py b/backend/packages/harness/deerflow/persistence/channel_connections/__init__.py new file mode 100644 index 000000000..994661a85 --- /dev/null +++ b/backend/packages/harness/deerflow/persistence/channel_connections/__init__.py @@ -0,0 +1,23 @@ +"""User-owned IM channel connection persistence.""" + +from deerflow.persistence.channel_connections.model import ( + ChannelConnectionRow, + ChannelConversationRow, + ChannelCredentialRow, + ChannelOAuthStateRow, + ChannelWebhookDeliveryRow, +) +from deerflow.persistence.channel_connections.sql import ( + ChannelConnectionRepository, + ChannelCredentialCipher, +) + +__all__ = [ + "ChannelConnectionRepository", + "ChannelConnectionRow", + "ChannelConversationRow", + "ChannelCredentialCipher", + "ChannelCredentialRow", + "ChannelOAuthStateRow", + "ChannelWebhookDeliveryRow", +] diff --git a/backend/packages/harness/deerflow/persistence/channel_connections/model.py b/backend/packages/harness/deerflow/persistence/channel_connections/model.py new file mode 100644 index 000000000..9b6d63a35 --- /dev/null +++ b/backend/packages/harness/deerflow/persistence/channel_connections/model.py @@ -0,0 +1,121 @@ +"""ORM models for user-owned IM channel connections.""" + +from __future__ import annotations + +from datetime import UTC, datetime + +from sqlalchemy 
import JSON, DateTime, ForeignKey, Index, Integer, String, Text, UniqueConstraint +from sqlalchemy.orm import Mapped, mapped_column + +from deerflow.persistence.base import Base + + +def _utc_now() -> datetime: + return datetime.now(UTC) + + +class ChannelConnectionRow(Base): + __tablename__ = "channel_connections" + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + owner_user_id: Mapped[str] = mapped_column(String(64), nullable=False, index=True) + provider: Mapped[str] = mapped_column(String(32), nullable=False, index=True) + status: Mapped[str] = mapped_column(String(32), nullable=False, default="connected") + + external_account_id: Mapped[str] = mapped_column(String(128), nullable=False, default="") + external_account_name: Mapped[str | None] = mapped_column(String(256), nullable=True) + workspace_id: Mapped[str] = mapped_column(String(128), nullable=False, default="") + workspace_name: Mapped[str | None] = mapped_column(String(256), nullable=True) + bot_user_id: Mapped[str | None] = mapped_column(String(128), nullable=True) + + scopes_json: Mapped[list] = mapped_column(JSON, default=list) + capabilities_json: Mapped[dict] = mapped_column(JSON, default=dict) + metadata_json: Mapped[dict] = mapped_column(JSON, default=dict) + + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=_utc_now) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=_utc_now, onupdate=_utc_now) + last_seen_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + last_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + + __table_args__ = ( + UniqueConstraint( + "owner_user_id", + "provider", + "external_account_id", + "workspace_id", + name="uq_channel_connection_owner_provider_identity", + ), + Index("idx_channel_connections_event_lookup", "provider", "workspace_id", "bot_user_id"), + ) + + +class 
ChannelCredentialRow(Base): + __tablename__ = "channel_credentials" + + connection_id: Mapped[str] = mapped_column( + String(64), + ForeignKey("channel_connections.id", ondelete="CASCADE"), + primary_key=True, + ) + encrypted_access_token: Mapped[str | None] = mapped_column(Text, nullable=True) + encrypted_refresh_token: Mapped[str | None] = mapped_column(Text, nullable=True) + token_type: Mapped[str | None] = mapped_column(String(32), nullable=True) + expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + refresh_expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + encrypted_extra_json: Mapped[str | None] = mapped_column(Text, nullable=True) + version: Mapped[int] = mapped_column(Integer, nullable=False, default=1) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=_utc_now, onupdate=_utc_now) + + +class ChannelOAuthStateRow(Base): + __tablename__ = "channel_oauth_states" + + state_hash: Mapped[str] = mapped_column(String(128), primary_key=True) + owner_user_id: Mapped[str] = mapped_column(String(64), nullable=False, index=True) + provider: Mapped[str] = mapped_column(String(32), nullable=False, index=True) + code_verifier_encrypted: Mapped[str | None] = mapped_column(Text, nullable=True) + nonce_hash: Mapped[str | None] = mapped_column(String(128), nullable=True) + redirect_after: Mapped[str | None] = mapped_column(Text, nullable=True) + requested_scopes_json: Mapped[list] = mapped_column(JSON, default=list) + metadata_json: Mapped[dict] = mapped_column(JSON, default=dict) + expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + consumed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=_utc_now) + + +class ChannelConversationRow(Base): + __tablename__ = "channel_conversations" + 
+ id: Mapped[str] = mapped_column(String(64), primary_key=True) + connection_id: Mapped[str] = mapped_column( + String(64), + ForeignKey("channel_connections.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + owner_user_id: Mapped[str] = mapped_column(String(64), nullable=False, index=True) + provider: Mapped[str] = mapped_column(String(32), nullable=False, index=True) + external_conversation_id: Mapped[str] = mapped_column(String(128), nullable=False) + external_topic_id: Mapped[str] = mapped_column(String(128), nullable=False, default="") + thread_id: Mapped[str] = mapped_column(String(64), nullable=False, index=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=_utc_now) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=_utc_now, onupdate=_utc_now) + + __table_args__ = ( + UniqueConstraint( + "connection_id", + "external_conversation_id", + "external_topic_id", + name="uq_channel_conversation_connection_external", + ), + ) + + +class ChannelWebhookDeliveryRow(Base): + __tablename__ = "channel_webhook_deliveries" + + provider: Mapped[str] = mapped_column(String(32), primary_key=True) + delivery_id: Mapped[str] = mapped_column(String(128), primary_key=True) + payload_sha256: Mapped[str] = mapped_column(String(64), nullable=False) + event_type: Mapped[str | None] = mapped_column(String(64), nullable=True) + processed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, default=_utc_now) diff --git a/backend/packages/harness/deerflow/persistence/channel_connections/sql.py b/backend/packages/harness/deerflow/persistence/channel_connections/sql.py new file mode 100644 index 000000000..60f12f6aa --- /dev/null +++ b/backend/packages/harness/deerflow/persistence/channel_connections/sql.py @@ -0,0 +1,363 @@ +"""SQL repository for user-owned IM channel connections.""" + +from __future__ import annotations + +import base64 +import hashlib 
+import json +import uuid +from datetime import UTC, datetime +from typing import Any + +from cryptography.fernet import Fernet +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from deerflow.persistence.channel_connections.model import ( + ChannelConnectionRow, + ChannelConversationRow, + ChannelCredentialRow, + ChannelOAuthStateRow, + ChannelWebhookDeliveryRow, +) +from deerflow.utils.time import coerce_iso + + +class ChannelCredentialCipher: + """Encrypts provider credentials before they are persisted.""" + + def __init__(self, fernet: Fernet) -> None: + self._fernet = fernet + + @classmethod + def from_key(cls, key: str) -> ChannelCredentialCipher: + digest = hashlib.sha256(key.encode("utf-8")).digest() + return cls(Fernet(base64.urlsafe_b64encode(digest))) + + def encrypt_text(self, value: str | None) -> str | None: + if value is None: + return None + return "fernet:v1:" + self._fernet.encrypt(value.encode("utf-8")).decode("ascii") + + def decrypt_text(self, value: str | None) -> str | None: + if value is None: + return None + token = value.removeprefix("fernet:v1:") + return self._fernet.decrypt(token.encode("ascii")).decode("utf-8") + + +class ChannelConnectionRepository: + """Persistence facade for channel connections, credentials, and conversations.""" + + def __init__( + self, + session_factory: async_sessionmaker[AsyncSession], + *, + cipher: ChannelCredentialCipher, + ) -> None: + self.session_factory = session_factory + self._cipher = cipher + + async def close(self) -> None: + from deerflow.persistence.engine import close_engine + + await close_engine() + + @staticmethod + def _new_id() -> str: + return uuid.uuid4().hex + + @staticmethod + def _normalize_optional_identity(value: str | None) -> str: + return value or "" + + @staticmethod + def _coerce_datetime(value: datetime | None) -> datetime | None: + if value is None or value.tzinfo is not None: + return value + return value.replace(tzinfo=UTC) 
+ + @staticmethod + def _connection_to_dict(row: ChannelConnectionRow) -> dict[str, Any]: + data = row.to_dict() + data["external_account_id"] = data["external_account_id"] or None + data["workspace_id"] = data["workspace_id"] or None + data["scopes"] = data.pop("scopes_json") or [] + data["capabilities"] = data.pop("capabilities_json") or {} + data["metadata"] = data.pop("metadata_json") or {} + for key in ("created_at", "updated_at", "last_seen_at", "last_error_at"): + value = data.get(key) + if isinstance(value, datetime): + data[key] = coerce_iso(value) + return data + + async def upsert_connection( + self, + *, + owner_user_id: str, + provider: str, + external_account_id: str | None = None, + external_account_name: str | None = None, + workspace_id: str | None = None, + workspace_name: str | None = None, + bot_user_id: str | None = None, + scopes: list[str] | None = None, + capabilities: dict[str, Any] | None = None, + metadata: dict[str, Any] | None = None, + status: str = "connected", + ) -> dict[str, Any]: + external_account_id_value = self._normalize_optional_identity(external_account_id) + workspace_id_value = self._normalize_optional_identity(workspace_id) + async with self.session_factory() as session: + stmt = select(ChannelConnectionRow).where( + ChannelConnectionRow.owner_user_id == owner_user_id, + ChannelConnectionRow.provider == provider, + ChannelConnectionRow.external_account_id == external_account_id_value, + ChannelConnectionRow.workspace_id == workspace_id_value, + ) + row = (await session.execute(stmt)).scalar_one_or_none() + if row is None: + row = ChannelConnectionRow( + id=self._new_id(), + owner_user_id=owner_user_id, + provider=provider, + external_account_id=external_account_id_value, + workspace_id=workspace_id_value, + ) + session.add(row) + + row.status = status + row.external_account_name = external_account_name + row.workspace_name = workspace_name + row.bot_user_id = bot_user_id + row.scopes_json = list(scopes or []) + 
row.capabilities_json = dict(capabilities or {}) + row.metadata_json = dict(metadata or {}) + await session.commit() + await session.refresh(row) + return self._connection_to_dict(row) + + async def list_connections(self, owner_user_id: str) -> list[dict[str, Any]]: + async with self.session_factory() as session: + result = await session.execute(select(ChannelConnectionRow).where(ChannelConnectionRow.owner_user_id == owner_user_id).order_by(ChannelConnectionRow.updated_at.desc(), ChannelConnectionRow.id.desc())) + return [self._connection_to_dict(row) for row in result.scalars()] + + async def disconnect_connection(self, *, connection_id: str, owner_user_id: str) -> bool: + async with self.session_factory() as session: + row = await session.get(ChannelConnectionRow, connection_id) + if row is None or row.owner_user_id != owner_user_id: + return False + + row.status = "revoked" + credential = await session.get(ChannelCredentialRow, connection_id) + if credential is not None: + await session.delete(credential) + await session.commit() + return True + + async def store_credentials( + self, + connection_id: str, + *, + access_token: str | None, + refresh_token: str | None = None, + token_type: str | None = None, + expires_at: datetime | None = None, + refresh_expires_at: datetime | None = None, + extra: dict[str, Any] | None = None, + ) -> None: + async with self.session_factory() as session: + row = await session.get(ChannelCredentialRow, connection_id) + if row is None: + row = ChannelCredentialRow(connection_id=connection_id) + session.add(row) + row.encrypted_access_token = self._cipher.encrypt_text(access_token) + row.encrypted_refresh_token = self._cipher.encrypt_text(refresh_token) + row.token_type = token_type + row.expires_at = expires_at + row.refresh_expires_at = refresh_expires_at + row.encrypted_extra_json = self._cipher.encrypt_text(json.dumps(extra or {}, ensure_ascii=False)) + row.version = (row.version or 0) + 1 + await session.commit() + + async def 
get_credentials(self, connection_id: str) -> dict[str, Any] | None: + async with self.session_factory() as session: + row = await session.get(ChannelCredentialRow, connection_id) + if row is None: + return None + extra_raw = self._cipher.decrypt_text(row.encrypted_extra_json) + return { + "connection_id": row.connection_id, + "access_token": self._cipher.decrypt_text(row.encrypted_access_token), + "refresh_token": self._cipher.decrypt_text(row.encrypted_refresh_token), + "token_type": row.token_type, + "expires_at": self._coerce_datetime(row.expires_at), + "refresh_expires_at": self._coerce_datetime(row.refresh_expires_at), + "extra": json.loads(extra_raw) if extra_raw else {}, + } + + @staticmethod + def hash_state(state: str) -> str: + return hashlib.sha256(state.encode("utf-8")).hexdigest() + + async def create_oauth_state( + self, + *, + owner_user_id: str, + provider: str, + state: str, + expires_at: datetime, + code_verifier: str | None = None, + nonce_hash: str | None = None, + redirect_after: str | None = None, + requested_scopes: list[str] | None = None, + metadata: dict[str, Any] | None = None, + ) -> None: + row = ChannelOAuthStateRow( + state_hash=self.hash_state(state), + owner_user_id=owner_user_id, + provider=provider, + code_verifier_encrypted=self._cipher.encrypt_text(code_verifier), + nonce_hash=nonce_hash, + redirect_after=redirect_after, + requested_scopes_json=list(requested_scopes or []), + metadata_json=dict(metadata or {}), + expires_at=expires_at, + ) + async with self.session_factory() as session: + session.add(row) + await session.commit() + + async def count_oauth_states(self, *, owner_user_id: str, provider: str) -> int: + async with self.session_factory() as session: + result = await session.execute( + select(ChannelOAuthStateRow).where( + ChannelOAuthStateRow.owner_user_id == owner_user_id, + ChannelOAuthStateRow.provider == provider, + ) + ) + return len(list(result.scalars())) + + async def consume_oauth_state( + self, + *, + 
provider: str, + state: str, + now: datetime | None = None, + ) -> dict[str, Any] | None: + current_time = now or datetime.now(UTC) + async with self.session_factory() as session: + row = await session.get(ChannelOAuthStateRow, self.hash_state(state)) + if row is None or row.provider != provider or row.consumed_at is not None: + return None + expires_at = self._coerce_datetime(row.expires_at) + if expires_at is not None and expires_at < current_time: + return None + + row.consumed_at = current_time + await session.commit() + return { + "owner_user_id": row.owner_user_id, + "provider": row.provider, + "requested_scopes": row.requested_scopes_json or [], + "metadata": row.metadata_json or {}, + "redirect_after": row.redirect_after, + } + + async def find_connection_by_external_identity( + self, + *, + provider: str, + external_account_id: str, + workspace_id: str | None = None, + ) -> dict[str, Any] | None: + async with self.session_factory() as session: + result = await session.execute( + select(ChannelConnectionRow) + .where( + ChannelConnectionRow.provider == provider, + ChannelConnectionRow.external_account_id == self._normalize_optional_identity(external_account_id), + ChannelConnectionRow.workspace_id == self._normalize_optional_identity(workspace_id), + ChannelConnectionRow.status == "connected", + ) + .order_by(ChannelConnectionRow.updated_at.desc(), ChannelConnectionRow.id.desc()) + .limit(1) + ) + row = result.scalar_one_or_none() + return self._connection_to_dict(row) if row is not None else None + + async def set_thread_id( + self, + *, + connection_id: str, + owner_user_id: str, + provider: str, + external_conversation_id: str, + thread_id: str, + external_topic_id: str | None = None, + ) -> None: + topic_id = external_topic_id or "" + async with self.session_factory() as session: + stmt = select(ChannelConversationRow).where( + ChannelConversationRow.connection_id == connection_id, + ChannelConversationRow.external_conversation_id == 
external_conversation_id, + ChannelConversationRow.external_topic_id == topic_id, + ) + row = (await session.execute(stmt)).scalar_one_or_none() + if row is None: + row = ChannelConversationRow( + id=self._new_id(), + connection_id=connection_id, + owner_user_id=owner_user_id, + provider=provider, + external_conversation_id=external_conversation_id, + external_topic_id=topic_id, + thread_id=thread_id, + ) + session.add(row) + else: + row.thread_id = thread_id + row.owner_user_id = owner_user_id + row.provider = provider + await session.commit() + + async def get_thread_id( + self, + connection_id: str, + external_conversation_id: str, + external_topic_id: str | None = None, + ) -> str | None: + async with self.session_factory() as session: + stmt = select(ChannelConversationRow.thread_id).where( + ChannelConversationRow.connection_id == connection_id, + ChannelConversationRow.external_conversation_id == external_conversation_id, + ChannelConversationRow.external_topic_id == (external_topic_id or ""), + ) + return (await session.execute(stmt)).scalar_one_or_none() + + async def record_webhook_delivery( + self, + *, + provider: str, + delivery_id: str, + payload_sha256: str, + event_type: str | None = None, + ) -> bool: + async with self.session_factory() as session: + existing = await session.get( + ChannelWebhookDeliveryRow, + {"provider": provider, "delivery_id": delivery_id}, + ) + if existing is not None: + return False + + session.add( + ChannelWebhookDeliveryRow( + provider=provider, + delivery_id=delivery_id, + payload_sha256=payload_sha256, + event_type=event_type, + ) + ) + await session.commit() + return True diff --git a/backend/packages/harness/deerflow/persistence/models/__init__.py b/backend/packages/harness/deerflow/persistence/models/__init__.py index ab29a3536..b78292e08 100644 --- a/backend/packages/harness/deerflow/persistence/models/__init__.py +++ b/backend/packages/harness/deerflow/persistence/models/__init__.py @@ -14,10 +14,28 @@ its storage 
implementation lives in ``deerflow.runtime.events.store.db`` and there is no matching entity directory. """ +from deerflow.persistence.channel_connections.model import ( + ChannelConnectionRow, + ChannelConversationRow, + ChannelCredentialRow, + ChannelOAuthStateRow, + ChannelWebhookDeliveryRow, +) from deerflow.persistence.feedback.model import FeedbackRow from deerflow.persistence.models.run_event import RunEventRow from deerflow.persistence.run.model import RunRow from deerflow.persistence.thread_meta.model import ThreadMetaRow from deerflow.persistence.user.model import UserRow -__all__ = ["FeedbackRow", "RunEventRow", "RunRow", "ThreadMetaRow", "UserRow"] +__all__ = [ + "ChannelConnectionRow", + "ChannelConversationRow", + "ChannelCredentialRow", + "ChannelOAuthStateRow", + "ChannelWebhookDeliveryRow", + "FeedbackRow", + "RunEventRow", + "RunRow", + "ThreadMetaRow", + "UserRow", +] diff --git a/backend/packages/harness/pyproject.toml b/backend/packages/harness/pyproject.toml index 47cd1afad..1d6d002d8 100644 --- a/backend/packages/harness/pyproject.toml +++ b/backend/packages/harness/pyproject.toml @@ -36,6 +36,7 @@ dependencies = [ "sqlalchemy[asyncio]>=2.0,<3.0", "aiosqlite>=0.19", "alembic>=1.13", + "cryptography>=43.0.0", ] [project.optional-dependencies] diff --git a/backend/tests/test_auth_middleware.py b/backend/tests/test_auth_middleware.py index 726786ac9..1fd4a507f 100644 --- a/backend/tests/test_auth_middleware.py +++ b/backend/tests/test_auth_middleware.py @@ -21,6 +21,10 @@ from app.gateway.auth_middleware import AuthMiddleware, _is_public "/api/v1/auth/register", "/api/v1/auth/logout", "/api/v1/auth/setup-status", + "/api/channels/slack/callback", + "/api/channels/discord/callback", + "/api/channels/webhooks/slack/events", + "/api/channels/webhooks/telegram", ], ) def test_public_paths(path: str): diff --git a/backend/tests/test_channel_connections_config.py b/backend/tests/test_channel_connections_config.py new file mode 100644 index 
000000000..f2bffd1e1 --- /dev/null +++ b/backend/tests/test_channel_connections_config.py @@ -0,0 +1,54 @@ +"""Tests for user-facing IM channel connection configuration.""" + +import pytest +from pydantic import ValidationError + +from deerflow.config.channel_connections_config import ChannelConnectionsConfig + + +def test_channel_connections_disabled_by_default(): + config = ChannelConnectionsConfig() + + assert config.enabled is False + assert config.public_base_url == "" + assert config.encryption_key == "" + assert config.slack.enabled is False + assert config.telegram.enabled is False + assert config.discord.enabled is False + + +def test_enabled_channel_connections_require_public_url_and_encryption_key(): + with pytest.raises(ValidationError) as excinfo: + ChannelConnectionsConfig(enabled=True) + + message = str(excinfo.value) + assert "public_base_url is required" in message + assert "encryption_key is required" in message + + +def test_provider_config_completeness_is_reported_without_crashing(): + config = ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "test-secret", + "slack": { + "enabled": True, + "client_id": "slack-client", + "client_secret": "slack-secret", + "signing_secret": "slack-signing", + }, + "telegram": { + "enabled": True, + "bot_token": "telegram-token", + "bot_username": "deerflow_bot", + "webhook_secret": "telegram-webhook", + }, + "discord": {"enabled": True, "client_id": "discord-client"}, + } + ) + + assert config.provider_status("slack") == {"enabled": True, "configured": True} + assert config.provider_status("telegram") == {"enabled": True, "configured": True} + assert config.provider_status("discord") == {"enabled": True, "configured": False} + assert config.provider_status("unknown") == {"enabled": False, "configured": False} diff --git a/backend/tests/test_channel_connections_repository.py 
b/backend/tests/test_channel_connections_repository.py new file mode 100644 index 000000000..1edfafbfa --- /dev/null +++ b/backend/tests/test_channel_connections_repository.py @@ -0,0 +1,225 @@ +"""Tests for per-user IM channel connection persistence.""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +import pytest +from sqlalchemy import select + +from deerflow.persistence.channel_connections import ( + ChannelConnectionRepository, + ChannelConnectionRow, + ChannelCredentialCipher, + ChannelCredentialRow, + ChannelWebhookDeliveryRow, +) + + +@pytest.fixture +async def repo(tmp_path): + from deerflow.persistence.engine import close_engine, get_session_factory, init_engine + + url = f"sqlite+aiosqlite:///{tmp_path / 'channels.db'}" + await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path)) + try: + yield ChannelConnectionRepository( + get_session_factory(), + cipher=ChannelCredentialCipher.from_key("test-encryption-key"), + ) + finally: + await close_engine() + + +class TestChannelConnectionRepository: + @pytest.mark.anyio + async def test_connections_are_listed_per_owner(self, repo): + alice = await repo.upsert_connection( + owner_user_id="alice", + provider="slack", + external_account_id="U-alice", + external_account_name="Alice", + workspace_id="T1", + workspace_name="Team One", + scopes=["chat:write"], + ) + await repo.upsert_connection( + owner_user_id="bob", + provider="slack", + external_account_id="U-bob", + external_account_name="Bob", + workspace_id="T1", + workspace_name="Team One", + scopes=["chat:write"], + ) + + results = await repo.list_connections("alice") + + assert [item["id"] for item in results] == [alice["id"]] + assert results[0]["owner_user_id"] == "alice" + assert results[0]["provider"] == "slack" + assert results[0]["scopes"] == ["chat:write"] + assert "encrypted_access_token" not in results[0] + + @pytest.mark.anyio + async def test_upsert_connection_updates_existing_provider_identity(self, 
repo): + first = await repo.upsert_connection( + owner_user_id="alice", + provider="telegram", + external_account_id="42", + external_account_name="Alice", + workspace_id=None, + workspace_name=None, + status="pending", + ) + second = await repo.upsert_connection( + owner_user_id="alice", + provider="telegram", + external_account_id="42", + external_account_name="Alice Telegram", + workspace_id=None, + workspace_name=None, + status="connected", + ) + + assert second["id"] == first["id"] + assert second["status"] == "connected" + assert second["external_account_name"] == "Alice Telegram" + assert len(await repo.list_connections("alice")) == 1 + + @pytest.mark.anyio + async def test_credentials_are_encrypted_at_rest_and_decrypted_by_repository(self, repo): + connection = await repo.upsert_connection( + owner_user_id="alice", + provider="slack", + external_account_id="U-alice", + workspace_id="T1", + ) + expires_at = datetime.now(UTC) + timedelta(hours=1) + + await repo.store_credentials( + connection["id"], + access_token="xoxb-secret-access-token", + refresh_token="secret-refresh-token", + token_type="Bearer", + expires_at=expires_at, + extra={"bot_user_id": "B123"}, + ) + + async with repo.session_factory() as session: + row = (await session.execute(select(ChannelCredentialRow))).scalar_one() + assert row.encrypted_access_token is not None + assert "xoxb-secret-access-token" not in row.encrypted_access_token + assert "secret-refresh-token" not in (row.encrypted_refresh_token or "") + assert "B123" not in (row.encrypted_extra_json or "") + + credentials = await repo.get_credentials(connection["id"]) + + assert credentials is not None + assert credentials["access_token"] == "xoxb-secret-access-token" + assert credentials["refresh_token"] == "secret-refresh-token" + assert credentials["token_type"] == "Bearer" + assert credentials["expires_at"] == expires_at + assert credentials["extra"] == {"bot_user_id": "B123"} + + @pytest.mark.anyio + async def 
test_conversations_are_scoped_by_connection(self, repo): + alice = await repo.upsert_connection( + owner_user_id="alice", + provider="slack", + external_account_id="U-alice", + workspace_id="T1", + ) + bob = await repo.upsert_connection( + owner_user_id="bob", + provider="slack", + external_account_id="U-bob", + workspace_id="T1", + ) + + await repo.set_thread_id( + connection_id=alice["id"], + owner_user_id="alice", + provider="slack", + external_conversation_id="C-shared", + external_topic_id="1710000000.000100", + thread_id="thread-alice", + ) + await repo.set_thread_id( + connection_id=bob["id"], + owner_user_id="bob", + provider="slack", + external_conversation_id="C-shared", + external_topic_id="1710000000.000100", + thread_id="thread-bob", + ) + + assert await repo.get_thread_id(alice["id"], "C-shared", "1710000000.000100") == "thread-alice" + assert await repo.get_thread_id(bob["id"], "C-shared", "1710000000.000100") == "thread-bob" + + @pytest.mark.anyio + async def test_disconnect_connection_revokes_owner_connection_and_removes_credentials(self, repo): + connection = await repo.upsert_connection( + owner_user_id="alice", + provider="telegram", + external_account_id="42", + ) + await repo.store_credentials(connection["id"], access_token="secret-token") + + disconnected = await repo.disconnect_connection( + connection_id=connection["id"], + owner_user_id="alice", + ) + + assert disconnected is True + async with repo.session_factory() as session: + connection_row = await session.get(ChannelConnectionRow, connection["id"]) + credential_row = await session.get(ChannelCredentialRow, connection["id"]) + assert connection_row is not None + assert connection_row.status == "revoked" + assert credential_row is None + assert ( + await repo.find_connection_by_external_identity( + provider="telegram", + external_account_id="42", + ) + is None + ) + + @pytest.mark.anyio + async def test_disconnect_connection_is_owner_scoped(self, repo): + connection = await 
repo.upsert_connection( + owner_user_id="alice", + provider="telegram", + external_account_id="42", + ) + + disconnected = await repo.disconnect_connection( + connection_id=connection["id"], + owner_user_id="bob", + ) + + assert disconnected is False + assert (await repo.list_connections("alice"))[0]["status"] == "connected" + + @pytest.mark.anyio + async def test_record_webhook_delivery_returns_false_for_duplicate_delivery_id(self, repo): + first = await repo.record_webhook_delivery( + provider="slack", + delivery_id="Ev123", + payload_sha256="abc", + event_type="app_mention", + ) + second = await repo.record_webhook_delivery( + provider="slack", + delivery_id="Ev123", + payload_sha256="abc", + event_type="app_mention", + ) + + assert first is True + assert second is False + async with repo.session_factory() as session: + rows = (await session.execute(select(ChannelWebhookDeliveryRow))).scalars().all() + assert len(rows) == 1 + assert rows[0].event_type == "app_mention" diff --git a/backend/tests/test_channel_connections_router.py b/backend/tests/test_channel_connections_router.py new file mode 100644 index 000000000..a8c73d28e --- /dev/null +++ b/backend/tests/test_channel_connections_router.py @@ -0,0 +1,456 @@ +"""Router tests for browser-connectable IM channels.""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from urllib.parse import parse_qs, urlparse +from uuid import UUID + +from _router_auth_helpers import make_authed_test_app +from fastapi.testclient import TestClient + +from app.channels.providers.discord_connect import DiscordIdentity +from app.channels.providers.slack_connect import SlackInstall +from app.gateway.auth.models import User +from app.gateway.routers import channel_connections +from deerflow.config.channel_connections_config import ChannelConnectionsConfig + + +def _user() -> User: + return User( + id=UUID("11111111-2222-3333-4444-555555555555"), + email="alice@example.com", + password_hash="x", + 
system_role="user", + ) + + +async def _make_repo(tmp_path): + from deerflow.persistence.channel_connections import ChannelConnectionRepository, ChannelCredentialCipher + from deerflow.persistence.engine import get_session_factory, init_engine + + await init_engine("sqlite", url=f"sqlite+aiosqlite:///{tmp_path / 'router.db'}", sqlite_dir=str(tmp_path)) + return ChannelConnectionRepository( + get_session_factory(), + cipher=ChannelCredentialCipher.from_key("router-secret"), + ) + + +def _make_app(config: ChannelConnectionsConfig, repo): + app = make_authed_test_app(user_factory=_user) + app.state.channel_connections_config = config + app.state.channel_connection_repo = repo + app.include_router(channel_connections.router) + return app + + +def test_get_providers_returns_catalog_and_current_status(tmp_path): + import anyio + + repo = anyio.run(_make_repo, tmp_path) + config = ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "router-secret", + "telegram": { + "enabled": True, + "bot_token": "telegram-token", + "bot_username": "deerflow_bot", + "webhook_secret": "telegram-secret", + }, + "slack": {"enabled": True, "client_id": "slack-client"}, + } + ) + app = _make_app(config, repo) + + with TestClient(app) as client: + response = client.get("/api/channels/providers") + + assert response.status_code == 200 + body = response.json() + assert body["enabled"] is True + telegram = next(item for item in body["providers"] if item["provider"] == "telegram") + slack = next(item for item in body["providers"] if item["provider"] == "slack") + assert telegram["enabled"] is True + assert telegram["configured"] is True + assert telegram["connection_status"] == "not_connected" + assert slack["enabled"] is True + assert slack["configured"] is False + + anyio.run(repo.close) + + +def test_get_connections_returns_current_user_connections_only(tmp_path): + import anyio + + repo = anyio.run(_make_repo, 
tmp_path) + + async def seed_connections(): + await repo.upsert_connection( + owner_user_id=str(_user().id), + provider="telegram", + external_account_id="42", + external_account_name="Alice", + status="connected", + ) + await repo.upsert_connection( + owner_user_id="other-user", + provider="telegram", + external_account_id="99", + external_account_name="Bob", + status="connected", + ) + + anyio.run(seed_connections) + app = _make_app( + ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "router-secret", + } + ), + repo, + ) + + with TestClient(app) as client: + response = client.get("/api/channels/connections") + + assert response.status_code == 200 + body = response.json() + assert len(body["connections"]) == 1 + assert body["connections"][0]["provider"] == "telegram" + assert body["connections"][0]["external_account_id"] == "42" + + anyio.run(repo.close) + + +def test_connect_telegram_returns_deep_link_and_persists_state(tmp_path): + import anyio + + repo = anyio.run(_make_repo, tmp_path) + app = _make_app( + ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "router-secret", + "telegram": { + "enabled": True, + "bot_token": "telegram-token", + "bot_username": "deerflow_bot", + "webhook_secret": "telegram-secret", + }, + } + ), + repo, + ) + + with TestClient(app) as client: + response = client.post("/api/channels/telegram/connect") + + assert response.status_code == 200 + body = response.json() + assert body["provider"] == "telegram" + assert body["mode"] == "deep_link" + assert body["url"].startswith("https://t.me/deerflow_bot?start=") + + async def count_states(): + return await repo.count_oauth_states(owner_user_id=str(_user().id), provider="telegram") + + assert anyio.run(count_states) == 1 + + anyio.run(repo.close) + + +def test_connect_unconfigured_provider_returns_400(tmp_path): + 
import anyio + + repo = anyio.run(_make_repo, tmp_path) + app = _make_app( + ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "router-secret", + "slack": {"enabled": True, "client_id": "slack-client"}, + } + ), + repo, + ) + + with TestClient(app) as client: + response = client.post("/api/channels/slack/connect") + + assert response.status_code == 400 + assert response.json()["detail"] == "Channel provider is not configured" + + anyio.run(repo.close) + + +def test_connect_discord_includes_bot_install_scope_and_permissions(tmp_path): + import anyio + + repo = anyio.run(_make_repo, tmp_path) + app = _make_app( + ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "router-secret", + "discord": { + "enabled": True, + "client_id": "discord-client", + "client_secret": "discord-secret", + "bot_token": "discord-bot", + "permissions": "274877975552", + }, + } + ), + repo, + ) + + with TestClient(app) as client: + response = client.post("/api/channels/discord/connect") + + assert response.status_code == 200 + url = response.json()["url"] + parsed = urlparse(url) + query = parse_qs(parsed.query) + scopes = set(query["scope"][0].split()) + assert {"identify", "guilds", "bot", "applications.commands"}.issubset(scopes) + assert query["permissions"] == ["274877975552"] + + anyio.run(repo.close) + + +def test_slack_callback_exchanges_code_and_stores_connection(tmp_path, monkeypatch): + import anyio + + from app.channels.providers import slack_connect + + repo = anyio.run(_make_repo, tmp_path) + state_token = "slack-state-token" + + async def seed_state(): + await repo.create_oauth_state( + owner_user_id=str(_user().id), + provider="slack", + state=state_token, + expires_at=datetime.now(UTC) + timedelta(minutes=5), + requested_scopes=["chat:write"], + ) + + async def fake_exchange_slack_oauth_code(**kwargs): + 
assert kwargs["code"] == "slack-code" + assert kwargs["redirect_uri"] == "https://deerflow.example.com/api/channels/slack/callback" + return SlackInstall( + team_id="T123", + team_name="Deer Team", + authed_user_id="U123", + bot_user_id="B123", + bot_access_token="xoxb-secret", + scopes=["chat:write"], + raw={"ok": True}, + ) + + anyio.run(seed_state) + monkeypatch.setattr(slack_connect, "exchange_slack_oauth_code", fake_exchange_slack_oauth_code) + app = _make_app( + ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "router-secret", + "slack": { + "enabled": True, + "client_id": "slack-client", + "client_secret": "slack-secret", + "signing_secret": "slack-signing-secret", + }, + } + ), + repo, + ) + + with TestClient(app) as client: + response = client.get( + f"/api/channels/slack/callback?code=slack-code&state={state_token}", + follow_redirects=False, + ) + + assert response.status_code in {302, 307} + assert response.headers["location"] == "/workspace?channel_connected=slack" + + async def get_connection_and_credentials(): + connections = await repo.list_connections(str(_user().id)) + credentials = await repo.get_credentials(connections[0]["id"]) + return connections[0], credentials + + connection, credentials = anyio.run(get_connection_and_credentials) + assert connection["provider"] == "slack" + assert connection["external_account_id"] == "U123" + assert connection["workspace_id"] == "T123" + assert connection["bot_user_id"] == "B123" + assert connection["scopes"] == ["chat:write"] + assert credentials["access_token"] == "xoxb-secret" + + anyio.run(repo.close) + + +def test_discord_callback_exchanges_code_and_stores_identity(tmp_path, monkeypatch): + import anyio + + from app.channels.providers import discord_connect + + repo = anyio.run(_make_repo, tmp_path) + state_token = "discord-state-token" + + async def seed_state(): + await repo.create_oauth_state( + 
owner_user_id=str(_user().id), + provider="discord", + state=state_token, + expires_at=datetime.now(UTC) + timedelta(minutes=5), + requested_scopes=["identify", "guilds"], + ) + + async def fake_complete_discord_oauth(**kwargs): + assert kwargs["code"] == "discord-code" + assert kwargs["redirect_uri"] == "https://deerflow.example.com/api/channels/discord/callback" + return DiscordIdentity( + user_id="987", + display_name="Alice", + username="alice", + guilds=[{"id": "G1", "name": "Guild One"}], + access_token="discord-access-token", + refresh_token="discord-refresh-token", + token_type="Bearer", + scopes=["identify", "guilds"], + expires_at=datetime.now(UTC) + timedelta(hours=1), + raw_token={"scope": "identify guilds"}, + ) + + anyio.run(seed_state) + monkeypatch.setattr(discord_connect, "complete_discord_oauth", fake_complete_discord_oauth) + app = _make_app( + ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "router-secret", + "discord": { + "enabled": True, + "client_id": "discord-client", + "client_secret": "discord-secret", + "bot_token": "discord-bot", + }, + } + ), + repo, + ) + + with TestClient(app) as client: + response = client.get( + f"/api/channels/discord/callback?code=discord-code&state={state_token}", + follow_redirects=False, + ) + + assert response.status_code in {302, 307} + assert response.headers["location"] == "/workspace?channel_connected=discord" + + async def get_connection_and_credentials(): + connections = await repo.list_connections(str(_user().id)) + credentials = await repo.get_credentials(connections[0]["id"]) + return connections[0], credentials + + connection, credentials = anyio.run(get_connection_and_credentials) + assert connection["provider"] == "discord" + assert connection["external_account_id"] == "987" + assert connection["external_account_name"] == "Alice" + assert connection["metadata"]["guilds"] == [{"id": "G1", "name": "Guild One"}] 
+ assert credentials["access_token"] == "discord-access-token" + assert credentials["refresh_token"] == "discord-refresh-token" + + anyio.run(repo.close) + + +def test_disconnect_connection_revokes_current_user_connection(tmp_path): + import anyio + + repo = anyio.run(_make_repo, tmp_path) + + async def seed_connection(): + connection = await repo.upsert_connection( + owner_user_id=str(_user().id), + provider="telegram", + external_account_id="42", + status="connected", + ) + await repo.store_credentials(connection["id"], access_token="secret-token") + return connection["id"] + + connection_id = anyio.run(seed_connection) + app = _make_app( + ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "router-secret", + } + ), + repo, + ) + + with TestClient(app) as client: + response = client.delete(f"/api/channels/connections/{connection_id}") + + assert response.status_code == 204 + + async def get_connection_status(): + return (await repo.list_connections(str(_user().id)))[0]["status"] + + assert anyio.run(get_connection_status) == "revoked" + assert anyio.run(repo.get_credentials, connection_id) is None + + anyio.run(repo.close) + + +def test_disconnect_connection_is_current_user_scoped(tmp_path): + import anyio + + repo = anyio.run(_make_repo, tmp_path) + + async def seed_connection(): + connection = await repo.upsert_connection( + owner_user_id="other-user", + provider="telegram", + external_account_id="42", + status="connected", + ) + return connection["id"] + + connection_id = anyio.run(seed_connection) + app = _make_app( + ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "router-secret", + } + ), + repo, + ) + + with TestClient(app) as client: + response = client.delete(f"/api/channels/connections/{connection_id}") + + assert response.status_code == 404 + + async def get_connection_status(): + 
return (await repo.list_connections("other-user"))[0]["status"] + + assert anyio.run(get_connection_status) == "connected" + + anyio.run(repo.close) diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index 060d414b2..a15888af0 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -468,6 +468,17 @@ def _make_mock_langgraph_client(thread_id="test-thread-123", run_result=None): return mock_client +async def _make_channel_connection_repo(tmp_path: Path): + from deerflow.persistence.channel_connections import ChannelConnectionRepository, ChannelCredentialCipher + from deerflow.persistence.engine import get_session_factory, init_engine + + await init_engine("sqlite", url=f"sqlite+aiosqlite:///{tmp_path / 'channel-connections.db'}", sqlite_dir=str(tmp_path)) + return ChannelConnectionRepository( + get_session_factory(), + cipher=ChannelCredentialCipher.from_key("test-channel-key"), + ) + + def _make_stream_part(event: str, data): return SimpleNamespace(event=event, data=data) @@ -1808,6 +1819,22 @@ class TestResolveRunParamsUserId: assert run_context["user_id"] == "123456" assert run_context["channel_user_id"] == "123456" + def test_connection_owner_user_id_takes_precedence_over_platform_user_id(self): + manager = self._manager() + msg = InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + owner_user_id="deerflow-user-1", + connection_id="connection-1", + text="hi", + ) + + _, _, run_context = manager._resolve_run_params(msg, "thread-1") + + assert run_context["user_id"] == "deerflow-user-1" + assert run_context["channel_user_id"] == "U-platform" + def test_unsafe_user_id_is_normalized_but_raw_preserved(self): from deerflow.config.paths import make_safe_user_id @@ -1832,6 +1859,80 @@ class TestResolveRunParamsUserId: assert "channel_user_id" not in run_context +class TestChannelManagerConnectionRouting: + def test_connection_scoped_conversations_do_not_share_threads(self, tmp_path): + 
from app.channels.manager import ChannelManager + from deerflow.persistence.engine import close_engine + + async def go(): + repo = await _make_channel_connection_repo(tmp_path) + alice = await repo.upsert_connection( + owner_user_id="alice", + provider="slack", + external_account_id="U-alice", + workspace_id="T1", + ) + bob = await repo.upsert_connection( + owner_user_id="bob", + provider="slack", + external_account_id="U-bob", + workspace_id="T1", + ) + + bus = MessageBus() + store = ChannelStore(path=tmp_path / "legacy-store.json") + manager = ChannelManager(bus=bus, store=store, connection_repo=repo) + mock_client = _make_mock_langgraph_client() + mock_client.threads.create = AsyncMock( + side_effect=[ + {"thread_id": "thread-alice"}, + {"thread_id": "thread-bob"}, + ] + ) + manager._client = mock_client + + await manager._handle_chat( + InboundMessage( + channel_name="slack", + chat_id="C-shared", + user_id="U-alice", + owner_user_id="alice", + connection_id=alice["id"], + text="hello", + thread_ts="1710000000.000100", + topic_id="1710000000.000100", + ) + ) + await manager._handle_chat( + InboundMessage( + channel_name="slack", + chat_id="C-shared", + user_id="U-bob", + owner_user_id="bob", + connection_id=bob["id"], + text="hello", + thread_ts="1710000000.000100", + topic_id="1710000000.000100", + ) + ) + + assert await repo.get_thread_id(alice["id"], "C-shared", "1710000000.000100") == "thread-alice" + assert await repo.get_thread_id(bob["id"], "C-shared", "1710000000.000100") == "thread-bob" + assert store.list_entries() == [] + + first_context = mock_client.runs.wait.call_args_list[0].kwargs["context"] + second_context = mock_client.runs.wait.call_args_list[1].kwargs["context"] + assert first_context["user_id"] == "alice" + assert first_context["channel_user_id"] == "U-alice" + assert second_context["user_id"] == "bob" + assert second_context["channel_user_id"] == "U-bob" + + try: + _run(go()) + finally: + _run(close_engine()) + + # 
--------------------------------------------------------------------------- # ChannelService tests # --------------------------------------------------------------------------- @@ -2619,6 +2720,93 @@ class TestChannelService: assert service._config == {"telegram": {"enabled": False}} + def test_from_app_config_merges_telegram_channel_connections_config(self): + from app.channels.service import ChannelService + from deerflow.config.channel_connections_config import ChannelConnectionsConfig + + app_config = SimpleNamespace( + model_extra={}, + channel_connections=ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "secret", + "telegram": { + "enabled": True, + "bot_token": "telegram-token", + "bot_username": "deerflow_bot", + "webhook_secret": "webhook-secret", + }, + } + ), + ) + + service = ChannelService.from_app_config(app_config) + + assert service._config["telegram"]["enabled"] is True + assert service._config["telegram"]["bot_token"] == "telegram-token" + + def test_from_app_config_merges_slack_http_channel_connections_config(self): + from app.channels.service import ChannelService + from deerflow.config.channel_connections_config import ChannelConnectionsConfig + + app_config = SimpleNamespace( + model_extra={}, + channel_connections=ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "secret", + "slack": { + "enabled": True, + "client_id": "slack-client", + "client_secret": "slack-secret", + "signing_secret": "signing-secret", + "event_delivery": "http", + }, + } + ), + ) + + service = ChannelService.from_app_config(app_config) + + assert service._config["slack"]["enabled"] is True + assert service._config["slack"]["event_delivery"] == "http" + + def test_from_app_config_merges_discord_channel_connections_config(self): + from app.channels.service import ChannelService + from 
deerflow.config.channel_connections_config import ChannelConnectionsConfig + + app_config = SimpleNamespace( + model_extra={}, + channel_connections=ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "secret", + "discord": { + "enabled": True, + "client_id": "discord-client", + "client_secret": "discord-secret", + "bot_token": "discord-bot-token", + }, + } + ), + ) + + service = ChannelService.from_app_config(app_config) + + assert service._config["discord"]["enabled"] is True + assert service._config["discord"]["bot_token"] == "discord-bot-token" + + def test_connection_repo_is_forwarded_to_manager(self): + from app.channels.service import ChannelService + + repo = object() + service = ChannelService(channels_config={}, connection_repo=repo) + + assert service.manager._connection_repo is repo + def test_disabled_channel_with_string_creds_emits_warning(self, caplog): """Warning is emitted when a channel has string credentials but enabled=false.""" import logging diff --git a/backend/tests/test_csrf_middleware.py b/backend/tests/test_csrf_middleware.py index 28a65c8d7..1683b801e 100644 --- a/backend/tests/test_csrf_middleware.py +++ b/backend/tests/test_csrf_middleware.py @@ -22,6 +22,10 @@ def _make_app() -> FastAPI: async def protected_mutation(): return {"ok": True} + @app.post("/api/channels/webhooks/slack/events") + async def slack_events_webhook(): + return {"ok": True} + return app @@ -233,3 +237,14 @@ def test_non_auth_mutation_rejects_mismatched_double_submit_token(): assert response.status_code == 403 assert response.json()["detail"] == "CSRF token mismatch." 
+ + +def test_channel_webhook_post_skips_double_submit_csrf(): + client = TestClient(_make_app(), base_url="https://deerflow.example") + + response = client.post( + "/api/channels/webhooks/slack/events", + headers={"Origin": "https://slack.com"}, + ) + + assert response.status_code == 200 diff --git a/backend/tests/test_discord_channel_connections.py b/backend/tests/test_discord_channel_connections.py new file mode 100644 index 000000000..b5ddd61f5 --- /dev/null +++ b/backend/tests/test_discord_channel_connections.py @@ -0,0 +1,50 @@ +"""Discord connection routing tests.""" + +from __future__ import annotations + +import pytest + +from app.channels.discord import DiscordChannel +from app.channels.message_bus import InboundMessage, MessageBus + + +@pytest.fixture +async def repo(tmp_path): + from deerflow.persistence.channel_connections import ChannelConnectionRepository, ChannelCredentialCipher + from deerflow.persistence.engine import close_engine, get_session_factory, init_engine + + await init_engine("sqlite", url=f"sqlite+aiosqlite:///{tmp_path / 'discord.db'}", sqlite_dir=str(tmp_path)) + try: + yield ChannelConnectionRepository( + get_session_factory(), + cipher=ChannelCredentialCipher.from_key("discord-secret"), + ) + finally: + await close_engine() + + +@pytest.mark.anyio +async def test_discord_inbound_attaches_owner_identity_from_user_level_connection(repo): + connection = await repo.upsert_connection( + owner_user_id="alice", + provider="discord", + external_account_id="987", + external_account_name="Alice", + status="connected", + ) + channel = DiscordChannel( + bus=MessageBus(), + config={"bot_token": "discord-bot", "connection_repo": repo}, + ) + inbound = InboundMessage( + channel_name="discord", + chat_id="C123", + user_id="987", + text="hello", + ) + + attached = await channel._attach_connection_identity(inbound, guild_id="G123") + + assert attached.connection_id == connection["id"] + assert attached.owner_user_id == "alice" + assert 
attached.workspace_id is None diff --git a/backend/tests/test_slack_channel_connections.py b/backend/tests/test_slack_channel_connections.py new file mode 100644 index 000000000..4350f14b4 --- /dev/null +++ b/backend/tests/test_slack_channel_connections.py @@ -0,0 +1,190 @@ +"""Slack OAuth Events tests for user-owned channel connections.""" + +from __future__ import annotations + +import hashlib +import hmac +import json +import time +from unittest.mock import AsyncMock, MagicMock +from uuid import UUID + +from _router_auth_helpers import make_authed_test_app +from fastapi.testclient import TestClient + +from app.channels.message_bus import MessageBus, OutboundMessage +from app.channels.providers.slack_connect import verify_slack_signature +from app.gateway.auth.models import User +from app.gateway.routers import channel_connections +from deerflow.config.channel_connections_config import ChannelConnectionsConfig + + +def _user() -> User: + return User( + id=UUID("11111111-2222-3333-4444-555555555555"), + email="alice@example.com", + password_hash="x", + system_role="user", + ) + + +async def _make_repo(tmp_path): + from deerflow.persistence.channel_connections import ChannelConnectionRepository, ChannelCredentialCipher + from deerflow.persistence.engine import get_session_factory, init_engine + + await init_engine("sqlite", url=f"sqlite+aiosqlite:///{tmp_path / 'slack.db'}", sqlite_dir=str(tmp_path)) + return ChannelConnectionRepository( + get_session_factory(), + cipher=ChannelCredentialCipher.from_key("slack-secret"), + ) + + +def _make_app(config: ChannelConnectionsConfig, repo, bus): + app = make_authed_test_app(user_factory=_user) + app.state.channel_connections_config = config + app.state.channel_connection_repo = repo + app.state.channel_message_bus = bus + app.include_router(channel_connections.router) + return app + + +def _slack_signature(signing_secret: str, timestamp: str, body: bytes) -> str: + base = f"v0:{timestamp}:".encode() + body + digest = 
hmac.new(signing_secret.encode("utf-8"), base, hashlib.sha256).hexdigest() + return f"v0={digest}" + + +def test_verify_slack_signature_accepts_valid_signature(): + body = b'{"type":"event_callback"}' + timestamp = "1710000000" + signature = _slack_signature("secret", timestamp, body) + + assert verify_slack_signature( + signing_secret="secret", + timestamp=timestamp, + body=body, + signature=signature, + now=1710000001, + ) + + +def test_verify_slack_signature_rejects_stale_timestamp(): + body = b'{"type":"event_callback"}' + timestamp = "1710000000" + signature = _slack_signature("secret", timestamp, body) + + assert not verify_slack_signature( + signing_secret="secret", + timestamp=timestamp, + body=body, + signature=signature, + now=1710001000, + ) + + +def test_slack_events_webhook_publishes_connection_scoped_inbound(tmp_path): + import anyio + + repo = anyio.run(_make_repo, tmp_path) + + async def seed_connection(): + return await repo.upsert_connection( + owner_user_id=str(_user().id), + provider="slack", + external_account_id="U123", + workspace_id="T123", + workspace_name="Deer Team", + status="connected", + ) + + connection = anyio.run(seed_connection) + bus = AsyncMock() + app = _make_app( + ChannelConnectionsConfig.model_validate( + { + "enabled": True, + "public_base_url": "https://deerflow.example.com", + "encryption_key": "slack-secret", + "slack": { + "enabled": True, + "client_id": "slack-client", + "client_secret": "slack-secret", + "signing_secret": "slack-signing-secret", + }, + } + ), + repo, + bus, + ) + payload = { + "type": "event_callback", + "event_id": "Ev123", + "team_id": "T123", + "event": { + "type": "app_mention", + "user": "U123", + "channel": "C123", + "text": "hello deerflow", + "ts": "1710000000.000100", + }, + } + body = json.dumps(payload, separators=(",", ":")).encode("utf-8") + timestamp = str(int(time.time())) + headers = { + "X-Slack-Request-Timestamp": timestamp, + "X-Slack-Signature": 
_slack_signature("slack-signing-secret", timestamp, body), + } + + with TestClient(app) as client: + response = client.post("/api/channels/webhooks/slack/events", content=body, headers=headers) + duplicate = client.post("/api/channels/webhooks/slack/events", content=body, headers=headers) + + assert response.status_code == 200 + assert response.json() == {"ok": True, "processed": True} + assert duplicate.status_code == 200 + assert duplicate.json() == {"ok": True, "duplicate": True, "processed": False} + bus.publish_inbound.assert_awaited_once() + inbound = bus.publish_inbound.call_args.args[0] + assert inbound.connection_id == connection["id"] + assert inbound.owner_user_id == str(_user().id) + assert inbound.workspace_id == "T123" + assert inbound.chat_id == "C123" + assert inbound.user_id == "U123" + assert inbound.text == "hello deerflow" + assert inbound.topic_id == "1710000000.000100" + + anyio.run(repo.close) + + +def test_slack_send_uses_connection_bot_token_when_connection_id_is_present(): + import anyio + + from app.channels.slack import SlackChannel + + async def go(): + repo = AsyncMock() + repo.get_credentials.return_value = {"access_token": "xoxb-connection-token"} + web_client = MagicMock() + web_client_factory = MagicMock(return_value=web_client) + channel = SlackChannel( + bus=MessageBus(), + config={ + "connection_repo": repo, + "web_client_factory": web_client_factory, + }, + ) + + msg = OutboundMessage( + channel_name="slack", + chat_id="C123", + thread_id="thread-1", + text="hello", + connection_id="connection-1", + ) + await channel.send(msg) + + repo.get_credentials.assert_awaited_once_with("connection-1") + web_client_factory.assert_called_once_with(token="xoxb-connection-token") + web_client.chat_postMessage.assert_called_once() + + anyio.run(go) diff --git a/backend/tests/test_telegram_channel_connections.py b/backend/tests/test_telegram_channel_connections.py new file mode 100644 index 000000000..a31885481 --- /dev/null +++ 
b/backend/tests/test_telegram_channel_connections.py @@ -0,0 +1,145 @@ +"""Tests for Telegram deep-link channel connections.""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from app.channels.message_bus import MessageBus +from app.channels.telegram import TelegramChannel +from app.gateway.routers import channel_connections +from deerflow.config.channel_connections_config import ChannelConnectionsConfig + + +@pytest.fixture +async def repo(tmp_path: Path): + from deerflow.persistence.channel_connections import ChannelConnectionRepository, ChannelCredentialCipher + from deerflow.persistence.engine import close_engine, get_session_factory, init_engine + + await init_engine("sqlite", url=f"sqlite+aiosqlite:///{tmp_path / 'telegram.db'}", sqlite_dir=str(tmp_path)) + try: + yield ChannelConnectionRepository( + get_session_factory(), + cipher=ChannelCredentialCipher.from_key("telegram-secret"), + ) + finally: + await close_engine() + + +def _telegram_update(*, text: str = "/start", user_id: int = 42, chat_id: int = 100, chat_type: str = "private"): + update = MagicMock() + update.effective_user.id = user_id + update.effective_user.username = "alice" + update.effective_user.full_name = "Alice Example" + update.effective_chat.id = chat_id + update.effective_chat.type = chat_type + update.message.text = text + update.message.message_id = 55 + update.message.reply_to_message = None + update.message.reply_text = AsyncMock() + return update + + +@pytest.mark.anyio +async def test_start_with_deep_link_state_binds_telegram_chat(repo): + state = "telegram-bind-state" + await repo.create_oauth_state( + owner_user_id="deerflow-user-1", + provider="telegram", + state=state, + expires_at=datetime.now(UTC) + timedelta(minutes=5), + ) + channel = TelegramChannel( + bus=MessageBus(), 
+        config={"bot_token": "test-token", "connection_repo": repo},
+    )
+    update = _telegram_update(text=f"/start {state}")
+    context = MagicMock()
+    context.args = [state]
+
+    await channel._cmd_start(update, context)
+
+    connections = await repo.list_connections("deerflow-user-1")
+    assert len(connections) == 1
+    assert connections[0]["provider"] == "telegram"
+    assert connections[0]["external_account_id"] == "42"
+    assert connections[0]["external_account_name"] == "Alice Example"
+    assert connections[0]["workspace_id"] == "100"
+    assert connections[0]["metadata"]["chat_type"] == "private"
+    update.message.reply_text.assert_awaited_once()
+    assert "connected" in update.message.reply_text.await_args.args[0].lower()
+
+
+@pytest.mark.anyio
+async def test_bound_telegram_message_publishes_connection_identity(repo):
+    connection = await repo.upsert_connection(
+        owner_user_id="deerflow-user-1",
+        provider="telegram",
+        external_account_id="42",
+        external_account_name="Alice Example",
+        workspace_id="100",
+        metadata={"chat_type": "private"},
+    )
+    bus = MessageBus()
+    channel = TelegramChannel(
+        bus=bus,
+        config={"bot_token": "test-token", "connection_repo": repo},
+    )
+    channel._main_loop = __import__("asyncio").get_event_loop()
+    channel._send_running_reply = AsyncMock()
+
+    await channel._on_text(_telegram_update(text="hello"), None)
+    inbound = await bus.get_inbound()
+
+    assert inbound.connection_id == connection["id"]
+    assert inbound.owner_user_id == "deerflow-user-1"
+    assert inbound.workspace_id == "100"
+    assert inbound.user_id == "42"
+    assert inbound.chat_id == "100"
+    assert inbound.text == "hello"
+
+
+@pytest.mark.anyio
+async def test_telegram_webhook_verifies_secret_and_deduplicates_updates(repo):
+    channel = MagicMock()
+    channel.process_webhook_update = AsyncMock(return_value=True)
+    app = FastAPI()
+    app.state.channel_connections_config = ChannelConnectionsConfig.model_validate(
+        {
+            "enabled": True,
+            "public_base_url": "https://deerflow.example.com",
+            "encryption_key": "telegram-secret",
+            "telegram": {
+                "enabled": True,
+                "bot_token": "telegram-token",
+                "bot_username": "deerflow_bot",
+                "webhook_secret": "webhook-secret",
+            },
+        }
+    )
+    app.state.channel_connection_repo = repo
+    app.state.channel_instances = {"telegram": channel}
+    app.include_router(channel_connections.router)
+
+    with TestClient(app) as client:
+        response = client.post(
+            "/api/channels/webhooks/telegram",
+            json={"update_id": 123, "message": {"text": "hello"}},
+            headers={"X-Telegram-Bot-Api-Secret-Token": "webhook-secret"},
+        )
+        duplicate = client.post(
+            "/api/channels/webhooks/telegram",
+            json={"update_id": 123, "message": {"text": "hello"}},
+            headers={"X-Telegram-Bot-Api-Secret-Token": "webhook-secret"},
+        )
+
+    assert response.status_code == 200
+    assert response.json() == {"ok": True, "processed": True}
+    assert duplicate.status_code == 200
+    assert duplicate.json() == {"ok": True, "duplicate": True, "processed": False}
+    channel.process_webhook_update.assert_awaited_once_with({"update_id": 123, "message": {"text": "hello"}})
diff --git a/backend/uv.lock b/backend/uv.lock
index f4008b9a1..91627c878 100644
--- a/backend/uv.lock
+++ b/backend/uv.lock
@@ -820,6 +820,7 @@ dependencies = [
     { name = "agent-sandbox" },
     { name = "aiosqlite" },
     { name = "alembic" },
+    { name = "cryptography" },
     { name = "ddgs" },
     { name = "dotenv" },
     { name = "duckdb" },
@@ -871,6 +872,7 @@ requires-dist = [
     { name = "aiosqlite", specifier = ">=0.19" },
     { name = "alembic", specifier = ">=1.13" },
     { name = "asyncpg", marker = "extra == 'postgres'", specifier = ">=0.29" },
+    { name = "cryptography", specifier = ">=43.0.0" },
     { name = "ddgs", specifier = ">=9.10.0" },
     { name = "dotenv", specifier = ">=0.9.9" },
     { name = "duckdb", specifier = ">=1.4.4" },
diff --git a/config.example.yaml b/config.example.yaml
index 5de11e226..7e5c3d313 100644
--- a/config.example.yaml
+++ b/config.example.yaml
@@ -15,7 +15,7 @@
 # ============================================================================
 # Bump this number when the config schema changes.
 # Run `make config-upgrade` to merge new fields into your local config.yaml.
-config_version: 11
+config_version: 12
 
 # ============================================================================
 # Logging
@@ -1101,6 +1101,53 @@ run_events:
   max_trace_content: 10240
   track_token_usage: true
 
+# ============================================================================
+# User-Owned IM Channel Connections
+# ============================================================================
+# Lets logged-in users connect their own Telegram, Slack, and Discord accounts
+# from the DeerFlow frontend. This is separate from the legacy operator-owned
+# `channels` block below:
+#   - `channel_connections` stores per-user connection records and encrypted
+#     provider credentials.
+#   - `channels` still configures legacy operator-owned bots and local polling /
+#     socket-mode workers.
+#
+# Security notes:
+#   - `enabled: true` requires a public HTTPS base URL for OAuth callbacks and
+#     webhooks.
+#   - `encryption_key` is used to encrypt provider tokens at rest. Generate a
+#     long random value and keep it stable. V1 does not support transparent key
+#     rotation; changing it requires users to reconnect.
+#   - OAuth callbacks and provider webhooks are public routes, but they are
+#     protected by one-time state tokens or provider signatures/secrets.
+#
+# channel_connections:
+#   enabled: false
+#   public_base_url: https://deerflow.example.com
+#   encryption_key: $DEER_FLOW_CHANNEL_CONNECTIONS_KEY
+#
+#   telegram:
+#     enabled: false
+#     bot_token: $TELEGRAM_BOT_TOKEN
+#     bot_username: $TELEGRAM_BOT_USERNAME
+#     webhook_secret: $TELEGRAM_WEBHOOK_SECRET
+#
+#   slack:
+#     enabled: false
+#     client_id: $SLACK_CLIENT_ID
+#     client_secret: $SLACK_CLIENT_SECRET
+#     signing_secret: $SLACK_SIGNING_SECRET
+#     scopes: ["app_mentions:read", "chat:write", "channels:history", "channels:read"]
+#     event_delivery: http
+#
+#   discord:
+#     enabled: false
+#     client_id: $DISCORD_CLIENT_ID
+#     client_secret: $DISCORD_CLIENT_SECRET
+#     bot_token: $DISCORD_BOT_TOKEN
+#     permissions: "274877975552"
+#     require_message_content_intent: true
+
 # ============================================================================
 # IM Channels Configuration
 # ============================================================================
diff --git a/frontend/src/components/workspace/channels/channel-provider-icon.tsx b/frontend/src/components/workspace/channels/channel-provider-icon.tsx
new file mode 100644
index 000000000..4193231f6
--- /dev/null
+++ b/frontend/src/components/workspace/channels/channel-provider-icon.tsx
@@ -0,0 +1,102 @@
+"use client";
+
+import { MessageCircleIcon } from "lucide-react";
+import type { SVGProps } from "react";
+
+import { cn } from "@/lib/utils";
+
+type ChannelProviderIconProps = SVGProps & {
+  provider: string;
+};
+
+export function ChannelProviderIcon({
+  provider,
+  className,
+  ...props
+}: ChannelProviderIconProps) {
+  const normalizedProvider = provider.toLowerCase();
+
+  if (normalizedProvider === "telegram") {
+    return (
+    );
+  }
+
+  if (normalizedProvider === "slack") {
+    return (
+    );
+  }
+
+  if (normalizedProvider === "discord") {
+    return (
+    );
+  }
+
+  return (
+