mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-18 13:46:02 +00:00
68ba4198b8
* fix(channels): make channel connect flow deterministic

* make format

* fix(channels): apply connect-code before allowed_users on telegram and wechat

The bind-bootstrap reorder shipped for slack/dingtalk only. Telegram and WeChat still gate _check_user/allowed_users before connect-code dispatch, so a newly allowlisted-but-unbound user is silently rejected when binding via the browser deep-link / connect-code flow. This is the same deadlock the PR fixes.

- telegram: consume the /start deep-link token before the allowed_users gate.
- wechat: handle the /connect code before the allowed_users gate, and defer inbound file extraction + context-token tracking past the gate so blocked senders no longer trigger CDN downloads or token bookkeeping.

Adds regression tests for both adapters mirroring the slack/dingtalk coverage.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(channels): enforce single-active-owner invariant at the DB layer

_revoke_other_active_owners did a SELECT-then-UPDATE in app code with no row lock or constraint covering active rows. Under READ COMMITTED, two concurrent connect-code consumes for the same (provider, external_account_id, workspace_id) from different owners could each observe "no other active owner" and both commit a connected row, leaving find_connection_by_external_identity nondeterministic.

- Add a partial unique index on (provider, external_account_id, workspace_id) WHERE status != 'revoked' (portable to SQLite >= 3.8.0 and PostgreSQL) so the database guarantees at most one non-revoked row per external identity.
- Reorder upsert_connection to revoke other owners' active rows before the new connected row is flushed (so the index is satisfied at commit), wrapped in a bounded rollback-and-retry loop. A losing concurrent writer now retries against the now-visible state instead of committing a duplicate.

Adds DB-constraint, revoked-slot-reuse, and concurrent-upsert regression tests.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(channels): harden connect-status polling primitive

pollChannelConnectionUntilResolved was a free-floating recursive setTimeout started from onSuccess with no cancellation, no per-provider dedup, a redundant second endpoint per tick, and an unbounded loop on a non-finite expires_in.

- Extract a framework-agnostic, cancellable poller (connect-poll.ts) that polls only listChannelConnections() and invalidates the providers query once when the bind resolves, instead of fetching both endpoints every tick.
- Guard expires_in with a finite check + default window so undefined/NaN can no longer produce a poll loop that runs until the page closes.
- Track one active poll handle per provider in useConnectChannelProvider via a ref Map: a new connect cancels the prior poll for that provider, and a useEffect cleanup cancels all polls on unmount.

Adds unit tests for resolve-and-stop, cancellation, and non-finite-expiry.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(channels): stop leaking blocked-sender content in DingTalk INFO log; document bind semantics

Moving the allowed_users gate past _extract_text meant the parsed-message INFO log (text=%r, first 100 chars) fired for senders that allowed_users would have rejected, defeating the filter's noise/privacy role. Move that log to after the allowed_users gate so blocked senders' message text never reaches INFO logs.

Also document the two operator-relevant semantic changes in backend/CLAUDE.md: connect-code dispatch runs before allowed_users (so allowed_users is no longer a bind-time defense; the model relies on code confidentiality + 600s TTL + one-time consumption), and the single-active-owner-per-external-identity transfer semantics now backed by the partial unique index.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* docs(channels): note connect-code-vs-allowlist and ownership transfer in operator guide

Mirror the backend/CLAUDE.md notes in the operator-facing IM_CHANNEL_CONNECTIONS.md: connect codes are consumed before allowed_users (so a not-yet-allowlisted user can still complete a first bind, and allowed_users is not a bind-time defense), and an external identity has at most one active owner with last-bind-wins transfer enforced at the DB layer.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* refactor(channels): lift connect-code dispatch into Channel base class

Each adapter duplicated the ordering-sensitive boilerplate of extracting a /connect code and guarding on the connection repo before its allowed_users gate. The duplication is what let telegram/wechat drift and keep the gate ahead of the bind. Centralize it:

- Move `_connection_repo` onto Channel.__init__ (removing 7 duplicate assignments).
- Add Channel._pending_connect_code(text), which guards on the repo and extracts the code, documenting that adapters MUST consult it before authorization so a browser-initiated bind can bootstrap a not-yet-authorized identity.
- Route slack, discord, feishu, dingtalk, wechat, and wecom through the helper. This also fixes a latent inconsistency where slack dispatched a bind even when no connection repo was configured.

Pure refactor: the full channel suite stays green; adds a direct unit test for the base helper's contract.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* make format

* fix(channels): redact DingTalk parsed-message INFO log content

Log text_len instead of the first 100 chars of message text, so message content never reaches INFO logs (the after-gate move already keeps blocked senders out entirely). This takes over the redaction from #3584 so only this PR touches dingtalk.py, letting the two PRs merge in any order conflict-free.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
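The single-active-owner invariant described in the commit message can be illustrated with a small, self-contained sketch using Python's standard `sqlite3` module. Only the indexed columns `(provider, external_account_id, workspace_id)`, the `status != 'revoked'` predicate, and the `connected` status come from the commit message. The table name `channel_connections`, the `owner_id` column, the index name, and the `bind` / `active_owners` helpers are hypothetical. For brevity the sketch revokes every active row for the identity before inserting, rather than reproducing the project's `upsert_connection` or its rollback-and-retry loop.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory table guarded by a partial unique index."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE channel_connections (  -- hypothetical table name
            id INTEGER PRIMARY KEY,
            owner_id TEXT NOT NULL,         -- hypothetical column
            provider TEXT NOT NULL,
            external_account_id TEXT NOT NULL,
            workspace_id TEXT NOT NULL,
            status TEXT NOT NULL
        )
        """
    )
    # At most one non-revoked row per external identity.
    conn.execute(
        """
        CREATE UNIQUE INDEX uq_active_external_identity
        ON channel_connections (provider, external_account_id, workspace_id)
        WHERE status != 'revoked'
        """
    )
    return conn


def bind(conn, owner_id, provider, external_account_id, workspace_id):
    """Last-bind-wins: revoke active rows first, then insert the new one."""
    identity = (provider, external_account_id, workspace_id)
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute(
            "UPDATE channel_connections SET status = 'revoked' "
            "WHERE provider = ? AND external_account_id = ? "
            "AND workspace_id = ? AND status != 'revoked'",
            identity,
        )
        conn.execute(
            "INSERT INTO channel_connections "
            "(owner_id, provider, external_account_id, workspace_id, status) "
            "VALUES (?, ?, ?, ?, 'connected')",
            (owner_id, *identity),
        )


def active_owners(conn, provider, external_account_id, workspace_id):
    """Return owners holding a non-revoked row for the external identity."""
    rows = conn.execute(
        "SELECT owner_id FROM channel_connections "
        "WHERE provider = ? AND external_account_id = ? "
        "AND workspace_id = ? AND status != 'revoked'",
        (provider, external_account_id, workspace_id),
    ).fetchall()
    return [row[0] for row in rows]
```

Because the revoke runs before the insert inside one transaction, the index is already satisfied when the new row is written. A writer that skips the revoke step gets `sqlite3.IntegrityError` instead of silently creating a second active owner.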
198 lines
7.4 KiB
Python
"""Abstract base class for IM channels."""

from __future__ import annotations

import asyncio
import logging
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from concurrent.futures import CancelledError as FutureCancelledError
from typing import Any, TypeVar

from app.channels.commands import extract_connect_code
from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment

logger = logging.getLogger(__name__)

T = TypeVar("T")


class Channel(ABC):
    """Base class for all IM channel implementations.

    Each channel connects to an external messaging platform and:
    1. Receives messages, wraps them as InboundMessage, publishes to the bus.
    2. Subscribes to outbound messages and sends replies back to the platform.

    Subclasses must implement ``start``, ``stop``, and ``send``.
    """

    def __init__(self, name: str, bus: MessageBus, config: dict[str, Any]) -> None:
        self.name = name
        self.bus = bus
        self.config = config
        self._running = False
        self._connection_repo: Any = config.get("connection_repo")

    @property
    def is_running(self) -> bool:
        return self._running

    @property
    def supports_streaming(self) -> bool:
        return False

    # -- lifecycle ---------------------------------------------------------

    @abstractmethod
    async def start(self) -> None:
        """Start listening for messages from the external platform."""

    @abstractmethod
    async def stop(self) -> None:
        """Gracefully stop the channel."""

    # -- outbound ----------------------------------------------------------

    @abstractmethod
    async def send(self, msg: OutboundMessage) -> None:
        """Send a message back to the external platform.

        The implementation should use ``msg.chat_id`` and ``msg.thread_ts``
        to route the reply to the correct conversation/thread.
        """

    async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
        """Upload a single file attachment to the platform.

        Returns True if the upload succeeded, False otherwise.
        Default implementation returns False (no file upload support).
        """
        return False

    # -- helpers -----------------------------------------------------------

    async def _send_with_retry(
        self,
        operation: Callable[[], Awaitable[T]],
        *,
        max_retries: int,
        log_prefix: str | None = None,
        operation_name: str = "send",
    ) -> T:
        """Run an outbound send operation with the shared channel retry policy."""
        prefix = log_prefix or f"[{self.name}]"
        last_exc: Exception | None = None
        for attempt in range(max_retries):
            try:
                return await operation()
            except Exception as exc:
                last_exc = exc
                if attempt < max_retries - 1:
                    delay = 2**attempt
                    logger.warning(
                        "%s %s failed (attempt %d/%d), retrying in %ds: %s",
                        prefix,
                        operation_name,
                        attempt + 1,
                        max_retries,
                        delay,
                        exc,
                    )
                    await asyncio.sleep(delay)

        logger.error("%s %s failed after %d attempts: %s", prefix, operation_name, max_retries, last_exc)
        if last_exc is None:
            raise RuntimeError(f"{self.name} {operation_name} failed without an exception from any attempt")
        raise last_exc

    def _log_future_error(self, fut: Any, name: str, msg_id: Any) -> None:
        """Callback for concurrent futures scheduled from channel worker threads."""
        try:
            exc = fut.exception()
        except (asyncio.CancelledError, FutureCancelledError, asyncio.InvalidStateError):
            return
        except Exception:
            logger.exception("[%s] failed to inspect future for %s (msg_id=%s)", self.name, name, msg_id)
            return

        if exc:
            logger.error("[%s] %s failed for msg_id=%s: %s", self.name, name, msg_id, exc)

    def _pending_connect_code(self, text: str) -> str | None:
        """Return the one-time bind code if *text* is a ``/connect <code>`` command
        and channel connections are configured, else ``None``.

        Adapters MUST consult this **before** applying their ``allowed_users`` /
        ``_check_user`` gate, so a browser-initiated bind can bootstrap an external
        identity that the platform bot has never seen and is therefore not yet
        authorized. (Telegram uses its deep-link ``/start <token>`` flow instead.)
        """
        if self._connection_repo is None:
            return None
        return extract_connect_code(text)

    def _make_inbound(
        self,
        chat_id: str,
        user_id: str,
        text: str,
        *,
        msg_type: InboundMessageType = InboundMessageType.CHAT,
        thread_ts: str | None = None,
        files: list[dict[str, Any]] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> InboundMessage:
        """Convenience factory for creating InboundMessage instances."""
        return InboundMessage(
            channel_name=self.name,
            chat_id=chat_id,
            user_id=user_id,
            text=text,
            msg_type=msg_type,
            thread_ts=thread_ts,
            files=files or [],
            metadata=metadata or {},
        )

    async def _on_outbound(self, msg: OutboundMessage) -> None:
        """Outbound callback registered with the bus.

        Only forwards messages targeted at this channel.
        Sends the text message first, then uploads any file attachments.
        File uploads are skipped entirely when the text send fails to avoid
        partial deliveries (files without accompanying text).
        """
        if msg.channel_name == self.name:
            try:
                await self.send(msg)
            except Exception:
                logger.exception("Failed to send outbound message on channel %s", self.name)
                return  # Do not attempt file uploads when the text message failed

            for attachment in msg.attachments:
                try:
                    success = await self.send_file(msg, attachment)
                    if not success:
                        logger.warning("[%s] file upload skipped for %s", self.name, attachment.filename)
                except Exception:
                    logger.exception("[%s] failed to upload file %s", self.name, attachment.filename)

    async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage:
        """
        Optionally process and materialize inbound file attachments for this channel.

        By default, this method does nothing and simply returns the original message.
        Subclasses (e.g. FeishuChannel) may override this to download files (images, documents, etc)
        referenced in msg.files, save them to the sandbox, and update msg.text to include
        the sandbox file paths for downstream model consumption.

        Args:
            msg: The inbound message, possibly containing file metadata in msg.files.
            thread_id: The resolved DeerFlow thread ID for sandbox path context.

        Returns:
            The (possibly modified) InboundMessage, with text and/or files updated as needed.
        """
        return msg
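The retry policy in `_send_with_retry` can be exercised in isolation. The following is a minimal standalone sketch, not the project's code: it copies the loop's control flow (up to `max_retries` attempts, sleeping `2**attempt` seconds between failures, re-raising the last exception) but drops logging and takes an injectable `sleep` parameter so the backoff schedule can be observed without waiting. The names `send_with_retry`, `FlakySender`, and `demo` are hypothetical.

```python
import asyncio


async def send_with_retry(operation, *, max_retries, sleep=asyncio.sleep):
    """Retry `operation` with exponential backoff (1s, 2s, 4s, ...)."""
    last_exc = None
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception as exc:
            last_exc = exc
            # No sleep after the final attempt: the error is raised instead.
            if attempt < max_retries - 1:
                await sleep(2**attempt)
    if last_exc is None:
        # Only reachable when max_retries <= 0, so no attempt ever ran.
        raise RuntimeError("failed without an exception from any attempt")
    raise last_exc


class FlakySender:
    """Hypothetical operation that fails `failures` times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    async def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError(f"transient failure #{self.calls}")
        return "delivered"


def demo(failures, max_retries):
    """Run the sketch with a fake sleep that records the requested delays."""
    delays = []

    async def fake_sleep(seconds):
        delays.append(seconds)

    sender = FlakySender(failures)
    result = asyncio.run(
        send_with_retry(sender, max_retries=max_retries, sleep=fake_sleep)
    )
    return result, sender.calls, delays
```

With two transient failures and `max_retries=3`, the third attempt succeeds after delays of 1 and 2 seconds. With three failures the last `ConnectionError` propagates to the caller, which is the case `_on_outbound` catches in order to skip file uploads.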