test(runtime): add Blockbuster runtime anchor for JsonlRunEventStore async IO (#3313)

* test(runtime): add Blockbuster runtime anchor for JsonlRunEventStore async IO

#3084 offloaded `JsonlRunEventStore`'s file IO via `asyncio.to_thread` and added
a mock-based offload assertion (`tests/test_jsonl_event_store_async_io.py`) that
covers `put()` only. That guard is not part of the Blockbuster runtime gate
(`tests/blocking_io/`) run by `backend-blocking-io-tests.yml`.

Add a runtime anchor that drives the full async surface (`put`, `put_batch`,
`list_messages`, `list_events`, `list_messages_by_run`, `count_messages`,
`delete_by_run`, `delete_by_thread`) under the strict Blockbuster gate, so any
blocking IO reintroduced on the event loop in any of these methods fails CI —
not only removal of a specific `to_thread` call. Verified each offloaded method
goes red when its offload is reverted. Test-only; no production change.

* test(runtime): exercise list_events event_types filter branch

Per review feedback: the anchor called list_events without event_types,
so the filter branch never ran after _read_run_events' filesystem IO.
Add a second list_events call with event_types=["message"] so the full
read path -- including the filter branch -- executes under the gate.
This commit is contained in:
AochenShen99
2026-05-29 23:02:41 +08:00
committed by GitHub
parent ca487578a4
commit 052b1e2102
3 changed files with 73 additions and 4 deletions
+5 -3
View File
@@ -122,10 +122,12 @@ Blocking-IO runtime gate (`tests/blocking_io/`):
`tests/support/detectors/blocking_io_runtime.py`). Any sync blocking IO `tests/support/detectors/blocking_io_runtime.py`). Any sync blocking IO
call whose stack passes through DeerFlow business code while running on call whose stack passes through DeerFlow business code while running on
the asyncio event loop raises `BlockingError` and fails the test. the asyncio event loop raises `BlockingError` and fails the test.
- Two regression anchors live there: `test_skills_load.py` (locks the - Regression anchors live there: `test_skills_load.py` (locks the
`asyncio.to_thread` offload around `LocalSkillStorage.load_skills`, fix `asyncio.to_thread` offload around `LocalSkillStorage.load_skills`, fix
for #1917) and `test_sqlite_lifespan.py` (locks the offload around for #1917); `test_sqlite_lifespan.py` (locks the offload around
SQLite path resolution plus `ensure_sqlite_parent_dir`, fix for #1912). SQLite path resolution plus `ensure_sqlite_parent_dir`, fix for #1912);
and `test_jsonl_run_event_store.py` (locks `JsonlRunEventStore`'s async
API offloading its file IO via `asyncio.to_thread`, fix #3084).
- `test_gate_smoke.py` is a meta-test asserting the gate actually catches - `test_gate_smoke.py` is a meta-test asserting the gate actually catches
unoffloaded blocking IO and that the `@pytest.mark.allow_blocking_io` unoffloaded blocking IO and that the `@pytest.mark.allow_blocking_io`
opt-out works. opt-out works.
+6 -1
View File
@@ -134,11 +134,16 @@ that production actually executes.
## Current runtime coverage ## Current runtime coverage
The initial runtime anchors protect confirmed blocking-IO bug shapes: The runtime anchors protect confirmed blocking-IO bug shapes:
- SQLite checkpointer setup, including path resolution and parent-directory - SQLite checkpointer setup, including path resolution and parent-directory
creation. creation.
- Subagent skill metadata loading through `SubagentExecutor._load_skills()`. - Subagent skill metadata loading through `SubagentExecutor._load_skills()`.
- `JsonlRunEventStore` async API (`put` / `list_*` / `delete_*`): the JSONL
run-event backend offloads its synchronous file IO via `asyncio.to_thread`
(fix #3084); this anchor drives the real async API under the gate so any
blocking IO reintroduced on the loop fails, not only removal of one
`to_thread` call.
- Gate health checks: Blockbuster catches unoffloaded calls, opt-out works, and - Gate health checks: Blockbuster catches unoffloaded calls, opt-out works, and
patches are restored after exceptions. patches are restored after exceptions.
@@ -0,0 +1,62 @@
"""Regression anchor: JsonlRunEventStore async API must not block the loop.
``JsonlRunEventStore`` is the ``run_events.backend == "jsonl"`` implementation.
Its ``async def`` methods perform synchronous filesystem IO (``Path.glob``,
``read_text``, ``open``, ``unlink``) that must be offloaded with
``asyncio.to_thread`` (fixed in #3084). ``put`` runs on every emitted run event,
so any blocking IO here stalls the event loop on the hot path.
#3084 added a mock-based offload assertion in
``tests/test_jsonl_event_store_async_io.py`` that covers ``put`` only. This
anchor complements it by driving the **full** async surface (``put``,
``put_batch``, ``list_messages``, ``list_events``, ``list_messages_by_run``,
``count_messages``, ``delete_by_run``, ``delete_by_thread``) under the strict
Blockbuster runtime gate, so any blocking IO reintroduced on the event loop in
any of these methods — not just removal of a specific ``to_thread`` call —
fails CI.
"""
from __future__ import annotations
from pathlib import Path
import pytest
pytestmark = pytest.mark.asyncio
async def test_jsonl_run_event_store_async_api_does_not_block_event_loop(tmp_path: Path) -> None:
from deerflow.runtime.events.store.jsonl import JsonlRunEventStore
store = JsonlRunEventStore(base_dir=str(tmp_path))
# Seed an existing run file so put()'s seq-load globs + reads, and the
# read/delete paths have files to scan. Test-side IO is invisible to the
# gate (this module is not in scanned_modules).
thread_dir = tmp_path / "threads" / "t1" / "runs"
thread_dir.mkdir(parents=True, exist_ok=True)
(thread_dir / "r0.jsonl").write_text('{"seq": 1, "category": "message", "run_id": "r0"}\n', encoding="utf-8")
# writes: put + put_batch
record = await store.put(thread_id="t1", run_id="r1", event_type="message", category="message", content="hi")
assert record["seq"] >= 2
batch = await store.put_batch(
[
{"thread_id": "t1", "run_id": "r2", "event_type": "message", "category": "message", "content": "a"},
{"thread_id": "t1", "run_id": "r2", "event_type": "trace", "category": "trace", "content": "b"},
]
)
assert len(batch) == 2
# reads: list_messages / list_events / list_messages_by_run / count_messages.
# list_events is exercised both without and with the event_types filter so
# the filter branch runs after _read_run_events' filesystem IO.
assert isinstance(await store.list_messages("t1"), list)
assert isinstance(await store.list_events("t1", "r1"), list)
assert isinstance(await store.list_events("t1", "r1", event_types=["message"]), list)
assert isinstance(await store.list_messages_by_run("t1", "r2"), list)
assert await store.count_messages("t1") >= 1
# deletes: delete_by_run (single file) then delete_by_thread (remaining)
assert await store.delete_by_run("t1", "r2") >= 1
assert await store.delete_by_thread("t1") >= 1