Files
deer-flow/backend/packages/harness/deerflow/runtime/checkpointer/async_provider.py
T
AochenShen99 e344be8d94 feat(tests): add Blockbuster runtime gate for event-loop blocking IO (#3229)
* feat(tests): add Blockbuster runtime gate for event-loop blocking IO

Adds a strict runtime gate that fails CI when sync blocking IO calls run
on the asyncio event loop thread through DeerFlow business code.

Components:
- backend/tests/support/detectors/blocking_io_runtime.py — Blockbuster
  context scoped to `app.*` and `deerflow.*` so test infrastructure,
  pytest internals, and third-party libraries stay silent.
- backend/tests/blocking_io/conftest.py — pytest_runtest_protocol
  hookwrapper that wraps every item (setup + call + teardown) with the
  strict context. Respects `@pytest.mark.allow_blocking_io` opt-out.
- backend/tests/blocking_io/test_skills_load.py — regression anchor for
  the #1917 fix (asyncio.to_thread offload around
  LocalSkillStorage.load_skills).
- backend/tests/blocking_io/test_sqlite_lifespan.py — regression anchor
  for the #1912 fix (asyncio.to_thread offload around
  ensure_sqlite_parent_dir).
- backend/tests/blocking_io/test_gate_smoke.py — meta-test asserting the
  gate actually catches unoffloaded blocking IO and that the
  `@pytest.mark.allow_blocking_io` opt-out works.
- backend/Makefile — `make test-blocking-io` target.
- .github/workflows/backend-blocking-io-tests.yml — hard-fail PR gate on
  ubuntu-latest. Windows matrix deferred to follow-up.

Dependencies:
- blockbuster>=1.5.26,<1.6 added to dev group.

Coverage boundary (called out in PR body): the gate only catches blocking
IO on code paths the test suite actually exercises. Static AST inventory
(separate, informational) is the complementary coverage tool. Three blind
spot categories — untested paths, mocked-away paths, env-mismatched paths
— are documented in the PR description.

Findings surfaced while authoring this PR:
- resolve_sqlite_conn_str in runtime/store/_sqlite_utils.py:19 does sync
  Path.resolve() -> os.path.abspath on the lifespan loop thread, ahead of
  the #1912 fix. Not addressed here; tracked as follow-up.

Tests: 4 passed locally (`make test-blocking-io`).
Lint/format: clean (`ruff check` and `ruff format --check`).

* fix(tests): scope Blockbuster gate to blocking-io suite

* fix(tests): harden Blockbuster runtime gate

* test(blocking-io): add project rule extension point

* test(blocking-io): address review cleanup
2026-05-26 23:03:49 +08:00

172 lines
5.6 KiB
Python

"""Async checkpointer factory.
Provides an **async context manager** for long-running async servers that need
proper resource cleanup.
Supported backends: memory, sqlite, postgres.
Usage (e.g. FastAPI lifespan)::
from deerflow.runtime.checkpointer.async_provider import make_checkpointer
async with make_checkpointer() as checkpointer:
app.state.checkpointer = checkpointer # InMemorySaver if not configured
For sync usage see :mod:`deerflow.runtime.checkpointer.provider`.
"""
from __future__ import annotations
import asyncio
import contextlib
import logging
from collections.abc import AsyncIterator
from langgraph.types import Checkpointer
from deerflow.config.app_config import AppConfig, get_app_config
from deerflow.runtime.checkpointer.provider import (
POSTGRES_CONN_REQUIRED,
POSTGRES_INSTALL,
SQLITE_INSTALL,
)
from deerflow.runtime.store._sqlite_utils import ensure_sqlite_parent_dir, resolve_sqlite_conn_str
logger = logging.getLogger(__name__)
def _prepare_sqlite_checkpointer_path(raw: str) -> str:
    """Turn a raw SQLite setting into a ready-to-use connection string.

    The value is first passed through ``resolve_sqlite_conn_str`` and the
    result is handed to ``ensure_sqlite_parent_dir`` before being returned.
    This function is synchronous; the async factory in this module invokes
    it through ``asyncio.to_thread``.

    Args:
        raw: Connection string / path as configured by the user.

    Returns:
        The value produced by ``resolve_sqlite_conn_str``.
    """
    resolved = resolve_sqlite_conn_str(raw)
    ensure_sqlite_parent_dir(resolved)
    return resolved
def _prepare_database_sqlite_checkpointer_path(db_config) -> str:
    """Return the checkpointer SQLite path taken from the database config.

    Reads ``db_config.checkpointer_sqlite_path`` and passes it to
    ``ensure_sqlite_parent_dir`` before returning it unchanged. This function
    is synchronous; the async factory in this module invokes it through
    ``asyncio.to_thread``.

    Args:
        db_config: Object exposing a ``checkpointer_sqlite_path`` attribute.

    Returns:
        The value of ``db_config.checkpointer_sqlite_path``.
    """
    sqlite_path = db_config.checkpointer_sqlite_path
    ensure_sqlite_parent_dir(sqlite_path)
    return sqlite_path
