"""Tool search — deferred tool discovery at runtime. Contains: - DeferredToolCatalog: immutable, searchable catalog of deferred tools. - build_tool_search_tool: builds the `tool_search` tool as a closure over a catalog; it records promotions into graph state via ``Command``. - build_deferred_tool_setup: assembles the catalog + tool from a policy-filtered tool list (call AFTER tool-policy filtering). The agent sees deferred tool names in but cannot call them until it fetches their full schema via the tool_search tool. The deferred set rides on a build-time closure and promotion lives in per-thread graph state — there is no ContextVar. Source-agnostic: a tool is "deferred" when it carries the ``deerflow_mcp`` metadata tag. """ import hashlib import json import logging import re from dataclasses import dataclass from functools import cached_property from typing import Annotated from langchain.tools import BaseTool from langchain_core.messages import ToolMessage from langchain_core.tools import InjectedToolCallId, tool from langchain_core.utils.function_calling import convert_to_openai_function from langgraph.types import Command from deerflow.tools.mcp_metadata import is_mcp_tool logger = logging.getLogger(__name__) MAX_RESULTS = 5 # Max tools returned per search def _compile_catalog_regex(pattern: str) -> re.Pattern[str]: """Compile ``pattern`` case-insensitively, falling back to a literal match. Search queries come from the model, so an invalid regex (e.g. an unbalanced paren) must degrade to a literal substring match rather than raise. """ try: return re.compile(pattern, re.IGNORECASE) except re.error: return re.compile(re.escape(pattern), re.IGNORECASE) # ── Catalog ── # NOTE: frozen=True without slots=True keeps __dict__, which is what lets the # @cached_property fields below cache (they write to instance.__dict__, bypassing # the frozen __setattr__). Do NOT add slots=True or hash/names break at runtime. 
@dataclass(frozen=True)
class DeferredToolCatalog:
    """Immutable catalog of deferred tools. Pure search, no mutation.

    The only stored state is the ``tools`` tuple; ``names`` and ``hash`` are
    derived from it on first access and cached on the instance.
    """

    tools: tuple[BaseTool, ...]

    @cached_property
    def names(self) -> frozenset[str]:
        """Names of all tools in the catalog."""
        return frozenset(t.name for t in self.tools)

    @cached_property
    def hash(self) -> str:
        """Short fingerprint of the catalog: 16 hex chars of a SHA-256 digest.

        Tools are sorted by name and serialized with sorted keys before
        hashing, so the value does not depend on the order of ``tools``.
        """
        canon = [
            {"name": t.name, "schema": convert_to_openai_function(t)}
            for t in sorted(self.tools, key=lambda t: t.name)
        ]
        blob = json.dumps(canon, sort_keys=True, ensure_ascii=False, default=str)
        return hashlib.sha256(blob.encode("utf-8")).hexdigest()[:16]

    def search(self, query: str) -> list[BaseTool]:
        """Return at most ``MAX_RESULTS`` tools matching ``query``.

        Three query forms are recognized (after stripping whitespace):

        - ``select:a,b``: exact, case-sensitive name lookup of the
          comma-separated names, returned in catalog order.
        - ``+required rest``: keep tools whose name contains ``required``
          (case-insensitive); when ``rest`` is present, rank them by how many
          times ``rest`` (as a regex) matches in "name description".
        - anything else: the whole query is compiled as one case-insensitive
          regex (literal fallback when invalid) and searched in
          "name description"; tools whose name matches rank above tools that
          only match in the description.

        An empty query, or a bare ``+``, yields an empty list.
        """
        query = query.strip()
        if not query:
            return []

        if query.startswith("select:"):
            wanted = {n.strip() for n in query.removeprefix("select:").split(",")}
            return [t for t in self.tools if t.name in wanted][:MAX_RESULTS]

        if query.startswith("+"):
            parts = query[1:].split(None, 1)
            if not parts:
                return []  # bare "+" with no required token — nothing to require
            required = parts[0].lower()
            candidates = [t for t in self.tools if required in t.name.lower()]
            if len(parts) > 1:
                # Compile the ranking pattern once, not once per candidate: an
                # invalid pattern would otherwise raise and be caught per tool.
                ranker = _compile_catalog_regex(parts[1])
                candidates.sort(key=lambda t: len(ranker.findall(_searchable_text(t))), reverse=True)
            return candidates[:MAX_RESULTS]

        # NOTE(review): a multi-word query such as "notebook jupyter" is matched
        # as ONE pattern (space included), not as independent keywords, although
        # the tool_search description advertises keyword search — confirm intent.
        regex = _compile_catalog_regex(query)
        scored = [
            (2 if regex.search(t.name) else 1, t)
            for t in self.tools
            if regex.search(_searchable_text(t))
        ]
        # Stable sort: ties keep catalog order.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [t for _, t in scored][:MAX_RESULTS]


def _searchable_text(t: BaseTool) -> str:
    """Text a query is matched against: the tool name, a space, its description."""
    return f"{t.name} {t.description or ''}"


def _catalog_regex_score(pattern: str, t: BaseTool) -> int:
    """Count how many times ``pattern`` matches in a tool's name and description."""
    return len(_compile_catalog_regex(pattern).findall(_searchable_text(t)))


# ── Setup / tool ──


