From 0c22349029067d5efde3ca5624a2d2f2120a48bd Mon Sep 17 00:00:00 2001 From: AochenShen99 <142667174+ShenAC-SAC@users.noreply.github.com> Date: Wed, 20 May 2026 10:00:17 +0800 Subject: [PATCH] chore(dev): add async/thread boundary detector (#2936) * chore(dev): add thread boundary detector * chore(dev): reduce thread boundary detector false positives --- Makefile | 6 +- .../support/detectors/thread_boundaries.py | 507 ++++++++++++++++++ .../tests/test_detect_thread_boundaries.py | 182 +++++++ scripts/detect_thread_boundaries.py | 23 + 4 files changed, 717 insertions(+), 1 deletion(-) create mode 100644 backend/tests/support/detectors/thread_boundaries.py create mode 100644 backend/tests/test_detect_thread_boundaries.py create mode 100644 scripts/detect_thread_boundaries.py diff --git a/Makefile b/Makefile index c60d9b9b2..fb83cd556 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # DeerFlow - Unified Development Environment -.PHONY: help config config-upgrade check install setup doctor dev dev-daemon start start-daemon stop up down clean docker-init docker-start docker-stop docker-logs docker-logs-frontend docker-logs-gateway +.PHONY: help config config-upgrade check install setup doctor detect-thread-boundaries dev dev-daemon start start-daemon stop up down clean docker-init docker-start docker-stop docker-logs docker-logs-frontend docker-logs-gateway BASH ?= bash BACKEND_UV_RUN = cd backend && uv run @@ -23,6 +23,7 @@ help: @echo " make config - Generate local config files (aborts if config already exists)" @echo " make config-upgrade - Merge new fields from config.example.yaml into config.yaml" @echo " make check - Check if all required tools are installed" + @echo " make detect-thread-boundaries - Inventory async/thread boundary points" @echo " make install - Install all dependencies (frontend + backend + pre-commit hooks)" @echo " make setup-sandbox - Pre-pull sandbox container image (recommended)" @echo " make dev - Start all services in development mode (with hot-reloading)" @@ -51,6 +52,9 @@ setup: doctor: @$(BACKEND_UV_RUN) python ../scripts/doctor.py +detect-thread-boundaries: + @$(PYTHON) ./scripts/detect_thread_boundaries.py + config: @$(PYTHON) ./scripts/configure.py diff --git a/backend/tests/support/detectors/thread_boundaries.py b/backend/tests/support/detectors/thread_boundaries.py new file mode 100644 index 000000000..b1d043d47 --- /dev/null +++ b/backend/tests/support/detectors/thread_boundaries.py @@ -0,0 +1,507 @@ +#!/usr/bin/env python3 +"""Inventory async/thread boundary points for developer review. + +This detector is intentionally non-invasive: it parses Python source with AST +and reports places where code crosses sync/async/thread boundaries. Findings +are review evidence, not automatic bug decisions. +""" + +from __future__ import annotations + +import argparse +import ast +import json +import os +import sys +from collections.abc import Iterable, Sequence +from dataclasses import asdict, dataclass +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[4] +DEFAULT_SCAN_PATHS = ( + REPO_ROOT / "backend" / "app", + REPO_ROOT / "backend" / "packages" / "harness" / "deerflow", +) +IGNORED_DIR_NAMES = { + ".git", + ".mypy_cache", + ".pytest_cache", + ".ruff_cache", + ".venv", + "__pycache__", + "node_modules", +} +SEVERITY_ORDER = {"INFO": 0, "WARN": 1, "FAIL": 2} + + +@dataclass(frozen=True) +class BoundaryFinding: + severity: str + category: str + path: str + line: int + column: int + function: str + async_context: bool + symbol: str + message: str + code: str + + def to_dict(self) -> dict[str, object]: + return asdict(self) + + +@dataclass(frozen=True) +class _FunctionContext: + name: str + is_async: bool + + +@dataclass(frozen=True) +class _CallRule: + severity: str + category: str + message: str + + +EXACT_CALL_RULES: dict[str, _CallRule] = { + "asyncio.run": _CallRule( + "WARN", + "SYNC_ASYNC_BRIDGE", + "Runs a coroutine from synchronous code by creating an event loop boundary.", + ), + "asyncio.to_thread": _CallRule( + "INFO", + "ASYNC_THREAD_OFFLOAD", + "Offloads synchronous work from an async context into a worker thread.", + ), + "asyncio.new_event_loop": _CallRule( + "WARN", + "NEW_EVENT_LOOP", + "Creates a separate event loop; review resource ownership across loops.", + ), + "asyncio.run_coroutine_threadsafe": _CallRule( + "WARN", + "CROSS_THREAD_COROUTINE", + "Submits a coroutine to an event loop from another thread.", + ), + "concurrent.futures.ThreadPoolExecutor": _CallRule( + "INFO", + "THREAD_POOL", + "Creates a thread pool boundary.", + ), + "threading.Thread": _CallRule( + "INFO", + "RAW_THREAD", + "Creates a raw thread; ContextVar values do not propagate automatically.", + ), + "threading.Timer": _CallRule( + "INFO", + "RAW_TIMER_THREAD", + "Creates a timer-backed raw thread; ContextVar values do not propagate automatically.", + ), + "make_sync_tool_wrapper": _CallRule( + "INFO", + "SYNC_TOOL_WRAPPER", + "Adapts an async tool coroutine for synchronous tool invocation.", + ), +} +THREAD_POOL_CONSTRUCTORS = {"concurrent.futures.ThreadPoolExecutor"} +ASYNC_TOOL_FACTORY_CALLS = { + "StructuredTool.from_function", + "langchain.tools.StructuredTool.from_function", + "langchain_core.tools.StructuredTool.from_function", +} +LANGCHAIN_INVOKE_RECEIVER_NAMES = { + "agent", + "chain", + "chat_model", + "graph", + "llm", + "model", + "runnable", +} +LANGCHAIN_INVOKE_RECEIVER_SUFFIXES = ( + "_agent", + "_chain", + "_graph", + "_llm", + "_model", + "_runnable", +) + +ASYNC_BLOCKING_CALL_RULES: dict[str, _CallRule] = { + "time.sleep": _CallRule( + "WARN", + "BLOCKING_CALL_IN_ASYNC", + "Blocks the event loop when called directly inside async code.", + ), + "subprocess.run": _CallRule( + "WARN", + "BLOCKING_SUBPROCESS_IN_ASYNC", + "Runs a blocking subprocess from async code.", + ), + "subprocess.check_call": _CallRule( + "WARN", + "BLOCKING_SUBPROCESS_IN_ASYNC", + "Runs a blocking subprocess from async code.", + ), + "subprocess.check_output": _CallRule( + "WARN", + "BLOCKING_SUBPROCESS_IN_ASYNC", + "Runs a blocking subprocess from async code.", + ), + "subprocess.Popen": _CallRule( + "WARN", + "BLOCKING_SUBPROCESS_IN_ASYNC", + "Starts a subprocess from async code; review whether it blocks later.", + ), +} + + +def dotted_name(node: ast.AST | None) -> str | None: + if isinstance(node, ast.Name): + return node.id + if isinstance(node, ast.Attribute): + parent = dotted_name(node.value) + if parent: + return f"{parent}.{node.attr}" + return node.attr + return None + + +def call_receiver_name(node: ast.Call) -> str | None: + if not isinstance(node.func, ast.Attribute): + return None + return dotted_name(node.func.value) + + +def is_none_node(node: ast.AST | None) -> bool: + return isinstance(node, ast.Constant) and node.value is None + + +class BoundaryVisitor(ast.NodeVisitor): + def __init__(self, path: Path, relative_path: str, source_lines: Sequence[str]) -> None: + self.path = path + self.relative_path = relative_path + self.source_lines = source_lines + self.findings: list[BoundaryFinding] = [] + self.function_stack: list[_FunctionContext] = [] + self.import_aliases: dict[str, str] = {} + self.executor_names: set[str] = set() + + @property + def current_function(self) -> str: + if not self.function_stack: + return "" + return ".".join(context.name for context in self.function_stack) + + @property + def in_async_context(self) -> bool: + return bool(self.function_stack and self.function_stack[-1].is_async) + + def visit_Import(self, node: ast.Import) -> None: + for alias in node.names: + local_name = alias.asname or alias.name.split(".", 1)[0] + canonical_name = alias.name if alias.asname else local_name + self.import_aliases[local_name] = canonical_name + + def visit_ImportFrom(self, node: ast.ImportFrom) -> None: + if node.module is None: + return + for alias in node.names: + local_name = alias.asname or alias.name + self.import_aliases[local_name] = f"{node.module}.{alias.name}" + + def visit_Assign(self, node: ast.Assign) -> None: + self._record_executor_targets(node.value, node.targets) + self.generic_visit(node) + + def visit_AnnAssign(self, node: ast.AnnAssign) -> None: + if node.value is not None: + self._record_executor_targets(node.value, [node.target]) + self.generic_visit(node) + + def visit_With(self, node: ast.With) -> None: + for item in node.items: + if item.optional_vars is not None: + self._record_executor_targets(item.context_expr, [item.optional_vars]) + self.generic_visit(node) + + def visit_FunctionDef(self, node: ast.FunctionDef) -> None: + self.function_stack.append(_FunctionContext(node.name, is_async=False)) + self.generic_visit(node) + self.function_stack.pop() + + def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> None: + self.function_stack.append(_FunctionContext(node.name, is_async=True)) + try: + self._check_async_tool_definition(node) + self.generic_visit(node) + finally: + self.function_stack.pop() + + def visit_Call(self, node: ast.Call) -> None: + call_name = self._canonical_name(dotted_name(node.func)) + if call_name: + self._check_call(node, call_name) + self.generic_visit(node) + + def _check_async_tool_definition(self, node: ast.AsyncFunctionDef) -> None: + for decorator in node.decorator_list: + decorator_call = decorator.func if isinstance(decorator, ast.Call) else decorator + decorator_name = self._canonical_name(dotted_name(decorator_call)) + if decorator_name in {"langchain.tools.tool", "langchain_core.tools.tool"}: + self._emit( + node, + severity="INFO", + category="ASYNC_TOOL_DEFINITION", + symbol=decorator_name, + message="Defines an async LangChain tool; sync clients need a wrapper before invoke().", + ) + return + + def _check_call(self, node: ast.Call, call_name: str) -> None: + rule = EXACT_CALL_RULES.get(call_name) + if rule: + self._emit_rule(node, call_name, rule) + + if call_name.endswith(".run_until_complete"): + self._emit( + node, + severity="WARN", + category="RUN_UNTIL_COMPLETE", + symbol=call_name, + message="Drives an event loop from synchronous code; review nested-loop behavior.", + ) + + if self._is_executor_submit(node, call_name): + self._emit( + node, + severity="INFO", + category="EXECUTOR_SUBMIT", + symbol=call_name, + message="Submits work to an executor; review context propagation and cancellation.", + ) + + if call_name in ASYNC_TOOL_FACTORY_CALLS: + if any(keyword.arg == "coroutine" and not is_none_node(keyword.value) for keyword in node.keywords): + self._emit( + node, + severity="INFO", + category="ASYNC_ONLY_TOOL_FACTORY", + symbol=call_name, + message="Creates a StructuredTool from a coroutine; sync clients need a wrapper.", + ) + + if self.in_async_context and call_name in ASYNC_BLOCKING_CALL_RULES: + self._emit_rule(node, call_name, ASYNC_BLOCKING_CALL_RULES[call_name]) + + if self.in_async_context and self._is_langchain_invoke(node, call_name, method_name="invoke"): + self._emit( + node, + severity="WARN", + category="SYNC_INVOKE_IN_ASYNC", + symbol=call_name, + message="Calls a synchronous invoke() from async code; review event-loop blocking.", + ) + + if not self.in_async_context and self._is_langchain_invoke(node, call_name, method_name="ainvoke"): + self._emit( + node, + severity="WARN", + category="ASYNC_INVOKE_IN_SYNC", + symbol=call_name, + message="Calls async ainvoke() from sync code; review how the coroutine is awaited.", + ) + + def _canonical_name(self, name: str | None) -> str | None: + if name is None: + return None + parts = name.split(".") + if parts and parts[0] in self.import_aliases: + return ".".join((self.import_aliases[parts[0]], *parts[1:])) + return name + + def _record_executor_targets(self, value: ast.AST, targets: Sequence[ast.AST]) -> None: + if not isinstance(value, ast.Call): + return + call_name = self._canonical_name(dotted_name(value.func)) + if call_name not in THREAD_POOL_CONSTRUCTORS: + return + for target in targets: + for name in self._target_names(target): + self.executor_names.add(name) + + def _target_names(self, target: ast.AST) -> Iterable[str]: + if isinstance(target, ast.Name): + yield target.id + elif isinstance(target, (ast.Tuple, ast.List)): + for element in target.elts: + yield from self._target_names(element) + + def _is_executor_submit(self, node: ast.Call, call_name: str) -> bool: + if not call_name.endswith(".submit"): + return False + receiver_name = call_receiver_name(node) + return receiver_name in self.executor_names + + def _is_langchain_invoke(self, node: ast.Call, call_name: str, *, method_name: str) -> bool: + if not call_name.endswith(f".{method_name}"): + return False + receiver_name = call_receiver_name(node) + if receiver_name is None: + return False + receiver_leaf = receiver_name.rsplit(".", 1)[-1] + return receiver_leaf in LANGCHAIN_INVOKE_RECEIVER_NAMES or receiver_leaf.endswith(LANGCHAIN_INVOKE_RECEIVER_SUFFIXES) + + def _emit_rule(self, node: ast.AST, symbol: str, rule: _CallRule) -> None: + self._emit( + node, + severity=rule.severity, + category=rule.category, + symbol=symbol, + message=rule.message, + ) + + def _emit(self, node: ast.AST, *, severity: str, category: str, symbol: str, message: str) -> None: + line = getattr(node, "lineno", 0) + column = getattr(node, "col_offset", 0) + code = "" + if line > 0 and line <= len(self.source_lines): + code = self.source_lines[line - 1].strip() + self.findings.append( + BoundaryFinding( + severity=severity, + category=category, + path=self.relative_path, + line=line, + column=column, + function=self.current_function, + async_context=self.in_async_context, + symbol=symbol, + message=message, + code=code, + ) + ) + + +def relative_to_repo(path: Path, repo_root: Path = REPO_ROOT) -> str: + try: + return path.resolve().relative_to(repo_root.resolve()).as_posix() + except ValueError: + return path.as_posix() + + +def scan_file(path: Path, *, repo_root: Path = REPO_ROOT) -> list[BoundaryFinding]: + source = path.read_text(encoding="utf-8") + source_lines = source.splitlines() + relative_path = relative_to_repo(path, repo_root) + try: + tree = ast.parse(source, filename=str(path)) + except SyntaxError as exc: + line = exc.lineno or 0 + code = source_lines[line - 1].strip() if line > 0 and line <= len(source_lines) else "" + return [ + BoundaryFinding( + severity="WARN", + category="PARSE_ERROR", + path=relative_path, + line=line, + column=max((exc.offset or 1) - 1, 0), + function="", + async_context=False, + symbol="SyntaxError", + message=str(exc), + code=code, + ) + ] + + visitor = BoundaryVisitor(path, relative_path, source_lines) + visitor.visit(tree) + return visitor.findings + + +def is_ignored_path(path: Path) -> bool: + return any(part in IGNORED_DIR_NAMES for part in path.parts) + + +def iter_python_files(paths: Iterable[Path]) -> Iterable[Path]: + for path in paths: + if not path.exists() or is_ignored_path(path): + continue + if path.is_file(): + if path.suffix == ".py" and not is_ignored_path(path): + yield path + continue + for dirpath, dirnames, filenames in os.walk(path): + dirnames[:] = [dirname for dirname in dirnames if dirname not in IGNORED_DIR_NAMES] + for filename in filenames: + if filename.endswith(".py"): + yield Path(dirpath) / filename + + +def scan_paths(paths: Iterable[Path], *, repo_root: Path = REPO_ROOT) -> list[BoundaryFinding]: + findings: list[BoundaryFinding] = [] + for path in sorted(iter_python_files(paths)): + findings.extend(scan_file(path, repo_root=repo_root)) + return sorted(findings, key=lambda finding: (finding.path, finding.line, finding.column, finding.category)) + + +def filter_findings(findings: Iterable[BoundaryFinding], min_severity: str) -> list[BoundaryFinding]: + threshold = SEVERITY_ORDER[min_severity] + return [finding for finding in findings if SEVERITY_ORDER[finding.severity] >= threshold] + + +def format_text(findings: Sequence[BoundaryFinding]) -> str: + if not findings: + return "No async/thread boundary findings." + + lines: list[str] = [] + for finding in findings: + lines.append(f"{finding.severity} {finding.category} {finding.path}:{finding.line}:{finding.column + 1} in {finding.function} async={str(finding.async_context).lower()}") + lines.append(f" symbol: {finding.symbol}") + lines.append(f" note: {finding.message}") + if finding.code: + lines.append(f" code: {finding.code}") + return "\n".join(lines) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description=("Detect async/thread boundary points for developer review. Findings are an inventory, not automatic bug decisions.")) + parser.add_argument( + "paths", + nargs="*", + type=Path, + help="Files or directories to scan. Defaults to backend app and harness sources.", + ) + parser.add_argument( + "--format", + choices=("text", "json"), + default="text", + help="Output format.", + ) + parser.add_argument( + "--min-severity", + choices=tuple(SEVERITY_ORDER), + default="INFO", + help="Only show findings at or above this severity.", + ) + return parser + + +def main(argv: Sequence[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + paths = args.paths or list(DEFAULT_SCAN_PATHS) + findings = filter_findings(scan_paths(paths), args.min_severity) + + if args.format == "json": + print(json.dumps([finding.to_dict() for finding in findings], indent=2, sort_keys=True)) + else: + print(format_text(findings)) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/backend/tests/test_detect_thread_boundaries.py b/backend/tests/test_detect_thread_boundaries.py new file mode 100644 index 000000000..102613e39 --- /dev/null +++ b/backend/tests/test_detect_thread_boundaries.py @@ -0,0 +1,182 @@ +from __future__ import annotations + +import json +import textwrap +from pathlib import Path + +from support.detectors import thread_boundaries as detector + + +def _write_python(path: Path, source: str) -> Path: + path.write_text(textwrap.dedent(source).strip() + "\n", encoding="utf-8") + return path + + +def test_scan_file_detects_async_thread_and_tool_boundaries(tmp_path): + source_file = _write_python( + tmp_path / "sample.py", + """ + import asyncio + import threading + import time + from concurrent.futures import ThreadPoolExecutor + from langchain.tools import tool + from langchain_core.tools import StructuredTool + + @tool + async def async_tool(value: int) -> str: + return str(value) + + async def handler(model): + await asyncio.to_thread(str, "x") + model.invoke("blocking") + time.sleep(1) + + def sync_entry(): + asyncio.run(handler(None)) + pool = ThreadPoolExecutor(max_workers=1) + pool.submit(str, "x") + threading.Thread(target=sync_entry).start() + return StructuredTool.from_function( + name="factory_tool", + description="factory", + coroutine=async_tool, + ) + """, + ) + + findings = detector.scan_file(source_file, repo_root=tmp_path) + categories = {finding.category for finding in findings} + async_tool_finding = next(finding for finding in findings if finding.category == "ASYNC_TOOL_DEFINITION") + + assert "ASYNC_TOOL_DEFINITION" in categories + assert async_tool_finding.function == "async_tool" + assert async_tool_finding.async_context is True + assert "ASYNC_THREAD_OFFLOAD" in categories + assert "SYNC_INVOKE_IN_ASYNC" in categories + assert "BLOCKING_CALL_IN_ASYNC" in categories + assert "SYNC_ASYNC_BRIDGE" in categories + assert "THREAD_POOL" in categories + assert "EXECUTOR_SUBMIT" in categories + assert "RAW_THREAD" in categories + assert "ASYNC_ONLY_TOOL_FACTORY" in categories + + +def test_scan_file_ignores_unqualified_threads_and_generic_method_names(tmp_path): + source_file = _write_python( + tmp_path / "sample.py", + """ + class Thread: + pass + + class Timer: + pass + + async def handler(form, runner): + form.submit() + runner.invoke("not a langchain model") + + def sync_entry(runner): + Thread() + Timer() + runner.ainvoke("not a langchain model") + """, + ) + + findings = detector.scan_file(source_file, repo_root=tmp_path) + categories = {finding.category for finding in findings} + + assert "RAW_THREAD" not in categories + assert "RAW_TIMER_THREAD" not in categories + assert "EXECUTOR_SUBMIT" not in categories + assert "SYNC_INVOKE_IN_ASYNC" not in categories + assert "ASYNC_INVOKE_IN_SYNC" not in categories + + +def test_scan_file_uses_import_evidence_for_thread_and_executor_aliases(tmp_path): + source_file = _write_python( + tmp_path / "sample.py", + """ + from concurrent.futures import ThreadPoolExecutor as Pool + from threading import Thread as WorkerThread, Timer + + def sync_entry(): + pool = Pool(max_workers=1) + pool.submit(str, "x") + WorkerThread(target=sync_entry).start() + Timer(1, sync_entry).start() + """, + ) + + findings = detector.scan_file(source_file, repo_root=tmp_path) + categories = {finding.category for finding in findings} + + assert "THREAD_POOL" in categories + assert "EXECUTOR_SUBMIT" in categories + assert "RAW_THREAD" in categories + assert "RAW_TIMER_THREAD" in categories + + +def test_scan_paths_ignores_virtualenv_like_directories(tmp_path): + scanned_file = _write_python( + tmp_path / "app.py", + """ + import asyncio + + def main(): + return asyncio.run(asyncio.sleep(0)) + """, + ) + ignored_dir = tmp_path / ".venv" + ignored_dir.mkdir() + _write_python( + ignored_dir / "ignored.py", + """ + import threading + + thread = threading.Thread(target=lambda: None) + """, + ) + + findings = detector.scan_paths([tmp_path], repo_root=tmp_path) + + assert any(finding.path == scanned_file.name for finding in findings) + assert all(".venv" not in finding.path for finding in findings) + + +def test_json_output_and_min_severity_filter(tmp_path, capsys): + source_file = _write_python( + tmp_path / "sample.py", + """ + import asyncio + + async def handler(model): + await asyncio.to_thread(str, "x") + model.invoke("blocking") + """, + ) + + exit_code = detector.main(["--format", "json", "--min-severity", "WARN", str(source_file)]) + + assert exit_code == 0 + payload = json.loads(capsys.readouterr().out) + categories = {finding["category"] for finding in payload} + assert categories == {"SYNC_INVOKE_IN_ASYNC"} + + +def test_parse_errors_are_reported_as_findings(tmp_path): + source_file = _write_python( + tmp_path / "broken.py", + """ + def broken(: + pass + """, + ) + + findings = detector.scan_file(source_file, repo_root=tmp_path) + + assert len(findings) == 1 + assert findings[0].category == "PARSE_ERROR" + assert findings[0].severity == "WARN" + assert findings[0].column == 11 + assert f"{source_file.name}:1:12" in detector.format_text(findings) diff --git a/scripts/detect_thread_boundaries.py b/scripts/detect_thread_boundaries.py new file mode 100644 index 000000000..c4a59132e --- /dev/null +++ b/scripts/detect_thread_boundaries.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 +"""CLI wrapper for the async/thread boundary detector.""" + +from __future__ import annotations + +import sys +from collections.abc import Sequence +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +TEST_SUPPORT_PATH = REPO_ROOT / "backend" / "tests" +if str(TEST_SUPPORT_PATH) not in sys.path: + sys.path.insert(0, str(TEST_SUPPORT_PATH)) + + +def main(argv: Sequence[str] | None = None) -> int: + from support.detectors.thread_boundaries import main as detector_main + + return detector_main(argv) + + +if __name__ == "__main__": + sys.exit(main())