diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index b3cd23765..2ff9e5860 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -608,8 +608,14 @@ async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dic write_upload_file_no_symlink, ) - uploads_dir = ensure_uploads_dir(thread_id) - seen_names = {entry.name for entry in uploads_dir.iterdir() if entry.is_file()} + def _prepare_uploads_dir() -> tuple[Path, set[str]]: + # Worker thread: ensure_uploads_dir's mkdir and the iterdir enumeration are + # blocking filesystem IO that must stay off the event loop. + target = ensure_uploads_dir(thread_id) + existing = {entry.name for entry in target.iterdir() if entry.is_file()} + return target, existing + + uploads_dir, seen_names = await asyncio.to_thread(_prepare_uploads_dir) created: list[dict[str, Any]] = [] file_reader = INBOUND_FILE_READERS.get(msg.channel_name, _read_http_inbound_file) @@ -657,7 +663,7 @@ async def _ingest_inbound_files(thread_id: str, msg: InboundMessage) -> list[dic dest = uploads_dir / safe_name try: - dest = write_upload_file_no_symlink(uploads_dir, safe_name, data) + dest = await asyncio.to_thread(write_upload_file_no_symlink, uploads_dir, safe_name, data) except UnsafeUploadPathError: logger.warning("[Manager] skipping inbound file with unsafe destination: %s", safe_name) continue diff --git a/backend/tests/blocking_io/test_channels_ingest.py b/backend/tests/blocking_io/test_channels_ingest.py new file mode 100644 index 000000000..7b16c7084 --- /dev/null +++ b/backend/tests/blocking_io/test_channels_ingest.py @@ -0,0 +1,57 @@ +"""Regression anchor: ingesting inbound channel files must not block the event loop. + +``ChannelManager``'s ``_ingest_inbound_files`` ensures the thread uploads +directory (``mkdir``), enumerates it (``iterdir`` / ``is_file``) to de-duplicate +filenames, and writes each downloaded attachment to disk +(``write_upload_file_no_symlink``) — all blocking filesystem IO. The async +function offloads the directory prep and every per-file write via +``asyncio.to_thread`` while keeping the genuinely async network read +(``file_reader``) on the loop. If any of that regresses back onto the event +loop, the strict Blockbuster gate raises ``BlockingError`` and this test fails. + +Imports are kept at module top so any import-time IO runs at collection (outside +the gate); the surface under test runs on the event loop inside the gated test. +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path + +import pytest + +from app.channels import manager as mgr +from app.channels.message_bus import InboundMessage +from deerflow.uploads.manager import get_uploads_dir + +pytestmark = pytest.mark.asyncio + + +async def test_ingest_inbound_files_does_not_block_event_loop(tmp_path: Path, monkeypatch) -> None: + monkeypatch.setenv("DEER_FLOW_HOME", str(tmp_path)) + # Rebuild the cached Paths against the tmp home so uploads resolve under it. + import deerflow.config.paths as paths_mod + + monkeypatch.setattr(paths_mod, "_paths", None) + + # Swap the network reader for an in-memory one: no real HTTP, so the only IO + # left for this anchor to guard is the filesystem work. + async def _fake_reader(f, client): + return b"payload-bytes" + + monkeypatch.setattr(mgr, "_read_http_inbound_file", _fake_reader) + + msg = InboundMessage( + channel_name="unit-test-channel", # absent from INBOUND_FILE_READERS -> default reader + chat_id="c1", + user_id="u1", + text="hi", + files=[{"type": "file", "filename": "report.txt"}], + ) + + created = await mgr._ingest_inbound_files("t1", msg) + + assert len(created) == 1 + assert created[0]["filename"] == "report.txt" + written = await asyncio.to_thread(lambda: (get_uploads_dir("t1") / "report.txt").exists()) + assert written, "inbound file should be written under the tmp uploads dir"