Files
deer-flow/backend/app/channels/base.py
T

183 lines
6.7 KiB
Python

"""Abstract base class for IM channels."""
from __future__ import annotations
import asyncio
import logging
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from concurrent.futures import CancelledError as FutureCancelledError
from typing import Any, TypeVar
from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
logger = logging.getLogger(__name__)
T = TypeVar("T")
class Channel(ABC):
"""Base class for all IM channel implementations.
Each channel connects to an external messaging platform and:
1. Receives messages, wraps them as InboundMessage, publishes to the bus.
2. Subscribes to outbound messages and sends replies back to the platform.
Subclasses must implement ``start``, ``stop``, and ``send``.
"""
def __init__(self, name: str, bus: MessageBus, config: dict[str, Any]) -> None:
self.name = name
self.bus = bus
self.config = config
self._running = False
@property
def is_running(self) -> bool:
return self._running
@property
def supports_streaming(self) -> bool:
return False
# -- lifecycle ---------------------------------------------------------
@abstractmethod
async def start(self) -> None:
"""Start listening for messages from the external platform."""
@abstractmethod
async def stop(self) -> None:
"""Gracefully stop the channel."""
# -- outbound ----------------------------------------------------------
@abstractmethod
async def send(self, msg: OutboundMessage) -> None:
"""Send a message back to the external platform.
The implementation should use ``msg.chat_id`` and ``msg.thread_ts``
to route the reply to the correct conversation/thread.
"""
async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
"""Upload a single file attachment to the platform.
Returns True if the upload succeeded, False otherwise.
Default implementation returns False (no file upload support).
"""
return False
# -- helpers -----------------------------------------------------------
async def _send_with_retry(
self,
operation: Callable[[], Awaitable[T]],
*,
max_retries: int,
log_prefix: str | None = None,
operation_name: str = "send",
) -> T:
"""Run an outbound send operation with the shared channel retry policy."""
prefix = log_prefix or f"[{self.name}]"
last_exc: Exception | None = None
for attempt in range(max_retries):
try:
return await operation()
except Exception as exc:
last_exc = exc
if attempt < max_retries - 1:
delay = 2**attempt
logger.warning(
"%s %s failed (attempt %d/%d), retrying in %ds: %s",
prefix,
operation_name,
attempt + 1,
max_retries,
delay,
exc,
)
await asyncio.sleep(delay)
logger.error("%s %s failed after %d attempts: %s", prefix, operation_name, max_retries, last_exc)
if last_exc is None:
raise RuntimeError(f"{self.name} {operation_name} failed without an exception from any attempt")
raise last_exc
def _log_future_error(self, fut: Any, name: str, msg_id: Any) -> None:
"""Callback for concurrent futures scheduled from channel worker threads."""
try:
exc = fut.exception()
except (asyncio.CancelledError, FutureCancelledError, asyncio.InvalidStateError):
return
except Exception:
logger.exception("[%s] failed to inspect future for %s (msg_id=%s)", self.name, name, msg_id)
return
if exc:
logger.error("[%s] %s failed for msg_id=%s: %s", self.name, name, msg_id, exc)
def _make_inbound(
self,
chat_id: str,
user_id: str,
text: str,
*,
msg_type: InboundMessageType = InboundMessageType.CHAT,
thread_ts: str | None = None,
files: list[dict[str, Any]] | None = None,
metadata: dict[str, Any] | None = None,
) -> InboundMessage:
"""Convenience factory for creating InboundMessage instances."""
return InboundMessage(
channel_name=self.name,
chat_id=chat_id,
user_id=user_id,
text=text,
msg_type=msg_type,
thread_ts=thread_ts,
files=files or [],
metadata=metadata or {},
)
async def _on_outbound(self, msg: OutboundMessage) -> None:
"""Outbound callback registered with the bus.
Only forwards messages targeted at this channel.
Sends the text message first, then uploads any file attachments.
File uploads are skipped entirely when the text send fails to avoid
partial deliveries (files without accompanying text).
"""
if msg.channel_name == self.name:
try:
await self.send(msg)
except Exception:
logger.exception("Failed to send outbound message on channel %s", self.name)
return # Do not attempt file uploads when the text message failed
for attachment in msg.attachments:
try:
success = await self.send_file(msg, attachment)
if not success:
logger.warning("[%s] file upload skipped for %s", self.name, attachment.filename)
except Exception:
logger.exception("[%s] failed to upload file %s", self.name, attachment.filename)
async def receive_file(self, msg: InboundMessage, thread_id: str) -> InboundMessage:
"""
Optionally process and materialize inbound file attachments for this channel.
By default, this method does nothing and simply returns the original message.
Subclasses (e.g. FeishuChannel) may override this to download files (images, documents, etc)
referenced in msg.files, save them to the sandbox, and update msg.text to include
the sandbox file paths for downstream model consumption.
Args:
msg: The inbound message, possibly containing file metadata in msg.files.
thread_id: The resolved DeerFlow thread ID for sandbox path context.
Returns:
The (possibly modified) InboundMessage, with text and/or files updated as needed.
"""
return msg