feat(telegram): stream agent replies by editing the placeholder message in place (#3534)

* docs(spec): telegram streaming output design

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* docs(plan): telegram streaming implementation plan

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* feat(telegram): report streaming support for telegram channel

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* test(channels): use slack as the non-streaming sample channel in manager tests

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* feat(telegram): register running-reply placeholder as stream target

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* test(telegram): pin last_edit_at sentinel in placeholder registration test

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* refactor(telegram): extract _send_new_message from send()

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* feat(telegram): edit streamed message in place for non-final updates

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* feat(telegram): finalize streamed message with overflow splitting

When is_final=True arrives and stream state exists, pop the state, edit
the streamed placeholder with the final text, split overflow into follow-up
send_message calls, update _last_bot_message, and clear stream state.
Falls back to _send_new_message when no stream state is registered.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* test(telegram): exercise the not-modified handler in final edit path

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* docs: telegram channel now streams replies via message editing

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix(telegram): harden final-delivery path with guarded retry and chunk retries

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix(channels): accept runtime 'messages' SSE event for streaming text accumulation

The embedded runtime (matching LangGraph Platform semantics) emits SSE
event name 'messages' for the requested 'messages-tuple' stream mode,
so the manager never accumulated token deltas and streaming channels
only updated from end-of-step 'values' snapshots — on Telegram this
looked like 'Working on it...' followed by the full answer in one block.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* feat(telegram): widen stream-edit throttle to 3s in group chats

Telegram caps bots at 20 messages/minute per group, stricter than the
1 msg/s per-chat guideline. Groups have negative chat ids, so pick the
interval by sign.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* fix(telegram): address review findings — thread fallback messages, bound stream registry, share stream-event constants

- Fallback/new stream messages now carry reply_to_message_id parsed from
  thread_ts so they stay nested under the user's message (finding 1)
- STREAM_MODES / MESSAGE_STREAM_EVENTS constants link the requested
  stream modes to the SSE event names they arrive under (finding 2)
- _register_stream_message bounds the in-flight registry at 256 entries,
  evicting oldest, guarding against leaks when a final never arrives (finding 4)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
DanielWalnut
2026-06-13 08:38:28 +08:00
committed by GitHub
parent 3475f7cdad
commit 839fa99237
6 changed files with 1557 additions and 23 deletions
+513 -7
View File
@@ -873,7 +873,7 @@ class TestChannelManager:
bus=bus,
store=store,
channel_sessions={
"telegram": {
"slack": {
"assistant_id": "mobile_agent",
"config": {"recursion_limit": 55},
"context": {
@@ -896,7 +896,7 @@ class TestChannelManager:
await manager.start()
inbound = InboundMessage(channel_name="telegram", chat_id="chat1", user_id="user1", text="hi")
inbound = InboundMessage(channel_name="slack", chat_id="chat1", user_id="user1", text="hi")
await bus.publish_inbound(inbound)
await _wait_for(lambda: len(outbound_received) >= 1)
await manager.stop()
@@ -1047,7 +1047,7 @@ class TestChannelManager:
store=store,
default_session={"context": {"is_plan_mode": True}},
channel_sessions={
"telegram": {
"slack": {
"assistant_id": "mobile_agent",
"config": {"recursion_limit": 55},
"context": {
@@ -1080,7 +1080,7 @@ class TestChannelManager:
await manager.start()
inbound = InboundMessage(channel_name="telegram", chat_id="chat1", user_id="vip-user", text="hi")
inbound = InboundMessage(channel_name="slack", chat_id="chat1", user_id="vip-user", text="hi")
await bus.publish_inbound(inbound)
await _wait_for(lambda: len(outbound_received) >= 1)
await manager.stop()
@@ -1202,6 +1202,76 @@ class TestChannelManager:
_run(go())
def test_handle_streaming_chat_accepts_runtime_messages_event(self, monkeypatch):
"""The embedded runtime emits SSE event name "messages" (LangGraph
Platform semantics) for the requested "messages-tuple" stream mode —
the manager must accumulate text from those events too."""
from app.channels.manager import ChannelManager
monkeypatch.setattr("app.channels.manager.STREAM_UPDATE_MIN_INTERVAL_SECONDS", 0.0)
async def go():
bus = MessageBus()
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
manager = ChannelManager(bus=bus, store=store)
outbound_received = []
async def capture_outbound(msg):
outbound_received.append(msg)
bus.subscribe_outbound(capture_outbound)
stream_events = [
_make_stream_part(
"messages",
[
{"id": "ai-1", "content": "Hello", "type": "AIMessageChunk"},
{"langgraph_node": "agent"},
],
),
_make_stream_part(
"messages",
[
{"id": "ai-1", "content": " world", "type": "AIMessageChunk"},
{"langgraph_node": "agent"},
],
),
_make_stream_part(
"values",
{
"messages": [
{"type": "human", "content": "hi"},
{"type": "ai", "content": "Hello world"},
],
"artifacts": [],
},
),
]
mock_client = _make_mock_langgraph_client()
mock_client.runs.stream = MagicMock(return_value=_make_async_iterator(stream_events))
manager._client = mock_client
await manager.start()
inbound = InboundMessage(
channel_name="telegram",
chat_id="chat1",
user_id="user1",
text="hi",
thread_ts="42",
)
await bus.publish_inbound(inbound)
await _wait_for(lambda: len(outbound_received) >= 3)
await manager.stop()
mock_client.runs.stream.assert_called_once()
assert [msg.text for msg in outbound_received] == ["Hello", "Hello world", "Hello world"]
assert [msg.is_final for msg in outbound_received] == [False, False, True]
_run(go())
def test_handle_feishu_streaming_marks_only_final_clarification_outbound(self, monkeypatch):
from app.channels.manager import ChannelManager
@@ -2044,7 +2114,7 @@ class TestChannelManager:
_run(go())
def test_none_topic_reuses_thread(self):
"""Messages with topic_id=None should reuse the same thread (e.g. Telegram private chat)."""
"""Messages with topic_id=None should reuse the same thread (e.g. a private/direct chat)."""
from app.channels.manager import ChannelManager
async def go():
@@ -2063,10 +2133,10 @@ class TestChannelManager:
bus.subscribe_outbound(capture)
await manager.start()
# Send two messages with topic_id=None (simulates Telegram private chat)
# Send two messages with topic_id=None (simulates a private/direct chat)
for text in ["hello", "what did I just say?"]:
msg = InboundMessage(
channel_name="telegram",
channel_name="slack",
chat_id="chat1",
user_id="user1",
text=text,
@@ -4766,3 +4836,439 @@ class TestSlackMarkdownConversion:
result = _slack_md_converter.convert("# Title")
assert "*Title*" in result
assert "#" not in result
# ---------------------------------------------------------------------------
# Telegram streaming tests
# ---------------------------------------------------------------------------
class TestTelegramStreaming:
@staticmethod
def _make_channel_with_bot():
from app.channels.telegram import TelegramChannel
bus = MessageBus()
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
mock_app = MagicMock()
bot = SimpleNamespace()
bot.sent = []
bot.edited = []
bot.next_message_id = 100
async def send_message(**kwargs):
bot.sent.append(kwargs)
result = MagicMock()
result.message_id = bot.next_message_id
bot.next_message_id += 1
return result
async def edit_message_text(**kwargs):
bot.edited.append(kwargs)
result = MagicMock()
result.message_id = kwargs["message_id"]
return result
bot.send_message = send_message
bot.edit_message_text = edit_message_text
mock_app.bot = bot
ch._application = mock_app
return ch, bot
def test_stream_updates_edit_placeholder_in_place(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("12345", 42)
placeholder_id = ch._stream_messages["12345:42"]["message_id"]
update1 = OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="Hello", is_final=False, thread_ts="42")
await ch.send(update1)
clock["now"] += 2.0
update2 = OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="Hello world", is_final=False, thread_ts="42")
await ch.send(update2)
assert len(bot.sent) == 1 # only the placeholder
assert [e["message_id"] for e in bot.edited] == [placeholder_id, placeholder_id]
assert [e["text"] for e in bot.edited] == ["Hello", "Hello world"]
_run(go())
def test_stream_updates_throttled_within_interval(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("12345", 42)
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="a", is_final=False, thread_ts="42"))
clock["now"] += 0.3 # within 1s window -> dropped
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="ab", is_final=False, thread_ts="42"))
clock["now"] += 1.0 # past window -> edited
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="abc", is_final=False, thread_ts="42"))
assert [e["text"] for e in bot.edited] == ["a", "abc"]
_run(go())
def test_stream_updates_in_group_chat_use_wider_throttle(self, monkeypatch):
"""Telegram groups (negative chat_id) are capped at 20 messages/minute,
so group-chat stream edits throttle at 3s instead of 1s."""
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("-100123", 42)
await ch.send(OutboundMessage(channel_name="telegram", chat_id="-100123", thread_id="t1", text="a", is_final=False, thread_ts="42"))
clock["now"] += 1.2 # past the 1s private window, within the 3s group window -> dropped
await ch.send(OutboundMessage(channel_name="telegram", chat_id="-100123", thread_id="t1", text="ab", is_final=False, thread_ts="42"))
clock["now"] += 2.0 # 3.2s since last edit -> edited
await ch.send(OutboundMessage(channel_name="telegram", chat_id="-100123", thread_id="t1", text="abc", is_final=False, thread_ts="42"))
assert [e["text"] for e in bot.edited] == ["a", "abc"]
_run(go())
def test_stream_update_without_placeholder_sends_new_message(self):
async def go():
ch, bot = self._make_channel_with_bot()
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="Hi", is_final=False, thread_ts="42"))
assert len(bot.sent) == 1
assert bot.sent[0]["text"] == "Hi"
# Threads under the user's message that started this turn
assert bot.sent[0]["reply_to_message_id"] == 42
assert ch._stream_messages["12345:42"]["message_id"] == 100
_run(go())
def test_stream_edit_fallback_message_threads_under_user_message(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("12345", 42)
async def edit_gone(**kwargs):
raise Exception("Bad Request: message to edit not found")
bot.edit_message_text = edit_gone
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="Hi", is_final=False, thread_ts="42"))
# Fallback message threads under the user's message and becomes the new stream target
assert bot.sent[1]["text"] == "Hi"
assert bot.sent[1]["reply_to_message_id"] == 42
assert ch._stream_messages["12345:42"]["message_id"] == 101
_run(go())
def test_stream_message_registry_is_bounded(self):
from app.channels.telegram import MAX_TRACKED_STREAM_MESSAGES
async def go():
ch, _bot = self._make_channel_with_bot()
for i in range(MAX_TRACKED_STREAM_MESSAGES + 1):
ch._register_stream_message(f"chat:{i}", message_id=i, last_text="x", last_edit_at=0.0)
assert len(ch._stream_messages) == MAX_TRACKED_STREAM_MESSAGES
assert "chat:0" not in ch._stream_messages # oldest evicted
assert f"chat:{MAX_TRACKED_STREAM_MESSAGES}" in ch._stream_messages
_run(go())
def test_stream_update_truncates_long_text(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("12345", 42)
long_text = "x" * 5000
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text=long_text, is_final=False, thread_ts="42"))
assert len(bot.edited) == 1
assert len(bot.edited[0]["text"]) == 4096
assert bot.edited[0]["text"].endswith("")
_run(go())
def test_stream_update_retry_after_is_dropped(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("12345", 42)
async def edit_rate_limited(**kwargs):
exc = Exception("Flood control exceeded")
exc.retry_after = 5
raise exc
bot.edit_message_text = edit_rate_limited
# Must not raise, must not send a new message
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="Hi", is_final=False, thread_ts="42"))
assert len(bot.sent) == 1 # placeholder only
_run(go())
def test_telegram_reports_streaming_support(self):
from app.channels.manager import CHANNEL_CAPABILITIES
from app.channels.telegram import TelegramChannel
bus = MessageBus()
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
assert ch.supports_streaming is True
assert CHANNEL_CAPABILITIES["telegram"]["supports_streaming"] is True
def test_running_reply_registers_stream_placeholder(self):
from app.channels.telegram import TelegramChannel
async def go():
bus = MessageBus()
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
mock_app = MagicMock()
mock_bot = AsyncMock()
sent = MagicMock()
sent.message_id = 777
mock_bot.send_message = AsyncMock(return_value=sent)
mock_app.bot = mock_bot
ch._application = mock_app
await ch._send_running_reply("12345", 42)
state = ch._stream_messages["12345:42"]
assert state["message_id"] == 777
assert state["last_edit_at"] == 0.0
assert state["last_text"] == "Working on it..."
mock_bot.send_message.assert_awaited_once_with(
chat_id=12345,
text="Working on it...",
reply_to_message_id=42,
)
_run(go())
def test_final_message_edits_stream_message_and_clears_state(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("12345", 42)
placeholder_id = ch._stream_messages["12345:42"]["message_id"]
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="partial", is_final=False, thread_ts="42"))
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="full answer", is_final=True, thread_ts="42"))
assert [e["text"] for e in bot.edited] == ["partial", "full answer"]
assert len(bot.sent) == 1 # placeholder only — final edited, not re-sent
assert "12345:42" not in ch._stream_messages
assert ch._last_bot_message["12345"] == placeholder_id
_run(go())
def test_final_message_splits_long_text(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("12345", 42)
long_text = "a" * 4096 + "b" * 100
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text=long_text, is_final=True, thread_ts="42"))
assert len(bot.edited) == 1
assert bot.edited[0]["text"] == "a" * 4096
follow_ups = bot.sent[1:] # bot.sent[0] is the placeholder
assert [m["text"] for m in follow_ups] == ["b" * 100]
# Fake bot assigns ids sequentially: placeholder=100, follow-up chunk=101
assert ch._last_bot_message["12345"] == 101
assert "12345:42" not in ch._stream_messages
_run(go())
def test_final_message_not_modified_error_is_ignored(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("12345", 42)
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="done", is_final=False, thread_ts="42"))
async def edit_not_modified(**kwargs):
raise Exception("Bad Request: message is not modified")
bot.edit_message_text = edit_not_modified
# Same text again as final — skipped via the equal-text guard:
# must not raise, must not send a new message
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="done", is_final=True, thread_ts="42"))
assert len(bot.sent) == 1 # placeholder only
assert "12345:42" not in ch._stream_messages
_run(go())
def test_final_edit_raising_not_modified_is_swallowed(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
await ch._send_running_reply("12345", 42)
placeholder_id = ch._stream_messages["12345:42"]["message_id"]
async def edit_not_modified(**kwargs):
raise Exception("Bad Request: message is not modified")
bot.edit_message_text = edit_not_modified
# Final text differs from last_text, so the edit IS attempted and
# raises not-modified — must be swallowed, no fallback send.
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="done", is_final=True, thread_ts="42"))
assert len(bot.sent) == 1 # placeholder only
assert "12345:42" not in ch._stream_messages
assert ch._last_bot_message["12345"] == placeholder_id
_run(go())
def test_final_without_stream_state_sends_plain_message(self):
async def go():
ch, bot = self._make_channel_with_bot()
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="direct", is_final=True, thread_ts=None))
assert len(bot.sent) == 1
assert bot.sent[0]["text"] == "direct"
assert len(bot.edited) == 0
_run(go())
def test_final_edit_retries_once_after_rate_limit(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
sleeps = []
async def fake_sleep(delay):
sleeps.append(delay)
monkeypatch.setattr("app.channels.telegram.asyncio.sleep", fake_sleep)
await ch._send_running_reply("12345", 42)
placeholder_id = ch._stream_messages["12345:42"]["message_id"]
real_edit = bot.edit_message_text
calls = {"n": 0}
async def edit_flaky(**kwargs):
calls["n"] += 1
if calls["n"] == 1:
exc = Exception("Flood control exceeded")
exc.retry_after = 3
raise exc
return await real_edit(**kwargs)
bot.edit_message_text = edit_flaky
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="final", is_final=True, thread_ts="42"))
assert sleeps == [3.0]
assert [e["text"] for e in bot.edited] == ["final"]
assert len(bot.sent) == 1 # placeholder only
assert ch._last_bot_message["12345"] == placeholder_id
assert "12345:42" not in ch._stream_messages
_run(go())
def test_final_edit_double_rate_limit_falls_back_to_new_message(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
sleeps = []
async def fake_sleep(delay):
sleeps.append(delay)
monkeypatch.setattr("app.channels.telegram.asyncio.sleep", fake_sleep)
await ch._send_running_reply("12345", 42)
async def edit_rate_limited(**kwargs):
exc = Exception("Flood control exceeded")
exc.retry_after = 2
raise exc
bot.edit_message_text = edit_rate_limited
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text="final", is_final=True, thread_ts="42"))
# Fallback delivered the final text as a new message (after the placeholder)
assert [m["text"] for m in bot.sent] == ["Working on it...", "final"]
assert ch._last_bot_message["12345"] == 101
assert "12345:42" not in ch._stream_messages
_run(go())
def test_final_overflow_chunk_send_is_retried(self, monkeypatch):
async def go():
ch, bot = self._make_channel_with_bot()
clock = {"now": 1000.0}
monkeypatch.setattr("app.channels.telegram._monotonic", lambda: clock["now"])
sleeps = []
async def fake_sleep(delay):
sleeps.append(delay)
monkeypatch.setattr("app.channels.telegram.asyncio.sleep", fake_sleep)
await ch._send_running_reply("12345", 42)
real_send = bot.send_message
failures = {"left": 1}
async def send_flaky(**kwargs):
if failures["left"] > 0:
failures["left"] -= 1
raise ConnectionError("transient")
return await real_send(**kwargs)
bot.send_message = send_flaky
long_text = "a" * 4096 + "b" * 10
await ch.send(OutboundMessage(channel_name="telegram", chat_id="12345", thread_id="t1", text=long_text, is_final=True, thread_ts="42"))
assert bot.edited[0]["text"] == "a" * 4096
assert [m["text"] for m in bot.sent] == ["Working on it...", "b" * 10]
assert ch._last_bot_message["12345"] == 101
_run(go())