diff --git a/backend/app/channels/base.py b/backend/app/channels/base.py index baf542c48..7b3d25e16 100644 --- a/backend/app/channels/base.py +++ b/backend/app/channels/base.py @@ -2,14 +2,19 @@ from __future__ import annotations +import asyncio import logging from abc import ABC, abstractmethod -from typing import Any +from collections.abc import Awaitable, Callable +from concurrent.futures import CancelledError as FutureCancelledError +from typing import Any, TypeVar from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment logger = logging.getLogger(__name__) +T = TypeVar("T") + class Channel(ABC): """Base class for all IM channel implementations. @@ -65,6 +70,53 @@ class Channel(ABC): # -- helpers ----------------------------------------------------------- + async def _send_with_retry( + self, + operation: Callable[[], Awaitable[T]], + *, + max_retries: int, + log_prefix: str | None = None, + operation_name: str = "send", + ) -> T: + """Run an outbound send operation with the shared channel retry policy.""" + prefix = log_prefix or f"[{self.name}]" + last_exc: Exception | None = None + for attempt in range(max_retries): + try: + return await operation() + except Exception as exc: + last_exc = exc + if attempt < max_retries - 1: + delay = 2**attempt + logger.warning( + "%s %s failed (attempt %d/%d), retrying in %ds: %s", + prefix, + operation_name, + attempt + 1, + max_retries, + delay, + exc, + ) + await asyncio.sleep(delay) + + logger.error("%s %s failed after %d attempts: %s", prefix, operation_name, max_retries, last_exc) + if last_exc is None: + raise RuntimeError(f"{self.name} {operation_name} failed without an exception from any attempt") + raise last_exc + + def _log_future_error(self, fut: Any, name: str, msg_id: Any) -> None: + """Callback for concurrent futures scheduled from channel worker threads.""" + try: + exc = fut.exception() + except (asyncio.CancelledError, FutureCancelledError, asyncio.InvalidStateError): + return + except Exception: + logger.exception("[%s] failed to inspect future for %s (msg_id=%s)", self.name, name, msg_id) + return + + if exc: + logger.error("[%s] %s failed for msg_id=%s: %s", self.name, name, msg_id, exc) + def _make_inbound( self, chat_id: str, diff --git a/backend/app/channels/dingtalk.py b/backend/app/channels/dingtalk.py index 85bbc30e5..8a8ddd943 100644 --- a/backend/app/channels/dingtalk.py +++ b/backend/app/channels/dingtalk.py @@ -247,32 +247,19 @@ class DingTalkChannel(Channel): self._card_repliers.pop(out_track_id, None) return - # Non-card mode: send sampleMarkdown with retry - last_exc: Exception | None = None - for attempt in range(_max_retries): - try: - if conversation_type == _CONVERSATION_TYPE_GROUP: - await self._send_group_message(robot_code, conversation_id, msg.text, at_user_ids=[sender_staff_id] if sender_staff_id else None) - else: - await self._send_p2p_message(robot_code, sender_staff_id, msg.text) - return - except Exception as exc: - last_exc = exc - if attempt < _max_retries - 1: - delay = 2**attempt - logger.warning( - "[DingTalk] send failed (attempt %d/%d), retrying in %ds: %s", - attempt + 1, - _max_retries, - delay, - exc, - ) - await asyncio.sleep(delay) + async def send_markdown() -> None: + if conversation_type == _CONVERSATION_TYPE_GROUP: + await self._send_group_message(robot_code, conversation_id, msg.text, at_user_ids=[sender_staff_id] if sender_staff_id else None) + else: + await self._send_p2p_message(robot_code, sender_staff_id, msg.text) - logger.error("[DingTalk] send failed after %d attempts: %s", _max_retries, last_exc) - if last_exc is None: - raise RuntimeError("DingTalk send failed without an exception from any attempt") - raise last_exc + # Non-card mode: send sampleMarkdown with retry + await self._send_with_retry( + send_markdown, + max_retries=_max_retries, + log_prefix="[DingTalk]", + ) + return async def _send_markdown_fallback( self, @@ -802,15 +789,6 @@ class DingTalkChannel(Channel): logger.exception("[DingTalk] failed to upload media: %s", file_path) return None - @staticmethod - def _log_future_error(fut: Any, name: str, msg_id: str) -> None: - try: - exc = fut.exception() - if exc: - logger.error("[DingTalk] %s failed for msg_id=%s: %s", name, msg_id, exc) - except (asyncio.CancelledError, asyncio.InvalidStateError): - pass - class _DingTalkMessageHandler: """Callback handler registered with dingtalk-stream.""" diff --git a/backend/app/channels/feishu.py b/backend/app/channels/feishu.py index fa1c4a5d3..b6b34424d 100644 --- a/backend/app/channels/feishu.py +++ b/backend/app/channels/feishu.py @@ -241,28 +241,11 @@ class FeishuChannel(Channel): len(msg.text), ) - last_exc: Exception | None = None - for attempt in range(_max_retries): - try: - await self._send_card_message(msg) - return # success - except Exception as exc: - last_exc = exc - if attempt < _max_retries - 1: - delay = 2**attempt # 1s, 2s - logger.warning( - "[Feishu] send failed (attempt %d/%d), retrying in %ds: %s", - attempt + 1, - _max_retries, - delay, - exc, - ) - await asyncio.sleep(delay) - - logger.error("[Feishu] send failed after %d attempts: %s", _max_retries, last_exc) - if last_exc is None: - raise RuntimeError("Feishu send failed without an exception from any attempt") - raise last_exc + await self._send_with_retry( + lambda: self._send_card_message(msg), + max_retries=_max_retries, + log_prefix="[Feishu]", + ) async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: if not self._api_client: @@ -725,16 +708,6 @@ class FeishuChannel(Channel): return root_id or msg_id, False - @staticmethod - def _log_future_error(fut, name: str, msg_id: str) -> None: - """Callback for run_coroutine_threadsafe futures to surface errors.""" - try: - exc = fut.exception() - if exc: - logger.error("[Feishu] %s failed for msg_id=%s: %s", name, msg_id, exc) - except Exception: - pass - @staticmethod def _log_task_error(task: asyncio.Task, name: str, msg_id: str) -> None: """Callback for background asyncio tasks to surface errors.""" diff --git a/backend/app/channels/slack.py b/backend/app/channels/slack.py index cfe03c50c..96141f3ca 100644 --- a/backend/app/channels/slack.py +++ b/backend/app/channels/slack.py @@ -141,49 +141,38 @@ class SlackChannel(Channel): if msg.thread_ts: kwargs["thread_ts"] = msg.thread_ts - last_exc: Exception | None = None - for attempt in range(_max_retries): - try: - await asyncio.to_thread(web_client.chat_postMessage, **kwargs) - # Add a completion reaction to the thread root - if msg.thread_ts: - await asyncio.to_thread( - self._add_reaction_with_client, - web_client, - msg.chat_id, - msg.thread_ts, - "white_check_mark", - ) - return - except Exception as exc: - last_exc = exc - if attempt < _max_retries - 1: - delay = 2**attempt # 1s, 2s - logger.warning( - "[Slack] send failed (attempt %d/%d), retrying in %ds: %s", - attempt + 1, - _max_retries, - delay, - exc, - ) - await asyncio.sleep(delay) - - logger.error("[Slack] send failed after %d attempts: %s", _max_retries, last_exc) - # Add failure reaction on error - if msg.thread_ts: - try: + async def post_message() -> None: + await asyncio.to_thread(web_client.chat_postMessage, **kwargs) + # Add a completion reaction to the thread root + if msg.thread_ts: await asyncio.to_thread( self._add_reaction_with_client, web_client, msg.chat_id, msg.thread_ts, - "x", + "white_check_mark", ) - except Exception: - pass - if last_exc is None: - raise RuntimeError("Slack send failed without an exception from any attempt") - raise last_exc + + try: + await self._send_with_retry( + post_message, + max_retries=_max_retries, + log_prefix="[Slack]", + ) + except Exception: + # Add failure reaction on error + if msg.thread_ts: + try: + await asyncio.to_thread( + self._add_reaction_with_client, + web_client, + msg.chat_id, + msg.thread_ts, + "x", + ) + except Exception: + pass + raise async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: web_client = await self._get_web_client_for_message(msg) diff --git a/backend/app/channels/telegram.py b/backend/app/channels/telegram.py index 34e63051d..0f92f0461 100644 --- a/backend/app/channels/telegram.py +++ b/backend/app/channels/telegram.py @@ -239,29 +239,17 @@ class TelegramChannel(Channel): kwargs["reply_to_message_id"] = reply_to bot = self._application.bot - last_exc: Exception | None = None - for attempt in range(_max_retries): - try: - sent = await bot.send_message(**kwargs) - self._last_bot_message[chat_key] = sent.message_id - return sent.message_id - except Exception as exc: - last_exc = exc - if attempt < _max_retries - 1: - delay = 2**attempt # 1s, 2s - logger.warning( - "[Telegram] send failed (attempt %d/%d), retrying in %ds: %s", - attempt + 1, - _max_retries, - delay, - exc, - ) - await asyncio.sleep(delay) - logger.error("[Telegram] send failed after %d attempts: %s", _max_retries, last_exc) - if last_exc is None: - raise RuntimeError("Telegram send failed without an exception from any attempt") - raise last_exc + async def send_message() -> int: + sent = await bot.send_message(**kwargs) + self._last_bot_message[chat_key] = sent.message_id + return sent.message_id + + return await self._send_with_retry( + send_message, + max_retries=_max_retries, + log_prefix="[Telegram]", + ) async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: if not self._application: @@ -368,16 +356,6 @@ class TelegramChannel(Channel): except Exception: logger.exception("[Telegram] failed to send running reply in chat=%s", chat_id) - # -- internal ---------------------------------------------------------- - @staticmethod - def _log_future_error(fut, name: str, msg_id: str): - try: - exc = fut.exception() - if exc: - logger.error("[Telegram] %s failed for msg_id=%s: %s", name, msg_id, exc) - except Exception: - logger.exception("[Telegram] Failed to inspect future for %s (msg_id=%s)", name, msg_id) - def _run_polling(self) -> None: """Run telegram polling in a dedicated thread.""" self._tg_loop = asyncio.new_event_loop() diff --git a/backend/app/channels/wechat.py b/backend/app/channels/wechat.py index a605a8d2f..f2db2c380 100644 --- a/backend/app/channels/wechat.py +++ b/backend/app/channels/wechat.py @@ -342,27 +342,15 @@ class WechatChannel(Channel): "base_info": self._base_info(), } - last_exc: Exception | None = None - for attempt in range(max_retries): - try: - data = await self._request_json("/ilink/bot/sendmessage", payload) - self._ensure_success(data, "sendmessage") - return - except Exception as exc: - last_exc = exc - if attempt < max_retries - 1: - delay = 2**attempt - logger.warning( - "[WeChat] send failed (attempt %d/%d), retrying in %ds: %s", - attempt + 1, - max_retries, - delay, - exc, - ) - await asyncio.sleep(delay) + async def send_message() -> None: + data = await self._request_json("/ilink/bot/sendmessage", payload) + self._ensure_success(data, "sendmessage") - logger.error("[WeChat] send failed after %d attempts: %s", max_retries, last_exc) - raise last_exc # type: ignore[misc] + await self._send_with_retry( + send_message, + max_retries=max_retries, + log_prefix="[WeChat]", + ) async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: if attachment.is_image: diff --git a/backend/app/channels/wecom.py b/backend/app/channels/wecom.py index 1a2757ada..19997ed54 100644 --- a/backend/app/channels/wecom.py +++ b/backend/app/channels/wecom.py @@ -389,30 +389,20 @@ class WeComChannel(Channel): if not stream_id: return - last_exc: Exception | None = None - for attempt in range(_max_retries): - try: - await self._ws_client.reply_stream(frame, stream_id, msg.text, bool(msg.is_final)) - return - except Exception as exc: - last_exc = exc - if attempt < _max_retries - 1: - await asyncio.sleep(2**attempt) - if last_exc: - raise last_exc + await self._send_with_retry( + lambda: self._ws_client.reply_stream(frame, stream_id, msg.text, bool(msg.is_final)), + max_retries=_max_retries, + log_prefix="[WeCom]", + operation_name="stream send", + ) + return body = {"msgtype": "markdown", "markdown": {"content": msg.text}} - last_exc = None - for attempt in range(_max_retries): - try: - await self._ws_client.send_message(msg.chat_id, body) - return - except Exception as exc: - last_exc = exc - if attempt < _max_retries - 1: - await asyncio.sleep(2**attempt) - if last_exc: - raise last_exc + await self._send_with_retry( + lambda: self._ws_client.send_message(msg.chat_id, body), + max_retries=_max_retries, + log_prefix="[WeCom]", + ) async def _upload_media_ws( self, diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index 9dba24af8..cdfd0d8db 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -4,7 +4,9 @@ from __future__ import annotations import asyncio import json +import logging import tempfile +from concurrent.futures import Future from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch @@ -333,6 +335,71 @@ class TestChannelBase: _run(go()) + def test_send_with_retry_retries_until_success(self, monkeypatch): + bus = MessageBus() + ch = DummyChannel(bus) + attempts = 0 + sleep = AsyncMock() + monkeypatch.setattr("app.channels.base.asyncio.sleep", sleep) + + async def flaky_send(): + nonlocal attempts + attempts += 1 + if attempts < 3: + raise RuntimeError(f"failure {attempts}") + return "sent" + + result = _run(ch._send_with_retry(flaky_send, max_retries=3, log_prefix="[Dummy]")) + + assert result == "sent" + assert attempts == 3 + assert [call.args[0] for call in sleep.await_args_list] == [1, 2] + + def test_log_future_error_handles_cancelled_future(self, caplog): + bus = MessageBus() + ch = DummyChannel(bus) + fut = Future() + fut.cancel() + + with caplog.at_level(logging.ERROR): + ch._log_future_error(fut, "prepare_inbound", "m1") + + assert "prepare_inbound" not in caplog.text + + def test_log_future_error_surfaces_future_exception(self, caplog): + bus = MessageBus() + ch = DummyChannel(bus) + fut = Future() + fut.set_exception(RuntimeError("boom")) + + with caplog.at_level(logging.ERROR): + ch._log_future_error(fut, "prepare_inbound", "m1") + + assert "prepare_inbound failed for msg_id=m1: boom" in caplog.text + + def test_channel_capabilities_match_channel_defaults(self): + from app.channels.dingtalk import DingTalkChannel + from app.channels.discord import DiscordChannel + from app.channels.feishu import FeishuChannel + from app.channels.manager import CHANNEL_CAPABILITIES + from app.channels.slack import SlackChannel + from app.channels.telegram import TelegramChannel + from app.channels.wechat import WechatChannel + from app.channels.wecom import WeComChannel + + bus = MessageBus() + defaults = { + "dingtalk": DingTalkChannel(bus=bus, config={}).supports_streaming, + "discord": DiscordChannel(bus=bus, config={}).supports_streaming, + "feishu": FeishuChannel(bus=bus, config={}).supports_streaming, + "slack": SlackChannel(bus=bus, config={}).supports_streaming, + "telegram": TelegramChannel(bus=bus, config={}).supports_streaming, + "wechat": WechatChannel(bus=bus, config={}).supports_streaming, + "wecom": WeComChannel(bus=bus, config={}).supports_streaming, + } + + assert {name: caps["supports_streaming"] for name, caps in CHANNEL_CAPABILITIES.items()} == defaults + # --------------------------------------------------------------------------- # _extract_response_text tests