mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-10 09:25:57 +00:00
fix(agents): offload blocking filesystem IO in the custom-agent router off the event loop (#3457)
* fix(agents): offload blocking filesystem IO in delete_agent off the event loop delete_agent is an async route handler but resolved the agent directory (Paths.base_dir -> Path.resolve), probed it (Path.exists), and removed it (shutil.rmtree) directly on the event loop, blocking it for the duration of every delete. Surfaced by 'make detect-blocking-io'. Move the resolve/exists/rmtree sequence into a sync helper run via asyncio.to_thread, mapping its outcome back to the existing 404/409/500 responses (behavior unchanged). Adds a tests/blocking_io/ regression anchor under the strict Blockbuster gate, mirroring test_skills_load.py (#1917). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix(agents): offload blocking filesystem IO in create_agent_endpoint too Like delete_agent, the async create_agent_endpoint resolved and created the agent directory and wrote config.yaml + SOUL.md (with rmtree cleanup on failure) directly on the event loop. Move the whole create-or-409 sequence into a sync helper run via asyncio.to_thread; behavior is unchanged (201 / 409 / 500). Extends the blocking_io regression anchor to cover create as well as delete and renames it to test_agents_router.py. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: ly-wang19 <ly-wang19@users.noreply.github.com> Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
"""CRUD API for custom agents."""
|
"""CRUD API for custom agents."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
@@ -213,15 +214,21 @@ async def create_agent_endpoint(request: AgentCreateRequest) -> AgentResponse:
|
|||||||
user_id = get_effective_user_id()
|
user_id = get_effective_user_id()
|
||||||
paths = get_paths()
|
paths = get_paths()
|
||||||
|
|
||||||
|
def _create_agent() -> AgentResponse | None:
|
||||||
|
# Worker thread: base-dir resolution, existence checks, directory/file
|
||||||
|
# creation, read-back, and failure cleanup are all blocking filesystem
|
||||||
|
# IO that must stay off the event loop.
|
||||||
agent_dir = paths.user_agent_dir(user_id, normalized_name)
|
agent_dir = paths.user_agent_dir(user_id, normalized_name)
|
||||||
legacy_dir = paths.agent_dir(normalized_name)
|
legacy_dir = paths.agent_dir(normalized_name)
|
||||||
|
|
||||||
if agent_dir.exists() or legacy_dir.exists():
|
if legacy_dir.exists():
|
||||||
raise HTTPException(status_code=409, detail=f"Agent '{normalized_name}' already exists")
|
return None # signals 409 to the caller
|
||||||
|
|
||||||
try:
|
try:
|
||||||
agent_dir.mkdir(parents=True, exist_ok=True)
|
try:
|
||||||
|
agent_dir.mkdir(parents=True, exist_ok=False)
|
||||||
|
except FileExistsError:
|
||||||
|
return None # signals 409 to the caller
|
||||||
# Write config.yaml
|
# Write config.yaml
|
||||||
config_data: dict = {"name": normalized_name}
|
config_data: dict = {"name": normalized_name}
|
||||||
if request.description:
|
if request.description:
|
||||||
@@ -245,16 +252,23 @@ async def create_agent_endpoint(request: AgentCreateRequest) -> AgentResponse:
|
|||||||
|
|
||||||
agent_cfg = load_agent_config(normalized_name, user_id=user_id)
|
agent_cfg = load_agent_config(normalized_name, user_id=user_id)
|
||||||
return _agent_config_to_response(agent_cfg, include_soul=True, user_id=user_id)
|
return _agent_config_to_response(agent_cfg, include_soul=True, user_id=user_id)
|
||||||
|
except Exception:
|
||||||
except HTTPException:
|
# Clean up partial state on failure before surfacing the error.
|
||||||
raise
|
|
||||||
except Exception as e:
|
|
||||||
# Clean up on failure
|
|
||||||
if agent_dir.exists():
|
if agent_dir.exists():
|
||||||
shutil.rmtree(agent_dir)
|
shutil.rmtree(agent_dir)
|
||||||
|
raise
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = await asyncio.to_thread(_create_agent)
|
||||||
|
except Exception as e:
|
||||||
logger.error(f"Failed to create agent '{request.name}': {e}", exc_info=True)
|
logger.error(f"Failed to create agent '{request.name}': {e}", exc_info=True)
|
||||||
raise HTTPException(status_code=500, detail=f"Failed to create agent: {str(e)}")
|
raise HTTPException(status_code=500, detail=f"Failed to create agent: {str(e)}")
|
||||||
|
|
||||||
|
if response is None:
|
||||||
|
raise HTTPException(status_code=409, detail=f"Agent '{normalized_name}' already exists")
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
@router.put(
|
@router.put(
|
||||||
"/agents/{name}",
|
"/agents/{name}",
|
||||||
@@ -428,19 +442,30 @@ async def delete_agent(name: str) -> None:
|
|||||||
name = _normalize_agent_name(name)
|
name = _normalize_agent_name(name)
|
||||||
user_id = get_effective_user_id()
|
user_id = get_effective_user_id()
|
||||||
paths = get_paths()
|
paths = get_paths()
|
||||||
agent_dir = paths.user_agent_dir(user_id, name)
|
|
||||||
|
|
||||||
|
def _remove_agent_dir() -> tuple[str, str]:
|
||||||
|
# Runs in a worker thread: resolving the base dir, probing the directory
|
||||||
|
# (`exists`), and removing it (`rmtree`) are all blocking filesystem IO
|
||||||
|
# that must stay off the event loop.
|
||||||
|
agent_dir = paths.user_agent_dir(user_id, name)
|
||||||
if not agent_dir.exists():
|
if not agent_dir.exists():
|
||||||
if paths.agent_dir(name).exists():
|
outcome = "legacy" if paths.agent_dir(name).exists() else "missing"
|
||||||
|
return outcome, str(agent_dir)
|
||||||
|
shutil.rmtree(agent_dir)
|
||||||
|
return "deleted", str(agent_dir)
|
||||||
|
|
||||||
|
try:
|
||||||
|
outcome, agent_dir = await asyncio.to_thread(_remove_agent_dir)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to delete agent '{name}': {e}", exc_info=True)
|
||||||
|
raise HTTPException(status_code=500, detail=f"Failed to delete agent: {str(e)}")
|
||||||
|
|
||||||
|
if outcome == "legacy":
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=409,
|
status_code=409,
|
||||||
detail=(f"Agent '{name}' only exists in the legacy shared layout and is not scoped to a user. Run scripts/migrate_user_isolation.py to move legacy agents into the per-user layout before deleting."),
|
detail=(f"Agent '{name}' only exists in the legacy shared layout and is not scoped to a user. Run scripts/migrate_user_isolation.py to move legacy agents into the per-user layout before deleting."),
|
||||||
)
|
)
|
||||||
|
if outcome == "missing":
|
||||||
raise HTTPException(status_code=404, detail=f"Agent '{name}' not found")
|
raise HTTPException(status_code=404, detail=f"Agent '{name}' not found")
|
||||||
|
|
||||||
try:
|
|
||||||
shutil.rmtree(agent_dir)
|
|
||||||
logger.info(f"Deleted agent '{name}' from {agent_dir}")
|
logger.info(f"Deleted agent '{name}' from {agent_dir}")
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to delete agent '{name}': {e}", exc_info=True)
|
|
||||||
raise HTTPException(status_code=500, detail=f"Failed to delete agent: {str(e)}")
|
|
||||||
|
|||||||
@@ -0,0 +1,64 @@
|
|||||||
|
"""Regression anchors: the custom-agent router must not block the event loop.
|
||||||
|
|
||||||
|
``app.gateway.routers.agents.create_agent_endpoint`` and ``delete_agent`` are
|
||||||
|
async route handlers that resolve the agent directory (``Paths.base_dir`` calls
|
||||||
|
``Path.resolve``), probe it (``Path.exists``), and create/remove it (``mkdir``,
|
||||||
|
config/SOUL writes, ``shutil.rmtree``) — all blocking IO. Both offload that work
|
||||||
|
via ``asyncio.to_thread``; if any of it regresses back onto the event loop, the
|
||||||
|
strict Blockbuster gate raises ``BlockingError`` and these tests fail.
|
||||||
|
|
||||||
|
Imports live at module scope so the one-time FastAPI app construction (which
|
||||||
|
reads files while building OpenAPI schemas) happens at collection time, not on
|
||||||
|
the event loop under test. Test-side path resolution is itself offloaded with
|
||||||
|
``asyncio.to_thread`` (matching ``test_uploads_middleware``) so only the
|
||||||
|
handlers' own filesystem access is exercised on the loop.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from app.gateway.routers.agents import AgentCreateRequest, create_agent_endpoint, delete_agent
|
||||||
|
from deerflow.config.agents_api_config import load_agents_api_config_from_dict
|
||||||
|
from deerflow.config.paths import get_paths
|
||||||
|
from deerflow.runtime.user_context import get_effective_user_id
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.asyncio
|
||||||
|
|
||||||
|
|
||||||
|
async def test_create_agent_does_not_block_event_loop(tmp_path: Path, monkeypatch) -> None:
|
||||||
|
monkeypatch.setenv("DEER_FLOW_HOME", str(tmp_path))
|
||||||
|
monkeypatch.setattr("deerflow.config.paths._paths", None)
|
||||||
|
load_agents_api_config_from_dict({"enabled": True})
|
||||||
|
try:
|
||||||
|
response = await create_agent_endpoint(AgentCreateRequest(name="loop-make-agent", soul="You are a test agent."))
|
||||||
|
assert response is not None
|
||||||
|
|
||||||
|
user_id = get_effective_user_id()
|
||||||
|
# test-side check (resolution offloaded; not exercised on the loop)
|
||||||
|
agent_dir = await asyncio.to_thread(get_paths().user_agent_dir, user_id, "loop-make-agent")
|
||||||
|
assert await asyncio.to_thread((agent_dir / "config.yaml").exists)
|
||||||
|
finally:
|
||||||
|
load_agents_api_config_from_dict({})
|
||||||
|
|
||||||
|
|
||||||
|
async def test_delete_agent_does_not_block_event_loop(tmp_path: Path, monkeypatch) -> None:
|
||||||
|
monkeypatch.setenv("DEER_FLOW_HOME", str(tmp_path))
|
||||||
|
monkeypatch.setattr("deerflow.config.paths._paths", None)
|
||||||
|
load_agents_api_config_from_dict({"enabled": True})
|
||||||
|
try:
|
||||||
|
user_id = get_effective_user_id()
|
||||||
|
user_id = get_effective_user_id()
|
||||||
|
# test-side seeding (resolution offloaded; not exercised on the loop)
|
||||||
|
agent_dir = await asyncio.to_thread(get_paths().user_agent_dir, user_id, "loop-test-agent")
|
||||||
|
await asyncio.to_thread(agent_dir.mkdir, parents=True, exist_ok=True)
|
||||||
|
await asyncio.to_thread((agent_dir / "config.yaml").write_text, "name: loop-test-agent\n", encoding="utf-8")
|
||||||
|
|
||||||
|
await delete_agent("loop-test-agent")
|
||||||
|
|
||||||
|
assert not await asyncio.to_thread(agent_dir.exists)
|
||||||
|
finally:
|
||||||
|
load_agents_api_config_from_dict({})
|
||||||
Reference in New Issue
Block a user