# ---------------------------------------------------------------------------
# Async factory
# ---------------------------------------------------------------------------
@contextlib.asynccontextmanager
async def _async_checkpointer(config) -> AsyncIterator[Checkpointer]:
    """Build a checkpointer from the legacy checkpointer config and clean it up on exit.

    Args:
        config: Object exposing ``type`` (``"memory"``, ``"sqlite"`` or
            ``"postgres"``) and ``connection_string``.

    Yields:
        An ``InMemorySaver`` for ``"memory"``; otherwise an ``AsyncSqliteSaver``
        or ``AsyncPostgresSaver`` on which ``setup()`` has already been awaited.

    Raises:
        ImportError: The backend's saver package cannot be imported; re-raised
            with the matching install-hint message.
        ValueError: ``"postgres"`` was selected without a connection string, or
            ``config.type`` is not one of the supported values.
    """
    kind = config.type

    if kind == "memory":
        from langgraph.checkpoint.memory import InMemorySaver

        yield InMemorySaver()

    elif kind == "sqlite":
        try:
            from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
        except ImportError as exc:
            raise ImportError(SQLITE_INSTALL) from exc

        # Path preparation is synchronous, so keep it off the event loop thread.
        raw_target = config.connection_string or "store.db"
        sqlite_target = await asyncio.to_thread(_prepare_sqlite_checkpointer_path, raw_target)
        async with AsyncSqliteSaver.from_conn_string(sqlite_target) as sqlite_saver:
            await sqlite_saver.setup()
            yield sqlite_saver

    elif kind == "postgres":
        try:
            from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
        except ImportError as exc:
            raise ImportError(POSTGRES_INSTALL) from exc

        dsn = config.connection_string
        if not dsn:
            raise ValueError(POSTGRES_CONN_REQUIRED)
        async with AsyncPostgresSaver.from_conn_string(dsn) as pg_saver:
            await pg_saver.setup()
            yield pg_saver

    else:
        raise ValueError(f"Unknown checkpointer type: {kind!r}")
# ---------------------------------------------------------------------------
# Unified-database factory and public async context manager
# ---------------------------------------------------------------------------
@contextlib.asynccontextmanager
async def _async_checkpointer_from_database(db_config) -> AsyncIterator[Checkpointer]:
    """Build a checkpointer from the unified database config and clean it up on exit.

    Args:
        db_config: Object exposing ``backend`` (``"memory"``, ``"sqlite"`` or
            ``"postgres"``), ``checkpointer_sqlite_path`` and ``postgres_url``.

    Yields:
        An ``InMemorySaver`` for ``"memory"``; otherwise an ``AsyncSqliteSaver``
        or ``AsyncPostgresSaver`` on which ``setup()`` has already been awaited.

    Raises:
        ImportError: The backend's saver package cannot be imported; re-raised
            with the matching install-hint message.
        ValueError: ``"postgres"`` was selected without ``postgres_url``, or
            ``db_config.backend`` is not one of the supported values.
    """
    backend = db_config.backend

    if backend == "memory":
        from langgraph.checkpoint.memory import InMemorySaver

        yield InMemorySaver()

    elif backend == "sqlite":
        try:
            from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
        except ImportError as exc:
            raise ImportError(SQLITE_INSTALL) from exc

        # Path preparation is synchronous, so keep it off the event loop thread.
        sqlite_target = await asyncio.to_thread(_prepare_database_sqlite_checkpointer_path, db_config)
        async with AsyncSqliteSaver.from_conn_string(sqlite_target) as sqlite_saver:
            await sqlite_saver.setup()
            yield sqlite_saver

    elif backend == "postgres":
        try:
            from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
        except ImportError as exc:
            raise ImportError(POSTGRES_INSTALL) from exc

        pg_url = db_config.postgres_url
        if not pg_url:
            raise ValueError("database.postgres_url is required for the postgres backend")
        async with AsyncPostgresSaver.from_conn_string(pg_url) as pg_saver:
            await pg_saver.setup()
            yield pg_saver

    else:
        raise ValueError(f"Unknown database backend: {backend!r}")
@contextlib.asynccontextmanager
async def make_checkpointer(app_config: AppConfig | None = None) -> AsyncIterator[Checkpointer]:
    """Yield a checkpointer that lives for the duration of the ``async with`` block.

    Nothing is cached globally: the underlying resources are opened when the
    context is entered and released when it exits::

        async with make_checkpointer(app_config) as checkpointer:
            app.state.checkpointer = checkpointer

    Selection order:

    1. Legacy ``checkpointer:`` config section (backward compatible).
    2. Unified ``database:`` config section, unless its backend is ``"memory"``.
    3. A fresh ``InMemorySaver`` when no checkpointer is configured in
       *config.yaml*.

    Args:
        app_config: Application config to read; when ``None`` the result of
            ``get_app_config()`` is used.
    """
    settings = get_app_config() if app_config is None else app_config

    # The standalone legacy section wins whenever it is present.
    legacy_section = settings.checkpointer
    if legacy_section is not None:
        async with _async_checkpointer(legacy_section) as checkpointer:
            yield checkpointer
        return

    # No database section, or an explicit memory backend: use the in-memory default.
    database_section = getattr(settings, "database", None)
    if database_section is None or database_section.backend == "memory":
        from langgraph.checkpoint.memory import InMemorySaver

        yield InMemorySaver()
        return

    async with _async_checkpointer_from_database(database_section) as checkpointer:
        yield checkpointer