mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-17 13:05:58 +00:00
fix(channel): unsubscribe channel listeners by equality (#3608)
This commit is contained in:
@@ -172,7 +172,7 @@ class MessageBus:
|
|||||||
|
|
||||||
def unsubscribe_outbound(self, callback: OutboundCallback) -> None:
|
def unsubscribe_outbound(self, callback: OutboundCallback) -> None:
|
||||||
"""Remove a previously registered outbound callback."""
|
"""Remove a previously registered outbound callback."""
|
||||||
self._outbound_listeners = [cb for cb in self._outbound_listeners if cb is not callback]
|
self._outbound_listeners = [cb for cb in self._outbound_listeners if cb != callback]
|
||||||
|
|
||||||
async def publish_outbound(self, msg: OutboundMessage) -> None:
|
async def publish_outbound(self, msg: OutboundMessage) -> None:
|
||||||
"""Dispatch an outbound message to all registered listeners."""
|
"""Dispatch an outbound message to all registered listeners."""
|
||||||
|
|||||||
@@ -148,6 +148,27 @@ class TestMessageBus:
|
|||||||
|
|
||||||
_run(go())
|
_run(go())
|
||||||
|
|
||||||
|
def test_unsubscribe_outbound_removes_fresh_bound_method_reference(self):
|
||||||
|
bus = MessageBus()
|
||||||
|
received = []
|
||||||
|
|
||||||
|
class Handler:
|
||||||
|
async def callback(self, msg):
|
||||||
|
received.append((self, msg))
|
||||||
|
|
||||||
|
handler = Handler()
|
||||||
|
other_handler = Handler()
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus.subscribe_outbound(handler.callback)
|
||||||
|
bus.subscribe_outbound(other_handler.callback)
|
||||||
|
bus.unsubscribe_outbound(handler.callback)
|
||||||
|
out = OutboundMessage(channel_name="test", chat_id="c1", thread_id="t1", text="reply")
|
||||||
|
await bus.publish_outbound(out)
|
||||||
|
assert received == [(other_handler, out)]
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
def test_outbound_error_does_not_crash(self):
|
def test_outbound_error_does_not_crash(self):
|
||||||
bus = MessageBus()
|
bus = MessageBus()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user