@dataclass(frozen=True)
class DeferredToolSetup:
    """Result of assembling deferred-tool support for one agent build.

    The three fields move as a unit, so callers branch on ``tool_search_tool``:

    - **Empty** ``(None, frozenset(), None)``: deferral is disabled, or no MCP
      tool survived policy filtering. Nothing is deferred — bind tools as-is.
    - **Populated**: ``tool_search_tool`` is appended to the agent's tools,
      ``deferred_names`` are withheld from the model until promoted, and
      ``catalog_hash`` scopes those promotions in graph state.

    Invariant: ``tool_search_tool is None`` ⟺ ``deferred_names`` is empty ⟺
    ``catalog_hash is None``.
    """

    tool_search_tool: BaseTool | None
    deferred_names: frozenset[str]
    catalog_hash: str | None


def build_tool_search_tool(catalog: DeferredToolCatalog) -> BaseTool:
    """Build the ``tool_search`` tool as a closure over ``catalog``.

    Each call of the returned tool searches the catalog and returns a
    ``Command`` whose update carries a ``promoted`` entry (the catalog hash
    plus the matched names) and a ``ToolMessage`` with the matched schemas as
    JSON, or a "no tools found" message when nothing matched.
    """
    catalog_hash = catalog.hash

    @tool
    def tool_search(query: str, tool_call_id: Annotated[str, InjectedToolCallId]) -> Command:
        """Fetches full schema definitions for deferred tools so they can be called.

        Deferred tools appear by name in the system prompt. Until fetched, only the
        name is known. This tool matches a query against the deferred tools and
        returns the matched tools' complete schemas; once returned, a tool becomes
        callable.

        Query forms:
        - "select:Read,Edit" -- fetch these exact tools by name
        - "notebook jupyter" -- keyword search, up to max_results best matches
        - "+slack send" -- require "slack" in the name, rank by remaining terms
        """
        # search() already caps its result; the slice keeps the cap local too.
        matched = catalog.search(query)[:MAX_RESULTS]
        if matched:
            content = json.dumps([convert_to_openai_function(t) for t in matched], indent=2, ensure_ascii=False)
            names = [t.name for t in matched]
        else:
            content, names = f"No tools found matching: {query}", []
        return Command(
            update={
                "promoted": {"catalog_hash": catalog_hash, "names": names},
                "messages": [ToolMessage(content=content, tool_call_id=tool_call_id, name="tool_search")],
            }
        )

    return tool_search


def build_deferred_tool_setup(filtered_tools: list[BaseTool], *, enabled: bool) -> DeferredToolSetup:
    """Build the deferred-tool setup from a POLICY-FILTERED tool list.

    Must be called after skill/agent tool-policy filtering so the catalog never
    exposes a tool the current agent is not allowed to use.

    Returns an empty setup (see :class:`DeferredToolSetup`) in two distinct
    cases: deferral is disabled, or it is enabled but no MCP tool survived
    filtering. Otherwise the setup holds the search tool, the deferred names
    and the catalog hash, all built from the MCP tools in ``filtered_tools``.
    """
    if not enabled:
        # Deferral disabled: defer nothing; the model binds every tool as before.
        return DeferredToolSetup(None, frozenset(), None)

    deferred = tuple(t for t in filtered_tools if is_mcp_tool(t))
    if not deferred:
        # Enabled, but no MCP tool to defer: same empty result, different reason.
        return DeferredToolSetup(None, frozenset(), None)

    catalog = DeferredToolCatalog(deferred)
    return DeferredToolSetup(build_tool_search_tool(catalog), catalog.names, catalog.hash)


def assemble_deferred_tools(filtered_tools: list[BaseTool], *, enabled: bool) -> tuple[list[BaseTool], DeferredToolSetup]:
    """Build the final tool list + deferred setup from a POLICY-FILTERED list.

    Call AFTER tool-policy filtering so the deferred catalog never exposes a
    tool the agent is not allowed to use.

    Fail-closed: if tool_search is enabled and MCP tools survived filtering but
    no deferred set was recovered, raise rather than silently binding their
    full schemas to the model. Shared by every agent-build path (lead, embedded
    client, subagent) so they all get the same fail-closed guarantee from one
    place.

    Returns:
        A copy of ``filtered_tools`` (with the search tool appended when the
        setup is populated) and the :class:`DeferredToolSetup` itself.

    Raises:
        RuntimeError: deferral is enabled and an MCP tool is present, yet the
            setup came back with no deferred names.
    """
    deferred_setup = build_deferred_tool_setup(filtered_tools, enabled=enabled)

    if enabled and not deferred_setup.deferred_names and any(is_mcp_tool(t) for t in filtered_tools):
        raise RuntimeError("tool_search enabled and MCP tools survived policy filtering, but no deferred set was recovered - refusing to bind MCP schemas (fail-closed).")

    final_tools = list(filtered_tools)
    # Explicit None check: the field is Optional, so do not rely on the tool
    # object's truthiness.
    if deferred_setup.tool_search_tool is not None:
        final_tools.append(deferred_setup.tool_search_tool)
    return final_tools, deferred_setup


# Prompt rendering


def get_deferred_tools_prompt_section(*, deferred_names: frozenset[str] = frozenset()) -> str:
    """Render the deferred-tool section from an explicit deferred-name set.

    Lists only names so the agent knows what exists and can use tool_search to
    load them. Returns empty string when there are no deferred tools; otherwise
    the names, sorted, one per line, with a leading and a trailing newline.

    The set is computed at agent build time (after tool-policy filtering) and
    passed in. Lives here, next to the assembly that produces
    ``deferred_names``, so every agent-build path (lead, embedded client,
    subagent) renders the section the same way without coupling back to
    ``lead_agent.prompt``.
    """
    if not deferred_names:
        return ""
    names = "\n".join(sorted(deferred_names))
    # NOTE(review): the section is emitted as bare names with no surrounding
    # header or marker; if the prompt is expected to delimit this block,
    # confirm the wrapper against the system-prompt template.
    return f"\n{names}\n"