fix(channels): offload blocking filesystem IO in inbound file ingestion (#3529)

_ingest_inbound_files ensured the thread uploads dir (mkdir), enumerated it
(iterdir/is_file) to de-duplicate names, and wrote each downloaded attachment
(write_upload_file_no_symlink) directly on the event loop. Offload the directory
prep and every per-file write via asyncio.to_thread; the genuinely async network
read (file_reader) stays on the loop. Externally observable behavior is unchanged.

Found via `make detect-blocking-io` (HIGH: iterdir on an async path).

Add tests/blocking_io/test_channels_ingest.py anchor, verified red->green under
the strict Blockbuster gate.

Co-authored-by: ly-wang19 <ly-wang19@users.noreply.github.com>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
ly-wang19
2026-06-13 06:38:54 +08:00
committed by GitHub
parent 579e416459
commit 420a886e1d
2 changed files with 66 additions and 3 deletions
+9 -3
View File
@@ -608,8 +608,14 @@ async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dic
write_upload_file_no_symlink, write_upload_file_no_symlink,
) )
uploads_dir = ensure_uploads_dir(thread_id) def _prepare_uploads_dir() -> tuple[Path, set[str]]:
seen_names = {entry.name for entry in uploads_dir.iterdir() if entry.is_file()} # Worker thread: ensure_uploads_dir's mkdir and the iterdir enumeration are
# blocking filesystem IO that must stay off the event loop.
target = ensure_uploads_dir(thread_id)
existing = {entry.name for entry in target.iterdir() if entry.is_file()}
return target, existing
uploads_dir, seen_names = await asyncio.to_thread(_prepare_uploads_dir)
created: list[dict[str, Any]] = [] created: list[dict[str, Any]] = []
file_reader = INBOUND_FILE_READERS.get(msg.channel_name, _read_http_inbound_file) file_reader = INBOUND_FILE_READERS.get(msg.channel_name, _read_http_inbound_file)
@@ -657,7 +663,7 @@ async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dic
dest = uploads_dir / safe_name dest = uploads_dir / safe_name
try: try:
dest = write_upload_file_no_symlink(uploads_dir, safe_name, data) dest = await asyncio.to_thread(write_upload_file_no_symlink, uploads_dir, safe_name, data)
except UnsafeUploadPathError: except UnsafeUploadPathError:
logger.warning("[Manager] skipping inbound file with unsafe destination: %s", safe_name) logger.warning("[Manager] skipping inbound file with unsafe destination: %s", safe_name)
continue continue
@@ -0,0 +1,57 @@
"""Regression anchor: ingesting inbound channel files must not block the event loop.
``ChannelManager``'s ``_ingest_inbound_files`` ensures the thread uploads
directory (``mkdir``), enumerates it (``iterdir`` / ``is_file``) to de-duplicate
filenames, and writes each downloaded attachment to disk
(``write_upload_file_no_symlink``) — all blocking filesystem IO. The async
function offloads the directory prep and every per-file write via
``asyncio.to_thread`` while keeping the genuinely async network read
(``file_reader``) on the loop. If any of that regresses back onto the event
loop, the strict Blockbuster gate raises ``BlockingError`` and this test fails.
Imports are kept at module top so any import-time IO runs at collection (outside
the gate); the surface under test runs on the event loop inside the gated test.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
import pytest
from app.channels import manager as mgr
from app.channels.message_bus import InboundMessage
from deerflow.uploads.manager import get_uploads_dir
pytestmark = pytest.mark.asyncio
async def test_ingest_inbound_files_does_not_block_event_loop(tmp_path: Path, monkeypatch) -> None:
monkeypatch.setenv("DEER_FLOW_HOME", str(tmp_path))
# Rebuild the cached Paths against the tmp home so uploads resolve under it.
import deerflow.config.paths as paths_mod
monkeypatch.setattr(paths_mod, "_paths", None)
# Swap the network reader for an in-memory one: no real HTTP, so the only IO
# left for this anchor to guard is the filesystem work.
async def _fake_reader(f, client):
return b"payload-bytes"
monkeypatch.setattr(mgr, "_read_http_inbound_file", _fake_reader)
msg = InboundMessage(
channel_name="unit-test-channel", # absent from INBOUND_FILE_READERS -> default reader
chat_id="c1",
user_id="u1",
text="hi",
files=[{"type": "file", "filename": "report.txt"}],
)
created = await mgr._ingest_inbound_files("t1", msg)
assert len(created) == 1
assert created[0]["filename"] == "report.txt"
written = await asyncio.to_thread(lambda: (get_uploads_dir("t1") / "report.txt").exists())
assert written, "inbound file should be written under the tmp uploads dir"