fix(channels): require bound identity for user-owned IM messages (#3578)

* fix(channels): require bound identity for user-owned IM messages

* make format

* docs: document bound identity channel config

* refactor: reuse channel connection config

* refactor _requires_bound_identity()

* refactor from_app_config()

* make format

* fix: reject unbound channel chats before semaphore

* security enhancement

* make format

* fix: enforce bound-identity admission at command entry point

The bound-identity gate only ran for non-command messages in
_handle_message() and as a fallback inside _handle_chat(). Commands had
no equivalent boundary, so an unbound platform user could send /new and
reach _create_thread() directly, creating an unowned Gateway thread and
empty checkpoint. Info commands (/status, /models, /memory) likewise
leaked Gateway state to unbound users.

Add the same _requires_bound_identity() check at the top of
_handle_command(), rejecting via _reject_unbound_channel_message() before
any thread creation or Gateway query. The gate is a no-op in legacy
open-bot mode (require_bound_identity=False) and auth-disabled mode.
Provider-level binding flows (/connect, /start) are consumed by the
provider adapter before reaching the manager, so they are unaffected.

Tests:
- unbound auth-enabled /new is rejected before threads.create
- bound auth-enabled /new still creates the thread

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* fix(channels): carry workspace fallback decision on inbound messages

* fix(channels): recheck bound identity by normalized workspace

* fix(channels): avoid duplicate bound identity checks

* fix(channels): preserve verified routing for bound identity rejects

* fix(channels): clarify bound identity upgrade failures

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
Nan Gao
2026-06-16 17:04:39 +02:00
committed by GitHub
parent 05be7ea688
commit 0966131b31
7 changed files with 631 additions and 34 deletions
+145 -29
View File
@@ -60,6 +60,8 @@ STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35
STREAM_MODES = ["messages-tuple", "values"]
MESSAGE_STREAM_EVENTS = ("messages-tuple", "messages")
THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again."
BOUND_IDENTITY_REQUIRED_MESSAGE = "Connect this channel from DeerFlow Settings, complete the in-channel connect step, then send your message again."
BOUND_IDENTITY_UNAVAILABLE_MESSAGE = "Channel connection verification is temporarily unavailable. Please try again later or contact the DeerFlow operator."
CHANNEL_CAPABILITIES = {
"dingtalk": {"supports_streaming": False},
@@ -150,6 +152,19 @@ class _SlashSkillCommandResolution:
failure_message: str | None = None
@dataclass(frozen=True, slots=True)
class _BoundIdentityRejection:
    """Describe why a channel message was refused and how to route the reply.

    An instance carries the user-facing rejection text together with optional
    routing hints for the outbound rejection message. The hints are meant to
    hold server-side values only, never values copied from the rejected
    inbound message.
    """

    # Text sent back to the chat; defaults to the "connect this channel" prompt.
    message: str = BOUND_IDENTITY_REQUIRED_MESSAGE

    # Connection id usable purely as an outbound routing hint for the
    # rejection reply. It is populated from the repository re-read when one is
    # available, not from the inbound message.
    outbound_connection_id: str | None = None

    # Owner of the routing connection above, also taken from the server side.
    # Channel senders can use it to keep per-connection context without
    # trusting the identity asserted by the rejected inbound message.
    outbound_owner_user_id: str | None = None
def _is_thread_busy_error(exc: BaseException | None) -> bool:
if exc is None:
return False
@@ -740,6 +755,7 @@ class ChannelManager:
default_session: dict[str, Any] | None = None,
channel_sessions: dict[str, Any] | None = None,
connection_repo: Any | None = None,
require_bound_identity: bool = False,
) -> None:
self.bus = bus
self.store = store
@@ -750,6 +766,7 @@ class ChannelManager:
self._default_session = _as_dict(default_session)
self._channel_sessions = dict(channel_sessions or {})
self._connection_repo = connection_repo
self._require_bound_identity = require_bound_identity
self._client = None # lazy init — langgraph_sdk async client
self._channel_metadata_synced: set[str] = set()
self._skill_storage: SkillStorage | None = None
@@ -923,38 +940,111 @@ class ChannelManager:
async def _handle_message(self, msg: InboundMessage) -> None:
msg = _apply_effective_owner(msg)
async with self._semaphore:
try:
try:
# Non-command chat can be rejected before it consumes a semaphore
# slot. Commands are handled below because provider adapters consume
# binding commands before manager dispatch, and _handle_command()
# applies its own admission gate for manager-level commands.
bound_identity_rejection = None
if msg.msg_type != InboundMessageType.COMMAND:
bound_identity_rejection = await self._get_bound_identity_rejection(msg)
if bound_identity_rejection is not None:
await self._reject_unbound_channel_message(msg, bound_identity_rejection=bound_identity_rejection)
return
async with self._semaphore:
if msg.msg_type == InboundMessageType.COMMAND:
await self._handle_command(msg)
else:
await self._handle_chat(msg)
except InvalidChannelSessionConfigError as exc:
logger.warning(
"Invalid channel session config for %s (chat=%s): %s",
msg.channel_name,
msg.chat_id,
exc,
)
await self._send_error(msg, str(exc))
except SlashSkillCommandResolutionError as exc:
logger.warning(
"Slash skill command resolution failed for %s (chat=%s): %s",
msg.channel_name,
msg.chat_id,
exc,
)
await self._send_error(msg, str(exc))
except Exception:
logger.exception(
"Error handling message from %s (chat=%s)",
msg.channel_name,
msg.chat_id,
)
await self._send_error(msg, "An internal error occurred. Please try again.")
await self._handle_chat(msg, bound_identity_checked=True)
except InvalidChannelSessionConfigError as exc:
logger.warning(
"Invalid channel session config for %s (chat=%s): %s",
msg.channel_name,
msg.chat_id,
exc,
)
await self._send_error(msg, str(exc))
except SlashSkillCommandResolutionError as exc:
logger.warning(
"Slash skill command resolution failed for %s (chat=%s): %s",
msg.channel_name,
msg.chat_id,
exc,
)
await self._send_error(msg, str(exc))
except Exception:
logger.exception(
"Error handling message from %s (chat=%s)",
msg.channel_name,
msg.chat_id,
)
await self._send_error(msg, "An internal error occurred. Please try again.")
# -- chat handling -----------------------------------------------------
async def _get_bound_identity_rejection(self, msg: InboundMessage) -> _BoundIdentityRejection | None:
    """Decide whether *msg* carries a verified bound identity.

    Returns:
        ``None`` when the message may proceed. Otherwise a
        ``_BoundIdentityRejection`` whose routing hints, if any, are taken from
        the connection repository rather than from the inbound message.
    """
    # The gate is inactive when bound identity is not required, or when the
    # auth-disabled owner helper reports a value.
    if not self._require_bound_identity or _auth_disabled_owner_user_id():
        return None

    # Both the connection and its owner must be asserted on the message.
    if not msg.connection_id or not msg.owner_user_id:
        return _BoundIdentityRejection()

    # Without a repository the assertion cannot be verified; fail closed with
    # the "temporarily unavailable" wording.
    repo = self._connection_repo
    if repo is None:
        return _BoundIdentityRejection(message=BOUND_IDENTITY_UNAVAILABLE_MESSAGE)

    # The manager is the run-creation security boundary, so the mutable
    # identity fields on InboundMessage are not trusted on their own. Look the
    # binding up again by provider identity before any thread or run is
    # created.
    stored = await repo.find_connection_by_external_identity(
        provider=msg.channel_name,
        external_account_id=msg.user_id,
        workspace_id=msg.workspace_id or None,
    )
    if stored is None:
        return _BoundIdentityRejection()

    stored_connection_id = stored.get("id")
    stored_owner_user_id = stored.get("owner_user_id")
    assertion_verified = stored_connection_id == msg.connection_id and stored_owner_user_id == msg.owner_user_id
    if assertion_verified:
        return None

    # Mismatch: reject, keeping only the server-side connection fields as
    # routing hints for the rejection reply.
    return _BoundIdentityRejection(
        outbound_connection_id=stored_connection_id,
        outbound_owner_user_id=stored_owner_user_id,
    )
async def _reject_unbound_channel_message(
    self,
    msg: InboundMessage,
    *,
    bound_identity_rejection: _BoundIdentityRejection,
) -> None:
    """Publish the rejection reply for a message that failed admission.

    Args:
        msg: The rejected inbound message; supplies channel, chat, thread
            timestamp and metadata for addressing the reply.
        bound_identity_rejection: Supplies the reply text and the connection
            and owner ids placed on the outbound message.
    """
    logger.info(
        "[Manager] rejecting unbound channel message: channel=%s, chat_id=%s",
        msg.channel_name,
        msg.chat_id,
    )
    # Connection and owner ids come from the rejection object, not from the
    # inbound message. thread_id is left empty.
    rejection = bound_identity_rejection
    reply = OutboundMessage(
        channel_name=msg.channel_name,
        chat_id=msg.chat_id,
        thread_id="",
        text=rejection.message,
        thread_ts=msg.thread_ts,
        connection_id=rejection.outbound_connection_id,
        owner_user_id=rejection.outbound_owner_user_id,
        metadata=_slim_metadata(msg.metadata),
    )
    await self.bus.publish_outbound(reply)
async def _lookup_thread_id(self, msg: InboundMessage) -> str | None:
if msg.connection_id and self._connection_repo is not None:
return await self._connection_repo.get_thread_id(
@@ -1016,7 +1106,21 @@ class ChannelManager:
self._channel_metadata_synced.clear()
self._channel_metadata_synced.add(thread_id)
async def _handle_chat(self, msg: InboundMessage, extra_context: dict[str, Any] | None = None) -> None:
async def _handle_chat(
self,
msg: InboundMessage,
extra_context: dict[str, Any] | None = None,
*,
bound_identity_checked: bool = False,
) -> None:
# Normal entry paths already run the bound-identity check in
# _handle_message() or _handle_command(). Keep this default False so
# direct callers and future internal paths still fail closed.
bound_identity_rejection = None if bound_identity_checked else await self._get_bound_identity_rejection(msg)
if bound_identity_rejection is not None:
await self._reject_unbound_channel_message(msg, bound_identity_rejection=bound_identity_rejection)
return
client = self._get_client()
# Look up existing DeerFlow thread.
@@ -1242,6 +1346,18 @@ class ChannelManager:
# -- command handling --------------------------------------------------
async def _handle_command(self, msg: InboundMessage) -> None:
# Commands are the other run-creation entry point besides chat: /new
# calls _create_thread() directly, and /bootstrap routes into
# _handle_chat(). Apply the same bound-identity admission boundary here
# so unbound platform users cannot create unowned threads/checkpoints or
# query Gateway state via commands. Provider-level binding flows
# (/connect <code>, /start <code>) are consumed by the provider adapter
# before the message reaches the manager, so they are unaffected.
bound_identity_rejection = await self._get_bound_identity_rejection(msg)
if bound_identity_rejection is not None:
await self._reject_unbound_channel_message(msg, bound_identity_rejection=bound_identity_rejection)
return
raw_text = msg.text
text = raw_text.strip()
parts = text.split(maxsplit=1)
@@ -1260,7 +1376,7 @@ class ChannelManager:
chat_text = parts[1] if len(parts) > 1 else "Initialize workspace"
chat_msg = _dc_replace(msg, text=chat_text, msg_type=InboundMessageType.CHAT)
await self._handle_chat(chat_msg, extra_context={"is_bootstrap": True})
await self._handle_chat(chat_msg, extra_context={"is_bootstrap": True}, bound_identity_checked=True)
return
if reply is None and command == "new":
@@ -1300,7 +1416,7 @@ class ChannelManager:
from dataclasses import replace as _dc_replace
chat_msg = _dc_replace(msg, msg_type=InboundMessageType.CHAT)
await self._handle_chat(chat_msg)
await self._handle_chat(chat_msg, bound_identity_checked=True)
return
else:
reply = _unknown_command_reply(command)