Compare commits

7 Commits

Author SHA1 Message Date
QY 92905e9e3e fix(todo): reuse thread state schema (#3206)
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-05-26 23:58:08 +08:00
AochenShen99 da41701f87 Add static blocking IO inventory (#3208)
* feat(detectors): add static blocking IO inventory

* refactor(detectors): drop superseded runtime probe; clarify static report path

- Remove the #2924 custom runtime blocking IO probe entirely:
  backend/tests/support/detectors/blocking_io.py,
  backend/tests/test_blocking_io_detector.py,
  backend/tests/test_blocking_io_probe_integration.py, and the
  pytest_addoption / pytest_runtest_call / pytest_runtest_teardown /
  pytest_sessionfinish / pytest_terminal_summary hooks plus the
  blocking_io_detector fixture from backend/tests/conftest.py.
  Its narrow DEFAULT_BLOCKING_CALL_SPECS (time.sleep, requests, httpx,
  os.walk, Path.resolve, Path.read_text, Path.write_text) cannot serve
  as a CI gate; a Blockbuster-backed runtime detector will land in a
  separate follow-up PR. Leaving the half-coverage probe alongside
  the static inventory in this PR added a redundant detect path with
  no production value.
- Address Copilot review comments on backend/README.md and
  backend/CLAUDE.md by stating explicitly that the JSON report writes
  to .deer-flow/blocking-io-findings.json at the repository root,
  whether the target is invoked from the repo root or from backend/.

Verified: pytest tests/test_detect_blocking_io_static.py (18 passed),
ruff check + format on touched files (passed), make detect-blocking-io
from both repo root and backend/ produce the same 105-finding report
at <repo-root>/.deer-flow/blocking-io-findings.json.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-05-26 23:30:24 +08:00
Xinmin Zeng e02801944a chore: add a pull request template (#3259)
* chore: add pull request template

* fix: address Copilot review on PR template

- Reword the issue-link comment (plain #123 links; Fixes/Closes only auto-closes)
- Remove the standalone '-' bullets under Bug fix verification / Validation
- Align Validation commands with CI (frontend format + build with BETTER_AUTH_SECRET)
2026-05-26 23:25:29 +08:00
Stellar鱼 b00749a8a6 fix(auth): share internal gateway token across workers (#3184)
* fix(auth): share internal gateway token across workers

* fix: restore deploy script executable bit

* Update deploy.sh

to skip the auth_token setup for the down command

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-05-26 23:19:57 +08:00
AochenShen99 e344be8d94 feat(tests): add Blockbuster runtime gate for event-loop blocking IO (#3229)
* feat(tests): add Blockbuster runtime gate for event-loop blocking IO

Adds a strict runtime gate that fails CI when sync blocking IO calls run
on the asyncio event loop thread through DeerFlow business code.

Components:
- backend/tests/support/detectors/blocking_io_runtime.py — Blockbuster
  context scoped to `app.*` and `deerflow.*` so test infrastructure,
  pytest internals, and third-party libraries stay silent.
- backend/tests/blocking_io/conftest.py — pytest_runtest_protocol
  hookwrapper that wraps every item (setup + call + teardown) with the
  strict context. Respects `@pytest.mark.allow_blocking_io` opt-out.
- backend/tests/blocking_io/test_skills_load.py — regression anchor for
  the #1917 fix (asyncio.to_thread offload around
  LocalSkillStorage.load_skills).
- backend/tests/blocking_io/test_sqlite_lifespan.py — regression anchor
  for the #1912 fix (asyncio.to_thread offload around
  ensure_sqlite_parent_dir).
- backend/tests/blocking_io/test_gate_smoke.py — meta-test asserting the
  gate actually catches unoffloaded blocking IO and that the
  `@pytest.mark.allow_blocking_io` opt-out works.
- backend/Makefile — `make test-blocking-io` target.
- .github/workflows/backend-blocking-io-tests.yml — hard-fail PR gate on
  ubuntu-latest. Windows matrix deferred to follow-up.

Dependencies:
- blockbuster>=1.5.26,<1.6 added to dev group.

Coverage boundary (called out in PR body): the gate only catches blocking
IO on code paths the test suite actually exercises. Static AST inventory
(separate, informational) is the complementary coverage tool. Three blind
spot categories — untested paths, mocked-away paths, env-mismatched paths
— are documented in the PR description.

Findings surfaced while authoring this PR:
- resolve_sqlite_conn_str in runtime/store/_sqlite_utils.py:19 does sync
  Path.resolve() -> os.path.abspath on the lifespan loop thread, ahead of
  the #1912 fix. Not addressed here; tracked as follow-up.

Tests: 4 passed locally (`make test-blocking-io`).
Lint/format: clean (`ruff check` and `ruff format --check`).

* fix(tests): scope Blockbuster gate to blocking-io suite

* fix(tests): harden Blockbuster runtime gate

* test(blocking-io): add project rule extension point

* test(blocking-io): address review cleanup
2026-05-26 23:03:49 +08:00
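
The idea behind a runtime gate of this kind can be illustrated with a small standard-library sketch. This is not Blockbuster's implementation and not the code in `blocking_io_runtime.py`; the decorator `forbid_on_event_loop` and the error type `BlockingCallError` are hypothetical names introduced only for illustration. The sketch refuses to run a sync function when an asyncio event loop is running in the current thread, and allows it when the call is offloaded with `asyncio.to_thread`.

```python
import asyncio
import functools


class BlockingCallError(RuntimeError):
    """Hypothetical stand-in for a detector's error type."""


def forbid_on_event_loop(func):
    """Raise if the wrapped sync function is called on an event loop thread."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # Raises RuntimeError when no loop is running in this thread.
            asyncio.get_running_loop()
        except RuntimeError:
            return func(*args, **kwargs)
        raise BlockingCallError(f"{func.__name__} called on the event loop thread")

    return wrapper


@forbid_on_event_loop
def read_config() -> str:
    # Stands in for sync file or network IO.
    return "config"


async def bad() -> str:
    # Direct call: runs on the loop thread, so the guard raises.
    return read_config()


async def good() -> str:
    # Offloaded call: runs in a worker thread where no loop is running.
    return await asyncio.to_thread(read_config)
```

Under these assumptions, `asyncio.run(good())` returns normally while `asyncio.run(bad())` raises, which mirrors the pass/fail behaviour the commit message describes for offloaded versus unoffloaded calls.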
Admire f68bcb771c fix(frontend): guard message copy clipboard access (#3211)
* fix(frontend): guard message copy clipboard access

* fix(frontend): reuse clipboard guard across copy actions
2026-05-26 09:37:51 +08:00
AochenShen99 11dd5b0683 fix(frontend): strip unclosed <think> tags from streaming AI content (#3218)
* fix(frontend): strip unclosed <think> tags from streaming AI content

During streaming, an opening <think> tag may arrive in one chunk
while the matching </think> arrives in a later chunk. The existing
splitInlineReasoning regex only matched fully closed pairs, so the
mid-flight reasoning was left in message.content and rendered into
the chat bubble via the markdown pipeline's rehypeRaw plugin until
the closing tag landed.

Extend splitInlineReasoning with a second pass: after stripping every
closed <think>...</think> pair, route any remaining content from a
lone opener to the reasoning slot and leave only the preceding
preamble in content. Closed-tag behavior is unchanged.

Covers every provider whose stream emits reasoning inline as <think>
tags (MiniMax streaming path, MindIE, PatchedChatOpenAI, and any
gateway-served DeepSeek/OpenAI-compatible model).

* style(frontend): apply prettier formatting to streaming reasoning tests

* fix(frontend): skip <think> split for literal think tags in inline code

Treats a `<think>` opener immediately preceded by a backtick as part of
markdown inline code rather than a streaming reasoning marker. Prevents
permanent content truncation when an AI message documents the `<think>`
tag literally (e.g. ``Use `<think>` markers``), where the streaming-safe
fallback would otherwise route the rest of the answer into the reasoning
panel because no `</think>` ever arrives.

Adds regression tests for both the post-stream and mid-stream cases.
2026-05-26 09:35:07 +08:00
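
The two-pass behaviour described in this commit message can be sketched roughly as follows. The real `splitInlineReasoning` is frontend code and its regexes are not shown on this page, so this Python version is only an illustration: the function name `split_inline_reasoning`, the `(content, reasoning)` return shape, and both patterns are assumptions.

```python
import re

# Pass 1 pattern: fully closed <think>...</think> pairs, across newlines.
_CLOSED = re.compile(r"<think>(.*?)</think>", re.DOTALL)
# Pass 2 pattern: a lone opener that is not immediately preceded by a backtick,
# so a literal `<think>` in markdown inline code is left alone.
_LONE_OPENER = re.compile(r"(?<!`)<think>")


def split_inline_reasoning(text: str) -> tuple[str, str]:
    """Return (content, reasoning) for a possibly still-streaming message."""
    reasoning_parts: list[str] = []

    def _collect(match: re.Match) -> str:
        reasoning_parts.append(match.group(1))
        return ""

    # Pass 1: strip every closed pair and keep its body as reasoning.
    content = _CLOSED.sub(_collect, text)

    # Pass 2: anything after a remaining lone opener is mid-flight reasoning;
    # only the preamble before it stays in content.
    lone = _LONE_OPENER.search(content)
    if lone is not None:
        reasoning_parts.append(content[lone.end():])
        content = content[: lone.start()]

    reasoning = "\n".join(p.strip() for p in reasoning_parts if p.strip())
    return content.strip(), reasoning
```

With this sketch, a chunk such as `"Intro <think>half a thou"` yields the preamble as content and the partial thought as reasoning, while ``"Use `<think>` markers"`` is returned unchanged.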
42 changed files with 2376 additions and 743 deletions
+5
@@ -50,6 +50,11 @@ INFOQUEST_API_KEY=your-infoquest-api-key
# Set to "false" to disable Swagger UI, ReDoc, and OpenAPI schema in production
# GATEWAY_ENABLE_DOCS=false
# Shared internal Gateway auth token for multi-worker deployments.
# `make up` generates and persists this automatically; set it manually only
# when you run Gateway workers outside the bundled deploy script.
# DEER_FLOW_INTERNAL_AUTH_TOKEN=your-shared-internal-token
# ── Frontend SSR → Gateway wiring ─────────────────────────────────────────────
# The Next.js server uses these to reach the Gateway during SSR (auth checks,
# /api/* rewrites). They default to localhost values that match `make dev` and
+61
@@ -0,0 +1,61 @@
<!-- Reference a related issue with #123. Use Fixes / Closes / Resolves to
auto-close it on merge. Delete this line if the PR doesn't reference an issue. -->
Fixes #
## Why
<!-- Why are you opening this PR? Cover two things:
- The trigger — what made you write this? A bug you hit, a feature you need,
tech debt, or a prod issue?
- The pain being addressed — user-facing problem, or what it unblocks.
For non-trivial features, please open an issue/discussion first to align on
scope before writing code. -->
## What changed
<!-- Describe the change from a user's / caller's perspective, not as a code diff. e.g.:
- "Settings now has a 'Custom endpoint' field, off by default"
- "Backend /api/chat gains a `stream` flag, defaults to false"
- "Default model changed from X to Y — existing users notice on first run" -->
## Surface area
<!-- Check every box that applies. Reviewers use this to scope the review. -->
- [ ] **Frontend UI** — page / component / setting / interaction under `frontend/`
- [ ] **Backend API** — endpoint / SSE event / request-response shape under `backend/app`
- [ ] **Agents / LangGraph** — agent node, graph wiring, `langgraph.json`, or prompt change
- [ ] **Sandbox** — `docker/` or sandboxed execution
- [ ] **Skills** — change under `skills/`
- [ ] **Dependencies** — new/upgraded entry in `backend/pyproject.toml` or `frontend/package.json` (say what it buys us)
- [ ] **Default behavior change** — changes existing behavior without the user opting in (default model, default setting, data shape)
- [ ] **Docs / tests / CI only** — no runtime behavior change
## Screenshots / Recording
<!-- If you checked "Frontend UI", attach screenshots showing the entry point —
where users discover the change — not just the feature in isolation.
Before/after is best for behavior changes. Short GIFs welcome. -->
## Bug fix verification
<!-- Skip (delete) this section if this PR is not a bug fix.
Bugs should be encoded as a failing test that goes red before the fix.
Confirm:
- Test path that reproduces the bug:
- Did it go red on `main` and green on this branch? (yes / no)
- If a red test wasn't cheap to write, explain why and what you did instead. -->
## Validation
<!-- What you actually ran. Run at least the checks for the area you changed:
Backend: cd backend && make lint && make test
Frontend: cd frontend && pnpm format && pnpm lint && pnpm typecheck && BETTER_AUTH_SECRET=local-dev-secret pnpm build && make test
Frontend E2E (if you touched frontend/): cd frontend && make test-e2e -->
@@ -0,0 +1,46 @@
name: Backend Blocking IO

on:
  push:
    branches: ["main"]
    paths:
      - "backend/**"
      - ".github/workflows/backend-blocking-io-tests.yml"
  pull_request:
    types: [opened, synchronize, reopened, ready_for_review]
    paths:
      - "backend/**"
      - ".github/workflows/backend-blocking-io-tests.yml"

concurrency:
  group: blocking-io-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

permissions:
  contents: read

jobs:
  backend-blocking-io:
    if: github.event_name != 'pull_request' || github.event.pull_request.draft == false
    runs-on: ubuntu-latest
    timeout-minutes: 10
    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install uv
        uses: astral-sh/setup-uv@v3

      - name: Install backend dependencies
        working-directory: backend
        run: uv sync --group dev

      - name: Run blocking IO regression tests
        working-directory: backend
        run: make test-blocking-io
+5 -1
@@ -1,6 +1,6 @@
# DeerFlow - Unified Development Environment
-.PHONY: help config config-upgrade check install setup doctor detect-thread-boundaries dev dev-daemon start start-daemon stop up down clean docker-init docker-start docker-stop docker-logs docker-logs-frontend docker-logs-gateway
+.PHONY: help config config-upgrade check install setup doctor detect-thread-boundaries detect-blocking-io dev dev-daemon start start-daemon stop up down clean docker-init docker-start docker-stop docker-logs docker-logs-frontend docker-logs-gateway
BASH ?= bash
BACKEND_UV_RUN = cd backend && uv run
@@ -24,6 +24,7 @@ help:
	@echo " make config-upgrade - Merge new fields from config.example.yaml into config.yaml"
	@echo " make check - Check if all required tools are installed"
	@echo " make detect-thread-boundaries - Inventory async/thread boundary points"
	@echo " make detect-blocking-io - Inventory blocking IO that may block the backend event loop"
	@echo " make install - Install all dependencies (frontend + backend + pre-commit hooks)"
	@echo " make setup-sandbox - Pre-pull sandbox container image (recommended)"
	@echo " make dev - Start all services in development mode (with hot-reloading)"
@@ -55,6 +56,9 @@ doctor:
detect-thread-boundaries:
	@$(PYTHON) ./scripts/detect_thread_boundaries.py

detect-blocking-io:
	@$(MAKE) -C backend detect-blocking-io

config:
	@$(PYTHON) ./scripts/configure.py
+6
@@ -740,6 +740,12 @@ DeerFlow has key high-privilege capabilities including **system command executio
We welcome contributions! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for development setup, workflow, and guidelines.
Regression coverage includes Docker sandbox mode detection and provisioner kubeconfig-path handling tests in `backend/tests/`.
Backend blocking-IO diagnostics are available from the repository root with
`make detect-blocking-io`: it statically scans backend business code for
blocking IO that may run on the backend event loop, prints a concise summary,
and writes complete JSON findings to `.deer-flow/blocking-io-findings.json`.
The JSON includes compact review records with `priority`, `location`,
`blocking_call`, `event_loop_exposure`, `reason`, and `code`.
Gateway artifact serving now forces active web content types (`text/html`, `application/xhtml+xml`, `image/svg+xml`) to download as attachments instead of inline rendering, reducing XSS risk for generated artifacts.
## License
+35
@@ -92,14 +92,49 @@ make install # Install backend dependencies
make dev # Run Gateway API with reload (port 8001)
make gateway # Run Gateway API only (port 8001)
make test # Run all backend tests
make test-blocking-io # Run strict Blockbuster runtime gate on tests/blocking_io/
make lint # Lint with ruff
make format # Format code with ruff
```
The `detect-blocking-io` target parses `app/`, `packages/harness/deerflow/`,
and `scripts/` with AST. By default it reports only blocking IO candidates that
are inside async code, reachable from async code in the same file, or reachable
from sync-only `AgentMiddleware` before/after hooks that LangGraph can execute
on the async graph path. It prints a concise summary and writes complete JSON
findings to `.deer-flow/blocking-io-findings.json` at the repository root
(both `make detect-blocking-io` from the repo root and `cd backend && make
detect-blocking-io` resolve to the same repo-root path). JSON findings include
`priority`, `location`, `blocking_call`, `event_loop_exposure`, `reason`, and
`code` for model-assisted or manual review. `priority` is a deterministic
review ordering from operation type, not proof of a bug. Bare-name same-file
calls are resolved by function name, so duplicate helper names in one file can
conservatively over-report async reachability. It is intentionally
informational and is not run from CI in this round.
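
A heavily reduced sketch of the static approach described above, for orientation only. It is not the code in `scripts/detect_blocking_io_static.py`: the blocklist `BLOCKING_CALLS` and the function `find_blocking_calls` are hypothetical, and the sketch only flags calls written directly inside an `async def`. Same-file reachability and the `AgentMiddleware` hook handling mentioned above are left out.

```python
import ast
from typing import Optional

# Hypothetical blocklist for illustration; a real inventory covers far more.
BLOCKING_CALLS = {"time.sleep", "os.walk", "open"}


def _dotted_name(node: ast.expr) -> Optional[str]:
    """Render call targets such as `a.b.c` as a dotted string."""
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Attribute):
        base = _dotted_name(node.value)
        return f"{base}.{node.attr}" if base else None
    return None


class _AsyncBlockingVisitor(ast.NodeVisitor):
    def __init__(self) -> None:
        self.async_depth = 0
        self.findings: list[tuple[int, str]] = []

    def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
        self.async_depth += 1
        self.generic_visit(node)
        self.async_depth -= 1

    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
        # A sync def is only a definition here; whether it later runs on the
        # loop is a reachability question this sketch does not answer.
        saved, self.async_depth = self.async_depth, 0
        self.generic_visit(node)
        self.async_depth = saved

    def visit_Call(self, node: ast.Call) -> None:
        name = _dotted_name(node.func)
        if self.async_depth and name in BLOCKING_CALLS:
            self.findings.append((node.lineno, name))
        self.generic_visit(node)


def find_blocking_calls(source: str) -> list[tuple[int, str]]:
    """Return (line, call) pairs for blocklisted calls inside async functions."""
    visitor = _AsyncBlockingVisitor()
    visitor.visit(ast.parse(source))
    return visitor.findings
```

Because the match is purely syntactic, a finding from a scan like this is a candidate for review rather than proof of a bug, which is the same caveat the paragraph above attaches to `priority`.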
Regression tests related to Docker/provisioner behavior:
- `tests/test_docker_sandbox_mode_detection.py` (mode detection from `config.yaml`)
- `tests/test_provisioner_kubeconfig.py` (kubeconfig file/directory handling)
Blocking-IO runtime gate (`tests/blocking_io/`):
- Wraps every item under `tests/blocking_io/` with a strict Blockbuster
context scoped to `app.*` and `deerflow.*` (see
`tests/support/detectors/blocking_io_runtime.py`). Any sync blocking IO
call whose stack passes through DeerFlow business code while running on
the asyncio event loop raises `BlockingError` and fails the test.
- Two regression anchors live there: `test_skills_load.py` (locks the
`asyncio.to_thread` offload around `LocalSkillStorage.load_skills`, fix
for #1917) and `test_sqlite_lifespan.py` (locks the offload around
SQLite path resolution plus `ensure_sqlite_parent_dir`, fix for #1912).
- `test_gate_smoke.py` is a meta-test asserting the gate actually catches
unoffloaded blocking IO and that the `@pytest.mark.allow_blocking_io`
opt-out works.
- Coverage boundary: the gate only sees code that test execution actually
touches. Static AST coverage is a separate concern (out of scope for
this PR).
- CI: runs on every PR via `.github/workflows/backend-blocking-io-tests.yml`,
hard-fail.
Boundary check (harness → app import firewall):
- `tests/test_harness_boundary.py` — ensures `packages/harness/deerflow/` never imports from `app.*`
+6
@@ -10,9 +10,15 @@ gateway:
test:
	PYTHONPATH=. PYTHONIOENCODING=utf-8 PYTHONUTF8=1 uv run pytest tests/ -v

test-blocking-io:
	PYTHONPATH=. PYTHONIOENCODING=utf-8 PYTHONUTF8=1 uv run pytest tests/blocking_io -q --tb=short

lint:
	uvx ruff check .
	uvx ruff format --check .

format:
	uvx ruff check . --fix && uvx ruff format .

detect-blocking-io:
	@PYTHONPATH=. PYTHONIOENCODING=utf-8 PYTHONUTF8=1 uv run python ../scripts/detect_blocking_io_static.py --output ../.deer-flow/blocking-io-findings.json
+13
@@ -362,6 +362,7 @@ make dev # Run Gateway API + embedded agent runtime (port 8001)
make gateway # Run Gateway API without reload (port 8001)
make lint # Run linter (ruff)
make format # Format code (ruff)
make detect-blocking-io # Inventory blocking IO that may block the backend event loop
```
### Code Style
@@ -378,6 +379,18 @@ make format # Format code (ruff)
uv run pytest
```
`make detect-blocking-io` statically scans backend business code for blocking
IO that may run on the backend event loop and is not test-coverage-bound. It
prints a concise summary for human review and writes complete JSON findings to
`.deer-flow/blocking-io-findings.json` at the repository root (regardless of
whether the target is invoked from the repo root or from `backend/`). JSON
findings include both broad IO category and review-oriented fields such as
`priority`, `location`, `blocking_call`, `event_loop_exposure`, `reason`, and
`code`. `priority` is a deterministic review ordering from the operation type,
not proof of a bug. Bare-name same-file calls are resolved by function name,
so duplicate helper names in one file can conservatively over-report async
reachability.
---
## Technology Stack
+15 -4
@@ -1,23 +1,34 @@
-"""Process-local authentication for Gateway internal callers."""
+"""Authentication for trusted Gateway internal callers."""

 from __future__ import annotations

+import os
 import secrets
 from types import SimpleNamespace

 from deerflow.runtime.user_context import DEFAULT_USER_ID

 INTERNAL_AUTH_HEADER_NAME = "X-DeerFlow-Internal-Token"
-_INTERNAL_AUTH_TOKEN = secrets.token_urlsafe(32)
+INTERNAL_AUTH_ENV_VAR = "DEER_FLOW_INTERNAL_AUTH_TOKEN"
+
+
+def _load_internal_auth_token() -> str:
+    token = os.environ.get(INTERNAL_AUTH_ENV_VAR)
+    if token:
+        return token
+    return secrets.token_urlsafe(32)
+
+
+_INTERNAL_AUTH_TOKEN = _load_internal_auth_token()


 def create_internal_auth_headers() -> dict[str, str]:
-    """Return headers that authenticate same-process Gateway internal calls."""
+    """Return headers that authenticate trusted Gateway internal calls."""
     return {INTERNAL_AUTH_HEADER_NAME: _INTERNAL_AUTH_TOKEN}


 def is_valid_internal_auth_token(token: str | None) -> bool:
-    """Return True when *token* matches the process-local internal token."""
+    """Return True when *token* matches this Gateway worker's internal token."""
     return bool(token) and secrets.compare_digest(token, _INTERNAL_AUTH_TOKEN)
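
Why the environment variable matters for multi-worker deployments can be seen in a small standalone sketch. It mirrors the logic of the hunk above but takes the environment as an explicit mapping so two "workers" can be simulated in one process; the helper names `load_token` and `is_valid` are hypothetical and are not the module's API.

```python
import secrets
from collections.abc import Mapping

ENV_VAR = "DEER_FLOW_INTERNAL_AUTH_TOKEN"


def load_token(environ: Mapping[str, str]) -> str:
    """Prefer the shared token from the environment, else mint a random one."""
    return environ.get(ENV_VAR) or secrets.token_urlsafe(32)


def is_valid(candidate, expected: str) -> bool:
    """Constant-time comparison; empty or missing tokens are always rejected."""
    return bool(candidate) and secrets.compare_digest(candidate, expected)


# Two workers started with the same environment agree on the token.
shared_env = {ENV_VAR: "shared-secret"}
worker_a = load_token(shared_env)
worker_b = load_token(shared_env)

# Two workers started without it each mint their own random token, so a
# header created by one would be rejected by the other.
isolated_a = load_token({})
isolated_b = load_token({})
```

In this sketch `is_valid(worker_a, worker_b)` holds, while `is_valid(isolated_a, isolated_b)` does not, which is the cross-worker failure the shared token is meant to avoid.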
@@ -20,11 +20,13 @@ from collections.abc import Awaitable, Callable
 from typing import Any, override

 from langchain.agents.middleware import TodoListMiddleware
-from langchain.agents.middleware.todo import PlanningState, Todo
+from langchain.agents.middleware.todo import Todo
 from langchain.agents.middleware.types import ModelCallResult, ModelRequest, ModelResponse, hook_config
 from langchain_core.messages import AIMessage, HumanMessage
 from langgraph.runtime import Runtime

+from deerflow.agents.thread_state import ThreadState
+

 def _todos_in_messages(messages: list[Any]) -> bool:
     """Return True if any AIMessage in *messages* contains a write_todos tool call."""
@@ -113,10 +115,12 @@ class TodoMiddleware(TodoListMiddleware):
     and injects a reminder message so the model can continue tracking progress.
     """

+    state_schema = ThreadState
+
     @override
     def before_model(
         self,
-        state: PlanningState,
+        state: ThreadState,
         runtime: Runtime,
     ) -> dict[str, Any] | None:
         """Inject a todo-list reminder when write_todos has left the context window."""
@@ -154,7 +158,7 @@ class TodoMiddleware(TodoListMiddleware):
     @override
     async def abefore_model(
         self,
-        state: PlanningState,
+        state: ThreadState,
         runtime: Runtime,
     ) -> dict[str, Any] | None:
         """Async version of before_model."""
@@ -249,12 +253,12 @@ class TodoMiddleware(TodoListMiddleware):
             self._drop_completion_reminder_key_locked(key)

     @override
-    def before_agent(self, state: PlanningState, runtime: Runtime) -> dict[str, Any] | None:
+    def before_agent(self, state: ThreadState, runtime: Runtime) -> dict[str, Any] | None:
         self._clear_other_run_completion_reminders(runtime)
         return None

     @override
-    async def abefore_agent(self, state: PlanningState, runtime: Runtime) -> dict[str, Any] | None:
+    async def abefore_agent(self, state: ThreadState, runtime: Runtime) -> dict[str, Any] | None:
         self._clear_other_run_completion_reminders(runtime)
         return None

@@ -262,7 +266,7 @@ class TodoMiddleware(TodoListMiddleware):
     @override
     def after_model(
         self,
-        state: PlanningState,
+        state: ThreadState,
         runtime: Runtime,
     ) -> dict[str, Any] | None:
         """Prevent premature agent exit when todo items are still incomplete.
@@ -308,7 +312,7 @@ class TodoMiddleware(TodoListMiddleware):
     @hook_config(can_jump_to=["model"])
     async def aafter_model(
         self,
-        state: PlanningState,
+        state: ThreadState,
         runtime: Runtime,
     ) -> dict[str, Any] | None:
         """Async version of after_model."""
@@ -349,11 +353,11 @@ class TodoMiddleware(TodoListMiddleware):
         return await handler(self._augment_request(request))

     @override
-    def after_agent(self, state: PlanningState, runtime: Runtime) -> dict[str, Any] | None:
+    def after_agent(self, state: ThreadState, runtime: Runtime) -> dict[str, Any] | None:
         self._clear_current_run_completion_reminders(runtime)
         return None

     @override
-    async def aafter_agent(self, state: PlanningState, runtime: Runtime) -> dict[str, Any] | None:
+    async def aafter_agent(self, state: ThreadState, runtime: Runtime) -> dict[str, Any] | None:
         self._clear_current_run_completion_reminders(runtime)
         return None
+3 -12
@@ -1,4 +1,4 @@
-"""Load MCP tools using langchain-mcp-adapters with stdio session pooling."""
+"""Load MCP tools using langchain-mcp-adapters with persistent sessions."""

 from __future__ import annotations

@@ -173,10 +173,8 @@ def _make_session_pool_tool(
 async def get_mcp_tools() -> list[BaseTool]:
     """Get all tools from enabled MCP servers.

-    Tools using stdio transport are wrapped with persistent-session logic so
-    consecutive calls within the same thread reuse the same MCP session.
-    HTTP/SSE tools are returned unwrapped to avoid cross-task TaskGroup
-    cleanup errors.
+    Tools are wrapped with persistent-session logic so that consecutive
+    calls within the same thread reuse the same MCP session.

     Returns:
         List of LangChain tools from all enabled MCP servers.
@@ -253,9 +251,6 @@ async def get_mcp_tools() -> list[BaseTool]:
         logger.info(f"Successfully loaded {len(tools)} tool(s) from MCP servers")

         # Wrap each tool with persistent-session logic.
-        # Only pool stdio sessions. HTTP/SSE transports use anyio TaskGroups
-        # internally which cannot be closed from a different async task, so
-        # pooling them causes RuntimeError on cleanup (see #3203).
         wrapped_tools: list[BaseTool] = []
         for tool in tools:
             tool_server: str | None = None
@@ -265,13 +260,9 @@ async def get_mcp_tools() -> list[BaseTool]:
                     break

             if tool_server is not None:
-                transport = servers_config[tool_server].get("transport", "stdio")
-                if transport == "stdio":
                 wrapped_tools.append(_make_session_pool_tool(tool, tool_server, servers_config[tool_server], tool_interceptors))
-                else:
-                    wrapped_tools.append(tool)
             else:
                 wrapped_tools.append(tool)

         # Patch tools to support sync invocation, as deerflow client streams synchronously
         for tool in wrapped_tools:
@@ -34,6 +34,19 @@ from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir, resol
 logger = logging.getLogger(__name__)


+def _prepare_sqlite_checkpointer_path(raw: str) -> str:
+    conn_str = resolve_sqlite_conn_str(raw)
+    ensure_sqlite_parent_dir(conn_str)
+    return conn_str
+
+
+def _prepare_database_sqlite_checkpointer_path(db_config) -> str:
+    conn_str = db_config.checkpointer_sqlite_path
+    ensure_sqlite_parent_dir(conn_str)
+    return conn_str
+
+
 # ---------------------------------------------------------------------------
 # Async factory
 # ---------------------------------------------------------------------------
@@ -54,8 +67,7 @@ async def _async_checkpointer(config) -> AsyncIterator[Checkpointer]:
         except ImportError as exc:
             raise ImportError(SQLITE_INSTALL) from exc

-        conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db")
-        await asyncio.to_thread(ensure_sqlite_parent_dir, conn_str)
+        conn_str = await asyncio.to_thread(_prepare_sqlite_checkpointer_path, config.connection_string or "store.db")
         async with AsyncSqliteSaver.from_conn_string(conn_str) as saver:
             await saver.setup()
             yield saver
@@ -98,8 +110,7 @@ async def _async_checkpointer_from_database(db_config) -> AsyncIterator[Checkpoi
         except ImportError as exc:
             raise ImportError(SQLITE_INSTALL) from exc

-        conn_str = db_config.checkpointer_sqlite_path
-        ensure_sqlite_parent_dir(conn_str)
+        conn_str = await asyncio.to_thread(_prepare_database_sqlite_checkpointer_path, db_config)
         async with AsyncSqliteSaver.from_conn_string(conn_str) as saver:
             await saver.setup()
             yield saver
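
The pattern in the two hunks above, bundling every sync filesystem step into one helper and dispatching that helper once through `asyncio.to_thread`, can be reproduced in isolation. The sketch below is illustrative only: `prepare_sqlite_path` and `open_store` are hypothetical names, and the thread identifiers are returned purely so the offload is observable.

```python
import asyncio
import tempfile
import threading
from pathlib import Path


def prepare_sqlite_path(raw: str) -> tuple[str, int]:
    """Sync helper: resolve the path and create its parent directory."""
    path = Path(raw).resolve()
    path.parent.mkdir(parents=True, exist_ok=True)
    # Report which thread did the filesystem work.
    return str(path), threading.get_ident()


async def open_store(raw: str) -> tuple[str, int, int]:
    loop_thread = threading.get_ident()
    # Both the resolve and the mkdir run in a worker thread, not on the loop.
    conn_str, worker_thread = await asyncio.to_thread(prepare_sqlite_path, raw)
    return conn_str, loop_thread, worker_thread


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "subdir" / "store.db"
    conn_str, loop_thread, worker_thread = asyncio.run(open_store(str(target)))
    parent_created = Path(conn_str).parent.is_dir()
```

Grouping the steps in one helper keeps a later edit from quietly moving one of them back onto the event loop, which appears to be the motivation for the `_prepare_*` functions in this diff.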
+2
@@ -29,6 +29,7 @@ discord = ["discord.py>=2.7.0"]
[dependency-groups]
dev = [
    "blockbuster>=1.5.26,<1.6",
    "prompt-toolkit>=3.0.0",
    "pytest>=9.0.3",
    "pytest-asyncio>=1.3.0",
@@ -38,6 +39,7 @@ dev = [
[tool.pytest.ini_options]
markers = [
    "no_auto_user: disable the conftest autouse contextvar fixture for this test",
    "allow_blocking_io: opt out of the strict Blockbuster gate in tests/blocking_io/",
]
[tool.uv]
+37
@@ -0,0 +1,37 @@
"""Pytest conftest for the strict Blockbuster runtime gate.

Activates `detect_blocking_io_strict()` around the entire pytest item
protocol (setup + call + teardown) so blocking IO in async fixtures and
lifespan code is also caught, not just blocking IO inside the test body.

Scope: only applies to items whose path is under `backend/tests/blocking_io/`.
Pytest registers conftest hookwrappers globally once the file is loaded,
so an explicit path filter is required to keep the strict gate from
firing on unrelated tests when the full suite is collected.

Opt-out: mark a test with `@pytest.mark.allow_blocking_io` to skip the gate.
"""

from __future__ import annotations

from collections.abc import Generator
from pathlib import Path

import pytest
from support.detectors.blocking_io_runtime import detect_blocking_io_strict

_BLOCKING_IO_TEST_ROOT = Path(__file__).resolve().parent


@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_protocol(item: pytest.Item, nextitem: pytest.Item | None) -> Generator[None, None, None]:
    if not _is_blocking_io_item(item) or item.get_closest_marker("allow_blocking_io") is not None:
        yield
        return

    with detect_blocking_io_strict():
        yield


def _is_blocking_io_item(item: pytest.Item) -> bool:
    return Path(item.path).resolve().is_relative_to(_BLOCKING_IO_TEST_ROOT)
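
The path filter used by `_is_blocking_io_item` can be tried out on its own. The sketch below uses a hypothetical helper `is_under` and made-up repository paths; it only illustrates why `Path.is_relative_to` is a safer scope check than comparing string prefixes.

```python
from pathlib import Path


def is_under(root: Path, candidate: Path) -> bool:
    """True when candidate lives inside root, compared component by component."""
    return candidate.resolve().is_relative_to(root.resolve())


# Hypothetical layout, for illustration only.
root = Path("/repo/backend/tests/blocking_io")
inside = root / "test_gate_smoke.py"
sibling_test = Path("/repo/backend/tests/test_other.py")
# Shares a string prefix with root but is a different directory.
lookalike = Path("/repo/backend/tests/blocking_io_extra/test_x.py")
```

Here `is_under(root, inside)` is true, while both `sibling_test` and `lookalike` are rejected; a naive `str(path).startswith(str(root))` check would wrongly accept the lookalike directory.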
@@ -0,0 +1,55 @@
"""Smoke test: the strict Blockbuster gate is wired up and actively catching.

Independent of any specific production code path, asserts that calling a
known blocking IO function directly from an `async def` (without an
`asyncio.to_thread` wrapper) raises `BlockingError`. If this test ever
stops raising, the gate machinery itself is broken — typical causes are
`scanned_modules` misconfiguration, accidental removal of the Blockbuster
dev dependency, or the conftest hookwrapper no longer firing.

This is the meta-test that protects every other test in this directory
from silent regressions (a green gate that no longer catches anything is
worse than no gate at all).
"""

from __future__ import annotations

import os
from pathlib import Path

import pytest
from blockbuster import BlockingError
from support.detectors.blocking_io_runtime import detect_blocking_io_strict

pytestmark = pytest.mark.asyncio


async def test_gate_catches_unoffloaded_blocking_io_in_deerflow_module(tmp_path: Path) -> None:
    from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir

    db_file = tmp_path / "subdir" / "store.db"
    with pytest.raises(BlockingError):
        ensure_sqlite_parent_dir(str(db_file))


async def test_gate_restores_blockbuster_patches_after_exceptions() -> None:
    original_stat = os.stat

    with pytest.raises(RuntimeError, match="boom"):
        with detect_blocking_io_strict():
            raise RuntimeError("boom")

    assert os.stat is original_stat


@pytest.mark.allow_blocking_io
async def test_allow_blocking_io_marker_opts_out_of_gate(tmp_path: Path) -> None:
    """Verify the @pytest.mark.allow_blocking_io opt-out actually disables the gate."""
    from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir

    db_file = tmp_path / "subdir" / "store.db"
    ensure_sqlite_parent_dir(str(db_file))
    assert db_file.parent.exists()
@@ -0,0 +1,102 @@
"""Regression test: skill loading must remain releasable to a worker thread.
Anchors the production offload from `subagents/executor.py:_load_skills`,
where both `get_or_new_skill_storage` and the sync `storage.load_skills(...)`
method are dispatched via `asyncio.to_thread`. That fix addressed #1917,
where `os.walk` inside `load_skills` blocked the LangGraph async event loop.
This test invokes the production `_load_skills()` call path under the strict
Blockbuster context against a real `LocalSkillStorage` instance pointed at
a tmp directory. If the production `asyncio.to_thread` offload is removed,
Blockbuster raises `BlockingError` and this test fails.
"""
from __future__ import annotations
import importlib
import sys
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock

import pytest

pytestmark = pytest.mark.asyncio

_MISSING = object()
_EXECUTOR_IMPORT_MOCKS = (
    "deerflow.agents",
    "deerflow.agents.thread_state",
    "deerflow.models",
)


def _seed_skill(skills_root: Path) -> None:
    skill = skills_root / "public" / "demo"
    skill.mkdir(parents=True, exist_ok=True)
    (skill / "SKILL.md").write_text(
        "---\nname: demo\ndescription: regression-test skill\n---\n# demo\n",
        encoding="utf-8",
    )


@contextmanager
def _real_subagent_executor() -> Iterator[type]:
    """Import the real executor despite the suite-level circular-import mock."""
    original_modules = {name: sys.modules.get(name, _MISSING) for name in _EXECUTOR_IMPORT_MOCKS}
    original_executor = sys.modules.get("deerflow.subagents.executor", _MISSING)
    parent_module = sys.modules.get("deerflow.subagents")
    original_parent_executor = getattr(parent_module, "executor", _MISSING) if parent_module is not None else _MISSING

    sys.modules.pop("deerflow.subagents.executor", None)
    for name in _EXECUTOR_IMPORT_MOCKS:
        sys.modules[name] = MagicMock()

    try:
        executor_module = importlib.import_module("deerflow.subagents.executor")
        yield executor_module.SubagentExecutor
    finally:
        if original_executor is _MISSING:
            sys.modules.pop("deerflow.subagents.executor", None)
        else:
            sys.modules["deerflow.subagents.executor"] = original_executor

        if parent_module is not None:
            if original_parent_executor is _MISSING:
                try:
                    delattr(parent_module, "executor")
                except AttributeError:
                    pass
            else:
                parent_module.executor = original_parent_executor

        for name, module in original_modules.items():
            if module is _MISSING:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = module


async def test_load_skills_via_to_thread_does_not_block_event_loop(tmp_path: Path) -> None:
    from deerflow.config.skills_config import SkillsConfig
    from deerflow.subagents.config import SubagentConfig

    _seed_skill(tmp_path)

    with _real_subagent_executor() as SubagentExecutor:
        executor = SubagentExecutor(
            config=SubagentConfig(
                name="demo",
                description="Loads skills through the production async path.",
            ),
            tools=[],
            app_config=SimpleNamespace(skills=SkillsConfig(path=str(tmp_path))),
            parent_model="test-model",
        )
        skills = await executor._load_skills()

    assert isinstance(skills, list)
    assert any(s.name == "demo" for s in skills)
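A reduced, stdlib-only sketch of the save-and-restore pattern that `_real_subagent_executor` applies to `sys.modules`. The helper name `swap_modules` and the module name used in the usage lines are hypothetical and do not appear in this PR. The sentinel separates "entry was absent" from "entry was present", so names that did not exist beforehand are removed again on exit instead of being left behind as mocks.

```python
import sys
from collections.abc import Iterator
from contextlib import contextmanager
from unittest.mock import MagicMock

# Sentinel: distinguishes "not in sys.modules" from any real module object.
_MISSING = object()


@contextmanager
def swap_modules(names: tuple[str, ...]) -> Iterator[dict[str, MagicMock]]:
    """Temporarily replace the named sys.modules entries with MagicMock objects."""
    originals = {name: sys.modules.get(name, _MISSING) for name in names}
    mocks = {name: MagicMock() for name in names}
    sys.modules.update(mocks)
    try:
        yield mocks
    finally:
        # Restore every entry to exactly the state it had before entering.
        for name, module in originals.items():
            if module is _MISSING:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = module


if __name__ == "__main__":
    with swap_modules(("hypothetical_pkg_for_swap_demo",)) as mocks:
        print(sys.modules["hypothetical_pkg_for_swap_demo"] is mocks["hypothetical_pkg_for_swap_demo"])
```

`unittest.mock.patch.dict(sys.modules, {...})` gives comparable restoration for the dictionary itself; the hand-written version in the test additionally restores the `executor` attribute on the parent package, which `patch.dict` would not cover.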
@@ -0,0 +1,52 @@
"""Regression test: sqlite path setup must run off the event loop.

Anchors the production offload from
`runtime/checkpointer/async_provider.py:_async_checkpointer`, where SQLite
path resolution and `ensure_sqlite_parent_dir` are dispatched via
`await asyncio.to_thread(...)`.

That fix addressed #1912, where the sync `Path.mkdir` / `os.mkdir` inside
`ensure_sqlite_parent_dir` ran on the FastAPI lifespan event loop thread
and blocked startup.

This test invokes the production `_async_checkpointer()` path under the
strict Blockbuster context. The target path's parent does not yet exist, so
the underlying path resolution and `os.mkdir` both execute. If either step is
regressed to run directly on the event loop, Blockbuster raises
`BlockingError` and this test fails.
"""

from __future__ import annotations

import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

pytestmark = pytest.mark.asyncio


async def test_async_checkpointer_sqlite_setup_does_not_block_event_loop(tmp_path: Path) -> None:
    from deerflow.config.checkpointer_config import CheckpointerConfig
    from deerflow.runtime.checkpointer.async_provider import _async_checkpointer

    db_file = tmp_path / "subdir" / "store.db"

    mock_saver = AsyncMock()
    mock_context_manager = AsyncMock()
    mock_context_manager.__aenter__.return_value = mock_saver
    mock_context_manager.__aexit__.return_value = False

    mock_saver_cls = MagicMock()
    mock_saver_cls.from_conn_string.return_value = mock_context_manager

    mock_module = MagicMock()
    mock_module.AsyncSqliteSaver = mock_saver_cls

    with patch.dict(sys.modules, {"langgraph.checkpoint.sqlite.aio": mock_module}):
        async with _async_checkpointer(CheckpointerConfig(type="sqlite", connection_string=str(db_file))) as saver:
            assert saver is mock_saver

    assert db_file.parent.exists()
    mock_saver_cls.from_conn_string.assert_called_once_with(str(db_file.resolve()))
    mock_saver.setup.assert_awaited_once()
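The offload this test anchors can be illustrated with a stdlib-only sketch. `ensure_parent_dir` and `prepare_sqlite_path` are hypothetical stand-ins, not the production `ensure_sqlite_parent_dir` or `_async_checkpointer`, whose bodies are not shown in this diff. The sketch only demonstrates the general technique: sync path resolution and `mkdir` run in a worker thread via `asyncio.to_thread`, so the event loop thread never executes them.

```python
import asyncio
import threading
from pathlib import Path


def ensure_parent_dir(db_path: Path) -> tuple[Path, int]:
    """Hypothetical sync setup: resolve the path and create its parent directory.

    Also returns the id of the thread that ran it, purely so callers can
    observe that the work happened off the event loop thread.
    """
    resolved = db_path.resolve()
    resolved.parent.mkdir(parents=True, exist_ok=True)
    return resolved, threading.get_ident()


async def prepare_sqlite_path(db_path: Path) -> tuple[Path, int]:
    # asyncio.to_thread runs the sync callable in the loop's default executor
    # and suspends this coroutine until it finishes, leaving the loop free.
    return await asyncio.to_thread(ensure_parent_dir, db_path)


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        path, worker = asyncio.run(prepare_sqlite_path(Path(tmp) / "subdir" / "store.db"))
        print(path.parent.is_dir(), worker != threading.get_ident())
```

Calling `ensure_parent_dir(db_path)` directly inside the coroutine would produce the same directory but would run `resolve()` and `mkdir()` on the loop thread, which is the regression the test is meant to catch.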
@@ -13,16 +13,11 @@ from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from support.detectors.blocking_io import BlockingIOProbe, detect_blocking_io
# Make 'app' and 'deerflow' importable from any working directory
sys.path.insert(0, str(Path(__file__).parent.parent))
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "scripts"))
_BACKEND_ROOT = Path(__file__).resolve().parents[1]
_blocking_io_probe = BlockingIOProbe(_BACKEND_ROOT)
_BLOCKING_IO_DETECTOR_ATTR = "_blocking_io_detector"
# Break the circular import chain that exists in production code:
# deerflow.subagents.__init__
# -> .executor (SubagentExecutor, SubagentResult)
@@ -63,92 +58,6 @@ def provisioner_module():
return module
@pytest.fixture()
def blocking_io_detector():
"""Fail a focused test if blocking calls run on the event loop thread."""
with detect_blocking_io(fail_on_exit=True) as detector:
yield detector
def pytest_addoption(parser: pytest.Parser) -> None:
group = parser.getgroup("blocking-io")
group.addoption(
"--detect-blocking-io",
action="store_true",
default=False,
help="Collect blocking calls made while an asyncio event loop is running and report a summary.",
)
group.addoption(
"--detect-blocking-io-fail",
action="store_true",
default=False,
help="Set a failing exit status when --detect-blocking-io records violations.",
)
def pytest_configure(config: pytest.Config) -> None:
config.addinivalue_line("markers", "no_blocking_io_probe: skip the optional blocking IO probe")
def pytest_sessionstart(session: pytest.Session) -> None:
if _blocking_io_probe_enabled(session.config):
_blocking_io_probe.clear()
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_call(item: pytest.Item):
if not _blocking_io_probe_enabled(item.config) or _blocking_io_probe_skipped(item):
yield
return
detector = detect_blocking_io(fail_on_exit=False, stack_limit=18)
detector.__enter__()
setattr(item, _BLOCKING_IO_DETECTOR_ATTR, detector)
yield
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_teardown(item: pytest.Item):
yield
detector = getattr(item, _BLOCKING_IO_DETECTOR_ATTR, None)
if detector is None:
return
try:
detector.__exit__(None, None, None)
_blocking_io_probe.record(item.nodeid, detector.violations)
finally:
delattr(item, _BLOCKING_IO_DETECTOR_ATTR)
def pytest_sessionfinish(session: pytest.Session) -> None:
if _blocking_io_fail_enabled(session.config) and _blocking_io_probe.violation_count and session.exitstatus == pytest.ExitCode.OK:
session.exitstatus = pytest.ExitCode.TESTS_FAILED
def pytest_terminal_summary(terminalreporter: pytest.TerminalReporter) -> None:
if not _blocking_io_probe_enabled(terminalreporter.config):
return
header, *details = _blocking_io_probe.format_summary().splitlines()
terminalreporter.write_sep("=", header)
for line in details:
terminalreporter.write_line(line)
def _blocking_io_probe_enabled(config: pytest.Config) -> bool:
return bool(config.getoption("--detect-blocking-io") or config.getoption("--detect-blocking-io-fail"))
def _blocking_io_fail_enabled(config: pytest.Config) -> bool:
return bool(config.getoption("--detect-blocking-io-fail"))
def _blocking_io_probe_skipped(item: pytest.Item) -> bool:
return item.path.name == "test_blocking_io_detector.py" or item.get_closest_marker("no_blocking_io_probe") is not None
# ---------------------------------------------------------------------------
# Auto-set user context for every test unless marked no_auto_user
# ---------------------------------------------------------------------------
@@ -1,287 +0,0 @@
"""Test helper for detecting blocking calls on an asyncio event loop.
The detector is intentionally test-only. It monkeypatches a small set of
well-known blocking entry points and their already-loaded module-level aliases,
then records calls only when they happen on a thread that is currently running
an asyncio event loop. Aliases captured in closures or default arguments remain
out of scope.
"""
from __future__ import annotations
import asyncio
import importlib
import sys
import traceback
from collections import Counter
from collections.abc import Callable, Iterable, Iterator
from contextlib import AbstractContextManager
from dataclasses import dataclass
from functools import wraps
from pathlib import Path
from types import TracebackType
from typing import Any
BlockingCallable = Callable[..., Any]
@dataclass(frozen=True)
class BlockingCallSpec:
"""Describes one blocking callable to wrap during a detector run."""
name: str
target: str
record_on_iteration: bool = False
@dataclass(frozen=True)
class BlockingCall:
"""One blocking call observed on an asyncio event loop thread."""
name: str
target: str
stack: tuple[traceback.FrameSummary, ...]
DEFAULT_BLOCKING_CALL_SPECS: tuple[BlockingCallSpec, ...] = (
BlockingCallSpec("time.sleep", "time:sleep"),
BlockingCallSpec("requests.Session.request", "requests.sessions:Session.request"),
BlockingCallSpec("httpx.Client.request", "httpx:Client.request"),
BlockingCallSpec("os.walk", "os:walk", record_on_iteration=True),
BlockingCallSpec("pathlib.Path.resolve", "pathlib:Path.resolve"),
BlockingCallSpec("pathlib.Path.read_text", "pathlib:Path.read_text"),
BlockingCallSpec("pathlib.Path.write_text", "pathlib:Path.write_text"),
)
def _is_event_loop_thread() -> bool:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return False
return loop.is_running()
def _resolve_target(target: str) -> tuple[object, str, BlockingCallable]:
module_name, attr_path = target.split(":", maxsplit=1)
owner: object = importlib.import_module(module_name)
parts = attr_path.split(".")
for part in parts[:-1]:
owner = getattr(owner, part)
attr_name = parts[-1]
original = getattr(owner, attr_name)
return owner, attr_name, original
def _trim_detector_frames(stack: Iterable[traceback.FrameSummary]) -> tuple[traceback.FrameSummary, ...]:
return tuple(frame for frame in stack if frame.filename != __file__)
class BlockingIODetector(AbstractContextManager["BlockingIODetector"]):
"""Record blocking calls made from async runtime code.
By default the detector reports violations but does not fail on context
exit. Tests can set ``fail_on_exit=True`` or call
``assert_no_blocking_calls()`` explicitly.
"""
def __init__(
self,
specs: Iterable[BlockingCallSpec] = DEFAULT_BLOCKING_CALL_SPECS,
*,
fail_on_exit: bool = False,
patch_loaded_aliases: bool = True,
stack_limit: int = 12,
) -> None:
self._specs = tuple(specs)
self._fail_on_exit = fail_on_exit
self._patch_loaded_aliases_enabled = patch_loaded_aliases
self._stack_limit = stack_limit
self._patches: list[tuple[object, str, BlockingCallable]] = []
self._patch_keys: set[tuple[int, str]] = set()
self.violations: list[BlockingCall] = []
self._active = False
def __enter__(self) -> BlockingIODetector:
try:
self._active = True
alias_replacements: dict[int, BlockingCallable] = {}
for spec in self._specs:
owner, attr_name, original = _resolve_target(spec.target)
wrapper = self._wrap(spec, original)
self._patch_attribute(owner, attr_name, original, wrapper)
alias_replacements[id(original)] = wrapper
if self._patch_loaded_aliases_enabled:
self._patch_loaded_module_aliases(alias_replacements)
except Exception:
self._restore()
self._active = False
raise
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback_value: TracebackType | None,
) -> bool | None:
self._restore()
self._active = False
if exc_type is None and self._fail_on_exit:
self.assert_no_blocking_calls()
return None
def _restore(self) -> None:
for owner, attr_name, original in reversed(self._patches):
setattr(owner, attr_name, original)
self._patches.clear()
self._patch_keys.clear()
def _patch_attribute(self, owner: object, attr_name: str, original: BlockingCallable, replacement: BlockingCallable) -> None:
key = (id(owner), attr_name)
if key in self._patch_keys:
return
setattr(owner, attr_name, replacement)
self._patches.append((owner, attr_name, original))
self._patch_keys.add(key)
def _patch_loaded_module_aliases(self, replacements_by_id: dict[int, BlockingCallable]) -> None:
for module in tuple(sys.modules.values()):
namespace = getattr(module, "__dict__", None)
if not isinstance(namespace, dict):
continue
for attr_name, value in tuple(namespace.items()):
replacement = replacements_by_id.get(id(value))
if replacement is not None:
self._patch_attribute(module, attr_name, value, replacement)
def _wrap(self, spec: BlockingCallSpec, original: BlockingCallable) -> BlockingCallable:
@wraps(original)
def wrapper(*args: Any, **kwargs: Any) -> Any:
if spec.record_on_iteration:
result = original(*args, **kwargs)
return self._wrap_iteration(spec, result)
self._record_if_blocking(spec)
return original(*args, **kwargs)
return wrapper
def _wrap_iteration(self, spec: BlockingCallSpec, iterable: Iterable[Any]) -> Iterator[Any]:
iterator = iter(iterable)
reported = False
while True:
if not reported:
reported = self._record_if_blocking(spec)
try:
yield next(iterator)
except StopIteration:
return
def _record_if_blocking(self, spec: BlockingCallSpec) -> bool:
if self._active and _is_event_loop_thread():
stack = _trim_detector_frames(traceback.extract_stack(limit=self._stack_limit))
self.violations.append(BlockingCall(spec.name, spec.target, stack))
return True
return False
def assert_no_blocking_calls(self) -> None:
if self.violations:
raise AssertionError(format_blocking_calls(self.violations))
class BlockingIOProbe:
"""Collect detector output across tests and format a compact summary."""
def __init__(self, project_root: Path) -> None:
self._project_root = project_root.resolve()
self._observed: list[tuple[str, BlockingCall]] = []
@property
def violation_count(self) -> int:
return len(self._observed)
@property
def test_count(self) -> int:
return len({nodeid for nodeid, _violation in self._observed})
def clear(self) -> None:
self._observed.clear()
def record(self, nodeid: str, violations: Iterable[BlockingCall]) -> None:
for violation in violations:
self._observed.append((nodeid, violation))
def format_summary(self, *, limit: int = 30) -> str:
if not self._observed:
return "blocking io probe: no violations"
call_sites: Counter[tuple[str, str, int, str, str]] = Counter()
for _nodeid, violation in self._observed:
frame = self._local_call_site(violation.stack)
if frame is None:
call_sites[(violation.name, "<unknown>", 0, "<unknown>", "")] += 1
continue
call_sites[
(
violation.name,
self._relative(frame.filename),
frame.lineno,
frame.name,
(frame.line or "").strip(),
)
] += 1
lines = [f"blocking io probe: {self.violation_count} violations across {self.test_count} tests", "Top call sites:"]
for (name, filename, lineno, function, line), count in call_sites.most_common(limit):
lines.append(f"{count:4d} {name} {filename}:{lineno} {function} | {line}")
return "\n".join(lines)
def _relative(self, filename: str) -> str:
try:
return str(Path(filename).resolve().relative_to(self._project_root))
except ValueError:
return filename
def _local_call_site(self, stack: tuple[traceback.FrameSummary, ...]) -> traceback.FrameSummary | None:
local_frames = [frame for frame in stack if str(self._project_root) in frame.filename and "/.venv/" not in frame.filename and not self._relative(frame.filename).startswith("tests/")]
if local_frames:
return local_frames[-1]
test_frames = [frame for frame in stack if str(self._project_root) in frame.filename and "/.venv/" not in frame.filename]
return test_frames[-1] if test_frames else None
def detect_blocking_io(
specs: Iterable[BlockingCallSpec] = DEFAULT_BLOCKING_CALL_SPECS,
*,
fail_on_exit: bool = False,
patch_loaded_aliases: bool = True,
stack_limit: int = 12,
) -> BlockingIODetector:
"""Create a detector context manager for a focused test scope."""
return BlockingIODetector(specs, fail_on_exit=fail_on_exit, patch_loaded_aliases=patch_loaded_aliases, stack_limit=stack_limit)
def format_blocking_calls(violations: Iterable[BlockingCall]) -> str:
"""Format detector output with enough stack context to locate call sites."""
lines = ["Blocking calls were executed on an asyncio event loop thread:"]
for index, violation in enumerate(violations, start=1):
lines.append(f"{index}. {violation.name} ({violation.target})")
lines.extend(_format_stack(violation.stack))
return "\n".join(lines)
def _format_stack(stack: Iterable[traceback.FrameSummary]) -> Iterator[str]:
for frame in stack:
location = f"{frame.filename}:{frame.lineno}"
lines = [f" at {frame.name} ({location})"]
if frame.line:
lines.append(f" {frame.line.strip()}")
yield from lines
@@ -0,0 +1,44 @@
"""Strict Blockbuster runtime context scoped to DeerFlow business code.

Creates a `BlockBuster` instance with `scanned_modules=("app", "deerflow")`
so that test infrastructure (pytest, langchain, importlib, third-party libs)
is out of scope and does not produce false positives. Only loop-blocking
sync IO whose caller stack passes through `app.*` or `deerflow.*` raises
`BlockingError`.

Used by `backend/tests/blocking_io/conftest.py` to gate the regression suite.
"""

from __future__ import annotations

from collections.abc import Iterator
from contextlib import contextmanager

from blockbuster import BlockBuster, BlockBusterFunction, BlockingError

_SCANNED_MODULES: tuple[str, ...] = ("app", "deerflow")

# Add DeerFlow-local rules here only when Blockbuster's default rule set misses
# a generic blocking primitive used by production code. If a path is invisible
# because no test exercises it, add a production-path runtime anchor instead.
_PROJECT_BLOCKING_RULES: tuple[tuple[str, BlockBusterFunction], ...] = ()


def _install_project_rules(bb: BlockBuster) -> None:
    for name, rule in _PROJECT_BLOCKING_RULES:
        bb.functions[name] = rule


@contextmanager
def detect_blocking_io_strict() -> Iterator[BlockBuster]:
    """Activate Blockbuster scoped to app.* and deerflow.* callers only."""
    bb = BlockBuster(scanned_modules=list(_SCANNED_MODULES))
    _install_project_rules(bb)
    try:
        bb.activate()
        yield bb
    finally:
        bb.deactivate()


__all__ = ["BlockingError", "detect_blocking_io_strict"]
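The scoping rule described in the module docstring (a violation counts only when the caller stack passes through `app.*` or `deerflow.*`) can be pictured with the following sketch. It is an illustration of the idea only and makes no claim about how Blockbuster implements `scanned_modules` internally. The helper names `module_in_scope` and `stack_in_scope` are hypothetical, and the stack is modeled as a plain list of module names rather than real frames.

```python
from collections.abc import Iterable

SCANNED_MODULES: tuple[str, ...] = ("app", "deerflow")


def module_in_scope(module_name: str, scanned: Iterable[str] = SCANNED_MODULES) -> bool:
    """Return True when module_name is a scanned package or one of its submodules."""
    # The trailing dot keeps "application" from matching the "app" root.
    return any(module_name == root or module_name.startswith(f"{root}.") for root in scanned)


def stack_in_scope(caller_modules: Iterable[str], scanned: Iterable[str] = SCANNED_MODULES) -> bool:
    """Return True when at least one caller in the stack belongs to a scanned package."""
    scanned_roots = tuple(scanned)
    return any(module_in_scope(name, scanned_roots) for name in caller_modules)


if __name__ == "__main__":
    print(stack_in_scope(["pytest", "deerflow.runtime.checkpointer.async_provider", "pathlib"]))
    print(stack_in_scope(["pytest", "langchain.tools", "importlib"]))
```

Under this model, a blocking call reached only through pytest or third-party code is ignored, while the same call reached through a `deerflow` module is reported, which matches the false-positive reduction the docstring describes.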
@@ -0,0 +1,892 @@
#!/usr/bin/env python3
"""Static inventory for likely backend event-loop blocking IO.
This detector parses backend business source with AST so untested paths are
still visible during review. Findings are prioritized static candidates, not
automatic bug decisions.
"""
from __future__ import annotations
import argparse
import ast
import json
import os
import sys
from collections import Counter, defaultdict, deque
from collections.abc import Callable, Iterable, Sequence
from dataclasses import dataclass
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[4]
DEFAULT_SCAN_PATHS = (
REPO_ROOT / "backend" / "app",
REPO_ROOT / "backend" / "packages" / "harness" / "deerflow",
REPO_ROOT / "backend" / "scripts",
)
IGNORED_DIR_NAMES = {
".git",
".mypy_cache",
".pytest_cache",
".ruff_cache",
".venv",
"__pycache__",
"node_modules",
}
CODE_SNIPPET_LIMIT = 200
PATH_METHOD_NAMES = {
"exists",
"glob",
"hardlink_to",
"is_dir",
"is_file",
"iterdir",
"mkdir",
"open",
"readlink",
"read_bytes",
"read_text",
"rename",
"resolve",
"rglob",
"rmdir",
"samefile",
"stat",
"symlink_to",
"touch",
"unlink",
"write_bytes",
"write_text",
}
AMBIGUOUS_PATH_METHOD_NAMES = {"replace"}
HTTP_METHOD_NAMES = {
"delete",
"get",
"head",
"options",
"patch",
"post",
"put",
"request",
"stream",
}
BUILTIN_OPEN_NAMES = {"builtins.open", "io.open", "open"}
BLOCKING_SLEEP_NAMES = {"time.sleep"}
BLOCKING_OS_FILE_NAMES = {
"os.listdir",
"os.lstat",
"os.makedirs",
"os.mkdir",
"os.remove",
"os.rename",
"os.replace",
"os.rmdir",
"os.scandir",
"os.stat",
"os.unlink",
"os.walk",
"os.path.exists",
"os.path.getsize",
"os.path.isdir",
"os.path.isfile",
}
BLOCKING_SUBPROCESS_NAMES = {
"subprocess.Popen",
"subprocess.check_call",
"subprocess.check_output",
"subprocess.run",
}
BLOCKING_HTTP_NAMES = {
"requests.delete",
"requests.get",
"requests.head",
"requests.options",
"requests.patch",
"requests.post",
"requests.put",
"requests.request",
"requests.sessions.Session.request",
"httpx.delete",
"httpx.get",
"httpx.head",
"httpx.options",
"httpx.patch",
"httpx.post",
"httpx.put",
"httpx.request",
"httpx.stream",
"urllib.request.urlopen",
}
SYNC_HTTP_CLIENT_FACTORIES = {
"httpx.Client": "httpx.Client",
"requests.Session": "requests.Session",
"requests.sessions.Session": "requests.Session",
"requests.session": "requests.Session",
}
BLOCKING_SHUTIL_NAMES = {
"shutil.copy",
"shutil.copyfile",
"shutil.copytree",
"shutil.move",
"shutil.rmtree",
}
SYNC_AGENT_MIDDLEWARE_HOOKS = {
"before_agent": "abefore_agent",
"before_model": "abefore_model",
"after_model": "aafter_model",
"after_agent": "aafter_agent",
}
PATH_METHOD_OPERATIONS = {
"exists": "FILE_METADATA",
"glob": "FILE_ENUMERATION",
"hardlink_to": "FILE_WRITE",
"is_dir": "FILE_METADATA",
"is_file": "FILE_METADATA",
"iterdir": "FILE_ENUMERATION",
"mkdir": "FILE_WRITE",
"open": "FILE_OPEN",
"readlink": "FILE_METADATA",
"read_bytes": "FILE_READ",
"read_text": "FILE_READ",
"rename": "FILE_COPY_MOVE",
"replace": "FILE_COPY_MOVE",
"resolve": "FILE_METADATA",
"rglob": "FILE_ENUMERATION",
"rmdir": "FILE_DELETE",
"samefile": "FILE_METADATA",
"stat": "FILE_METADATA",
"symlink_to": "FILE_WRITE",
"touch": "FILE_WRITE",
"unlink": "FILE_DELETE",
"write_bytes": "FILE_WRITE",
"write_text": "FILE_WRITE",
}
OS_FILE_OPERATIONS = {
"os.listdir": "FILE_ENUMERATION",
"os.lstat": "FILE_METADATA",
"os.makedirs": "FILE_WRITE",
"os.mkdir": "FILE_WRITE",
"os.remove": "FILE_DELETE",
"os.rename": "FILE_COPY_MOVE",
"os.replace": "FILE_COPY_MOVE",
"os.rmdir": "FILE_DELETE",
"os.scandir": "FILE_ENUMERATION",
"os.stat": "FILE_METADATA",
"os.unlink": "FILE_DELETE",
"os.walk": "FILE_ENUMERATION",
"os.path.exists": "FILE_METADATA",
"os.path.getsize": "FILE_METADATA",
"os.path.isdir": "FILE_METADATA",
"os.path.isfile": "FILE_METADATA",
}
SHUTIL_OPERATIONS = {
"shutil.copy": "FILE_COPY_MOVE",
"shutil.copyfile": "FILE_COPY_MOVE",
"shutil.copytree": "FILE_TREE_COPY",
"shutil.move": "FILE_COPY_MOVE",
"shutil.rmtree": "FILE_TREE_DELETE",
}
OPERATION_BASE_PRIORITY = {
"FILE_METADATA": "LOW",
"FILE_OPEN": "MEDIUM",
"FILE_READ": "MEDIUM",
"FILE_WRITE": "MEDIUM",
"FILE_ENUMERATION": "HIGH",
"FILE_DELETE": "MEDIUM",
"FILE_COPY_MOVE": "HIGH",
"FILE_TREE_COPY": "HIGH",
"FILE_TREE_DELETE": "HIGH",
"HTTP_REQUEST": "HIGH",
"SUBPROCESS": "HIGH",
"SLEEP": "HIGH",
"PARSE_ERROR": "MEDIUM",
}
@dataclass(frozen=True)
class BlockingIOStaticFinding:
category: str
operation: str
priority: str
path: str
line: int
column: int
function: str
exposure: str
symbol: str
code: str
def to_dict(self) -> dict[str, object]:
return {
"priority": self.priority,
"location": {
"path": self.path,
"line": self.line,
"column": self.column + 1,
"function": self.function,
},
"blocking_call": {
"category": self.category,
"operation": self.operation,
"symbol": self.symbol,
},
"event_loop_exposure": self.exposure,
"reason": _finding_reason(self.operation, self.exposure),
"code": self.code,
}
@dataclass(frozen=True)
class _FunctionContext:
qualname: str
class_name: str | None
is_async: bool
@dataclass(frozen=True)
class _FunctionInfo:
is_async: bool
@dataclass(frozen=True)
class _CallRef:
name: str
class_name: str | None
self_method: bool
@dataclass(frozen=True)
class _PotentialFinding:
category: str
operation: str
path: str
line: int
column: int
function: str
symbol: str
code: str
@dataclass(frozen=True)
class _BlockingRule:
category: str
operation: str
symbol: str
def dotted_name(node: ast.AST | None) -> str | None:
    if isinstance(node, ast.Name):
        return node.id
    if isinstance(node, ast.Attribute):
        parent = dotted_name(node.value)
        if parent:
            return f"{parent}.{node.attr}"
        return node.attr
    if isinstance(node, ast.Call):
        return dotted_name(node.func)
    if isinstance(node, ast.Subscript):
        return dotted_name(node.value)
    return None


def relative_to_repo(path: Path, repo_root: Path = REPO_ROOT) -> str:
    try:
        return path.resolve().relative_to(repo_root.resolve()).as_posix()
    except ValueError:
        return path.as_posix()


def _source_snippet(source_lines: Sequence[str], line: int) -> str:
    if not 0 < line <= len(source_lines):
        return ""
    snippet = source_lines[line - 1].strip()
    if len(snippet) <= CODE_SNIPPET_LIMIT:
        return snippet
    return f"{snippet[:CODE_SNIPPET_LIMIT]}..."
class BlockingIOStaticVisitor(ast.NodeVisitor):
def __init__(self, relative_path: str, source_lines: Sequence[str]) -> None:
self.relative_path = relative_path
self.source_lines = source_lines
self.import_aliases: dict[str, str] = {}
self.class_stack: list[str] = []
self.function_stack: list[_FunctionContext] = []
self.module_context = _FunctionContext("<module>", None, False)
self.module_sync_http_clients: dict[str, str] = {}
self.sync_http_client_stack: list[dict[str, str]] = []
self.class_bases: dict[str, set[str]] = defaultdict(set)
self.class_methods: dict[str, set[str]] = defaultdict(set)
self.function_defs: dict[str, _FunctionInfo] = {}
self.functions_by_name: dict[str, list[str]] = defaultdict(list)
self.call_refs: dict[str, list[_CallRef]] = defaultdict(list)
self.path_like_name_stack: list[set[str]] = []
self.potential_findings: list[_PotentialFinding] = []
@property
def current_function(self) -> _FunctionContext | None:
return self.function_stack[-1] if self.function_stack else None
@property
def current_context(self) -> _FunctionContext:
return self.current_function or self.module_context
@property
def current_sync_http_clients(self) -> dict[str, str]:
return self.sync_http_client_stack[-1] if self.sync_http_client_stack else self.module_sync_http_clients
def visit_Import(self, node: ast.Import) -> None:
for alias in node.names:
local_name = alias.asname or alias.name.split(".", 1)[0]
canonical_name = alias.name if alias.asname else local_name
self.import_aliases[local_name] = canonical_name
def visit_ImportFrom(self, node: ast.ImportFrom) -> None:
if node.module is None:
return
for alias in node.names:
local_name = alias.asname or alias.name
self.import_aliases[local_name] = f"{node.module}.{alias.name}"
def visit_ClassDef(self, node: ast.ClassDef) -> None:
class_name = ".".join((*self.class_stack, node.name)) if self.class_stack else node.name
self.class_bases[class_name].update(canonical_name for base in node.bases if (canonical_name := self._canonical_name(dotted_name(base))) is not None)
self.class_stack.append(node.name)
self.generic_visit(node)
self.class_stack.pop()
def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
self._visit_function(node, is_async=False)
def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None:
self._visit_function(node, is_async=True)
def visit_Assign(self, node: ast.Assign) -> None:
self._record_sync_http_client_targets(node.value, node.targets)
self.generic_visit(node)
def visit_AnnAssign(self, node: ast.AnnAssign) -> None:
self._record_path_like_annotation(node.annotation, [node.target])
if node.value is not None:
self._record_sync_http_client_targets(node.value, [node.target])
self.generic_visit(node)
def visit_With(self, node: ast.With) -> None:
temporary_clients: dict[str, str | None] = {}
current_clients = self.current_sync_http_clients
for item in node.items:
self.visit(item.context_expr)
client_base = self._sync_http_client_factory_base(item.context_expr)
if client_base is None or not isinstance(item.optional_vars, ast.Name):
continue
name = item.optional_vars.id
temporary_clients[name] = current_clients.get(name)
current_clients[name] = client_base
try:
for statement in node.body:
self.visit(statement)
finally:
for name, previous in temporary_clients.items():
if previous is None:
current_clients.pop(name, None)
else:
current_clients[name] = previous
def visit_Call(self, node: ast.Call) -> None:
current = self.current_context
call_name = self._canonical_name(dotted_name(node.func))
if call_name is not None:
self._record_call_ref(node, call_name, current)
self._record_blocking_candidate(node, call_name, current)
self.generic_visit(node)
def _visit_function(self, node: ast.FunctionDef | ast.AsyncFunctionDef, *, is_async: bool) -> None:
qualname = ".".join((*self.class_stack, node.name)) if self.class_stack else node.name
class_name = self.class_stack[-1] if self.class_stack else None
context = _FunctionContext(qualname, class_name, is_async)
self.function_defs[qualname] = _FunctionInfo(is_async)
self.functions_by_name[node.name].append(qualname)
if class_name is not None:
self.class_methods[class_name].add(node.name)
self.function_stack.append(context)
self.sync_http_client_stack.append({})
self.path_like_name_stack.append(set(_path_like_argument_names(node.args, self._canonical_name)))
self.generic_visit(node)
self.path_like_name_stack.pop()
self.sync_http_client_stack.pop()
self.function_stack.pop()
def _canonical_name(self, name: str | None) -> str | None:
if name is None:
return None
parts = name.split(".")
if parts and parts[0] in self.import_aliases:
return ".".join((self.import_aliases[parts[0]], *parts[1:]))
return name
def _record_call_ref(self, node: ast.Call, call_name: str, current: _FunctionContext) -> None:
if current.qualname == "<module>":
return
if isinstance(node.func, ast.Name):
self.call_refs[current.qualname].append(_CallRef(node.func.id, current.class_name, self_method=False))
return
if not isinstance(node.func, ast.Attribute):
return
receiver = dotted_name(node.func.value)
if receiver in {"self", "cls"}:
self.call_refs[current.qualname].append(_CallRef(node.func.attr, current.class_name, self_method=True))
return
# Keep same-module direct calls through canonical aliases out of the call graph.
# External calls are handled as blocking candidates instead.
if "." not in call_name:
self.call_refs[current.qualname].append(_CallRef(call_name, current.class_name, self_method=False))
def _record_blocking_candidate(self, node: ast.Call, call_name: str, current: _FunctionContext) -> None:
rule = self._blocking_rule(node, call_name)
if rule is None:
return
line = getattr(node, "lineno", 0)
column = getattr(node, "col_offset", 0)
code = _source_snippet(self.source_lines, line)
self.potential_findings.append(
_PotentialFinding(
category=rule.category,
operation=rule.operation,
path=self.relative_path,
line=line,
column=column,
function=current.qualname,
symbol=rule.symbol,
code=code,
)
)
def _blocking_rule(self, node: ast.Call, call_name: str) -> _BlockingRule | None:
sync_client_symbol = self._sync_http_client_method_symbol(call_name)
if sync_client_symbol is not None:
return _BlockingRule("BLOCKING_HTTP_IO", "HTTP_REQUEST", sync_client_symbol)
chained_client_symbol = _sync_http_client_chained_method_symbol(call_name)
if chained_client_symbol is not None:
return _BlockingRule("BLOCKING_HTTP_IO", "HTTP_REQUEST", chained_client_symbol)
leaf_name = call_name.rsplit(".", 1)[-1]
if call_name in BUILTIN_OPEN_NAMES:
return _BlockingRule("BLOCKING_FILE_IO", "FILE_OPEN", call_name)
if leaf_name in PATH_METHOD_NAMES | AMBIGUOUS_PATH_METHOD_NAMES:
if self._is_path_method_call(node):
return _BlockingRule("BLOCKING_FILE_IO", _path_method_operation(leaf_name), call_name)
if call_name in BLOCKING_OS_FILE_NAMES:
return _BlockingRule("BLOCKING_FILE_IO", OS_FILE_OPERATIONS[call_name], call_name)
if call_name in BLOCKING_SLEEP_NAMES:
return _BlockingRule("BLOCKING_SLEEP", "SLEEP", call_name)
if call_name in BLOCKING_SUBPROCESS_NAMES:
return _BlockingRule("BLOCKING_SUBPROCESS", "SUBPROCESS", call_name)
if call_name in BLOCKING_HTTP_NAMES:
return _BlockingRule("BLOCKING_HTTP_IO", "HTTP_REQUEST", call_name)
if call_name in BLOCKING_SHUTIL_NAMES:
return _BlockingRule("BLOCKING_FILE_IO", SHUTIL_OPERATIONS[call_name], call_name)
return None
def _is_path_method_call(self, node: ast.Call) -> bool:
if not isinstance(node.func, ast.Attribute):
return False
if node.func.attr in AMBIGUOUS_PATH_METHOD_NAMES and node.func.attr == "replace" and len(node.args) >= 2:
return False
receiver = node.func.value
if _is_constructed_path(receiver):
return True
receiver_name = dotted_name(receiver)
if receiver_name in self.current_path_like_names:
return True
if _looks_like_path_receiver_name(receiver_name):
return True
if node.func.attr in PATH_METHOD_NAMES and isinstance(receiver, ast.Attribute):
return True
return False
@property
def current_path_like_names(self) -> set[str]:
return self.path_like_name_stack[-1] if self.path_like_name_stack else set()
def _record_path_like_annotation(self, annotation: ast.AST, targets: Iterable[ast.AST]) -> None:
if not self.path_like_name_stack or not _is_path_annotation(annotation, self._canonical_name):
return
self.current_path_like_names.update(name for target in targets for name in _iter_assigned_names(target))
def _record_sync_http_client_targets(self, value: ast.AST, targets: Iterable[ast.AST]) -> None:
client_base = self._sync_http_client_factory_base(value)
if client_base is None:
return
current_clients = self.current_sync_http_clients
for target in targets:
for name in _iter_assigned_names(target):
current_clients[name] = client_base
def _sync_http_client_factory_base(self, node: ast.AST) -> str | None:
if not isinstance(node, ast.Call):
return None
call_name = self._canonical_name(dotted_name(node.func))
if call_name is None:
return None
return SYNC_HTTP_CLIENT_FACTORIES.get(call_name)
def _sync_http_client_method_symbol(self, call_name: str) -> str | None:
parts = call_name.split(".")
if len(parts) != 2 or parts[1] not in HTTP_METHOD_NAMES:
return None
client_base = self.current_sync_http_clients.get(parts[0])
if client_base is None:
return None
return f"{client_base}.{parts[1]}"
def _path_method_operation(method_name: str) -> str:
return PATH_METHOD_OPERATIONS.get(method_name, "FILE_METADATA")
def _is_constructed_path(node: ast.AST) -> bool:
return isinstance(node, ast.Call) and dotted_name(node.func) in {"Path", "pathlib.Path"}
def _looks_like_path_receiver_name(receiver_name: str | None) -> bool:
if receiver_name is None:
return False
leaf = receiver_name.rsplit(".", 1)[-1].lower()
return leaf in {"path", "file_path", "dir_path", "target", "dest", "destination", "source"} or leaf.endswith(("_path", "_dir", "_file", "_root")) or "path" in leaf
def _is_path_annotation(annotation: ast.AST | None, canonical_name: Callable[[str | None], str | None]) -> bool:
if annotation is None:
return False
if isinstance(annotation, ast.BinOp) and isinstance(annotation.op, ast.BitOr):
return _is_path_annotation(annotation.left, canonical_name) or _is_path_annotation(annotation.right, canonical_name)
name = dotted_name(annotation)
canonical = canonical_name(name)
if canonical in {"pathlib.Path", "Path"}:
return True
if isinstance(annotation, ast.Subscript):
return _is_path_annotation(annotation.slice, canonical_name)
return False
def _path_like_argument_names(arguments: ast.arguments, canonical_name: Callable[[str | None], str | None]) -> Iterable[str]:
candidates = [*arguments.posonlyargs, *arguments.args, *arguments.kwonlyargs]
if arguments.vararg is not None:
candidates.append(arguments.vararg)
if arguments.kwarg is not None:
candidates.append(arguments.kwarg)
for argument in candidates:
if _is_path_annotation(argument.annotation, canonical_name):
yield argument.arg
def _iter_assigned_names(target: ast.AST) -> Iterable[str]:
if isinstance(target, ast.Name):
yield target.id
return
if isinstance(target, (ast.Tuple, ast.List)):
for element in target.elts:
yield from _iter_assigned_names(element)
def _sync_http_client_chained_method_symbol(call_name: str) -> str | None:
for factory_name, client_base in SYNC_HTTP_CLIENT_FACTORIES.items():
prefix = f"{factory_name}."
if not call_name.startswith(prefix):
continue
method_name = call_name[len(prefix) :]
if method_name in HTTP_METHOD_NAMES:
return f"{client_base}.{method_name}"
return None
def _resolve_call_ref(visitor: BlockingIOStaticVisitor, ref: _CallRef) -> list[str]:
if ref.self_method and ref.class_name is not None:
qualname = f"{ref.class_name}.{ref.name}"
return [qualname] if qualname in visitor.function_defs else []
return list(visitor.functions_by_name.get(ref.name, ()))
def _reachable_functions(visitor: BlockingIOStaticVisitor, roots: Iterable[str]) -> set[str]:
reachable = set(roots)
queue: deque[str] = deque(reachable)
while queue:
qualname = queue.popleft()
for ref in visitor.call_refs.get(qualname, ()):
for target in _resolve_call_ref(visitor, ref):
if target in reachable:
continue
reachable.add(target)
queue.append(target)
return reachable
def _async_reachable_functions(visitor: BlockingIOStaticVisitor) -> set[str]:
return _reachable_functions(
visitor,
(qualname for qualname, info in visitor.function_defs.items() if info.is_async),
)
def _agent_middleware_classes(visitor: BlockingIOStaticVisitor) -> set[str]:
middleware_classes: set[str] = set()
changed = True
while changed:
changed = False
for class_name, bases in visitor.class_bases.items():
if class_name in middleware_classes:
continue
if any(_is_agent_middleware_base(base, middleware_classes) for base in bases):
middleware_classes.add(class_name)
changed = True
return middleware_classes
def _is_agent_middleware_base(base: str, known_middleware_classes: set[str]) -> bool:
leaf = base.rsplit(".", 1)[-1]
return leaf == "AgentMiddleware" or leaf in known_middleware_classes
def _sync_only_agent_middleware_entrypoints(visitor: BlockingIOStaticVisitor) -> set[str]:
entrypoints: set[str] = set()
middleware_classes = _agent_middleware_classes(visitor)
for class_name in middleware_classes:
methods = visitor.class_methods.get(class_name, set())
for sync_hook, async_hook in SYNC_AGENT_MIDDLEWARE_HOOKS.items():
if sync_hook in methods and async_hook not in methods:
qualname = f"{class_name}.{sync_hook}"
if qualname in visitor.function_defs:
entrypoints.add(qualname)
return entrypoints
def _event_loop_exposures(
visitor: BlockingIOStaticVisitor,
async_reachable: set[str],
middleware_reachable: set[str],
) -> dict[str, str]:
exposures: dict[str, str] = {}
for qualname, info in visitor.function_defs.items():
if info.is_async:
exposures[qualname] = "DIRECT_ASYNC"
for qualname in async_reachable:
exposures.setdefault(qualname, "ASYNC_REACHABLE_SAME_FILE")
for qualname in middleware_reachable:
exposures.setdefault(qualname, "SYNC_AGENT_MIDDLEWARE_HOOK")
return exposures
def _priority(operation: str) -> str:
return OPERATION_BASE_PRIORITY[operation]
def _finding_reason(operation: str, exposure: str) -> str:
if exposure == "DIRECT_ASYNC":
return f"{operation} is called directly inside an async function."
if exposure == "ASYNC_REACHABLE_SAME_FILE":
return f"{operation} is statically reachable from an async function in the same file."
if exposure == "SYNC_AGENT_MIDDLEWARE_HOOK":
return f"{operation} is statically reachable from a sync AgentMiddleware hook used by the async graph."
return "Source could not be parsed; scan coverage is incomplete for this file."
def _finalize_findings(visitor: BlockingIOStaticVisitor) -> list[BlockingIOStaticFinding]:
reachable = _async_reachable_functions(visitor)
middleware_reachable = _reachable_functions(visitor, _sync_only_agent_middleware_entrypoints(visitor))
event_loop_exposures = _event_loop_exposures(visitor, reachable, middleware_reachable)
findings: list[BlockingIOStaticFinding] = []
for candidate in visitor.potential_findings:
exposure = event_loop_exposures.get(candidate.function)
if exposure is None:
continue
findings.append(
BlockingIOStaticFinding(
category=candidate.category,
operation=candidate.operation,
priority=_priority(candidate.operation),
path=candidate.path,
line=candidate.line,
column=candidate.column,
function=candidate.function,
exposure=exposure,
symbol=candidate.symbol,
code=candidate.code,
)
)
return findings
def scan_file(path: Path, *, repo_root: Path = REPO_ROOT) -> list[BlockingIOStaticFinding]:
source = path.read_text(encoding="utf-8")
source_lines = source.splitlines()
relative_path = relative_to_repo(path, repo_root)
try:
tree = ast.parse(source, filename=str(path))
except SyntaxError as exc:
line = exc.lineno or 0
code = _source_snippet(source_lines, line)
return [
BlockingIOStaticFinding(
category="PARSE_ERROR",
operation="PARSE_ERROR",
priority="MEDIUM",
path=relative_path,
line=line,
column=max((exc.offset or 1) - 1, 0),
function="<module>",
exposure="PARSE_INCOMPLETE",
symbol="SyntaxError",
code=code,
)
]
visitor = BlockingIOStaticVisitor(relative_path, source_lines)
visitor.visit(tree)
return sorted(_finalize_findings(visitor), key=lambda finding: (finding.path, finding.line, finding.column, finding.category))
def is_ignored_path(path: Path) -> bool:
return any(part in IGNORED_DIR_NAMES for part in path.parts)
def iter_python_files(paths: Iterable[Path]) -> Iterable[Path]:
for path in paths:
if not path.exists() or is_ignored_path(path):
continue
if path.is_file():
if path.suffix == ".py":
yield path
continue
for dirpath, dirnames, filenames in os.walk(path):
dirnames[:] = [dirname for dirname in dirnames if dirname not in IGNORED_DIR_NAMES]
for filename in filenames:
if filename.endswith(".py"):
yield Path(dirpath) / filename
def scan_paths(paths: Iterable[Path], *, repo_root: Path = REPO_ROOT) -> list[BlockingIOStaticFinding]:
findings: list[BlockingIOStaticFinding] = []
for path in sorted(iter_python_files(paths)):
findings.extend(scan_file(path, repo_root=repo_root))
return sorted(findings, key=lambda finding: (finding.path, finding.line, finding.column, finding.category))
def findings_to_json(findings: Sequence[BlockingIOStaticFinding]) -> str:
return json.dumps([finding.to_dict() for finding in findings], indent=2) + "\n"
def write_json_report(findings: Sequence[BlockingIOStaticFinding], output_path: Path) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(findings_to_json(findings), encoding="utf-8")
def _scan_root(path: str) -> str:
parts = path.split("/")
if parts[:4] == ["backend", "packages", "harness", "deerflow"]:
return "backend/packages/harness/deerflow"
if len(parts) >= 2 and parts[0] == "backend":
return "/".join(parts[:2])
return parts[0] if parts else path
def _format_counter(title: str, counter: Counter[str], *, limit: int | None = None, order: Sequence[str] | None = None) -> list[str]:
lines = [title]
if order is None:
items = sorted(counter.items(), key=lambda item: (-item[1], item[0]))
else:
ordered = [(name, counter[name]) for name in order if counter.get(name)]
ordered_names = {name for name, _ in ordered}
extras = sorted((item for item in counter.items() if item[0] not in ordered_names), key=lambda item: (-item[1], item[0]))
items = ordered + extras
if limit is not None:
items = items[:limit]
width = max((len(str(count)) for _, count in items), default=1)
lines.extend(f" {count:>{width}} {name}" for name, count in items)
return lines
def format_summary(findings: Sequence[BlockingIOStaticFinding], *, output_path: Path | None = None) -> str:
if not findings:
lines = ["No static blocking IO event-loop risk findings in backend business code."]
else:
lines = [
f"Static blocking IO event-loop risk findings: {len(findings)}",
"",
*_format_counter("By category:", Counter(finding.category for finding in findings)),
"",
*_format_counter("By priority:", Counter(finding.priority for finding in findings), order=("HIGH", "MEDIUM", "LOW")),
"",
*_format_counter("By operation:", Counter(finding.operation for finding in findings)),
"",
*_format_counter("By event-loop exposure:", Counter(finding.exposure for finding in findings)),
"",
*_format_counter("By scan root:", Counter(_scan_root(finding.path) for finding in findings)),
"",
*_format_counter("Top files:", Counter(finding.path for finding in findings), limit=10),
]
if output_path is not None:
lines.extend(["", f"Full JSON report: {relative_to_repo(output_path.resolve())}"])
else:
lines.extend(["", "Use --format json for full structured findings."])
return "\n".join(lines)
def format_text(findings: Sequence[BlockingIOStaticFinding]) -> str:
if not findings:
return "No static blocking IO event-loop risk findings in backend business code."
lines: list[str] = []
for finding in findings:
lines.append(f"{finding.priority} {finding.category}/{finding.operation} {finding.path}:{finding.line}:{finding.column + 1} in {finding.function} exposure={finding.exposure}")
lines.append(f" symbol: {finding.symbol}")
lines.append(f" reason: {_finding_reason(finding.operation, finding.exposure)}")
if finding.code:
lines.append(f" code: {finding.code}")
return "\n".join(lines)
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description=("Statically inventory blocking IO calls that may block the backend asyncio event loop. Findings are prioritized review candidates, not automatic bug decisions."))
parser.add_argument(
"paths",
nargs="*",
type=Path,
help="Files or directories to scan. Defaults to backend app and harness sources.",
)
parser.add_argument(
"--format",
choices=("summary", "text", "json"),
default="summary",
help="Output format.",
)
parser.add_argument(
"--output",
type=Path,
help="Write the complete finding list as JSON to this file.",
)
return parser
def main(argv: Sequence[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
paths = args.paths or list(DEFAULT_SCAN_PATHS)
findings = scan_paths(paths)
output_path = args.output
if output_path is not None:
write_json_report(findings, output_path)
if args.format == "summary":
print(format_summary(findings, output_path=output_path))
elif args.format == "json":
print(findings_to_json(findings), end="")
else:
print(format_text(findings))
return 0
if __name__ == "__main__":
sys.exit(main())
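Review note: the exposure classification above rests on a breadth-first closure over same-file call references (`_reachable_functions`). The sketch below isolates that idea on a plain dictionary. The `CALL_GRAPH` contents and the `reachable_functions` name are hypothetical and only illustrate the technique; the detector itself resolves `_CallRef` objects through the visitor rather than a ready-made mapping.

```python
from __future__ import annotations

from collections import deque
from typing import Iterable, Mapping


def reachable_functions(call_graph: Mapping[str, Iterable[str]], roots: Iterable[str]) -> set[str]:
    """Return every function reachable from `roots` using breadth-first search."""
    reachable = set(roots)
    queue: deque[str] = deque(reachable)
    while queue:
        caller = queue.popleft()
        for callee in call_graph.get(caller, ()):
            if callee in reachable:
                continue  # already queued or processed
            reachable.add(callee)
            queue.append(callee)
    return reachable


# Hypothetical same-file call graph: caller -> callees.
CALL_GRAPH = {
    "route": ["load_payload"],
    "load_payload": ["read_bytes_helper"],
    "cli_main": ["sync_only_helper"],
}

# Assume only `route` is async, so it is the single root.
ASYNC_REACHABLE = reachable_functions(CALL_GRAPH, ["route"])
```

A blocking call inside `sync_only_helper` would be dropped from the report in this sketch because nothing async reaches it, which mirrors how `_finalize_findings` skips candidates with no exposure.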
-190
@@ -1,190 +0,0 @@
from __future__ import annotations
import asyncio
import os
import time
from os import walk as imported_walk
from pathlib import Path
from time import sleep as imported_sleep
import httpx
import pytest
import requests
from support.detectors.blocking_io import (
BlockingCallSpec,
BlockingIOProbe,
detect_blocking_io,
)
pytestmark = pytest.mark.asyncio
TIME_SLEEP_ONLY = (BlockingCallSpec("time.sleep", "time:sleep"),)
REQUESTS_ONLY = (BlockingCallSpec("requests.Session.request", "requests.sessions:Session.request"),)
HTTPX_ONLY = (BlockingCallSpec("httpx.Client.request", "httpx:Client.request"),)
OS_WALK_ONLY = (BlockingCallSpec("os.walk", "os:walk", record_on_iteration=True),)
PATH_READ_TEXT_ONLY = (BlockingCallSpec("pathlib.Path.read_text", "pathlib:Path.read_text"),)
async def test_records_time_sleep_on_event_loop() -> None:
with detect_blocking_io(TIME_SLEEP_ONLY) as detector:
time.sleep(0)
assert [violation.name for violation in detector.violations] == ["time.sleep"]
async def test_records_already_imported_sleep_alias_on_event_loop() -> None:
original_alias = imported_sleep
with detect_blocking_io(TIME_SLEEP_ONLY) as detector:
imported_sleep(0)
assert imported_sleep is original_alias
assert [violation.name for violation in detector.violations] == ["time.sleep"]
async def test_can_disable_loaded_alias_patching() -> None:
with detect_blocking_io(TIME_SLEEP_ONLY, patch_loaded_aliases=False) as detector:
imported_sleep(0)
assert detector.violations == []
async def test_does_not_record_time_sleep_offloaded_to_thread() -> None:
with detect_blocking_io(TIME_SLEEP_ONLY) as detector:
await asyncio.to_thread(time.sleep, 0)
assert detector.violations == []
async def test_fixture_allows_offloaded_sync_work(blocking_io_detector) -> None:
await asyncio.to_thread(time.sleep, 0)
assert blocking_io_detector.violations == []
async def test_does_not_record_sync_call_without_running_event_loop() -> None:
def call_sleep() -> list[str]:
with detect_blocking_io(TIME_SLEEP_ONLY) as detector:
time.sleep(0)
return [violation.name for violation in detector.violations]
assert await asyncio.to_thread(call_sleep) == []
async def test_fail_on_exit_includes_call_site() -> None:
with pytest.raises(AssertionError) as exc_info:
with detect_blocking_io(TIME_SLEEP_ONLY, fail_on_exit=True):
time.sleep(0)
message = str(exc_info.value)
assert "time.sleep" in message
assert "test_fail_on_exit_includes_call_site" in message
async def test_records_requests_session_request_without_real_network(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_request(self: requests.Session, method: str, url: str, **kwargs: object) -> str:
return f"{method}:{url}"
monkeypatch.setattr(requests.sessions.Session, "request", fake_request)
with detect_blocking_io(REQUESTS_ONLY) as detector:
assert requests.get("https://example.invalid") == "get:https://example.invalid"
assert [violation.name for violation in detector.violations] == ["requests.Session.request"]
async def test_records_sync_httpx_client_request_without_real_network(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_request(self: httpx.Client, method: str, url: str, **kwargs: object) -> httpx.Response:
return httpx.Response(200, request=httpx.Request(method, url))
monkeypatch.setattr(httpx.Client, "request", fake_request)
with detect_blocking_io(HTTPX_ONLY) as detector:
with httpx.Client() as client:
response = client.get("https://example.invalid")
assert response.status_code == 200
assert [violation.name for violation in detector.violations] == ["httpx.Client.request"]
async def test_records_os_walk_on_event_loop(tmp_path: Path) -> None:
(tmp_path / "nested").mkdir()
with detect_blocking_io(OS_WALK_ONLY) as detector:
assert list(os.walk(tmp_path))
assert [violation.name for violation in detector.violations] == ["os.walk"]
async def test_records_already_imported_os_walk_alias_on_iteration(tmp_path: Path) -> None:
(tmp_path / "nested").mkdir()
original_alias = imported_walk
with detect_blocking_io(OS_WALK_ONLY) as detector:
assert list(imported_walk(tmp_path))
assert imported_walk is original_alias
assert [violation.name for violation in detector.violations] == ["os.walk"]
async def test_does_not_record_os_walk_before_iteration(tmp_path: Path) -> None:
with detect_blocking_io(OS_WALK_ONLY) as detector:
walker = os.walk(tmp_path)
assert list(walker)
assert detector.violations == []
async def test_does_not_record_os_walk_iterated_off_event_loop(tmp_path: Path) -> None:
(tmp_path / "nested").mkdir()
with detect_blocking_io(OS_WALK_ONLY) as detector:
walker = os.walk(tmp_path)
assert await asyncio.to_thread(lambda: list(walker))
assert detector.violations == []
async def test_records_path_read_text_on_event_loop(tmp_path: Path) -> None:
path = tmp_path / "data.txt"
path.write_text("content", encoding="utf-8")
with detect_blocking_io(PATH_READ_TEXT_ONLY) as detector:
assert path.read_text(encoding="utf-8") == "content"
assert [violation.name for violation in detector.violations] == ["pathlib.Path.read_text"]
async def test_probe_formats_summary_for_recorded_violations(tmp_path: Path) -> None:
probe = BlockingIOProbe(Path(__file__).resolve().parents[1])
path = tmp_path / "data.txt"
path.write_text("content", encoding="utf-8")
with detect_blocking_io(PATH_READ_TEXT_ONLY, stack_limit=18) as detector:
assert path.read_text(encoding="utf-8") == "content"
probe.record("tests/test_example.py::test_example", detector.violations)
summary = probe.format_summary()
assert "blocking io probe: 1 violations across 1 tests" in summary
assert "pathlib.Path.read_text" in summary
async def test_probe_formats_empty_summary_and_can_be_cleared(tmp_path: Path) -> None:
probe = BlockingIOProbe(Path(__file__).resolve().parents[1])
assert probe.format_summary() == "blocking io probe: no violations"
path = tmp_path / "data.txt"
path.write_text("content", encoding="utf-8")
with detect_blocking_io(PATH_READ_TEXT_ONLY, stack_limit=18) as detector:
assert path.read_text(encoding="utf-8") == "content"
probe.record("tests/test_example.py::test_example", detector.violations)
assert probe.violation_count == 1
probe.clear()
assert probe.violation_count == 0
assert probe.format_summary() == "blocking io probe: no violations"
@@ -1,22 +0,0 @@
from __future__ import annotations
import time
import pytest
ORIGINAL_SLEEP = time.sleep
def replacement_sleep(seconds: float) -> None:
return None
def test_probe_survives_monkeypatch_teardown(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(time, "sleep", replacement_sleep)
assert time.sleep is replacement_sleep
@pytest.mark.no_blocking_io_probe
def test_probe_restores_original_after_monkeypatch_teardown() -> None:
assert time.sleep is ORIGINAL_SLEEP
assert getattr(time.sleep, "__wrapped__", None) is None
+47 -6
@@ -291,7 +291,7 @@ class TestAsyncCheckpointer:
@pytest.mark.anyio
async def test_sqlite_creates_parent_dir_via_to_thread(self):
"""Async SQLite setup should move mkdir off the event loop."""
-from deerflow.runtime.checkpointer.async_provider import make_checkpointer
+from deerflow.runtime.checkpointer.async_provider import _prepare_sqlite_checkpointer_path, make_checkpointer
mock_config = MagicMock()
mock_config.checkpointer = CheckpointerConfig(type="sqlite", connection_string="relative/test.db")
@@ -310,22 +310,63 @@ class TestAsyncCheckpointer:
with (
patch("deerflow.runtime.checkpointer.async_provider.get_app_config", return_value=mock_config),
patch.dict(sys.modules, {"langgraph.checkpoint.sqlite.aio": mock_module}),
-patch("deerflow.runtime.checkpointer.async_provider.asyncio.to_thread", new_callable=AsyncMock) as mock_to_thread,
patch(
-"deerflow.runtime.checkpointer.async_provider.resolve_sqlite_conn_str",
+"deerflow.runtime.checkpointer.async_provider.asyncio.to_thread",
+new_callable=AsyncMock,
return_value="/tmp/resolved/test.db",
-),
+) as mock_to_thread,
):
async with make_checkpointer() as saver:
assert saver is mock_saver
mock_to_thread.assert_awaited_once()
called_fn, called_path = mock_to_thread.await_args.args
-assert called_fn.__name__ == "ensure_sqlite_parent_dir"
-assert called_path == "/tmp/resolved/test.db"
+assert called_fn is _prepare_sqlite_checkpointer_path
+assert called_path == "relative/test.db"
mock_saver_cls.from_conn_string.assert_called_once_with("/tmp/resolved/test.db")
mock_saver.setup.assert_awaited_once()
@pytest.mark.anyio
async def test_database_sqlite_creates_parent_dir_via_to_thread(self):
"""Unified database SQLite setup should also move path IO off the event loop."""
from deerflow.config.database_config import DatabaseConfig
from deerflow.runtime.checkpointer.async_provider import _prepare_database_sqlite_checkpointer_path, make_checkpointer
db_config = DatabaseConfig(backend="sqlite", sqlite_dir="relative-data")
mock_config = MagicMock()
mock_config.checkpointer = None
mock_config.database = db_config
mock_saver = AsyncMock()
mock_cm = AsyncMock()
mock_cm.__aenter__.return_value = mock_saver
mock_cm.__aexit__.return_value = False
mock_saver_cls = MagicMock()
mock_saver_cls.from_conn_string.return_value = mock_cm
mock_module = MagicMock()
mock_module.AsyncSqliteSaver = mock_saver_cls
with (
patch("deerflow.runtime.checkpointer.async_provider.get_app_config", return_value=mock_config),
patch.dict(sys.modules, {"langgraph.checkpoint.sqlite.aio": mock_module}),
patch(
"deerflow.runtime.checkpointer.async_provider.asyncio.to_thread",
new_callable=AsyncMock,
return_value="/tmp/data/deerflow.db",
) as mock_to_thread,
):
async with make_checkpointer() as saver:
assert saver is mock_saver
mock_to_thread.assert_awaited_once()
called_fn, called_db_config = mock_to_thread.await_args.args
assert called_fn is _prepare_database_sqlite_checkpointer_path
assert called_db_config is db_config
mock_saver_cls.from_conn_string.assert_called_once_with("/tmp/data/deerflow.db")
mock_saver.setup.assert_awaited_once()
# ---------------------------------------------------------------------------
# app_config.py integration
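Review note: both checkpointer tests above assert that path preparation is handed to `asyncio.to_thread` instead of running on the event loop. A minimal sketch of that pattern follows. `prepare_sqlite_path` and `open_checkpointer` are hypothetical stand-ins, not the real `_prepare_sqlite_checkpointer_path` or `make_checkpointer`, whose bodies are not shown in this diff.

```python
from __future__ import annotations

import asyncio
import tempfile
import threading
from pathlib import Path


def prepare_sqlite_path(conn_str: str) -> tuple[str, str]:
    """Hypothetical helper: create the parent directory and report which thread did the work."""
    db_path = Path(conn_str)
    db_path.parent.mkdir(parents=True, exist_ok=True)  # blocking filesystem call
    return str(db_path), threading.current_thread().name


async def open_checkpointer(conn_str: str) -> tuple[str, str]:
    # The blocking mkdir runs in a worker thread, so the event loop stays responsive.
    return await asyncio.to_thread(prepare_sqlite_path, conn_str)


def run_demo() -> tuple[str, str, bool]:
    """Run the coroutine against a temporary directory and report what happened."""
    with tempfile.TemporaryDirectory() as tmp:
        target = str(Path(tmp) / "nested" / "test.db")
        resolved, worker = asyncio.run(open_checkpointer(target))
        return resolved, worker, Path(target).parent.is_dir()
```

Passing the unresolved connection string into the helper, as the updated test expects with `"relative/test.db"`, keeps path resolution and `mkdir` together in the worker thread.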
@@ -0,0 +1,421 @@
from __future__ import annotations
import json
import textwrap
from pathlib import Path
from support.detectors import blocking_io_static as detector
def _write_python(path: Path, source: str) -> Path:
path.write_text(textwrap.dedent(source).strip() + "\n", encoding="utf-8")
return path
def _payload(path: Path, repo_root: Path) -> list[dict[str, object]]:
return [finding.to_dict() for finding in detector.scan_file(path, repo_root=repo_root)]
def test_scan_file_detects_direct_blocking_calls_in_async_code(tmp_path: Path) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
import subprocess
import time
import urllib.request
from pathlib import Path
async def handler(path: Path):
time.sleep(1)
subprocess.run(["echo", "ok"])
path.read_text(encoding="utf-8")
with open(path, encoding="utf-8") as handle:
return urllib.request.urlopen(handle.read())
""",
)
findings = _payload(source_file, tmp_path)
categories = {finding["blocking_call"]["category"] for finding in findings}
symbols = {finding["blocking_call"]["symbol"] for finding in findings}
assert categories == {
"BLOCKING_FILE_IO",
"BLOCKING_HTTP_IO",
"BLOCKING_SLEEP",
"BLOCKING_SUBPROCESS",
}
assert {"time.sleep", "subprocess.run", "path.read_text", "open", "urllib.request.urlopen"}.issubset(symbols)
assert {finding["event_loop_exposure"] for finding in findings} == {"DIRECT_ASYNC"}
def test_scan_file_detects_blocking_calls_in_sync_helper_reached_from_async_code(tmp_path: Path) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
from pathlib import Path
def load_payload(path: Path) -> bytes:
return path.read_bytes()
async def route(path: Path) -> bytes:
return load_payload(path)
""",
)
findings = _payload(source_file, tmp_path)
assert len(findings) == 1
assert findings[0]["blocking_call"]["category"] == "BLOCKING_FILE_IO"
assert findings[0]["location"]["function"] == "load_payload"
assert findings[0]["event_loop_exposure"] == "ASYNC_REACHABLE_SAME_FILE"
assert findings[0]["blocking_call"]["symbol"] == "path.read_bytes"
def test_scan_file_omits_sync_only_blocking_calls_from_default_results(tmp_path: Path) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
from pathlib import Path
def load_payload(path: Path) -> str:
return path.read_text()
""",
)
assert detector.scan_file(source_file, repo_root=tmp_path) == []
def test_scan_file_detects_self_helper_reached_from_async_method(tmp_path: Path) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
class ArtifactRouter:
def read_payload(self, path):
return path.read_text(encoding="utf-8")
async def get(self, path):
return self.read_payload(path)
""",
)
findings = _payload(source_file, tmp_path)
assert len(findings) == 1
assert findings[0]["location"]["function"] == "ArtifactRouter.read_payload"
assert findings[0]["event_loop_exposure"] == "ASYNC_REACHABLE_SAME_FILE"
def test_json_output_uses_concise_review_record_schema(tmp_path: Path, capsys) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
import subprocess
async def handler():
subprocess.run(["echo", "ok"])
""",
)
exit_code = detector.main(["--format", "json", str(source_file)])
assert exit_code == 0
payload = json.loads(capsys.readouterr().out)
assert payload == [
{
"priority": "HIGH",
"location": {
"path": str(source_file),
"line": 4,
"column": 5,
"function": "handler",
},
"blocking_call": {
"category": "BLOCKING_SUBPROCESS",
"operation": "SUBPROCESS",
"symbol": "subprocess.run",
},
"event_loop_exposure": "DIRECT_ASYNC",
"reason": "SUBPROCESS is called directly inside an async function.",
"code": 'subprocess.run(["echo", "ok"])',
}
]
assert "confidence" not in payload[0]
assert "severity" not in payload[0]
assert "event_loop_risk" not in payload[0]
def test_summary_output_writes_json_report(tmp_path: Path, capsys) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
import subprocess
async def handler():
subprocess.run(["echo", "ok"])
""",
)
output_path = tmp_path / "reports" / "blocking-io.json"
exit_code = detector.main(["--output", str(output_path), str(source_file)])
assert exit_code == 0
stdout = capsys.readouterr().out
assert "Static blocking IO event-loop risk findings: 1" in stdout
assert "By category:" in stdout
assert "BLOCKING_SUBPROCESS" in stdout
assert "Full JSON report:" in stdout
payload = json.loads(output_path.read_text(encoding="utf-8"))
assert [finding["blocking_call"]["category"] for finding in payload] == ["BLOCKING_SUBPROCESS"]
def test_json_output_ranks_operations_without_confidence_noise(tmp_path: Path, capsys) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
import shutil
async def handler(path):
path.exists()
path.read_text()
shutil.rmtree(path)
""",
)
exit_code = detector.main(["--format", "json", str(source_file)])
assert exit_code == 0
payload = json.loads(capsys.readouterr().out)
by_symbol = {finding["blocking_call"]["symbol"]: finding for finding in payload}
assert by_symbol["path.exists"]["blocking_call"]["operation"] == "FILE_METADATA"
assert by_symbol["path.exists"]["priority"] == "LOW"
assert by_symbol["path.read_text"]["blocking_call"]["operation"] == "FILE_READ"
assert by_symbol["path.read_text"]["priority"] == "MEDIUM"
assert by_symbol["shutil.rmtree"]["blocking_call"]["operation"] == "FILE_TREE_DELETE"
assert by_symbol["shutil.rmtree"]["priority"] == "HIGH"
assert {finding["event_loop_exposure"] for finding in payload} == {"DIRECT_ASYNC"}
assert all("confidence" not in finding for finding in payload)
def test_path_receiver_detection_uses_path_annotations(tmp_path: Path) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
from pathlib import Path
async def typed(path: Path):
return path.read_text()
async def constructed():
return Path("payload.txt").read_text()
""",
)
findings = _payload(source_file, tmp_path)
assert {finding["blocking_call"]["symbol"] for finding in findings} == {"path.read_text", "pathlib.Path.read_text"}
assert {finding["priority"] for finding in findings} == {"MEDIUM"}
def test_summary_groups_findings_by_priority_and_operation(tmp_path: Path, capsys) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
import os
from pathlib import Path
def load_payload(path: Path) -> str:
return path.read_text()
async def handler(path: Path) -> str:
path.exists()
list(os.walk(path))
return load_payload(path)
""",
)
exit_code = detector.main([str(source_file)])
assert exit_code == 0
stdout = capsys.readouterr().out
assert "By priority:" in stdout
assert "HIGH" in stdout
assert "MEDIUM" in stdout
assert "By operation:" in stdout
assert "FILE_ENUMERATION" in stdout
assert "FILE_METADATA" in stdout
assert "FILE_READ" in stdout
assert "By event-loop exposure:" in stdout
assert "DIRECT_ASYNC" in stdout
assert "ASYNC_REACHABLE_SAME_FILE" in stdout
def test_source_code_snippet_is_truncated_for_json_output(tmp_path: Path) -> None:
long_suffix = " + ".join('"chunk"' for _ in range(80))
source_file = _write_python(
tmp_path / "sample.py",
f"""
async def handler(path):
return path.read_text() + {long_suffix}
""",
)
findings = _payload(source_file, tmp_path)
assert len(findings) == 1
assert len(findings[0]["code"]) <= 203
assert findings[0]["code"].endswith("...")
def test_cli_default_filters_sync_only_inventory_items(tmp_path: Path, capsys) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
from pathlib import Path
def load_payload(path: Path) -> str:
return path.read_text()
""",
)
exit_code = detector.main(["--format", "json", str(source_file)])
assert exit_code == 0
assert json.loads(capsys.readouterr().out) == []
def test_sync_only_agent_middleware_hook_gets_event_loop_exposure(tmp_path: Path) -> None:
source_file = _write_python(
tmp_path / "sample.py",
"""
from langchain.agents.middleware import AgentMiddleware
from pathlib import Path
class UploadsMiddleware(AgentMiddleware):
def before_agent(self, state, runtime):
return self._load(Path("uploads"))
def _load(self, path: Path) -> str:
return path.read_text()
""",
)
findings = _payload(source_file, tmp_path)
assert len(findings) == 1
assert findings[0]["location"]["function"] == "UploadsMiddleware._load"
assert findings[0]["event_loop_exposure"] == "SYNC_AGENT_MIDDLEWARE_HOOK"
    assert "statically reachable from a sync AgentMiddleware hook" in findings[0]["reason"]


def test_sync_agent_middleware_hook_with_async_counterpart_is_not_reported(tmp_path: Path) -> None:
    source_file = _write_python(
        tmp_path / "sample.py",
        """
        from langchain.agents.middleware import AgentMiddleware
        from pathlib import Path

        class UploadsMiddleware(AgentMiddleware):
            def before_agent(self, state, runtime):
                return Path("uploads").read_text()

            async def abefore_agent(self, state, runtime):
                return None
        """,
    )

    assert detector.scan_file(source_file, repo_root=tmp_path) == []


def test_scan_file_detects_sync_httpx_client_methods_in_async_code(tmp_path: Path) -> None:
    source_file = _write_python(
        tmp_path / "sample.py",
        """
        import httpx

        async def search() -> str:
            with httpx.Client(timeout=30) as client:
                return client.post("https://example.invalid").text
        """,
    )

    findings = _payload(source_file, tmp_path)

    assert len(findings) == 1
    assert findings[0]["blocking_call"]["category"] == "BLOCKING_HTTP_IO"
    assert findings[0]["location"]["function"] == "search"
    assert findings[0]["event_loop_exposure"] == "DIRECT_ASYNC"
    assert findings[0]["blocking_call"]["symbol"] == "httpx.Client.post"


def test_scan_file_detects_chained_sync_http_client_methods_in_async_code(tmp_path: Path) -> None:
    source_file = _write_python(
        tmp_path / "sample.py",
        """
        import httpx
        import requests

        async def fetch() -> tuple[object, object]:
            return (
                httpx.Client().get("https://example.invalid"),
                requests.Session().post("https://example.invalid"),
            )
        """,
    )

    findings = _payload(source_file, tmp_path)

    symbols = {finding["blocking_call"]["symbol"] for finding in findings}
    assert symbols == {"httpx.Client.get", "requests.Session.post"}
    assert {finding["blocking_call"]["category"] for finding in findings} == {"BLOCKING_HTTP_IO"}


def test_scan_file_detects_os_walk_and_path_resolve_in_async_code(tmp_path: Path) -> None:
    source_file = _write_python(
        tmp_path / "sample.py",
        """
        import os
        from pathlib import Path

        async def inspect_tree(path: Path) -> list[str]:
            root = path.resolve()
            return [name for _, _, names in os.walk(root) for name in names]
        """,
    )

    findings = _payload(source_file, tmp_path)

    symbols = {finding["blocking_call"]["symbol"] for finding in findings}
    assert symbols == {"path.resolve", "os.walk"}
    assert {finding["blocking_call"]["category"] for finding in findings} == {"BLOCKING_FILE_IO"}


def test_scan_file_does_not_treat_string_replace_as_file_io(tmp_path: Path) -> None:
    source_file = _write_python(
        tmp_path / "sample.py",
        """
        def _path_variants(path: str) -> set[str]:
            return {path, path.replace("\\\\", "/"), path.replace("/", "\\\\")}

        async def normalize(text: str) -> str:
            return text.replace("a", "b")
        """,
    )

    assert detector.scan_file(source_file, repo_root=tmp_path) == []


def test_parse_errors_are_reported_as_findings(tmp_path: Path) -> None:
    source_file = _write_python(
        tmp_path / "broken.py",
        """
        async def broken(:
            pass
        """,
    )

    findings = _payload(source_file, tmp_path)

    assert len(findings) == 1
    assert findings[0]["blocking_call"]["category"] == "PARSE_ERROR"
    assert findings[0]["priority"] == "MEDIUM"
    assert f"{source_file.name}:1:18" in detector.format_text(detector.scan_file(source_file, repo_root=tmp_path))
+35
@@ -0,0 +1,35 @@
"""Tests for Gateway internal auth token handling."""

from __future__ import annotations

import importlib


def test_internal_auth_uses_shared_env_token(monkeypatch):
    import app.gateway.internal_auth as internal_auth

    monkeypatch.setenv("DEER_FLOW_INTERNAL_AUTH_TOKEN", "shared-token")
    reloaded = importlib.reload(internal_auth)
    try:
        headers = reloaded.create_internal_auth_headers()
        assert headers[reloaded.INTERNAL_AUTH_HEADER_NAME] == "shared-token"
        assert reloaded.is_valid_internal_auth_token("shared-token") is True
        assert reloaded.is_valid_internal_auth_token("other-token") is False
    finally:
        monkeypatch.delenv("DEER_FLOW_INTERNAL_AUTH_TOKEN", raising=False)
        importlib.reload(reloaded)


def test_internal_auth_generates_process_local_fallback(monkeypatch):
    import app.gateway.internal_auth as internal_auth

    monkeypatch.delenv("DEER_FLOW_INTERNAL_AUTH_TOKEN", raising=False)
    reloaded = importlib.reload(internal_auth)
    try:
        token = reloaded.create_internal_auth_headers()[reloaded.INTERNAL_AUTH_HEADER_NAME]
        assert token
        assert reloaded.is_valid_internal_auth_token(token) is True
    finally:
        importlib.reload(reloaded)
-77
@@ -407,80 +407,3 @@ def test_session_pool_tool_sync_wrapper_path_is_safe():
wrapped.func(url="https://example.com")
mock_session.call_tool.assert_called_once_with("navigate", {"url": "https://example.com"})
# ---------------------------------------------------------------------------
# get_mcp_tools: HTTP transport should NOT be pooled
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_http_transport_tools_not_pooled():
"""HTTP/SSE transport tools should NOT be wrapped with the session pool."""
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from deerflow.mcp.tools import get_mcp_tools
class Args(BaseModel):
query: str = Field(..., description="query")
http_tool = StructuredTool(
name="myserver_search",
description="Search tool",
args_schema=Args,
coroutine=AsyncMock(),
response_format="content_and_artifact",
)
stdio_tool = StructuredTool(
name="playwright_navigate",
description="Navigate browser",
args_schema=Args,
coroutine=AsyncMock(),
response_format="content_and_artifact",
)
mock_session = AsyncMock()
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_session)
mock_cm.__aexit__ = AsyncMock(return_value=False)
extensions_config = MagicMock()
extensions_config.get_enabled_mcp_servers.return_value = {
"myserver": MagicMock(type="http", url="http://localhost:8000/mcp", headers=None, command=None, args=[], env=None),
"playwright": MagicMock(type="stdio", command="npx", args=["-y", "@anthropic/mcp-server-playwright"], env=None, url=None, headers=None),
}
extensions_config.model_extra = {}
servers_config = {
"myserver": {"transport": "http", "url": "http://localhost:8000/mcp"},
"playwright": {"transport": "stdio", "command": "npx", "args": ["-y", "@anthropic/mcp-server-playwright"]},
}
with (
patch("deerflow.mcp.tools.ExtensionsConfig.from_file", return_value=extensions_config),
patch("deerflow.mcp.tools.build_servers_config", return_value=servers_config),
patch("deerflow.mcp.tools.get_initial_oauth_headers", return_value={}),
patch("deerflow.mcp.tools.build_oauth_tool_interceptor", return_value=None),
patch("langchain_mcp_adapters.client.MultiServerMCPClient") as MockClient,
patch("langchain_mcp_adapters.sessions.create_session", return_value=mock_cm),
):
mock_client_instance = MockClient.return_value
mock_client_instance.get_tools = AsyncMock(return_value=[http_tool, stdio_tool])
tools = await get_mcp_tools()
pool = get_session_pool()
# Tool discovery is lazy: no pooled sessions are created until a wrapped tool is invoked.
assert list(pool._entries.keys()) == []
# Verify the HTTP tool was NOT wrapped with the pool (it's the original tool).
http_tools = [t for t in tools if t.name == "myserver_search"]
assert len(http_tools) == 1
assert http_tools[0].coroutine is http_tool.coroutine
# Verify the stdio tool WAS wrapped with the pool.
stdio_tools = [t for t in tools if t.name == "playwright_navigate"]
assert len(stdio_tools) == 1
assert stdio_tools[0].coroutine is not stdio_tool.coroutine
+37
@@ -17,6 +17,7 @@ from deerflow.agents.middlewares.todo_middleware import (
_reminder_in_messages,
_todos_in_messages,
)
from deerflow.agents.thread_state import ThreadState
def _ai_with_write_todos():
@@ -510,6 +511,42 @@ class TestWrapModelCall:
class TestTodoMiddlewareAgentGraphIntegration:
    def test_reuses_thread_state_todos_schema_in_real_agent_graph(self):
        mw = TodoMiddleware()
        model = _CapturingFakeMessagesListChatModel(
            responses=[
                AIMessage(
                    content="",
                    tool_calls=[
                        {
                            "name": "write_todos",
                            "id": "todos-1",
                            "args": {
                                "todos": [
                                    {"content": "Step 1", "status": "pending"},
                                ]
                            },
                        }
                    ],
                ),
                AIMessage(content="final"),
            ],
        )
        graph = create_agent(
            model=model,
            tools=[],
            middleware=[mw],
            state_schema=ThreadState,
        )

        result = graph.invoke(
            {"messages": [("user", "create a todo")]},
            context={"thread_id": "schema-thread", "run_id": "schema-run"},
        )

        assert result["todos"] == [{"content": "Step 1", "status": "pending"}]

    def test_completion_reminder_is_transient_in_real_agent_graph(self):
        mw = TodoMiddleware()
        model = _CapturingFakeMessagesListChatModel(
+2
@@ -772,6 +772,7 @@ postgres = [
[package.dev-dependencies]
dev = [
{ name = "blockbuster" },
{ name = "prompt-toolkit" },
{ name = "pytest" },
{ name = "pytest-asyncio" },
@@ -803,6 +804,7 @@ provides-extras = ["postgres", "discord"]
[package.metadata.requires-dev]
dev = [
{ name = "blockbuster", specifier = ">=1.5.26,<1.6" },
{ name = "prompt-toolkit", specifier = ">=3.0.0" },
{ name = "pytest", specifier = ">=9.0.3" },
{ name = "pytest-asyncio", specifier = ">=1.3.0" },
+1
@@ -168,6 +168,7 @@ services:
- DEER_FLOW_HOME=/app/backend/.deer-flow
- DEER_FLOW_CHANNELS_LANGGRAPH_URL=${DEER_FLOW_CHANNELS_LANGGRAPH_URL:-http://gateway:8001/api}
- DEER_FLOW_CHANNELS_GATEWAY_URL=${DEER_FLOW_CHANNELS_GATEWAY_URL:-http://gateway:8001}
- DEER_FLOW_INTERNAL_AUTH_TOKEN=${DEER_FLOW_INTERNAL_AUTH_TOKEN:-}
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_ROOT}/backend/.deer-flow
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_ROOT}/skills
- DEER_FLOW_SANDBOX_HOST=host.docker.internal
+2
@@ -16,6 +16,7 @@
# DEER_FLOW_DOCKER_SOCKET — Docker socket path, default /var/run/docker.sock
# DEER_FLOW_REPO_ROOT — repo root (used for skills host path in DooD)
# BETTER_AUTH_SECRET — required for frontend auth/session security
# DEER_FLOW_INTERNAL_AUTH_TOKEN — shared internal Gateway auth token for multi-worker IM channels
#
# LangSmith tracing is disabled by default (LANGSMITH_TRACING=false).
# Set LANGSMITH_TRACING=true and LANGSMITH_API_KEY in .env to enable it.
@@ -101,6 +102,7 @@ services:
- DEER_FLOW_EXTENSIONS_CONFIG_PATH=/app/backend/extensions_config.json
- DEER_FLOW_CHANNELS_LANGGRAPH_URL=${DEER_FLOW_CHANNELS_LANGGRAPH_URL:-http://gateway:8001/api}
- DEER_FLOW_CHANNELS_GATEWAY_URL=${DEER_FLOW_CHANNELS_GATEWAY_URL:-http://gateway:8001}
- DEER_FLOW_INTERNAL_AUTH_TOKEN=${DEER_FLOW_INTERNAL_AUTH_TOKEN}
# DooD path/network translation
- DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_HOME}
- DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_REPO_ROOT}/skills
@@ -1,6 +1,7 @@
"use client";
import { Button } from "@/components/ui/button";
import { writeTextToClipboard } from "@/core/clipboard";
import { cn } from "@/lib/utils";
import { CheckIcon, CopyIcon } from "lucide-react";
import {
@@ -146,20 +147,20 @@ export const CodeBlockCopyButton = ({
const [isCopied, setIsCopied] = useState(false);
const { code } = useContext(CodeBlockContext);
const copyToClipboard = async () => {
if (typeof window === "undefined" || !navigator?.clipboard?.writeText) {
const copyToClipboard = () => {
void (async () => {
const didCopy = await writeTextToClipboard(code);
if (!didCopy) {
onError?.(new Error("Clipboard API not available"));
return;
}
try {
await navigator.clipboard.writeText(code);
setIsCopied(true);
onCopy?.();
setTimeout(() => setIsCopied(false), timeout);
} catch (error) {
})().catch((error) => {
onError?.(error as Error);
}
});
};
const Icon = isCopied ? CheckIcon : CopyIcon;
@@ -38,6 +38,7 @@ import {
HTML_PREVIEW_SCROLL_MESSAGE_SOURCE,
} from "@/core/artifacts/preview";
import { urlOfArtifact } from "@/core/artifacts/utils";
import { writeTextToClipboard } from "@/core/clipboard";
import { useI18n } from "@/core/i18n/hooks";
import { findToolCallResult } from "@/core/messages/utils";
import { installSkill } from "@/core/skills/api";
@@ -237,14 +238,20 @@ export function ArtifactFileDetail({
icon={CopyIcon}
label={t.clipboard.copyToClipboard}
disabled={!content}
onClick={async () => {
try {
await navigator.clipboard.writeText(visibleContent ?? "");
toast.success(t.clipboard.copiedToClipboard);
} catch (error) {
toast.error("Failed to copy to clipboard");
console.error(error);
onClick={() => {
void (async () => {
const didCopy = await writeTextToClipboard(
visibleContent ?? "",
);
if (!didCopy) {
toast.error(t.clipboard.failedToCopyToClipboard);
return;
}
toast.success(t.clipboard.copiedToClipboard);
})().catch(() => {
toast.error(t.clipboard.failedToCopyToClipboard);
});
}}
tooltip={t.clipboard.copyToClipboard}
/>
@@ -1,7 +1,9 @@
import { CheckIcon, CopyIcon } from "lucide-react";
import { useCallback, useState, type ComponentProps } from "react";
import { toast } from "sonner";
import { Button } from "@/components/ui/button";
import { writeTextToClipboard } from "@/core/clipboard";
import { useI18n } from "@/core/i18n/hooks";
import { Tooltip } from "./tooltip";
@@ -15,10 +17,19 @@ export function CopyButton({
const { t } = useI18n();
const [copied, setCopied] = useState(false);
const handleCopy = useCallback(() => {
void navigator.clipboard.writeText(clipboardData);
void (async () => {
const didCopy = await writeTextToClipboard(clipboardData);
if (!didCopy) {
toast.error(t.clipboard.failedToCopyToClipboard);
return;
}
setCopied(true);
setTimeout(() => setCopied(false), 2000);
}, [clipboardData]);
})().catch(() => {
toast.error(t.clipboard.failedToCopyToClipboard);
});
}, [clipboardData, t.clipboard.failedToCopyToClipboard]);
return (
<Tooltip content={t.clipboard.copyToClipboard}>
<Button
@@ -43,6 +43,7 @@ import {
SidebarMenuItem,
} from "@/components/ui/sidebar";
import { getAPIClient } from "@/core/api";
import { writeTextToClipboard } from "@/core/clipboard";
import { useI18n } from "@/core/i18n/hooks";
import {
exportThreadAsJSON,
@@ -126,7 +127,12 @@ export function RecentChatList() {
const baseUrl = isLocalhost ? VERCEL_URL : window.location.origin;
const shareUrl = `${baseUrl}${pathOfThread(thread)}`;
try {
await navigator.clipboard.writeText(shareUrl);
const didCopy = await writeTextToClipboard(shareUrl);
if (!didCopy) {
toast.error(t.clipboard.failedToCopyToClipboard);
return;
}
toast.success(t.clipboard.linkCopied);
} catch {
toast.error(t.clipboard.failedToCopyToClipboard);
+31
@@ -0,0 +1,31 @@
export async function writeTextToClipboard(text: string): Promise<boolean> {
  try {
    const clipboard = globalThis.navigator?.clipboard;
    if (clipboard?.writeText) {
      await clipboard.writeText(text);
      return true;
    }

    const document = globalThis.document;
    if (!document?.body?.appendChild || !document.execCommand) {
      return false;
    }

    const textarea = document.createElement("textarea");
    textarea.value = text;
    textarea.setAttribute("readonly", "");
    textarea.style.position = "fixed";
    textarea.style.top = "-9999px";
    textarea.style.left = "-9999px";
    document.body.appendChild(textarea);
    textarea.select();

    try {
      return document.execCommand("copy");
    } finally {
      textarea.remove();
    }
  } catch {
    return false;
  }
}
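The call sites changed in this comparison all wrap the helper in the same fire-and-forget shape: a synchronous click handler that starts an async IIFE, reports failure when the helper resolves to `false`, and reports failure again from a trailing `.catch`. A minimal sketch of that pattern follows. `makeCopyHandler`, `CopyFn`, and the callback parameters are hypothetical names introduced only for illustration; they do not appear in the repository.

```typescript
// Hypothetical type: any function shaped like writeTextToClipboard.
type CopyFn = (text: string) => Promise<boolean>;

// Hypothetical helper: builds a synchronous handler around an async copy.
function makeCopyHandler(
  copy: CopyFn,
  getText: () => string,
  onSuccess: () => void,
  onFailure: (error: Error) => void,
): () => void {
  return () => {
    // `void` marks the promise as intentionally not awaited by the caller.
    void (async () => {
      const didCopy = await copy(getText());
      if (!didCopy) {
        // The helper resolved to false: no clipboard path worked.
        onFailure(new Error("Clipboard API not available"));
        return;
      }
      onSuccess();
    })().catch((error: unknown) => {
      // Anything thrown inside the IIFE ends up here instead of becoming
      // an unhandled rejection.
      onFailure(error instanceof Error ? error : new Error(String(error)));
    });
  };
}
```

Returning `void` from the handler keeps it compatible with event props that expect a synchronous callback, while the `.catch` keeps rejected promises from going unobserved.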
+25 -5
@@ -266,22 +266,42 @@ export function extractTextFromMessage(message: Message) {
return "";
}
const THINK_OPEN_TAG = "<think>";
const THINK_TAG_RE = /<think>\s*([\s\S]*?)\s*<\/think>/g;
function splitInlineReasoning(content: string) {
const reasoningParts: string[] = [];
const cleaned = content
.replace(THINK_TAG_RE, (_, reasoning: string) => {
// First pass: strip every fully closed `<think>...</think>` pair and
// collect its body as reasoning.
let cleaned = content.replace(THINK_TAG_RE, (_, reasoning: string) => {
const normalized = reasoning.trim();
if (normalized) {
reasoningParts.push(normalized);
}
return "";
})
.trim();
});
// Streaming-safe pass: a `<think>` opener whose `</think>` has not arrived
// yet means the rest of the chunk is reasoning in flight. Route it into the
// reasoning slot instead of letting it render as message content (the
// raw-HTML markdown pipeline would otherwise paint the inner text on
// screen until the closing tag lands).
//
// Skip when the opener sits right after a backtick — that is the model
// talking about `<think>` literally inside markdown inline code, not
// actually streaming reasoning.
const openTagIndex = cleaned.indexOf(THINK_OPEN_TAG);
if (openTagIndex !== -1 && cleaned[openTagIndex - 1] !== "`") {
const tail = cleaned.slice(openTagIndex + THINK_OPEN_TAG.length).trim();
if (tail) {
reasoningParts.push(tail);
}
cleaned = cleaned.slice(0, openTagIndex);
}
return {
content: cleaned,
content: cleaned.trim(),
reasoning: reasoningParts.length > 0 ? reasoningParts.join("\n\n") : null,
};
}
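The hunk above interleaves removed and added lines without `+`/`-` markers, so the resulting function is hard to read off directly. The sketch below is a reconstruction of how `splitInlineReasoning` appears to read after the change, assuming the lines that use `let cleaned` and `content: cleaned.trim()` are the added ones. The explicit return type annotation is an addition for self-containment and is not in the source.

```typescript
const THINK_OPEN_TAG = "<think>";
const THINK_TAG_RE = /<think>\s*([\s\S]*?)\s*<\/think>/g;

function splitInlineReasoning(content: string): {
  content: string;
  reasoning: string | null;
} {
  const reasoningParts: string[] = [];

  // First pass: remove every closed <think>...</think> pair and keep its body.
  let cleaned = content.replace(THINK_TAG_RE, (_match: string, reasoning: string) => {
    const normalized = reasoning.trim();
    if (normalized) {
      reasoningParts.push(normalized);
    }
    return "";
  });

  // Streaming-safe pass: an opener with no closer yet means the tail is
  // reasoning in flight, unless the opener directly follows a backtick.
  const openTagIndex = cleaned.indexOf(THINK_OPEN_TAG);
  if (openTagIndex !== -1 && cleaned[openTagIndex - 1] !== "`") {
    const tail = cleaned.slice(openTagIndex + THINK_OPEN_TAG.length).trim();
    if (tail) {
      reasoningParts.push(tail);
    }
    cleaned = cleaned.slice(0, openTagIndex);
  }

  return {
    content: cleaned.trim(),
    reasoning: reasoningParts.length > 0 ? reasoningParts.join("\n\n") : null,
  };
}

// Usage, mirroring cases from the accompanying tests in this comparison:
const midStream = splitInlineReasoning(
  "<think>first step</think>partial answer<think>second step still streaming",
);
console.log(midStream.content); // "partial answer"
console.log(midStream.reasoning); // "first step\n\nsecond step still streaming"
```

The backtick check only inspects the single character before the first remaining opener, so it is a heuristic for inline code rather than a markdown parser.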
+146
@@ -0,0 +1,146 @@
import { afterEach, expect, test, vi } from "vitest";

import { writeTextToClipboard } from "@/core/clipboard";

const originalNavigator = globalThis.navigator;
const hadOriginalNavigator = "navigator" in globalThis;
const originalDocument = globalThis.document;
const hadOriginalDocument = "document" in globalThis;

afterEach(() => {
  vi.restoreAllMocks();

  if (!hadOriginalNavigator) {
    Reflect.deleteProperty(globalThis, "navigator");
  } else {
    Object.defineProperty(globalThis, "navigator", {
      configurable: true,
      value: originalNavigator,
    });
  }

  if (!hadOriginalDocument) {
    Reflect.deleteProperty(globalThis, "document");
  } else {
    Object.defineProperty(globalThis, "document", {
      configurable: true,
      value: originalDocument,
    });
  }
});

test("writes text with the Clipboard API when available", async () => {
  const writeText = vi.fn().mockResolvedValue(undefined);
  Object.defineProperty(globalThis, "navigator", {
    configurable: true,
    value: {
      clipboard: {
        writeText,
      },
    },
  });

  await expect(writeTextToClipboard("hello")).resolves.toBe(true);
  expect(writeText).toHaveBeenCalledWith("hello");
});

test("returns false when Clipboard API is unavailable", async () => {
  Object.defineProperty(globalThis, "navigator", {
    configurable: true,
    value: {},
  });
  Object.defineProperty(globalThis, "document", {
    configurable: true,
    value: undefined,
  });

  await expect(writeTextToClipboard("hello")).resolves.toBe(false);
});

test("falls back to execCommand when Clipboard API is unavailable", async () => {
  const textarea = {
    remove: vi.fn(),
    select: vi.fn(),
    setAttribute: vi.fn(),
    style: {},
    value: "",
  };
  const appendChild = vi.fn();
  const execCommand = vi.fn().mockReturnValue(true);
  Object.defineProperty(globalThis, "navigator", {
    configurable: true,
    value: {},
  });
  Object.defineProperty(globalThis, "document", {
    configurable: true,
    value: {
      body: {
        appendChild,
      },
      createElement: vi.fn().mockReturnValue(textarea),
      execCommand,
    },
  });

  await expect(writeTextToClipboard("hello")).resolves.toBe(true);
  expect(textarea.value).toBe("hello");
  expect(appendChild).toHaveBeenCalledWith(textarea);
  expect(textarea.select).toHaveBeenCalled();
  expect(execCommand).toHaveBeenCalledWith("copy");
  expect(textarea.remove).toHaveBeenCalled();
});

test("returns false when execCommand fallback fails", async () => {
  const textarea = {
    remove: vi.fn(),
    select: vi.fn(),
    setAttribute: vi.fn(),
    style: {},
    value: "",
  };
  Object.defineProperty(globalThis, "navigator", {
    configurable: true,
    value: {},
  });
  Object.defineProperty(globalThis, "document", {
    configurable: true,
    value: {
      body: {
        appendChild: vi.fn(),
      },
      createElement: vi.fn().mockReturnValue(textarea),
      execCommand: vi.fn().mockReturnValue(false),
    },
  });

  await expect(writeTextToClipboard("hello")).resolves.toBe(false);
  expect(textarea.remove).toHaveBeenCalled();
});

test("returns false when navigator is unavailable", async () => {
  Object.defineProperty(globalThis, "navigator", {
    configurable: true,
    value: undefined,
  });
  Object.defineProperty(globalThis, "document", {
    configurable: true,
    value: undefined,
  });

  await expect(writeTextToClipboard("hello")).resolves.toBe(false);
});

test("returns false when Clipboard API rejects", async () => {
  const writeText = vi.fn().mockRejectedValue(new Error("denied"));
  Object.defineProperty(globalThis, "navigator", {
    configurable: true,
    value: {
      clipboard: {
        writeText,
      },
    },
  });

  await expect(writeTextToClipboard("hello")).resolves.toBe(false);
});
+107 -1
@@ -1,14 +1,26 @@
import type { Message } from "@langchain/langgraph-sdk";
import { expect, test } from "vitest";
import { describe, expect, test } from "vitest";
import {
extractContentFromMessage,
extractReasoningContentFromMessage,
getAssistantTurnCopyData,
getAssistantTurnUsageMessages,
getMessageGroups,
getStreamingMessageLookup,
hasContent,
hasReasoning,
isAssistantMessageGroupStreaming,
} from "@/core/messages/utils";
function aiMessage(content: string): Message {
return {
id: "ai-1",
type: "ai",
content,
} as Message;
}
test("aggregates token usage messages once per assistant turn", () => {
const messages = [
{
@@ -67,6 +79,100 @@ test("aggregates token usage messages once per assistant turn", () => {
).toEqual([null, null, ["ai-1", "ai-2"], null, ["ai-3"]]);
});
describe("inline <think> tag splitting", () => {
test("strips a fully closed <think> block from AI content", () => {
const message = aiMessage("<think>internal reasoning</think>final answer");
expect(extractContentFromMessage(message)).toBe("final answer");
expect(extractReasoningContentFromMessage(message)).toBe(
"internal reasoning",
);
});
test("strips multiple closed <think> blocks and joins their reasoning", () => {
const message = aiMessage(
"<think>step one</think>between<think>step two</think>after",
);
expect(extractContentFromMessage(message)).toBe("betweenafter");
expect(extractReasoningContentFromMessage(message)).toBe(
"step one\n\nstep two",
);
});
test("during streaming, an unclosed <think> tag does not leak its tail into content", () => {
// Simulates accumulated content mid-stream, before </think> arrives.
const message = aiMessage(
"<think>I need to analyze the user's question step by",
);
expect(extractContentFromMessage(message)).toBe("");
expect(extractContentFromMessage(message)).not.toContain("<think>");
expect(extractReasoningContentFromMessage(message)).toBe(
"I need to analyze the user's question step by",
);
});
test("preamble before an unclosed <think> stays in content", () => {
const message = aiMessage(
"Here is part of the answer.<think>but wait, let me reconsider",
);
expect(extractContentFromMessage(message)).toBe(
"Here is part of the answer.",
);
expect(extractReasoningContentFromMessage(message)).toBe(
"but wait, let me reconsider",
);
});
test("closed <think> followed by a trailing unclosed <think> merges both into reasoning", () => {
const message = aiMessage(
"<think>first step</think>partial answer<think>second step still streaming",
);
expect(extractContentFromMessage(message)).toBe("partial answer");
expect(extractReasoningContentFromMessage(message)).toBe(
"first step\n\nsecond step still streaming",
);
});
test("hasReasoning recognises an unclosed <think> tag mid-stream", () => {
expect(hasReasoning(aiMessage("<think>thinking in progress"))).toBe(true);
});
test("hasContent excludes an unclosed <think> tail when no preamble exists", () => {
expect(hasContent(aiMessage("<think>thinking in progress"))).toBe(false);
});
test("hasContent stays true when preamble precedes an unclosed <think>", () => {
expect(hasContent(aiMessage("preamble<think>still thinking"))).toBe(true);
});
test("a lone <think> open tag with no body yields no reasoning and no content", () => {
const message = aiMessage("<think>");
expect(extractContentFromMessage(message)).toBe("");
expect(extractReasoningContentFromMessage(message)).toBeNull();
expect(hasReasoning(message)).toBe(false);
});
test("a literal <think> inside markdown inline code is not treated as reasoning", () => {
const message = aiMessage(
"Use `<think>` markers to delimit reasoning sections.",
);
expect(extractContentFromMessage(message)).toBe(
"Use `<think>` markers to delimit reasoning sections.",
);
expect(extractReasoningContentFromMessage(message)).toBeNull();
expect(hasReasoning(message)).toBe(false);
});
test("a backtick-prefixed <think> mid-stream is not split into reasoning", () => {
// Simulates the moment the model has emitted the opening backtick and
// `<think>` for a literal documentation reference, before the closing
// backtick arrives. The pre-fix behaviour would have permanently
// truncated the content here.
const message = aiMessage("Documentation: `<think>");
expect(extractContentFromMessage(message)).toBe("Documentation: `<think>");
expect(extractReasoningContentFromMessage(message)).toBeNull();
});
});
test("hides internal todo reminder messages from message groups", () => {
const messages = [
{
+34 -1
@@ -71,7 +71,7 @@ if [ -z "$DEER_FLOW_CONFIG_PATH" ]; then
export DEER_FLOW_CONFIG_PATH="$REPO_ROOT/config.yaml"
fi
if [ ! -f "$DEER_FLOW_CONFIG_PATH" ]; then
if [ "$CMD" != "down" ] && [ ! -f "$DEER_FLOW_CONFIG_PATH" ]; then
# Try to seed from repo (config.example.yaml is the canonical template)
if [ -f "$REPO_ROOT/config.example.yaml" ]; then
cp "$REPO_ROOT/config.example.yaml" "$DEER_FLOW_CONFIG_PATH"
@@ -140,6 +140,38 @@ if [ -z "$BETTER_AUTH_SECRET" ]; then
fi
fi
# ── DEER_FLOW_INTERNAL_AUTH_TOKEN ────────────────────────────────────────────
# Shared by all Gateway workers so channel workers can call internal Gateway
# APIs even when the request is handled by a different Uvicorn worker.
_internal_auth_token_file="$DEER_FLOW_HOME/.internal-auth-token"
if [ "$CMD" != "down" ] && [ -z "$DEER_FLOW_INTERNAL_AUTH_TOKEN" ]; then
if [ -f "$_internal_auth_token_file" ]; then
export DEER_FLOW_INTERNAL_AUTH_TOKEN
DEER_FLOW_INTERNAL_AUTH_TOKEN="$(cat "$_internal_auth_token_file")"
echo -e "${GREEN}✓ DEER_FLOW_INTERNAL_AUTH_TOKEN loaded from $_internal_auth_token_file${NC}"
else
export DEER_FLOW_INTERNAL_AUTH_TOKEN
if command -v python3 > /dev/null 2>&1 && \
DEER_FLOW_INTERNAL_AUTH_TOKEN="$(python3 -c 'import sys; sys.version_info >= (3, 6) or sys.exit(1); import secrets; print(secrets.token_urlsafe(32))' 2>/dev/null)"; then
true
elif command -v python > /dev/null 2>&1 && \
DEER_FLOW_INTERNAL_AUTH_TOKEN="$(python -c 'import sys; sys.version_info >= (3, 6) or sys.exit(1); import secrets; print(secrets.token_urlsafe(32))' 2>/dev/null)"; then
true
elif command -v openssl > /dev/null 2>&1 && \
DEER_FLOW_INTERNAL_AUTH_TOKEN="$(openssl rand -hex 32)"; then
true
else
echo -e "${RED}✗ Cannot generate DEER_FLOW_INTERNAL_AUTH_TOKEN: python3, python, and openssl are all unavailable.${NC}" >&2
echo -e "${RED} Set DEER_FLOW_INTERNAL_AUTH_TOKEN manually before running make up.${NC}" >&2
exit 1
fi
echo "$DEER_FLOW_INTERNAL_AUTH_TOKEN" > "$_internal_auth_token_file"
chmod 600 "$_internal_auth_token_file"
echo -e "${GREEN}✓ DEER_FLOW_INTERNAL_AUTH_TOKEN generated → $_internal_auth_token_file${NC}"
fi
fi
# ── detect_sandbox_mode ───────────────────────────────────────────────────────
detect_sandbox_mode() {
@@ -186,6 +218,7 @@ if [ "$CMD" = "down" ]; then
export DEER_FLOW_DOCKER_SOCKET="${DEER_FLOW_DOCKER_SOCKET:-/var/run/docker.sock}"
export DEER_FLOW_REPO_ROOT="${DEER_FLOW_REPO_ROOT:-$REPO_ROOT}"
export BETTER_AUTH_SECRET="${BETTER_AUTH_SECRET:-placeholder}"
export DEER_FLOW_INTERNAL_AUTH_TOKEN="${DEER_FLOW_INTERNAL_AUTH_TOKEN:-placeholder}"
"${COMPOSE_CMD[@]}" down
exit 0
fi
+23
@@ -0,0 +1,23 @@
#!/usr/bin/env python3
"""CLI wrapper for the static blocking IO detector."""

from __future__ import annotations

import sys
from collections.abc import Sequence
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[1]
TEST_SUPPORT_PATH = REPO_ROOT / "backend" / "tests"

if str(TEST_SUPPORT_PATH) not in sys.path:
    sys.path.insert(0, str(TEST_SUPPORT_PATH))


def main(argv: Sequence[str] | None = None) -> int:
    from support.detectors.blocking_io_static import main as detector_main

    return detector_main(argv)


if __name__ == "__main__":
    sys.exit(main())