mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-17 04:56:04 +00:00
Merge branch 'main' into 2.0.0-release
This commit is contained in:
+1
-1
@@ -311,7 +311,7 @@ Proxied through nginx: `/api/langgraph/*` → Gateway LangGraph-compatible runti
|
|||||||
|
|
||||||
**Built-in Agents**: `general-purpose` (all tools except `task`) and `bash` (command specialist)
|
**Built-in Agents**: `general-purpose` (all tools except `task`) and `bash` (command specialist)
|
||||||
**Execution**: Dual thread pool - `_scheduler_pool` (3 workers) + `_execution_pool` (3 workers)
|
**Execution**: Dual thread pool - `_scheduler_pool` (3 workers) + `_execution_pool` (3 workers)
|
||||||
**Concurrency**: `MAX_CONCURRENT_SUBAGENTS = 3` enforced by `SubagentLimitMiddleware` (truncates excess tool calls in `after_model`), 15-minute timeout
|
**Concurrency**: `MAX_CONCURRENT_SUBAGENTS = 3` enforced by `SubagentLimitMiddleware` (truncates excess tool calls in `after_model`); default subagent timeout `subagents.timeout_seconds=1800` (30 min) and built-in `general-purpose` `max_turns=150` (raised from 100/15-min so deep-research subtasks stop hitting `GraphRecursionError` out of the box)
|
||||||
**Flow**: `task()` tool → `SubagentExecutor` → background thread → poll 5s → SSE events → result
|
**Flow**: `task()` tool → `SubagentExecutor` → background thread → poll 5s → SSE events → result
|
||||||
**Events**: `task_started`, `task_running`, `task_completed`/`task_failed`/`task_timed_out`
|
**Events**: `task_started`, `task_running`, `task_completed`/`task_failed`/`task_timed_out`
|
||||||
**Deferred MCP tools** (if `tool_search.enabled`): `SubagentExecutor._build_initial_state` assembles deferral after policy filtering via the shared `assemble_deferred_tools` (fail-closed), appends the `tool_search` tool, injects the `<available-deferred-tools>` section into the subagent's `SystemMessage`, and threads the setup to `_create_agent`, which attaches `DeferredToolFilterMiddleware` through `build_subagent_runtime_middlewares(deferred_setup=...)`. Subagents thus withhold full MCP schemas until promotion, same as the lead agent; each task run gets a fresh `ThreadState` so promotion is isolated per run
|
**Deferred MCP tools** (if `tool_search.enabled`): `SubagentExecutor._build_initial_state` assembles deferral after policy filtering via the shared `assemble_deferred_tools` (fail-closed), appends the `tool_search` tool, injects the `<available-deferred-tools>` section into the subagent's `SystemMessage`, and threads the setup to `_create_agent`, which attaches `DeferredToolFilterMiddleware` through `build_subagent_runtime_middlewares(deferred_setup=...)`. Subagents thus withhold full MCP schemas until promotion, same as the lead agent; each task run gets a fresh `ThreadState` so promotion is isolated per run
|
||||||
|
|||||||
+150
-29
@@ -42,6 +42,11 @@ DEFAULT_GATEWAY_URL = "http://localhost:8001"
|
|||||||
DEFAULT_ASSISTANT_ID = "lead_agent"
|
DEFAULT_ASSISTANT_ID = "lead_agent"
|
||||||
CUSTOM_AGENT_NAME_PATTERN = re.compile(r"^[A-Za-z0-9-]+$")
|
CUSTOM_AGENT_NAME_PATTERN = re.compile(r"^[A-Za-z0-9-]+$")
|
||||||
|
|
||||||
|
# Lead-agent recursion budget (LangGraph super-steps for the lead graph only).
|
||||||
|
# This is independent of subagent depth: a `task()` dispatch runs the whole
|
||||||
|
# subagent inside ONE lead tools-node step, and subagents enforce their own
|
||||||
|
# limit via `subagents.max_turns` (see SubagentExecutor). Do not conflate this
|
||||||
|
# 100 with the general-purpose subagent's max_turns.
|
||||||
DEFAULT_RUN_CONFIG: dict[str, Any] = {"recursion_limit": 100}
|
DEFAULT_RUN_CONFIG: dict[str, Any] = {"recursion_limit": 100}
|
||||||
DEFAULT_RUN_CONTEXT: dict[str, Any] = {
|
DEFAULT_RUN_CONTEXT: dict[str, Any] = {
|
||||||
"thinking_enabled": True,
|
"thinking_enabled": True,
|
||||||
@@ -55,6 +60,8 @@ STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35
|
|||||||
STREAM_MODES = ["messages-tuple", "values"]
|
STREAM_MODES = ["messages-tuple", "values"]
|
||||||
MESSAGE_STREAM_EVENTS = ("messages-tuple", "messages")
|
MESSAGE_STREAM_EVENTS = ("messages-tuple", "messages")
|
||||||
THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again."
|
THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again."
|
||||||
|
BOUND_IDENTITY_REQUIRED_MESSAGE = "Connect this channel from DeerFlow Settings, complete the in-channel connect step, then send your message again."
|
||||||
|
BOUND_IDENTITY_UNAVAILABLE_MESSAGE = "Channel connection verification is temporarily unavailable. Please try again later or contact the DeerFlow operator."
|
||||||
|
|
||||||
CHANNEL_CAPABILITIES = {
|
CHANNEL_CAPABILITIES = {
|
||||||
"dingtalk": {"supports_streaming": False},
|
"dingtalk": {"supports_streaming": False},
|
||||||
@@ -145,6 +152,19 @@ class _SlashSkillCommandResolution:
|
|||||||
failure_message: str | None = None
|
failure_message: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True, slots=True)
|
||||||
|
class _BoundIdentityRejection:
|
||||||
|
message: str = BOUND_IDENTITY_REQUIRED_MESSAGE
|
||||||
|
# Server-side connection id that may be used only as an outbound routing
|
||||||
|
# hint for the rejection message. This is never copied from the inbound
|
||||||
|
# message; it comes from the repository re-read when available.
|
||||||
|
outbound_connection_id: str | None = None
|
||||||
|
# Server-side owner for the outbound routing connection above. It lets
|
||||||
|
# channel senders preserve per-connection context without trusting the
|
||||||
|
# rejected inbound identity assertion.
|
||||||
|
outbound_owner_user_id: str | None = None
|
||||||
|
|
||||||
|
|
||||||
def _is_thread_busy_error(exc: BaseException | None) -> bool:
|
def _is_thread_busy_error(exc: BaseException | None) -> bool:
|
||||||
if exc is None:
|
if exc is None:
|
||||||
return False
|
return False
|
||||||
@@ -735,6 +755,7 @@ class ChannelManager:
|
|||||||
default_session: dict[str, Any] | None = None,
|
default_session: dict[str, Any] | None = None,
|
||||||
channel_sessions: dict[str, Any] | None = None,
|
channel_sessions: dict[str, Any] | None = None,
|
||||||
connection_repo: Any | None = None,
|
connection_repo: Any | None = None,
|
||||||
|
require_bound_identity: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
self.bus = bus
|
self.bus = bus
|
||||||
self.store = store
|
self.store = store
|
||||||
@@ -745,6 +766,7 @@ class ChannelManager:
|
|||||||
self._default_session = _as_dict(default_session)
|
self._default_session = _as_dict(default_session)
|
||||||
self._channel_sessions = dict(channel_sessions or {})
|
self._channel_sessions = dict(channel_sessions or {})
|
||||||
self._connection_repo = connection_repo
|
self._connection_repo = connection_repo
|
||||||
|
self._require_bound_identity = require_bound_identity
|
||||||
self._client = None # lazy init — langgraph_sdk async client
|
self._client = None # lazy init — langgraph_sdk async client
|
||||||
self._channel_metadata_synced: set[str] = set()
|
self._channel_metadata_synced: set[str] = set()
|
||||||
self._skill_storage: SkillStorage | None = None
|
self._skill_storage: SkillStorage | None = None
|
||||||
@@ -918,38 +940,111 @@ class ChannelManager:
|
|||||||
|
|
||||||
async def _handle_message(self, msg: InboundMessage) -> None:
|
async def _handle_message(self, msg: InboundMessage) -> None:
|
||||||
msg = _apply_effective_owner(msg)
|
msg = _apply_effective_owner(msg)
|
||||||
async with self._semaphore:
|
try:
|
||||||
try:
|
# Non-command chat can be rejected before it consumes a semaphore
|
||||||
|
# slot. Commands are handled below because provider adapters consume
|
||||||
|
# binding commands before manager dispatch, and _handle_command()
|
||||||
|
# applies its own admission gate for manager-level commands.
|
||||||
|
bound_identity_rejection = None
|
||||||
|
if msg.msg_type != InboundMessageType.COMMAND:
|
||||||
|
bound_identity_rejection = await self._get_bound_identity_rejection(msg)
|
||||||
|
if bound_identity_rejection is not None:
|
||||||
|
await self._reject_unbound_channel_message(msg, bound_identity_rejection=bound_identity_rejection)
|
||||||
|
return
|
||||||
|
|
||||||
|
async with self._semaphore:
|
||||||
if msg.msg_type == InboundMessageType.COMMAND:
|
if msg.msg_type == InboundMessageType.COMMAND:
|
||||||
await self._handle_command(msg)
|
await self._handle_command(msg)
|
||||||
else:
|
else:
|
||||||
await self._handle_chat(msg)
|
await self._handle_chat(msg, bound_identity_checked=True)
|
||||||
except InvalidChannelSessionConfigError as exc:
|
except InvalidChannelSessionConfigError as exc:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Invalid channel session config for %s (chat=%s): %s",
|
"Invalid channel session config for %s (chat=%s): %s",
|
||||||
msg.channel_name,
|
msg.channel_name,
|
||||||
msg.chat_id,
|
msg.chat_id,
|
||||||
exc,
|
exc,
|
||||||
)
|
)
|
||||||
await self._send_error(msg, str(exc))
|
await self._send_error(msg, str(exc))
|
||||||
except SlashSkillCommandResolutionError as exc:
|
except SlashSkillCommandResolutionError as exc:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Slash skill command resolution failed for %s (chat=%s): %s",
|
"Slash skill command resolution failed for %s (chat=%s): %s",
|
||||||
msg.channel_name,
|
msg.channel_name,
|
||||||
msg.chat_id,
|
msg.chat_id,
|
||||||
exc,
|
exc,
|
||||||
)
|
)
|
||||||
await self._send_error(msg, str(exc))
|
await self._send_error(msg, str(exc))
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
"Error handling message from %s (chat=%s)",
|
"Error handling message from %s (chat=%s)",
|
||||||
msg.channel_name,
|
msg.channel_name,
|
||||||
msg.chat_id,
|
msg.chat_id,
|
||||||
)
|
)
|
||||||
await self._send_error(msg, "An internal error occurred. Please try again.")
|
await self._send_error(msg, "An internal error occurred. Please try again.")
|
||||||
|
|
||||||
# -- chat handling -----------------------------------------------------
|
# -- chat handling -----------------------------------------------------
|
||||||
|
|
||||||
|
async def _get_bound_identity_rejection(self, msg: InboundMessage) -> _BoundIdentityRejection | None:
|
||||||
|
"""Return None when *msg* may proceed; otherwise return rejection routing hints.
|
||||||
|
|
||||||
|
The returned object means the message lacks a verified bound identity.
|
||||||
|
Its fields are intentionally limited to server-side values re-read from
|
||||||
|
the connection repository, so rejection outbounds never trust a rejected
|
||||||
|
inbound message's asserted connection metadata.
|
||||||
|
"""
|
||||||
|
if not self._require_bound_identity:
|
||||||
|
return None
|
||||||
|
if _auth_disabled_owner_user_id():
|
||||||
|
return None
|
||||||
|
|
||||||
|
has_connection = bool(msg.connection_id)
|
||||||
|
has_owner = bool(msg.owner_user_id)
|
||||||
|
if not (has_connection and has_owner):
|
||||||
|
return _BoundIdentityRejection()
|
||||||
|
if self._connection_repo is None:
|
||||||
|
return _BoundIdentityRejection(message=BOUND_IDENTITY_UNAVAILABLE_MESSAGE)
|
||||||
|
|
||||||
|
# The manager is the run-creation security boundary, so it does not
|
||||||
|
# trust mutable InboundMessage identity fields by themselves. Re-read
|
||||||
|
# the binding by provider identity before creating DeerFlow threads or
|
||||||
|
# runs. If the asserted identity does not match, keep only the
|
||||||
|
# server-side connection fields as outbound routing hints.
|
||||||
|
connection = await self._connection_repo.find_connection_by_external_identity(
|
||||||
|
provider=msg.channel_name,
|
||||||
|
external_account_id=msg.user_id,
|
||||||
|
workspace_id=msg.workspace_id or None,
|
||||||
|
)
|
||||||
|
if connection is None:
|
||||||
|
return _BoundIdentityRejection()
|
||||||
|
|
||||||
|
connection_id = connection.get("id")
|
||||||
|
owner_user_id = connection.get("owner_user_id")
|
||||||
|
if connection_id == msg.connection_id and owner_user_id == msg.owner_user_id:
|
||||||
|
return None
|
||||||
|
return _BoundIdentityRejection(outbound_connection_id=connection_id, outbound_owner_user_id=owner_user_id)
|
||||||
|
|
||||||
|
async def _reject_unbound_channel_message(
|
||||||
|
self,
|
||||||
|
msg: InboundMessage,
|
||||||
|
*,
|
||||||
|
bound_identity_rejection: _BoundIdentityRejection,
|
||||||
|
) -> None:
|
||||||
|
logger.info(
|
||||||
|
"[Manager] rejecting unbound channel message: channel=%s, chat_id=%s",
|
||||||
|
msg.channel_name,
|
||||||
|
msg.chat_id,
|
||||||
|
)
|
||||||
|
outbound = OutboundMessage(
|
||||||
|
channel_name=msg.channel_name,
|
||||||
|
chat_id=msg.chat_id,
|
||||||
|
thread_id="",
|
||||||
|
text=bound_identity_rejection.message,
|
||||||
|
thread_ts=msg.thread_ts,
|
||||||
|
connection_id=bound_identity_rejection.outbound_connection_id,
|
||||||
|
owner_user_id=bound_identity_rejection.outbound_owner_user_id,
|
||||||
|
metadata=_slim_metadata(msg.metadata),
|
||||||
|
)
|
||||||
|
await self.bus.publish_outbound(outbound)
|
||||||
|
|
||||||
async def _lookup_thread_id(self, msg: InboundMessage) -> str | None:
|
async def _lookup_thread_id(self, msg: InboundMessage) -> str | None:
|
||||||
if msg.connection_id and self._connection_repo is not None:
|
if msg.connection_id and self._connection_repo is not None:
|
||||||
return await self._connection_repo.get_thread_id(
|
return await self._connection_repo.get_thread_id(
|
||||||
@@ -1011,7 +1106,21 @@ class ChannelManager:
|
|||||||
self._channel_metadata_synced.clear()
|
self._channel_metadata_synced.clear()
|
||||||
self._channel_metadata_synced.add(thread_id)
|
self._channel_metadata_synced.add(thread_id)
|
||||||
|
|
||||||
async def _handle_chat(self, msg: InboundMessage, extra_context: dict[str, Any] | None = None) -> None:
|
async def _handle_chat(
|
||||||
|
self,
|
||||||
|
msg: InboundMessage,
|
||||||
|
extra_context: dict[str, Any] | None = None,
|
||||||
|
*,
|
||||||
|
bound_identity_checked: bool = False,
|
||||||
|
) -> None:
|
||||||
|
# Normal entry paths already run the bound-identity check in
|
||||||
|
# _handle_message() or _handle_command(). Keep this default False so
|
||||||
|
# direct callers and future internal paths still fail closed.
|
||||||
|
bound_identity_rejection = None if bound_identity_checked else await self._get_bound_identity_rejection(msg)
|
||||||
|
if bound_identity_rejection is not None:
|
||||||
|
await self._reject_unbound_channel_message(msg, bound_identity_rejection=bound_identity_rejection)
|
||||||
|
return
|
||||||
|
|
||||||
client = self._get_client()
|
client = self._get_client()
|
||||||
|
|
||||||
# Look up existing DeerFlow thread.
|
# Look up existing DeerFlow thread.
|
||||||
@@ -1237,6 +1346,18 @@ class ChannelManager:
|
|||||||
# -- command handling --------------------------------------------------
|
# -- command handling --------------------------------------------------
|
||||||
|
|
||||||
async def _handle_command(self, msg: InboundMessage) -> None:
|
async def _handle_command(self, msg: InboundMessage) -> None:
|
||||||
|
# Commands are the other run-creation entry point besides chat: /new
|
||||||
|
# calls _create_thread() directly, and /bootstrap routes into
|
||||||
|
# _handle_chat(). Apply the same bound-identity admission boundary here
|
||||||
|
# so unbound platform users cannot create unowned threads/checkpoints or
|
||||||
|
# query Gateway state via commands. Provider-level binding flows
|
||||||
|
# (/connect <code>, /start <code>) are consumed by the provider adapter
|
||||||
|
# before the message reaches the manager, so they are unaffected.
|
||||||
|
bound_identity_rejection = await self._get_bound_identity_rejection(msg)
|
||||||
|
if bound_identity_rejection is not None:
|
||||||
|
await self._reject_unbound_channel_message(msg, bound_identity_rejection=bound_identity_rejection)
|
||||||
|
return
|
||||||
|
|
||||||
raw_text = msg.text
|
raw_text = msg.text
|
||||||
text = raw_text.strip()
|
text = raw_text.strip()
|
||||||
parts = text.split(maxsplit=1)
|
parts = text.split(maxsplit=1)
|
||||||
@@ -1255,7 +1376,7 @@ class ChannelManager:
|
|||||||
|
|
||||||
chat_text = parts[1] if len(parts) > 1 else "Initialize workspace"
|
chat_text = parts[1] if len(parts) > 1 else "Initialize workspace"
|
||||||
chat_msg = _dc_replace(msg, text=chat_text, msg_type=InboundMessageType.CHAT)
|
chat_msg = _dc_replace(msg, text=chat_text, msg_type=InboundMessageType.CHAT)
|
||||||
await self._handle_chat(chat_msg, extra_context={"is_bootstrap": True})
|
await self._handle_chat(chat_msg, extra_context={"is_bootstrap": True}, bound_identity_checked=True)
|
||||||
return
|
return
|
||||||
|
|
||||||
if reply is None and command == "new":
|
if reply is None and command == "new":
|
||||||
@@ -1295,7 +1416,7 @@ class ChannelManager:
|
|||||||
from dataclasses import replace as _dc_replace
|
from dataclasses import replace as _dc_replace
|
||||||
|
|
||||||
chat_msg = _dc_replace(msg, msg_type=InboundMessageType.CHAT)
|
chat_msg = _dc_replace(msg, msg_type=InboundMessageType.CHAT)
|
||||||
await self._handle_chat(chat_msg)
|
await self._handle_chat(chat_msg, bound_identity_checked=True)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
reply = _unknown_command_reply(command)
|
reply = _unknown_command_reply(command)
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from deerflow.config.app_config import AppConfig
|
from deerflow.config.app_config import AppConfig
|
||||||
|
from deerflow.config.channel_connections_config import ChannelConnectionsConfig
|
||||||
|
|
||||||
# Channel name → import path for lazy loading
|
# Channel name → import path for lazy loading
|
||||||
_CHANNEL_REGISTRY: dict[str, str] = {
|
_CHANNEL_REGISTRY: dict[str, str] = {
|
||||||
@@ -64,8 +65,7 @@ def _merge_channel_connection_runtime_config(channels_config: dict[str, Any], ap
|
|||||||
merge_runtime_channel_configs(channels_config, connection_config)
|
merge_runtime_channel_configs(channels_config, connection_config)
|
||||||
|
|
||||||
|
|
||||||
def _make_connection_repo(app_config: AppConfig):
|
def _make_connection_repo(connection_config: ChannelConnectionsConfig | None):
|
||||||
connection_config = getattr(app_config, "channel_connections", None)
|
|
||||||
if connection_config is None or not getattr(connection_config, "enabled", False):
|
if connection_config is None or not getattr(connection_config, "enabled", False):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@@ -90,7 +90,13 @@ class ChannelService:
|
|||||||
instantiates enabled channels, and starts the ChannelManager dispatcher.
|
instantiates enabled channels, and starts the ChannelManager dispatcher.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, channels_config: dict[str, Any] | None = None, *, connection_repo: Any | None = None) -> None:
|
def __init__(
|
||||||
|
self,
|
||||||
|
channels_config: dict[str, Any] | None = None,
|
||||||
|
*,
|
||||||
|
connection_repo: Any | None = None,
|
||||||
|
require_bound_identity: bool = False,
|
||||||
|
) -> None:
|
||||||
self.bus = MessageBus()
|
self.bus = MessageBus()
|
||||||
self.store = ChannelStore()
|
self.store = ChannelStore()
|
||||||
self._connection_repo = connection_repo
|
self._connection_repo = connection_repo
|
||||||
@@ -107,6 +113,7 @@ class ChannelService:
|
|||||||
default_session=default_session if isinstance(default_session, dict) else None,
|
default_session=default_session if isinstance(default_session, dict) else None,
|
||||||
channel_sessions=channel_sessions,
|
channel_sessions=channel_sessions,
|
||||||
connection_repo=connection_repo,
|
connection_repo=connection_repo,
|
||||||
|
require_bound_identity=require_bound_identity,
|
||||||
)
|
)
|
||||||
self._channels: dict[str, Any] = {} # name -> Channel instance
|
self._channels: dict[str, Any] = {} # name -> Channel instance
|
||||||
self._config = config
|
self._config = config
|
||||||
@@ -126,7 +133,14 @@ class ChannelService:
|
|||||||
if "channels" in extra:
|
if "channels" in extra:
|
||||||
channels_config = dict(extra["channels"] or {})
|
channels_config = dict(extra["channels"] or {})
|
||||||
_merge_channel_connection_runtime_config(channels_config, app_config)
|
_merge_channel_connection_runtime_config(channels_config, app_config)
|
||||||
return cls(channels_config=channels_config, connection_repo=_make_connection_repo(app_config))
|
connection_config = getattr(app_config, "channel_connections", None)
|
||||||
|
connections_enabled = connection_config is not None and getattr(connection_config, "enabled", False)
|
||||||
|
require_bound_identity = bool(connections_enabled and getattr(connection_config, "require_bound_identity", True))
|
||||||
|
return cls(
|
||||||
|
channels_config=channels_config,
|
||||||
|
connection_repo=_make_connection_repo(connection_config),
|
||||||
|
require_bound_identity=require_bound_identity,
|
||||||
|
)
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
"""Start the manager and all enabled channels."""
|
"""Start the manager and all enabled channels."""
|
||||||
|
|||||||
@@ -135,6 +135,8 @@ async def generate_suggestions(
|
|||||||
request: Request,
|
request: Request,
|
||||||
config: AppConfig = Depends(get_config),
|
config: AppConfig = Depends(get_config),
|
||||||
) -> SuggestionsResponse:
|
) -> SuggestionsResponse:
|
||||||
|
if not config.suggestions.enabled:
|
||||||
|
return SuggestionsResponse(suggestions=[])
|
||||||
if not body.messages:
|
if not body.messages:
|
||||||
return SuggestionsResponse(suggestions=[])
|
return SuggestionsResponse(suggestions=[])
|
||||||
|
|
||||||
|
|||||||
@@ -224,6 +224,11 @@ def build_run_config(
|
|||||||
the LangGraph Platform-compatible HTTP API and the IM channel path behave
|
the LangGraph Platform-compatible HTTP API and the IM channel path behave
|
||||||
identically.
|
identically.
|
||||||
"""
|
"""
|
||||||
|
# Lead-agent recursion budget (LangGraph super-steps for the lead graph
|
||||||
|
# only). Independent of subagent depth: a `task()` dispatch runs the whole
|
||||||
|
# subagent inside ONE lead tools-node step, and subagents enforce their own
|
||||||
|
# limit via `subagents.max_turns`. Do not conflate this 100 with the
|
||||||
|
# general-purpose subagent's max_turns.
|
||||||
config: dict[str, Any] = {"recursion_limit": 100}
|
config: dict[str, Any] = {"recursion_limit": 100}
|
||||||
if request_config:
|
if request_config:
|
||||||
# LangGraph >= 0.6.0 introduced ``context`` as the preferred way to
|
# LangGraph >= 0.6.0 introduced ``context`` as the preferred way to
|
||||||
|
|||||||
@@ -48,6 +48,11 @@ Then enable user bindings in `channel_connections`:
|
|||||||
```yaml
|
```yaml
|
||||||
channel_connections:
|
channel_connections:
|
||||||
enabled: true
|
enabled: true
|
||||||
|
# Auth-enabled deployments require ordinary IM messages to come from a
|
||||||
|
# connected DeerFlow user by default. Set this to false only for legacy
|
||||||
|
# operator-owned/open-bot deployments that intentionally route unbound
|
||||||
|
# platform users to platform-ID user buckets.
|
||||||
|
require_bound_identity: true
|
||||||
|
|
||||||
telegram:
|
telegram:
|
||||||
enabled: true
|
enabled: true
|
||||||
@@ -74,6 +79,10 @@ channel_connections:
|
|||||||
|
|
||||||
`channel_connections` does not duplicate provider secrets. It only controls the browser-facing connect UI and stores per-user binding records. Telegram needs `bot_username` only so the frontend can open a deep link.
|
`channel_connections` does not duplicate provider secrets. It only controls the browser-facing connect UI and stores per-user binding records. Telegram needs `bot_username` only so the frontend can open a deep link.
|
||||||
|
|
||||||
|
When `channel_connections.enabled` and `require_bound_identity` are true, auth-enabled deployments reject ordinary unbound IM messages before creating a DeerFlow thread or run. Users must connect the channel from DeerFlow Settings first. Auth-disabled local mode still routes channel messages to the auth-disabled default user, and legacy open-bot behavior can be restored explicitly with `require_bound_identity: false`.
|
||||||
|
|
||||||
|
Upgrade note: existing auth-enabled deployments that already have `channel_connections.enabled: true` will start rejecting ordinary unbound IM messages after this field is introduced because `require_bound_identity` defaults to true. Legacy operator-owned/open-bot deployments that intentionally allow unbound platform users to create DeerFlow runs should set `require_bound_identity: false` before upgrading and restart the service.
|
||||||
|
|
||||||
## Connect Flow
|
## Connect Flow
|
||||||
|
|
||||||
Telegram:
|
Telegram:
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ from deerflow.config.skill_evolution_config import SkillEvolutionConfig
|
|||||||
from deerflow.config.skills_config import SkillsConfig
|
from deerflow.config.skills_config import SkillsConfig
|
||||||
from deerflow.config.stream_bridge_config import StreamBridgeConfig, load_stream_bridge_config_from_dict
|
from deerflow.config.stream_bridge_config import StreamBridgeConfig, load_stream_bridge_config_from_dict
|
||||||
from deerflow.config.subagents_config import SubagentsAppConfig, load_subagents_config_from_dict
|
from deerflow.config.subagents_config import SubagentsAppConfig, load_subagents_config_from_dict
|
||||||
|
from deerflow.config.suggestions_config import SuggestionsConfig
|
||||||
from deerflow.config.summarization_config import SummarizationConfig, load_summarization_config_from_dict
|
from deerflow.config.summarization_config import SummarizationConfig, load_summarization_config_from_dict
|
||||||
from deerflow.config.title_config import TitleConfig, load_title_config_from_dict
|
from deerflow.config.title_config import TitleConfig, load_title_config_from_dict
|
||||||
from deerflow.config.token_usage_config import TokenUsageConfig
|
from deerflow.config.token_usage_config import TokenUsageConfig
|
||||||
@@ -116,6 +117,7 @@ class AppConfig(BaseModel):
|
|||||||
acp_agents: dict[str, ACPAgentConfig] = Field(default_factory=dict, description="ACP-compatible agent configuration")
|
acp_agents: dict[str, ACPAgentConfig] = Field(default_factory=dict, description="ACP-compatible agent configuration")
|
||||||
subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration")
|
subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration")
|
||||||
guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration")
|
guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration")
|
||||||
|
suggestions: SuggestionsConfig = Field(default_factory=SuggestionsConfig, description="Follow-up suggestions configuration.")
|
||||||
circuit_breaker: CircuitBreakerConfig = Field(default_factory=CircuitBreakerConfig, description="LLM circuit breaker configuration")
|
circuit_breaker: CircuitBreakerConfig = Field(default_factory=CircuitBreakerConfig, description="LLM circuit breaker configuration")
|
||||||
channel_connections: ChannelConnectionsConfig = Field(
|
channel_connections: ChannelConnectionsConfig = Field(
|
||||||
default_factory=ChannelConnectionsConfig,
|
default_factory=ChannelConnectionsConfig,
|
||||||
|
|||||||
@@ -42,6 +42,7 @@ class ChannelConnectionsConfig(BaseModel):
|
|||||||
"""Top-level config for browser-connectable IM channels."""
|
"""Top-level config for browser-connectable IM channels."""
|
||||||
|
|
||||||
enabled: bool = False
|
enabled: bool = False
|
||||||
|
require_bound_identity: bool = True
|
||||||
slack: SlackChannelConnectionConfig = Field(default_factory=SlackChannelConnectionConfig)
|
slack: SlackChannelConnectionConfig = Field(default_factory=SlackChannelConnectionConfig)
|
||||||
telegram: TelegramChannelConnectionConfig = Field(default_factory=TelegramChannelConnectionConfig)
|
telegram: TelegramChannelConnectionConfig = Field(default_factory=TelegramChannelConnectionConfig)
|
||||||
discord: DiscordChannelConnectionConfig = Field(default_factory=DiscordChannelConnectionConfig)
|
discord: DiscordChannelConnectionConfig = Field(default_factory=DiscordChannelConnectionConfig)
|
||||||
|
|||||||
@@ -72,9 +72,9 @@ class SubagentsAppConfig(BaseModel):
|
|||||||
"""Configuration for the subagent system."""
|
"""Configuration for the subagent system."""
|
||||||
|
|
||||||
timeout_seconds: int = Field(
|
timeout_seconds: int = Field(
|
||||||
default=900,
|
default=1800,
|
||||||
ge=1,
|
ge=1,
|
||||||
description="Default timeout in seconds for all subagents (default: 900 = 15 minutes)",
|
description="Default timeout in seconds for built-in subagents (default: 1800 = 30 minutes); custom agents use their own timeout_seconds unless given a per-agent override",
|
||||||
)
|
)
|
||||||
max_turns: int | None = Field(
|
max_turns: int | None = Field(
|
||||||
default=None,
|
default=None,
|
||||||
|
|||||||
@@ -0,0 +1,7 @@
|
|||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
|
||||||
|
class SuggestionsConfig(BaseModel):
|
||||||
|
"""Configuration for automatic follow-up suggestions."""
|
||||||
|
|
||||||
|
enabled: bool = Field(default=True, description="Whether to enable follow-up question suggestions at the end of an AI response")
|
||||||
@@ -57,5 +57,5 @@ You have access to the same sandbox environment as the parent agent:
|
|||||||
tools=None, # Inherit all tools from parent
|
tools=None, # Inherit all tools from parent
|
||||||
disallowed_tools=["task", "ask_clarification", "present_files"], # Prevent nesting and clarification
|
disallowed_tools=["task", "ask_clarification", "present_files"], # Prevent nesting and clarification
|
||||||
model="inherit",
|
model="inherit",
|
||||||
max_turns=100,
|
max_turns=150,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -20,8 +20,13 @@ class SubagentConfig:
|
|||||||
skills: Optional list of skill names to load. If None, inherits all enabled skills.
|
skills: Optional list of skill names to load. If None, inherits all enabled skills.
|
||||||
If an empty list, no skills are loaded.
|
If an empty list, no skills are loaded.
|
||||||
model: Model to use - 'inherit' uses parent's model.
|
model: Model to use - 'inherit' uses parent's model.
|
||||||
max_turns: Maximum number of agent turns before stopping.
|
max_turns: Maximum agent turns before stopping. Built-in agents use the
|
||||||
timeout_seconds: Maximum execution time in seconds (default: 900 = 15 minutes).
|
value set here (general-purpose=150, bash=60) unless the global
|
||||||
|
``subagents.max_turns`` is set.
|
||||||
|
timeout_seconds: Bare fallback execution-time cap. For built-in agents the
|
||||||
|
effective limit is the global ``subagents.timeout_seconds`` (default
|
||||||
|
1800 = 30 min), layered on by the registry; this 900 only applies
|
||||||
|
when no differing global value exists.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
name: str
|
name: str
|
||||||
|
|||||||
@@ -316,7 +316,7 @@ def test_get_app_config_resets_singleton_configs_when_sections_removed(tmp_path,
|
|||||||
assert get_title_config().enabled is True
|
assert get_title_config().enabled is True
|
||||||
assert get_summarization_config().enabled is False
|
assert get_summarization_config().enabled is False
|
||||||
assert get_memory_config().enabled is True
|
assert get_memory_config().enabled is True
|
||||||
assert get_subagents_app_config().timeout_seconds == 900
|
assert get_subagents_app_config().timeout_seconds == 1800
|
||||||
assert get_tool_search_config().enabled is False
|
assert get_tool_search_config().enabled is False
|
||||||
assert get_guardrails_config().enabled is False
|
assert get_guardrails_config().enabled is False
|
||||||
assert get_checkpointer_config() is None
|
assert get_checkpointer_config() is None
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ def test_channel_connections_disabled_by_default():
|
|||||||
config = ChannelConnectionsConfig()
|
config = ChannelConnectionsConfig()
|
||||||
|
|
||||||
assert config.enabled is False
|
assert config.enabled is False
|
||||||
|
assert config.require_bound_identity is True
|
||||||
assert config.slack.enabled is False
|
assert config.slack.enabled is False
|
||||||
assert config.telegram.enabled is False
|
assert config.telegram.enabled is False
|
||||||
assert config.discord.enabled is False
|
assert config.discord.enabled is False
|
||||||
@@ -43,6 +44,13 @@ def test_enabled_channel_connections_do_not_require_public_url_or_encryption_key
|
|||||||
assert config.provider_status("wecom") == {"enabled": True, "configured": True}
|
assert config.provider_status("wecom") == {"enabled": True, "configured": True}
|
||||||
|
|
||||||
|
|
||||||
|
def test_require_bound_identity_can_be_disabled_for_legacy_open_bot_mode():
|
||||||
|
config = ChannelConnectionsConfig.model_validate({"enabled": True, "require_bound_identity": False})
|
||||||
|
|
||||||
|
assert config.enabled is True
|
||||||
|
assert config.require_bound_identity is False
|
||||||
|
|
||||||
|
|
||||||
def test_provider_status_reports_disabled_and_unknown_providers():
|
def test_provider_status_reports_disabled_and_unknown_providers():
|
||||||
config = ChannelConnectionsConfig.model_validate({"enabled": True})
|
config = ChannelConnectionsConfig.model_validate({"enabled": True})
|
||||||
|
|
||||||
|
|||||||
@@ -2565,6 +2565,445 @@ class TestResolveRunParamsUserId:
|
|||||||
assert "channel_user_id" not in run_context
|
assert "channel_user_id" not in run_context
|
||||||
|
|
||||||
|
|
||||||
|
class _BoundIdentityRepo:
|
||||||
|
def __init__(self, connections: list[dict[str, str | None]] | None = None) -> None:
|
||||||
|
self.connections = list(connections or [])
|
||||||
|
self.lookups: list[dict[str, str | None]] = []
|
||||||
|
self.thread_sets: list[dict[str, str | None]] = []
|
||||||
|
|
||||||
|
async def find_connection_by_external_identity(self, *, provider: str, external_account_id: str, workspace_id: str | None = None):
|
||||||
|
self.lookups.append(
|
||||||
|
{
|
||||||
|
"provider": provider,
|
||||||
|
"external_account_id": external_account_id,
|
||||||
|
"workspace_id": workspace_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
for connection in self.connections:
|
||||||
|
if connection.get("provider") == provider and connection.get("external_account_id") == external_account_id and connection.get("workspace_id") == workspace_id:
|
||||||
|
return connection
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def get_thread_id(self, connection_id: str, chat_id: str, topic_id: str | None = None):
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def set_thread_id(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
connection_id: str,
|
||||||
|
owner_user_id: str,
|
||||||
|
provider: str,
|
||||||
|
external_conversation_id: str,
|
||||||
|
external_topic_id: str | None,
|
||||||
|
thread_id: str,
|
||||||
|
) -> None:
|
||||||
|
self.thread_sets.append(
|
||||||
|
{
|
||||||
|
"connection_id": connection_id,
|
||||||
|
"owner_user_id": owner_user_id,
|
||||||
|
"provider": provider,
|
||||||
|
"external_conversation_id": external_conversation_id,
|
||||||
|
"external_topic_id": external_topic_id,
|
||||||
|
"thread_id": thread_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestChannelManagerBoundIdentityPolicy:
|
||||||
|
def test_unbound_auth_enabled_chat_is_rejected_before_thread_or_run_creation(self, monkeypatch):
|
||||||
|
from app.channels.manager import BOUND_IDENTITY_REQUIRED_MESSAGE, ChannelManager
|
||||||
|
|
||||||
|
monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
manager = ChannelManager(bus=bus, store=store, require_bound_identity=True)
|
||||||
|
mock_client = _make_mock_langgraph_client()
|
||||||
|
manager._client = mock_client
|
||||||
|
outbound_received = []
|
||||||
|
|
||||||
|
async def capture(msg):
|
||||||
|
outbound_received.append(msg)
|
||||||
|
|
||||||
|
bus.subscribe_outbound(capture)
|
||||||
|
await manager._handle_chat(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
text="hi",
|
||||||
|
thread_ts="1710000000.000100",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(outbound_received) == 1
|
||||||
|
assert outbound_received[0].text == BOUND_IDENTITY_REQUIRED_MESSAGE
|
||||||
|
assert outbound_received[0].thread_id == ""
|
||||||
|
assert outbound_received[0].connection_id is None
|
||||||
|
assert outbound_received[0].owner_user_id is None
|
||||||
|
mock_client.threads.create.assert_not_called()
|
||||||
|
mock_client.runs.wait.assert_not_called()
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_bound_identity_repo_unavailable_uses_transient_failure_message(self, monkeypatch):
|
||||||
|
from app.channels.manager import BOUND_IDENTITY_UNAVAILABLE_MESSAGE, ChannelManager
|
||||||
|
|
||||||
|
monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
manager = ChannelManager(bus=bus, store=store, require_bound_identity=True)
|
||||||
|
mock_client = _make_mock_langgraph_client()
|
||||||
|
manager._client = mock_client
|
||||||
|
outbound_received = []
|
||||||
|
|
||||||
|
async def capture(msg):
|
||||||
|
outbound_received.append(msg)
|
||||||
|
|
||||||
|
bus.subscribe_outbound(capture)
|
||||||
|
await manager._handle_chat(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
owner_user_id="deerflow-user-1",
|
||||||
|
connection_id="connection-1",
|
||||||
|
workspace_id="T123",
|
||||||
|
text="hi",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(outbound_received) == 1
|
||||||
|
assert outbound_received[0].text == BOUND_IDENTITY_UNAVAILABLE_MESSAGE
|
||||||
|
assert outbound_received[0].connection_id is None
|
||||||
|
assert outbound_received[0].owner_user_id is None
|
||||||
|
mock_client.threads.create.assert_not_called()
|
||||||
|
mock_client.runs.wait.assert_not_called()
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_unbound_auth_enabled_chat_is_rejected_before_semaphore(self, monkeypatch):
|
||||||
|
from app.channels.manager import BOUND_IDENTITY_REQUIRED_MESSAGE, ChannelManager
|
||||||
|
|
||||||
|
monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
manager = ChannelManager(bus=bus, store=store, require_bound_identity=True)
|
||||||
|
outbound_received = []
|
||||||
|
|
||||||
|
async def capture(msg):
|
||||||
|
outbound_received.append(msg)
|
||||||
|
|
||||||
|
bus.subscribe_outbound(capture)
|
||||||
|
await manager.start()
|
||||||
|
assert manager._semaphore is not None
|
||||||
|
await manager._semaphore.acquire()
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(
|
||||||
|
manager._handle_message(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
text="hi",
|
||||||
|
)
|
||||||
|
),
|
||||||
|
timeout=0.5,
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
manager._semaphore.release()
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
assert len(outbound_received) == 1
|
||||||
|
assert outbound_received[0].text == BOUND_IDENTITY_REQUIRED_MESSAGE
|
||||||
|
assert outbound_received[0].connection_id is None
|
||||||
|
assert outbound_received[0].owner_user_id is None
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_bound_auth_enabled_chat_is_allowed_when_bound_identity_is_required(self, monkeypatch):
|
||||||
|
from app.channels.manager import ChannelManager
|
||||||
|
|
||||||
|
monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
repo = _BoundIdentityRepo(
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"id": "connection-1",
|
||||||
|
"owner_user_id": "deerflow-user-1",
|
||||||
|
"provider": "slack",
|
||||||
|
"external_account_id": "U-platform",
|
||||||
|
"workspace_id": "T123",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
manager = ChannelManager(bus=bus, store=store, connection_repo=repo, require_bound_identity=True)
|
||||||
|
mock_client = _make_mock_langgraph_client(thread_id="thread-bound")
|
||||||
|
manager._client = mock_client
|
||||||
|
|
||||||
|
await manager._handle_chat(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
owner_user_id="deerflow-user-1",
|
||||||
|
connection_id="connection-1",
|
||||||
|
workspace_id="T123",
|
||||||
|
text="hi",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_client.threads.create.assert_called_once()
|
||||||
|
mock_client.runs.wait.assert_called_once()
|
||||||
|
run_context = mock_client.runs.wait.call_args.kwargs["context"]
|
||||||
|
assert run_context["user_id"] == "deerflow-user-1"
|
||||||
|
assert run_context["channel_user_id"] == "U-platform"
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_bound_auth_enabled_message_checks_bound_identity_once_on_hot_path(self, monkeypatch):
|
||||||
|
from app.channels.manager import ChannelManager
|
||||||
|
|
||||||
|
monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
repo = _BoundIdentityRepo(
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"id": "connection-1",
|
||||||
|
"owner_user_id": "deerflow-user-1",
|
||||||
|
"provider": "slack",
|
||||||
|
"external_account_id": "U-platform",
|
||||||
|
"workspace_id": "T123",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
manager = ChannelManager(bus=bus, store=store, connection_repo=repo, require_bound_identity=True)
|
||||||
|
mock_client = _make_mock_langgraph_client(thread_id="thread-bound")
|
||||||
|
manager._client = mock_client
|
||||||
|
await manager.start()
|
||||||
|
try:
|
||||||
|
await manager._handle_message(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
owner_user_id="deerflow-user-1",
|
||||||
|
connection_id="connection-1",
|
||||||
|
workspace_id="T123",
|
||||||
|
text="hi",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
assert repo.lookups == [
|
||||||
|
{
|
||||||
|
"provider": "slack",
|
||||||
|
"external_account_id": "U-platform",
|
||||||
|
"workspace_id": "T123",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
mock_client.threads.create.assert_called_once()
|
||||||
|
mock_client.runs.wait.assert_called_once()
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_auth_enabled_chat_rejects_unverified_bound_identity(self, monkeypatch):
|
||||||
|
from app.channels.manager import BOUND_IDENTITY_REQUIRED_MESSAGE, ChannelManager
|
||||||
|
|
||||||
|
monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
repo = _BoundIdentityRepo(
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"id": "actual-connection",
|
||||||
|
"owner_user_id": "actual-owner",
|
||||||
|
"provider": "slack",
|
||||||
|
"external_account_id": "U-platform",
|
||||||
|
"workspace_id": None,
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
manager = ChannelManager(bus=bus, store=store, connection_repo=repo, require_bound_identity=True)
|
||||||
|
mock_client = _make_mock_langgraph_client()
|
||||||
|
manager._client = mock_client
|
||||||
|
outbound_received = []
|
||||||
|
|
||||||
|
async def capture(msg):
|
||||||
|
outbound_received.append(msg)
|
||||||
|
|
||||||
|
bus.subscribe_outbound(capture)
|
||||||
|
await manager._handle_chat(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
owner_user_id="forged-owner",
|
||||||
|
connection_id="forged-connection",
|
||||||
|
text="hi",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(outbound_received) == 1
|
||||||
|
assert outbound_received[0].text == BOUND_IDENTITY_REQUIRED_MESSAGE
|
||||||
|
assert outbound_received[0].connection_id == "actual-connection"
|
||||||
|
assert outbound_received[0].owner_user_id == "actual-owner"
|
||||||
|
mock_client.threads.create.assert_not_called()
|
||||||
|
mock_client.runs.wait.assert_not_called()
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_auth_disabled_chat_keeps_default_user_when_bound_identity_is_required(self, monkeypatch):
|
||||||
|
from app.channels.manager import ChannelManager
|
||||||
|
from app.gateway.auth_disabled import AUTH_DISABLED_USER_ID
|
||||||
|
|
||||||
|
monkeypatch.setenv("DEER_FLOW_AUTH_DISABLED", "1")
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
manager = ChannelManager(bus=bus, store=store, require_bound_identity=True)
|
||||||
|
mock_client = _make_mock_langgraph_client(thread_id="thread-local")
|
||||||
|
manager._client = mock_client
|
||||||
|
|
||||||
|
await manager._handle_chat(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
text="hi",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_client.threads.create.assert_called_once()
|
||||||
|
mock_client.runs.wait.assert_called_once()
|
||||||
|
run_context = mock_client.runs.wait.call_args.kwargs["context"]
|
||||||
|
assert run_context["user_id"] == AUTH_DISABLED_USER_ID
|
||||||
|
assert run_context["channel_user_id"] == "U-platform"
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_legacy_open_bot_mode_allows_unbound_auth_enabled_chat(self, monkeypatch):
|
||||||
|
from app.channels.manager import ChannelManager
|
||||||
|
|
||||||
|
monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
manager = ChannelManager(bus=bus, store=store, require_bound_identity=False)
|
||||||
|
mock_client = _make_mock_langgraph_client(thread_id="thread-legacy")
|
||||||
|
manager._client = mock_client
|
||||||
|
|
||||||
|
await manager._handle_chat(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
text="hi",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_client.threads.create.assert_called_once()
|
||||||
|
mock_client.runs.wait.assert_called_once()
|
||||||
|
run_context = mock_client.runs.wait.call_args.kwargs["context"]
|
||||||
|
assert run_context["user_id"] == "U-platform"
|
||||||
|
assert run_context["channel_user_id"] == "U-platform"
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_unbound_auth_enabled_new_command_is_rejected_before_thread_creation(self, monkeypatch):
|
||||||
|
from app.channels.manager import BOUND_IDENTITY_REQUIRED_MESSAGE, ChannelManager
|
||||||
|
|
||||||
|
monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
manager = ChannelManager(bus=bus, store=store, require_bound_identity=True)
|
||||||
|
mock_client = _make_mock_langgraph_client()
|
||||||
|
manager._client = mock_client
|
||||||
|
outbound_received = []
|
||||||
|
|
||||||
|
async def capture(msg):
|
||||||
|
outbound_received.append(msg)
|
||||||
|
|
||||||
|
bus.subscribe_outbound(capture)
|
||||||
|
await manager._handle_command(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
text="/new",
|
||||||
|
msg_type=InboundMessageType.COMMAND,
|
||||||
|
thread_ts="1710000000.000100",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(outbound_received) == 1
|
||||||
|
assert outbound_received[0].text == BOUND_IDENTITY_REQUIRED_MESSAGE
|
||||||
|
assert outbound_received[0].thread_id == ""
|
||||||
|
assert outbound_received[0].connection_id is None
|
||||||
|
assert outbound_received[0].owner_user_id is None
|
||||||
|
mock_client.threads.create.assert_not_called()
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
def test_bound_auth_enabled_new_command_creates_thread(self, monkeypatch):
|
||||||
|
from app.channels.manager import ChannelManager
|
||||||
|
|
||||||
|
monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False)
|
||||||
|
|
||||||
|
async def go():
|
||||||
|
bus = MessageBus()
|
||||||
|
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||||
|
repo = _BoundIdentityRepo(
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"id": "connection-1",
|
||||||
|
"owner_user_id": "deerflow-user-1",
|
||||||
|
"provider": "slack",
|
||||||
|
"external_account_id": "U-platform",
|
||||||
|
"workspace_id": "T123",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
)
|
||||||
|
manager = ChannelManager(bus=bus, store=store, connection_repo=repo, require_bound_identity=True)
|
||||||
|
mock_client = _make_mock_langgraph_client(thread_id="thread-bound")
|
||||||
|
manager._client = mock_client
|
||||||
|
|
||||||
|
await manager._handle_command(
|
||||||
|
InboundMessage(
|
||||||
|
channel_name="slack",
|
||||||
|
chat_id="C123",
|
||||||
|
user_id="U-platform",
|
||||||
|
owner_user_id="deerflow-user-1",
|
||||||
|
connection_id="connection-1",
|
||||||
|
workspace_id="T123",
|
||||||
|
text="/new",
|
||||||
|
msg_type=InboundMessageType.COMMAND,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_client.threads.create.assert_called_once()
|
||||||
|
|
||||||
|
_run(go())
|
||||||
|
|
||||||
|
|
||||||
class TestChannelManagerConnectionRouting:
|
class TestChannelManagerConnectionRouting:
|
||||||
def test_connection_scoped_conversations_do_not_share_threads(self, tmp_path, monkeypatch):
|
def test_connection_scoped_conversations_do_not_share_threads(self, tmp_path, monkeypatch):
|
||||||
from app.channels.manager import ChannelManager
|
from app.channels.manager import ChannelManager
|
||||||
@@ -3811,6 +4250,13 @@ class TestChannelService:
|
|||||||
|
|
||||||
assert service.manager._connection_repo is repo
|
assert service.manager._connection_repo is repo
|
||||||
|
|
||||||
|
def test_require_bound_identity_is_forwarded_to_manager(self):
|
||||||
|
from app.channels.service import ChannelService
|
||||||
|
|
||||||
|
service = ChannelService(channels_config={}, require_bound_identity=True)
|
||||||
|
|
||||||
|
assert service.manager._require_bound_identity is True
|
||||||
|
|
||||||
def test_remove_channel_stops_running_channel_and_forgets_config(self):
|
def test_remove_channel_stops_running_channel_and_forgets_config(self):
|
||||||
from app.channels.service import ChannelService
|
from app.channels.service import ChannelService
|
||||||
|
|
||||||
|
|||||||
@@ -97,7 +97,7 @@ class TestSubagentOverrideConfig:
|
|||||||
class TestSubagentsAppConfigDefaults:
|
class TestSubagentsAppConfigDefaults:
|
||||||
def test_default_timeout(self):
|
def test_default_timeout(self):
|
||||||
config = SubagentsAppConfig()
|
config = SubagentsAppConfig()
|
||||||
assert config.timeout_seconds == 900
|
assert config.timeout_seconds == 1800
|
||||||
|
|
||||||
def test_default_max_turns_override_is_none(self):
|
def test_default_max_turns_override_is_none(self):
|
||||||
config = SubagentsAppConfig()
|
config = SubagentsAppConfig()
|
||||||
@@ -281,7 +281,7 @@ class TestLoadSubagentsConfig:
|
|||||||
def test_load_empty_dict_uses_defaults(self):
|
def test_load_empty_dict_uses_defaults(self):
|
||||||
load_subagents_config_from_dict({})
|
load_subagents_config_from_dict({})
|
||||||
cfg = get_subagents_app_config()
|
cfg = get_subagents_app_config()
|
||||||
assert cfg.timeout_seconds == 900
|
assert cfg.timeout_seconds == 1800
|
||||||
assert cfg.max_turns is None
|
assert cfg.max_turns is None
|
||||||
assert cfg.agents == {}
|
assert cfg.agents == {}
|
||||||
|
|
||||||
@@ -319,13 +319,30 @@ class TestRegistryGetSubagentConfig:
|
|||||||
assert get_subagent_config("general-purpose") is not None
|
assert get_subagent_config("general-purpose") is not None
|
||||||
assert get_subagent_config("bash") is not None
|
assert get_subagent_config("bash") is not None
|
||||||
|
|
||||||
def test_default_timeout_preserved_when_no_config(self):
|
def test_explicit_global_timeout_propagates_to_general_purpose(self):
|
||||||
|
"""An explicit global timeout (here the non-default 900) propagates to a
|
||||||
|
built-in agent, while max_turns still comes from the builtin def (150).
|
||||||
|
"""
|
||||||
from deerflow.subagents.registry import get_subagent_config
|
from deerflow.subagents.registry import get_subagent_config
|
||||||
|
|
||||||
_reset_subagents_config(timeout_seconds=900)
|
_reset_subagents_config(timeout_seconds=900)
|
||||||
config = get_subagent_config("general-purpose")
|
config = get_subagent_config("general-purpose")
|
||||||
assert config.timeout_seconds == 900
|
assert config.timeout_seconds == 900
|
||||||
assert config.max_turns == 100
|
assert config.max_turns == 150
|
||||||
|
|
||||||
|
def test_builtin_defaults_have_research_headroom(self):
|
||||||
|
"""Out-of-box defaults (no config.yaml subagents section) must give
|
||||||
|
general-purpose enough turns/time for deep research, which previously
|
||||||
|
failed with GraphRecursionError at the old max_turns=100 limit.
|
||||||
|
"""
|
||||||
|
from deerflow.subagents.registry import get_subagent_config
|
||||||
|
|
||||||
|
load_subagents_config_from_dict({}) # no subagents config -> model defaults
|
||||||
|
config = get_subagent_config("general-purpose")
|
||||||
|
assert config.max_turns == 150
|
||||||
|
assert config.timeout_seconds == 1800
|
||||||
|
# Pin bash too so the config.example.yaml "bash=60" doc cannot drift.
|
||||||
|
assert get_subagent_config("bash").max_turns == 60
|
||||||
|
|
||||||
def test_global_timeout_override_applied(self):
|
def test_global_timeout_override_applied(self):
|
||||||
from deerflow.subagents.registry import get_subagent_config
|
from deerflow.subagents.registry import get_subagent_config
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ def test_generate_suggestions_strips_inline_think_block(monkeypatch):
|
|||||||
fake_model.ainvoke = AsyncMock(return_value=MagicMock(content=content))
|
fake_model.ainvoke = AsyncMock(return_value=MagicMock(content=content))
|
||||||
monkeypatch.setattr(suggestions, "create_chat_model", lambda **kwargs: fake_model)
|
monkeypatch.setattr(suggestions, "create_chat_model", lambda **kwargs: fake_model)
|
||||||
|
|
||||||
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace()))
|
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True))))
|
||||||
|
|
||||||
assert result.suggestions == ["深度学习和机器学习的区别?", "常用框架有哪些?", "需要什么数学基础?"]
|
assert result.suggestions == ["深度学习和机器学习的区别?", "常用框架有哪些?", "需要什么数学基础?"]
|
||||||
|
|
||||||
@@ -103,7 +103,7 @@ def test_generate_suggestions_parses_and_limits(monkeypatch):
|
|||||||
|
|
||||||
# Bypass the require_permission decorator (which needs request +
|
# Bypass the require_permission decorator (which needs request +
|
||||||
# thread_store) — these tests cover the parsing logic.
|
# thread_store) — these tests cover the parsing logic.
|
||||||
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace()))
|
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True))))
|
||||||
|
|
||||||
assert result.suggestions == ["Q1", "Q2", "Q3"]
|
assert result.suggestions == ["Q1", "Q2", "Q3"]
|
||||||
fake_model.ainvoke.assert_awaited_once()
|
fake_model.ainvoke.assert_awaited_once()
|
||||||
@@ -125,7 +125,7 @@ def test_generate_suggestions_parses_list_block_content(monkeypatch):
|
|||||||
|
|
||||||
# Bypass the require_permission decorator (which needs request +
|
# Bypass the require_permission decorator (which needs request +
|
||||||
# thread_store) — these tests cover the parsing logic.
|
# thread_store) — these tests cover the parsing logic.
|
||||||
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace()))
|
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True))))
|
||||||
|
|
||||||
assert result.suggestions == ["Q1", "Q2"]
|
assert result.suggestions == ["Q1", "Q2"]
|
||||||
fake_model.ainvoke.assert_awaited_once()
|
fake_model.ainvoke.assert_awaited_once()
|
||||||
@@ -147,7 +147,7 @@ def test_generate_suggestions_parses_output_text_block_content(monkeypatch):
|
|||||||
|
|
||||||
# Bypass the require_permission decorator (which needs request +
|
# Bypass the require_permission decorator (which needs request +
|
||||||
# thread_store) — these tests cover the parsing logic.
|
# thread_store) — these tests cover the parsing logic.
|
||||||
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace()))
|
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True))))
|
||||||
|
|
||||||
assert result.suggestions == ["Q1", "Q2"]
|
assert result.suggestions == ["Q1", "Q2"]
|
||||||
fake_model.ainvoke.assert_awaited_once()
|
fake_model.ainvoke.assert_awaited_once()
|
||||||
@@ -166,6 +166,29 @@ def test_generate_suggestions_returns_empty_on_model_error(monkeypatch):
|
|||||||
|
|
||||||
# Bypass the require_permission decorator (which needs request +
|
# Bypass the require_permission decorator (which needs request +
|
||||||
# thread_store) — these tests cover the parsing logic.
|
# thread_store) — these tests cover the parsing logic.
|
||||||
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace()))
|
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True))))
|
||||||
|
|
||||||
assert result.suggestions == []
|
assert result.suggestions == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_generate_suggestions_returns_empty_when_disabled(monkeypatch):
|
||||||
|
"""Ensure suggestions are bypassed and returned an empty list when disabled in config."""
|
||||||
|
req = suggestions.SuggestionsRequest(
|
||||||
|
messages=[
|
||||||
|
suggestions.SuggestionMessage(role="user", content="Hi"),
|
||||||
|
suggestions.SuggestionMessage(role="assistant", content="Hello"),
|
||||||
|
],
|
||||||
|
n=3,
|
||||||
|
model_name=None,
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_config = SimpleNamespace(suggestions=SimpleNamespace(enabled=False))
|
||||||
|
|
||||||
|
fake_model = MagicMock()
|
||||||
|
fake_model.ainvoke = AsyncMock(side_effect=RuntimeError("Model should not be called."))
|
||||||
|
monkeypatch.setattr(suggestions, "create_chat_model", lambda **kwargs: fake_model)
|
||||||
|
|
||||||
|
result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=mock_config))
|
||||||
|
|
||||||
|
assert result.suggestions == []
|
||||||
|
fake_model.ainvoke.assert_not_called()
|
||||||
|
|||||||
+21
-6
@@ -15,7 +15,7 @@
|
|||||||
# ============================================================================
|
# ============================================================================
|
||||||
# Bump this number when the config schema changes.
|
# Bump this number when the config schema changes.
|
||||||
# Run `make config-upgrade` to merge new fields into your local config.yaml.
|
# Run `make config-upgrade` to merge new fields into your local config.yaml.
|
||||||
config_version: 12
|
config_version: 14
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
# Logging
|
# Logging
|
||||||
@@ -711,6 +711,16 @@ tool_output:
|
|||||||
# web_search: 8000
|
# web_search: 8000
|
||||||
# bash: 20000
|
# bash: 20000
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# Suggestions Configuration
|
||||||
|
# ============================================================================
|
||||||
|
# Configure whether the agent automatically generates follow-up question
|
||||||
|
# suggestions at the end of each response.
|
||||||
|
|
||||||
|
suggestions:
|
||||||
|
enabled: true
|
||||||
|
|
||||||
|
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
# Loop Detection Configuration
|
# Loop Detection Configuration
|
||||||
# ============================================================================
|
# ============================================================================
|
||||||
@@ -882,16 +892,18 @@ sandbox:
|
|||||||
# Subagents are background workers delegated tasks by the lead agent
|
# Subagents are background workers delegated tasks by the lead agent
|
||||||
|
|
||||||
# subagents:
|
# subagents:
|
||||||
# # Default timeout in seconds for all subagents (default: 900 = 15 minutes)
|
# # Default timeout (seconds) for built-in subagents (default: 1800 = 30 min).
|
||||||
# timeout_seconds: 900
|
# # Custom agents use their own timeout_seconds (default 900) unless overridden.
|
||||||
# # Optional global max-turn override for all subagents
|
# timeout_seconds: 1800
|
||||||
|
# # Optional global max-turn override for all subagents.
|
||||||
|
# # Built-in defaults: general-purpose=150, bash=60. Leave unset to keep them.
|
||||||
# # max_turns: 120
|
# # max_turns: 120
|
||||||
#
|
#
|
||||||
# # Optional per-agent overrides (applies to both built-in and custom agents)
|
# # Optional per-agent overrides (applies to both built-in and custom agents)
|
||||||
# agents:
|
# agents:
|
||||||
# general-purpose:
|
# general-purpose:
|
||||||
# timeout_seconds: 1800 # 30 minutes for complex multi-step tasks
|
# timeout_seconds: 2700 # 45 minutes for very long deep-research tasks
|
||||||
# max_turns: 160
|
# max_turns: 250 # raise above the 150 default for very deep tasks
|
||||||
# # model: qwen3:32b # Use a specific model (default: inherit from lead agent)
|
# # model: qwen3:32b # Use a specific model (default: inherit from lead agent)
|
||||||
# # skills: # Skill whitelist (default: inherit all enabled skills)
|
# # skills: # Skill whitelist (default: inherit all enabled skills)
|
||||||
# # - web-search
|
# # - web-search
|
||||||
@@ -1196,6 +1208,9 @@ run_events:
|
|||||||
#
|
#
|
||||||
# channel_connections:
|
# channel_connections:
|
||||||
# enabled: false
|
# enabled: false
|
||||||
|
# # Security: keep this enabled unless you intentionally want legacy open-bot behavior.
|
||||||
|
# # Disabling it lets unbound external IM users create DeerFlow threads/runs.
|
||||||
|
# require_bound_identity: true
|
||||||
#
|
#
|
||||||
# telegram:
|
# telegram:
|
||||||
# enabled: false
|
# enabled: false
|
||||||
|
|||||||
Reference in New Issue
Block a user