From da41701f87eb1767ccdf5098789fc1accaa7e974 Mon Sep 17 00:00:00 2001 From: AochenShen99 Date: Tue, 26 May 2026 23:30:24 +0800 Subject: [PATCH] Add static blocking IO inventory (#3208) * feat(detectors): add static blocking IO inventory * refactor(detectors): drop superseded runtime probe; clarify static report path - Remove the #2924 custom runtime blocking IO probe entirely: backend/tests/support/detectors/blocking_io.py, backend/tests/test_blocking_io_detector.py, backend/tests/test_blocking_io_probe_integration.py, and the pytest_addoption / pytest_runtest_call / pytest_runtest_teardown / pytest_sessionfinish / pytest_terminal_summary hooks plus the blocking_io_detector fixture from backend/tests/conftest.py. Its narrow DEFAULT_BLOCKING_CALL_SPECS (time.sleep, requests, httpx, os.walk, Path.resolve, Path.read_text, Path.write_text) cannot serve as a CI gate; a Blockbuster-backed runtime detector will land in a separate follow-up PR. Leaving the half-coverage probe alongside the static inventory in this PR added a redundant detect path with no production value. - Address Copilot review comments on backend/README.md and backend/CLAUDE.md by stating explicitly that the JSON report writes to .deer-flow/blocking-io-findings.json at the repository root, whether the target is invoked from the repo root or from backend/. Verified: pytest tests/test_detect_blocking_io_static.py (18 passed), ruff check + format on touched files (passed), make detect-blocking-io from both repo root and backend/ produce the same 105-finding report at /.deer-flow/blocking-io-findings.json. Co-Authored-By: Claude Opus 4.7 --------- Co-authored-by: Claude Opus 4.7 Co-authored-by: Willem Jiang --- Makefile | 6 +- README.md | 6 + backend/CLAUDE.md | 15 + backend/Makefile | 3 + backend/README.md | 13 + backend/tests/conftest.py | 91 -- .../tests/support/detectors/blocking_io.py | 287 ------ .../support/detectors/blocking_io_static.py | 892 ++++++++++++++++++ backend/tests/test_blocking_io_detector.py | 190 ---- .../test_blocking_io_probe_integration.py | 22 - .../tests/test_detect_blocking_io_static.py | 421 +++++++++ scripts/detect_blocking_io_static.py | 23 + 12 files changed, 1378 insertions(+), 591 deletions(-) delete mode 100644 backend/tests/support/detectors/blocking_io.py create mode 100644 backend/tests/support/detectors/blocking_io_static.py delete mode 100644 backend/tests/test_blocking_io_detector.py delete mode 100644 backend/tests/test_blocking_io_probe_integration.py create mode 100644 backend/tests/test_detect_blocking_io_static.py create mode 100644 scripts/detect_blocking_io_static.py diff --git a/Makefile b/Makefile index fb83cd556..81c929634 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # DeerFlow - Unified Development Environment -.PHONY: help config config-upgrade check install setup doctor detect-thread-boundaries dev dev-daemon start start-daemon stop up down clean docker-init docker-start docker-stop docker-logs docker-logs-frontend docker-logs-gateway +.PHONY: help config config-upgrade check install setup doctor detect-thread-boundaries detect-blocking-io dev dev-daemon start start-daemon stop up down clean docker-init docker-start docker-stop docker-logs docker-logs-frontend docker-logs-gateway BASH ?= bash BACKEND_UV_RUN = cd backend && uv run @@ -24,6 +24,7 @@ help: @echo " make config-upgrade - Merge new fields from config.example.yaml into config.yaml" @echo " make check - Check if all required tools are installed" @echo " make detect-thread-boundaries - Inventory async/thread boundary points" + @echo " make detect-blocking-io - Inventory blocking IO that may block the backend event loop" @echo " make install - Install all dependencies (frontend + backend + pre-commit hooks)" @echo " make setup-sandbox - Pre-pull sandbox container image (recommended)" @echo " make dev - Start all services in development mode (with hot-reloading)" @@ -55,6 +56,9 @@ doctor: detect-thread-boundaries: @$(PYTHON) ./scripts/detect_thread_boundaries.py +detect-blocking-io: + @$(MAKE) -C backend detect-blocking-io + config: @$(PYTHON) ./scripts/configure.py diff --git a/README.md b/README.md index 83b43fd93..a093b6f10 100644 --- a/README.md +++ b/README.md @@ -740,6 +740,12 @@ DeerFlow has key high-privilege capabilities including **system command executio We welcome contributions! Please see [CONTRIBUTING.md](CONTRIBUTING.md) for development setup, workflow, and guidelines. Regression coverage includes Docker sandbox mode detection and provisioner kubeconfig-path handling tests in `backend/tests/`. +Backend blocking-IO diagnostics are available from the repository root with +`make detect-blocking-io`: it statically scans backend business code for +blocking IO that may run on the backend event loop, prints a concise summary, +and writes complete JSON findings to `.deer-flow/blocking-io-findings.json`. +The JSON includes compact review records with `priority`, `location`, +`blocking_call`, `event_loop_exposure`, `reason`, and `code`. Gateway artifact serving now forces active web content types (`text/html`, `application/xhtml+xml`, `image/svg+xml`) to download as attachments instead of inline rendering, reducing XSS risk for generated artifacts. ## License diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 1d1cbadf8..38e8e1d26 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -97,6 +97,21 @@ make lint # Lint with ruff make format # Format code with ruff ``` +The `detect-blocking-io` target parses `app/`, `packages/harness/deerflow/`, +and `scripts/` with AST. By default it reports only blocking IO candidates that +are inside async code, reachable from async code in the same file, or reachable +from sync-only `AgentMiddleware` before/after hooks that LangGraph can execute +on the async graph path. It prints a concise summary and writes complete JSON +findings to `.deer-flow/blocking-io-findings.json` at the repository root +(both `make detect-blocking-io` from the repo root and `cd backend && make +detect-blocking-io` resolve to the same repo-root path). JSON findings include +`priority`, `location`, `blocking_call`, `event_loop_exposure`, `reason`, and +`code` for model-assisted or manual review. `priority` is a deterministic +review ordering from operation type, not proof of a bug. Bare-name same-file +calls are resolved by function name, so duplicate helper names in one file can +conservatively over-report async reachability. It is intentionally +informational and is not run from CI in this round. + Regression tests related to Docker/provisioner behavior: - `tests/test_docker_sandbox_mode_detection.py` (mode detection from `config.yaml`) - `tests/test_provisioner_kubeconfig.py` (kubeconfig file/directory handling) diff --git a/backend/Makefile b/backend/Makefile index fc8cab1e0..a8ecc602c 100644 --- a/backend/Makefile +++ b/backend/Makefile @@ -19,3 +19,6 @@ lint: format: uvx ruff check . --fix && uvx ruff format . + +detect-blocking-io: + @PYTHONPATH=. PYTHONIOENCODING=utf-8 PYTHONUTF8=1 uv run python ../scripts/detect_blocking_io_static.py --output ../.deer-flow/blocking-io-findings.json diff --git a/backend/README.md b/backend/README.md index 0ee0d454b..20ef72d50 100644 --- a/backend/README.md +++ b/backend/README.md @@ -362,6 +362,7 @@ make dev # Run Gateway API + embedded agent runtime (port 8001) make gateway # Run Gateway API without reload (port 8001) make lint # Run linter (ruff) make format # Format code (ruff) +make detect-blocking-io # Inventory blocking IO that may block the backend event loop ``` ### Code Style @@ -378,6 +379,18 @@ make format # Format code (ruff) uv run pytest ``` +`make detect-blocking-io` statically scans backend business code for blocking +IO that may run on the backend event loop and is not test-coverage-bound. It +prints a concise summary for human review and writes complete JSON findings to +`.deer-flow/blocking-io-findings.json` at the repository root (regardless of +whether the target is invoked from the repo root or from `backend/`). JSON +findings include both broad IO category and review-oriented fields such as +`priority`, `location`, `blocking_call`, `event_loop_exposure`, `reason`, and +`code`. `priority` is a deterministic review ordering from the operation type, +not proof of a bug. Bare-name same-file calls are resolved by function name, +so duplicate helper names in one file can conservatively over-report async +reachability. + --- ## Technology Stack diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 5293652f6..03dee4b0c 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -13,16 +13,11 @@ from types import SimpleNamespace from unittest.mock import MagicMock import pytest -from support.detectors.blocking_io import BlockingIOProbe, detect_blocking_io # Make 'app' and 'deerflow' importable from any working directory sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "scripts")) -_BACKEND_ROOT = Path(__file__).resolve().parents[1] -_blocking_io_probe = BlockingIOProbe(_BACKEND_ROOT) -_BLOCKING_IO_DETECTOR_ATTR = "_blocking_io_detector" - # Break the circular import chain that exists in production code: # deerflow.subagents.__init__ # -> .executor (SubagentExecutor, SubagentResult) @@ -63,92 +58,6 @@ def provisioner_module(): return module -@pytest.fixture() -def blocking_io_detector(): - """Fail a focused test if blocking calls run on the event loop thread.""" - with detect_blocking_io(fail_on_exit=True) as detector: - yield detector - - -def pytest_addoption(parser: pytest.Parser) -> None: - group = parser.getgroup("blocking-io") - group.addoption( - "--detect-blocking-io", - action="store_true", - default=False, - help="Collect blocking calls made while an asyncio event loop is running and report a summary.", - ) - group.addoption( - "--detect-blocking-io-fail", - action="store_true", - default=False, - help="Set a failing exit status when --detect-blocking-io records violations.", - ) - - -def pytest_configure(config: pytest.Config) -> None: - config.addinivalue_line("markers", "no_blocking_io_probe: skip the optional blocking IO probe") - - -def pytest_sessionstart(session: pytest.Session) -> None: - if _blocking_io_probe_enabled(session.config): - _blocking_io_probe.clear() - - -@pytest.hookimpl(hookwrapper=True) -def pytest_runtest_call(item: pytest.Item): - if not _blocking_io_probe_enabled(item.config) or _blocking_io_probe_skipped(item): - yield - return - - detector = detect_blocking_io(fail_on_exit=False, stack_limit=18) - detector.__enter__() - setattr(item, _BLOCKING_IO_DETECTOR_ATTR, detector) - yield - - -@pytest.hookimpl(hookwrapper=True) -def pytest_runtest_teardown(item: pytest.Item): - yield - - detector = getattr(item, _BLOCKING_IO_DETECTOR_ATTR, None) - if detector is None: - return - - try: - detector.__exit__(None, None, None) - _blocking_io_probe.record(item.nodeid, detector.violations) - finally: - delattr(item, _BLOCKING_IO_DETECTOR_ATTR) - - -def pytest_sessionfinish(session: pytest.Session) -> None: - if _blocking_io_fail_enabled(session.config) and _blocking_io_probe.violation_count and session.exitstatus == pytest.ExitCode.OK: - session.exitstatus = pytest.ExitCode.TESTS_FAILED - - -def pytest_terminal_summary(terminalreporter: pytest.TerminalReporter) -> None: - if not _blocking_io_probe_enabled(terminalreporter.config): - return - - header, *details = _blocking_io_probe.format_summary().splitlines() - terminalreporter.write_sep("=", header) - for line in details: - terminalreporter.write_line(line) - - -def _blocking_io_probe_enabled(config: pytest.Config) -> bool: - return bool(config.getoption("--detect-blocking-io") or config.getoption("--detect-blocking-io-fail")) - - -def _blocking_io_fail_enabled(config: pytest.Config) -> bool: - return bool(config.getoption("--detect-blocking-io-fail")) - - -def _blocking_io_probe_skipped(item: pytest.Item) -> bool: - return item.path.name == "test_blocking_io_detector.py" or item.get_closest_marker("no_blocking_io_probe") is not None - - # --------------------------------------------------------------------------- # Auto-set user context for every test unless marked no_auto_user # --------------------------------------------------------------------------- diff --git a/backend/tests/support/detectors/blocking_io.py b/backend/tests/support/detectors/blocking_io.py deleted file mode 100644 index c1adfd55a..000000000 --- a/backend/tests/support/detectors/blocking_io.py +++ /dev/null @@ -1,287 +0,0 @@ -"""Test helper for detecting blocking calls on an asyncio event loop. - -The detector is intentionally test-only. It monkeypatches a small set of -well-known blocking entry points and their already-loaded module-level aliases, -then records calls only when they happen on a thread that is currently running -an asyncio event loop. Aliases captured in closures or default arguments remain -out of scope. -""" - -from __future__ import annotations - -import asyncio -import importlib -import sys -import traceback -from collections import Counter -from collections.abc import Callable, Iterable, Iterator -from contextlib import AbstractContextManager -from dataclasses import dataclass -from functools import wraps -from pathlib import Path -from types import TracebackType -from typing import Any - -BlockingCallable = Callable[..., Any] - - -@dataclass(frozen=True) -class BlockingCallSpec: - """Describes one blocking callable to wrap during a detector run.""" - - name: str - target: str - record_on_iteration: bool = False - - -@dataclass(frozen=True) -class BlockingCall: - """One blocking call observed on an asyncio event loop thread.""" - - name: str - target: str - stack: tuple[traceback.FrameSummary, ...] - - -DEFAULT_BLOCKING_CALL_SPECS: tuple[BlockingCallSpec, ...] = ( - BlockingCallSpec("time.sleep", "time:sleep"), - BlockingCallSpec("requests.Session.request", "requests.sessions:Session.request"), - BlockingCallSpec("httpx.Client.request", "httpx:Client.request"), - BlockingCallSpec("os.walk", "os:walk", record_on_iteration=True), - BlockingCallSpec("pathlib.Path.resolve", "pathlib:Path.resolve"), - BlockingCallSpec("pathlib.Path.read_text", "pathlib:Path.read_text"), - BlockingCallSpec("pathlib.Path.write_text", "pathlib:Path.write_text"), -) - - -def _is_event_loop_thread() -> bool: - try: - loop = asyncio.get_running_loop() - except RuntimeError: - return False - return loop.is_running() - - -def _resolve_target(target: str) -> tuple[object, str, BlockingCallable]: - module_name, attr_path = target.split(":", maxsplit=1) - owner: object = importlib.import_module(module_name) - parts = attr_path.split(".") - for part in parts[:-1]: - owner = getattr(owner, part) - - attr_name = parts[-1] - original = getattr(owner, attr_name) - return owner, attr_name, original - - -def _trim_detector_frames(stack: Iterable[traceback.FrameSummary]) -> tuple[traceback.FrameSummary, ...]: - return tuple(frame for frame in stack if frame.filename != __file__) - - -class BlockingIODetector(AbstractContextManager["BlockingIODetector"]): - """Record blocking calls made from async runtime code. - - By default the detector reports violations but does not fail on context - exit. Tests can set ``fail_on_exit=True`` or call - ``assert_no_blocking_calls()`` explicitly. - """ - - def __init__( - self, - specs: Iterable[BlockingCallSpec] = DEFAULT_BLOCKING_CALL_SPECS, - *, - fail_on_exit: bool = False, - patch_loaded_aliases: bool = True, - stack_limit: int = 12, - ) -> None: - self._specs = tuple(specs) - self._fail_on_exit = fail_on_exit - self._patch_loaded_aliases_enabled = patch_loaded_aliases - self._stack_limit = stack_limit - self._patches: list[tuple[object, str, BlockingCallable]] = [] - self._patch_keys: set[tuple[int, str]] = set() - self.violations: list[BlockingCall] = [] - self._active = False - - def __enter__(self) -> BlockingIODetector: - try: - self._active = True - alias_replacements: dict[int, BlockingCallable] = {} - for spec in self._specs: - owner, attr_name, original = _resolve_target(spec.target) - wrapper = self._wrap(spec, original) - self._patch_attribute(owner, attr_name, original, wrapper) - alias_replacements[id(original)] = wrapper - - if self._patch_loaded_aliases_enabled: - self._patch_loaded_module_aliases(alias_replacements) - except Exception: - self._restore() - self._active = False - raise - return self - - def __exit__( - self, - exc_type: type[BaseException] | None, - exc_value: BaseException | None, - traceback_value: TracebackType | None, - ) -> bool | None: - self._restore() - self._active = False - if exc_type is None and self._fail_on_exit: - self.assert_no_blocking_calls() - return None - - def _restore(self) -> None: - for owner, attr_name, original in reversed(self._patches): - setattr(owner, attr_name, original) - self._patches.clear() - self._patch_keys.clear() - - def _patch_attribute(self, owner: object, attr_name: str, original: BlockingCallable, replacement: BlockingCallable) -> None: - key = (id(owner), attr_name) - if key in self._patch_keys: - return - setattr(owner, attr_name, replacement) - self._patches.append((owner, attr_name, original)) - self._patch_keys.add(key) - - def _patch_loaded_module_aliases(self, replacements_by_id: dict[int, BlockingCallable]) -> None: - for module in tuple(sys.modules.values()): - namespace = getattr(module, "__dict__", None) - if not isinstance(namespace, dict): - continue - - for attr_name, value in tuple(namespace.items()): - replacement = replacements_by_id.get(id(value)) - if replacement is not None: - self._patch_attribute(module, attr_name, value, replacement) - - def _wrap(self, spec: BlockingCallSpec, original: BlockingCallable) -> BlockingCallable: - @wraps(original) - def wrapper(*args: Any, **kwargs: Any) -> Any: - if spec.record_on_iteration: - result = original(*args, **kwargs) - return self._wrap_iteration(spec, result) - self._record_if_blocking(spec) - return original(*args, **kwargs) - - return wrapper - - def _wrap_iteration(self, spec: BlockingCallSpec, iterable: Iterable[Any]) -> Iterator[Any]: - iterator = iter(iterable) - reported = False - - while True: - if not reported: - reported = self._record_if_blocking(spec) - try: - yield next(iterator) - except StopIteration: - return - - def _record_if_blocking(self, spec: BlockingCallSpec) -> bool: - if self._active and _is_event_loop_thread(): - stack = _trim_detector_frames(traceback.extract_stack(limit=self._stack_limit)) - self.violations.append(BlockingCall(spec.name, spec.target, stack)) - return True - return False - - def assert_no_blocking_calls(self) -> None: - if self.violations: - raise AssertionError(format_blocking_calls(self.violations)) - - -class BlockingIOProbe: - """Collect detector output across tests and format a compact summary.""" - - def __init__(self, project_root: Path) -> None: - self._project_root = project_root.resolve() - self._observed: list[tuple[str, BlockingCall]] = [] - - @property - def violation_count(self) -> int: - return len(self._observed) - - @property - def test_count(self) -> int: - return len({nodeid for nodeid, _violation in self._observed}) - - def clear(self) -> None: - self._observed.clear() - - def record(self, nodeid: str, violations: Iterable[BlockingCall]) -> None: - for violation in violations: - self._observed.append((nodeid, violation)) - - def format_summary(self, *, limit: int = 30) -> str: - if not self._observed: - return "blocking io probe: no violations" - - call_sites: Counter[tuple[str, str, int, str, str]] = Counter() - for _nodeid, violation in self._observed: - frame = self._local_call_site(violation.stack) - if frame is None: - call_sites[(violation.name, "", 0, "", "")] += 1 - continue - - call_sites[ - ( - violation.name, - self._relative(frame.filename), - frame.lineno, - frame.name, - (frame.line or "").strip(), - ) - ] += 1 - - lines = [f"blocking io probe: {self.violation_count} violations across {self.test_count} tests", "Top call sites:"] - for (name, filename, lineno, function, line), count in call_sites.most_common(limit): - lines.append(f"{count:4d} {name} {filename}:{lineno} {function} | {line}") - return "\n".join(lines) - - def _relative(self, filename: str) -> str: - try: - return str(Path(filename).resolve().relative_to(self._project_root)) - except ValueError: - return filename - - def _local_call_site(self, stack: tuple[traceback.FrameSummary, ...]) -> traceback.FrameSummary | None: - local_frames = [frame for frame in stack if str(self._project_root) in frame.filename and "/.venv/" not in frame.filename and not self._relative(frame.filename).startswith("tests/")] - if local_frames: - return local_frames[-1] - - test_frames = [frame for frame in stack if str(self._project_root) in frame.filename and "/.venv/" not in frame.filename] - return test_frames[-1] if test_frames else None - - -def detect_blocking_io( - specs: Iterable[BlockingCallSpec] = DEFAULT_BLOCKING_CALL_SPECS, - *, - fail_on_exit: bool = False, - patch_loaded_aliases: bool = True, - stack_limit: int = 12, -) -> BlockingIODetector: - """Create a detector context manager for a focused test scope.""" - - return BlockingIODetector(specs, fail_on_exit=fail_on_exit, patch_loaded_aliases=patch_loaded_aliases, stack_limit=stack_limit) - - -def format_blocking_calls(violations: Iterable[BlockingCall]) -> str: - """Format detector output with enough stack context to locate call sites.""" - - lines = ["Blocking calls were executed on an asyncio event loop thread:"] - for index, violation in enumerate(violations, start=1): - lines.append(f"{index}. {violation.name} ({violation.target})") - lines.extend(_format_stack(violation.stack)) - return "\n".join(lines) - - -def _format_stack(stack: Iterable[traceback.FrameSummary]) -> Iterator[str]: - for frame in stack: - location = f"{frame.filename}:{frame.lineno}" - lines = [f" at {frame.name} ({location})"] - if frame.line: - lines.append(f" {frame.line.strip()}") - yield from lines diff --git a/backend/tests/support/detectors/blocking_io_static.py b/backend/tests/support/detectors/blocking_io_static.py new file mode 100644 index 000000000..aa9482d08 --- /dev/null +++ b/backend/tests/support/detectors/blocking_io_static.py @@ -0,0 +1,892 @@ +#!/usr/bin/env python3 +"""Static inventory for likely backend event-loop blocking IO. + +This detector parses backend business source with AST so untested paths are +still visible during review. Findings are prioritized static candidates, not +automatic bug decisions. +""" + +from __future__ import annotations + +import argparse +import ast +import json +import os +import sys +from collections import Counter, defaultdict, deque +from collections.abc import Callable, Iterable, Sequence +from dataclasses import dataclass +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[4] +DEFAULT_SCAN_PATHS = ( + REPO_ROOT / "backend" / "app", + REPO_ROOT / "backend" / "packages" / "harness" / "deerflow", + REPO_ROOT / "backend" / "scripts", +) +IGNORED_DIR_NAMES = { + ".git", + ".mypy_cache", + ".pytest_cache", + ".ruff_cache", + ".venv", + "__pycache__", + "node_modules", +} +CODE_SNIPPET_LIMIT = 200 + +PATH_METHOD_NAMES = { + "exists", + "glob", + "hardlink_to", + "is_dir", + "is_file", + "iterdir", + "mkdir", + "open", + "readlink", + "read_bytes", + "read_text", + "rename", + "resolve", + "rglob", + "rmdir", + "samefile", + "stat", + "symlink_to", + "touch", + "unlink", + "write_bytes", + "write_text", +} +AMBIGUOUS_PATH_METHOD_NAMES = {"replace"} +HTTP_METHOD_NAMES = { + "delete", + "get", + "head", + "options", + "patch", + "post", + "put", + "request", + "stream", +} +BUILTIN_OPEN_NAMES = {"builtins.open", "io.open", "open"} +BLOCKING_SLEEP_NAMES = {"time.sleep"} +BLOCKING_OS_FILE_NAMES = { + "os.listdir", + "os.lstat", + "os.makedirs", + "os.mkdir", + "os.remove", + "os.rename", + "os.replace", + "os.rmdir", + "os.scandir", + "os.stat", + "os.unlink", + "os.walk", + "os.path.exists", + "os.path.getsize", + "os.path.isdir", + "os.path.isfile", +} +BLOCKING_SUBPROCESS_NAMES = { + "subprocess.Popen", + "subprocess.check_call", + "subprocess.check_output", + "subprocess.run", +} +BLOCKING_HTTP_NAMES = { + "requests.delete", + "requests.get", + "requests.head", + "requests.options", + "requests.patch", + "requests.post", + "requests.put", + "requests.request", + "requests.sessions.Session.request", + "httpx.delete", + "httpx.get", + "httpx.head", + "httpx.options", + "httpx.patch", + "httpx.post", + "httpx.put", + "httpx.request", + "httpx.stream", + "urllib.request.urlopen", +} +SYNC_HTTP_CLIENT_FACTORIES = { + "httpx.Client": "httpx.Client", + "requests.Session": "requests.Session", + "requests.sessions.Session": "requests.Session", + "requests.session": "requests.Session", +} +BLOCKING_SHUTIL_NAMES = { + "shutil.copy", + "shutil.copyfile", + "shutil.copytree", + "shutil.move", + "shutil.rmtree", +} +SYNC_AGENT_MIDDLEWARE_HOOKS = { + "before_agent": "abefore_agent", + "before_model": "abefore_model", + "after_model": "aafter_model", + "after_agent": "aafter_agent", +} +PATH_METHOD_OPERATIONS = { + "exists": "FILE_METADATA", + "glob": "FILE_ENUMERATION", + "hardlink_to": "FILE_WRITE", + "is_dir": "FILE_METADATA", + "is_file": "FILE_METADATA", + "iterdir": "FILE_ENUMERATION", + "mkdir": "FILE_WRITE", + "open": "FILE_OPEN", + "readlink": "FILE_METADATA", + "read_bytes": "FILE_READ", + "read_text": "FILE_READ", + "rename": "FILE_COPY_MOVE", + "replace": "FILE_COPY_MOVE", + "resolve": "FILE_METADATA", + "rglob": "FILE_ENUMERATION", + "rmdir": "FILE_DELETE", + "samefile": "FILE_METADATA", + "stat": "FILE_METADATA", + "symlink_to": "FILE_WRITE", + "touch": "FILE_WRITE", + "unlink": "FILE_DELETE", + "write_bytes": "FILE_WRITE", + "write_text": "FILE_WRITE", +} +OS_FILE_OPERATIONS = { + "os.listdir": "FILE_ENUMERATION", + "os.lstat": "FILE_METADATA", + "os.makedirs": "FILE_WRITE", + "os.mkdir": "FILE_WRITE", + "os.remove": "FILE_DELETE", + "os.rename": "FILE_COPY_MOVE", + "os.replace": "FILE_COPY_MOVE", + "os.rmdir": "FILE_DELETE", + "os.scandir": "FILE_ENUMERATION", + "os.stat": "FILE_METADATA", + "os.unlink": "FILE_DELETE", + "os.walk": "FILE_ENUMERATION", + "os.path.exists": "FILE_METADATA", + "os.path.getsize": "FILE_METADATA", + "os.path.isdir": "FILE_METADATA", + "os.path.isfile": "FILE_METADATA", +} +SHUTIL_OPERATIONS = { + "shutil.copy": "FILE_COPY_MOVE", + "shutil.copyfile": "FILE_COPY_MOVE", + "shutil.copytree": "FILE_TREE_COPY", + "shutil.move": "FILE_COPY_MOVE", + "shutil.rmtree": "FILE_TREE_DELETE", +} +OPERATION_BASE_PRIORITY = { + "FILE_METADATA": "LOW", + "FILE_OPEN": "MEDIUM", + "FILE_READ": "MEDIUM", + "FILE_WRITE": "MEDIUM", + "FILE_ENUMERATION": "HIGH", + "FILE_DELETE": "MEDIUM", + "FILE_COPY_MOVE": "HIGH", + "FILE_TREE_COPY": "HIGH", + "FILE_TREE_DELETE": "HIGH", + "HTTP_REQUEST": "HIGH", + "SUBPROCESS": "HIGH", + "SLEEP": "HIGH", + "PARSE_ERROR": "MEDIUM", +} + + +@dataclass(frozen=True) +class BlockingIOStaticFinding: + category: str + operation: str + priority: str + path: str + line: int + column: int + function: str + exposure: str + symbol: str + code: str + + def to_dict(self) -> dict[str, object]: + return { + "priority": self.priority, + "location": { + "path": self.path, + "line": self.line, + "column": self.column + 1, + "function": self.function, + }, + "blocking_call": { + "category": self.category, + "operation": self.operation, + "symbol": self.symbol, + }, + "event_loop_exposure": self.exposure, + "reason": _finding_reason(self.operation, self.exposure), + "code": self.code, + } + + +@dataclass(frozen=True) +class _FunctionContext: + qualname: str + class_name: str | None + is_async: bool + + +@dataclass(frozen=True) +class _FunctionInfo: + is_async: bool + + +@dataclass(frozen=True) +class _CallRef: + name: str + class_name: str | None + self_method: bool + + +@dataclass(frozen=True) +class _PotentialFinding: + category: str + operation: str + path: str + line: int + column: int + function: str + symbol: str + code: str + + +@dataclass(frozen=True) +class _BlockingRule: + category: str + operation: str + symbol: str + + +def dotted_name(node: ast.AST | None) -> str | None: + if isinstance(node, ast.Name): + return node.id + if isinstance(node, ast.Attribute): + parent = dotted_name(node.value) + if parent: + return f"{parent}.{node.attr}" + return node.attr + if isinstance(node, ast.Call): + return dotted_name(node.func) + if isinstance(node, ast.Subscript): + return dotted_name(node.value) + return None + + +def relative_to_repo(path: Path, repo_root: Path = REPO_ROOT) -> str: + try: + return path.resolve().relative_to(repo_root.resolve()).as_posix() + except ValueError: + return path.as_posix() + + +def _source_snippet(source_lines: Sequence[str], line: int) -> str: + if not 0 < line <= len(source_lines): + return "" + snippet = source_lines[line - 1].strip() + if len(snippet) <= CODE_SNIPPET_LIMIT: + return snippet + return f"{snippet[:CODE_SNIPPET_LIMIT]}..." + + +class BlockingIOStaticVisitor(ast.NodeVisitor): + def __init__(self, relative_path: str, source_lines: Sequence[str]) -> None: + self.relative_path = relative_path + self.source_lines = source_lines + self.import_aliases: dict[str, str] = {} + self.class_stack: list[str] = [] + self.function_stack: list[_FunctionContext] = [] + self.module_context = _FunctionContext("", None, False) + self.module_sync_http_clients: dict[str, str] = {} + self.sync_http_client_stack: list[dict[str, str]] = [] + self.class_bases: dict[str, set[str]] = defaultdict(set) + self.class_methods: dict[str, set[str]] = defaultdict(set) + self.function_defs: dict[str, _FunctionInfo] = {} + self.functions_by_name: dict[str, list[str]] = defaultdict(list) + self.call_refs: dict[str, list[_CallRef]] = defaultdict(list) + self.path_like_name_stack: list[set[str]] = [] + self.potential_findings: list[_PotentialFinding] = [] + + @property + def current_function(self) -> _FunctionContext | None: + return self.function_stack[-1] if self.function_stack else None + + @property + def current_context(self) -> _FunctionContext: + return self.current_function or self.module_context + + @property + def current_sync_http_clients(self) -> dict[str, str]: + return self.sync_http_client_stack[-1] if self.sync_http_client_stack else self.module_sync_http_clients + + def visit_Import(self, node: ast.Import) -> None: + for alias in node.names: + local_name = alias.asname or alias.name.split(".", 1)[0] + canonical_name = alias.name if alias.asname else local_name + self.import_aliases[local_name] = canonical_name + + def visit_ImportFrom(self, node: ast.ImportFrom) -> None: + if node.module is None: + return + for alias in node.names: + local_name = alias.asname or alias.name + self.import_aliases[local_name] = f"{node.module}.{alias.name}" + + def visit_ClassDef(self, node: ast.ClassDef) -> None: + class_name = ".".join((*self.class_stack, node.name)) if self.class_stack else node.name + self.class_bases[class_name].update(canonical_name for base in node.bases if (canonical_name := self._canonical_name(dotted_name(base))) is not None) + self.class_stack.append(node.name) + self.generic_visit(node) + self.class_stack.pop() + + def visit_FunctionDef(self, node: ast.FunctionDef) -> None: + self._visit_function(node, is_async=False) + + def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: + self._visit_function(node, is_async=True) + + def visit_Assign(self, node: ast.Assign) -> None: + self._record_sync_http_client_targets(node.value, node.targets) + self.generic_visit(node) + + def visit_AnnAssign(self, node: ast.AnnAssign) -> None: + self._record_path_like_annotation(node.annotation, [node.target]) + if node.value is not None: + self._record_sync_http_client_targets(node.value, [node.target]) + self.generic_visit(node) + + def visit_With(self, node: ast.With) -> None: + temporary_clients: dict[str, str | None] = {} + current_clients = self.current_sync_http_clients + for item in node.items: + self.visit(item.context_expr) + client_base = self._sync_http_client_factory_base(item.context_expr) + if client_base is None or not isinstance(item.optional_vars, ast.Name): + continue + name = item.optional_vars.id + temporary_clients[name] = current_clients.get(name) + current_clients[name] = client_base + + try: + for statement in node.body: + self.visit(statement) + finally: + for name, previous in temporary_clients.items(): + if previous is None: + current_clients.pop(name, None) + else: + current_clients[name] = previous + + def visit_Call(self, node: ast.Call) -> None: + current = self.current_context + call_name = self._canonical_name(dotted_name(node.func)) + if call_name is not None: + self._record_call_ref(node, call_name, current) + self._record_blocking_candidate(node, call_name, current) + self.generic_visit(node) + + def _visit_function(self, node: ast.FunctionDef | ast.AsyncFunctionDef, *, is_async: bool) -> None: + qualname = ".".join((*self.class_stack, node.name)) if self.class_stack else node.name + class_name = self.class_stack[-1] if self.class_stack else None + context = _FunctionContext(qualname, class_name, is_async) + self.function_defs[qualname] = _FunctionInfo(is_async) + self.functions_by_name[node.name].append(qualname) + if class_name is not None: + self.class_methods[class_name].add(node.name) + self.function_stack.append(context) + self.sync_http_client_stack.append({}) + self.path_like_name_stack.append(set(_path_like_argument_names(node.args, self._canonical_name))) + self.generic_visit(node) + self.path_like_name_stack.pop() + self.sync_http_client_stack.pop() + self.function_stack.pop() + + def _canonical_name(self, name: str | None) -> str | None: + if name is None: + return None + parts = name.split(".") + if parts and parts[0] in self.import_aliases: + return ".".join((self.import_aliases[parts[0]], *parts[1:])) + return name + + def _record_call_ref(self, node: ast.Call, call_name: str, current: _FunctionContext) -> None: + if current.qualname == "": + return + if isinstance(node.func, ast.Name): + self.call_refs[current.qualname].append(_CallRef(node.func.id, current.class_name, self_method=False)) + return + if not isinstance(node.func, ast.Attribute): + return + receiver = dotted_name(node.func.value) + if receiver in {"self", "cls"}: + self.call_refs[current.qualname].append(_CallRef(node.func.attr, current.class_name, self_method=True)) + return + # Keep same-module direct calls through canonical aliases out of the call graph. + # External calls are handled as blocking candidates instead. + if "." not in call_name: + self.call_refs[current.qualname].append(_CallRef(call_name, current.class_name, self_method=False)) + + def _record_blocking_candidate(self, node: ast.Call, call_name: str, current: _FunctionContext) -> None: + rule = self._blocking_rule(node, call_name) + if rule is None: + return + line = getattr(node, "lineno", 0) + column = getattr(node, "col_offset", 0) + code = _source_snippet(self.source_lines, line) + self.potential_findings.append( + _PotentialFinding( + category=rule.category, + operation=rule.operation, + path=self.relative_path, + line=line, + column=column, + function=current.qualname, + symbol=rule.symbol, + code=code, + ) + ) + + def _blocking_rule(self, node: ast.Call, call_name: str) -> _BlockingRule | None: + sync_client_symbol = self._sync_http_client_method_symbol(call_name) + if sync_client_symbol is not None: + return _BlockingRule("BLOCKING_HTTP_IO", "HTTP_REQUEST", sync_client_symbol) + chained_client_symbol = _sync_http_client_chained_method_symbol(call_name) + if chained_client_symbol is not None: + return _BlockingRule("BLOCKING_HTTP_IO", "HTTP_REQUEST", chained_client_symbol) + leaf_name = call_name.rsplit(".", 1)[-1] + if call_name in BUILTIN_OPEN_NAMES: + return _BlockingRule("BLOCKING_FILE_IO", "FILE_OPEN", call_name) + if leaf_name in PATH_METHOD_NAMES | AMBIGUOUS_PATH_METHOD_NAMES: + if self._is_path_method_call(node): + return _BlockingRule("BLOCKING_FILE_IO", _path_method_operation(leaf_name), call_name) + if call_name in BLOCKING_OS_FILE_NAMES: + return _BlockingRule("BLOCKING_FILE_IO", OS_FILE_OPERATIONS[call_name], call_name) + if call_name in BLOCKING_SLEEP_NAMES: + return _BlockingRule("BLOCKING_SLEEP", "SLEEP", call_name) + if call_name in BLOCKING_SUBPROCESS_NAMES: + return _BlockingRule("BLOCKING_SUBPROCESS", "SUBPROCESS", call_name) + if call_name in BLOCKING_HTTP_NAMES: + return _BlockingRule("BLOCKING_HTTP_IO", "HTTP_REQUEST", call_name) + if call_name in BLOCKING_SHUTIL_NAMES: + return _BlockingRule("BLOCKING_FILE_IO", SHUTIL_OPERATIONS[call_name], call_name) + return None + + def _is_path_method_call(self, node: ast.Call) -> bool: + if not isinstance(node.func, ast.Attribute): + return False + if node.func.attr in AMBIGUOUS_PATH_METHOD_NAMES and node.func.attr == "replace" and len(node.args) >= 2: + return False + receiver = node.func.value + if _is_constructed_path(receiver): + return True + receiver_name = dotted_name(receiver) + if receiver_name in self.current_path_like_names: + return True + if _looks_like_path_receiver_name(receiver_name): + return True + if node.func.attr in PATH_METHOD_NAMES and isinstance(receiver, ast.Attribute): + return True + return False + + @property + def current_path_like_names(self) -> set[str]: + return self.path_like_name_stack[-1] if self.path_like_name_stack else set() + + def _record_path_like_annotation(self, annotation: ast.AST, targets: Iterable[ast.AST]) -> None: + if not self.path_like_name_stack or not _is_path_annotation(annotation, self._canonical_name): + return + self.current_path_like_names.update(name for target in targets for name in _iter_assigned_names(target)) + + def _record_sync_http_client_targets(self, value: ast.AST, targets: Iterable[ast.AST]) -> None: + client_base = self._sync_http_client_factory_base(value) + if client_base is None: + return + current_clients = self.current_sync_http_clients + for target in targets: + for name in _iter_assigned_names(target): + current_clients[name] = client_base + + def _sync_http_client_factory_base(self, node: ast.AST) -> str | None: + if not isinstance(node, ast.Call): + return None + call_name = self._canonical_name(dotted_name(node.func)) + if call_name is None: + return None + return SYNC_HTTP_CLIENT_FACTORIES.get(call_name) + + def _sync_http_client_method_symbol(self, call_name: str) -> str | None: + parts = call_name.split(".") + if len(parts) != 2 or parts[1] not in HTTP_METHOD_NAMES: + return None + client_base = self.current_sync_http_clients.get(parts[0]) + if client_base is None: + return None + return f"{client_base}.{parts[1]}" + + +def _path_method_operation(method_name: str) -> str: + return PATH_METHOD_OPERATIONS.get(method_name, "FILE_METADATA") + + +def _is_constructed_path(node: ast.AST) -> bool: + return isinstance(node, ast.Call) and dotted_name(node.func) in {"Path", "pathlib.Path"} + + +def _looks_like_path_receiver_name(receiver_name: str | None) -> bool: + if receiver_name is None: + return False + leaf = receiver_name.rsplit(".", 1)[-1].lower() + return leaf in {"path", "file_path", "dir_path", "target", "dest", "destination", "source"} or leaf.endswith(("_path", "_dir", "_file", "_root")) or "path" in leaf + + +def _is_path_annotation(annotation: ast.AST | None, canonical_name: Callable[[str | None], str | None]) -> bool: + if annotation is None: + return False + if isinstance(annotation, ast.BinOp) and isinstance(annotation.op, ast.BitOr): + return _is_path_annotation(annotation.left, canonical_name) or _is_path_annotation(annotation.right, canonical_name) + name = dotted_name(annotation) + canonical = canonical_name(name) + if canonical in {"pathlib.Path", "Path"}: + return True + if isinstance(annotation, ast.Subscript): + return _is_path_annotation(annotation.slice, canonical_name) + return False + + +def _path_like_argument_names(arguments: ast.arguments, canonical_name: Callable[[str | None], str | None]) -> Iterable[str]: + candidates = [*arguments.posonlyargs, *arguments.args, *arguments.kwonlyargs] + if arguments.vararg is not None: + candidates.append(arguments.vararg) + if arguments.kwarg is not None: + candidates.append(arguments.kwarg) + for argument in candidates: + if _is_path_annotation(argument.annotation, canonical_name): + yield argument.arg + + +def _iter_assigned_names(target: ast.AST) -> Iterable[str]: + if isinstance(target, ast.Name): + yield target.id + return + if isinstance(target, (ast.Tuple, ast.List)): + for element in target.elts: + yield from _iter_assigned_names(element) + + +def _sync_http_client_chained_method_symbol(call_name: str) -> str | None: + for factory_name, client_base in SYNC_HTTP_CLIENT_FACTORIES.items(): + prefix = f"{factory_name}." + if not call_name.startswith(prefix): + continue + method_name = call_name[len(prefix) :] + if method_name in HTTP_METHOD_NAMES: + return f"{client_base}.{method_name}" + return None + + +def _resolve_call_ref(visitor: BlockingIOStaticVisitor, ref: _CallRef) -> list[str]: + if ref.self_method and ref.class_name is not None: + qualname = f"{ref.class_name}.{ref.name}" + return [qualname] if qualname in visitor.function_defs else [] + return list(visitor.functions_by_name.get(ref.name, ())) + + +def _reachable_functions(visitor: BlockingIOStaticVisitor, roots: Iterable[str]) -> set[str]: + reachable = set(roots) + queue: deque[str] = deque(reachable) + while queue: + qualname = queue.popleft() + for ref in visitor.call_refs.get(qualname, ()): + for target in _resolve_call_ref(visitor, ref): + if target in reachable: + continue + reachable.add(target) + queue.append(target) + return reachable + + +def _async_reachable_functions(visitor: BlockingIOStaticVisitor) -> set[str]: + return _reachable_functions( + visitor, + (qualname for qualname, info in visitor.function_defs.items() if info.is_async), + ) + + +def _agent_middleware_classes(visitor: BlockingIOStaticVisitor) -> set[str]: + middleware_classes: set[str] = set() + changed = True + while changed: + changed = False + for class_name, bases in visitor.class_bases.items(): + if class_name in middleware_classes: + continue + if any(_is_agent_middleware_base(base, middleware_classes) for base in bases): + middleware_classes.add(class_name) + changed = True + return middleware_classes + + +def _is_agent_middleware_base(base: str, known_middleware_classes: set[str]) -> bool: + leaf = base.rsplit(".", 1)[-1] + return leaf == "AgentMiddleware" or leaf in known_middleware_classes + + +def _sync_only_agent_middleware_entrypoints(visitor: BlockingIOStaticVisitor) -> set[str]: + entrypoints: set[str] = set() + middleware_classes = _agent_middleware_classes(visitor) + for class_name in middleware_classes: + methods = visitor.class_methods.get(class_name, set()) + for sync_hook, async_hook in SYNC_AGENT_MIDDLEWARE_HOOKS.items(): + if sync_hook in methods and async_hook not in methods: + qualname = f"{class_name}.{sync_hook}" + if qualname in visitor.function_defs: + entrypoints.add(qualname) + return entrypoints + + +def _event_loop_exposures( + visitor: BlockingIOStaticVisitor, + async_reachable: set[str], + middleware_reachable: set[str], +) -> dict[str, str]: + exposures: dict[str, str] = {} + for qualname, info in visitor.function_defs.items(): + if info.is_async: + exposures[qualname] = "DIRECT_ASYNC" + for qualname in async_reachable: + exposures.setdefault(qualname, "ASYNC_REACHABLE_SAME_FILE") + for qualname in middleware_reachable: + exposures.setdefault(qualname, "SYNC_AGENT_MIDDLEWARE_HOOK") + return exposures + + +def _priority(operation: str) -> str: + return OPERATION_BASE_PRIORITY[operation] + + +def _finding_reason(operation: str, exposure: str) -> str: + if exposure == "DIRECT_ASYNC": + return f"{operation} is called directly inside an async function." + if exposure == "ASYNC_REACHABLE_SAME_FILE": + return f"{operation} is statically reachable from an async function in the same file." + if exposure == "SYNC_AGENT_MIDDLEWARE_HOOK": + return f"{operation} is statically reachable from a sync AgentMiddleware hook used by the async graph." + return "Source could not be parsed; scan coverage is incomplete for this file." + + +def _finalize_findings(visitor: BlockingIOStaticVisitor) -> list[BlockingIOStaticFinding]: + reachable = _async_reachable_functions(visitor) + middleware_reachable = _reachable_functions(visitor, _sync_only_agent_middleware_entrypoints(visitor)) + event_loop_exposures = _event_loop_exposures(visitor, reachable, middleware_reachable) + findings: list[BlockingIOStaticFinding] = [] + for candidate in visitor.potential_findings: + exposure = event_loop_exposures.get(candidate.function) + if exposure is None: + continue + findings.append( + BlockingIOStaticFinding( + category=candidate.category, + operation=candidate.operation, + priority=_priority(candidate.operation), + path=candidate.path, + line=candidate.line, + column=candidate.column, + function=candidate.function, + exposure=exposure, + symbol=candidate.symbol, + code=candidate.code, + ) + ) + return findings + + +def scan_file(path: Path, *, repo_root: Path = REPO_ROOT) -> list[BlockingIOStaticFinding]: + source = path.read_text(encoding="utf-8") + source_lines = source.splitlines() + relative_path = relative_to_repo(path, repo_root) + try: + tree = ast.parse(source, filename=str(path)) + except SyntaxError as exc: + line = exc.lineno or 0 + code = _source_snippet(source_lines, line) + return [ + BlockingIOStaticFinding( + category="PARSE_ERROR", + operation="PARSE_ERROR", + priority="MEDIUM", + path=relative_path, + line=line, + column=max((exc.offset or 1) - 1, 0), + function="", + exposure="PARSE_INCOMPLETE", + symbol="SyntaxError", + code=code, + ) + ] + + visitor = BlockingIOStaticVisitor(relative_path, source_lines) + visitor.visit(tree) + return sorted(_finalize_findings(visitor), key=lambda finding: (finding.path, finding.line, finding.column, finding.category)) + + +def is_ignored_path(path: Path) -> bool: + return any(part in IGNORED_DIR_NAMES for part in path.parts) + + +def iter_python_files(paths: Iterable[Path]) -> Iterable[Path]: + for path in paths: + if not path.exists() or is_ignored_path(path): + continue + if path.is_file(): + if path.suffix == ".py" and not is_ignored_path(path): + yield path + continue + for dirpath, dirnames, filenames in os.walk(path): + dirnames[:] = [dirname for dirname in dirnames if dirname not in IGNORED_DIR_NAMES] + for filename in filenames: + if filename.endswith(".py"): + yield Path(dirpath) / filename + + +def scan_paths(paths: Iterable[Path], *, repo_root: Path = REPO_ROOT) -> list[BlockingIOStaticFinding]: + findings: list[BlockingIOStaticFinding] = [] + for path in sorted(iter_python_files(paths)): + findings.extend(scan_file(path, repo_root=repo_root)) + return sorted(findings, key=lambda finding: (finding.path, finding.line, finding.column, finding.category)) + + +def findings_to_json(findings: Sequence[BlockingIOStaticFinding]) -> str: + return json.dumps([finding.to_dict() for finding in findings], indent=2) + "\n" + + +def write_json_report(findings: Sequence[BlockingIOStaticFinding], output_path: Path) -> None: + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(findings_to_json(findings), encoding="utf-8") + + +def _scan_root(path: str) -> str: + parts = path.split("/") + if parts[:4] == ["backend", "packages", "harness", "deerflow"]: + return "backend/packages/harness/deerflow" + if len(parts) >= 2 and parts[0] == "backend": + return "/".join(parts[:2]) + return parts[0] if parts else path + + +def _format_counter(title: str, counter: Counter[str], *, limit: int | None = None, order: Sequence[str] | None = None) -> list[str]: + lines = [title] + if order is None: + items = sorted(counter.items(), key=lambda item: (-item[1], item[0])) + else: + ordered = [(name, counter[name]) for name in order if counter.get(name)] + ordered_names = {name for name, _ in ordered} + extras = sorted((item for item in counter.items() if item[0] not in ordered_names), key=lambda item: (-item[1], item[0])) + items = ordered + extras + if limit is not None: + items = items[:limit] + width = max((len(str(count)) for _, count in items), default=1) + lines.extend(f" {count:>{width}} {name}" for name, count in items) + return lines + + +def format_summary(findings: Sequence[BlockingIOStaticFinding], *, output_path: Path | None = None) -> str: + if not findings: + lines = ["No static blocking IO event-loop risk findings in backend business code."] + else: + lines = [ + f"Static blocking IO event-loop risk findings: {len(findings)}", + "", + *_format_counter("By category:", Counter(finding.category for finding in findings)), + "", + *_format_counter("By priority:", Counter(finding.priority for finding in findings), order=("HIGH", "MEDIUM", "LOW")), + "", + *_format_counter("By operation:", Counter(finding.operation for finding in findings)), + "", + *_format_counter("By event-loop exposure:", Counter(finding.exposure for finding in findings)), + "", + *_format_counter("By scan root:", Counter(_scan_root(finding.path) for finding in findings)), + "", + *_format_counter("Top files:", Counter(finding.path for finding in findings), limit=10), + ] + + if output_path is not None: + lines.extend(["", f"Full JSON report: {relative_to_repo(output_path.resolve())}"]) + else: + lines.extend(["", "Use --format json for full structured findings."]) + return "\n".join(lines) + + +def format_text(findings: Sequence[BlockingIOStaticFinding]) -> str: + if not findings: + return "No static blocking IO event-loop risk findings in backend business code." + + lines: list[str] = [] + for finding in findings: + lines.append(f"{finding.priority} {finding.category}/{finding.operation} {finding.path}:{finding.line}:{finding.column + 1} in {finding.function} exposure={finding.exposure}") + lines.append(f" symbol: {finding.symbol}") + lines.append(f" reason: {_finding_reason(finding.operation, finding.exposure)}") + if finding.code: + lines.append(f" code: {finding.code}") + return "\n".join(lines) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description=("Statically inventory blocking IO calls that may block the backend asyncio event loop. Findings are prioritized review candidates, not automatic bug decisions.")) + parser.add_argument( + "paths", + nargs="*", + type=Path, + help="Files or directories to scan. Defaults to backend app and harness sources.", + ) + parser.add_argument( + "--format", + choices=("summary", "text", "json"), + default="summary", + help="Output format.", + ) + parser.add_argument( + "--output", + type=Path, + help="Write the complete finding list as JSON to this file.", + ) + return parser + + +def main(argv: Sequence[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + paths = args.paths or list(DEFAULT_SCAN_PATHS) + findings = scan_paths(paths) + output_path = args.output + + if output_path is not None: + write_json_report(findings, output_path) + + if args.format == "summary": + print(format_summary(findings, output_path=output_path)) + elif args.format == "json": + print(findings_to_json(findings), end="") + else: + print(format_text(findings)) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/backend/tests/test_blocking_io_detector.py b/backend/tests/test_blocking_io_detector.py deleted file mode 100644 index af44d746d..000000000 --- a/backend/tests/test_blocking_io_detector.py +++ /dev/null @@ -1,190 +0,0 @@ -from __future__ import annotations - -import asyncio -import os -import time -from os import walk as imported_walk -from pathlib import Path -from time import sleep as imported_sleep - -import httpx -import pytest -import requests -from support.detectors.blocking_io import ( - BlockingCallSpec, - BlockingIOProbe, - detect_blocking_io, -) - -pytestmark = pytest.mark.asyncio - - -TIME_SLEEP_ONLY = (BlockingCallSpec("time.sleep", "time:sleep"),) -REQUESTS_ONLY = (BlockingCallSpec("requests.Session.request", "requests.sessions:Session.request"),) -HTTPX_ONLY = (BlockingCallSpec("httpx.Client.request", "httpx:Client.request"),) -OS_WALK_ONLY = (BlockingCallSpec("os.walk", "os:walk", record_on_iteration=True),) -PATH_READ_TEXT_ONLY = (BlockingCallSpec("pathlib.Path.read_text", "pathlib:Path.read_text"),) - - -async def test_records_time_sleep_on_event_loop() -> None: - with detect_blocking_io(TIME_SLEEP_ONLY) as detector: - time.sleep(0) - - assert [violation.name for violation in detector.violations] == ["time.sleep"] - - -async def test_records_already_imported_sleep_alias_on_event_loop() -> None: - original_alias = imported_sleep - - with detect_blocking_io(TIME_SLEEP_ONLY) as detector: - imported_sleep(0) - - assert imported_sleep is original_alias - assert [violation.name for violation in detector.violations] == ["time.sleep"] - - -async def test_can_disable_loaded_alias_patching() -> None: - with detect_blocking_io(TIME_SLEEP_ONLY, patch_loaded_aliases=False) as detector: - imported_sleep(0) - - assert detector.violations == [] - - -async def test_does_not_record_time_sleep_offloaded_to_thread() -> None: - with detect_blocking_io(TIME_SLEEP_ONLY) as detector: - await asyncio.to_thread(time.sleep, 0) - - assert detector.violations == [] - - -async def test_fixture_allows_offloaded_sync_work(blocking_io_detector) -> None: - await asyncio.to_thread(time.sleep, 0) - - assert blocking_io_detector.violations == [] - - -async def test_does_not_record_sync_call_without_running_event_loop() -> None: - def call_sleep() -> list[str]: - with detect_blocking_io(TIME_SLEEP_ONLY) as detector: - time.sleep(0) - return [violation.name for violation in detector.violations] - - assert await asyncio.to_thread(call_sleep) == [] - - -async def test_fail_on_exit_includes_call_site() -> None: - with pytest.raises(AssertionError) as exc_info: - with detect_blocking_io(TIME_SLEEP_ONLY, fail_on_exit=True): - time.sleep(0) - - message = str(exc_info.value) - assert "time.sleep" in message - assert "test_fail_on_exit_includes_call_site" in message - - -async def test_records_requests_session_request_without_real_network(monkeypatch: pytest.MonkeyPatch) -> None: - def fake_request(self: requests.Session, method: str, url: str, **kwargs: object) -> str: - return f"{method}:{url}" - - monkeypatch.setattr(requests.sessions.Session, "request", fake_request) - - with detect_blocking_io(REQUESTS_ONLY) as detector: - assert requests.get("https://example.invalid") == "get:https://example.invalid" - - assert [violation.name for violation in detector.violations] == ["requests.Session.request"] - - -async def test_records_sync_httpx_client_request_without_real_network(monkeypatch: pytest.MonkeyPatch) -> None: - def fake_request(self: httpx.Client, method: str, url: str, **kwargs: object) -> httpx.Response: - return httpx.Response(200, request=httpx.Request(method, url)) - - monkeypatch.setattr(httpx.Client, "request", fake_request) - - with detect_blocking_io(HTTPX_ONLY) as detector: - with httpx.Client() as client: - response = client.get("https://example.invalid") - - assert response.status_code == 200 - assert [violation.name for violation in detector.violations] == ["httpx.Client.request"] - - -async def test_records_os_walk_on_event_loop(tmp_path: Path) -> None: - (tmp_path / "nested").mkdir() - - with detect_blocking_io(OS_WALK_ONLY) as detector: - assert list(os.walk(tmp_path)) - - assert [violation.name for violation in detector.violations] == ["os.walk"] - - -async def test_records_already_imported_os_walk_alias_on_iteration(tmp_path: Path) -> None: - (tmp_path / "nested").mkdir() - original_alias = imported_walk - - with detect_blocking_io(OS_WALK_ONLY) as detector: - assert list(imported_walk(tmp_path)) - - assert imported_walk is original_alias - assert [violation.name for violation in detector.violations] == ["os.walk"] - - -async def test_does_not_record_os_walk_before_iteration(tmp_path: Path) -> None: - with detect_blocking_io(OS_WALK_ONLY) as detector: - walker = os.walk(tmp_path) - - assert list(walker) - assert detector.violations == [] - - -async def test_does_not_record_os_walk_iterated_off_event_loop(tmp_path: Path) -> None: - (tmp_path / "nested").mkdir() - - with detect_blocking_io(OS_WALK_ONLY) as detector: - walker = os.walk(tmp_path) - assert await asyncio.to_thread(lambda: list(walker)) - - assert detector.violations == [] - - -async def test_records_path_read_text_on_event_loop(tmp_path: Path) -> None: - path = tmp_path / "data.txt" - path.write_text("content", encoding="utf-8") - - with detect_blocking_io(PATH_READ_TEXT_ONLY) as detector: - assert path.read_text(encoding="utf-8") == "content" - - assert [violation.name for violation in detector.violations] == ["pathlib.Path.read_text"] - - -async def test_probe_formats_summary_for_recorded_violations(tmp_path: Path) -> None: - probe = BlockingIOProbe(Path(__file__).resolve().parents[1]) - path = tmp_path / "data.txt" - path.write_text("content", encoding="utf-8") - - with detect_blocking_io(PATH_READ_TEXT_ONLY, stack_limit=18) as detector: - assert path.read_text(encoding="utf-8") == "content" - - probe.record("tests/test_example.py::test_example", detector.violations) - summary = probe.format_summary() - - assert "blocking io probe: 1 violations across 1 tests" in summary - assert "pathlib.Path.read_text" in summary - - -async def test_probe_formats_empty_summary_and_can_be_cleared(tmp_path: Path) -> None: - probe = BlockingIOProbe(Path(__file__).resolve().parents[1]) - - assert probe.format_summary() == "blocking io probe: no violations" - - path = tmp_path / "data.txt" - path.write_text("content", encoding="utf-8") - with detect_blocking_io(PATH_READ_TEXT_ONLY, stack_limit=18) as detector: - assert path.read_text(encoding="utf-8") == "content" - - probe.record("tests/test_example.py::test_example", detector.violations) - assert probe.violation_count == 1 - - probe.clear() - - assert probe.violation_count == 0 - assert probe.format_summary() == "blocking io probe: no violations" diff --git a/backend/tests/test_blocking_io_probe_integration.py b/backend/tests/test_blocking_io_probe_integration.py deleted file mode 100644 index af7a31b9d..000000000 --- a/backend/tests/test_blocking_io_probe_integration.py +++ /dev/null @@ -1,22 +0,0 @@ -from __future__ import annotations - -import time - -import pytest - -ORIGINAL_SLEEP = time.sleep - - -def replacement_sleep(seconds: float) -> None: - return None - - -def test_probe_survives_monkeypatch_teardown(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setattr(time, "sleep", replacement_sleep) - assert time.sleep is replacement_sleep - - -@pytest.mark.no_blocking_io_probe -def test_probe_restores_original_after_monkeypatch_teardown() -> None: - assert time.sleep is ORIGINAL_SLEEP - assert getattr(time.sleep, "__wrapped__", None) is None diff --git a/backend/tests/test_detect_blocking_io_static.py b/backend/tests/test_detect_blocking_io_static.py new file mode 100644 index 000000000..4615781e7 --- /dev/null +++ b/backend/tests/test_detect_blocking_io_static.py @@ -0,0 +1,421 @@ +from __future__ import annotations + +import json +import textwrap +from pathlib import Path + +from support.detectors import blocking_io_static as detector + + +def _write_python(path: Path, source: str) -> Path: + path.write_text(textwrap.dedent(source).strip() + "\n", encoding="utf-8") + return path + + +def _payload(path: Path, repo_root: Path) -> list[dict[str, object]]: + return [finding.to_dict() for finding in detector.scan_file(path, repo_root=repo_root)] + + +def test_scan_file_detects_direct_blocking_calls_in_async_code(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + import subprocess + import time + import urllib.request + from pathlib import Path + + async def handler(path: Path): + time.sleep(1) + subprocess.run(["echo", "ok"]) + path.read_text(encoding="utf-8") + with open(path, encoding="utf-8") as handle: + return urllib.request.urlopen(handle.read()) + """, + ) + + findings = _payload(source_file, tmp_path) + categories = {finding["blocking_call"]["category"] for finding in findings} + symbols = {finding["blocking_call"]["symbol"] for finding in findings} + + assert categories == { + "BLOCKING_FILE_IO", + "BLOCKING_HTTP_IO", + "BLOCKING_SLEEP", + "BLOCKING_SUBPROCESS", + } + assert {"time.sleep", "subprocess.run", "path.read_text", "open", "urllib.request.urlopen"}.issubset(symbols) + assert {finding["event_loop_exposure"] for finding in findings} == {"DIRECT_ASYNC"} + + +def test_scan_file_detects_blocking_calls_in_sync_helper_reached_from_async_code(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + from pathlib import Path + + def load_payload(path: Path) -> bytes: + return path.read_bytes() + + async def route(path: Path) -> bytes: + return load_payload(path) + """, + ) + + findings = _payload(source_file, tmp_path) + + assert len(findings) == 1 + assert findings[0]["blocking_call"]["category"] == "BLOCKING_FILE_IO" + assert findings[0]["location"]["function"] == "load_payload" + assert findings[0]["event_loop_exposure"] == "ASYNC_REACHABLE_SAME_FILE" + assert findings[0]["blocking_call"]["symbol"] == "path.read_bytes" + + +def test_scan_file_omits_sync_only_blocking_calls_from_default_results(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + from pathlib import Path + + def load_payload(path: Path) -> str: + return path.read_text() + """, + ) + + assert detector.scan_file(source_file, repo_root=tmp_path) == [] + + +def test_scan_file_detects_self_helper_reached_from_async_method(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + class ArtifactRouter: + def read_payload(self, path): + return path.read_text(encoding="utf-8") + + async def get(self, path): + return self.read_payload(path) + """, + ) + + findings = _payload(source_file, tmp_path) + + assert len(findings) == 1 + assert findings[0]["location"]["function"] == "ArtifactRouter.read_payload" + assert findings[0]["event_loop_exposure"] == "ASYNC_REACHABLE_SAME_FILE" + + +def test_json_output_uses_concise_review_record_schema(tmp_path: Path, capsys) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + import subprocess + + async def handler(): + subprocess.run(["echo", "ok"]) + """, + ) + + exit_code = detector.main(["--format", "json", str(source_file)]) + + assert exit_code == 0 + payload = json.loads(capsys.readouterr().out) + assert payload == [ + { + "priority": "HIGH", + "location": { + "path": str(source_file), + "line": 4, + "column": 5, + "function": "handler", + }, + "blocking_call": { + "category": "BLOCKING_SUBPROCESS", + "operation": "SUBPROCESS", + "symbol": "subprocess.run", + }, + "event_loop_exposure": "DIRECT_ASYNC", + "reason": "SUBPROCESS is called directly inside an async function.", + "code": 'subprocess.run(["echo", "ok"])', + } + ] + assert "confidence" not in payload[0] + assert "severity" not in payload[0] + assert "event_loop_risk" not in payload[0] + + +def test_summary_output_writes_json_report(tmp_path: Path, capsys) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + import subprocess + + async def handler(): + subprocess.run(["echo", "ok"]) + """, + ) + output_path = tmp_path / "reports" / "blocking-io.json" + + exit_code = detector.main(["--output", str(output_path), str(source_file)]) + + assert exit_code == 0 + stdout = capsys.readouterr().out + assert "Static blocking IO event-loop risk findings: 1" in stdout + assert "By category:" in stdout + assert "BLOCKING_SUBPROCESS" in stdout + assert "Full JSON report:" in stdout + payload = json.loads(output_path.read_text(encoding="utf-8")) + assert [finding["blocking_call"]["category"] for finding in payload] == ["BLOCKING_SUBPROCESS"] + + +def test_json_output_ranks_operations_without_confidence_noise(tmp_path: Path, capsys) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + import shutil + + async def handler(path): + path.exists() + path.read_text() + shutil.rmtree(path) + """, + ) + + exit_code = detector.main(["--format", "json", str(source_file)]) + + assert exit_code == 0 + payload = json.loads(capsys.readouterr().out) + by_symbol = {finding["blocking_call"]["symbol"]: finding for finding in payload} + assert by_symbol["path.exists"]["blocking_call"]["operation"] == "FILE_METADATA" + assert by_symbol["path.exists"]["priority"] == "LOW" + assert by_symbol["path.read_text"]["blocking_call"]["operation"] == "FILE_READ" + assert by_symbol["path.read_text"]["priority"] == "MEDIUM" + assert by_symbol["shutil.rmtree"]["blocking_call"]["operation"] == "FILE_TREE_DELETE" + assert by_symbol["shutil.rmtree"]["priority"] == "HIGH" + assert {finding["event_loop_exposure"] for finding in payload} == {"DIRECT_ASYNC"} + assert all("confidence" not in finding for finding in payload) + + +def test_path_receiver_detection_uses_path_annotations(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + from pathlib import Path + + async def typed(path: Path): + return path.read_text() + + async def constructed(): + return Path("payload.txt").read_text() + """, + ) + + findings = _payload(source_file, tmp_path) + + assert {finding["blocking_call"]["symbol"] for finding in findings} == {"path.read_text", "pathlib.Path.read_text"} + assert {finding["priority"] for finding in findings} == {"MEDIUM"} + + +def test_summary_groups_findings_by_priority_and_operation(tmp_path: Path, capsys) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + import os + from pathlib import Path + + def load_payload(path: Path) -> str: + return path.read_text() + + async def handler(path: Path) -> str: + path.exists() + list(os.walk(path)) + return load_payload(path) + """, + ) + + exit_code = detector.main([str(source_file)]) + + assert exit_code == 0 + stdout = capsys.readouterr().out + assert "By priority:" in stdout + assert "HIGH" in stdout + assert "MEDIUM" in stdout + assert "By operation:" in stdout + assert "FILE_ENUMERATION" in stdout + assert "FILE_METADATA" in stdout + assert "FILE_READ" in stdout + assert "By event-loop exposure:" in stdout + assert "DIRECT_ASYNC" in stdout + assert "ASYNC_REACHABLE_SAME_FILE" in stdout + + +def test_source_code_snippet_is_truncated_for_json_output(tmp_path: Path) -> None: + long_suffix = " + ".join('"chunk"' for _ in range(80)) + source_file = _write_python( + tmp_path / "sample.py", + f""" + async def handler(path): + return path.read_text() + {long_suffix} + """, + ) + + findings = _payload(source_file, tmp_path) + + assert len(findings) == 1 + assert len(findings[0]["code"]) <= 203 + assert findings[0]["code"].endswith("...") + + +def test_cli_default_filters_sync_only_inventory_items(tmp_path: Path, capsys) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + from pathlib import Path + + def load_payload(path: Path) -> str: + return path.read_text() + """, + ) + + exit_code = detector.main(["--format", "json", str(source_file)]) + + assert exit_code == 0 + assert json.loads(capsys.readouterr().out) == [] + + +def test_sync_only_agent_middleware_hook_gets_event_loop_exposure(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + from langchain.agents.middleware import AgentMiddleware + from pathlib import Path + + class UploadsMiddleware(AgentMiddleware): + def before_agent(self, state, runtime): + return self._load(Path("uploads")) + + def _load(self, path: Path) -> str: + return path.read_text() + """, + ) + + findings = _payload(source_file, tmp_path) + + assert len(findings) == 1 + assert findings[0]["location"]["function"] == "UploadsMiddleware._load" + assert findings[0]["event_loop_exposure"] == "SYNC_AGENT_MIDDLEWARE_HOOK" + assert "statically reachable from a sync AgentMiddleware hook" in findings[0]["reason"] + + +def test_sync_agent_middleware_hook_with_async_counterpart_is_not_reported(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + from langchain.agents.middleware import AgentMiddleware + from pathlib import Path + + class UploadsMiddleware(AgentMiddleware): + def before_agent(self, state, runtime): + return Path("uploads").read_text() + + async def abefore_agent(self, state, runtime): + return None + """, + ) + + assert detector.scan_file(source_file, repo_root=tmp_path) == [] + + +def test_scan_file_detects_sync_httpx_client_methods_in_async_code(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + import httpx + + async def search() -> str: + with httpx.Client(timeout=30) as client: + return client.post("https://example.invalid").text + """, + ) + + findings = _payload(source_file, tmp_path) + + assert len(findings) == 1 + assert findings[0]["blocking_call"]["category"] == "BLOCKING_HTTP_IO" + assert findings[0]["location"]["function"] == "search" + assert findings[0]["event_loop_exposure"] == "DIRECT_ASYNC" + assert findings[0]["blocking_call"]["symbol"] == "httpx.Client.post" + + +def test_scan_file_detects_chained_sync_http_client_methods_in_async_code(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + import httpx + import requests + + async def fetch() -> tuple[object, object]: + return ( + httpx.Client().get("https://example.invalid"), + requests.Session().post("https://example.invalid"), + ) + """, + ) + + findings = _payload(source_file, tmp_path) + symbols = {finding["blocking_call"]["symbol"] for finding in findings} + + assert symbols == {"httpx.Client.get", "requests.Session.post"} + assert {finding["blocking_call"]["category"] for finding in findings} == {"BLOCKING_HTTP_IO"} + + +def test_scan_file_detects_os_walk_and_path_resolve_in_async_code(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + import os + from pathlib import Path + + async def inspect_tree(path: Path) -> list[str]: + root = path.resolve() + return [name for _, _, names in os.walk(root) for name in names] + """, + ) + + findings = _payload(source_file, tmp_path) + symbols = {finding["blocking_call"]["symbol"] for finding in findings} + + assert symbols == {"path.resolve", "os.walk"} + assert {finding["blocking_call"]["category"] for finding in findings} == {"BLOCKING_FILE_IO"} + + +def test_scan_file_does_not_treat_string_replace_as_file_io(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "sample.py", + """ + def _path_variants(path: str) -> set[str]: + return {path, path.replace("\\\\", "/"), path.replace("/", "\\\\")} + + async def normalize(text: str) -> str: + return text.replace("a", "b") + """, + ) + + assert detector.scan_file(source_file, repo_root=tmp_path) == [] + + +def test_parse_errors_are_reported_as_findings(tmp_path: Path) -> None: + source_file = _write_python( + tmp_path / "broken.py", + """ + async def broken(: + pass + """, + ) + + findings = _payload(source_file, tmp_path) + + assert len(findings) == 1 + assert findings[0]["blocking_call"]["category"] == "PARSE_ERROR" + assert findings[0]["priority"] == "MEDIUM" + assert f"{source_file.name}:1:18" in detector.format_text(detector.scan_file(source_file, repo_root=tmp_path)) diff --git a/scripts/detect_blocking_io_static.py b/scripts/detect_blocking_io_static.py new file mode 100644 index 000000000..7a9c94a71 --- /dev/null +++ b/scripts/detect_blocking_io_static.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +"""CLI wrapper for the static blocking IO detector.""" + +from __future__ import annotations + +import sys +from collections.abc import Sequence +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +TEST_SUPPORT_PATH = REPO_ROOT / "backend" / "tests" +if str(TEST_SUPPORT_PATH) not in sys.path: + sys.path.insert(0, str(TEST_SUPPORT_PATH)) + + +def main(argv: Sequence[str] | None = None) -> int: + from support.detectors.blocking_io_static import main as detector_main + + return detector_main(argv) + + +if __name__ == "__main__": + sys.exit(main())