diff --git a/.github/workflows/backend-blocking-io-tests.yml b/.github/workflows/backend-blocking-io-tests.yml new file mode 100644 index 000000000..8da82d906 --- /dev/null +++ b/.github/workflows/backend-blocking-io-tests.yml @@ -0,0 +1,46 @@ +name: Backend Blocking IO + +on: + push: + branches: ["main"] + paths: + - "backend/**" + - ".github/workflows/backend-blocking-io-tests.yml" + pull_request: + types: [opened, synchronize, reopened, ready_for_review] + paths: + - "backend/**" + - ".github/workflows/backend-blocking-io-tests.yml" + +concurrency: + group: blocking-io-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +permissions: + contents: read + +jobs: + backend-blocking-io: + if: github.event_name != 'pull_request' || github.event.pull_request.draft == false + runs-on: ubuntu-latest + timeout-minutes: 10 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install uv + uses: astral-sh/setup-uv@v3 + + - name: Install backend dependencies + working-directory: backend + run: uv sync --group dev + + - name: Run blocking IO regression tests + working-directory: backend + run: make test-blocking-io diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index f04774050..1d1cbadf8 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -88,18 +88,38 @@ make stop # Stop all services **Backend directory** (for backend development only): ```bash -make install # Install backend dependencies -make dev # Run Gateway API with reload (port 8001) -make gateway # Run Gateway API only (port 8001) -make test # Run all backend tests -make lint # Lint with ruff -make format # Format code with ruff +make install # Install backend dependencies +make dev # Run Gateway API with reload (port 8001) +make gateway # Run Gateway API only (port 8001) +make test # Run all backend tests +make test-blocking-io # Run strict Blockbuster runtime gate on tests/blocking_io/ +make lint # Lint with ruff +make format # Format code with ruff ``` Regression tests related to Docker/provisioner behavior: - `tests/test_docker_sandbox_mode_detection.py` (mode detection from `config.yaml`) - `tests/test_provisioner_kubeconfig.py` (kubeconfig file/directory handling) +Blocking-IO runtime gate (`tests/blocking_io/`): +- Wraps every item under `tests/blocking_io/` with a strict Blockbuster + context scoped to `app.*` and `deerflow.*` (see + `tests/support/detectors/blocking_io_runtime.py`). Any sync blocking IO + call whose stack passes through DeerFlow business code while running on + the asyncio event loop raises `BlockingError` and fails the test. +- Two regression anchors live there: `test_skills_load.py` (locks the + `asyncio.to_thread` offload around `LocalSkillStorage.load_skills`, fix + for #1917) and `test_sqlite_lifespan.py` (locks the offload around + SQLite path resolution plus `ensure_sqlite_parent_dir`, fix for #1912). +- `test_gate_smoke.py` is a meta-test asserting the gate actually catches + unoffloaded blocking IO and that the `@pytest.mark.allow_blocking_io` + opt-out works. +- Coverage boundary: the gate only sees code that test execution actually + touches. Static AST coverage is a separate concern (out of scope for + this PR). +- CI: runs on every PR via `.github/workflows/backend-blocking-io-tests.yml`, + hard-fail. + Boundary check (harness → app import firewall): - `tests/test_harness_boundary.py` — ensures `packages/harness/deerflow/` never imports from `app.*` diff --git a/backend/Makefile b/backend/Makefile index a1206547b..fc8cab1e0 100644 --- a/backend/Makefile +++ b/backend/Makefile @@ -10,6 +10,9 @@ gateway: test: PYTHONPATH=. PYTHONIOENCODING=utf-8 PYTHONUTF8=1 uv run pytest tests/ -v +test-blocking-io: + PYTHONPATH=. PYTHONIOENCODING=utf-8 PYTHONUTF8=1 uv run pytest tests/blocking_io -q --tb=short + lint: uvx ruff check . uvx ruff format --check . diff --git a/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py b/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py index 9a04cb1af..ac2d1da51 100644 --- a/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py +++ b/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py @@ -34,6 +34,19 @@ from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir, resol logger = logging.getLogger(__name__) + +def _prepare_sqlite_checkpointer_path(raw: str) -> str: + conn_str = resolve_sqlite_conn_str(raw) + ensure_sqlite_parent_dir(conn_str) + return conn_str + + +def _prepare_database_sqlite_checkpointer_path(db_config) -> str: + conn_str = db_config.checkpointer_sqlite_path + ensure_sqlite_parent_dir(conn_str) + return conn_str + + # --------------------------------------------------------------------------- # Async factory # --------------------------------------------------------------------------- @@ -54,8 +67,7 @@ async def _async_checkpointer(config) -> AsyncIterator[Checkpointer]: except ImportError as exc: raise ImportError(SQLITE_INSTALL) from exc - conn_str = resolve_sqlite_conn_str(config.connection_string or "store.db") - await asyncio.to_thread(ensure_sqlite_parent_dir, conn_str) + conn_str = await asyncio.to_thread(_prepare_sqlite_checkpointer_path, config.connection_string or "store.db") async with AsyncSqliteSaver.from_conn_string(conn_str) as saver: await saver.setup() yield saver @@ -98,8 +110,7 @@ async def _async_checkpointer_from_database(db_config) -> AsyncIterator[Checkpoi except ImportError as exc: raise ImportError(SQLITE_INSTALL) from exc - conn_str = db_config.checkpointer_sqlite_path - ensure_sqlite_parent_dir(conn_str) + conn_str = await asyncio.to_thread(_prepare_database_sqlite_checkpointer_path, db_config) async with AsyncSqliteSaver.from_conn_string(conn_str) as saver: await saver.setup() yield saver diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 082c3d07d..d9dfdddaf 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -29,6 +29,7 @@ discord = ["discord.py>=2.7.0"] [dependency-groups] dev = [ + "blockbuster>=1.5.26,<1.6", "prompt-toolkit>=3.0.0", "pytest>=9.0.3", "pytest-asyncio>=1.3.0", @@ -38,6 +39,7 @@ dev = [ [tool.pytest.ini_options] markers = [ "no_auto_user: disable the conftest autouse contextvar fixture for this test", + "allow_blocking_io: opt out of the strict Blockbuster gate in tests/blocking_io/", ] [tool.uv] diff --git a/backend/tests/blocking_io/__init__.py b/backend/tests/blocking_io/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/tests/blocking_io/conftest.py b/backend/tests/blocking_io/conftest.py new file mode 100644 index 000000000..32ee4b86b --- /dev/null +++ b/backend/tests/blocking_io/conftest.py @@ -0,0 +1,37 @@ +"""Pytest conftest for the strict Blockbuster runtime gate. + +Activates `detect_blocking_io_strict()` around the entire pytest item +protocol (setup + call + teardown) so blocking IO in async fixtures and +lifespan code is also caught, not just blocking IO inside the test body. + +Scope: only applies to items whose path is under `backend/tests/blocking_io/`. +Pytest registers conftest hookwrappers globally once the file is loaded, +so an explicit path filter is required to keep the strict gate from +firing on unrelated tests when the full suite is collected. + +Opt-out: mark a test with `@pytest.mark.allow_blocking_io` to skip the gate. +""" + +from __future__ import annotations + +from collections.abc import Generator +from pathlib import Path + +import pytest +from support.detectors.blocking_io_runtime import detect_blocking_io_strict + +_BLOCKING_IO_TEST_ROOT = Path(__file__).resolve().parent + + +@pytest.hookimpl(hookwrapper=True) +def pytest_runtest_protocol(item: pytest.Item, nextitem: pytest.Item | None) -> Generator[None, None, None]: + if not _is_blocking_io_item(item) or item.get_closest_marker("allow_blocking_io") is not None: + yield + return + + with detect_blocking_io_strict(): + yield + + +def _is_blocking_io_item(item: pytest.Item) -> bool: + return Path(item.path).resolve().is_relative_to(_BLOCKING_IO_TEST_ROOT) diff --git a/backend/tests/blocking_io/test_gate_smoke.py b/backend/tests/blocking_io/test_gate_smoke.py new file mode 100644 index 000000000..370b2cc80 --- /dev/null +++ b/backend/tests/blocking_io/test_gate_smoke.py @@ -0,0 +1,55 @@ +"""Smoke test: the strict Blockbuster gate is wired up and actively catching. + +Independent of any specific production code path, asserts that calling a +known blocking IO function directly from an `async def` (without an +`asyncio.to_thread` wrapper) raises `BlockingError`. If this test ever +stops raising, the gate machinery itself is broken — typical causes are +`scanned_modules` misconfiguration, accidental removal of the Blockbuster +dev dependency, or the conftest hookwrapper no longer firing. + +This is the meta-test that protects every other test in this directory +from silent regressions (a green gate that no longer catches anything is +worse than no gate at all). +""" + +from __future__ import annotations + +import os +from pathlib import Path + +import pytest +from blockbuster import BlockingError +from support.detectors.blocking_io_runtime import detect_blocking_io_strict + +pytestmark = pytest.mark.asyncio + + +async def test_gate_catches_unoffloaded_blocking_io_in_deerflow_module(tmp_path: Path) -> None: + from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir + + db_file = tmp_path / "subdir" / "store.db" + + with pytest.raises(BlockingError): + ensure_sqlite_parent_dir(str(db_file)) + + +async def test_gate_restores_blockbuster_patches_after_exceptions() -> None: + original_stat = os.stat + + with pytest.raises(RuntimeError, match="boom"): + with detect_blocking_io_strict(): + raise RuntimeError("boom") + + assert os.stat is original_stat + + +@pytest.mark.allow_blocking_io +async def test_allow_blocking_io_marker_opts_out_of_gate(tmp_path: Path) -> None: + """Verify the @pytest.mark.allow_blocking_io opt-out actually disables the gate.""" + from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir + + db_file = tmp_path / "subdir" / "store.db" + + ensure_sqlite_parent_dir(str(db_file)) + + assert db_file.parent.exists() diff --git a/backend/tests/blocking_io/test_skills_load.py b/backend/tests/blocking_io/test_skills_load.py new file mode 100644 index 000000000..96a9fb061 --- /dev/null +++ b/backend/tests/blocking_io/test_skills_load.py @@ -0,0 +1,102 @@ +"""Regression test: skill loading must remain releasable to a worker thread. + +Anchors the production offload from `subagents/executor.py:_load_skills`, +where both `get_or_new_skill_storage` and the sync `storage.load_skills(...)` +method are dispatched via `asyncio.to_thread`. That fix addressed #1917, +where `os.walk` inside `load_skills` blocked the LangGraph async event loop. + +This test invokes the production `_load_skills()` call path under the strict +Blockbuster context against a real `LocalSkillStorage` instance pointed at +a tmp directory. If the production `asyncio.to_thread` offload is removed, +Blockbuster raises `BlockingError` and this test fails. +""" + +from __future__ import annotations + +import importlib +import sys +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +pytestmark = pytest.mark.asyncio + +_MISSING = object() +_EXECUTOR_IMPORT_MOCKS = ( + "deerflow.agents", + "deerflow.agents.thread_state", + "deerflow.models", +) + + +def _seed_skill(skills_root: Path) -> None: + skill = skills_root / "public" / "demo" + skill.mkdir(parents=True, exist_ok=True) + (skill / "SKILL.md").write_text( + "---\nname: demo\ndescription: regression-test skill\n---\n# demo\n", + encoding="utf-8", + ) + + +@contextmanager +def _real_subagent_executor() -> Iterator[type]: + """Import the real executor despite the suite-level circular-import mock.""" + original_modules = {name: sys.modules.get(name, _MISSING) for name in _EXECUTOR_IMPORT_MOCKS} + original_executor = sys.modules.get("deerflow.subagents.executor", _MISSING) + parent_module = sys.modules.get("deerflow.subagents") + original_parent_executor = getattr(parent_module, "executor", _MISSING) if parent_module is not None else _MISSING + + sys.modules.pop("deerflow.subagents.executor", None) + for name in _EXECUTOR_IMPORT_MOCKS: + sys.modules[name] = MagicMock() + + try: + executor_module = importlib.import_module("deerflow.subagents.executor") + yield executor_module.SubagentExecutor + finally: + if original_executor is _MISSING: + sys.modules.pop("deerflow.subagents.executor", None) + else: + sys.modules["deerflow.subagents.executor"] = original_executor + + if parent_module is not None: + if original_parent_executor is _MISSING: + try: + delattr(parent_module, "executor") + except AttributeError: + pass + else: + parent_module.executor = original_parent_executor + + for name, module in original_modules.items(): + if module is _MISSING: + sys.modules.pop(name, None) + else: + sys.modules[name] = module + + +async def test_load_skills_via_to_thread_does_not_block_event_loop(tmp_path: Path) -> None: + from deerflow.config.skills_config import SkillsConfig + from deerflow.subagents.config import SubagentConfig + + _seed_skill(tmp_path) + + with _real_subagent_executor() as SubagentExecutor: + executor = SubagentExecutor( + config=SubagentConfig( + name="demo", + description="Loads skills through the production async path.", + ), + tools=[], + app_config=SimpleNamespace(skills=SkillsConfig(path=str(tmp_path))), + parent_model="test-model", + ) + + skills = await executor._load_skills() + + assert isinstance(skills, list) + assert any(s.name == "demo" for s in skills) diff --git a/backend/tests/blocking_io/test_sqlite_lifespan.py b/backend/tests/blocking_io/test_sqlite_lifespan.py new file mode 100644 index 000000000..d05f5288f --- /dev/null +++ b/backend/tests/blocking_io/test_sqlite_lifespan.py @@ -0,0 +1,52 @@ +"""Regression test: sqlite path setup must run off the event loop. + +Anchors the production offload from +`runtime/checkpointer/async_provider.py:_async_checkpointer`, where SQLite +path resolution and `ensure_sqlite_parent_dir` are dispatched via +`await asyncio.to_thread(...)`. +That fix addressed #1912, where the sync `Path.mkdir` / `os.mkdir` inside +`ensure_sqlite_parent_dir` ran on the FastAPI lifespan event loop thread +and blocked startup. + +This test invokes the production `_async_checkpointer()` path under the +strict Blockbuster context. The target path's parent does not yet exist, so +the underlying path resolution and `os.mkdir` both execute. If either step is +regressed to run directly on the event loop, Blockbuster raises +`BlockingError` and this test fails. +""" + +from __future__ import annotations + +import sys +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +pytestmark = pytest.mark.asyncio + + +async def test_async_checkpointer_sqlite_setup_does_not_block_event_loop(tmp_path: Path) -> None: + from deerflow.config.checkpointer_config import CheckpointerConfig + from deerflow.runtime.checkpointer.async_provider import _async_checkpointer + + db_file = tmp_path / "subdir" / "store.db" + + mock_saver = AsyncMock() + mock_context_manager = AsyncMock() + mock_context_manager.__aenter__.return_value = mock_saver + mock_context_manager.__aexit__.return_value = False + + mock_saver_cls = MagicMock() + mock_saver_cls.from_conn_string.return_value = mock_context_manager + + mock_module = MagicMock() + mock_module.AsyncSqliteSaver = mock_saver_cls + + with patch.dict(sys.modules, {"langgraph.checkpoint.sqlite.aio": mock_module}): + async with _async_checkpointer(CheckpointerConfig(type="sqlite", connection_string=str(db_file))) as saver: + assert saver is mock_saver + + assert db_file.parent.exists() + mock_saver_cls.from_conn_string.assert_called_once_with(str(db_file.resolve())) + mock_saver.setup.assert_awaited_once() diff --git a/backend/tests/support/detectors/blocking_io_runtime.py b/backend/tests/support/detectors/blocking_io_runtime.py new file mode 100644 index 000000000..0f13d39e4 --- /dev/null +++ b/backend/tests/support/detectors/blocking_io_runtime.py @@ -0,0 +1,44 @@ +"""Strict Blockbuster runtime context scoped to DeerFlow business code. + +Creates a `BlockBuster` instance with `scanned_modules=("app", "deerflow")` +so that test infrastructure (pytest, langchain, importlib, third-party libs) +is out of scope and does not produce false positives. Only loop-blocking +sync IO whose caller stack passes through `app.*` or `deerflow.*` raises +`BlockingError`. + +Used by `backend/tests/blocking_io/conftest.py` to gate the regression suite. +""" + +from __future__ import annotations + +from collections.abc import Iterator +from contextlib import contextmanager + +from blockbuster import BlockBuster, BlockBusterFunction, BlockingError + +_SCANNED_MODULES: tuple[str, ...] = ("app", "deerflow") + +# Add DeerFlow-local rules here only when Blockbuster's default rule set misses +# a generic blocking primitive used by production code. If a path is invisible +# because no test exercises it, add a production-path runtime anchor instead. +_PROJECT_BLOCKING_RULES: tuple[tuple[str, BlockBusterFunction], ...] = () + + +def _install_project_rules(bb: BlockBuster) -> None: + for name, rule in _PROJECT_BLOCKING_RULES: + bb.functions[name] = rule + + +@contextmanager +def detect_blocking_io_strict() -> Iterator[BlockBuster]: + """Activate Blockbuster scoped to app.* and deerflow.* callers only.""" + bb = BlockBuster(scanned_modules=list(_SCANNED_MODULES)) + _install_project_rules(bb) + try: + bb.activate() + yield bb + finally: + bb.deactivate() + + +__all__ = ["BlockingError", "detect_blocking_io_strict"] diff --git a/backend/tests/test_checkpointer.py b/backend/tests/test_checkpointer.py index e7714e3ce..166282928 100644 --- a/backend/tests/test_checkpointer.py +++ b/backend/tests/test_checkpointer.py @@ -291,7 +291,7 @@ class TestAsyncCheckpointer: @pytest.mark.anyio async def test_sqlite_creates_parent_dir_via_to_thread(self): """Async SQLite setup should move mkdir off the event loop.""" - from deerflow.runtime.checkpointer.async_provider import make_checkpointer + from deerflow.runtime.checkpointer.async_provider import _prepare_sqlite_checkpointer_path, make_checkpointer mock_config = MagicMock() mock_config.checkpointer = CheckpointerConfig(type="sqlite", connection_string="relative/test.db") @@ -310,22 +310,63 @@ class TestAsyncCheckpointer: with ( patch("deerflow.runtime.checkpointer.async_provider.get_app_config", return_value=mock_config), patch.dict(sys.modules, {"langgraph.checkpoint.sqlite.aio": mock_module}), - patch("deerflow.runtime.checkpointer.async_provider.asyncio.to_thread", new_callable=AsyncMock) as mock_to_thread, patch( - "deerflow.runtime.checkpointer.async_provider.resolve_sqlite_conn_str", + "deerflow.runtime.checkpointer.async_provider.asyncio.to_thread", + new_callable=AsyncMock, return_value="/tmp/resolved/test.db", - ), + ) as mock_to_thread, ): async with make_checkpointer() as saver: assert saver is mock_saver mock_to_thread.assert_awaited_once() called_fn, called_path = mock_to_thread.await_args.args - assert called_fn.__name__ == "ensure_sqlite_parent_dir" - assert called_path == "/tmp/resolved/test.db" + assert called_fn is _prepare_sqlite_checkpointer_path + assert called_path == "relative/test.db" mock_saver_cls.from_conn_string.assert_called_once_with("/tmp/resolved/test.db") mock_saver.setup.assert_awaited_once() + @pytest.mark.anyio + async def test_database_sqlite_creates_parent_dir_via_to_thread(self): + """Unified database SQLite setup should also move path IO off the event loop.""" + from deerflow.config.database_config import DatabaseConfig + from deerflow.runtime.checkpointer.async_provider import _prepare_database_sqlite_checkpointer_path, make_checkpointer + + db_config = DatabaseConfig(backend="sqlite", sqlite_dir="relative-data") + mock_config = MagicMock() + mock_config.checkpointer = None + mock_config.database = db_config + + mock_saver = AsyncMock() + mock_cm = AsyncMock() + mock_cm.__aenter__.return_value = mock_saver + mock_cm.__aexit__.return_value = False + + mock_saver_cls = MagicMock() + mock_saver_cls.from_conn_string.return_value = mock_cm + + mock_module = MagicMock() + mock_module.AsyncSqliteSaver = mock_saver_cls + + with ( + patch("deerflow.runtime.checkpointer.async_provider.get_app_config", return_value=mock_config), + patch.dict(sys.modules, {"langgraph.checkpoint.sqlite.aio": mock_module}), + patch( + "deerflow.runtime.checkpointer.async_provider.asyncio.to_thread", + new_callable=AsyncMock, + return_value="/tmp/data/deerflow.db", + ) as mock_to_thread, + ): + async with make_checkpointer() as saver: + assert saver is mock_saver + + mock_to_thread.assert_awaited_once() + called_fn, called_db_config = mock_to_thread.await_args.args + assert called_fn is _prepare_database_sqlite_checkpointer_path + assert called_db_config is db_config + mock_saver_cls.from_conn_string.assert_called_once_with("/tmp/data/deerflow.db") + mock_saver.setup.assert_awaited_once() + # --------------------------------------------------------------------------- # app_config.py integration diff --git a/backend/uv.lock b/backend/uv.lock index 5501fb81f..f4008b9a1 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -772,6 +772,7 @@ postgres = [ [package.dev-dependencies] dev = [ + { name = "blockbuster" }, { name = "prompt-toolkit" }, { name = "pytest" }, { name = "pytest-asyncio" }, @@ -803,6 +804,7 @@ provides-extras = ["postgres", "discord"] [package.metadata.requires-dev] dev = [ + { name = "blockbuster", specifier = ">=1.5.26,<1.6" }, { name = "prompt-toolkit", specifier = ">=3.0.0" }, { name = "pytest", specifier = ">=9.0.3" }, { name = "pytest-asyncio", specifier = ">=1.3.0" },