diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 82d750740..058a7ec24 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -311,7 +311,7 @@ Proxied through nginx: `/api/langgraph/*` → Gateway LangGraph-compatible runti **Built-in Agents**: `general-purpose` (all tools except `task`) and `bash` (command specialist) **Execution**: Dual thread pool - `_scheduler_pool` (3 workers) + `_execution_pool` (3 workers) -**Concurrency**: `MAX_CONCURRENT_SUBAGENTS = 3` enforced by `SubagentLimitMiddleware` (truncates excess tool calls in `after_model`), 15-minute timeout +**Concurrency**: `MAX_CONCURRENT_SUBAGENTS = 3` enforced by `SubagentLimitMiddleware` (truncates excess tool calls in `after_model`); default subagent timeout `subagents.timeout_seconds=1800` (30 min) and built-in `general-purpose` `max_turns=150` (raised from 100/15-min so deep-research subtasks stop hitting `GraphRecursionError` out of the box) **Flow**: `task()` tool → `SubagentExecutor` → background thread → poll 5s → SSE events → result **Events**: `task_started`, `task_running`, `task_completed`/`task_failed`/`task_timed_out` **Deferred MCP tools** (if `tool_search.enabled`): `SubagentExecutor._build_initial_state` assembles deferral after policy filtering via the shared `assemble_deferred_tools` (fail-closed), appends the `tool_search` tool, injects the `` section into the subagent's `SystemMessage`, and threads the setup to `_create_agent`, which attaches `DeferredToolFilterMiddleware` through `build_subagent_runtime_middlewares(deferred_setup=...)`. Subagents thus withhold full MCP schemas until promotion, same as the lead agent; each task run gets a fresh `ThreadState` so promotion is isolated per run diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index f7f6afaad..5d1705524 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -42,6 +42,11 @@ DEFAULT_GATEWAY_URL = "http://localhost:8001" DEFAULT_ASSISTANT_ID = "lead_agent" CUSTOM_AGENT_NAME_PATTERN = re.compile(r"^[A-Za-z0-9-]+$") +# Lead-agent recursion budget (LangGraph super-steps for the lead graph only). +# This is independent of subagent depth: a `task()` dispatch runs the whole +# subagent inside ONE lead tools-node step, and subagents enforce their own +# limit via `subagents.max_turns` (see SubagentExecutor). Do not conflate this +# 100 with the general-purpose subagent's max_turns. DEFAULT_RUN_CONFIG: dict[str, Any] = {"recursion_limit": 100} DEFAULT_RUN_CONTEXT: dict[str, Any] = { "thinking_enabled": True, @@ -55,6 +60,8 @@ STREAM_UPDATE_MIN_INTERVAL_SECONDS = 0.35 STREAM_MODES = ["messages-tuple", "values"] MESSAGE_STREAM_EVENTS = ("messages-tuple", "messages") THREAD_BUSY_MESSAGE = "This conversation is already processing another request. Please wait for it to finish and try again." +BOUND_IDENTITY_REQUIRED_MESSAGE = "Connect this channel from DeerFlow Settings, complete the in-channel connect step, then send your message again." +BOUND_IDENTITY_UNAVAILABLE_MESSAGE = "Channel connection verification is temporarily unavailable. Please try again later or contact the DeerFlow operator." CHANNEL_CAPABILITIES = { "dingtalk": {"supports_streaming": False}, @@ -145,6 +152,19 @@ class _SlashSkillCommandResolution: failure_message: str | None = None +@dataclass(frozen=True, slots=True) +class _BoundIdentityRejection: + message: str = BOUND_IDENTITY_REQUIRED_MESSAGE + # Server-side connection id that may be used only as an outbound routing + # hint for the rejection message. This is never copied from the inbound + # message; it comes from the repository re-read when available. + outbound_connection_id: str | None = None + # Server-side owner for the outbound routing connection above. It lets + # channel senders preserve per-connection context without trusting the + # rejected inbound identity assertion. + outbound_owner_user_id: str | None = None + + def _is_thread_busy_error(exc: BaseException | None) -> bool: if exc is None: return False @@ -735,6 +755,7 @@ class ChannelManager: default_session: dict[str, Any] | None = None, channel_sessions: dict[str, Any] | None = None, connection_repo: Any | None = None, + require_bound_identity: bool = False, ) -> None: self.bus = bus self.store = store @@ -745,6 +766,7 @@ class ChannelManager: self._default_session = _as_dict(default_session) self._channel_sessions = dict(channel_sessions or {}) self._connection_repo = connection_repo + self._require_bound_identity = require_bound_identity self._client = None # lazy init — langgraph_sdk async client self._channel_metadata_synced: set[str] = set() self._skill_storage: SkillStorage | None = None @@ -918,38 +940,111 @@ class ChannelManager: async def _handle_message(self, msg: InboundMessage) -> None: msg = _apply_effective_owner(msg) - async with self._semaphore: - try: + try: + # Non-command chat can be rejected before it consumes a semaphore + # slot. Commands are handled below because provider adapters consume + # binding commands before manager dispatch, and _handle_command() + # applies its own admission gate for manager-level commands. + bound_identity_rejection = None + if msg.msg_type != InboundMessageType.COMMAND: + bound_identity_rejection = await self._get_bound_identity_rejection(msg) + if bound_identity_rejection is not None: + await self._reject_unbound_channel_message(msg, bound_identity_rejection=bound_identity_rejection) + return + + async with self._semaphore: if msg.msg_type == InboundMessageType.COMMAND: await self._handle_command(msg) else: - await self._handle_chat(msg) - except InvalidChannelSessionConfigError as exc: - logger.warning( - "Invalid channel session config for %s (chat=%s): %s", - msg.channel_name, - msg.chat_id, - exc, - ) - await self._send_error(msg, str(exc)) - except SlashSkillCommandResolutionError as exc: - logger.warning( - "Slash skill command resolution failed for %s (chat=%s): %s", - msg.channel_name, - msg.chat_id, - exc, - ) - await self._send_error(msg, str(exc)) - except Exception: - logger.exception( - "Error handling message from %s (chat=%s)", - msg.channel_name, - msg.chat_id, - ) - await self._send_error(msg, "An internal error occurred. Please try again.") + await self._handle_chat(msg, bound_identity_checked=True) + except InvalidChannelSessionConfigError as exc: + logger.warning( + "Invalid channel session config for %s (chat=%s): %s", + msg.channel_name, + msg.chat_id, + exc, + ) + await self._send_error(msg, str(exc)) + except SlashSkillCommandResolutionError as exc: + logger.warning( + "Slash skill command resolution failed for %s (chat=%s): %s", + msg.channel_name, + msg.chat_id, + exc, + ) + await self._send_error(msg, str(exc)) + except Exception: + logger.exception( + "Error handling message from %s (chat=%s)", + msg.channel_name, + msg.chat_id, + ) + await self._send_error(msg, "An internal error occurred. Please try again.") # -- chat handling ----------------------------------------------------- + async def _get_bound_identity_rejection(self, msg: InboundMessage) -> _BoundIdentityRejection | None: + """Return None when *msg* may proceed; otherwise return rejection routing hints. + + The returned object means the message lacks a verified bound identity. + Its fields are intentionally limited to server-side values re-read from + the connection repository, so rejection outbounds never trust a rejected + inbound message's asserted connection metadata. + """ + if not self._require_bound_identity: + return None + if _auth_disabled_owner_user_id(): + return None + + has_connection = bool(msg.connection_id) + has_owner = bool(msg.owner_user_id) + if not (has_connection and has_owner): + return _BoundIdentityRejection() + if self._connection_repo is None: + return _BoundIdentityRejection(message=BOUND_IDENTITY_UNAVAILABLE_MESSAGE) + + # The manager is the run-creation security boundary, so it does not + # trust mutable InboundMessage identity fields by themselves. Re-read + # the binding by provider identity before creating DeerFlow threads or + # runs. If the asserted identity does not match, keep only the + # server-side connection fields as outbound routing hints. + connection = await self._connection_repo.find_connection_by_external_identity( + provider=msg.channel_name, + external_account_id=msg.user_id, + workspace_id=msg.workspace_id or None, + ) + if connection is None: + return _BoundIdentityRejection() + + connection_id = connection.get("id") + owner_user_id = connection.get("owner_user_id") + if connection_id == msg.connection_id and owner_user_id == msg.owner_user_id: + return None + return _BoundIdentityRejection(outbound_connection_id=connection_id, outbound_owner_user_id=owner_user_id) + + async def _reject_unbound_channel_message( + self, + msg: InboundMessage, + *, + bound_identity_rejection: _BoundIdentityRejection, + ) -> None: + logger.info( + "[Manager] rejecting unbound channel message: channel=%s, chat_id=%s", + msg.channel_name, + msg.chat_id, + ) + outbound = OutboundMessage( + channel_name=msg.channel_name, + chat_id=msg.chat_id, + thread_id="", + text=bound_identity_rejection.message, + thread_ts=msg.thread_ts, + connection_id=bound_identity_rejection.outbound_connection_id, + owner_user_id=bound_identity_rejection.outbound_owner_user_id, + metadata=_slim_metadata(msg.metadata), + ) + await self.bus.publish_outbound(outbound) + async def _lookup_thread_id(self, msg: InboundMessage) -> str | None: if msg.connection_id and self._connection_repo is not None: return await self._connection_repo.get_thread_id( @@ -1011,7 +1106,21 @@ class ChannelManager: self._channel_metadata_synced.clear() self._channel_metadata_synced.add(thread_id) - async def _handle_chat(self, msg: InboundMessage, extra_context: dict[str, Any] | None = None) -> None: + async def _handle_chat( + self, + msg: InboundMessage, + extra_context: dict[str, Any] | None = None, + *, + bound_identity_checked: bool = False, + ) -> None: + # Normal entry paths already run the bound-identity check in + # _handle_message() or _handle_command(). Keep this default False so + # direct callers and future internal paths still fail closed. + bound_identity_rejection = None if bound_identity_checked else await self._get_bound_identity_rejection(msg) + if bound_identity_rejection is not None: + await self._reject_unbound_channel_message(msg, bound_identity_rejection=bound_identity_rejection) + return + client = self._get_client() # Look up existing DeerFlow thread. @@ -1237,6 +1346,18 @@ class ChannelManager: # -- command handling -------------------------------------------------- async def _handle_command(self, msg: InboundMessage) -> None: + # Commands are the other run-creation entry point besides chat: /new + # calls _create_thread() directly, and /bootstrap routes into + # _handle_chat(). Apply the same bound-identity admission boundary here + # so unbound platform users cannot create unowned threads/checkpoints or + # query Gateway state via commands. Provider-level binding flows + # (/connect , /start ) are consumed by the provider adapter + # before the message reaches the manager, so they are unaffected. + bound_identity_rejection = await self._get_bound_identity_rejection(msg) + if bound_identity_rejection is not None: + await self._reject_unbound_channel_message(msg, bound_identity_rejection=bound_identity_rejection) + return + raw_text = msg.text text = raw_text.strip() parts = text.split(maxsplit=1) @@ -1255,7 +1376,7 @@ class ChannelManager: chat_text = parts[1] if len(parts) > 1 else "Initialize workspace" chat_msg = _dc_replace(msg, text=chat_text, msg_type=InboundMessageType.CHAT) - await self._handle_chat(chat_msg, extra_context={"is_bootstrap": True}) + await self._handle_chat(chat_msg, extra_context={"is_bootstrap": True}, bound_identity_checked=True) return if reply is None and command == "new": @@ -1295,7 +1416,7 @@ class ChannelManager: from dataclasses import replace as _dc_replace chat_msg = _dc_replace(msg, msg_type=InboundMessageType.CHAT) - await self._handle_chat(chat_msg) + await self._handle_chat(chat_msg, bound_identity_checked=True) return else: reply = _unknown_command_reply(command) diff --git a/backend/app/channels/service.py b/backend/app/channels/service.py index a222c5a01..b689c9651 100644 --- a/backend/app/channels/service.py +++ b/backend/app/channels/service.py @@ -17,6 +17,7 @@ logger = logging.getLogger(__name__) if TYPE_CHECKING: from deerflow.config.app_config import AppConfig + from deerflow.config.channel_connections_config import ChannelConnectionsConfig # Channel name → import path for lazy loading _CHANNEL_REGISTRY: dict[str, str] = { @@ -64,8 +65,7 @@ def _merge_channel_connection_runtime_config(channels_config: dict[str, Any], ap merge_runtime_channel_configs(channels_config, connection_config) -def _make_connection_repo(app_config: AppConfig): - connection_config = getattr(app_config, "channel_connections", None) +def _make_connection_repo(connection_config: ChannelConnectionsConfig | None): if connection_config is None or not getattr(connection_config, "enabled", False): return None @@ -90,7 +90,13 @@ class ChannelService: instantiates enabled channels, and starts the ChannelManager dispatcher. """ - def __init__(self, channels_config: dict[str, Any] | None = None, *, connection_repo: Any | None = None) -> None: + def __init__( + self, + channels_config: dict[str, Any] | None = None, + *, + connection_repo: Any | None = None, + require_bound_identity: bool = False, + ) -> None: self.bus = MessageBus() self.store = ChannelStore() self._connection_repo = connection_repo @@ -107,6 +113,7 @@ class ChannelService: default_session=default_session if isinstance(default_session, dict) else None, channel_sessions=channel_sessions, connection_repo=connection_repo, + require_bound_identity=require_bound_identity, ) self._channels: dict[str, Any] = {} # name -> Channel instance self._config = config @@ -126,7 +133,14 @@ class ChannelService: if "channels" in extra: channels_config = dict(extra["channels"] or {}) _merge_channel_connection_runtime_config(channels_config, app_config) - return cls(channels_config=channels_config, connection_repo=_make_connection_repo(app_config)) + connection_config = getattr(app_config, "channel_connections", None) + connections_enabled = connection_config is not None and getattr(connection_config, "enabled", False) + require_bound_identity = bool(connections_enabled and getattr(connection_config, "require_bound_identity", True)) + return cls( + channels_config=channels_config, + connection_repo=_make_connection_repo(connection_config), + require_bound_identity=require_bound_identity, + ) async def start(self) -> None: """Start the manager and all enabled channels.""" diff --git a/backend/app/gateway/routers/suggestions.py b/backend/app/gateway/routers/suggestions.py index 39f7d250a..373a48b38 100644 --- a/backend/app/gateway/routers/suggestions.py +++ b/backend/app/gateway/routers/suggestions.py @@ -135,6 +135,8 @@ async def generate_suggestions( request: Request, config: AppConfig = Depends(get_config), ) -> SuggestionsResponse: + if not config.suggestions.enabled: + return SuggestionsResponse(suggestions=[]) if not body.messages: return SuggestionsResponse(suggestions=[]) diff --git a/backend/app/gateway/services.py b/backend/app/gateway/services.py index 77a9e8076..04a9f567b 100644 --- a/backend/app/gateway/services.py +++ b/backend/app/gateway/services.py @@ -224,6 +224,11 @@ def build_run_config( the LangGraph Platform-compatible HTTP API and the IM channel path behave identically. """ + # Lead-agent recursion budget (LangGraph super-steps for the lead graph + # only). Independent of subagent depth: a `task()` dispatch runs the whole + # subagent inside ONE lead tools-node step, and subagents enforce their own + # limit via `subagents.max_turns`. Do not conflate this 100 with the + # general-purpose subagent's max_turns. config: dict[str, Any] = {"recursion_limit": 100} if request_config: # LangGraph >= 0.6.0 introduced ``context`` as the preferred way to diff --git a/backend/docs/IM_CHANNEL_CONNECTIONS.md b/backend/docs/IM_CHANNEL_CONNECTIONS.md index 996c83568..8ed2e85a1 100644 --- a/backend/docs/IM_CHANNEL_CONNECTIONS.md +++ b/backend/docs/IM_CHANNEL_CONNECTIONS.md @@ -48,6 +48,11 @@ Then enable user bindings in `channel_connections`: ```yaml channel_connections: enabled: true + # Auth-enabled deployments require ordinary IM messages to come from a + # connected DeerFlow user by default. Set this to false only for legacy + # operator-owned/open-bot deployments that intentionally route unbound + # platform users to platform-ID user buckets. + require_bound_identity: true telegram: enabled: true @@ -74,6 +79,10 @@ channel_connections: `channel_connections` does not duplicate provider secrets. It only controls the browser-facing connect UI and stores per-user binding records. Telegram needs `bot_username` only so the frontend can open a deep link. +When `channel_connections.enabled` and `require_bound_identity` are true, auth-enabled deployments reject ordinary unbound IM messages before creating a DeerFlow thread or run. Users must connect the channel from DeerFlow Settings first. Auth-disabled local mode still routes channel messages to the auth-disabled default user, and legacy open-bot behavior can be restored explicitly with `require_bound_identity: false`. + +Upgrade note: existing auth-enabled deployments that already have `channel_connections.enabled: true` will start rejecting ordinary unbound IM messages after this field is introduced because `require_bound_identity` defaults to true. Legacy operator-owned/open-bot deployments that intentionally allow unbound platform users to create DeerFlow runs should set `require_bound_identity: false` before upgrading and restart the service. + ## Connect Flow Telegram: diff --git a/backend/packages/harness/deerflow/config/app_config.py b/backend/packages/harness/deerflow/config/app_config.py index 5091b7d31..311824782 100644 --- a/backend/packages/harness/deerflow/config/app_config.py +++ b/backend/packages/harness/deerflow/config/app_config.py @@ -28,6 +28,7 @@ from deerflow.config.skill_evolution_config import SkillEvolutionConfig from deerflow.config.skills_config import SkillsConfig from deerflow.config.stream_bridge_config import StreamBridgeConfig, load_stream_bridge_config_from_dict from deerflow.config.subagents_config import SubagentsAppConfig, load_subagents_config_from_dict +from deerflow.config.suggestions_config import SuggestionsConfig from deerflow.config.summarization_config import SummarizationConfig, load_summarization_config_from_dict from deerflow.config.title_config import TitleConfig, load_title_config_from_dict from deerflow.config.token_usage_config import TokenUsageConfig @@ -116,6 +117,7 @@ class AppConfig(BaseModel): acp_agents: dict[str, ACPAgentConfig] = Field(default_factory=dict, description="ACP-compatible agent configuration") subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration") guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration") + suggestions: SuggestionsConfig = Field(default_factory=SuggestionsConfig, description="Follow-up suggestions configuration.") circuit_breaker: CircuitBreakerConfig = Field(default_factory=CircuitBreakerConfig, description="LLM circuit breaker configuration") channel_connections: ChannelConnectionsConfig = Field( default_factory=ChannelConnectionsConfig, diff --git a/backend/packages/harness/deerflow/config/channel_connections_config.py b/backend/packages/harness/deerflow/config/channel_connections_config.py index 4092d5863..7e740b7af 100644 --- a/backend/packages/harness/deerflow/config/channel_connections_config.py +++ b/backend/packages/harness/deerflow/config/channel_connections_config.py @@ -42,6 +42,7 @@ class ChannelConnectionsConfig(BaseModel): """Top-level config for browser-connectable IM channels.""" enabled: bool = False + require_bound_identity: bool = True slack: SlackChannelConnectionConfig = Field(default_factory=SlackChannelConnectionConfig) telegram: TelegramChannelConnectionConfig = Field(default_factory=TelegramChannelConnectionConfig) discord: DiscordChannelConnectionConfig = Field(default_factory=DiscordChannelConnectionConfig) diff --git a/backend/packages/harness/deerflow/config/subagents_config.py b/backend/packages/harness/deerflow/config/subagents_config.py index 026016b21..738616884 100644 --- a/backend/packages/harness/deerflow/config/subagents_config.py +++ b/backend/packages/harness/deerflow/config/subagents_config.py @@ -72,9 +72,9 @@ class SubagentsAppConfig(BaseModel): """Configuration for the subagent system.""" timeout_seconds: int = Field( - default=900, + default=1800, ge=1, - description="Default timeout in seconds for all subagents (default: 900 = 15 minutes)", + description="Default timeout in seconds for built-in subagents (default: 1800 = 30 minutes); custom agents use their own timeout_seconds unless given a per-agent override", ) max_turns: int | None = Field( default=None, diff --git a/backend/packages/harness/deerflow/config/suggestions_config.py b/backend/packages/harness/deerflow/config/suggestions_config.py new file mode 100644 index 000000000..a7b8817bd --- /dev/null +++ b/backend/packages/harness/deerflow/config/suggestions_config.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel, Field + + +class SuggestionsConfig(BaseModel): + """Configuration for automatic follow-up suggestions.""" + + enabled: bool = Field(default=True, description="Whether to enable follow-up question suggestions at the end of an AI response") diff --git a/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py b/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py index 176194729..c5291d1b5 100644 --- a/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py +++ b/backend/packages/harness/deerflow/subagents/builtins/general_purpose.py @@ -57,5 +57,5 @@ You have access to the same sandbox environment as the parent agent: tools=None, # Inherit all tools from parent disallowed_tools=["task", "ask_clarification", "present_files"], # Prevent nesting and clarification model="inherit", - max_turns=100, + max_turns=150, ) diff --git a/backend/packages/harness/deerflow/subagents/config.py b/backend/packages/harness/deerflow/subagents/config.py index 9081e2df9..a3ae6024d 100644 --- a/backend/packages/harness/deerflow/subagents/config.py +++ b/backend/packages/harness/deerflow/subagents/config.py @@ -20,8 +20,13 @@ class SubagentConfig: skills: Optional list of skill names to load. If None, inherits all enabled skills. If an empty list, no skills are loaded. model: Model to use - 'inherit' uses parent's model. - max_turns: Maximum number of agent turns before stopping. - timeout_seconds: Maximum execution time in seconds (default: 900 = 15 minutes). + max_turns: Maximum agent turns before stopping. Built-in agents use the + value set here (general-purpose=150, bash=60) unless the global + ``subagents.max_turns`` is set. + timeout_seconds: Bare fallback execution-time cap. For built-in agents the + effective limit is the global ``subagents.timeout_seconds`` (default + 1800 = 30 min), layered on by the registry; this 900 only applies + when no differing global value exists. """ name: str diff --git a/backend/tests/test_app_config_reload.py b/backend/tests/test_app_config_reload.py index c0bc00bff..7a7fd02df 100644 --- a/backend/tests/test_app_config_reload.py +++ b/backend/tests/test_app_config_reload.py @@ -316,7 +316,7 @@ def test_get_app_config_resets_singleton_configs_when_sections_removed(tmp_path, assert get_title_config().enabled is True assert get_summarization_config().enabled is False assert get_memory_config().enabled is True - assert get_subagents_app_config().timeout_seconds == 900 + assert get_subagents_app_config().timeout_seconds == 1800 assert get_tool_search_config().enabled is False assert get_guardrails_config().enabled is False assert get_checkpointer_config() is None diff --git a/backend/tests/test_channel_connections_config.py b/backend/tests/test_channel_connections_config.py index 8a14878c0..48038abb1 100644 --- a/backend/tests/test_channel_connections_config.py +++ b/backend/tests/test_channel_connections_config.py @@ -7,6 +7,7 @@ def test_channel_connections_disabled_by_default(): config = ChannelConnectionsConfig() assert config.enabled is False + assert config.require_bound_identity is True assert config.slack.enabled is False assert config.telegram.enabled is False assert config.discord.enabled is False @@ -43,6 +44,13 @@ def test_enabled_channel_connections_do_not_require_public_url_or_encryption_key assert config.provider_status("wecom") == {"enabled": True, "configured": True} +def test_require_bound_identity_can_be_disabled_for_legacy_open_bot_mode(): + config = ChannelConnectionsConfig.model_validate({"enabled": True, "require_bound_identity": False}) + + assert config.enabled is True + assert config.require_bound_identity is False + + def test_provider_status_reports_disabled_and_unknown_providers(): config = ChannelConnectionsConfig.model_validate({"enabled": True}) diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index b4eea74ea..59f1539c1 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -2565,6 +2565,445 @@ class TestResolveRunParamsUserId: assert "channel_user_id" not in run_context +class _BoundIdentityRepo: + def __init__(self, connections: list[dict[str, str | None]] | None = None) -> None: + self.connections = list(connections or []) + self.lookups: list[dict[str, str | None]] = [] + self.thread_sets: list[dict[str, str | None]] = [] + + async def find_connection_by_external_identity(self, *, provider: str, external_account_id: str, workspace_id: str | None = None): + self.lookups.append( + { + "provider": provider, + "external_account_id": external_account_id, + "workspace_id": workspace_id, + } + ) + for connection in self.connections: + if connection.get("provider") == provider and connection.get("external_account_id") == external_account_id and connection.get("workspace_id") == workspace_id: + return connection + return None + + async def get_thread_id(self, connection_id: str, chat_id: str, topic_id: str | None = None): + return None + + async def set_thread_id( + self, + *, + connection_id: str, + owner_user_id: str, + provider: str, + external_conversation_id: str, + external_topic_id: str | None, + thread_id: str, + ) -> None: + self.thread_sets.append( + { + "connection_id": connection_id, + "owner_user_id": owner_user_id, + "provider": provider, + "external_conversation_id": external_conversation_id, + "external_topic_id": external_topic_id, + "thread_id": thread_id, + } + ) + + +class TestChannelManagerBoundIdentityPolicy: + def test_unbound_auth_enabled_chat_is_rejected_before_thread_or_run_creation(self, monkeypatch): + from app.channels.manager import BOUND_IDENTITY_REQUIRED_MESSAGE, ChannelManager + + monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store, require_bound_identity=True) + mock_client = _make_mock_langgraph_client() + manager._client = mock_client + outbound_received = [] + + async def capture(msg): + outbound_received.append(msg) + + bus.subscribe_outbound(capture) + await manager._handle_chat( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + text="hi", + thread_ts="1710000000.000100", + ) + ) + + assert len(outbound_received) == 1 + assert outbound_received[0].text == BOUND_IDENTITY_REQUIRED_MESSAGE + assert outbound_received[0].thread_id == "" + assert outbound_received[0].connection_id is None + assert outbound_received[0].owner_user_id is None + mock_client.threads.create.assert_not_called() + mock_client.runs.wait.assert_not_called() + + _run(go()) + + def test_bound_identity_repo_unavailable_uses_transient_failure_message(self, monkeypatch): + from app.channels.manager import BOUND_IDENTITY_UNAVAILABLE_MESSAGE, ChannelManager + + monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store, require_bound_identity=True) + mock_client = _make_mock_langgraph_client() + manager._client = mock_client + outbound_received = [] + + async def capture(msg): + outbound_received.append(msg) + + bus.subscribe_outbound(capture) + await manager._handle_chat( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + owner_user_id="deerflow-user-1", + connection_id="connection-1", + workspace_id="T123", + text="hi", + ) + ) + + assert len(outbound_received) == 1 + assert outbound_received[0].text == BOUND_IDENTITY_UNAVAILABLE_MESSAGE + assert outbound_received[0].connection_id is None + assert outbound_received[0].owner_user_id is None + mock_client.threads.create.assert_not_called() + mock_client.runs.wait.assert_not_called() + + _run(go()) + + def test_unbound_auth_enabled_chat_is_rejected_before_semaphore(self, monkeypatch): + from app.channels.manager import BOUND_IDENTITY_REQUIRED_MESSAGE, ChannelManager + + monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store, require_bound_identity=True) + outbound_received = [] + + async def capture(msg): + outbound_received.append(msg) + + bus.subscribe_outbound(capture) + await manager.start() + assert manager._semaphore is not None + await manager._semaphore.acquire() + try: + await asyncio.wait_for( + manager._handle_message( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + text="hi", + ) + ), + timeout=0.5, + ) + finally: + manager._semaphore.release() + await manager.stop() + + assert len(outbound_received) == 1 + assert outbound_received[0].text == BOUND_IDENTITY_REQUIRED_MESSAGE + assert outbound_received[0].connection_id is None + assert outbound_received[0].owner_user_id is None + + _run(go()) + + def test_bound_auth_enabled_chat_is_allowed_when_bound_identity_is_required(self, monkeypatch): + from app.channels.manager import ChannelManager + + monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + repo = _BoundIdentityRepo( + [ + { + "id": "connection-1", + "owner_user_id": "deerflow-user-1", + "provider": "slack", + "external_account_id": "U-platform", + "workspace_id": "T123", + } + ] + ) + manager = ChannelManager(bus=bus, store=store, connection_repo=repo, require_bound_identity=True) + mock_client = _make_mock_langgraph_client(thread_id="thread-bound") + manager._client = mock_client + + await manager._handle_chat( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + owner_user_id="deerflow-user-1", + connection_id="connection-1", + workspace_id="T123", + text="hi", + ) + ) + + mock_client.threads.create.assert_called_once() + mock_client.runs.wait.assert_called_once() + run_context = mock_client.runs.wait.call_args.kwargs["context"] + assert run_context["user_id"] == "deerflow-user-1" + assert run_context["channel_user_id"] == "U-platform" + + _run(go()) + + def test_bound_auth_enabled_message_checks_bound_identity_once_on_hot_path(self, monkeypatch): + from app.channels.manager import ChannelManager + + monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + repo = _BoundIdentityRepo( + [ + { + "id": "connection-1", + "owner_user_id": "deerflow-user-1", + "provider": "slack", + "external_account_id": "U-platform", + "workspace_id": "T123", + } + ] + ) + manager = ChannelManager(bus=bus, store=store, connection_repo=repo, require_bound_identity=True) + mock_client = _make_mock_langgraph_client(thread_id="thread-bound") + manager._client = mock_client + await manager.start() + try: + await manager._handle_message( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + owner_user_id="deerflow-user-1", + connection_id="connection-1", + workspace_id="T123", + text="hi", + ) + ) + finally: + await manager.stop() + + assert repo.lookups == [ + { + "provider": "slack", + "external_account_id": "U-platform", + "workspace_id": "T123", + } + ] + mock_client.threads.create.assert_called_once() + mock_client.runs.wait.assert_called_once() + + _run(go()) + + def test_auth_enabled_chat_rejects_unverified_bound_identity(self, monkeypatch): + from app.channels.manager import BOUND_IDENTITY_REQUIRED_MESSAGE, ChannelManager + + monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + repo = _BoundIdentityRepo( + [ + { + "id": "actual-connection", + "owner_user_id": "actual-owner", + "provider": "slack", + "external_account_id": "U-platform", + "workspace_id": None, + } + ] + ) + manager = ChannelManager(bus=bus, store=store, connection_repo=repo, require_bound_identity=True) + mock_client = _make_mock_langgraph_client() + manager._client = mock_client + outbound_received = [] + + async def capture(msg): + outbound_received.append(msg) + + bus.subscribe_outbound(capture) + await manager._handle_chat( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + owner_user_id="forged-owner", + connection_id="forged-connection", + text="hi", + ) + ) + + assert len(outbound_received) == 1 + assert outbound_received[0].text == BOUND_IDENTITY_REQUIRED_MESSAGE + assert outbound_received[0].connection_id == "actual-connection" + assert outbound_received[0].owner_user_id == "actual-owner" + mock_client.threads.create.assert_not_called() + mock_client.runs.wait.assert_not_called() + + _run(go()) + + def test_auth_disabled_chat_keeps_default_user_when_bound_identity_is_required(self, monkeypatch): + from app.channels.manager import ChannelManager + from app.gateway.auth_disabled import AUTH_DISABLED_USER_ID + + monkeypatch.setenv("DEER_FLOW_AUTH_DISABLED", "1") + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store, require_bound_identity=True) + mock_client = _make_mock_langgraph_client(thread_id="thread-local") + manager._client = mock_client + + await manager._handle_chat( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + text="hi", + ) + ) + + mock_client.threads.create.assert_called_once() + mock_client.runs.wait.assert_called_once() + run_context = mock_client.runs.wait.call_args.kwargs["context"] + assert run_context["user_id"] == AUTH_DISABLED_USER_ID + assert run_context["channel_user_id"] == "U-platform" + + _run(go()) + + def test_legacy_open_bot_mode_allows_unbound_auth_enabled_chat(self, monkeypatch): + from app.channels.manager import ChannelManager + + monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store, require_bound_identity=False) + mock_client = _make_mock_langgraph_client(thread_id="thread-legacy") + manager._client = mock_client + + await manager._handle_chat( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + text="hi", + ) + ) + + mock_client.threads.create.assert_called_once() + mock_client.runs.wait.assert_called_once() + run_context = mock_client.runs.wait.call_args.kwargs["context"] + assert run_context["user_id"] == "U-platform" + assert run_context["channel_user_id"] == "U-platform" + + _run(go()) + + def test_unbound_auth_enabled_new_command_is_rejected_before_thread_creation(self, monkeypatch): + from app.channels.manager import BOUND_IDENTITY_REQUIRED_MESSAGE, ChannelManager + + monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store, require_bound_identity=True) + mock_client = _make_mock_langgraph_client() + manager._client = mock_client + outbound_received = [] + + async def capture(msg): + outbound_received.append(msg) + + bus.subscribe_outbound(capture) + await manager._handle_command( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + text="/new", + msg_type=InboundMessageType.COMMAND, + thread_ts="1710000000.000100", + ) + ) + + assert len(outbound_received) == 1 + assert outbound_received[0].text == BOUND_IDENTITY_REQUIRED_MESSAGE + assert outbound_received[0].thread_id == "" + assert outbound_received[0].connection_id is None + assert outbound_received[0].owner_user_id is None + mock_client.threads.create.assert_not_called() + + _run(go()) + + def test_bound_auth_enabled_new_command_creates_thread(self, monkeypatch): + from app.channels.manager import ChannelManager + + monkeypatch.delenv("DEER_FLOW_AUTH_DISABLED", raising=False) + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + repo = _BoundIdentityRepo( + [ + { + "id": "connection-1", + "owner_user_id": "deerflow-user-1", + "provider": "slack", + "external_account_id": "U-platform", + "workspace_id": "T123", + } + ] + ) + manager = ChannelManager(bus=bus, store=store, connection_repo=repo, require_bound_identity=True) + mock_client = _make_mock_langgraph_client(thread_id="thread-bound") + manager._client = mock_client + + await manager._handle_command( + InboundMessage( + channel_name="slack", + chat_id="C123", + user_id="U-platform", + owner_user_id="deerflow-user-1", + connection_id="connection-1", + workspace_id="T123", + text="/new", + msg_type=InboundMessageType.COMMAND, + ) + ) + + mock_client.threads.create.assert_called_once() + + _run(go()) + + class TestChannelManagerConnectionRouting: def test_connection_scoped_conversations_do_not_share_threads(self, tmp_path, monkeypatch): from app.channels.manager import ChannelManager @@ -3811,6 +4250,13 @@ class TestChannelService: assert service.manager._connection_repo is repo + def test_require_bound_identity_is_forwarded_to_manager(self): + from app.channels.service import ChannelService + + service = ChannelService(channels_config={}, require_bound_identity=True) + + assert service.manager._require_bound_identity is True + def test_remove_channel_stops_running_channel_and_forgets_config(self): from app.channels.service import ChannelService diff --git a/backend/tests/test_subagent_timeout_config.py b/backend/tests/test_subagent_timeout_config.py index b20bbe7a9..d68e0f99f 100644 --- a/backend/tests/test_subagent_timeout_config.py +++ b/backend/tests/test_subagent_timeout_config.py @@ -97,7 +97,7 @@ class TestSubagentOverrideConfig: class TestSubagentsAppConfigDefaults: def test_default_timeout(self): config = SubagentsAppConfig() - assert config.timeout_seconds == 900 + assert config.timeout_seconds == 1800 def test_default_max_turns_override_is_none(self): config = SubagentsAppConfig() @@ -281,7 +281,7 @@ class TestLoadSubagentsConfig: def test_load_empty_dict_uses_defaults(self): load_subagents_config_from_dict({}) cfg = get_subagents_app_config() - assert cfg.timeout_seconds == 900 + assert cfg.timeout_seconds == 1800 assert cfg.max_turns is None assert cfg.agents == {} @@ -319,13 +319,30 @@ class TestRegistryGetSubagentConfig: assert get_subagent_config("general-purpose") is not None assert get_subagent_config("bash") is not None - def test_default_timeout_preserved_when_no_config(self): + def test_explicit_global_timeout_propagates_to_general_purpose(self): + """An explicit global timeout (here the non-default 900) propagates to a + built-in agent, while max_turns still comes from the builtin def (150). + """ from deerflow.subagents.registry import get_subagent_config _reset_subagents_config(timeout_seconds=900) config = get_subagent_config("general-purpose") assert config.timeout_seconds == 900 - assert config.max_turns == 100 + assert config.max_turns == 150 + + def test_builtin_defaults_have_research_headroom(self): + """Out-of-box defaults (no config.yaml subagents section) must give + general-purpose enough turns/time for deep research, which previously + failed with GraphRecursionError at the old max_turns=100 limit. + """ + from deerflow.subagents.registry import get_subagent_config + + load_subagents_config_from_dict({}) # no subagents config -> model defaults + config = get_subagent_config("general-purpose") + assert config.max_turns == 150 + assert config.timeout_seconds == 1800 + # Pin bash too so the config.example.yaml "bash=60" doc cannot drift. + assert get_subagent_config("bash").max_turns == 60 def test_global_timeout_override_applied(self): from deerflow.subagents.registry import get_subagent_config diff --git a/backend/tests/test_suggestions_router.py b/backend/tests/test_suggestions_router.py index bd0a998ff..1a7531fde 100644 --- a/backend/tests/test_suggestions_router.py +++ b/backend/tests/test_suggestions_router.py @@ -74,7 +74,7 @@ def test_generate_suggestions_strips_inline_think_block(monkeypatch): fake_model.ainvoke = AsyncMock(return_value=MagicMock(content=content)) monkeypatch.setattr(suggestions, "create_chat_model", lambda **kwargs: fake_model) - result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace())) + result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True)))) assert result.suggestions == ["深度学习和机器学习的区别?", "常用框架有哪些?", "需要什么数学基础?"] @@ -103,7 +103,7 @@ def test_generate_suggestions_parses_and_limits(monkeypatch): # Bypass the require_permission decorator (which needs request + # thread_store) — these tests cover the parsing logic. - result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace())) + result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True)))) assert result.suggestions == ["Q1", "Q2", "Q3"] fake_model.ainvoke.assert_awaited_once() @@ -125,7 +125,7 @@ def test_generate_suggestions_parses_list_block_content(monkeypatch): # Bypass the require_permission decorator (which needs request + # thread_store) — these tests cover the parsing logic. - result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace())) + result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True)))) assert result.suggestions == ["Q1", "Q2"] fake_model.ainvoke.assert_awaited_once() @@ -147,7 +147,7 @@ def test_generate_suggestions_parses_output_text_block_content(monkeypatch): # Bypass the require_permission decorator (which needs request + # thread_store) — these tests cover the parsing logic. - result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace())) + result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True)))) assert result.suggestions == ["Q1", "Q2"] fake_model.ainvoke.assert_awaited_once() @@ -166,6 +166,29 @@ def test_generate_suggestions_returns_empty_on_model_error(monkeypatch): # Bypass the require_permission decorator (which needs request + # thread_store) — these tests cover the parsing logic. - result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace())) + result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=SimpleNamespace(suggestions=SimpleNamespace(enabled=True)))) assert result.suggestions == [] + + +def test_generate_suggestions_returns_empty_when_disabled(monkeypatch): + """Ensure suggestions are bypassed and returned an empty list when disabled in config.""" + req = suggestions.SuggestionsRequest( + messages=[ + suggestions.SuggestionMessage(role="user", content="Hi"), + suggestions.SuggestionMessage(role="assistant", content="Hello"), + ], + n=3, + model_name=None, + ) + + mock_config = SimpleNamespace(suggestions=SimpleNamespace(enabled=False)) + + fake_model = MagicMock() + fake_model.ainvoke = AsyncMock(side_effect=RuntimeError("Model should not be called.")) + monkeypatch.setattr(suggestions, "create_chat_model", lambda **kwargs: fake_model) + + result = asyncio.run(suggestions.generate_suggestions.__wrapped__("t1", req, request=None, config=mock_config)) + + assert result.suggestions == [] + fake_model.ainvoke.assert_not_called() diff --git a/config.example.yaml b/config.example.yaml index e20ce5443..9ac6e177d 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -15,7 +15,7 @@ # ============================================================================ # Bump this number when the config schema changes. # Run `make config-upgrade` to merge new fields into your local config.yaml. -config_version: 12 +config_version: 14 # ============================================================================ # Logging @@ -711,6 +711,16 @@ tool_output: # web_search: 8000 # bash: 20000 +# ============================================================================ +# Suggestions Configuration +# ============================================================================ +# Configure whether the agent automatically generates follow-up question +# suggestions at the end of each response. + +suggestions: + enabled: true + + # ============================================================================ # Loop Detection Configuration # ============================================================================ @@ -882,16 +892,18 @@ sandbox: # Subagents are background workers delegated tasks by the lead agent # subagents: -# # Default timeout in seconds for all subagents (default: 900 = 15 minutes) -# timeout_seconds: 900 -# # Optional global max-turn override for all subagents +# # Default timeout (seconds) for built-in subagents (default: 1800 = 30 min). +# # Custom agents use their own timeout_seconds (default 900) unless overridden. +# timeout_seconds: 1800 +# # Optional global max-turn override for all subagents. +# # Built-in defaults: general-purpose=150, bash=60. Leave unset to keep them. # # max_turns: 120 # # # Optional per-agent overrides (applies to both built-in and custom agents) # agents: # general-purpose: -# timeout_seconds: 1800 # 30 minutes for complex multi-step tasks -# max_turns: 160 +# timeout_seconds: 2700 # 45 minutes for very long deep-research tasks +# max_turns: 250 # raise above the 150 default for very deep tasks # # model: qwen3:32b # Use a specific model (default: inherit from lead agent) # # skills: # Skill whitelist (default: inherit all enabled skills) # # - web-search @@ -1196,6 +1208,9 @@ run_events: # # channel_connections: # enabled: false +# # Security: keep this enabled unless you intentionally want legacy open-bot behavior. +# # Disabling it lets unbound external IM users create DeerFlow threads/runs. +# require_bound_identity: true # # telegram: # enabled: false