deer-flow/backend/app/channels/slack.py
DanielWalnut aa015462a7 feat(im): Add user-owned IM channel connections (#3487)
* Add user-owned IM channel connections

* Fix dev startup and channel connect popup

* Use async channel connect flow

* Harden dev service daemon startup

* Support local IM channel connections

* Align IM connections with local channels

* Fix safe user id digest algorithm

* Address Copilot IM channel feedback

* Address IM channel review comments

* Support all integrated IM channel connections

* Format additional channel connection tests

* Keep unavailable channel connect buttons clickable

* Fix IM channel provider icons

* Add runtime setup for enabled IM channels

* Guard global shortcut key handling

* Keep configured IM channels editable

* Avoid password autofill for channel secrets

* Make channel threads visible to connection owners

* Persist IM runtime config locally

* Allow disconnecting runtime IM channels

* Route no-auth channel sessions to local user

* Use default user for auth-disabled local mode

* Show IM channel source on threads

* Prefill IM channel runtime config

* Reflect IM channel runtime health

* Ignore Feishu message read events

* Ignore Feishu non-content message events

* Let setup wizard enable IM channels

* Fix frontend formatting after merge

* Stabilize backend tests without local config

* Isolate channel runtime config tests

* Address channel connection review comments

* Use sha256 user buckets with legacy migration

* Ensure runtime IM channels are ready after restart

* Persist disconnected IM channel state

* Address channel connection review comments

* Address channel connection review findings

Frontend connect flow:
- Open the runtime-config dialog only when a provider still needs
  credentials; configured providers go straight to the connect flow, so
  the binding-code/deep-link path is reachable from the UI again.
- After saving credentials, continue into the connect flow when a user
  binding is still required (multi-user mode) instead of stopping at a
  "Connected" toast.
- Extract shared provider-state helpers to core/channels/provider-state
  and add unit + e2e coverage for the direct-connect and
  configure-then-connect paths.

Provider status semantics:
- Report connection_status from the user's newest connection row;
  with no binding it is not_connected, except in auth-disabled local
  mode where a configured running channel is effectively connected.

Concurrency and event-loop correctness:
- Offload ChannelRuntimeConfigStore construction and writes, channel
  service construction, and Slack connection replies to threads; add a
  tests/blocking_io/ anchor for the runtime-config handlers.
- Consume binding codes with a conditional UPDATE so a code can only be
  used once under concurrent workers; retry upsert_connection as an
  update when a concurrent insert wins the unique constraint.
- Serialize ensure_channel_ready per channel so concurrent provider
  polls cannot double-start a channel worker.
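
The single-use guarantee for binding codes described above can be sketched as follows. This is a minimal illustration, not the repository's implementation: the `binding_codes` table, its columns, and the `consume_code` helper are hypothetical names, and SQLite stands in for whatever database the project actually uses. The point is that the `WHERE ... AND consumed = 0` clause makes the check and the write one atomic statement, so only one caller can see an affected row.

```python
import sqlite3


def consume_code(conn: sqlite3.Connection, code: str) -> bool:
    """Mark a binding code as used; True only for the caller that wins.

    Hypothetical helper: table and column names are assumptions.
    """
    cur = conn.execute(
        # The consumed = 0 guard turns "check then write" into one statement.
        "UPDATE binding_codes SET consumed = 1 WHERE code = ? AND consumed = 0",
        (code,),
    )
    conn.commit()
    # rowcount is 1 for the first consumer, 0 for replays and unknown codes.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE binding_codes ("
    "code TEXT PRIMARY KEY, consumed INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO binding_codes (code) VALUES ('abc123')")
conn.commit()

first = consume_code(conn, "abc123")   # first use of a valid code
second = consume_code(conn, "abc123")  # replay of the same code
unknown = consume_code(conn, "nope")   # code that was never issued
```

A read-then-update pair would leave a window in which two workers both see the code as unused; deciding on the affected-row count closes that window without an explicit lock.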

Config and migration hardening:
- Stop mutating the get_app_config()-cached Telegram provider config;
  the runtime store now owns the UI-entered bot username.
- Register channel_connections in STARTUP_ONLY_FIELDS with the
  standardized startup-only Field description.
- Match the legacy unsafe-id bucket by recomputing its exact SHA-1 name
  so another user's same-prefix bucket can never be migrated.
- Remove the unused Telegram process_webhook_update path and document
  src/core/channels in the frontend docs.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* Address PR review comments on authz scoping and channel runtime

Security (review feedback from ShenAC-SAC):
- Scope internal-token callers to the connection owner carried in
  X-DeerFlow-Owner-User-Id instead of bypassing owner checks outright,
  in both require_permission(owner_check=True) and the stateless run
  endpoints. Internal callers keep access to their own and
  shared/legacy threads, and may claim a default-owned channel thread
  for its real owner, but a leaked internal token no longer grants
  cross-user thread access.
- Require admin privileges for POST/DELETE /api/channels/{provider}/
  runtime-config: runtime credentials and channel workers are
  instance-wide shared state (same model as the MCP config API).
  Read-only provider listing stays available to all users.

Performance (review feedback from willem-bd):
- Skip the redundant thread channel-metadata PATCH after the first
  successful backfill per thread.
- Reuse the per-connection Slack WebClient until its token changes
  instead of constructing one per outbound message.
- Reconcile channel readiness for all providers concurrently in
  GET /api/channels/providers.
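
Concurrent reconciliation is only safe when each channel's startup is serialized, as the earlier commit in this series notes for ensure_channel_ready. A rough sketch of how the two ideas could fit together, assuming a hypothetical `ChannelStarter` class and `reconcile_all` helper (neither is taken from the repository, and the sleep merely stands in for real worker startup):

```python
import asyncio
from collections import defaultdict


class ChannelStarter:
    """Hypothetical stand-in for the channel service."""

    def __init__(self) -> None:
        # One lock per channel name, created on first use.
        self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self._running: set[str] = set()
        self.start_count: dict[str, int] = defaultdict(int)

    async def ensure_channel_ready(self, name: str) -> bool:
        async with self._locks[name]:
            if name in self._running:
                return True  # another caller already started this channel
            await asyncio.sleep(0.01)  # placeholder for slow worker startup
            self.start_count[name] += 1
            self._running.add(name)
            return True


async def reconcile_all(starter: ChannelStarter, providers: list[str]) -> dict[str, bool]:
    # Different channels start in parallel; the same channel is serialized.
    results = await asyncio.gather(
        *(starter.ensure_channel_ready(p) for p in providers)
    )
    return dict(zip(providers, results))


async def main() -> dict[str, int]:
    starter = ChannelStarter()
    providers = ["slack", "telegram", "feishu"]
    # Two overlapping provider polls, as two browser tabs might issue.
    await asyncio.gather(
        reconcile_all(starter, providers),
        reconcile_all(starter, providers),
    )
    return dict(starter.start_count)


start_counts = asyncio.run(main())
```

With the per-channel lock, the second poll waits for the first to finish starting a channel and then returns early, so each channel's start count stays at one even though both polls run at the same time.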

Also resolve the code-quality unused-import flag in the blocking-io
anchor by pre-importing the channel service via importlib.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* Fix prettier formatting in provider-state test

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* Reconcile UI runtime channel config with config reload on restart

Main now reloads a channel's config.yaml entry on restart_channel()
(#3514, issue #3497). Adapt the user-owned connection flow to coexist:

- configure_channel() restarts with reload_config=False — the caller
  just supplied the authoritative config (browser-entered credentials
  that are never written to config.yaml), so a file reload must not
  clobber it with the stale on-disk entry.
- _load_channel_config() re-applies the UI runtime-store overlay used
  at startup, so an operator-triggered restart keeps browser-entered
  credentials for channels without a config.yaml entry and does not
  resurrect a channel disconnected from the UI.
- Offload the reload's disk IO (config.yaml + runtime store) with
  asyncio.to_thread, matching the blocking-IO policy on this branch.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 15:24:58 +08:00

423 lines
17 KiB
Python

"""Slack channel — connects via Socket Mode (no public IP needed)."""

from __future__ import annotations

import asyncio
import logging
from typing import Any

from markdown_to_mrkdwn import SlackMarkdownConverter

from app.channels.base import Channel
from app.channels.commands import extract_connect_code, is_known_channel_command
from app.channels.connection_identity import attach_connection_identity
from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment

logger = logging.getLogger(__name__)

_slack_md_converter = SlackMarkdownConverter()


def _normalize_allowed_users(allowed_users: Any) -> set[str]:
    if allowed_users is None:
        return set()
    if isinstance(allowed_users, str):
        values = [allowed_users]
    elif isinstance(allowed_users, list | tuple | set):
        values = allowed_users
    else:
        logger.warning(
            "Slack allowed_users should be a list of Slack user IDs or a single Slack user ID string; treating %s as one string value",
            type(allowed_users).__name__,
        )
        values = [allowed_users]
    return {str(user_id) for user_id in values if str(user_id)}


def _strip_leading_slack_bot_mention(text: str, bot_user_id: str | None) -> str:
    if not bot_user_id:
        return text
    if not text.startswith("<@"):
        return text
    end = text.find(">")
    if end <= 2:
        return text
    mentioned_user_id = text[2:end].split("|", 1)[0].lstrip("!")
    if mentioned_user_id != bot_user_id:
        return text
    return text[end + 1 :].lstrip()


class SlackChannel(Channel):
    """Slack IM channel using Socket Mode (WebSocket, no public IP).

    Configuration keys (in ``config.yaml`` under ``channels.slack``):
        - ``bot_token``: Slack Bot User OAuth Token (xoxb-...).
        - ``app_token``: Slack App-Level Token (xapp-...) for Socket Mode.
        - ``allowed_users``: (optional) List of allowed Slack user IDs, or a
          single Slack user ID string as shorthand. Empty = allow all. Other
          scalar values are treated as a single string with a warning.
    """

    def __init__(self, bus: MessageBus, config: dict[str, Any]) -> None:
        super().__init__(name="slack", bus=bus, config=config)
        self._socket_client = None
        self._web_client = None
        self._loop: asyncio.AbstractEventLoop | None = None
        self._allowed_users = _normalize_allowed_users(config.get("allowed_users", []))
        self._connection_repo = config.get("connection_repo")
        self._web_client_factory = config.get("web_client_factory")
        self._connection_web_clients: dict[str, tuple[str, Any]] = {}
        configured_bot_user_id = config.get("bot_user_id")
        self._bot_user_id = str(configured_bot_user_id).lstrip("@") if configured_bot_user_id else None

    async def start(self) -> None:
        if self._running:
            return

        try:
            from slack_sdk import WebClient
            from slack_sdk.socket_mode import SocketModeClient
            from slack_sdk.socket_mode.response import SocketModeResponse
        except ImportError:
            logger.error("slack-sdk is not installed. Install it with: uv add slack-sdk")
            return

        self._SocketModeResponse = SocketModeResponse
        if self._web_client_factory is None:
            self._web_client_factory = WebClient

        bot_token = self.config.get("bot_token", "")
        app_token = self.config.get("app_token", "")

        if self._connection_repo is not None and self.config.get("event_delivery") == "http":
            if not bot_token:
                logger.error("Slack HTTP Events mode requires bot_token")
                return
            await self._initialize_operator_web_client(str(bot_token))
            self._loop = asyncio.get_event_loop()
            self._running = True
            self.bus.subscribe_outbound(self._on_outbound)
            logger.info("Slack channel started in HTTP Events mode")
            return

        if not bot_token or not app_token:
            logger.error("Slack channel requires bot_token and app_token")
            return

        await self._initialize_operator_web_client(str(bot_token))
        self._socket_client = SocketModeClient(
            app_token=app_token,
            web_client=self._web_client,
        )
        self._loop = asyncio.get_event_loop()

        self._socket_client.socket_mode_request_listeners.append(self._on_socket_event)

        self._running = True
        self.bus.subscribe_outbound(self._on_outbound)

        # Start socket mode in background thread
        asyncio.get_event_loop().run_in_executor(None, self._socket_client.connect)
        logger.info("Slack channel started")

    async def stop(self) -> None:
        self._running = False
        self.bus.unsubscribe_outbound(self._on_outbound)
        if self._socket_client:
            self._socket_client.close()
            self._socket_client = None
        logger.info("Slack channel stopped")

    async def send(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None:
        web_client = await self._get_web_client_for_message(msg)
        if not web_client:
            return

        kwargs: dict[str, Any] = {
            "channel": msg.chat_id,
            "text": _slack_md_converter.convert(msg.text),
        }
        if msg.thread_ts:
            kwargs["thread_ts"] = msg.thread_ts

        last_exc: Exception | None = None
        for attempt in range(_max_retries):
            try:
                await asyncio.to_thread(web_client.chat_postMessage, **kwargs)
                # Add a completion reaction to the thread root
                if msg.thread_ts:
                    await asyncio.to_thread(
                        self._add_reaction_with_client,
                        web_client,
                        msg.chat_id,
                        msg.thread_ts,
                        "white_check_mark",
                    )
                return
            except Exception as exc:
                last_exc = exc
                if attempt < _max_retries - 1:
                    delay = 2**attempt  # 1s, 2s
                    logger.warning(
                        "[Slack] send failed (attempt %d/%d), retrying in %ds: %s",
                        attempt + 1,
                        _max_retries,
                        delay,
                        exc,
                    )
                    await asyncio.sleep(delay)

        logger.error("[Slack] send failed after %d attempts: %s", _max_retries, last_exc)
        # Add failure reaction on error
        if msg.thread_ts:
            try:
                await asyncio.to_thread(
                    self._add_reaction_with_client,
                    web_client,
                    msg.chat_id,
                    msg.thread_ts,
                    "x",
                )
            except Exception:
                pass
        if last_exc is None:
            raise RuntimeError("Slack send failed without an exception from any attempt")
        raise last_exc

    async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
        web_client = await self._get_web_client_for_message(msg)
        if not web_client:
            return False

        try:
            kwargs: dict[str, Any] = {
                "channel": msg.chat_id,
                "file": str(attachment.actual_path),
                "filename": attachment.filename,
                "title": attachment.filename,
            }
            if msg.thread_ts:
                kwargs["thread_ts"] = msg.thread_ts

            await asyncio.to_thread(web_client.files_upload_v2, **kwargs)
            logger.info("[Slack] file uploaded: %s to channel=%s", attachment.filename, msg.chat_id)
            return True
        except Exception:
            logger.exception("[Slack] failed to upload file: %s", attachment.filename)
            return False

    # -- internal ----------------------------------------------------------

    async def _initialize_operator_web_client(self, bot_token: str) -> None:
        self._web_client = self._web_client_factory(token=bot_token)
        if self._bot_user_id is not None:
            return
        try:
            auth_info = await asyncio.to_thread(self._web_client.auth_test)
            user_id = auth_info.get("user_id") if isinstance(auth_info, dict) else None
            if user_id is None:
                auth_get = getattr(auth_info, "get", None)
                user_id = auth_get("user_id") if callable(auth_get) else None
            if isinstance(user_id, str) and user_id:
                self._bot_user_id = user_id
        except Exception:
            logger.warning("[Slack] failed to resolve bot user id; app mention text may include the bot mention", exc_info=True)

    async def _get_web_client_for_message(self, msg: OutboundMessage):
        if msg.connection_id and self._connection_repo is not None:
            credentials = await self._connection_repo.get_credentials(msg.connection_id)
            access_token = credentials.get("access_token") if credentials else None
            if not access_token:
                return self._web_client
            # WebClient keeps its own HTTP session and rate-limit state, so
            # reuse one per connection until its token changes.
            cached = self._connection_web_clients.get(msg.connection_id)
            if cached is not None and cached[0] == access_token:
                return cached[1]
            if self._web_client_factory is None:
                from slack_sdk import WebClient

                self._web_client_factory = WebClient
            web_client = self._web_client_factory(token=access_token)
            self._connection_web_clients[msg.connection_id] = (access_token, web_client)
            return web_client
        return self._web_client

    @staticmethod
    def _add_reaction_with_client(web_client, channel_id: str, timestamp: str, emoji: str) -> None:
        try:
            web_client.reactions_add(
                channel=channel_id,
                timestamp=timestamp,
                name=emoji,
            )
        except Exception as exc:
            if "already_reacted" not in str(exc):
                logger.warning("[Slack] failed to add reaction %s: %s", emoji, exc)

    def _add_reaction(self, channel_id: str, timestamp: str, emoji: str) -> None:
        """Add an emoji reaction to a message (best-effort, non-blocking)."""
        if not self._web_client:
            return
        self._add_reaction_with_client(self._web_client, channel_id, timestamp, emoji)

    def _send_running_reply(self, channel_id: str, thread_ts: str) -> None:
        """Send a 'Working on it...' reply in the thread (called from SDK thread)."""
        if not self._web_client:
            return
        try:
            self._web_client.chat_postMessage(
                channel=channel_id,
                text=":hourglass_flowing_sand: Working on it...",
                thread_ts=thread_ts,
            )
            logger.info("[Slack] 'Working on it...' reply sent in channel=%s, thread_ts=%s", channel_id, thread_ts)
        except Exception:
            logger.exception("[Slack] failed to send running reply in channel=%s", channel_id)

    def _on_socket_event(self, client, req) -> None:
        """Called by slack-sdk for each Socket Mode event."""
        try:
            # Acknowledge the event
            response = self._SocketModeResponse(envelope_id=req.envelope_id)
            client.send_socket_mode_response(response)

            event_type = req.type
            if event_type != "events_api":
                return

            if self._bot_user_id is None:
                authorization = next((item for item in req.payload.get("authorizations", []) if isinstance(item, dict)), None)
                user_id = authorization.get("user_id") if authorization else None
                if isinstance(user_id, str) and user_id:
                    self._bot_user_id = user_id

            event = req.payload.get("event", {})
            etype = event.get("type", "")

            # Handle message events (DM or @mention)
            if etype in ("message", "app_mention"):
                self._handle_message_event(
                    event,
                    team_id=req.payload.get("team_id") or req.payload.get("team") or event.get("team"),
                )

        except Exception:
            logger.exception("Error processing Slack event")

    def _handle_message_event(self, event: dict, *, team_id: str | None = None) -> None:
        # Ignore bot messages and any event that carries a message subtype
        if event.get("bot_id") or event.get("subtype"):
            return

        user_id = event.get("user", "")

        # Check allowed users
        if self._allowed_users and user_id not in self._allowed_users:
            logger.debug("Ignoring message from non-allowed user: %s", user_id)
            return

        text = event.get("text", "").strip()
        if event.get("type") == "app_mention":
            text = _strip_leading_slack_bot_mention(text, self._bot_user_id)
        if not text:
            return

        connect_code = extract_connect_code(text)
        if connect_code:
            if self._loop and self._loop.is_running():
                asyncio.run_coroutine_threadsafe(
                    self._bind_connection_from_connect_code(
                        event=event,
                        team_id=str(team_id or event.get("team") or ""),
                        code=connect_code,
                    ),
                    self._loop,
                )
            return

        channel_id = event.get("channel", "")
        thread_ts = event.get("thread_ts") or event.get("ts", "")

        if is_known_channel_command(text):
            msg_type = InboundMessageType.COMMAND
        else:
            msg_type = InboundMessageType.CHAT

        # topic_id: use thread_ts as the topic identifier.
        # For threaded messages, thread_ts is the root message ts (shared topic).
        # For non-threaded messages, thread_ts is the message's own ts (new topic).
        inbound = self._make_inbound(
            chat_id=channel_id,
            user_id=user_id,
            text=text,
            msg_type=msg_type,
            thread_ts=thread_ts,
        )
        inbound.topic_id = thread_ts

        if self._loop and self._loop.is_running():
            # Acknowledge with an eyes reaction
            self._add_reaction(channel_id, event.get("ts", thread_ts), "eyes")
            # Send "running" reply first (fire-and-forget from SDK thread)
            self._send_running_reply(channel_id, thread_ts)
            if self._connection_repo is None:
                asyncio.run_coroutine_threadsafe(self.bus.publish_inbound(inbound), self._loop)
            else:
                asyncio.run_coroutine_threadsafe(self._publish_inbound_with_connection(inbound, team_id=team_id), self._loop)

    async def _publish_inbound_with_connection(self, inbound, *, team_id: str | None = None) -> None:
        inbound = await self._attach_connection_identity(inbound, team_id=team_id)
        await self.bus.publish_inbound(inbound)

    async def _attach_connection_identity(self, inbound, *, team_id: str | None = None):
        workspace_id = str(team_id or inbound.metadata.get("team_id") or "")
        return await attach_connection_identity(
            inbound,
            repo=self._connection_repo,
            provider="slack",
            workspace_id=workspace_id,
        )

    async def _bind_connection_from_connect_code(self, *, event: dict, team_id: str, code: str) -> bool:
        if self._connection_repo is None or not code:
            return False

        channel_id = str(event.get("channel") or "")
        thread_ts = str(event.get("thread_ts") or event.get("ts") or "")
        state = await self._connection_repo.consume_oauth_state(provider="slack", state=code)
        if state is None:
            await self._post_connection_reply(channel_id, "Slack connection code is invalid or expired.", thread_ts)
            return True

        user_id = str(event.get("user") or "")
        if not user_id or not team_id:
            await self._post_connection_reply(channel_id, "Slack connection could not be completed from this message.", thread_ts)
            return True

        await self._connection_repo.upsert_connection(
            owner_user_id=state["owner_user_id"],
            provider="slack",
            external_account_id=user_id,
            workspace_id=team_id,
            metadata={
                "team_id": team_id,
                "channel_id": channel_id,
            },
            status="connected",
        )
        await self._post_connection_reply(channel_id, "Slack connected to DeerFlow.", thread_ts)
        return True

    async def _post_connection_reply(self, channel_id: str, text: str, thread_ts: str | None = None) -> None:
        if not self._web_client or not channel_id:
            return
        kwargs: dict[str, Any] = {"channel": channel_id, "text": text}
        if thread_ts:
            kwargs["thread_ts"] = thread_ts
        try:
            await asyncio.to_thread(self._web_client.chat_postMessage, **kwargs)
        except Exception:
            logger.exception("[Slack] failed to send connection reply in channel=%s", channel_id)