Compare commits

...

11 Commits

Author SHA1 Message Date
Airene Fang b6b3650e50 fix(trace):memory 中文 in trace info is unicode escape sequence. (#3104)
* fix(trace):memory 中文 in trace is unicode

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-20 22:34:10 +08:00
DanielWalnut 9abe5a18e6 fix: clean up local nginx on stop (#3005)
* fix: clean up local nginx on stop

* fix: scope local service cleanup to repo

* fix: address serve port review comments
2026-05-20 22:26:02 +08:00
john lee 9b19cca91c fix(runtime): make RunManager.cancel() idempotent for already-interrupted runs (#3055) (#3058)
A second cancel() call on an interrupted run returned False, causing the
cancel and stream_existing_run router endpoints to raise 409 on double-stop.

Fix: return True inside the lock when record.status == RunStatus.interrupted.
This covers both the POST /cancel and POST /join endpoints without any
re-fetch or extra get() call — the idempotency lives at the source.

Also fixes stream_existing_run (the LangGraph SDK stop-button path), which
had the identical cancel() → 409 pattern and was not covered by the
original PR.  Both endpoints share the fix automatically.

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 16:37:36 +08:00
AochenShen99 6b922e4908 test(runtime): add lifecycle e2e coverage (#2946)
* test(runtime): add lifecycle e2e coverage

* test: isolate runtime lifecycle e2e config

* test(runtime): document lifecycle e2e tradeoffs
2026-05-20 14:52:58 +08:00
john lee 8cd4710b16 fix(deploy): fall back to python/openssl when python3 is absent for secret generation (#3074)
* fix(deploy): fall back to python/openssl when python3 is absent for secret generation

Bare python3 call in deploy.sh exits 49 on systems without python3 in
PATH (e.g. some Alpine/minimal containers, or Windows environments where
only 'python' is on PATH). Add a fallback chain: python3 → python →
openssl rand -hex 32. If all three are unavailable, emit a clear error
message and exit with a non-zero status instead of a cryptic recipe
failure.

Closes #2922

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-20 10:43:18 +08:00
Xun e37912e2c8 feat(sandbox) Adds download file interface in Sandbox (#3038)
* Add download interface in Sandbox

* fix

* fix

* del invalidate test

* fix

* safe download

* improve
2026-05-20 10:16:31 +08:00
AochenShen99 0c22349029 chore(dev): add async/thread boundary detector (#2936)
* chore(dev): add thread boundary detector

* chore(dev): reduce thread boundary detector false positives
2026-05-20 10:00:17 +08:00
dependabot[bot] 006948232c chore(deps): bump brace-expansion from 1.1.12 to 5.0.5 in /frontend (#3078)
Bumps [brace-expansion](https://github.com/juliangruber/brace-expansion) from 1.1.12 to 5.0.5.
- [Release notes](https://github.com/juliangruber/brace-expansion/releases)
- [Commits](https://github.com/juliangruber/brace-expansion/compare/v1.1.12...v5.0.5)

---
updated-dependencies:
- dependency-name: brace-expansion
  dependency-version: 5.0.5
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-20 07:42:03 +08:00
dependabot[bot] b1ec7e8111 chore(deps): bump idna from 3.13 to 3.15 in /backend (#3077)
Bumps [idna](https://github.com/kjd/idna) from 3.13 to 3.15.
- [Release notes](https://github.com/kjd/idna/releases)
- [Changelog](https://github.com/kjd/idna/blob/master/HISTORY.md)
- [Commits](https://github.com/kjd/idna/compare/v3.13...v3.15)

---
updated-dependencies:
- dependency-name: idna
  dependency-version: '3.15'
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-20 07:27:45 +08:00
Lawrance_YXLiao b69ca7ad97 test(middleware): lock tool-call transcript boundary invariants (#3049) 2026-05-19 22:34:51 +08:00
AochenShen99 3599b570a9 fix(harness): wrap all async-only tools for sync clients (#2935) 2026-05-19 22:11:46 +08:00
24 changed files with 2387 additions and 142 deletions
+5 -1
View File
@@ -1,6 +1,6 @@
# DeerFlow - Unified Development Environment # DeerFlow - Unified Development Environment
.PHONY: help config config-upgrade check install setup doctor dev dev-daemon start start-daemon stop up down clean docker-init docker-start docker-stop docker-logs docker-logs-frontend docker-logs-gateway .PHONY: help config config-upgrade check install setup doctor detect-thread-boundaries dev dev-daemon start start-daemon stop up down clean docker-init docker-start docker-stop docker-logs docker-logs-frontend docker-logs-gateway
BASH ?= bash BASH ?= bash
BACKEND_UV_RUN = cd backend && uv run BACKEND_UV_RUN = cd backend && uv run
@@ -23,6 +23,7 @@ help:
@echo " make config - Generate local config files (aborts if config already exists)" @echo " make config - Generate local config files (aborts if config already exists)"
@echo " make config-upgrade - Merge new fields from config.example.yaml into config.yaml" @echo " make config-upgrade - Merge new fields from config.example.yaml into config.yaml"
@echo " make check - Check if all required tools are installed" @echo " make check - Check if all required tools are installed"
@echo " make detect-thread-boundaries - Inventory async/thread boundary points"
@echo " make install - Install all dependencies (frontend + backend + pre-commit hooks)" @echo " make install - Install all dependencies (frontend + backend + pre-commit hooks)"
@echo " make setup-sandbox - Pre-pull sandbox container image (recommended)" @echo " make setup-sandbox - Pre-pull sandbox container image (recommended)"
@echo " make dev - Start all services in development mode (with hot-reloading)" @echo " make dev - Start all services in development mode (with hot-reloading)"
@@ -51,6 +52,9 @@ setup:
doctor: doctor:
@$(BACKEND_UV_RUN) python ../scripts/doctor.py @$(BACKEND_UV_RUN) python ../scripts/doctor.py
detect-thread-boundaries:
@$(PYTHON) ./scripts/detect_thread_boundaries.py
config: config:
@$(PYTHON) ./scripts/configure.py @$(PYTHON) ./scripts/configure.py
@@ -338,7 +338,7 @@ class MemoryUpdater:
reinforcement_detected=reinforcement_detected, reinforcement_detected=reinforcement_detected,
) )
prompt = MEMORY_UPDATE_PROMPT.format( prompt = MEMORY_UPDATE_PROMPT.format(
current_memory=json.dumps(current_memory, indent=2), current_memory=json.dumps(current_memory, indent=2, ensure_ascii=False),
conversation=conversation_text, conversation=conversation_text,
correction_hint=correction_hint, correction_hint=correction_hint,
) )
@@ -1,4 +1,5 @@
import base64 import base64
import errno
import logging import logging
import shlex import shlex
import threading import threading
@@ -6,11 +7,14 @@ import uuid
from agent_sandbox import Sandbox as AioSandboxClient from agent_sandbox import Sandbox as AioSandboxClient
from deerflow.config.paths import VIRTUAL_PATH_PREFIX
from deerflow.sandbox.sandbox import Sandbox from deerflow.sandbox.sandbox import Sandbox
from deerflow.sandbox.search import GrepMatch, path_matches, should_ignore_path, truncate_line from deerflow.sandbox.search import GrepMatch, path_matches, should_ignore_path, truncate_line
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_MAX_DOWNLOAD_SIZE = 100 * 1024 * 1024 # 100 MB
_ERROR_OBSERVATION_SIGNATURE = "'ErrorObservation' object has no attribute 'exit_code'" _ERROR_OBSERVATION_SIGNATURE = "'ErrorObservation' object has no attribute 'exit_code'"
@@ -102,6 +106,49 @@ class AioSandbox(Sandbox):
logger.error(f"Failed to read file in sandbox: {e}") logger.error(f"Failed to read file in sandbox: {e}")
return f"Error: {e}" return f"Error: {e}"
def download_file(self, path: str) -> bytes:
"""Download file bytes from the sandbox.
Raises:
PermissionError: If the path contains '..' traversal segments or is
outside ``VIRTUAL_PATH_PREFIX``.
OSError: If the file cannot be retrieved from the sandbox.
"""
# Reject path traversal before sending to the container API.
# LocalSandbox gets this implicitly via _resolve_path;
# here the path is forwarded verbatim so we must check explicitly.
normalised = path.replace("\\", "/")
for segment in normalised.split("/"):
if segment == "..":
logger.error(f"Refused download due to path traversal: {path}")
raise PermissionError(f"Access denied: path traversal detected in '{path}'")
stripped_path = normalised.lstrip("/")
allowed_prefix = VIRTUAL_PATH_PREFIX.lstrip("/")
if stripped_path != allowed_prefix and not stripped_path.startswith(f"{allowed_prefix}/"):
logger.error("Refused download outside allowed directory: path=%s, allowed_prefix=%s", path, VIRTUAL_PATH_PREFIX)
raise PermissionError(f"Access denied: path must be under '{VIRTUAL_PATH_PREFIX}': '{path}'")
with self._lock:
try:
chunks: list[bytes] = []
total = 0
for chunk in self._client.file.download_file(path=path):
total += len(chunk)
if total > _MAX_DOWNLOAD_SIZE:
raise OSError(
errno.EFBIG,
f"File exceeds maximum download size of {_MAX_DOWNLOAD_SIZE} bytes",
path,
)
chunks.append(chunk)
return b"".join(chunks)
except OSError:
raise
except Exception as e:
logger.error(f"Failed to download file in sandbox: {e}")
raise OSError(f"Failed to download file '{path}' from sandbox: {e}") from e
def list_dir(self, path: str, max_depth: int = 2) -> list[str]: def list_dir(self, path: str, max_depth: int = 2) -> list[str]:
"""List the contents of a directory in the sandbox. """List the contents of a directory in the sandbox.
@@ -258,12 +258,17 @@ class RunManager:
action: "interrupt" keeps checkpoint, "rollback" reverts to pre-run state. action: "interrupt" keeps checkpoint, "rollback" reverts to pre-run state.
Sets the abort event with the action reason and cancels the asyncio task. Sets the abort event with the action reason and cancels the asyncio task.
Returns ``True`` if the run was in-flight and cancellation was initiated. Returns ``True`` if cancellation was initiated **or** the run was already
interrupted (idempotent — a second cancel is a no-op success).
Returns ``False`` only when the run is unknown to this worker or has
reached a terminal state other than interrupted (completed, failed, etc.).
""" """
async with self._lock: async with self._lock:
record = self._runs.get(run_id) record = self._runs.get(run_id)
if record is None: if record is None:
return False return False
if record.status == RunStatus.interrupted:
return True # idempotent — already cancelled on this worker
if record.status not in (RunStatus.pending, RunStatus.running): if record.status not in (RunStatus.pending, RunStatus.running):
return False return False
record.abort_action = action record.abort_action = action
@@ -1,4 +1,5 @@
import errno import errno
import logging
import ntpath import ntpath
import os import os
import shutil import shutil
@@ -7,10 +8,13 @@ from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import NamedTuple from typing import NamedTuple
from deerflow.config.paths import VIRTUAL_PATH_PREFIX
from deerflow.sandbox.local.list_dir import list_dir from deerflow.sandbox.local.list_dir import list_dir
from deerflow.sandbox.sandbox import Sandbox from deerflow.sandbox.sandbox import Sandbox
from deerflow.sandbox.search import GrepMatch, find_glob_matches, find_grep_matches from deerflow.sandbox.search import GrepMatch, find_glob_matches, find_grep_matches
logger = logging.getLogger(__name__)
@dataclass(frozen=True) @dataclass(frozen=True)
class PathMapping: class PathMapping:
@@ -379,6 +383,28 @@ class LocalSandbox(Sandbox):
# Re-raise with the original path for clearer error messages, hiding internal resolved paths # Re-raise with the original path for clearer error messages, hiding internal resolved paths
raise type(e)(e.errno, e.strerror, path) from None raise type(e)(e.errno, e.strerror, path) from None
def download_file(self, path: str) -> bytes:
normalised = path.replace("\\", "/")
stripped_path = normalised.lstrip("/")
allowed_prefix = VIRTUAL_PATH_PREFIX.lstrip("/")
if stripped_path != allowed_prefix and not stripped_path.startswith(f"{allowed_prefix}/"):
logger.error("Refused download outside allowed directory: path=%s, allowed_prefix=%s", path, VIRTUAL_PATH_PREFIX)
raise PermissionError(errno.EACCES, f"Access denied: path must be under '{VIRTUAL_PATH_PREFIX}'", path)
resolved_path = self._resolve_path(path)
max_download_size = 100 * 1024 * 1024
try:
file_size = os.path.getsize(resolved_path)
if file_size > max_download_size:
raise OSError(errno.EFBIG, f"File exceeds maximum download size of {max_download_size} bytes", path)
# TOCTOU note: the file could grow between getsize() and read(); accepted
# tradeoff since this is a controlled sandbox environment.
with open(resolved_path, "rb") as f:
return f.read()
except OSError as e:
# Re-raise with the original path for clearer error messages, hiding internal resolved paths
raise type(e)(e.errno, e.strerror, path) from None
def write_file(self, path: str, content: str, append: bool = False) -> None: def write_file(self, path: str, content: str, append: bool = False) -> None:
resolved = self._resolve_path_with_mapping(path) resolved = self._resolve_path_with_mapping(path)
resolved_path = resolved.path resolved_path = resolved.path
@@ -39,6 +39,25 @@ class Sandbox(ABC):
""" """
pass pass
@abstractmethod
def download_file(self, path: str) -> bytes:
"""Download the binary content of a file.
Args:
path: The absolute path of the file to download.
Returns:
Raw file bytes.
Raises:
PermissionError: If path traversal is detected or the path is outside
the allowed virtual prefix.
OSError: If the file cannot be read or does not exist. Both local
and remote implementations must raise ``OSError`` so callers
have a single exception type to handle.
"""
pass
@abstractmethod @abstractmethod
def list_dir(self, path: str, max_depth=2) -> list[str]: def list_dir(self, path: str, max_depth=2) -> list[str]:
"""List the contents of a directory. """List the contents of a directory.
@@ -3,9 +3,13 @@
import asyncio import asyncio
import atexit import atexit
import concurrent.futures import concurrent.futures
import contextvars
import functools
import logging import logging
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any, get_type_hints
from langchain_core.runnables import RunnableConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -15,10 +19,49 @@ _SYNC_TOOL_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=10, thre
atexit.register(lambda: _SYNC_TOOL_EXECUTOR.shutdown(wait=False)) atexit.register(lambda: _SYNC_TOOL_EXECUTOR.shutdown(wait=False))
def make_sync_tool_wrapper(coro: Callable[..., Any], tool_name: str) -> Callable[..., Any]: def _get_runnable_config_param(func: Callable[..., Any]) -> str | None:
"""Build a synchronous wrapper for an asynchronous tool coroutine.""" """Return the coroutine parameter that expects LangChain RunnableConfig."""
if isinstance(func, functools.partial):
func = func.func
def sync_wrapper(*args: Any, **kwargs: Any) -> Any: try:
type_hints = get_type_hints(func)
except Exception:
return None
for name, type_ in type_hints.items():
if type_ is RunnableConfig:
return name
return None
def make_sync_tool_wrapper(coro: Callable[..., Any], tool_name: str) -> Callable[..., Any]:
"""Build a synchronous wrapper for an asynchronous tool coroutine.
Args:
coro: Async callable backing a LangChain tool.
tool_name: Tool name used in error logs.
Returns:
A sync callable suitable for ``BaseTool.func``.
Notes:
If ``coro`` declares a ``RunnableConfig`` parameter, this wrapper
exposes ``config: RunnableConfig`` so LangChain can inject runtime
config and then forwards it to the coroutine's detected config
parameter. This covers DeerFlow's current config-sensitive tools, such
as ``invoke_acp_agent``.
This wrapper intentionally does not synthesize a dynamic function
signature. A future async tool with a normal user-facing argument named
``config`` and a separate ``RunnableConfig`` parameter named something
else, such as ``run_config``, may collide with LangChain's injected
``config`` argument. Rename that user-facing field or extend this
helper before using that signature.
"""
config_param = _get_runnable_config_param(coro)
def run_coroutine(*args: Any, **kwargs: Any) -> Any:
try: try:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
except RuntimeError: except RuntimeError:
@@ -26,11 +69,24 @@ def make_sync_tool_wrapper(coro: Callable[..., Any], tool_name: str) -> Callable
try: try:
if loop is not None and loop.is_running(): if loop is not None and loop.is_running():
future = _SYNC_TOOL_EXECUTOR.submit(asyncio.run, coro(*args, **kwargs)) context = contextvars.copy_context()
future = _SYNC_TOOL_EXECUTOR.submit(context.run, lambda: asyncio.run(coro(*args, **kwargs)))
return future.result() return future.result()
return asyncio.run(coro(*args, **kwargs)) return asyncio.run(coro(*args, **kwargs))
except Exception as e: except Exception as e:
logger.error("Error invoking tool %r via sync wrapper: %s", tool_name, e, exc_info=True) logger.error("Error invoking tool %r via sync wrapper: %s", tool_name, e, exc_info=True)
raise raise
if config_param:
def sync_wrapper(*args: Any, config: RunnableConfig = None, **kwargs: Any) -> Any:
if config is not None or config_param not in kwargs:
kwargs[config_param] = config
return run_coroutine(*args, **kwargs)
return sync_wrapper
def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
return run_coroutine(*args, **kwargs)
return sync_wrapper return sync_wrapper
@@ -205,7 +205,7 @@ def get_available_tools(
# Deduplicate by tool name — config-loaded tools take priority, followed by # Deduplicate by tool name — config-loaded tools take priority, followed by
# built-ins, MCP tools, and ACP tools. Duplicate names cause the LLM to # built-ins, MCP tools, and ACP tools. Duplicate names cause the LLM to
# receive ambiguous or concatenated function schemas (issue #1803). # receive ambiguous or concatenated function schemas (issue #1803).
all_tools = loaded_tools + builtin_tools + mcp_tools + acp_tools all_tools = [_ensure_sync_invocable_tool(t) for t in loaded_tools + builtin_tools + mcp_tools + acp_tools]
seen_names: set[str] = set() seen_names: set[str] = set()
unique_tools: list[BaseTool] = [] unique_tools: list[BaseTool] = []
for t in all_tools: for t in all_tools:
@@ -0,0 +1,507 @@
#!/usr/bin/env python3
"""Inventory async/thread boundary points for developer review.
This detector is intentionally non-invasive: it parses Python source with AST
and reports places where code crosses sync/async/thread boundaries. Findings
are review evidence, not automatic bug decisions.
"""
from __future__ import annotations
import argparse
import ast
import json
import os
import sys
from collections.abc import Iterable, Sequence
from dataclasses import asdict, dataclass
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[4]
DEFAULT_SCAN_PATHS = (
REPO_ROOT / "backend" / "app",
REPO_ROOT / "backend" / "packages" / "harness" / "deerflow",
)
IGNORED_DIR_NAMES = {
".git",
".mypy_cache",
".pytest_cache",
".ruff_cache",
".venv",
"__pycache__",
"node_modules",
}
SEVERITY_ORDER = {"INFO": 0, "WARN": 1, "FAIL": 2}
@dataclass(frozen=True)
class BoundaryFinding:
severity: str
category: str
path: str
line: int
column: int
function: str
async_context: bool
symbol: str
message: str
code: str
def to_dict(self) -> dict[str, object]:
return asdict(self)
@dataclass(frozen=True)
class _FunctionContext:
name: str
is_async: bool
@dataclass(frozen=True)
class _CallRule:
severity: str
category: str
message: str
EXACT_CALL_RULES: dict[str, _CallRule] = {
"asyncio.run": _CallRule(
"WARN",
"SYNC_ASYNC_BRIDGE",
"Runs a coroutine from synchronous code by creating an event loop boundary.",
),
"asyncio.to_thread": _CallRule(
"INFO",
"ASYNC_THREAD_OFFLOAD",
"Offloads synchronous work from an async context into a worker thread.",
),
"asyncio.new_event_loop": _CallRule(
"WARN",
"NEW_EVENT_LOOP",
"Creates a separate event loop; review resource ownership across loops.",
),
"asyncio.run_coroutine_threadsafe": _CallRule(
"WARN",
"CROSS_THREAD_COROUTINE",
"Submits a coroutine to an event loop from another thread.",
),
"concurrent.futures.ThreadPoolExecutor": _CallRule(
"INFO",
"THREAD_POOL",
"Creates a thread pool boundary.",
),
"threading.Thread": _CallRule(
"INFO",
"RAW_THREAD",
"Creates a raw thread; ContextVar values do not propagate automatically.",
),
"threading.Timer": _CallRule(
"INFO",
"RAW_TIMER_THREAD",
"Creates a timer-backed raw thread; ContextVar values do not propagate automatically.",
),
"make_sync_tool_wrapper": _CallRule(
"INFO",
"SYNC_TOOL_WRAPPER",
"Adapts an async tool coroutine for synchronous tool invocation.",
),
}
THREAD_POOL_CONSTRUCTORS = {"concurrent.futures.ThreadPoolExecutor"}
ASYNC_TOOL_FACTORY_CALLS = {
"StructuredTool.from_function",
"langchain.tools.StructuredTool.from_function",
"langchain_core.tools.StructuredTool.from_function",
}
LANGCHAIN_INVOKE_RECEIVER_NAMES = {
"agent",
"chain",
"chat_model",
"graph",
"llm",
"model",
"runnable",
}
LANGCHAIN_INVOKE_RECEIVER_SUFFIXES = (
"_agent",
"_chain",
"_graph",
"_llm",
"_model",
"_runnable",
)
ASYNC_BLOCKING_CALL_RULES: dict[str, _CallRule] = {
"time.sleep": _CallRule(
"WARN",
"BLOCKING_CALL_IN_ASYNC",
"Blocks the event loop when called directly inside async code.",
),
"subprocess.run": _CallRule(
"WARN",
"BLOCKING_SUBPROCESS_IN_ASYNC",
"Runs a blocking subprocess from async code.",
),
"subprocess.check_call": _CallRule(
"WARN",
"BLOCKING_SUBPROCESS_IN_ASYNC",
"Runs a blocking subprocess from async code.",
),
"subprocess.check_output": _CallRule(
"WARN",
"BLOCKING_SUBPROCESS_IN_ASYNC",
"Runs a blocking subprocess from async code.",
),
"subprocess.Popen": _CallRule(
"WARN",
"BLOCKING_SUBPROCESS_IN_ASYNC",
"Starts a subprocess from async code; review whether it blocks later.",
),
}
def dotted_name(node: ast.AST | None) -> str | None:
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
parent = dotted_name(node.value)
if parent:
return f"{parent}.{node.attr}"
return node.attr
return None
def call_receiver_name(node: ast.Call) -> str | None:
if not isinstance(node.func, ast.Attribute):
return None
return dotted_name(node.func.value)
def is_none_node(node: ast.AST | None) -> bool:
return isinstance(node, ast.Constant) and node.value is None
class BoundaryVisitor(ast.NodeVisitor):
def __init__(self, path: Path, relative_path: str, source_lines: Sequence[str]) -> None:
self.path = path
self.relative_path = relative_path
self.source_lines = source_lines
self.findings: list[BoundaryFinding] = []
self.function_stack: list[_FunctionContext] = []
self.import_aliases: dict[str, str] = {}
self.executor_names: set[str] = set()
@property
def current_function(self) -> str:
if not self.function_stack:
return "<module>"
return ".".join(context.name for context in self.function_stack)
@property
def in_async_context(self) -> bool:
return bool(self.function_stack and self.function_stack[-1].is_async)
def visit_Import(self, node: ast.Import) -> None:
for alias in node.names:
local_name = alias.asname or alias.name.split(".", 1)[0]
canonical_name = alias.name if alias.asname else local_name
self.import_aliases[local_name] = canonical_name
def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
if node.module is None:
return
for alias in node.names:
local_name = alias.asname or alias.name
self.import_aliases[local_name] = f"{node.module}.{alias.name}"
def visit_Assign(self, node: ast.Assign) -> None:
self._record_executor_targets(node.value, node.targets)
self.generic_visit(node)
def visit_AnnAssign(self, node: ast.AnnAssign) -> None:
if node.value is not None:
self._record_executor_targets(node.value, [node.target])
self.generic_visit(node)
def visit_With(self, node: ast.With) -> None:
for item in node.items:
if item.optional_vars is not None:
self._record_executor_targets(item.context_expr, [item.optional_vars])
self.generic_visit(node)
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
self.function_stack.append(_FunctionContext(node.name, is_async=False))
self.generic_visit(node)
self.function_stack.pop()
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
self.function_stack.append(_FunctionContext(node.name, is_async=True))
try:
self._check_async_tool_definition(node)
self.generic_visit(node)
finally:
self.function_stack.pop()
def visit_Call(self, node: ast.Call) -> None:
call_name = self._canonical_name(dotted_name(node.func))
if call_name:
self._check_call(node, call_name)
self.generic_visit(node)
def _check_async_tool_definition(self, node: ast.AsyncFunctionDef) -> None:
for decorator in node.decorator_list:
decorator_call = decorator.func if isinstance(decorator, ast.Call) else decorator
decorator_name = self._canonical_name(dotted_name(decorator_call))
if decorator_name in {"langchain.tools.tool", "langchain_core.tools.tool"}:
self._emit(
node,
severity="INFO",
category="ASYNC_TOOL_DEFINITION",
symbol=decorator_name,
message="Defines an async LangChain tool; sync clients need a wrapper before invoke().",
)
return
def _check_call(self, node: ast.Call, call_name: str) -> None:
rule = EXACT_CALL_RULES.get(call_name)
if rule:
self._emit_rule(node, call_name, rule)
if call_name.endswith(".run_until_complete"):
self._emit(
node,
severity="WARN",
category="RUN_UNTIL_COMPLETE",
symbol=call_name,
message="Drives an event loop from synchronous code; review nested-loop behavior.",
)
if self._is_executor_submit(node, call_name):
self._emit(
node,
severity="INFO",
category="EXECUTOR_SUBMIT",
symbol=call_name,
message="Submits work to an executor; review context propagation and cancellation.",
)
if call_name in ASYNC_TOOL_FACTORY_CALLS:
if any(keyword.arg == "coroutine" and not is_none_node(keyword.value) for keyword in node.keywords):
self._emit(
node,
severity="INFO",
category="ASYNC_ONLY_TOOL_FACTORY",
symbol=call_name,
message="Creates a StructuredTool from a coroutine; sync clients need a wrapper.",
)
if self.in_async_context and call_name in ASYNC_BLOCKING_CALL_RULES:
self._emit_rule(node, call_name, ASYNC_BLOCKING_CALL_RULES[call_name])
if self.in_async_context and self._is_langchain_invoke(node, call_name, method_name="invoke"):
self._emit(
node,
severity="WARN",
category="SYNC_INVOKE_IN_ASYNC",
symbol=call_name,
message="Calls a synchronous invoke() from async code; review event-loop blocking.",
)
if not self.in_async_context and self._is_langchain_invoke(node, call_name, method_name="ainvoke"):
self._emit(
node,
severity="WARN",
category="ASYNC_INVOKE_IN_SYNC",
symbol=call_name,
message="Calls async ainvoke() from sync code; review how the coroutine is awaited.",
)
def _canonical_name(self, name: str | None) -> str | None:
if name is None:
return None
parts = name.split(".")
if parts and parts[0] in self.import_aliases:
return ".".join((self.import_aliases[parts[0]], *parts[1:]))
return name
def _record_executor_targets(self, value: ast.AST, targets: Sequence[ast.AST]) -> None:
if not isinstance(value, ast.Call):
return
call_name = self._canonical_name(dotted_name(value.func))
if call_name not in THREAD_POOL_CONSTRUCTORS:
return
for target in targets:
for name in self._target_names(target):
self.executor_names.add(name)
def _target_names(self, target: ast.AST) -> Iterable[str]:
if isinstance(target, ast.Name):
yield target.id
elif isinstance(target, (ast.Tuple, ast.List)):
for element in target.elts:
yield from self._target_names(element)
def _is_executor_submit(self, node: ast.Call, call_name: str) -> bool:
if not call_name.endswith(".submit"):
return False
receiver_name = call_receiver_name(node)
return receiver_name in self.executor_names
def _is_langchain_invoke(self, node: ast.Call, call_name: str, *, method_name: str) -> bool:
if not call_name.endswith(f".{method_name}"):
return False
receiver_name = call_receiver_name(node)
if receiver_name is None:
return False
receiver_leaf = receiver_name.rsplit(".", 1)[-1]
return receiver_leaf in LANGCHAIN_INVOKE_RECEIVER_NAMES or receiver_leaf.endswith(LANGCHAIN_INVOKE_RECEIVER_SUFFIXES)
def _emit_rule(self, node: ast.AST, symbol: str, rule: _CallRule) -> None:
self._emit(
node,
severity=rule.severity,
category=rule.category,
symbol=symbol,
message=rule.message,
)
def _emit(self, node: ast.AST, *, severity: str, category: str, symbol: str, message: str) -> None:
line = getattr(node, "lineno", 0)
column = getattr(node, "col_offset", 0)
code = ""
if line > 0 and line <= len(self.source_lines):
code = self.source_lines[line - 1].strip()
self.findings.append(
BoundaryFinding(
severity=severity,
category=category,
path=self.relative_path,
line=line,
column=column,
function=self.current_function,
async_context=self.in_async_context,
symbol=symbol,
message=message,
code=code,
)
)
def relative_to_repo(path: Path, repo_root: Path = REPO_ROOT) -> str:
try:
return path.resolve().relative_to(repo_root.resolve()).as_posix()
except ValueError:
return path.as_posix()
def scan_file(path: Path, *, repo_root: Path = REPO_ROOT) -> list[BoundaryFinding]:
source = path.read_text(encoding="utf-8")
source_lines = source.splitlines()
relative_path = relative_to_repo(path, repo_root)
try:
tree = ast.parse(source, filename=str(path))
except SyntaxError as exc:
line = exc.lineno or 0
code = source_lines[line - 1].strip() if line > 0 and line <= len(source_lines) else ""
return [
BoundaryFinding(
severity="WARN",
category="PARSE_ERROR",
path=relative_path,
line=line,
column=max((exc.offset or 1) - 1, 0),
function="<module>",
async_context=False,
symbol="SyntaxError",
message=str(exc),
code=code,
)
]
visitor = BoundaryVisitor(path, relative_path, source_lines)
visitor.visit(tree)
return visitor.findings
def is_ignored_path(path: Path) -> bool:
return any(part in IGNORED_DIR_NAMES for part in path.parts)
def iter_python_files(paths: Iterable[Path]) -> Iterable[Path]:
for path in paths:
if not path.exists() or is_ignored_path(path):
continue
if path.is_file():
if path.suffix == ".py" and not is_ignored_path(path):
yield path
continue
for dirpath, dirnames, filenames in os.walk(path):
dirnames[:] = [dirname for dirname in dirnames if dirname not in IGNORED_DIR_NAMES]
for filename in filenames:
if filename.endswith(".py"):
yield Path(dirpath) / filename
def scan_paths(paths: Iterable[Path], *, repo_root: Path = REPO_ROOT) -> list[BoundaryFinding]:
findings: list[BoundaryFinding] = []
for path in sorted(iter_python_files(paths)):
findings.extend(scan_file(path, repo_root=repo_root))
return sorted(findings, key=lambda finding: (finding.path, finding.line, finding.column, finding.category))
def filter_findings(findings: Iterable[BoundaryFinding], min_severity: str) -> list[BoundaryFinding]:
threshold = SEVERITY_ORDER[min_severity]
return [finding for finding in findings if SEVERITY_ORDER[finding.severity] >= threshold]
def format_text(findings: Sequence[BoundaryFinding]) -> str:
if not findings:
return "No async/thread boundary findings."
lines: list[str] = []
for finding in findings:
lines.append(f"{finding.severity} {finding.category} {finding.path}:{finding.line}:{finding.column + 1} in {finding.function} async={str(finding.async_context).lower()}")
lines.append(f" symbol: {finding.symbol}")
lines.append(f" note: {finding.message}")
if finding.code:
lines.append(f" code: {finding.code}")
return "\n".join(lines)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description=("Detect async/thread boundary points for developer review. Findings are an inventory, not automatic bug decisions."))
parser.add_argument(
"paths",
nargs="*",
type=Path,
help="Files or directories to scan. Defaults to backend app and harness sources.",
)
parser.add_argument(
"--format",
choices=("text", "json"),
default="text",
help="Output format.",
)
parser.add_argument(
"--min-severity",
choices=tuple(SEVERITY_ORDER),
default="INFO",
help="Only show findings at or above this severity.",
)
return parser
def main(argv: Sequence[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
paths = args.paths or list(DEFAULT_SCAN_PATHS)
findings = filter_findings(scan_paths(paths), args.min_severity)
if args.format == "json":
print(json.dumps([finding.to_dict() for finding in findings], indent=2, sort_keys=True))
else:
print(format_text(findings))
return 0
if __name__ == "__main__":
sys.exit(main())
+85
View File
@@ -233,3 +233,88 @@ class TestConcurrentFileWrites:
thread.join() thread.join()
assert storage["content"] in {"seed\nA\nB\n", "seed\nB\nA\n"} assert storage["content"] in {"seed\nA\nB\n", "seed\nB\nA\n"}
class TestDownloadFile:
"""Tests for AioSandbox.download_file."""
def test_returns_concatenated_bytes(self, sandbox):
"""download_file should join chunks from the client iterator into bytes."""
sandbox._client.file.download_file = MagicMock(return_value=[b"hel", b"lo"])
result = sandbox.download_file("/mnt/user-data/outputs/file.bin")
assert result == b"hello"
sandbox._client.file.download_file.assert_called_once_with(path="/mnt/user-data/outputs/file.bin")
def test_returns_empty_bytes_for_empty_file(self, sandbox):
"""download_file should return b'' when the iterator yields nothing."""
sandbox._client.file.download_file = MagicMock(return_value=iter([]))
result = sandbox.download_file("/mnt/user-data/outputs/empty.bin")
assert result == b""
def test_uses_lock_during_download(self, sandbox):
"""download_file should hold the lock while calling the client."""
lock_was_held = []
def tracking_download(path):
lock_was_held.append(sandbox._lock.locked())
return iter([b"data"])
sandbox._client.file.download_file = tracking_download
sandbox.download_file("/mnt/user-data/outputs/file.bin")
assert lock_was_held == [True], "download_file must hold the lock during client call"
def test_raises_oserror_on_client_error(self, sandbox):
"""download_file should wrap client exceptions as OSError."""
sandbox._client.file.download_file = MagicMock(side_effect=RuntimeError("network error"))
with pytest.raises(OSError, match="network error"):
sandbox.download_file("/mnt/user-data/outputs/file.bin")
def test_preserves_oserror_from_client(self, sandbox):
"""OSError raised by the client should propagate without re-wrapping."""
sandbox._client.file.download_file = MagicMock(side_effect=OSError("disk error"))
with pytest.raises(OSError, match="disk error"):
sandbox.download_file("/mnt/user-data/outputs/file.bin")
def test_rejects_path_outside_virtual_prefix_and_logs_error(self, sandbox, caplog):
"""download_file must reject downloads outside /mnt/user-data and log the reason."""
sandbox._client.file.download_file = MagicMock()
with caplog.at_level("ERROR"):
with pytest.raises(PermissionError, match="must be under"):
sandbox.download_file("/etc/passwd")
assert "outside allowed directory" in caplog.text
sandbox._client.file.download_file.assert_not_called()
@pytest.mark.parametrize(
"path",
[
"/mnt/workspace/../../etc/passwd",
"../secret",
"/a/b/../../../etc/shadow",
],
)
def test_rejects_path_traversal(self, sandbox, path):
"""download_file must reject paths containing '..' before calling the client."""
sandbox._client.file.download_file = MagicMock()
with pytest.raises(PermissionError, match="path traversal"):
sandbox.download_file(path)
sandbox._client.file.download_file.assert_not_called()
def test_single_chunk(self, sandbox):
"""download_file should work correctly with a single-chunk response."""
sandbox._client.file.download_file = MagicMock(return_value=[b"single-chunk"])
result = sandbox.download_file("/mnt/user-data/outputs/single.bin")
assert result == b"single-chunk"
+142
View File
@@ -0,0 +1,142 @@
"""Tests for idempotent run cancellation (issue #3055).
RunManager.cancel() returns True when a run is already interrupted so that
a second cancel request from the same worker is treated as a no-op success
(202) rather than a conflict (409). Both the POST cancel endpoint and the
POST stream endpoint share this behaviour through the same cancel() call.
"""
from __future__ import annotations
import asyncio
from _router_auth_helpers import make_authed_test_app
from fastapi.testclient import TestClient
from app.gateway.routers import thread_runs
from deerflow.runtime import RunManager, RunStatus
THREAD_ID = "thread-cancel-test"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_app(mgr: RunManager) -> TestClient:
app = make_authed_test_app()
app.include_router(thread_runs.router)
app.state.run_manager = mgr
return TestClient(app, raise_server_exceptions=False)
def _create_interrupted_run(mgr: RunManager) -> str:
"""Create a run and cancel it, returning its run_id."""
async def _setup():
record = await mgr.create(THREAD_ID)
await mgr.set_status(record.run_id, RunStatus.running)
await mgr.cancel(record.run_id)
return record.run_id
return asyncio.run(_setup())
# ---------------------------------------------------------------------------
# RunManager.cancel() unit tests
# ---------------------------------------------------------------------------
class TestRunManagerCancelIdempotency:
def test_cancel_returns_true_for_already_interrupted_run(self):
"""cancel() must return True when the run is already interrupted."""
async def run():
mgr = RunManager()
record = await mgr.create(THREAD_ID)
await mgr.set_status(record.run_id, RunStatus.running)
first = await mgr.cancel(record.run_id)
assert first is True
second = await mgr.cancel(record.run_id)
assert second is True # idempotent
asyncio.run(run())
def test_cancel_returns_false_for_successful_run(self):
"""cancel() must still return False for runs that completed successfully."""
async def run():
mgr = RunManager()
record = await mgr.create(THREAD_ID)
await mgr.set_status(record.run_id, RunStatus.running)
await mgr.set_status(record.run_id, RunStatus.success)
result = await mgr.cancel(record.run_id)
assert result is False
asyncio.run(run())
def test_cancel_returns_false_for_unknown_run(self):
async def run():
mgr = RunManager()
result = await mgr.cancel("nonexistent-run-id")
assert result is False
asyncio.run(run())
# ---------------------------------------------------------------------------
# POST /cancel endpoint — idempotent 202
# ---------------------------------------------------------------------------
class TestCancelRunEndpointIdempotency:
def test_double_cancel_returns_202_not_409(self):
"""Second cancel on an already-interrupted run must return 202, not 409."""
mgr = RunManager()
run_id = _create_interrupted_run(mgr)
client = _make_app(mgr)
resp = client.post(f"/api/threads/{THREAD_ID}/runs/{run_id}/cancel")
assert resp.status_code == 202, f"Expected 202, got {resp.status_code}: {resp.text}"
def test_cancel_unknown_run_returns_404(self):
mgr = RunManager()
client = _make_app(mgr)
resp = client.post(f"/api/threads/{THREAD_ID}/runs/no-such-run/cancel")
assert resp.status_code == 404
def test_cancel_successful_run_returns_409(self):
"""Successfully-completed runs cannot be cancelled — must return 409."""
async def _setup():
mgr = RunManager()
record = await mgr.create(THREAD_ID)
await mgr.set_status(record.run_id, RunStatus.running)
await mgr.set_status(record.run_id, RunStatus.success)
return mgr, record.run_id
mgr, run_id = asyncio.run(_setup())
client = _make_app(mgr)
resp = client.post(f"/api/threads/{THREAD_ID}/runs/{run_id}/cancel")
assert resp.status_code == 409
# ---------------------------------------------------------------------------
# POST /{thread_id}/runs/{run_id}/join (stream_existing_run) — idempotent cancel
# ---------------------------------------------------------------------------
class TestStreamExistingRunIdempotentCancel:
def test_stream_cancel_already_interrupted_returns_not_409(self):
"""stream_existing_run with action=interrupt on an already-interrupted run
must not raise 409 — the idempotent cancel path returns 202/SSE."""
mgr = RunManager()
run_id = _create_interrupted_run(mgr)
client = _make_app(mgr)
resp = client.post(
f"/api/threads/{THREAD_ID}/runs/{run_id}/join",
params={"action": "interrupt"},
)
assert resp.status_code != 409, f"Should not 409 on idempotent cancel, got {resp.status_code}"
@@ -190,6 +190,24 @@ class TestBuildPatchedMessagesPatching:
assert [patched[1].tool_call_id, patched[2].tool_call_id] == ["call_1", "call_2"] assert [patched[1].tool_call_id, patched[2].tool_call_id] == ["call_1", "call_2"]
assert isinstance(patched[3], HumanMessage) assert isinstance(patched[3], HumanMessage)
def test_non_tool_message_inserted_between_partial_tool_results_is_regrouped(self):
mw = DanglingToolCallMiddleware()
msgs = [
_ai_with_tool_calls([_tc("bash", "call_1"), _tc("read", "call_2")]),
_tool_msg("call_1", "bash"),
HumanMessage(content="interruption"),
_tool_msg("call_2", "read"),
]
patched = mw._build_patched_messages(msgs)
assert patched is not None
assert isinstance(patched[0], AIMessage)
assert isinstance(patched[1], ToolMessage)
assert isinstance(patched[2], ToolMessage)
assert [patched[1].tool_call_id, patched[2].tool_call_id] == ["call_1", "call_2"]
assert isinstance(patched[3], HumanMessage)
def test_valid_adjacent_tool_results_are_unchanged(self): def test_valid_adjacent_tool_results_are_unchanged(self):
mw = DanglingToolCallMiddleware() mw = DanglingToolCallMiddleware()
msgs = [ msgs = [
@@ -237,7 +255,8 @@ class TestBuildPatchedMessagesPatching:
assert isinstance(patched[0], AIMessage) assert isinstance(patched[0], AIMessage)
assert isinstance(patched[1], ToolMessage) assert isinstance(patched[1], ToolMessage)
assert patched[1].tool_call_id == "call_1" assert patched[1].tool_call_id == "call_1"
assert orphan in patched assert patched[2] is orphan
assert isinstance(patched[3], HumanMessage)
assert patched.count(orphan) == 1 assert patched.count(orphan) == 1
def test_invalid_tool_call_is_patched(self): def test_invalid_tool_call_is_patched(self):
@@ -0,0 +1,182 @@
from __future__ import annotations
import json
import textwrap
from pathlib import Path
from support.detectors import thread_boundaries as detector
def _write_python(path: Path, source: str) -> Path:
path.write_text(textwrap.dedent(source).strip() + "\n", encoding="utf-8")
return path
def test_scan_file_detects_async_thread_and_tool_boundaries(tmp_path):
source_file = _write_python(
tmp_path / "sample.py",
"""
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from langchain.tools import tool
from langchain_core.tools import StructuredTool
@tool
async def async_tool(value: int) -> str:
return str(value)
async def handler(model):
await asyncio.to_thread(str, "x")
model.invoke("blocking")
time.sleep(1)
def sync_entry():
asyncio.run(handler(None))
pool = ThreadPoolExecutor(max_workers=1)
pool.submit(str, "x")
threading.Thread(target=sync_entry).start()
return StructuredTool.from_function(
name="factory_tool",
description="factory",
coroutine=async_tool,
)
""",
)
findings = detector.scan_file(source_file, repo_root=tmp_path)
categories = {finding.category for finding in findings}
async_tool_finding = next(finding for finding in findings if finding.category == "ASYNC_TOOL_DEFINITION")
assert "ASYNC_TOOL_DEFINITION" in categories
assert async_tool_finding.function == "async_tool"
assert async_tool_finding.async_context is True
assert "ASYNC_THREAD_OFFLOAD" in categories
assert "SYNC_INVOKE_IN_ASYNC" in categories
assert "BLOCKING_CALL_IN_ASYNC" in categories
assert "SYNC_ASYNC_BRIDGE" in categories
assert "THREAD_POOL" in categories
assert "EXECUTOR_SUBMIT" in categories
assert "RAW_THREAD" in categories
assert "ASYNC_ONLY_TOOL_FACTORY" in categories
def test_scan_file_ignores_unqualified_threads_and_generic_method_names(tmp_path):
source_file = _write_python(
tmp_path / "sample.py",
"""
class Thread:
pass
class Timer:
pass
async def handler(form, runner):
form.submit()
runner.invoke("not a langchain model")
def sync_entry(runner):
Thread()
Timer()
runner.ainvoke("not a langchain model")
""",
)
findings = detector.scan_file(source_file, repo_root=tmp_path)
categories = {finding.category for finding in findings}
assert "RAW_THREAD" not in categories
assert "RAW_TIMER_THREAD" not in categories
assert "EXECUTOR_SUBMIT" not in categories
assert "SYNC_INVOKE_IN_ASYNC" not in categories
assert "ASYNC_INVOKE_IN_SYNC" not in categories
def test_scan_file_uses_import_evidence_for_thread_and_executor_aliases(tmp_path):
source_file = _write_python(
tmp_path / "sample.py",
"""
from concurrent.futures import ThreadPoolExecutor as Pool
from threading import Thread as WorkerThread, Timer
def sync_entry():
pool = Pool(max_workers=1)
pool.submit(str, "x")
WorkerThread(target=sync_entry).start()
Timer(1, sync_entry).start()
""",
)
findings = detector.scan_file(source_file, repo_root=tmp_path)
categories = {finding.category for finding in findings}
assert "THREAD_POOL" in categories
assert "EXECUTOR_SUBMIT" in categories
assert "RAW_THREAD" in categories
assert "RAW_TIMER_THREAD" in categories
def test_scan_paths_ignores_virtualenv_like_directories(tmp_path):
scanned_file = _write_python(
tmp_path / "app.py",
"""
import asyncio
def main():
return asyncio.run(asyncio.sleep(0))
""",
)
ignored_dir = tmp_path / ".venv"
ignored_dir.mkdir()
_write_python(
ignored_dir / "ignored.py",
"""
import threading
thread = threading.Thread(target=lambda: None)
""",
)
findings = detector.scan_paths([tmp_path], repo_root=tmp_path)
assert any(finding.path == scanned_file.name for finding in findings)
assert all(".venv" not in finding.path for finding in findings)
def test_json_output_and_min_severity_filter(tmp_path, capsys):
source_file = _write_python(
tmp_path / "sample.py",
"""
import asyncio
async def handler(model):
await asyncio.to_thread(str, "x")
model.invoke("blocking")
""",
)
exit_code = detector.main(["--format", "json", "--min-severity", "WARN", str(source_file)])
assert exit_code == 0
payload = json.loads(capsys.readouterr().out)
categories = {finding["category"] for finding in payload}
assert categories == {"SYNC_INVOKE_IN_ASYNC"}
def test_parse_errors_are_reported_as_findings(tmp_path):
source_file = _write_python(
tmp_path / "broken.py",
"""
def broken(:
pass
""",
)
findings = detector.scan_file(source_file, repo_root=tmp_path)
assert len(findings) == 1
assert findings[0].category == "PARSE_ERROR"
assert findings[0].severity == "WARN"
assert findings[0].column == 11
assert f"{source_file.name}:1:12" in detector.format_text(findings)
@@ -699,6 +699,92 @@ def test_get_available_tools_includes_invoke_acp_agent_when_agents_configured(mo
load_acp_config_from_dict({}) load_acp_config_from_dict({})
def test_get_available_tools_sync_invoke_acp_agent_preserves_thread_workspace(monkeypatch, tmp_path):
from deerflow.config import paths as paths_module
from deerflow.runtime import user_context as uc_module
monkeypatch.setattr(paths_module, "get_paths", lambda: paths_module.Paths(base_dir=tmp_path))
monkeypatch.setattr(uc_module, "get_effective_user_id", lambda: None)
monkeypatch.setattr(
"deerflow.config.extensions_config.ExtensionsConfig.from_file",
classmethod(lambda cls: ExtensionsConfig(mcp_servers={}, skills={})),
)
monkeypatch.setattr("deerflow.tools.tools.is_host_bash_allowed", lambda config=None: True)
captured: dict[str, object] = {}
class DummyClient:
@property
def collected_text(self) -> str:
return "ok"
async def session_update(self, session_id, update, **kwargs):
pass
async def request_permission(self, options, session_id, tool_call, **kwargs):
raise AssertionError("should not be called")
class DummyConn:
async def initialize(self, **kwargs):
pass
async def new_session(self, **kwargs):
return SimpleNamespace(session_id="s1")
async def prompt(self, **kwargs):
pass
class DummyProcessContext:
def __init__(self, client, cmd, *args, env=None, cwd):
captured["cwd"] = cwd
async def __aenter__(self):
return DummyConn(), object()
async def __aexit__(self, exc_type, exc, tb):
return False
monkeypatch.setitem(
sys.modules,
"acp",
SimpleNamespace(
PROTOCOL_VERSION="2026-03-24",
Client=DummyClient,
spawn_agent_process=lambda client, cmd, *args, env=None, cwd: DummyProcessContext(client, cmd, *args, env=env, cwd=cwd),
text_block=lambda text: {"type": "text", "text": text},
),
)
monkeypatch.setitem(
sys.modules,
"acp.schema",
SimpleNamespace(
ClientCapabilities=lambda: {},
Implementation=lambda **kwargs: kwargs,
TextContentBlock=type("TextContentBlock", (), {"__init__": lambda self, text: setattr(self, "text", text)}),
),
)
explicit_config = SimpleNamespace(
tools=[],
models=[],
tool_search=SimpleNamespace(enabled=False),
skill_evolution=SimpleNamespace(enabled=False),
sandbox=SimpleNamespace(),
get_model_config=lambda name: None,
acp_agents={"codex": ACPAgentConfig(command="codex-acp", description="Codex CLI")},
)
tools = get_available_tools(include_mcp=False, subagent_enabled=False, app_config=explicit_config)
tool = next(tool for tool in tools if tool.name == "invoke_acp_agent")
thread_id = "thread-sync-123"
tool.invoke(
{"agent": "codex", "prompt": "Do something"},
config={"configurable": {"thread_id": thread_id}},
)
assert captured["cwd"] == str(tmp_path / "threads" / thread_id / "acp-workspace")
def test_get_available_tools_uses_explicit_app_config_for_acp_agents(monkeypatch): def test_get_available_tools_uses_explicit_app_config_for_acp_agents(monkeypatch):
explicit_agents = {"codex": ACPAgentConfig(command="codex-acp", description="Codex CLI")} explicit_agents = {"codex": ACPAgentConfig(command="codex-acp", description="Codex CLI")}
explicit_config = SimpleNamespace( explicit_config = SimpleNamespace(
@@ -204,6 +204,26 @@ class TestSymlinkEscapes:
assert exc_info.value.errno == errno.EACCES assert exc_info.value.errno == errno.EACCES
def test_download_file_blocks_symlink_escape_from_mount(self, tmp_path):
mount_dir = tmp_path / "mount"
mount_dir.mkdir()
outside_dir = tmp_path / "outside"
outside_dir.mkdir()
(outside_dir / "secret.bin").write_bytes(b"\x00secret")
_symlink_to(outside_dir, mount_dir / "escape", target_is_directory=True)
sandbox = LocalSandbox(
"test",
[
PathMapping(container_path="/mnt/user-data", local_path=str(mount_dir), read_only=False),
],
)
with pytest.raises(PermissionError) as exc_info:
sandbox.download_file("/mnt/user-data/escape/secret.bin")
assert exc_info.value.errno == errno.EACCES
def test_write_file_blocks_symlink_escape_from_mount(self, tmp_path): def test_write_file_blocks_symlink_escape_from_mount(self, tmp_path):
mount_dir = tmp_path / "mount" mount_dir = tmp_path / "mount"
mount_dir.mkdir() mount_dir.mkdir()
@@ -334,6 +354,74 @@ class TestSymlinkEscapes:
assert existing.read_bytes() == b"original" assert existing.read_bytes() == b"original"
class TestDownloadFileMappings:
"""download_file must use _resolve_path_with_mapping so path resolution, symlink
containment, and read-only awareness are consistent with read_file."""
def test_resolves_container_path_via_mapping(self, tmp_path):
"""download_file should resolve container paths through path mappings."""
data_dir = tmp_path / "data"
data_dir.mkdir()
(data_dir / "asset.bin").write_bytes(b"\x01\x02\x03")
sandbox = LocalSandbox(
"test",
[PathMapping(container_path="/mnt/user-data", local_path=str(data_dir))],
)
result = sandbox.download_file("/mnt/user-data/asset.bin")
assert result == b"\x01\x02\x03"
def test_raises_oserror_with_original_path_when_missing(self, tmp_path):
"""OSError filename should show the container path, not the resolved host path."""
data_dir = tmp_path / "data"
data_dir.mkdir()
sandbox = LocalSandbox(
"test",
[PathMapping(container_path="/mnt/user-data", local_path=str(data_dir))],
)
with pytest.raises(OSError) as exc_info:
sandbox.download_file("/mnt/user-data/missing.bin")
assert exc_info.value.filename == "/mnt/user-data/missing.bin"
def test_rejects_path_outside_virtual_prefix_and_logs_error(self, tmp_path, caplog):
"""download_file must reject paths outside /mnt/user-data and log the reason."""
data_dir = tmp_path / "data"
data_dir.mkdir()
(data_dir / "model.bin").write_bytes(b"weights")
sandbox = LocalSandbox(
"test",
[PathMapping(container_path="/mnt/user-data", local_path=str(data_dir), read_only=True)],
)
with caplog.at_level("ERROR"):
with pytest.raises(PermissionError) as exc_info:
sandbox.download_file("/mnt/skills/model.bin")
assert exc_info.value.errno == errno.EACCES
assert "outside allowed directory" in caplog.text
def test_readable_from_read_only_mount(self, tmp_path):
"""Read-only mounts must not block download_file — read-only only restricts writes."""
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
(skills_dir / "model.bin").write_bytes(b"weights")
sandbox = LocalSandbox(
"test",
[PathMapping(container_path="/mnt/user-data", local_path=str(skills_dir), read_only=True)],
)
result = sandbox.download_file("/mnt/user-data/model.bin")
assert result == b"weights"
class TestMultipleMounts: class TestMultipleMounts:
def test_multiple_read_write_mounts(self, tmp_path): def test_multiple_read_write_mounts(self, tmp_path):
skills_dir = tmp_path / "skills" skills_dir = tmp_path / "skills"
+54
View File
@@ -1,7 +1,9 @@
import asyncio import asyncio
import contextvars
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import StructuredTool from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
@@ -69,6 +71,58 @@ def test_mcp_tool_sync_wrapper_in_running_loop():
assert result == "async_result: 100" assert result == "async_result: 100"
def test_sync_wrapper_preserves_contextvars_in_running_loop():
"""The executor branch preserves LangGraph-style contextvars."""
current_value: contextvars.ContextVar[str | None] = contextvars.ContextVar("current_value", default=None)
async def mock_coro() -> str | None:
return current_value.get()
sync_func = make_sync_tool_wrapper(mock_coro, "test_tool")
async def run_in_loop() -> str | None:
token = current_value.set("from-parent-context")
try:
return sync_func()
finally:
current_value.reset(token)
assert asyncio.run(run_in_loop()) == "from-parent-context"
def test_sync_wrapper_preserves_runnable_config_injection():
"""LangChain can still inject RunnableConfig after an async tool is wrapped."""
captured: dict[str, object] = {}
async def mock_coro(x: int, config: RunnableConfig = None):
captured["thread_id"] = ((config or {}).get("configurable") or {}).get("thread_id")
return f"result: {x}"
mock_tool = StructuredTool(
name="test_tool",
description="test description",
args_schema=MockArgs,
func=make_sync_tool_wrapper(mock_coro, "test_tool"),
coroutine=mock_coro,
)
result = mock_tool.invoke({"x": 42}, config={"configurable": {"thread_id": "thread-123"}})
assert result == "result: 42"
assert captured["thread_id"] == "thread-123"
def test_sync_wrapper_preserves_regular_config_argument():
"""Only RunnableConfig-annotated coroutine params get special config injection."""
async def mock_coro(config: str):
return config
sync_func = make_sync_tool_wrapper(mock_coro, "test_tool")
assert sync_func(config="user-config") == "user-config"
def test_mcp_tool_sync_wrapper_exception_logging(): def test_mcp_tool_sync_wrapper_exception_logging():
"""Test the shared sync wrapper's error logging.""" """Test the shared sync wrapper's error logging."""
+35
View File
@@ -78,6 +78,41 @@ def test_apply_updates_skips_existing_duplicate_and_preserves_removals() -> None
assert all(fact["id"] != "fact_remove" for fact in result["facts"]) assert all(fact["id"] != "fact_remove" for fact in result["facts"])
def test_prepare_update_prompt_preserves_non_ascii_memory_text() -> None:
updater = MemoryUpdater()
current_memory = _make_memory(
facts=[
{
"id": "fact_cn",
"content": "Deer-flow是一个非常好的框架。",
"category": "context",
"confidence": 0.9,
"createdAt": "2026-05-20T00:00:00Z",
"source": "thread-cn",
},
]
)
with (
patch("deerflow.agents.memory.updater.get_memory_config", return_value=_memory_config(enabled=True)),
patch("deerflow.agents.memory.updater.get_memory_data", return_value=current_memory),
):
msg = MagicMock()
msg.type = "human"
msg.content = "你好"
prepared = updater._prepare_update_prompt(
[msg],
agent_name=None,
correction_detected=False,
reinforcement_detected=False,
)
assert prepared is not None
_, prompt = prepared
assert "Deer-flow是一个非常好的框架。" in prompt
assert "\\u" not in prompt
def test_apply_updates_skips_same_batch_duplicates_and_keeps_source_metadata() -> None: def test_apply_updates_skips_same_batch_duplicates_and_keeps_source_metadata() -> None:
updater = MemoryUpdater() updater = MemoryUpdater()
current_memory = _make_memory() current_memory = _make_memory()
+686
View File
@@ -0,0 +1,686 @@
"""HTTP/runtime lifecycle E2E tests for the Gateway-owned runs API.
These tests keep the external model out of scope while exercising the real
FastAPI app, auth middleware, lifespan-created runtime dependencies,
``start_run()``, ``run_agent()``, StreamBridge, checkpointer, run store, and
thread metadata store.
"""
from __future__ import annotations
import asyncio
import inspect
import json
import queue
import threading
import time
import uuid
from contextlib import suppress
from pathlib import Path
from typing import Any
from unittest.mock import patch
import pytest
from _agent_e2e_helpers import FakeToolCallingModel, build_single_tool_call_model
from langchain_core.messages import AIMessage, HumanMessage
pytestmark = pytest.mark.no_auto_user
_MINIMAL_CONFIG_YAML = """\
log_level: info
models:
- name: fake-test-model
display_name: Fake Test Model
use: langchain_openai:ChatOpenAI
model: gpt-4o-mini
api_key: $OPENAI_API_KEY
base_url: $OPENAI_API_BASE
sandbox:
use: deerflow.sandbox.local:LocalSandboxProvider
agents_api:
enabled: true
title:
enabled: false
memory:
enabled: false
database:
backend: sqlite
run_events:
backend: memory
"""
class _RunController:
"""Cross-thread controls for the fake async agent."""
def __init__(self) -> None:
self.started = threading.Event()
self.checkpoint_written = threading.Event()
self.cancelled = threading.Event()
self.release = threading.Event()
self.instances: list[_ScriptedAgent] = []
class _ScriptedAgent:
"""Deterministic runtime double for lifecycle-only tests.
This is intentionally not a full LangGraph graph. Tests that need
controllable blocking, cancellation, and rollback checkpoints use the small
``run_agent`` surface they exercise: ``astream()``, checkpointer/store
attachment, metadata, and interrupt node attributes. The real lead-agent
graph/tool dispatch path is covered separately by
``test_stream_run_executes_real_lead_agent_setup_agent_business_path``.
"""
def __init__(
self,
controller: _RunController,
*,
title: str,
answer: str,
block_after_first_chunk: bool = False,
) -> None:
self.controller = controller
self.title = title
self.answer = answer
self.block_after_first_chunk = block_after_first_chunk
self.checkpointer: Any | None = None
self.store: Any | None = None
self.metadata = {"model_name": "fake-test-model"}
self.interrupt_before_nodes = None
self.interrupt_after_nodes = None
self.model = FakeToolCallingModel(responses=[AIMessage(content=self.answer)])
async def astream(self, graph_input, config=None, stream_mode=None, subgraphs=False):
del subgraphs
self.controller.started.set()
thread_id = _thread_id_from_config(config)
human_text = _last_human_text(graph_input)
human = HumanMessage(content=human_text)
ai = await self.model.ainvoke([human], config=config)
state = {"messages": [human.model_dump(), ai.model_dump()], "title": self.title}
if self.checkpointer is not None:
await _write_checkpoint(self.checkpointer, thread_id=thread_id, state=state)
self.controller.checkpoint_written.set()
yield _stream_item_for_mode(stream_mode, state)
if self.block_after_first_chunk:
try:
while not self.controller.release.is_set():
await asyncio.sleep(0.05)
except asyncio.CancelledError:
self.controller.cancelled.set()
raise
def _make_agent_factory(controller: _RunController, **agent_kwargs):
def factory(*, config):
del config
agent = _ScriptedAgent(controller, **agent_kwargs)
controller.instances.append(agent)
return agent
return factory
def _build_fake_setup_agent_model(agent_name: str):
"""Patch target for lead_agent.agent.create_chat_model.
The graph, tool registry, ToolNode dispatch, and setup_agent implementation
remain production code; this fake only replaces the external LLM call.
"""
def fake_create_chat_model(*args: Any, **kwargs: Any) -> FakeToolCallingModel:
del args, kwargs
return build_single_tool_call_model(
tool_name="setup_agent",
tool_args={
"soul": f"# Runtime Business E2E\n\nAgent name: {agent_name}",
"description": "runtime lifecycle business path",
},
tool_call_id="call_runtime_business_1",
final_text=f"Created {agent_name} through the real setup_agent tool.",
)
return fake_create_chat_model
@pytest.fixture
def isolated_deer_flow_home(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
home = tmp_path / "deer-flow-home"
home.mkdir()
monkeypatch.setenv("DEER_FLOW_HOME", str(home))
monkeypatch.setenv("OPENAI_API_KEY", "sk-fake-key-not-used")
monkeypatch.setenv("OPENAI_API_BASE", "https://example.invalid")
staged_config = tmp_path / "config.yaml"
staged_config.write_text(_MINIMAL_CONFIG_YAML, encoding="utf-8")
monkeypatch.setenv("DEER_FLOW_CONFIG_PATH", str(staged_config))
staged_extensions_config = tmp_path / "extensions_config.json"
staged_extensions_config.write_text('{"mcpServers": {}, "skills": {}}', encoding="utf-8")
monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(staged_extensions_config))
return home
def _reset_process_singletons(monkeypatch: pytest.MonkeyPatch) -> None:
"""Clear runtime singletons that depend on this test's temporary config.
The Gateway app/lifespan path reads process-wide caches before wiring
request-scoped dependencies. These E2E tests stage a temporary
``config.yaml``/``extensions_config.json`` and ``DEER_FLOW_HOME``, so the
caches below must be reset before app creation:
- app_config / extensions_config: parsed config file caches.
- paths: ``DEER_FLOW_HOME``-derived filesystem paths.
- persistence.engine: SQLAlchemy engine/session factory for the sqlite dir.
- app.gateway.deps: cached local auth provider/repository.
A shared public reset helper would be cleaner long-term; this test keeps
the reset boundary explicit because the PR is focused on runtime lifecycle
coverage rather than config-cache API cleanup.
"""
from app.gateway import deps as deps_module
from deerflow.config import app_config as app_config_module
from deerflow.config import extensions_config as extensions_config_module
from deerflow.config import paths as paths_module
from deerflow.persistence import engine as engine_module
for module, attr, value in (
(app_config_module, "_app_config", None),
(app_config_module, "_app_config_path", None),
(app_config_module, "_app_config_mtime", None),
(app_config_module, "_app_config_is_custom", False),
(extensions_config_module, "_extensions_config", None),
(paths_module, "_paths_singleton", None),
(paths_module, "_paths", None),
(engine_module, "_engine", None),
(engine_module, "_session_factory", None),
(deps_module, "_cached_local_provider", None),
(deps_module, "_cached_repo", None),
):
monkeypatch.setattr(module, attr, value, raising=False)
def _preserve_process_config_singletons(monkeypatch: pytest.MonkeyPatch) -> None:
"""Restore config singletons mutated as a side effect of AppConfig loading.
``AppConfig.from_file()`` calls ``_apply_singleton_configs()``, which pushes
nested config sections into module-level caches used by middlewares, tool
selection, and runtime providers. Snapshotting those attributes with
``monkeypatch`` lets pytest restore the pre-test values during teardown, so
loading the isolated test config does not leak into later tests.
"""
from deerflow.config import (
acp_config,
agents_api_config,
checkpointer_config,
guardrails_config,
memory_config,
stream_bridge_config,
subagents_config,
summarization_config,
title_config,
tool_search_config,
)
for module, attr in (
(title_config, "_title_config"),
(summarization_config, "_summarization_config"),
(memory_config, "_memory_config"),
(agents_api_config, "_agents_api_config"),
(subagents_config, "_subagents_config"),
(tool_search_config, "_tool_search_config"),
(guardrails_config, "_guardrails_config"),
(checkpointer_config, "_checkpointer_config"),
(stream_bridge_config, "_stream_bridge_config"),
(acp_config, "_acp_agents"),
):
monkeypatch.setattr(module, attr, getattr(module, attr), raising=False)
@pytest.fixture
def isolated_app(isolated_deer_flow_home: Path, monkeypatch: pytest.MonkeyPatch):
_preserve_process_config_singletons(monkeypatch)
_reset_process_singletons(monkeypatch)
from deerflow.config import app_config as app_config_module
cfg = app_config_module.get_app_config()
cfg.database.sqlite_dir = str(isolated_deer_flow_home / "db")
from app.gateway.app import create_app
return create_app()
def _register_user(client, *, email: str = "runtime-e2e@example.com") -> str:
response = client.post(
"/api/v1/auth/register",
json={"email": email, "password": "very-strong-password-123"},
)
assert response.status_code == 201, response.text
csrf_token = client.cookies.get("csrf_token")
assert csrf_token
return csrf_token
def _create_thread(client, csrf_token: str) -> str:
thread_id = str(uuid.uuid4())
response = client.post(
"/api/threads",
json={"thread_id": thread_id, "metadata": {"purpose": "runtime-lifecycle-e2e"}},
headers={"X-CSRF-Token": csrf_token},
)
assert response.status_code == 200, response.text
return thread_id
def _run_body(**overrides) -> dict[str, Any]:
body: dict[str, Any] = {
"assistant_id": "lead_agent",
"input": {"messages": [{"role": "user", "content": "Run lifecycle E2E prompt"}]},
"config": {"recursion_limit": 50},
"stream_mode": ["values"],
}
body.update(overrides)
return body
def _drain_stream(response, *, timeout: float = 10.0, max_bytes: int = 1024 * 1024) -> str:
chunks: queue.Queue[bytes | BaseException | object] = queue.Queue()
sentinel = object()
def read_stream() -> None:
try:
for chunk in response.iter_bytes():
chunks.put(chunk)
if b"event: end" in chunk:
break
except BaseException as exc: # pragma: no cover - reported in the main test thread
chunks.put(exc)
finally:
chunks.put(sentinel)
reader = threading.Thread(target=read_stream, daemon=True)
reader.start()
deadline = time.monotonic() + timeout
body = b""
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise AssertionError(f"SSE stream did not finish within {timeout}s; transcript tail={body[-4000:].decode('utf-8', errors='replace')}")
try:
chunk = chunks.get(timeout=remaining)
except queue.Empty as exc:
raise AssertionError(f"SSE stream did not produce data within {timeout}s; transcript tail={body[-4000:].decode('utf-8', errors='replace')}") from exc
if chunk is sentinel:
break
if isinstance(chunk, BaseException):
raise AssertionError("SSE reader failed") from chunk
body += chunk
if b"event: end" in body:
break
if len(body) >= max_bytes:
raise AssertionError(f"SSE stream exceeded {max_bytes} bytes without event: end")
if b"event: end" not in body:
raise AssertionError(f"SSE stream closed before event: end; transcript tail={body[-4000:].decode('utf-8', errors='replace')}")
return body.decode("utf-8", errors="replace")
def _parse_sse(transcript: str) -> list[dict[str, Any]]:
events: list[dict[str, Any]] = []
for raw_frame in transcript.split("\n\n"):
frame = raw_frame.strip()
if not frame or frame.startswith(":"):
continue
parsed: dict[str, Any] = {}
for line in frame.splitlines():
if line.startswith("event: "):
parsed["event"] = line.removeprefix("event: ")
elif line.startswith("data: "):
payload = line.removeprefix("data: ")
parsed["data"] = json.loads(payload)
elif line.startswith("id: "):
parsed["id"] = line.removeprefix("id: ")
if parsed:
events.append(parsed)
return events
def _run_id_from_response(response) -> str:
location = response.headers.get("content-location", "")
assert location, "run stream response must include Content-Location"
return location.rstrip("/").split("/")[-1]
def _wait_for_status(client, thread_id: str, run_id: str, status: str, *, timeout: float = 5.0) -> dict:
deadline = time.monotonic() + timeout
last: dict | None = None
while time.monotonic() < deadline:
response = client.get(f"/api/threads/{thread_id}/runs/{run_id}")
assert response.status_code == 200, response.text
last = response.json()
if last["status"] == status:
return last
time.sleep(0.05)
raise AssertionError(f"Run {run_id} did not reach {status!r}; last={last!r}")
def _thread_id_from_config(config: dict | None) -> str:
config = config or {}
context = config.get("context") if isinstance(config.get("context"), dict) else {}
configurable = config.get("configurable") if isinstance(config.get("configurable"), dict) else {}
thread_id = context.get("thread_id") or configurable.get("thread_id")
assert thread_id, f"runtime config did not contain thread_id: {config!r}"
return str(thread_id)
def _last_human_text(graph_input: dict) -> str:
messages = graph_input.get("messages") or []
if not messages:
return ""
last = messages[-1]
content = getattr(last, "content", last)
if isinstance(content, str):
return content
return str(content)
async def _write_checkpoint(checkpointer: Any, *, thread_id: str, state: dict[str, Any]) -> None:
from langgraph.checkpoint.base import empty_checkpoint
checkpoint = empty_checkpoint()
checkpoint["channel_values"] = dict(state)
checkpoint["channel_versions"] = {key: 1 for key in state}
config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
metadata = {
"source": "loop",
"step": 1,
"writes": {"scripted_agent": {"title": state.get("title"), "message_count": len(state.get("messages", []))}},
"parents": {},
}
result = checkpointer.aput(config, checkpoint, metadata, {})
if inspect.isawaitable(result):
await result
def _stream_item_for_mode(stream_mode: Any, state: dict[str, Any]) -> Any:
if isinstance(stream_mode, list):
# ``run_agent`` passes a list when multiple modes/subgraphs are active.
return stream_mode[0], state
return state
def test_stream_run_completes_and_persists_runtime_state(isolated_app):
"""A streaming run should traverse the real runtime and leave state behind."""
from starlette.testclient import TestClient
controller = _RunController()
factory = _make_agent_factory(
controller,
title="Lifecycle E2E",
answer="Lifecycle complete.",
)
with (
patch("app.gateway.services.resolve_agent_factory", return_value=factory),
TestClient(isolated_app) as client,
):
csrf_token = _register_user(client)
thread_id = _create_thread(client, csrf_token)
with client.stream(
"POST",
f"/api/threads/{thread_id}/runs/stream",
json=_run_body(),
headers={"X-CSRF-Token": csrf_token},
) as response:
assert response.status_code == 200, response.read().decode()
run_id = _run_id_from_response(response)
transcript = _drain_stream(response)
events = _parse_sse(transcript)
assert [event["event"] for event in events] == ["metadata", "values", "end"]
assert events[0]["data"] == {"run_id": run_id, "thread_id": thread_id}
assert events[1]["data"]["title"] == "Lifecycle E2E"
assert events[1]["data"]["messages"][-1]["content"] == "Lifecycle complete."
run = client.get(f"/api/threads/{thread_id}/runs/{run_id}")
assert run.status_code == 200, run.text
assert run.json()["status"] == "success"
thread = client.get(f"/api/threads/{thread_id}")
assert thread.status_code == 200, thread.text
assert thread.json()["status"] == "idle"
assert thread.json()["values"]["title"] == "Lifecycle E2E"
messages = client.get(f"/api/threads/{thread_id}/runs/{run_id}/messages")
assert messages.status_code == 200, messages.text
message_events = messages.json()["data"]
event_types = [row["event_type"] for row in message_events]
assert "llm.human.input" in event_types
assert "llm.ai.response" in event_types
assert any(row["content"]["content"] == "Run lifecycle E2E prompt" for row in message_events if row["event_type"] == "llm.human.input")
assert any(row["content"]["content"] == "Lifecycle complete." for row in message_events if row["event_type"] == "llm.ai.response")
def test_stream_run_executes_real_lead_agent_setup_agent_business_path(isolated_app, isolated_deer_flow_home: Path):
"""A runtime stream should execute real lead-agent business code and tools."""
from starlette.testclient import TestClient
agent_name = "runtime-business-agent"
with (
patch(
"deerflow.agents.lead_agent.agent.create_chat_model",
new=_build_fake_setup_agent_model(agent_name),
),
TestClient(isolated_app) as client,
):
csrf_token = _register_user(client, email="business-e2e@example.com")
auth_user_id = client.get("/api/v1/auth/me").json()["id"]
thread_id = _create_thread(client, csrf_token)
body = _run_body(
input={
"messages": [
{
"role": "user",
"content": f"Create a custom agent named {agent_name}.",
}
]
},
context={
"agent_name": agent_name,
"is_bootstrap": True,
"thinking_enabled": False,
"is_plan_mode": False,
"subagent_enabled": False,
},
)
with client.stream(
"POST",
f"/api/threads/{thread_id}/runs/stream",
json=body,
headers={"X-CSRF-Token": csrf_token},
) as response:
assert response.status_code == 200, response.read().decode()
run_id = _run_id_from_response(response)
transcript = _drain_stream(response, timeout=20.0)
events = _parse_sse(transcript)
event_names = [event["event"] for event in events]
assert "metadata" in event_names
assert "error" not in event_names, transcript
assert event_names[-1] == "end"
run = _wait_for_status(client, thread_id, run_id, "success", timeout=10.0)
assert run["assistant_id"] == "lead_agent"
expected_soul = isolated_deer_flow_home / "users" / auth_user_id / "agents" / agent_name / "SOUL.md"
assert expected_soul.exists(), f"setup_agent did not write SOUL.md. tmp tree: {sorted(str(p.relative_to(isolated_deer_flow_home)) for p in isolated_deer_flow_home.rglob('SOUL.md'))}"
assert f"Agent name: {agent_name}" in expected_soul.read_text(encoding="utf-8")
assert not (isolated_deer_flow_home / "users" / "default" / "agents" / agent_name).exists()
def test_cancel_interrupt_stops_running_background_run(isolated_app):
"""HTTP cancel?action=interrupt should stop the worker and persist interruption."""
from starlette.testclient import TestClient
controller = _RunController()
factory = _make_agent_factory(
controller,
title="Interrupt candidate",
answer="This run should be interrupted.",
block_after_first_chunk=True,
)
with (
patch("app.gateway.services.resolve_agent_factory", return_value=factory),
TestClient(isolated_app) as client,
):
csrf_token = _register_user(client, email="interrupt-e2e@example.com")
thread_id = _create_thread(client, csrf_token)
created = client.post(
f"/api/threads/{thread_id}/runs",
json=_run_body(),
headers={"X-CSRF-Token": csrf_token},
)
assert created.status_code == 200, created.text
run_id = created.json()["run_id"]
assert controller.started.wait(5), "fake agent never started"
cancelled = client.post(
f"/api/threads/{thread_id}/runs/{run_id}/cancel?wait=true&action=interrupt",
headers={"X-CSRF-Token": csrf_token},
)
assert cancelled.status_code == 204, cancelled.text
assert controller.cancelled.wait(5), "fake agent task was not cancelled"
run = _wait_for_status(client, thread_id, run_id, "interrupted")
assert run["status"] == "interrupted"
thread = client.get(f"/api/threads/{thread_id}")
assert thread.status_code == 200, thread.text
assert thread.json()["status"] == "idle"
@pytest.mark.anyio
async def test_sse_consumer_disconnect_cancels_inflight_run():
"""A disconnected SSE request should cancel an in-flight run when configured."""
from app.gateway.services import sse_consumer
from deerflow.runtime import DisconnectMode, MemoryStreamBridge, RunManager, RunStatus
bridge = MemoryStreamBridge()
run_manager = RunManager()
record = await run_manager.create("thread-disconnect", on_disconnect=DisconnectMode.cancel)
await run_manager.set_status(record.run_id, RunStatus.running)
await bridge.publish(record.run_id, "metadata", {"run_id": record.run_id, "thread_id": record.thread_id})
worker_started = asyncio.Event()
worker_cancelled = asyncio.Event()
async def _pending_worker() -> None:
try:
worker_started.set()
await asyncio.Event().wait()
except asyncio.CancelledError:
worker_cancelled.set()
raise
record.task = asyncio.create_task(_pending_worker())
await asyncio.wait_for(worker_started.wait(), timeout=1.0)
class _DisconnectedRequest:
headers: dict[str, str] = {}
async def is_disconnected(self) -> bool:
return True
try:
frames = []
async for frame in sse_consumer(bridge, record, _DisconnectedRequest(), run_manager):
frames.append(frame)
assert frames == []
assert record.abort_event.is_set()
assert record.status == RunStatus.interrupted
await asyncio.wait_for(worker_cancelled.wait(), timeout=1.0)
assert record.task.cancelled()
finally:
if record.task is not None and not record.task.done():
record.task.cancel()
with suppress(asyncio.CancelledError):
await record.task
def test_cancel_rollback_restores_pre_run_checkpoint(isolated_app):
"""HTTP cancel?action=rollback should restore the checkpoint captured before run start."""
from starlette.testclient import TestClient
controller = _RunController()
factory = _make_agent_factory(
controller,
title="During rollback run",
answer="This answer should be rolled back.",
block_after_first_chunk=True,
)
with (
patch("app.gateway.services.resolve_agent_factory", return_value=factory),
TestClient(isolated_app) as client,
):
csrf_token = _register_user(client, email="rollback-e2e@example.com")
thread_id = _create_thread(client, csrf_token)
before = client.post(
f"/api/threads/{thread_id}/state",
json={
"values": {
"title": "Before rollback",
"messages": [{"type": "human", "content": "before"}],
},
"as_node": "test_seed",
},
headers={"X-CSRF-Token": csrf_token},
)
assert before.status_code == 200, before.text
assert before.json()["values"]["title"] == "Before rollback"
created = client.post(
f"/api/threads/{thread_id}/runs",
json=_run_body(),
headers={"X-CSRF-Token": csrf_token},
)
assert created.status_code == 200, created.text
run_id = created.json()["run_id"]
assert controller.checkpoint_written.wait(5), "fake agent did not write in-run checkpoint"
during = client.get(f"/api/threads/{thread_id}/state")
assert during.status_code == 200, during.text
assert during.json()["values"]["title"] == "During rollback run"
rolled_back = client.post(
f"/api/threads/{thread_id}/runs/{run_id}/cancel?wait=true&action=rollback",
headers={"X-CSRF-Token": csrf_token},
)
assert rolled_back.status_code == 204, rolled_back.text
assert controller.cancelled.wait(5), "rollback did not cancel the worker task"
run = _wait_for_status(client, thread_id, run_id, "error")
assert run["status"] == "error"
after = client.get(f"/api/threads/{thread_id}/state")
assert after.status_code == 200, after.text
assert after.json()["values"]["title"] == "Before rollback"
assert after.json()["values"]["messages"] == [{"type": "human", "content": "before"}]
+58
View File
@@ -95,6 +95,64 @@ def test_config_loaded_async_only_tool_gets_sync_wrapper(mock_bash, mock_cfg):
assert async_tool.invoke({"x": 42}) == "result: 42" assert async_tool.invoke({"x": 42}) == "result: 42"
@patch("deerflow.tools.tools.get_app_config")
@patch("deerflow.tools.tools.is_host_bash_allowed", return_value=True)
def test_subagent_async_only_tool_gets_sync_wrapper(mock_bash, mock_cfg):
"""Async-only tools added through the subagent path can be invoked by sync clients."""
async def async_tool_impl(x: int) -> str:
return f"subagent: {x}"
async_tool = StructuredTool(
name="async_subagent_tool",
description="Async-only subagent test tool.",
args_schema=AsyncToolArgs,
func=None,
coroutine=async_tool_impl,
)
mock_cfg.return_value = _make_minimal_config([])
with (
patch("deerflow.tools.tools.BUILTIN_TOOLS", []),
patch("deerflow.tools.tools.SUBAGENT_TOOLS", [async_tool]),
):
result = get_available_tools(include_mcp=False, subagent_enabled=True, app_config=mock_cfg.return_value)
assert async_tool in result
assert async_tool.func is not None
assert async_tool.invoke({"x": 7}) == "subagent: 7"
@patch("deerflow.tools.tools.get_app_config")
@patch("deerflow.tools.tools.is_host_bash_allowed", return_value=True)
def test_acp_async_only_tool_gets_sync_wrapper(mock_bash, mock_cfg):
"""Async-only ACP tools can be invoked by sync clients."""
async def async_tool_impl(x: int) -> str:
return f"acp: {x}"
async_tool = StructuredTool(
name="invoke_acp_agent",
description="Async-only ACP test tool.",
args_schema=AsyncToolArgs,
func=None,
coroutine=async_tool_impl,
)
config = _make_minimal_config([])
config.acp_agents = {"codex": object()}
mock_cfg.return_value = config
with (
patch("deerflow.tools.tools.BUILTIN_TOOLS", []),
patch("deerflow.tools.builtins.invoke_acp_agent_tool.build_invoke_acp_agent_tool", return_value=async_tool),
):
result = get_available_tools(include_mcp=False, app_config=config)
assert async_tool in result
assert async_tool.func is not None
assert async_tool.invoke({"x": 9}) == "acp: 9"
@patch("deerflow.tools.tools.get_app_config") @patch("deerflow.tools.tools.get_app_config")
@patch("deerflow.tools.tools.is_host_bash_allowed", return_value=True) @patch("deerflow.tools.tools.is_host_bash_allowed", return_value=True)
def test_no_duplicates_returned(mock_bash, mock_cfg): def test_no_duplicates_returned(mock_bash, mock_cfg):
+4 -4
View File
@@ -1,5 +1,5 @@
version = 1 version = 1
revision = 2 revision = 3
requires-python = ">=3.12" requires-python = ">=3.12"
resolution-markers = [ resolution-markers = [
"python_full_version >= '3.14' and sys_platform == 'win32'", "python_full_version >= '3.14' and sys_platform == 'win32'",
@@ -1504,11 +1504,11 @@ wheels = [
[[package]] [[package]]
name = "idna" name = "idna"
version = "3.13" version = "3.15"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ce/cc/762dfb036166873f0059f3b7de4565e1b5bc3d6f28a414c13da27e442f99/idna-3.13.tar.gz", hash = "sha256:585ea8fe5d69b9181ec1afba340451fba6ba764af97026f92a91d4eef164a242", size = 194210, upload-time = "2026-04-22T16:42:42.314Z" } sdist = { url = "https://files.pythonhosted.org/packages/82/77/7b3966d0b9d1d31a36ddf1746926a11dface89a83409bf1483f0237aa758/idna-3.15.tar.gz", hash = "sha256:ca962446ea538f7092a95e057da437618e886f4d349216d2b1e294abfdb65fdc", size = 199245, upload-time = "2026-05-12T22:45:57.011Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/5d/13/ad7d7ca3808a898b4612b6fe93cde56b53f3034dcde235acb1f0e1df24c6/idna-3.13-py3-none-any.whl", hash = "sha256:892ea0cde124a99ce773decba204c5552b69c3c67ffd5f232eb7696135bc8bb3", size = 68629, upload-time = "2026-04-22T16:42:40.909Z" }, { url = "https://files.pythonhosted.org/packages/d2/23/408243171aa9aaba178d3e2559159c24c1171a641aa83b67bdd3394ead8e/idna-3.15-py3-none-any.whl", hash = "sha256:048adeaf8c2d788c40fee287673ccaa74c24ffd8dcf09ffa555a2fbb59f10ac8", size = 72340, upload-time = "2026-05-12T22:45:55.733Z" },
] ]
[[package]] [[package]]
+113 -113
View File
@@ -1731,128 +1731,128 @@ packages:
resolution: {integrity: sha512-FqALmHI8D4o6lk/LRWDnhw95z5eO+eAa6ORjVg09YRR7BkcM6oPHU9uyC0gtQG5vpFLvgpeU4+zEAz2H8APHNw==} resolution: {integrity: sha512-FqALmHI8D4o6lk/LRWDnhw95z5eO+eAa6ORjVg09YRR7BkcM6oPHU9uyC0gtQG5vpFLvgpeU4+zEAz2H8APHNw==}
engines: {node: '>= 10'} engines: {node: '>= 10'}
'@rollup/rollup-android-arm-eabi@4.60.3': '@rollup/rollup-android-arm-eabi@4.60.4':
resolution: {integrity: sha512-x35CNW/ANXG3hE/EZpRU8MXX1JDN86hBb2wMGAtltkz7pc6cxgjpy1OMMfDosOQ+2hWqIkag/fGok1Yady9nGw==} resolution: {integrity: sha512-F5QXMSiFebS9hKZj02XhWLLnRpJ3B3AROP0tWbFBSj+6kCbg5m9j5JoHKd4mmSVy5mS/IMQloYgYxCuJC0fxEQ==}
cpu: [arm] cpu: [arm]
os: [android] os: [android]
'@rollup/rollup-android-arm64@4.60.3': '@rollup/rollup-android-arm64@4.60.4':
resolution: {integrity: sha512-xw3xtkDApIOGayehp2+Rz4zimfkaX65r4t47iy+ymQB2G4iJCBBfj0ogVg5jpvjpn8UWn/+q9tprxleYeNp3Hw==} resolution: {integrity: sha512-GxxTKApUpzRhof7poWvCJHRF51C67u1R7D6DiluBE8wKU1u5GWE8t+v81JvJYtbawoBFX1hLv5Ei4eVjkWokaw==}
cpu: [arm64] cpu: [arm64]
os: [android] os: [android]
'@rollup/rollup-darwin-arm64@4.60.3': '@rollup/rollup-darwin-arm64@4.60.4':
resolution: {integrity: sha512-vo6Y5Qfpx7/5EaamIwi0WqW2+zfiusVihKatLvtN1VFVy3D13uERk/6gZLU1UiHRL6fDXqj/ELIeVRGnvcTE1g==} resolution: {integrity: sha512-tua0TaJxMOB1R0V0RS1jFZ/RpURFDJIOR2A6jWwQeawuFyS4gBW+rntLRaQd0EQ4bd6Vp44Z2rXW+YYDBsj6IA==}
cpu: [arm64] cpu: [arm64]
os: [darwin] os: [darwin]
'@rollup/rollup-darwin-x64@4.60.3': '@rollup/rollup-darwin-x64@4.60.4':
resolution: {integrity: sha512-D+0QGcZhBzTN82weOnsSlY7V7+RMmPuF1CkbxyMAGE8+ZHeUjyb76ZiWmBlCu//AQQONvxcqRbwZTajZKqjuOw==} resolution: {integrity: sha512-CSKq7MsP+5PFIcydhAiR1K0UhEI1A2jWXVKHPCBZ151yOutENwvnPocgVHkivu2kviURtCEB6zUQw0vs8RrhMg==}
cpu: [x64] cpu: [x64]
os: [darwin] os: [darwin]
'@rollup/rollup-freebsd-arm64@4.60.3': '@rollup/rollup-freebsd-arm64@4.60.4':
resolution: {integrity: sha512-6HnvHCT7fDyj6R0Ph7A6x8dQS/S38MClRWeDLqc0MdfWkxjiu1HSDYrdPhqSILzjTIC/pnXbbJbo+ft+gy/9hQ==} resolution: {integrity: sha512-+O8OkVdyvXMtJEciu2wS/pzm1IxntEEQx3z5TAVy4l32G0etZn+RsA48ARRrFm6Ri8fvqPQfgrvNxSjKAbnd3g==}
cpu: [arm64] cpu: [arm64]
os: [freebsd] os: [freebsd]
'@rollup/rollup-freebsd-x64@4.60.3': '@rollup/rollup-freebsd-x64@4.60.4':
resolution: {integrity: sha512-KHLgC3WKlUYW3ShFKnnosZDOJ0xjg9zp7au3sIm2bs/tGBeC2ipmvRh/N7JKi0t9Ue20C0dpEshi8WUubg+cnA==} resolution: {integrity: sha512-Iw3oMskH3AfNuhU0MSN7vNbdi4me/NiYo2azqPz/Le16zHSa+3RRmliCMWWQmh4lcndccU40xcJuTYJZxNo/lw==}
cpu: [x64] cpu: [x64]
os: [freebsd] os: [freebsd]
'@rollup/rollup-linux-arm-gnueabihf@4.60.3': '@rollup/rollup-linux-arm-gnueabihf@4.60.4':
resolution: {integrity: sha512-DV6fJoxEYWJOvaZIsok7KrYl0tPvga5OZ2yvKHNNYyk/2roMLqQAbGhr78EQ5YhHpnhLKJD3S1WFusAkmUuV5g==} resolution: {integrity: sha512-EIPRXTVQpHyF8WOo219AD2yEltPehLTcTMz2fn6JsatLYSzQf00hj3rulF+yauOlF9/FtM2WpkT/hJh/KJFGhA==}
cpu: [arm] cpu: [arm]
os: [linux] os: [linux]
'@rollup/rollup-linux-arm-musleabihf@4.60.3': '@rollup/rollup-linux-arm-musleabihf@4.60.4':
resolution: {integrity: sha512-mQKoJAzvuOs6F+TZybQO4GOTSMUu7v0WdxEk24krQ/uUxXoPTtHjuaUuPmFhtBcM4K0ons8nrE3JyhTuCFtT/w==} resolution: {integrity: sha512-J3Yh9PzzF1Ovah2At+lHiGQdsYgArxBbXv/zHfSyaiFQEqvNv7DcW98pCrmdjCZBrqBiKrKKe2V+aaSGWuBe/w==}
cpu: [arm] cpu: [arm]
os: [linux] os: [linux]
'@rollup/rollup-linux-arm64-gnu@4.60.3': '@rollup/rollup-linux-arm64-gnu@4.60.4':
resolution: {integrity: sha512-Whjj2qoiJ6+OOJMGptTYazaJvjOJm+iKHpXQM1P3LzGjt7Ff++Tp7nH4N8J/BUA7R9IHfDyx4DJIflifwnbmIA==} resolution: {integrity: sha512-BFDEZMYfUvLn37ONE1yMBojPxnMlTFsdyNoqncT0qFq1mAfllL+ATMMJd8TeuVMiX84s1KbcxcZbXInmcO2mRg==}
cpu: [arm64] cpu: [arm64]
os: [linux] os: [linux]
'@rollup/rollup-linux-arm64-musl@4.60.3': '@rollup/rollup-linux-arm64-musl@4.60.4':
resolution: {integrity: sha512-4YTNHKqGng5+yiZt3mg77nmyuCfmNfX4fPmyUapBcIk+BdwSwmCWGXOUxhXbBEkFHtoN5boLj/5NON+u5QC9tg==} resolution: {integrity: sha512-pc9EYOSlOgdQ2uPl1o9PF6/kLSgaUosia7gOuS8mB69IxJvlclko1MECXysjs5ryez1/5zjYqx3+xYU0TU6R1A==}
cpu: [arm64] cpu: [arm64]
os: [linux] os: [linux]
'@rollup/rollup-linux-loong64-gnu@4.60.3': '@rollup/rollup-linux-loong64-gnu@4.60.4':
resolution: {integrity: sha512-SU3kNlhkpI4UqlUc2VXPGK9o886ZsSeGfMAX2ba2b8DKmMXq4AL7KUrkSWVbb7koVqx41Yczx6dx5PNargIrEA==} resolution: {integrity: sha512-NxnomyxYerDh5n4iLrNa+sH+Z+U4BMEE46V2PgQ/hoB909i8gV1M5wPojWg9fk1jWpO3IQnOs20K4wyZuFLEFQ==}
cpu: [loong64] cpu: [loong64]
os: [linux] os: [linux]
'@rollup/rollup-linux-loong64-musl@4.60.3': '@rollup/rollup-linux-loong64-musl@4.60.4':
resolution: {integrity: sha512-6lDLl5h4TXpB1mTf2rQWnAk/LcXrx9vBfu/DT5TIPhvMhRWaZ5MxkIc8u4lJAmBo6klTe1ywXIUHFjylW505sg==} resolution: {integrity: sha512-nbJnQ8a3z1mtmrwImCYhc6BGpThAyYVRQxw9uKSKG4wR6aAYno9sVjJ0zaZcW9BPJX1GbrDPf+SvdWjgTuDmnw==}
cpu: [loong64] cpu: [loong64]
os: [linux] os: [linux]
'@rollup/rollup-linux-ppc64-gnu@4.60.3': '@rollup/rollup-linux-ppc64-gnu@4.60.4':
resolution: {integrity: sha512-BMo8bOw8evlup/8G+cj5xWtPyp93xPdyoSN16Zy90Q2QZ0ZYRhCt6ZJSwbrRzG9HApFabjwj2p25TUPDWrhzqQ==} resolution: {integrity: sha512-2EU6acNrQLd8tYvo/LXW535wupT3m6fo7HKo6lr7ktQoItxTyOL1ZCR/GfGCuXl2vR+zmfI6eRXkSemafv+iVg==}
cpu: [ppc64] cpu: [ppc64]
os: [linux] os: [linux]
'@rollup/rollup-linux-ppc64-musl@4.60.3': '@rollup/rollup-linux-ppc64-musl@4.60.4':
resolution: {integrity: sha512-E0L8X1dZN1/Rph+5VPF6Xj2G7JJvMACVXtamTJIDrVI44Y3K+G8gQaMEAavbqCGTa16InptiVrX6eM6pmJ+7qA==} resolution: {integrity: sha512-WeBtoMuaMxiiIrO2IYP3xs6GMWkJP2C0EoT8beTLkUPmzV1i/UcOSVw1d5r9KBODtHKilG5yFxsGRnBbK3wJ4A==}
cpu: [ppc64] cpu: [ppc64]
os: [linux] os: [linux]
'@rollup/rollup-linux-riscv64-gnu@4.60.3': '@rollup/rollup-linux-riscv64-gnu@4.60.4':
resolution: {integrity: sha512-oZJ/WHaVfHUiRAtmTAeo3DcevNsVvH8mbvodjZy7D5QKvCefO371SiKRpxoDcCxB3PTRTLayWBkvmDQKTcX/sw==} resolution: {integrity: sha512-FJHFfqpKUI3A10WrWKiFbBZ7yVbGT4q4B5o1qKFFojqpaYoh9LrQgqWCmmcxQzVSXYtyB5bzkXrYzlHTs21MYA==}
cpu: [riscv64] cpu: [riscv64]
os: [linux] os: [linux]
'@rollup/rollup-linux-riscv64-musl@4.60.3': '@rollup/rollup-linux-riscv64-musl@4.60.4':
resolution: {integrity: sha512-Dhbyh7j9FybM3YaTgaHmVALwA8AkUwTPccyCQ79TG9AJUsMQqgN1DDEZNr4+QUfwiWvLDumW5vdwzoeUF+TNxQ==} resolution: {integrity: sha512-mcEl6CUT5IAUmQf1m9FYSmVqCJlpQ8r8eyftFUHG8i9OhY7BkBXSUdnLH5DOf0wCOjcP9v/QO93zpmF1SptCCw==}
cpu: [riscv64] cpu: [riscv64]
os: [linux] os: [linux]
'@rollup/rollup-linux-s390x-gnu@4.60.3': '@rollup/rollup-linux-s390x-gnu@4.60.4':
resolution: {integrity: sha512-cJd1X5XhHHlltkaypz1UcWLA8AcoIi1aWhsvaWDskD1oz2eKCypnqvTQ8ykMNI0RSmm7NkTdSqSSD7zM0xa6Ig==} resolution: {integrity: sha512-ynt3JxVd2w2buzoKDWIyiV1pJW93xlQic1THVLXilz429oijRpSHivZAgp65KBu+cMcgf1eVVjdnTLvPxgCuoQ==}
cpu: [s390x] cpu: [s390x]
os: [linux] os: [linux]
'@rollup/rollup-linux-x64-gnu@4.60.3': '@rollup/rollup-linux-x64-gnu@4.60.4':
resolution: {integrity: sha512-DAZDBHQfG2oQuhY7mc6I3/qB4LU2fQCjRvxbDwd/Jdvb9fypP4IJ4qmtu6lNjes6B531AI8cg1aKC2di97bUxA==} resolution: {integrity: sha512-Boiz5+MsaROEWDf+GGEwF8VMHGhlUoQMtIPjOgA5fv4osupqTVnJteQNKJwUcnUog2G55jYXH7KZFFiJe0TEzQ==}
cpu: [x64] cpu: [x64]
os: [linux] os: [linux]
'@rollup/rollup-linux-x64-musl@4.60.3': '@rollup/rollup-linux-x64-musl@4.60.4':
resolution: {integrity: sha512-cRxsE8c13mZOh3vP+wLDxpQBRrOHDIGOWyDL93Sy0Ga8y515fBcC2pjUfFwUe5T7tqvTvWbCpg1URM/AXdWIXA==} resolution: {integrity: sha512-+qfSY27qIrFfI/Hom04KYFw3GKZSGU4lXus51wsb5EuySfFlWRwjkKWoE9emgRw/ukoT4Udsj4W/+xxG8VbPKg==}
cpu: [x64] cpu: [x64]
os: [linux] os: [linux]
'@rollup/rollup-openbsd-x64@4.60.3': '@rollup/rollup-openbsd-x64@4.60.4':
resolution: {integrity: sha512-QaWcIgRxqEdQdhJqW4DJctsH6HCmo5vHxY0krHSX4jMtOqfzC+dqDGuHM87bu4H8JBeibWx7jFz+h6/4C8wA5Q==} resolution: {integrity: sha512-VpTfOPHgVXEBeeR8hZ2O0F3aSso+JDWqTWmTmzcQKted54IAdUVbxE+j/MVxUsKa8L20HJhv3vUezVPoquqWjA==}
cpu: [x64] cpu: [x64]
os: [openbsd] os: [openbsd]
'@rollup/rollup-openharmony-arm64@4.60.3': '@rollup/rollup-openharmony-arm64@4.60.4':
resolution: {integrity: sha512-AaXwSvUi3QIPtroAUw1t5yHGIyqKEXwH54WUocFolZhpGDruJcs8c+xPNDRn4XiQsS7MEwnYsHW2l0MBLDMkWg==} resolution: {integrity: sha512-IPOsh5aRYuLv/nkU51X10Bf75Bsf6+gZdx1X+QP5QM6lIJFHHqbHLG0uJn/hWthzo13UAc2umiUorqZy3axoZg==}
cpu: [arm64] cpu: [arm64]
os: [openharmony] os: [openharmony]
'@rollup/rollup-win32-arm64-msvc@4.60.3': '@rollup/rollup-win32-arm64-msvc@4.60.4':
resolution: {integrity: sha512-65LAKM/bAWDqKNEelHlcHvm2V+Vfb8C6INFxQXRHCvaVN1rJfwr4NvdP4FyzUaLqWfaCGaadf6UbTm8xJeYfEg==} resolution: {integrity: sha512-4QzE9E81OohJ/HKzHhsqU+zcYYojVOXlFMs1DdyMT6qXl/niOH7AVElmmEdUNHHS/oRkc++d5k6Vy85zFs0DEw==}
cpu: [arm64] cpu: [arm64]
os: [win32] os: [win32]
'@rollup/rollup-win32-ia32-msvc@4.60.3': '@rollup/rollup-win32-ia32-msvc@4.60.4':
resolution: {integrity: sha512-EEM2gyhBF5MFnI6vMKdX1LAosE627RGBzIoGMdLloPZkXrUN0Ckqgr2Qi8+J3zip/8NVVro3/FjB+tjhZUgUHA==} resolution: {integrity: sha512-zTPgT1YuHHcd+Tmx7h8aml0FWFVelV5N54oHow9SLj+GfoDy/huQ+UV396N/C7KpMDMiPspRktzM1/0r1usYEA==}
cpu: [ia32] cpu: [ia32]
os: [win32] os: [win32]
'@rollup/rollup-win32-x64-gnu@4.60.3': '@rollup/rollup-win32-x64-gnu@4.60.4':
resolution: {integrity: sha512-E5Eb5H/DpxaoXH++Qkv28RcUJboMopmdDUALBczvHMf7hNIxaDZqwY5lK12UK1BHacSmvupoEWGu+n993Z0y1A==} resolution: {integrity: sha512-DRS4G7mi9lJxqEDezIkKCaUIKCrLUUDCUaCsTPCi/rtqaC6D/jjwslMQyiDU50Ka0JKpeXeRBFBAXwArY52vBw==}
cpu: [x64] cpu: [x64]
os: [win32] os: [win32]
'@rollup/rollup-win32-x64-msvc@4.60.3': '@rollup/rollup-win32-x64-msvc@4.60.4':
resolution: {integrity: sha512-hPt/bgL5cE+Qp+/TPHBqptcAgPzgj46mPcg/16zNUmbQk0j+mOEQV/+Lqu8QRtDV3Ek95Q6FeFITpuhl6OTsAA==} resolution: {integrity: sha512-QVTUovf40zgTqlFVrKA1uXMVvU2QWEFWfAH8Wdc48IxLvrJMQVMBRjuQyUpzZCDkakImib9eVazbWlC6ksWtJw==}
cpu: [x64] cpu: [x64]
os: [win32] os: [win32]
@@ -4079,8 +4079,8 @@ packages:
resolution: {integrity: sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==} resolution: {integrity: sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==}
hasBin: true hasBin: true
lru-cache@11.3.6: lru-cache@11.5.0:
resolution: {integrity: sha512-Gf/KoL3C/MlI7Bt0PGI9I+TeTC/I6r/csU58N4BSNc4lppLBeKsOdFYkK+dX0ABDUMJNfCHTyPpzwwO21Awd3A==} resolution: {integrity: sha512-5YgH9UJd7wVb9hIouI2adWpgqrrICkt070Dnj8EUY1+B4B2P9eRLPAkAAo6NICA7CEhOIeBHl46u9zSNpNu7zA==}
engines: {node: 20 || >=22} engines: {node: 20 || >=22}
lucide-react@0.542.0: lucide-react@0.542.0:
@@ -4671,8 +4671,8 @@ packages:
resolution: {integrity: sha512-PS08Iboia9mts/2ygV3eLpY5ghnUcfLV/EXTOW1E2qYxJKGGBUtNjN76FYHnMs36RmARn41bC0AZmn+rR0OVpQ==} resolution: {integrity: sha512-PS08Iboia9mts/2ygV3eLpY5ghnUcfLV/EXTOW1E2qYxJKGGBUtNjN76FYHnMs36RmARn41bC0AZmn+rR0OVpQ==}
engines: {node: ^10 || ^12 || >=14} engines: {node: ^10 || ^12 || >=14}
postcss@8.5.14: postcss@8.5.15:
resolution: {integrity: sha512-SoSL4+OSEtR99LHFZQiJLkT59C5B1amGO1NzTwj7TT1qCUgUO6hxOvzkOYxD+vMrXBM3XJIKzokoERdqQq/Zmg==} resolution: {integrity: sha512-FfR8sjd4em2T6fb3I2MwAJU7HWVMr9zba+enmQeeWFfCbm+UOC/0X4DS8XtpUTMwWMGbjKYP7xjfNekzyGmB3A==}
engines: {node: ^10 || ^12 || >=14} engines: {node: ^10 || ^12 || >=14}
postcss@8.5.6: postcss@8.5.6:
@@ -4962,8 +4962,8 @@ packages:
robust-predicates@3.0.2: robust-predicates@3.0.2:
resolution: {integrity: sha512-IXgzBWvWQwE6PrDI05OvmXUIruQTcoMDzRsOd5CDvHCVLcLHMTSYvOK5Cm46kWqlV3yAbuSpBZdJ5oP5OUoStg==} resolution: {integrity: sha512-IXgzBWvWQwE6PrDI05OvmXUIruQTcoMDzRsOd5CDvHCVLcLHMTSYvOK5Cm46kWqlV3yAbuSpBZdJ5oP5OUoStg==}
rollup@4.60.3: rollup@4.60.4:
resolution: {integrity: sha512-pAQK9HalE84QSm4Po3EmWIZPd3FnjkShVkiMlz1iligWYkWQ7wHYd1PF/T7QZ5TVSD6uSTon5gBVMSM4JfBV+A==} resolution: {integrity: sha512-WHeFSbZYsPu3+bLoNRUuAO+wavNlocOPf3wSHTP7hcFKVnJeWsYlCDbr3mTS14FCizf9ccIxXA8sGL8zKeQN3g==}
engines: {node: '>=18.0.0', npm: '>=8.0.0'} engines: {node: '>=18.0.0', npm: '>=8.0.0'}
hasBin: true hasBin: true
@@ -7297,79 +7297,79 @@ snapshots:
'@resvg/resvg-wasm@2.6.2': {} '@resvg/resvg-wasm@2.6.2': {}
'@rollup/rollup-android-arm-eabi@4.60.3': '@rollup/rollup-android-arm-eabi@4.60.4':
optional: true optional: true
'@rollup/rollup-android-arm64@4.60.3': '@rollup/rollup-android-arm64@4.60.4':
optional: true optional: true
'@rollup/rollup-darwin-arm64@4.60.3': '@rollup/rollup-darwin-arm64@4.60.4':
optional: true optional: true
'@rollup/rollup-darwin-x64@4.60.3': '@rollup/rollup-darwin-x64@4.60.4':
optional: true optional: true
'@rollup/rollup-freebsd-arm64@4.60.3': '@rollup/rollup-freebsd-arm64@4.60.4':
optional: true optional: true
'@rollup/rollup-freebsd-x64@4.60.3': '@rollup/rollup-freebsd-x64@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-arm-gnueabihf@4.60.3': '@rollup/rollup-linux-arm-gnueabihf@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-arm-musleabihf@4.60.3': '@rollup/rollup-linux-arm-musleabihf@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-arm64-gnu@4.60.3': '@rollup/rollup-linux-arm64-gnu@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-arm64-musl@4.60.3': '@rollup/rollup-linux-arm64-musl@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-loong64-gnu@4.60.3': '@rollup/rollup-linux-loong64-gnu@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-loong64-musl@4.60.3': '@rollup/rollup-linux-loong64-musl@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-ppc64-gnu@4.60.3': '@rollup/rollup-linux-ppc64-gnu@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-ppc64-musl@4.60.3': '@rollup/rollup-linux-ppc64-musl@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-riscv64-gnu@4.60.3': '@rollup/rollup-linux-riscv64-gnu@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-riscv64-musl@4.60.3': '@rollup/rollup-linux-riscv64-musl@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-s390x-gnu@4.60.3': '@rollup/rollup-linux-s390x-gnu@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-x64-gnu@4.60.3': '@rollup/rollup-linux-x64-gnu@4.60.4':
optional: true optional: true
'@rollup/rollup-linux-x64-musl@4.60.3': '@rollup/rollup-linux-x64-musl@4.60.4':
optional: true optional: true
'@rollup/rollup-openbsd-x64@4.60.3': '@rollup/rollup-openbsd-x64@4.60.4':
optional: true optional: true
'@rollup/rollup-openharmony-arm64@4.60.3': '@rollup/rollup-openharmony-arm64@4.60.4':
optional: true optional: true
'@rollup/rollup-win32-arm64-msvc@4.60.3': '@rollup/rollup-win32-arm64-msvc@4.60.4':
optional: true optional: true
'@rollup/rollup-win32-ia32-msvc@4.60.3': '@rollup/rollup-win32-ia32-msvc@4.60.4':
optional: true optional: true
'@rollup/rollup-win32-x64-gnu@4.60.3': '@rollup/rollup-win32-x64-gnu@4.60.4':
optional: true optional: true
'@rollup/rollup-win32-x64-msvc@4.60.3': '@rollup/rollup-win32-x64-msvc@4.60.4':
optional: true optional: true
'@rtsao/scc@1.1.0': {} '@rtsao/scc@1.1.0': {}
@@ -8067,7 +8067,7 @@ snapshots:
'@vue/shared': 3.5.28 '@vue/shared': 3.5.28
estree-walker: 2.0.2 estree-walker: 2.0.2
magic-string: 0.30.21 magic-string: 0.30.21
postcss: 8.5.14 postcss: 8.5.15
source-map-js: 1.2.1 source-map-js: 1.2.1
'@vue/compiler-ssr@3.5.28': '@vue/compiler-ssr@3.5.28':
@@ -9947,7 +9947,7 @@ snapshots:
dependencies: dependencies:
js-tokens: 4.0.0 js-tokens: 4.0.0
lru-cache@11.3.6: {} lru-cache@11.5.0: {}
lucide-react@0.542.0(react@19.2.4): lucide-react@0.542.0(react@19.2.4):
dependencies: dependencies:
@@ -10941,7 +10941,7 @@ snapshots:
picocolors: 1.1.1 picocolors: 1.1.1
source-map-js: 1.2.1 source-map-js: 1.2.1
postcss@8.5.14: postcss@8.5.15:
dependencies: dependencies:
nanoid: 3.3.12 nanoid: 3.3.12
picocolors: 1.1.1 picocolors: 1.1.1
@@ -11282,35 +11282,35 @@ snapshots:
robust-predicates@3.0.2: {} robust-predicates@3.0.2: {}
rollup@4.60.3: rollup@4.60.4:
dependencies: dependencies:
'@types/estree': 1.0.8 '@types/estree': 1.0.8
optionalDependencies: optionalDependencies:
'@rollup/rollup-android-arm-eabi': 4.60.3 '@rollup/rollup-android-arm-eabi': 4.60.4
'@rollup/rollup-android-arm64': 4.60.3 '@rollup/rollup-android-arm64': 4.60.4
'@rollup/rollup-darwin-arm64': 4.60.3 '@rollup/rollup-darwin-arm64': 4.60.4
'@rollup/rollup-darwin-x64': 4.60.3 '@rollup/rollup-darwin-x64': 4.60.4
'@rollup/rollup-freebsd-arm64': 4.60.3 '@rollup/rollup-freebsd-arm64': 4.60.4
'@rollup/rollup-freebsd-x64': 4.60.3 '@rollup/rollup-freebsd-x64': 4.60.4
'@rollup/rollup-linux-arm-gnueabihf': 4.60.3 '@rollup/rollup-linux-arm-gnueabihf': 4.60.4
'@rollup/rollup-linux-arm-musleabihf': 4.60.3 '@rollup/rollup-linux-arm-musleabihf': 4.60.4
'@rollup/rollup-linux-arm64-gnu': 4.60.3 '@rollup/rollup-linux-arm64-gnu': 4.60.4
'@rollup/rollup-linux-arm64-musl': 4.60.3 '@rollup/rollup-linux-arm64-musl': 4.60.4
'@rollup/rollup-linux-loong64-gnu': 4.60.3 '@rollup/rollup-linux-loong64-gnu': 4.60.4
'@rollup/rollup-linux-loong64-musl': 4.60.3 '@rollup/rollup-linux-loong64-musl': 4.60.4
'@rollup/rollup-linux-ppc64-gnu': 4.60.3 '@rollup/rollup-linux-ppc64-gnu': 4.60.4
'@rollup/rollup-linux-ppc64-musl': 4.60.3 '@rollup/rollup-linux-ppc64-musl': 4.60.4
'@rollup/rollup-linux-riscv64-gnu': 4.60.3 '@rollup/rollup-linux-riscv64-gnu': 4.60.4
'@rollup/rollup-linux-riscv64-musl': 4.60.3 '@rollup/rollup-linux-riscv64-musl': 4.60.4
'@rollup/rollup-linux-s390x-gnu': 4.60.3 '@rollup/rollup-linux-s390x-gnu': 4.60.4
'@rollup/rollup-linux-x64-gnu': 4.60.3 '@rollup/rollup-linux-x64-gnu': 4.60.4
'@rollup/rollup-linux-x64-musl': 4.60.3 '@rollup/rollup-linux-x64-musl': 4.60.4
'@rollup/rollup-openbsd-x64': 4.60.3 '@rollup/rollup-openbsd-x64': 4.60.4
'@rollup/rollup-openharmony-arm64': 4.60.3 '@rollup/rollup-openharmony-arm64': 4.60.4
'@rollup/rollup-win32-arm64-msvc': 4.60.3 '@rollup/rollup-win32-arm64-msvc': 4.60.4
'@rollup/rollup-win32-ia32-msvc': 4.60.3 '@rollup/rollup-win32-ia32-msvc': 4.60.4
'@rollup/rollup-win32-x64-gnu': 4.60.3 '@rollup/rollup-win32-x64-gnu': 4.60.4
'@rollup/rollup-win32-x64-msvc': 4.60.3 '@rollup/rollup-win32-x64-msvc': 4.60.4
fsevents: 2.3.3 fsevents: 2.3.3
roughjs@4.6.6: roughjs@4.6.6:
@@ -11908,7 +11908,7 @@ snapshots:
chokidar: 5.0.0 chokidar: 5.0.0
destr: 2.0.5 destr: 2.0.5
h3: 1.15.11 h3: 1.15.11
lru-cache: 11.3.6 lru-cache: 11.5.0
node-fetch-native: 1.6.7 node-fetch-native: 1.6.7
ofetch: 1.5.1 ofetch: 1.5.1
ufo: 1.6.4 ufo: 1.6.4
@@ -11985,8 +11985,8 @@ snapshots:
esbuild: 0.27.7 esbuild: 0.27.7
fdir: 6.5.0(picomatch@4.0.4) fdir: 6.5.0(picomatch@4.0.4)
picomatch: 4.0.4 picomatch: 4.0.4
postcss: 8.5.14 postcss: 8.5.15
rollup: 4.60.3 rollup: 4.60.4
tinyglobby: 0.2.16 tinyglobby: 0.2.16
optionalDependencies: optionalDependencies:
'@types/node': 20.19.33 '@types/node': 20.19.33
+14 -1
View File
@@ -120,7 +120,20 @@ if [ -z "$BETTER_AUTH_SECRET" ]; then
echo -e "${GREEN}✓ BETTER_AUTH_SECRET loaded from $_secret_file${NC}" echo -e "${GREEN}✓ BETTER_AUTH_SECRET loaded from $_secret_file${NC}"
else else
export BETTER_AUTH_SECRET export BETTER_AUTH_SECRET
BETTER_AUTH_SECRET="$(python3 -c 'import secrets; print(secrets.token_hex(32))')" if command -v python3 > /dev/null 2>&1 && \
BETTER_AUTH_SECRET="$(python3 -c 'import sys; sys.version_info >= (3, 6) or sys.exit(1); import secrets; print(secrets.token_hex(32))' 2>/dev/null)"; then
true
elif command -v python > /dev/null 2>&1 && \
BETTER_AUTH_SECRET="$(python -c 'import sys; sys.version_info >= (3, 6) or sys.exit(1); import secrets; print(secrets.token_hex(32))' 2>/dev/null)"; then
true
elif command -v openssl > /dev/null 2>&1 && \
BETTER_AUTH_SECRET="$(openssl rand -hex 32)"; then
true
else
echo -e "${RED}✗ Cannot generate BETTER_AUTH_SECRET: python3, python, and openssl are all unavailable.${NC}" >&2
echo -e "${RED} Set BETTER_AUTH_SECRET manually before running make up.${NC}" >&2
exit 1
fi
echo "$BETTER_AUTH_SECRET" > "$_secret_file" echo "$BETTER_AUTH_SECRET" > "$_secret_file"
chmod 600 "$_secret_file" chmod 600 "$_secret_file"
echo -e "${GREEN}✓ BETTER_AUTH_SECRET generated → $_secret_file${NC}" echo -e "${GREEN}✓ BETTER_AUTH_SECRET generated → $_secret_file${NC}"
+23
View File
@@ -0,0 +1,23 @@
#!/usr/bin/env python3
"""CLI wrapper for the async/thread boundary detector."""
from __future__ import annotations
import sys
from collections.abc import Sequence
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1]
TEST_SUPPORT_PATH = REPO_ROOT / "backend" / "tests"
if str(TEST_SUPPORT_PATH) not in sys.path:
sys.path.insert(0, str(TEST_SUPPORT_PATH))
def main(argv: Sequence[str] | None = None) -> int:
from support.detectors.thread_boundaries import main as detector_main
return detector_main(argv)
if __name__ == "__main__":
sys.exit(main())
+124 -14
View File
@@ -62,27 +62,129 @@ done
# ── Stop helper ────────────────────────────────────────────────────────────── # ── Stop helper ──────────────────────────────────────────────────────────────
_kill_port() { _is_repo_pid() {
local pid=$1
lsof -p "$pid" 2>/dev/null | grep -F "$REPO_ROOT" >/dev/null
}
_kill_repo_processes() {
local pattern=$1
local pid
local pids=""
while IFS= read -r pid; do
if [ -n "$pid" ] && _is_repo_pid "$pid"; then
case " $pids " in
*" $pid "*) ;;
*) pids="$pids $pid" ;;
esac
fi
done < <(pgrep -f "$pattern" 2>/dev/null || true)
if [ -n "$pids" ]; then
kill $pids 2>/dev/null || true
fi
}
_kill_repo_port() {
local port=$1 local port=$1
local pid local pid
pid=$(lsof -ti :"$port" 2>/dev/null) || true local pids=""
if [ -n "$pid" ]; then
kill -9 $pid 2>/dev/null || true while IFS= read -r pid; do
if [ -n "$pid" ] && _is_repo_pid "$pid"; then
case " $pids " in
*" $pid "*) ;;
*) pids="$pids $pid" ;;
esac
fi
done < <(lsof -nP -iTCP:"$port" -sTCP:LISTEN -t 2>/dev/null || true)
if [ -n "$pids" ]; then
kill -9 $pids 2>/dev/null || true
fi
}
_is_port_listening() {
local port=$1
if command -v lsof >/dev/null 2>&1; then
if lsof -nP -iTCP:"$port" -sTCP:LISTEN -t >/dev/null 2>&1; then
return 0
fi
fi
if command -v ss >/dev/null 2>&1; then
if ss -ltn "( sport = :$port )" 2>/dev/null | tail -n +2 | grep -q .; then
return 0
fi
fi
if command -v netstat >/dev/null 2>&1; then
if netstat -ltn 2>/dev/null | awk '{print $4}' | grep -Eq "(^|[.:])${port}$"; then
return 0
fi
fi
return 1
}
_is_repo_nginx_pid() {
local pid=$1
local command
local args
command=$(ps -p "$pid" -o comm= 2>/dev/null) || return 1
case "$command" in
nginx|*/nginx) ;;
*) return 1 ;;
esac
args=$(ps -p "$pid" -o args= 2>/dev/null) || return 1
case "$args" in
*"$REPO_ROOT/docker/nginx/nginx.local.conf"*|*"$REPO_ROOT"*) return 0 ;;
esac
_is_repo_pid "$pid"
}
_kill_repo_nginx() {
local pid
local pids=""
if [ -f "$REPO_ROOT/logs/nginx.pid" ]; then
read -r pid < "$REPO_ROOT/logs/nginx.pid" || true
if [ -n "$pid" ] && _is_repo_nginx_pid "$pid"; then
pids="$pids $pid"
fi
fi
while IFS= read -r pid; do
if [ -n "$pid" ] && _is_repo_nginx_pid "$pid"; then
case " $pids " in
*" $pid "*) ;;
*) pids="$pids $pid" ;;
esac
fi
done < <(pgrep -f nginx 2>/dev/null || true)
if [ -n "$pids" ]; then
kill -9 $pids 2>/dev/null || true
fi fi
} }
stop_all() { stop_all() {
echo "Stopping all services..." echo "Stopping all services..."
pkill -f "uvicorn app.gateway.app:app" 2>/dev/null || true _kill_repo_processes "uvicorn app.gateway.app:app"
pkill -f "next dev" 2>/dev/null || true _kill_repo_processes "next dev"
pkill -f "next start" 2>/dev/null || true _kill_repo_processes "next start"
pkill -f "next-server" 2>/dev/null || true _kill_repo_processes "next-server"
nginx -c "$REPO_ROOT/docker/nginx/nginx.local.conf" -p "$REPO_ROOT" -s quit 2>/dev/null || true nginx -c "$REPO_ROOT/docker/nginx/nginx.local.conf" -p "$REPO_ROOT" -s quit 2>/dev/null || true
sleep 1 sleep 1
pkill -9 nginx 2>/dev/null || true _kill_repo_nginx
# Force-kill any survivors still holding the service ports # Force-kill any survivors still holding the service ports
_kill_port 8001 _kill_repo_port 8001
_kill_port 3000 _kill_repo_port 3000
./scripts/cleanup-containers.sh deer-flow-sandbox 2>/dev/null || true ./scripts/cleanup-containers.sh deer-flow-sandbox 2>/dev/null || true
echo "✓ All services stopped" echo "✓ All services stopped"
} }
@@ -216,13 +318,15 @@ echo ""
# ── Cleanup handler ────────────────────────────────────────────────────────── # ── Cleanup handler ──────────────────────────────────────────────────────────
cleanup() { cleanup() {
local status="${1:-0}"
trap - INT TERM trap - INT TERM
echo "" echo ""
stop_all stop_all
exit 0 exit "$status"
} }
trap cleanup INT TERM trap 'cleanup 130' INT
trap 'cleanup 143' TERM
# ── Helper: start a service ────────────────────────────────────────────────── # ── Helper: start a service ──────────────────────────────────────────────────
@@ -231,6 +335,12 @@ trap cleanup INT TERM
run_service() { run_service() {
local name="$1" cmd="$2" port="$3" timeout="$4" local name="$1" cmd="$2" port="$3" timeout="$4"
if _is_port_listening "$port"; then
echo "$name cannot start because port $port is already in use."
echo " If it belongs to this worktree, run 'make stop'; otherwise free the port manually."
cleanup 1
fi
echo "Starting $name..." echo "Starting $name..."
if $DAEMON_MODE; then if $DAEMON_MODE; then
nohup sh -c "$cmd" > /dev/null 2>&1 & nohup sh -c "$cmd" > /dev/null 2>&1 &
@@ -242,7 +352,7 @@ run_service() {
local logfile="logs/$(echo "$name" | tr '[:upper:]' '[:lower:]' | tr ' ' '-').log" local logfile="logs/$(echo "$name" | tr '[:upper:]' '[:lower:]' | tr ' ' '-').log"
echo "$name failed to start." echo "$name failed to start."
[ -f "$logfile" ] && tail -20 "$logfile" [ -f "$logfile" ] && tail -20 "$logfile"
cleanup cleanup 1
} }
echo "$name started on localhost:$port" echo "$name started on localhost:$port"
} }