mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-22 07:56:48 +00:00
Merge branch 'main' into fix-2804
This commit is contained in:
@@ -40,6 +40,15 @@ class MemoryUpdateQueue:
|
||||
self._timer: threading.Timer | None = None
|
||||
self._processing = False
|
||||
|
||||
@staticmethod
|
||||
def _queue_key(
|
||||
thread_id: str,
|
||||
user_id: str | None,
|
||||
agent_name: str | None,
|
||||
) -> tuple[str, str | None, str | None]:
|
||||
"""Return the debounce identity for a memory update target."""
|
||||
return (thread_id, user_id, agent_name)
|
||||
|
||||
def add(
|
||||
self,
|
||||
thread_id: str,
|
||||
@@ -115,8 +124,9 @@ class MemoryUpdateQueue:
|
||||
correction_detected: bool,
|
||||
reinforcement_detected: bool,
|
||||
) -> None:
|
||||
queue_key = self._queue_key(thread_id, user_id, agent_name)
|
||||
existing_context = next(
|
||||
(context for context in self._queue if context.thread_id == thread_id),
|
||||
(context for context in self._queue if self._queue_key(context.thread_id, context.user_id, context.agent_name) == queue_key),
|
||||
None,
|
||||
)
|
||||
merged_correction_detected = correction_detected or (existing_context.correction_detected if existing_context is not None else False)
|
||||
@@ -130,7 +140,7 @@ class MemoryUpdateQueue:
|
||||
reinforcement_detected=merged_reinforcement_detected,
|
||||
)
|
||||
|
||||
self._queue = [c for c in self._queue if c.thread_id != thread_id]
|
||||
self._queue = [context for context in self._queue if self._queue_key(context.thread_id, context.user_id, context.agent_name) != queue_key]
|
||||
self._queue.append(context)
|
||||
|
||||
def _reset_timer(self) -> None:
|
||||
|
||||
@@ -6,6 +6,7 @@ from deerflow.agents.memory.message_processing import detect_correction, detect_
|
||||
from deerflow.agents.memory.queue import get_memory_queue
|
||||
from deerflow.agents.middlewares.summarization_middleware import SummarizationEvent
|
||||
from deerflow.config.memory_config import get_memory_config
|
||||
from deerflow.runtime.user_context import resolve_runtime_user_id
|
||||
|
||||
|
||||
def memory_flush_hook(event: SummarizationEvent) -> None:
|
||||
@@ -21,11 +22,13 @@ def memory_flush_hook(event: SummarizationEvent) -> None:
|
||||
|
||||
correction_detected = detect_correction(filtered_messages)
|
||||
reinforcement_detected = not correction_detected and detect_reinforcement(filtered_messages)
|
||||
user_id = resolve_runtime_user_id(event.runtime)
|
||||
queue = get_memory_queue()
|
||||
queue.add_nowait(
|
||||
thread_id=event.thread_id,
|
||||
messages=filtered_messages,
|
||||
agent_name=event.agent_name,
|
||||
user_id=user_id,
|
||||
correction_detected=correction_detected,
|
||||
reinforcement_detected=reinforcement_detected,
|
||||
)
|
||||
|
||||
+27
-22
@@ -104,45 +104,46 @@ class DanglingToolCallMiddleware(AgentMiddleware[AgentState]):
|
||||
return "[Tool call was interrupted and did not return a result.]"
|
||||
|
||||
def _build_patched_messages(self, messages: list) -> list | None:
|
||||
"""Return a new message list with patches inserted at the correct positions.
|
||||
"""Return messages with tool results grouped after their tool-call AIMessage.
|
||||
|
||||
For each AIMessage with dangling tool_calls (no corresponding ToolMessage),
|
||||
a synthetic ToolMessage is inserted immediately after that AIMessage.
|
||||
Returns None if no patches are needed.
|
||||
This normalizes model-bound causal order before provider serialization while
|
||||
preserving already-valid transcripts unchanged.
|
||||
"""
|
||||
# Collect IDs of all existing ToolMessages
|
||||
existing_tool_msg_ids: set[str] = set()
|
||||
tool_messages_by_id: dict[str, ToolMessage] = {}
|
||||
for msg in messages:
|
||||
if isinstance(msg, ToolMessage):
|
||||
existing_tool_msg_ids.add(msg.tool_call_id)
|
||||
tool_messages_by_id.setdefault(msg.tool_call_id, msg)
|
||||
|
||||
# Check if any patching is needed
|
||||
needs_patch = False
|
||||
tool_call_ids: set[str] = set()
|
||||
for msg in messages:
|
||||
if getattr(msg, "type", None) != "ai":
|
||||
continue
|
||||
for tc in self._message_tool_calls(msg):
|
||||
tc_id = tc.get("id")
|
||||
if tc_id and tc_id not in existing_tool_msg_ids:
|
||||
needs_patch = True
|
||||
break
|
||||
if needs_patch:
|
||||
break
|
||||
if tc_id:
|
||||
tool_call_ids.add(tc_id)
|
||||
|
||||
if not needs_patch:
|
||||
return None
|
||||
|
||||
# Build new list with patches inserted right after each dangling AIMessage
|
||||
patched: list = []
|
||||
patched_ids: set[str] = set()
|
||||
consumed_tool_msg_ids: set[str] = set()
|
||||
patch_count = 0
|
||||
for msg in messages:
|
||||
if isinstance(msg, ToolMessage) and msg.tool_call_id in tool_call_ids:
|
||||
continue
|
||||
|
||||
patched.append(msg)
|
||||
if getattr(msg, "type", None) != "ai":
|
||||
continue
|
||||
|
||||
for tc in self._message_tool_calls(msg):
|
||||
tc_id = tc.get("id")
|
||||
if tc_id and tc_id not in existing_tool_msg_ids and tc_id not in patched_ids:
|
||||
if not tc_id or tc_id in consumed_tool_msg_ids:
|
||||
continue
|
||||
|
||||
existing_tool_msg = tool_messages_by_id.get(tc_id)
|
||||
if existing_tool_msg is not None:
|
||||
patched.append(existing_tool_msg)
|
||||
consumed_tool_msg_ids.add(tc_id)
|
||||
else:
|
||||
patched.append(
|
||||
ToolMessage(
|
||||
content=self._synthetic_tool_message_content(tc),
|
||||
@@ -151,10 +152,14 @@ class DanglingToolCallMiddleware(AgentMiddleware[AgentState]):
|
||||
status="error",
|
||||
)
|
||||
)
|
||||
patched_ids.add(tc_id)
|
||||
consumed_tool_msg_ids.add(tc_id)
|
||||
patch_count += 1
|
||||
|
||||
logger.warning(f"Injecting {patch_count} placeholder ToolMessage(s) for dangling tool calls")
|
||||
if patched == messages:
|
||||
return None
|
||||
|
||||
if patch_count:
|
||||
logger.warning(f"Injecting {patch_count} placeholder ToolMessage(s) for dangling tool calls")
|
||||
return patched
|
||||
|
||||
@override
|
||||
|
||||
@@ -7,17 +7,21 @@ reminder message so the model still knows about the outstanding todo list.
|
||||
|
||||
Additionally, this middleware prevents the agent from exiting the loop while
|
||||
there are still incomplete todo items. When the model produces a final response
|
||||
(no tool calls) but todos are not yet complete, the middleware injects a reminder
|
||||
and jumps back to the model node to force continued engagement.
|
||||
(no tool calls) but todos are not yet complete, the middleware queues a reminder
|
||||
for the next model request and jumps back to the model node to force continued
|
||||
engagement. The completion reminder is injected via ``wrap_model_call`` instead
|
||||
of being persisted into graph state as a normal user-visible message.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import Any, override
|
||||
|
||||
from langchain.agents.middleware import TodoListMiddleware
|
||||
from langchain.agents.middleware.todo import PlanningState, Todo
|
||||
from langchain.agents.middleware.types import hook_config
|
||||
from langchain.agents.middleware.types import ModelCallResult, ModelRequest, ModelResponse, hook_config
|
||||
from langchain_core.messages import AIMessage, HumanMessage
|
||||
from langgraph.runtime import Runtime
|
||||
|
||||
@@ -55,6 +59,51 @@ def _format_todos(todos: list[Todo]) -> str:
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _format_completion_reminder(todos: list[Todo]) -> str:
|
||||
"""Format a completion reminder for incomplete todo items."""
|
||||
incomplete = [t for t in todos if t.get("status") != "completed"]
|
||||
incomplete_text = "\n".join(f"- [{t.get('status', 'pending')}] {t.get('content', '')}" for t in incomplete)
|
||||
return (
|
||||
"<system_reminder>\n"
|
||||
"You have incomplete todo items that must be finished before giving your final response:\n\n"
|
||||
f"{incomplete_text}\n\n"
|
||||
"Please continue working on these tasks. Call `write_todos` to mark items as completed "
|
||||
"as you finish them, and only respond when all items are done.\n"
|
||||
"</system_reminder>"
|
||||
)
|
||||
|
||||
|
||||
_TOOL_CALL_FINISH_REASONS = {"tool_calls", "function_call"}
|
||||
|
||||
|
||||
def _has_tool_call_intent_or_error(message: AIMessage) -> bool:
|
||||
"""Return True when an AIMessage is not a clean final answer.
|
||||
|
||||
Todo completion reminders should only fire when the model has produced a
|
||||
plain final response. Provider/tool parsing details have moved across
|
||||
LangChain versions and integrations, so keep all tool-intent/error signals
|
||||
behind this helper instead of checking one concrete field at the call site.
|
||||
"""
|
||||
if message.tool_calls:
|
||||
return True
|
||||
|
||||
if getattr(message, "invalid_tool_calls", None):
|
||||
return True
|
||||
|
||||
# Backward/provider compatibility: some integrations preserve raw or legacy
|
||||
# tool-call intent in additional_kwargs even when structured tool_calls is
|
||||
# empty. If this helper changes, update the matching sentinel test
|
||||
# `TestToolCallIntentOrError.test_langchain_ai_message_tool_fields_are_explicitly_handled`;
|
||||
# if that test fails after a LangChain upgrade, review this helper so new
|
||||
# tool-call/error fields are not silently treated as clean final answers.
|
||||
additional_kwargs = getattr(message, "additional_kwargs", {}) or {}
|
||||
if additional_kwargs.get("tool_calls") or additional_kwargs.get("function_call"):
|
||||
return True
|
||||
|
||||
response_metadata = getattr(message, "response_metadata", {}) or {}
|
||||
return response_metadata.get("finish_reason") in _TOOL_CALL_FINISH_REASONS
|
||||
|
||||
|
||||
class TodoMiddleware(TodoListMiddleware):
|
||||
"""Extends TodoListMiddleware with `write_todos` context-loss detection.
|
||||
|
||||
@@ -89,6 +138,7 @@ class TodoMiddleware(TodoListMiddleware):
|
||||
formatted = _format_todos(todos)
|
||||
reminder = HumanMessage(
|
||||
name="todo_reminder",
|
||||
additional_kwargs={"hide_from_ui": True},
|
||||
content=(
|
||||
"<system_reminder>\n"
|
||||
"Your todo list from earlier is no longer visible in the current context window, "
|
||||
@@ -113,6 +163,100 @@ class TodoMiddleware(TodoListMiddleware):
|
||||
# Maximum number of completion reminders before allowing the agent to exit.
|
||||
# This prevents infinite loops when the agent cannot make further progress.
|
||||
_MAX_COMPLETION_REMINDERS = 2
|
||||
# Hard cap for per-run reminder bookkeeping in long-lived middleware instances.
|
||||
_MAX_COMPLETION_REMINDER_KEYS = 4096
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
self._lock = threading.Lock()
|
||||
self._pending_completion_reminders: dict[tuple[str, str], list[str]] = {}
|
||||
self._completion_reminder_counts: dict[tuple[str, str], int] = {}
|
||||
self._completion_reminder_touch_order: dict[tuple[str, str], int] = {}
|
||||
self._completion_reminder_next_order = 0
|
||||
|
||||
@staticmethod
|
||||
def _get_thread_id(runtime: Runtime) -> str:
|
||||
context = getattr(runtime, "context", None)
|
||||
thread_id = context.get("thread_id") if context else None
|
||||
return str(thread_id) if thread_id else "default"
|
||||
|
||||
@staticmethod
|
||||
def _get_run_id(runtime: Runtime) -> str:
|
||||
context = getattr(runtime, "context", None)
|
||||
run_id = context.get("run_id") if context else None
|
||||
return str(run_id) if run_id else "default"
|
||||
|
||||
def _pending_key(self, runtime: Runtime) -> tuple[str, str]:
|
||||
return self._get_thread_id(runtime), self._get_run_id(runtime)
|
||||
|
||||
def _touch_completion_reminder_key_locked(self, key: tuple[str, str]) -> None:
|
||||
self._completion_reminder_next_order += 1
|
||||
self._completion_reminder_touch_order[key] = self._completion_reminder_next_order
|
||||
|
||||
def _completion_reminder_keys_locked(self) -> set[tuple[str, str]]:
|
||||
keys = set(self._pending_completion_reminders)
|
||||
keys.update(self._completion_reminder_counts)
|
||||
keys.update(self._completion_reminder_touch_order)
|
||||
return keys
|
||||
|
||||
def _drop_completion_reminder_key_locked(self, key: tuple[str, str]) -> None:
|
||||
self._pending_completion_reminders.pop(key, None)
|
||||
self._completion_reminder_counts.pop(key, None)
|
||||
self._completion_reminder_touch_order.pop(key, None)
|
||||
|
||||
def _prune_completion_reminder_state_locked(self, protected_key: tuple[str, str]) -> None:
|
||||
keys = self._completion_reminder_keys_locked()
|
||||
overflow = len(keys) - self._MAX_COMPLETION_REMINDER_KEYS
|
||||
if overflow <= 0:
|
||||
return
|
||||
|
||||
candidates = [key for key in keys if key != protected_key]
|
||||
candidates.sort(key=lambda key: self._completion_reminder_touch_order.get(key, 0))
|
||||
for key in candidates[:overflow]:
|
||||
self._drop_completion_reminder_key_locked(key)
|
||||
|
||||
def _queue_completion_reminder(self, runtime: Runtime, reminder: str) -> None:
|
||||
key = self._pending_key(runtime)
|
||||
with self._lock:
|
||||
self._pending_completion_reminders.setdefault(key, []).append(reminder)
|
||||
self._completion_reminder_counts[key] = self._completion_reminder_counts.get(key, 0) + 1
|
||||
self._touch_completion_reminder_key_locked(key)
|
||||
self._prune_completion_reminder_state_locked(protected_key=key)
|
||||
|
||||
def _completion_reminder_count_for_runtime(self, runtime: Runtime) -> int:
|
||||
key = self._pending_key(runtime)
|
||||
with self._lock:
|
||||
return self._completion_reminder_counts.get(key, 0)
|
||||
|
||||
def _drain_completion_reminders(self, runtime: Runtime) -> list[str]:
|
||||
key = self._pending_key(runtime)
|
||||
with self._lock:
|
||||
reminders = self._pending_completion_reminders.pop(key, [])
|
||||
if reminders or key in self._completion_reminder_counts:
|
||||
self._touch_completion_reminder_key_locked(key)
|
||||
return reminders
|
||||
|
||||
def _clear_other_run_completion_reminders(self, runtime: Runtime) -> None:
|
||||
thread_id, current_run_id = self._pending_key(runtime)
|
||||
with self._lock:
|
||||
for key in self._completion_reminder_keys_locked():
|
||||
if key[0] == thread_id and key[1] != current_run_id:
|
||||
self._drop_completion_reminder_key_locked(key)
|
||||
|
||||
def _clear_current_run_completion_reminders(self, runtime: Runtime) -> None:
|
||||
key = self._pending_key(runtime)
|
||||
with self._lock:
|
||||
self._drop_completion_reminder_key_locked(key)
|
||||
|
||||
@override
|
||||
def before_agent(self, state: PlanningState, runtime: Runtime) -> dict[str, Any] | None:
|
||||
self._clear_other_run_completion_reminders(runtime)
|
||||
return None
|
||||
|
||||
@override
|
||||
async def abefore_agent(self, state: PlanningState, runtime: Runtime) -> dict[str, Any] | None:
|
||||
self._clear_other_run_completion_reminders(runtime)
|
||||
return None
|
||||
|
||||
@hook_config(can_jump_to=["model"])
|
||||
@override
|
||||
@@ -137,10 +281,12 @@ class TodoMiddleware(TodoListMiddleware):
|
||||
if base_result is not None:
|
||||
return base_result
|
||||
|
||||
# 2. Only intervene when the agent wants to exit (no tool calls).
|
||||
# 2. Only intervene when the agent wants to exit cleanly. Tool-call
|
||||
# intent or tool-call parse errors should be handled by the tool path
|
||||
# instead of being masked by todo reminders.
|
||||
messages = state.get("messages") or []
|
||||
last_ai = next((m for m in reversed(messages) if isinstance(m, AIMessage)), None)
|
||||
if not last_ai or last_ai.tool_calls:
|
||||
if not last_ai or _has_tool_call_intent_or_error(last_ai):
|
||||
return None
|
||||
|
||||
# 3. Allow exit when all todos are completed or there are no todos.
|
||||
@@ -149,24 +295,14 @@ class TodoMiddleware(TodoListMiddleware):
|
||||
return None
|
||||
|
||||
# 4. Enforce a reminder cap to prevent infinite re-engagement loops.
|
||||
if _completion_reminder_count(messages) >= self._MAX_COMPLETION_REMINDERS:
|
||||
if self._completion_reminder_count_for_runtime(runtime) >= self._MAX_COMPLETION_REMINDERS:
|
||||
return None
|
||||
|
||||
# 5. Inject a reminder and force the agent back to the model.
|
||||
incomplete = [t for t in todos if t.get("status") != "completed"]
|
||||
incomplete_text = "\n".join(f"- [{t.get('status', 'pending')}] {t.get('content', '')}" for t in incomplete)
|
||||
reminder = HumanMessage(
|
||||
name="todo_completion_reminder",
|
||||
content=(
|
||||
"<system_reminder>\n"
|
||||
"You have incomplete todo items that must be finished before giving your final response:\n\n"
|
||||
f"{incomplete_text}\n\n"
|
||||
"Please continue working on these tasks. Call `write_todos` to mark items as completed "
|
||||
"as you finish them, and only respond when all items are done.\n"
|
||||
"</system_reminder>"
|
||||
),
|
||||
)
|
||||
return {"jump_to": "model", "messages": [reminder]}
|
||||
# 5. Queue a reminder for the next model request and jump back. We must
|
||||
# not persist this control prompt as a normal HumanMessage, otherwise it
|
||||
# can leak into user-visible message streams and saved transcripts.
|
||||
self._queue_completion_reminder(runtime, _format_completion_reminder(todos))
|
||||
return {"jump_to": "model"}
|
||||
|
||||
@override
|
||||
@hook_config(can_jump_to=["model"])
|
||||
@@ -177,3 +313,47 @@ class TodoMiddleware(TodoListMiddleware):
|
||||
) -> dict[str, Any] | None:
|
||||
"""Async version of after_model."""
|
||||
return self.after_model(state, runtime)
|
||||
|
||||
@staticmethod
|
||||
def _format_pending_completion_reminders(reminders: list[str]) -> str:
|
||||
return "\n\n".join(dict.fromkeys(reminders))
|
||||
|
||||
def _augment_request(self, request: ModelRequest) -> ModelRequest:
|
||||
reminders = self._drain_completion_reminders(request.runtime)
|
||||
if not reminders:
|
||||
return request
|
||||
new_messages = [
|
||||
*request.messages,
|
||||
HumanMessage(
|
||||
content=self._format_pending_completion_reminders(reminders),
|
||||
name="todo_completion_reminder",
|
||||
additional_kwargs={"hide_from_ui": True},
|
||||
),
|
||||
]
|
||||
return request.override(messages=new_messages)
|
||||
|
||||
@override
|
||||
def wrap_model_call(
|
||||
self,
|
||||
request: ModelRequest,
|
||||
handler: Callable[[ModelRequest], ModelResponse],
|
||||
) -> ModelCallResult:
|
||||
return handler(self._augment_request(request))
|
||||
|
||||
@override
|
||||
async def awrap_model_call(
|
||||
self,
|
||||
request: ModelRequest,
|
||||
handler: Callable[[ModelRequest], Awaitable[ModelResponse]],
|
||||
) -> ModelCallResult:
|
||||
return await handler(self._augment_request(request))
|
||||
|
||||
@override
|
||||
def after_agent(self, state: PlanningState, runtime: Runtime) -> dict[str, Any] | None:
|
||||
self._clear_current_run_completion_reminders(runtime)
|
||||
return None
|
||||
|
||||
@override
|
||||
async def aafter_agent(self, state: PlanningState, runtime: Runtime) -> dict[str, Any] | None:
|
||||
self._clear_current_run_completion_reminders(runtime)
|
||||
return None
|
||||
|
||||
@@ -9,7 +9,7 @@ from typing import Any, override
|
||||
from langchain.agents import AgentState
|
||||
from langchain.agents.middleware import AgentMiddleware
|
||||
from langchain.agents.middleware.todo import Todo
|
||||
from langchain_core.messages import AIMessage
|
||||
from langchain_core.messages import AIMessage, ToolMessage
|
||||
from langgraph.runtime import Runtime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -217,6 +217,17 @@ def _infer_step_kind(message: AIMessage, actions: list[dict[str, Any]]) -> str:
|
||||
return "thinking"
|
||||
|
||||
|
||||
def _has_tool_call(message: AIMessage, tool_call_id: str) -> bool:
|
||||
"""Return True if the AIMessage contains a tool_call with the given id."""
|
||||
for tc in message.tool_calls or []:
|
||||
if isinstance(tc, dict):
|
||||
if tc.get("id") == tool_call_id:
|
||||
return True
|
||||
elif hasattr(tc, "id") and tc.id == tool_call_id:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _build_attribution(message: AIMessage, todos: list[Todo]) -> dict[str, Any]:
|
||||
tool_calls = getattr(message, "tool_calls", None) or []
|
||||
actions: list[dict[str, Any]] = []
|
||||
@@ -261,8 +272,51 @@ class TokenUsageMiddleware(AgentMiddleware):
|
||||
if not messages:
|
||||
return None
|
||||
|
||||
# Annotate subagent token usage onto the AIMessage that dispatched it.
|
||||
# When a task tool completes, its usage is cached by tool_call_id. Detect
|
||||
# the ToolMessage → search backward for the corresponding AIMessage → merge.
|
||||
# Walk backward through consecutive ToolMessages before the new AIMessage
|
||||
# so that multiple concurrent task tool calls all get their subagent tokens
|
||||
# written back to the same dispatch message (merging into one update).
|
||||
state_updates: dict[int, AIMessage] = {}
|
||||
if len(messages) >= 2:
|
||||
from deerflow.tools.builtins.task_tool import pop_cached_subagent_usage
|
||||
|
||||
idx = len(messages) - 2
|
||||
while idx >= 0:
|
||||
tool_msg = messages[idx]
|
||||
if not isinstance(tool_msg, ToolMessage) or not tool_msg.tool_call_id:
|
||||
break
|
||||
|
||||
subagent_usage = pop_cached_subagent_usage(tool_msg.tool_call_id)
|
||||
if subagent_usage:
|
||||
# Search backward from the ToolMessage to find the AIMessage
|
||||
# that dispatched it. A single model response can dispatch
|
||||
# multiple task tool calls, so we can't assume a fixed offset.
|
||||
dispatch_idx = idx - 1
|
||||
while dispatch_idx >= 0:
|
||||
candidate = messages[dispatch_idx]
|
||||
if isinstance(candidate, AIMessage) and _has_tool_call(candidate, tool_msg.tool_call_id):
|
||||
# Accumulate into an existing update for the same
|
||||
# AIMessage (multiple task calls in one response),
|
||||
# or merge fresh from the original message.
|
||||
existing_update = state_updates.get(dispatch_idx)
|
||||
prev = existing_update.usage_metadata if existing_update else (getattr(candidate, "usage_metadata", None) or {})
|
||||
merged = {
|
||||
**prev,
|
||||
"input_tokens": prev.get("input_tokens", 0) + subagent_usage["input_tokens"],
|
||||
"output_tokens": prev.get("output_tokens", 0) + subagent_usage["output_tokens"],
|
||||
"total_tokens": prev.get("total_tokens", 0) + subagent_usage["total_tokens"],
|
||||
}
|
||||
state_updates[dispatch_idx] = candidate.model_copy(update={"usage_metadata": merged})
|
||||
break
|
||||
dispatch_idx -= 1
|
||||
idx -= 1
|
||||
|
||||
last = messages[-1]
|
||||
if not isinstance(last, AIMessage):
|
||||
if state_updates:
|
||||
return {"messages": [state_updates[idx] for idx in sorted(state_updates)]}
|
||||
return None
|
||||
|
||||
usage = getattr(last, "usage_metadata", None)
|
||||
@@ -288,11 +342,12 @@ class TokenUsageMiddleware(AgentMiddleware):
|
||||
additional_kwargs = dict(getattr(last, "additional_kwargs", {}) or {})
|
||||
|
||||
if additional_kwargs.get(TOKEN_USAGE_ATTRIBUTION_KEY) == attribution:
|
||||
return None
|
||||
return {"messages": [state_updates[idx] for idx in sorted(state_updates)]} if state_updates else None
|
||||
|
||||
additional_kwargs[TOKEN_USAGE_ATTRIBUTION_KEY] = attribution
|
||||
updated_msg = last.model_copy(update={"additional_kwargs": additional_kwargs})
|
||||
return {"messages": [updated_msg]}
|
||||
state_updates[len(messages) - 1] = updated_msg
|
||||
return {"messages": [state_updates[idx] for idx in sorted(state_updates)]}
|
||||
|
||||
@override
|
||||
def after_model(self, state: AgentState, runtime: Runtime) -> dict | None:
|
||||
|
||||
@@ -0,0 +1,195 @@
|
||||
"""Dialect-aware JSON value matching for SQLAlchemy (SQLite + PostgreSQL)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import BigInteger, Float, String, bindparam
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.sql.compiler import SQLCompiler
|
||||
from sqlalchemy.sql.expression import ColumnElement
|
||||
from sqlalchemy.sql.visitors import InternalTraversal
|
||||
from sqlalchemy.types import Boolean, TypeEngine
|
||||
|
||||
# Key is interpolated into compiled SQL; restrict charset to prevent injection.
|
||||
_KEY_CHARSET_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
|
||||
|
||||
# Allowed value types for metadata filter values (same set accepted by JsonMatch).
|
||||
ALLOWED_FILTER_VALUE_TYPES: tuple[type, ...] = (type(None), bool, int, float, str)
|
||||
|
||||
# SQLite raises an overflow when binding values outside signed 64-bit range;
|
||||
# PostgreSQL overflows during BIGINT cast. Reject at validation time instead.
|
||||
_INT64_MIN = -(2**63)
|
||||
_INT64_MAX = 2**63 - 1
|
||||
|
||||
|
||||
def validate_metadata_filter_key(key: object) -> bool:
|
||||
"""Return True if *key* is safe for use as a JSON metadata filter key.
|
||||
|
||||
A key is "safe" when it is a string matching ``[A-Za-z0-9_-]+``. The
|
||||
charset is restricted because the key is interpolated into the
|
||||
compiled SQL path expression (``$."<key>"`` / ``->`` literal), so any
|
||||
laxer pattern would open a SQL/JSONPath injection surface.
|
||||
"""
|
||||
return isinstance(key, str) and bool(_KEY_CHARSET_RE.match(key))
|
||||
|
||||
|
||||
def validate_metadata_filter_value(value: object) -> bool:
|
||||
"""Return True if *value* is an allowed type for a JSON metadata filter.
|
||||
|
||||
Matches the set of types ``_build_clause`` knows how to compile into
|
||||
a dialect-portable predicate. Anything else (list/dict/bytes/...) is
|
||||
intentionally rejected rather than silently coerced via ``str()`` —
|
||||
silent coercion would (a) produce wrong matches and (b) break
|
||||
SQLAlchemy's ``inherit_cache`` invariant when ``value`` is unhashable.
|
||||
|
||||
Integer values are additionally restricted to the signed 64-bit range
|
||||
``[-2**63, 2**63 - 1]``: SQLite overflows when binding larger values
|
||||
and PostgreSQL overflows during the ``BIGINT`` cast.
|
||||
"""
|
||||
if not isinstance(value, ALLOWED_FILTER_VALUE_TYPES):
|
||||
return False
|
||||
if isinstance(value, int) and not isinstance(value, bool):
|
||||
if not (_INT64_MIN <= value <= _INT64_MAX):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class JsonMatch(ColumnElement):
|
||||
"""Dialect-portable ``column[key] == value`` for JSON columns.
|
||||
|
||||
Compiles to ``json_type``/``json_extract`` on SQLite and
|
||||
``json_typeof``/``->>`` on PostgreSQL, with type-safe comparison
|
||||
that distinguishes bool vs int and NULL vs missing key.
|
||||
|
||||
*key* must be a single literal key matching ``[A-Za-z0-9_-]+``.
|
||||
*value* must be one of: ``None``, ``bool``, ``int`` (signed 64-bit), ``float``, ``str``.
|
||||
"""
|
||||
|
||||
inherit_cache = True
|
||||
type = Boolean()
|
||||
_is_implicitly_boolean = True
|
||||
|
||||
_traverse_internals = [
|
||||
("column", InternalTraversal.dp_clauseelement),
|
||||
("key", InternalTraversal.dp_string),
|
||||
("value", InternalTraversal.dp_plain_obj),
|
||||
]
|
||||
|
||||
def __init__(self, column: ColumnElement, key: str, value: object) -> None:
|
||||
if not validate_metadata_filter_key(key):
|
||||
raise ValueError(f"JsonMatch key must match {_KEY_CHARSET_RE.pattern!r}; got: {key!r}")
|
||||
if not validate_metadata_filter_value(value):
|
||||
if isinstance(value, int) and not isinstance(value, bool):
|
||||
raise TypeError(f"JsonMatch int value out of signed 64-bit range [-2**63, 2**63-1]: {value!r}")
|
||||
raise TypeError(f"JsonMatch value must be None, bool, int, float, or str; got: {type(value).__name__!r}")
|
||||
self.column = column
|
||||
self.key = key
|
||||
self.value = value
|
||||
super().__init__()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _Dialect:
|
||||
"""Per-dialect names used when emitting JSON type/value comparisons."""
|
||||
|
||||
null_type: str
|
||||
num_types: tuple[str, ...]
|
||||
num_cast: str
|
||||
int_types: tuple[str, ...]
|
||||
int_cast: str
|
||||
# None for SQLite where json_type already returns 'integer'/'real';
|
||||
# regex literal for PostgreSQL where json_typeof returns 'number' for
|
||||
# both ints and floats, so an extra guard prevents CAST errors on floats.
|
||||
int_guard: str | None
|
||||
string_type: str
|
||||
bool_type: str | None
|
||||
|
||||
|
||||
_SQLITE = _Dialect(
|
||||
null_type="null",
|
||||
num_types=("integer", "real"),
|
||||
num_cast="REAL",
|
||||
int_types=("integer",),
|
||||
int_cast="INTEGER",
|
||||
int_guard=None,
|
||||
string_type="text",
|
||||
bool_type=None,
|
||||
)
|
||||
|
||||
_PG = _Dialect(
|
||||
null_type="null",
|
||||
num_types=("number",),
|
||||
num_cast="DOUBLE PRECISION",
|
||||
int_types=("number",),
|
||||
int_cast="BIGINT",
|
||||
int_guard="'^-?[0-9]+$'",
|
||||
string_type="string",
|
||||
bool_type="boolean",
|
||||
)
|
||||
|
||||
|
||||
def _bind(compiler: SQLCompiler, value: object, sa_type: TypeEngine[Any], **kw: Any) -> str:
|
||||
param = bindparam(None, value, type_=sa_type)
|
||||
return compiler.process(param, **kw)
|
||||
|
||||
|
||||
def _type_check(typeof: str, types: tuple[str, ...]) -> str:
|
||||
if len(types) == 1:
|
||||
return f"{typeof} = '{types[0]}'"
|
||||
quoted = ", ".join(f"'{t}'" for t in types)
|
||||
return f"{typeof} IN ({quoted})"
|
||||
|
||||
|
||||
def _build_clause(compiler: SQLCompiler, typeof: str, extract: str, value: object, dialect: _Dialect, **kw: Any) -> str:
|
||||
if value is None:
|
||||
return f"{typeof} = '{dialect.null_type}'"
|
||||
if isinstance(value, bool):
|
||||
# bool check must precede int check — bool is a subclass of int in Python
|
||||
bool_str = "true" if value else "false"
|
||||
if dialect.bool_type is None:
|
||||
return f"{typeof} = '{bool_str}'"
|
||||
return f"({typeof} = '{dialect.bool_type}' AND {extract} = '{bool_str}')"
|
||||
if isinstance(value, int):
|
||||
bp = _bind(compiler, value, BigInteger(), **kw)
|
||||
if dialect.int_guard:
|
||||
# CASE prevents CAST error when json_typeof = 'number' also matches floats
|
||||
return f"(CASE WHEN {_type_check(typeof, dialect.int_types)} AND {extract} ~ {dialect.int_guard} THEN CAST({extract} AS {dialect.int_cast}) END = {bp})"
|
||||
return f"({_type_check(typeof, dialect.int_types)} AND CAST({extract} AS {dialect.int_cast}) = {bp})"
|
||||
if isinstance(value, float):
|
||||
bp = _bind(compiler, value, Float(), **kw)
|
||||
return f"({_type_check(typeof, dialect.num_types)} AND CAST({extract} AS {dialect.num_cast}) = {bp})"
|
||||
bp = _bind(compiler, str(value), String(), **kw)
|
||||
return f"({typeof} = '{dialect.string_type}' AND {extract} = {bp})"
|
||||
|
||||
|
||||
@compiles(JsonMatch, "sqlite")
|
||||
def _compile_sqlite(element: JsonMatch, compiler: SQLCompiler, **kw: Any) -> str:
|
||||
if not validate_metadata_filter_key(element.key):
|
||||
raise ValueError(f"Key escaped validation: {element.key!r}")
|
||||
col = compiler.process(element.column, **kw)
|
||||
path = f'$."{element.key}"'
|
||||
typeof = f"json_type({col}, '{path}')"
|
||||
extract = f"json_extract({col}, '{path}')"
|
||||
return _build_clause(compiler, typeof, extract, element.value, _SQLITE, **kw)
|
||||
|
||||
|
||||
@compiles(JsonMatch, "postgresql")
|
||||
def _compile_pg(element: JsonMatch, compiler: SQLCompiler, **kw: Any) -> str:
|
||||
if not validate_metadata_filter_key(element.key):
|
||||
raise ValueError(f"Key escaped validation: {element.key!r}")
|
||||
col = compiler.process(element.column, **kw)
|
||||
typeof = f"json_typeof({col} -> '{element.key}')"
|
||||
extract = f"({col} ->> '{element.key}')"
|
||||
return _build_clause(compiler, typeof, extract, element.value, _PG, **kw)
|
||||
|
||||
|
||||
@compiles(JsonMatch)
|
||||
def _compile_default(element: JsonMatch, compiler: SQLCompiler, **kw: Any) -> str:
|
||||
raise NotImplementedError(f"JsonMatch supports only sqlite and postgresql; got dialect: {compiler.dialect.name}")
|
||||
|
||||
|
||||
def json_match(column: ColumnElement, key: str, value: object) -> JsonMatch:
|
||||
return JsonMatch(column, key, value)
|
||||
@@ -223,10 +223,11 @@ class RunRepository(RunStore):
|
||||
"""Aggregate token usage via a single SQL GROUP BY query."""
|
||||
_completed = RunRow.status.in_(("success", "error"))
|
||||
_thread = RunRow.thread_id == thread_id
|
||||
model_name = func.coalesce(RunRow.model_name, "unknown")
|
||||
|
||||
stmt = (
|
||||
select(
|
||||
func.coalesce(RunRow.model_name, "unknown").label("model"),
|
||||
model_name.label("model"),
|
||||
func.count().label("runs"),
|
||||
func.coalesce(func.sum(RunRow.total_tokens), 0).label("total_tokens"),
|
||||
func.coalesce(func.sum(RunRow.total_input_tokens), 0).label("total_input_tokens"),
|
||||
@@ -236,7 +237,7 @@ class RunRepository(RunStore):
|
||||
func.coalesce(func.sum(RunRow.middleware_tokens), 0).label("middleware"),
|
||||
)
|
||||
.where(_thread, _completed)
|
||||
.group_by(func.coalesce(RunRow.model_name, "unknown"))
|
||||
.group_by(model_name)
|
||||
)
|
||||
|
||||
async with self._sf() as session:
|
||||
|
||||
@@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from deerflow.persistence.thread_meta.base import ThreadMetaStore
|
||||
from deerflow.persistence.thread_meta.base import InvalidMetadataFilterError, ThreadMetaStore
|
||||
from deerflow.persistence.thread_meta.memory import MemoryThreadMetaStore
|
||||
from deerflow.persistence.thread_meta.model import ThreadMetaRow
|
||||
from deerflow.persistence.thread_meta.sql import ThreadMetaRepository
|
||||
@@ -14,6 +14,7 @@ if TYPE_CHECKING:
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
|
||||
__all__ = [
|
||||
"InvalidMetadataFilterError",
|
||||
"MemoryThreadMetaStore",
|
||||
"ThreadMetaRepository",
|
||||
"ThreadMetaRow",
|
||||
|
||||
@@ -15,10 +15,15 @@ three-state semantics (see :mod:`deerflow.runtime.user_context`):
|
||||
from __future__ import annotations
|
||||
|
||||
import abc
|
||||
from typing import Any
|
||||
|
||||
from deerflow.runtime.user_context import AUTO, _AutoSentinel
|
||||
|
||||
|
||||
class InvalidMetadataFilterError(ValueError):
|
||||
"""Raised when all client-supplied metadata filter keys are rejected."""
|
||||
|
||||
|
||||
class ThreadMetaStore(abc.ABC):
|
||||
@abc.abstractmethod
|
||||
async def create(
|
||||
@@ -40,12 +45,12 @@ class ThreadMetaStore(abc.ABC):
|
||||
async def search(
|
||||
self,
|
||||
*,
|
||||
metadata: dict | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
status: str | None = None,
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
user_id: str | None | _AutoSentinel = AUTO,
|
||||
) -> list[dict]:
|
||||
) -> list[dict[str, Any]]:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
|
||||
@@ -69,12 +69,12 @@ class MemoryThreadMetaStore(ThreadMetaStore):
|
||||
async def search(
|
||||
self,
|
||||
*,
|
||||
metadata: dict | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
status: str | None = None,
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
user_id: str | None | _AutoSentinel = AUTO,
|
||||
) -> list[dict]:
|
||||
) -> list[dict[str, Any]]:
|
||||
resolved_user_id = resolve_user_id(user_id, method_name="MemoryThreadMetaStore.search")
|
||||
filter_dict: dict[str, Any] = {}
|
||||
if metadata:
|
||||
|
||||
@@ -2,16 +2,20 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
|
||||
from deerflow.persistence.thread_meta.base import ThreadMetaStore
|
||||
from deerflow.persistence.json_compat import json_match
|
||||
from deerflow.persistence.thread_meta.base import InvalidMetadataFilterError, ThreadMetaStore
|
||||
from deerflow.persistence.thread_meta.model import ThreadMetaRow
|
||||
from deerflow.runtime.user_context import AUTO, _AutoSentinel, resolve_user_id
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ThreadMetaRepository(ThreadMetaStore):
|
||||
def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:
|
||||
@@ -20,7 +24,7 @@ class ThreadMetaRepository(ThreadMetaStore):
|
||||
@staticmethod
|
||||
def _row_to_dict(row: ThreadMetaRow) -> dict[str, Any]:
|
||||
d = row.to_dict()
|
||||
d["metadata"] = d.pop("metadata_json", {})
|
||||
d["metadata"] = d.pop("metadata_json", None) or {}
|
||||
for key in ("created_at", "updated_at"):
|
||||
val = d.get(key)
|
||||
if isinstance(val, datetime):
|
||||
@@ -104,39 +108,43 @@ class ThreadMetaRepository(ThreadMetaStore):
|
||||
async def search(
|
||||
self,
|
||||
*,
|
||||
metadata: dict | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
status: str | None = None,
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
user_id: str | None | _AutoSentinel = AUTO,
|
||||
) -> list[dict]:
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Search threads with optional metadata and status filters.
|
||||
|
||||
Owner filter is enforced by default: caller must be in a user
|
||||
context. Pass ``user_id=None`` to bypass (migration/CLI).
|
||||
"""
|
||||
resolved_user_id = resolve_user_id(user_id, method_name="ThreadMetaRepository.search")
|
||||
stmt = select(ThreadMetaRow).order_by(ThreadMetaRow.updated_at.desc())
|
||||
stmt = select(ThreadMetaRow).order_by(ThreadMetaRow.updated_at.desc(), ThreadMetaRow.thread_id.desc())
|
||||
if resolved_user_id is not None:
|
||||
stmt = stmt.where(ThreadMetaRow.user_id == resolved_user_id)
|
||||
if status:
|
||||
stmt = stmt.where(ThreadMetaRow.status == status)
|
||||
|
||||
if metadata:
|
||||
# When metadata filter is active, fetch a larger window and filter
|
||||
# in Python. TODO(Phase 2): use JSON DB operators (Postgres @>,
|
||||
# SQLite json_extract) for server-side filtering.
|
||||
stmt = stmt.limit(limit * 5 + offset)
|
||||
async with self._sf() as session:
|
||||
result = await session.execute(stmt)
|
||||
rows = [self._row_to_dict(r) for r in result.scalars()]
|
||||
rows = [r for r in rows if all(r.get("metadata", {}).get(k) == v for k, v in metadata.items())]
|
||||
return rows[offset : offset + limit]
|
||||
else:
|
||||
stmt = stmt.limit(limit).offset(offset)
|
||||
async with self._sf() as session:
|
||||
result = await session.execute(stmt)
|
||||
return [self._row_to_dict(r) for r in result.scalars()]
|
||||
applied = 0
|
||||
for key, value in metadata.items():
|
||||
try:
|
||||
stmt = stmt.where(json_match(ThreadMetaRow.metadata_json, key, value))
|
||||
applied += 1
|
||||
except (ValueError, TypeError) as exc:
|
||||
logger.warning("Skipping metadata filter key %s: %s", ascii(key), exc)
|
||||
if applied == 0:
|
||||
# Comma-separated plain string (no list repr / nested
|
||||
# quoting) so the 400 detail surfaced by the Gateway is
|
||||
# easy for clients to read. Sorted for determinism.
|
||||
rejected_keys = ", ".join(sorted(str(k) for k in metadata))
|
||||
raise InvalidMetadataFilterError(f"All metadata filter keys were rejected as unsafe: {rejected_keys}")
|
||||
|
||||
stmt = stmt.limit(limit).offset(offset)
|
||||
async with self._sf() as session:
|
||||
result = await session.execute(stmt)
|
||||
return [self._row_to_dict(r) for r in result.scalars()]
|
||||
|
||||
async def _check_ownership(self, session: AsyncSession, thread_id: str, resolved_user_id: str | None) -> bool:
|
||||
"""Return True if the row exists and is owned (or filter bypassed)."""
|
||||
|
||||
@@ -11,7 +11,7 @@ import logging
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import delete, func, select
|
||||
from sqlalchemy import delete, func, select, text
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
|
||||
from deerflow.persistence.models.run_event import RunEventRow
|
||||
@@ -86,6 +86,28 @@ class DbRunEventStore(RunEventStore):
|
||||
user = get_current_user()
|
||||
return str(user.id) if user is not None else None
|
||||
|
||||
@staticmethod
|
||||
async def _max_seq_for_thread(session: AsyncSession, thread_id: str) -> int | None:
|
||||
"""Return the current max seq while serializing writers per thread.
|
||||
|
||||
PostgreSQL rejects ``SELECT max(...) FOR UPDATE`` because aggregate
|
||||
results are not lockable rows. As a release-safe workaround, take a
|
||||
transaction-level advisory lock keyed by thread_id before reading the
|
||||
aggregate. Other dialects keep the existing row-locking statement.
|
||||
"""
|
||||
stmt = select(func.max(RunEventRow.seq)).where(RunEventRow.thread_id == thread_id)
|
||||
bind = session.get_bind()
|
||||
dialect_name = bind.dialect.name if bind is not None else ""
|
||||
|
||||
if dialect_name == "postgresql":
|
||||
await session.execute(
|
||||
text("SELECT pg_advisory_xact_lock(hashtext(CAST(:thread_id AS text))::bigint)"),
|
||||
{"thread_id": thread_id},
|
||||
)
|
||||
return await session.scalar(stmt)
|
||||
|
||||
return await session.scalar(stmt.with_for_update())
|
||||
|
||||
async def put(self, *, thread_id, run_id, event_type, category, content="", metadata=None, created_at=None): # noqa: D401
|
||||
"""Write a single event — low-frequency path only.
|
||||
|
||||
@@ -100,10 +122,7 @@ class DbRunEventStore(RunEventStore):
|
||||
user_id = self._user_id_from_context()
|
||||
async with self._sf() as session:
|
||||
async with session.begin():
|
||||
# Use FOR UPDATE to serialize seq assignment within a thread.
|
||||
# NOTE: with_for_update() on aggregates is a no-op on SQLite;
|
||||
# the UNIQUE(thread_id, seq) constraint catches races there.
|
||||
max_seq = await session.scalar(select(func.max(RunEventRow.seq)).where(RunEventRow.thread_id == thread_id).with_for_update())
|
||||
max_seq = await self._max_seq_for_thread(session, thread_id)
|
||||
seq = (max_seq or 0) + 1
|
||||
row = RunEventRow(
|
||||
thread_id=thread_id,
|
||||
@@ -126,10 +145,8 @@ class DbRunEventStore(RunEventStore):
|
||||
async with self._sf() as session:
|
||||
async with session.begin():
|
||||
# Get max seq for the thread (assume all events in batch belong to same thread).
|
||||
# NOTE: with_for_update() on aggregates is a no-op on SQLite;
|
||||
# the UNIQUE(thread_id, seq) constraint catches races there.
|
||||
thread_id = events[0]["thread_id"]
|
||||
max_seq = await session.scalar(select(func.max(RunEventRow.seq)).where(RunEventRow.thread_id == thread_id).with_for_update())
|
||||
max_seq = await self._max_seq_for_thread(session, thread_id)
|
||||
seq = max_seq or 0
|
||||
rows = []
|
||||
for e in events:
|
||||
|
||||
@@ -109,6 +109,34 @@ def get_effective_user_id() -> str:
|
||||
return str(user.id)
|
||||
|
||||
|
||||
def resolve_runtime_user_id(runtime: object | None) -> str:
|
||||
"""Single source of truth for a tool/middleware's effective user_id.
|
||||
|
||||
Resolution order (most authoritative first):
|
||||
1. ``runtime.context["user_id"]`` — set by ``inject_authenticated_user_context``
|
||||
in the gateway from the auth-validated ``request.state.user``. This is
|
||||
the only source that survives boundaries where the contextvar may have
|
||||
been lost (background tasks scheduled outside the request task,
|
||||
worker pools that don't copy_context, future cross-process drivers).
|
||||
2. The ``_current_user`` ContextVar — set by the auth middleware at
|
||||
request entry. Reliable for in-task work; copied by ``asyncio``
|
||||
child tasks and by ``ContextThreadPoolExecutor``.
|
||||
3. ``DEFAULT_USER_ID`` — last-resort fallback so unauthenticated
|
||||
CLI / migration / test paths keep working without raising.
|
||||
|
||||
Tools that persist user-scoped state (custom agents, memory, uploads)
|
||||
MUST call this instead of ``get_effective_user_id()`` directly so they
|
||||
benefit from the runtime.context channel that ``setup_agent`` already
|
||||
relies on.
|
||||
"""
|
||||
context = getattr(runtime, "context", None)
|
||||
if isinstance(context, dict):
|
||||
ctx_user_id = context.get("user_id")
|
||||
if ctx_user_id:
|
||||
return str(ctx_user_id)
|
||||
return get_effective_user_id()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sentinel-based user_id resolution
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -7,19 +7,12 @@ from langgraph.types import Command
|
||||
|
||||
from deerflow.config.agents_config import validate_agent_name
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
from deerflow.runtime.user_context import resolve_runtime_user_id
|
||||
from deerflow.tools.types import Runtime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_runtime_user_id(runtime: Runtime) -> str:
|
||||
context_user_id = runtime.context.get("user_id") if runtime.context else None
|
||||
if context_user_id:
|
||||
return str(context_user_id)
|
||||
return get_effective_user_id()
|
||||
|
||||
|
||||
@tool(parse_docstring=True)
|
||||
def setup_agent(
|
||||
soul: str,
|
||||
@@ -45,7 +38,7 @@ def setup_agent(
|
||||
if agent_name:
|
||||
# Custom agents are persisted under the current user's bucket so
|
||||
# different users do not see each other's agents.
|
||||
user_id = _get_runtime_user_id(runtime)
|
||||
user_id = resolve_runtime_user_id(runtime)
|
||||
agent_dir = paths.user_agent_dir(user_id, agent_name)
|
||||
else:
|
||||
# Default agent (no agent_name): SOUL.md lives at the global base dir.
|
||||
|
||||
@@ -26,6 +26,28 @@ if TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Cache subagent token usage by tool_call_id so TokenUsageMiddleware can
|
||||
# write it back to the triggering AIMessage's usage_metadata.
|
||||
_subagent_usage_cache: dict[str, dict[str, int]] = {}
|
||||
|
||||
|
||||
def _token_usage_cache_enabled(app_config: "AppConfig | None") -> bool:
|
||||
if app_config is None:
|
||||
try:
|
||||
app_config = get_app_config()
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
return bool(getattr(getattr(app_config, "token_usage", None), "enabled", False))
|
||||
|
||||
|
||||
def _cache_subagent_usage(tool_call_id: str, usage: dict | None, *, enabled: bool = True) -> None:
|
||||
if enabled and usage:
|
||||
_subagent_usage_cache[tool_call_id] = usage
|
||||
|
||||
|
||||
def pop_cached_subagent_usage(tool_call_id: str) -> dict | None:
|
||||
return _subagent_usage_cache.pop(tool_call_id, None)
|
||||
|
||||
|
||||
def _is_subagent_terminal(result: Any) -> bool:
|
||||
"""Return whether a background subagent result is safe to clean up."""
|
||||
@@ -92,6 +114,17 @@ def _find_usage_recorder(runtime: Any) -> Any | None:
|
||||
return None
|
||||
|
||||
|
||||
def _summarize_usage(records: list[dict] | None) -> dict | None:
|
||||
"""Summarize token usage records into a compact dict for SSE events."""
|
||||
if not records:
|
||||
return None
|
||||
return {
|
||||
"input_tokens": sum(r.get("input_tokens", 0) or 0 for r in records),
|
||||
"output_tokens": sum(r.get("output_tokens", 0) or 0 for r in records),
|
||||
"total_tokens": sum(r.get("total_tokens", 0) or 0 for r in records),
|
||||
}
|
||||
|
||||
|
||||
def _report_subagent_usage(runtime: Any, result: Any) -> None:
|
||||
"""Report subagent token usage to the parent RunJournal, if available.
|
||||
|
||||
@@ -177,6 +210,7 @@ async def task_tool(
|
||||
subagent_type: The type of subagent to use. ALWAYS PROVIDE THIS PARAMETER THIRD.
|
||||
"""
|
||||
runtime_app_config = _get_runtime_app_config(runtime)
|
||||
cache_token_usage = _token_usage_cache_enabled(runtime_app_config)
|
||||
available_subagent_names = get_available_subagent_names(app_config=runtime_app_config) if runtime_app_config is not None else get_available_subagent_names()
|
||||
|
||||
# Get subagent configuration
|
||||
@@ -312,27 +346,32 @@ async def task_tool(
|
||||
last_message_count = current_message_count
|
||||
|
||||
# Check if task completed, failed, or timed out
|
||||
usage = _summarize_usage(getattr(result, "token_usage_records", None))
|
||||
if result.status == SubagentStatus.COMPLETED:
|
||||
_cache_subagent_usage(tool_call_id, usage, enabled=cache_token_usage)
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_completed", "task_id": task_id, "result": result.result})
|
||||
writer({"type": "task_completed", "task_id": task_id, "result": result.result, "usage": usage})
|
||||
logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls")
|
||||
cleanup_background_task(task_id)
|
||||
return f"Task Succeeded. Result: {result.result}"
|
||||
elif result.status == SubagentStatus.FAILED:
|
||||
_cache_subagent_usage(tool_call_id, usage, enabled=cache_token_usage)
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_failed", "task_id": task_id, "error": result.error})
|
||||
writer({"type": "task_failed", "task_id": task_id, "error": result.error, "usage": usage})
|
||||
logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}")
|
||||
cleanup_background_task(task_id)
|
||||
return f"Task failed. Error: {result.error}"
|
||||
elif result.status == SubagentStatus.CANCELLED:
|
||||
_cache_subagent_usage(tool_call_id, usage, enabled=cache_token_usage)
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_cancelled", "task_id": task_id, "error": result.error})
|
||||
writer({"type": "task_cancelled", "task_id": task_id, "error": result.error, "usage": usage})
|
||||
logger.info(f"[trace={trace_id}] Task {task_id} cancelled: {result.error}")
|
||||
cleanup_background_task(task_id)
|
||||
return "Task cancelled by user."
|
||||
elif result.status == SubagentStatus.TIMED_OUT:
|
||||
_cache_subagent_usage(tool_call_id, usage, enabled=cache_token_usage)
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_timed_out", "task_id": task_id, "error": result.error})
|
||||
writer({"type": "task_timed_out", "task_id": task_id, "error": result.error, "usage": usage})
|
||||
logger.warning(f"[trace={trace_id}] Task {task_id} timed out: {result.error}")
|
||||
cleanup_background_task(task_id)
|
||||
return f"Task timed out. Error: {result.error}"
|
||||
@@ -351,7 +390,9 @@ async def task_tool(
|
||||
timeout_minutes = config.timeout_seconds // 60
|
||||
logger.error(f"[trace={trace_id}] Task {task_id} polling timed out after {poll_count} polls (should have been caught by thread pool timeout)")
|
||||
_report_subagent_usage(runtime, result)
|
||||
writer({"type": "task_timed_out", "task_id": task_id})
|
||||
usage = _summarize_usage(getattr(result, "token_usage_records", None))
|
||||
_cache_subagent_usage(tool_call_id, usage, enabled=cache_token_usage)
|
||||
writer({"type": "task_timed_out", "task_id": task_id, "usage": usage})
|
||||
return f"Task polling timed out after {timeout_minutes} minutes. This may indicate the background task is stuck. Status: {result.status.value}"
|
||||
except asyncio.CancelledError:
|
||||
# Signal the background subagent thread to stop cooperatively.
|
||||
@@ -374,4 +415,8 @@ async def task_tool(
|
||||
cleanup_background_task(task_id)
|
||||
else:
|
||||
_schedule_deferred_subagent_cleanup(task_id, trace_id, max_poll_count)
|
||||
_subagent_usage_cache.pop(tool_call_id, None)
|
||||
raise
|
||||
except Exception:
|
||||
_subagent_usage_cache.pop(tool_call_id, None)
|
||||
raise
|
||||
|
||||
@@ -27,7 +27,7 @@ from langgraph.types import Command
|
||||
from deerflow.config.agents_config import load_agent_config, validate_agent_name
|
||||
from deerflow.config.app_config import get_app_config
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
from deerflow.runtime.user_context import resolve_runtime_user_id
|
||||
from deerflow.tools.types import Runtime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -118,9 +118,13 @@ def update_agent(
|
||||
return _err("update_agent is only available inside a custom agent's chat. There is no agent_name in the current runtime context, so there is nothing to update. If you are inside the bootstrap flow, use setup_agent instead.")
|
||||
|
||||
# Resolve the active user so that updates only affect this user's agent.
|
||||
# ``get_effective_user_id`` returns DEFAULT_USER_ID when no auth context
|
||||
# is set (matching how memory and thread storage behave).
|
||||
user_id = get_effective_user_id()
|
||||
# ``resolve_runtime_user_id`` prefers ``runtime.context["user_id"]`` (set by
|
||||
# the gateway from the auth-validated request) and falls back to the
|
||||
# contextvar, then DEFAULT_USER_ID. This matches setup_agent so a user
|
||||
# creating an agent and later refining it always touches the same files,
|
||||
# even if the contextvar gets lost across an async/thread boundary
|
||||
# (issue #2782 / #2862 class of bugs).
|
||||
user_id = resolve_runtime_user_id(runtime)
|
||||
|
||||
# Reject an unknown ``model`` *before* touching the filesystem. Otherwise
|
||||
# ``_resolve_model_name`` silently falls back to the default at runtime
|
||||
|
||||
@@ -7,7 +7,7 @@ from deerflow.config.app_config import AppConfig
|
||||
from deerflow.reflection import resolve_variable
|
||||
from deerflow.sandbox.security import is_host_bash_allowed
|
||||
from deerflow.tools.builtins import ask_clarification_tool, present_file_tool, task_tool, view_image_tool
|
||||
from deerflow.tools.builtins.tool_search import reset_deferred_registry
|
||||
from deerflow.tools.builtins.tool_search import get_deferred_registry
|
||||
from deerflow.tools.sync import make_sync_tool_wrapper
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -116,8 +116,6 @@ def get_available_tools(
|
||||
# made through the Gateway API (which runs in a separate process) are immediately
|
||||
# reflected when loading MCP tools.
|
||||
mcp_tools = []
|
||||
# Reset deferred registry upfront to prevent stale state from previous calls
|
||||
reset_deferred_registry()
|
||||
if include_mcp:
|
||||
try:
|
||||
from deerflow.config.extensions_config import ExtensionsConfig
|
||||
@@ -135,12 +133,51 @@ def get_available_tools(
|
||||
from deerflow.tools.builtins.tool_search import DeferredToolRegistry, set_deferred_registry
|
||||
from deerflow.tools.builtins.tool_search import tool_search as tool_search_tool
|
||||
|
||||
registry = DeferredToolRegistry()
|
||||
for t in mcp_tools:
|
||||
registry.register(t)
|
||||
set_deferred_registry(registry)
|
||||
# Reuse the existing registry if one is already set for
|
||||
# this async context. ``get_available_tools`` is
|
||||
# re-entered whenever a subagent is spawned
|
||||
# (``task_tool`` calls it to build the child agent's
|
||||
# toolset), and previously we used to unconditionally
|
||||
# rebuild the registry — wiping out the parent agent's
|
||||
# tool_search promotions. The
|
||||
# ``DeferredToolFilterMiddleware`` then re-hid those
|
||||
# tools from subsequent model calls, leaving the agent
|
||||
# able to see a tool's name but unable to invoke it
|
||||
# (issue #2884). ``contextvars`` already gives us the
|
||||
# lifetime semantics we want: a fresh request / graph
|
||||
# run starts in a new asyncio task with the
|
||||
# ContextVar at its default of ``None``, so reuse is
|
||||
# only triggered for re-entrant calls inside one run.
|
||||
#
|
||||
# Intentionally NOT reconciling against the current
|
||||
# ``mcp_tools`` snapshot. The MCP cache only refreshes
|
||||
# on ``extensions_config.json`` mtime changes, which
|
||||
# in practice happens between graph runs — not inside
|
||||
# one. And even if a refresh did happen mid-run, the
|
||||
# already-built lead agent's ``ToolNode`` still holds
|
||||
# the *previous* tool set (LangGraph binds tools at
|
||||
# graph construction time), so a brand-new MCP tool
|
||||
# couldn't actually be invoked anyway. The
|
||||
# ``DeferredToolRegistry`` doesn't retain the names
|
||||
# of previously-promoted tools (``promote()`` drops
|
||||
# the entry entirely), so re-syncing the registry
|
||||
# against a fresh ``mcp_tools`` list would
|
||||
# mis-classify those promotions as new tools and
|
||||
# re-register them as deferred — exactly the bug
|
||||
# this fix exists to prevent.
|
||||
existing_registry = get_deferred_registry()
|
||||
if existing_registry is None:
|
||||
registry = DeferredToolRegistry()
|
||||
for t in mcp_tools:
|
||||
registry.register(t)
|
||||
set_deferred_registry(registry)
|
||||
logger.info(f"Tool search active: {len(mcp_tools)} tools deferred")
|
||||
else:
|
||||
mcp_tool_names = {t.name for t in mcp_tools}
|
||||
still_deferred = len(existing_registry)
|
||||
promoted_count = max(0, len(mcp_tool_names) - still_deferred)
|
||||
logger.info(f"Tool search active (preserved promotions): {still_deferred} tools deferred, {promoted_count} already promoted")
|
||||
builtin_tools.append(tool_search_tool)
|
||||
logger.info(f"Tool search active: {len(mcp_tools)} tools deferred")
|
||||
except ImportError:
|
||||
logger.warning("MCP module not available. Install 'langchain-mcp-adapters' package to enable MCP tools.")
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user