mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-13 19:06:01 +00:00
fix(skills): keep skill archive installation off the event loop (#3505)
* fix(skills): keep skill archive installation off the event loop

  ainstall_skill_from_archive — the async entry point awaited by the gateway POST /skills/install route — ran its entire filesystem pipeline inline on the event loop: zip extraction, frontmatter validation, rglob enumeration, per-file read_text, shutil.copytree staging, and tempdir cleanup.

  Restructure into offloaded phases: prepare (extract + validate) and commit (stage + move) run via asyncio.to_thread, the tempdir lifecycle is offloaded, and the security scanner's file enumeration and reads move off the loop — only the per-file LLM scan (genuinely async) stays awaited. Security decision logic and exception contract are unchanged.

  Anchor: tests/blocking_io/test_skills_install.py drives the real install pipeline (real .skill archive, real FS; only scan_skill_content stubbed) under the strict Blockbuster gate. Verified red on pre-fix code (BlockingError: os.stat), green with the fix.

* fix(skills): log temp-dir cleanup failures instead of swallowing them

  Review follow-up on the install offload: rmtree(ignore_errors=True) kept the primary install exception but silently leaked the extraction dir on cleanup failure. Keep the never-mask behaviour, add a warning log.

* fix(skills): bound install tmp cleanup and pass skill_dir explicitly (review)

  - Wrap the best-effort temp-dir cleanup in asyncio.wait_for (5s) so a hung filesystem in the finally block cannot stall or mask the install outcome; timeout is logged like the existing OSError path.
  - Hoist _collect_scannable_files to module level with skill_dir as an explicit argument instead of a closure capture.
This commit is contained in:
@@ -153,7 +153,7 @@ async def _scan_skill_file_or_raise(skill_dir: Path, path: Path, skill_name: str
|
|||||||
rel_path = path.relative_to(skill_dir).as_posix()
|
rel_path = path.relative_to(skill_dir).as_posix()
|
||||||
location = f"{skill_name}/{rel_path}"
|
location = f"{skill_name}/{rel_path}"
|
||||||
try:
|
try:
|
||||||
content = path.read_text(encoding="utf-8")
|
content = await asyncio.to_thread(path.read_text, encoding="utf-8")
|
||||||
except UnicodeDecodeError as e:
|
except UnicodeDecodeError as e:
|
||||||
raise SkillSecurityScanError(f"Security scan failed for skill '{skill_name}': {location} must be valid UTF-8") from e
|
raise SkillSecurityScanError(f"Security scan failed for skill '{skill_name}': {location} must be valid UTF-8") from e
|
||||||
|
|
||||||
@@ -174,15 +174,17 @@ async def _scan_skill_file_or_raise(skill_dir: Path, path: Path, skill_name: str
|
|||||||
raise SkillSecurityScanError(f"Security scan failed for {location}: invalid scanner decision {decision!r}")
|
raise SkillSecurityScanError(f"Security scan failed for {location}: invalid scanner decision {decision!r}")
|
||||||
|
|
||||||
|
|
||||||
|
def _collect_scannable_files(skill_dir: Path) -> list[Path]:
|
||||||
|
"""Enumerate archive files for scanning (blocking; run off the event loop)."""
|
||||||
|
return [candidate for candidate in sorted(skill_dir.rglob("*")) if candidate.is_file()]
|
||||||
|
|
||||||
|
|
||||||
async def _scan_skill_archive_contents_or_raise(skill_dir: Path, skill_name: str) -> None:
|
async def _scan_skill_archive_contents_or_raise(skill_dir: Path, skill_name: str) -> None:
|
||||||
"""Run the skill security scanner against all installable text and script files."""
|
"""Run the skill security scanner against all installable text and script files."""
|
||||||
skill_md = skill_dir / "SKILL.md"
|
skill_md = skill_dir / "SKILL.md"
|
||||||
await _scan_skill_file_or_raise(skill_dir, skill_md, skill_name, executable=False)
|
await _scan_skill_file_or_raise(skill_dir, skill_md, skill_name, executable=False)
|
||||||
|
|
||||||
for path in sorted(skill_dir.rglob("*")):
|
for path in await asyncio.to_thread(_collect_scannable_files, skill_dir):
|
||||||
if not path.is_file():
|
|
||||||
continue
|
|
||||||
|
|
||||||
rel_path = path.relative_to(skill_dir)
|
rel_path = path.relative_to(skill_dir)
|
||||||
if rel_path == Path("SKILL.md"):
|
if rel_path == Path("SKILL.md"):
|
||||||
continue
|
continue
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import errno
|
import errno
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
@@ -21,6 +22,10 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
DEFAULT_SKILLS_CONTAINER_PATH = "/mnt/skills"
|
DEFAULT_SKILLS_CONTAINER_PATH = "/mnt/skills"
|
||||||
|
|
||||||
|
# Bound for the best-effort temp-dir cleanup so a stalled filesystem (e.g. NFS)
|
||||||
|
# cannot hold back the install outcome propagating out of the finally block.
|
||||||
|
_INSTALL_TMP_CLEANUP_TIMEOUT_SECONDS = 5.0
|
||||||
|
|
||||||
|
|
||||||
class LocalSkillStorage(SkillStorage):
|
class LocalSkillStorage(SkillStorage):
|
||||||
"""Skill storage backed by the local filesystem.
|
"""Skill storage backed by the local filesystem.
|
||||||
@@ -94,19 +99,56 @@ class LocalSkillStorage(SkillStorage):
|
|||||||
make_skill_written_path_sandbox_readable(self.get_custom_skill_dir(name), target)
|
make_skill_written_path_sandbox_readable(self.get_custom_skill_dir(name), target)
|
||||||
|
|
||||||
async def ainstall_skill_from_archive(self, archive_path: str | Path) -> dict:
|
async def ainstall_skill_from_archive(self, archive_path: str | Path) -> dict:
|
||||||
|
from deerflow.skills.installer import _scan_skill_archive_contents_or_raise
|
||||||
|
|
||||||
|
logger.info("Installing skill from %s", archive_path)
|
||||||
|
path = Path(archive_path)
|
||||||
|
custom_dir = self._host_root / "custom"
|
||||||
|
|
||||||
|
# The per-file security scan is an async LLM call and must stay on the
|
||||||
|
# event loop; every filesystem phase around it runs in a worker thread.
|
||||||
|
tmp = await asyncio.to_thread(tempfile.mkdtemp)
|
||||||
|
try:
|
||||||
|
skill_dir, skill_name, target = await asyncio.to_thread(self._prepare_skill_archive, path, Path(tmp), custom_dir, archive_path)
|
||||||
|
|
||||||
|
await _scan_skill_archive_contents_or_raise(skill_dir, skill_name)
|
||||||
|
|
||||||
|
await asyncio.to_thread(self._commit_skill_install, skill_dir, skill_name, custom_dir, target)
|
||||||
|
logger.info("Skill %r installed to %s", skill_name, target)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(
|
||||||
|
asyncio.to_thread(self._cleanup_install_tmp, tmp),
|
||||||
|
timeout=_INSTALL_TMP_CLEANUP_TIMEOUT_SECONDS,
|
||||||
|
)
|
||||||
|
except TimeoutError:
|
||||||
|
logger.warning("Timed out cleaning up skill install temp dir %s", tmp)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"skill_name": skill_name,
|
||||||
|
"message": f"Skill '{skill_name}' installed successfully",
|
||||||
|
}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _cleanup_install_tmp(tmp: str) -> None:
|
||||||
|
"""Best-effort removal that never masks the install outcome, but leaves a trace."""
|
||||||
|
try:
|
||||||
|
shutil.rmtree(tmp)
|
||||||
|
except OSError:
|
||||||
|
logger.warning("Failed to clean up skill install temp dir %s", tmp, exc_info=True)
|
||||||
|
|
||||||
|
def _prepare_skill_archive(self, path: Path, tmp_path: Path, custom_dir: Path, archive_path: str | Path) -> tuple[Path, str, Path]:
|
||||||
|
"""Extract and validate the archive (blocking; runs off the event loop)."""
|
||||||
import zipfile
|
import zipfile
|
||||||
|
|
||||||
from deerflow.skills.installer import (
|
from deerflow.skills.installer import (
|
||||||
SkillAlreadyExistsError,
|
SkillAlreadyExistsError,
|
||||||
_move_staged_skill_into_reserved_target,
|
|
||||||
_scan_skill_archive_contents_or_raise,
|
|
||||||
resolve_skill_dir_from_archive,
|
resolve_skill_dir_from_archive,
|
||||||
safe_extract_skill_archive,
|
safe_extract_skill_archive,
|
||||||
)
|
)
|
||||||
from deerflow.skills.validation import _validate_skill_frontmatter
|
from deerflow.skills.validation import _validate_skill_frontmatter
|
||||||
|
|
||||||
logger.info("Installing skill from %s", archive_path)
|
|
||||||
path = Path(archive_path)
|
|
||||||
if not path.is_file():
|
if not path.is_file():
|
||||||
if not path.exists():
|
if not path.exists():
|
||||||
raise FileNotFoundError(f"Skill file not found: {archive_path}")
|
raise FileNotFoundError(f"Skill file not found: {archive_path}")
|
||||||
@@ -114,47 +156,40 @@ class LocalSkillStorage(SkillStorage):
|
|||||||
if path.suffix != ".skill":
|
if path.suffix != ".skill":
|
||||||
raise ValueError("File must have .skill extension")
|
raise ValueError("File must have .skill extension")
|
||||||
|
|
||||||
custom_dir = self._host_root / "custom"
|
|
||||||
custom_dir.mkdir(parents=True, exist_ok=True)
|
custom_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
with tempfile.TemporaryDirectory() as tmp:
|
try:
|
||||||
tmp_path = Path(tmp)
|
zf = zipfile.ZipFile(path, "r")
|
||||||
|
except FileNotFoundError:
|
||||||
|
raise FileNotFoundError(f"Skill file not found: {archive_path}") from None
|
||||||
|
except (zipfile.BadZipFile, IsADirectoryError):
|
||||||
|
raise ValueError("File is not a valid ZIP archive") from None
|
||||||
|
|
||||||
try:
|
with zf:
|
||||||
zf = zipfile.ZipFile(path, "r")
|
safe_extract_skill_archive(zf, tmp_path)
|
||||||
except FileNotFoundError:
|
|
||||||
raise FileNotFoundError(f"Skill file not found: {archive_path}") from None
|
|
||||||
except (zipfile.BadZipFile, IsADirectoryError):
|
|
||||||
raise ValueError("File is not a valid ZIP archive") from None
|
|
||||||
|
|
||||||
with zf:
|
skill_dir = resolve_skill_dir_from_archive(tmp_path)
|
||||||
safe_extract_skill_archive(zf, tmp_path)
|
|
||||||
|
|
||||||
skill_dir = resolve_skill_dir_from_archive(tmp_path)
|
is_valid, message, skill_name = _validate_skill_frontmatter(skill_dir)
|
||||||
|
if not is_valid:
|
||||||
|
raise ValueError(f"Invalid skill: {message}")
|
||||||
|
if not skill_name or "/" in skill_name or "\\" in skill_name or ".." in skill_name:
|
||||||
|
raise ValueError(f"Invalid skill name: {skill_name}")
|
||||||
|
|
||||||
is_valid, message, skill_name = _validate_skill_frontmatter(skill_dir)
|
target = custom_dir / skill_name
|
||||||
if not is_valid:
|
if target.exists():
|
||||||
raise ValueError(f"Invalid skill: {message}")
|
raise SkillAlreadyExistsError(f"Skill '{skill_name}' already exists")
|
||||||
if not skill_name or "/" in skill_name or "\\" in skill_name or ".." in skill_name:
|
|
||||||
raise ValueError(f"Invalid skill name: {skill_name}")
|
|
||||||
|
|
||||||
target = custom_dir / skill_name
|
return skill_dir, skill_name, target
|
||||||
if target.exists():
|
|
||||||
raise SkillAlreadyExistsError(f"Skill '{skill_name}' already exists")
|
|
||||||
|
|
||||||
await _scan_skill_archive_contents_or_raise(skill_dir, skill_name)
|
def _commit_skill_install(self, skill_dir: Path, skill_name: str, custom_dir: Path, target: Path) -> None:
|
||||||
|
"""Stage and move the validated skill into place (blocking; runs off the event loop)."""
|
||||||
|
from deerflow.skills.installer import _move_staged_skill_into_reserved_target
|
||||||
|
|
||||||
with tempfile.TemporaryDirectory(prefix=f".installing-{skill_name}-", dir=custom_dir) as staging_root:
|
with tempfile.TemporaryDirectory(prefix=f".installing-{skill_name}-", dir=custom_dir) as staging_root:
|
||||||
staging_target = Path(staging_root) / skill_name
|
staging_target = Path(staging_root) / skill_name
|
||||||
shutil.copytree(skill_dir, staging_target)
|
shutil.copytree(skill_dir, staging_target)
|
||||||
_move_staged_skill_into_reserved_target(staging_target, target)
|
_move_staged_skill_into_reserved_target(staging_target, target)
|
||||||
logger.info("Skill %r installed to %s", skill_name, target)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"success": True,
|
|
||||||
"skill_name": skill_name,
|
|
||||||
"message": f"Skill '{skill_name}' installed successfully",
|
|
||||||
}
|
|
||||||
|
|
||||||
def delete_custom_skill(self, name: str, *, history_meta: dict | None = None) -> None:
|
def delete_custom_skill(self, name: str, *, history_meta: dict | None = None) -> None:
|
||||||
self.validate_skill_name(name)
|
self.validate_skill_name(name)
|
||||||
|
|||||||
@@ -0,0 +1,73 @@
|
|||||||
|
"""Regression anchor: skill archive installation must not block the event loop.
|
||||||
|
|
||||||
|
``LocalSkillStorage.ainstall_skill_from_archive`` is the async entry point the
|
||||||
|
gateway ``POST /skills/install`` route awaits. It extracts the archive,
|
||||||
|
validates frontmatter, security-scans every installable file, and stages the
|
||||||
|
skill into the custom directory — all filesystem work that previously ran
|
||||||
|
inline on the event loop (zip extract, ``rglob`` enumeration, ``read_text``,
|
||||||
|
``shutil.copytree``). The fix offloads those phases via ``asyncio.to_thread``
|
||||||
|
while keeping the per-file LLM security scan as the only awaited work; if any
|
||||||
|
phase regresses back onto the loop, the strict Blockbuster gate raises
|
||||||
|
``BlockingError`` and this test fails.
|
||||||
|
|
||||||
|
Only the external LLM boundary (``scan_skill_content``) is stubbed — the
|
||||||
|
archive, extraction, validation, and staging all run against the real local
|
||||||
|
filesystem. Test-side setup IO is itself offloaded with ``asyncio.to_thread``
|
||||||
|
(matching ``test_agents_router``) so only the production path is exercised on
|
||||||
|
the loop.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import zipfile
|
||||||
|
from pathlib import Path
|
||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from deerflow.skills.storage.local_skill_storage import LocalSkillStorage
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.asyncio
|
||||||
|
|
||||||
|
_SKILL_MD = """---
|
||||||
|
name: loop-skill
|
||||||
|
description: Anchor fixture skill for the blocking-IO gate.
|
||||||
|
---
|
||||||
|
|
||||||
|
# Loop Skill
|
||||||
|
|
||||||
|
Drives the full install pipeline under the Blockbuster gate.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_SUPPORT_MD = "Reference notes scanned by the per-file security pass.\n"
|
||||||
|
|
||||||
|
|
||||||
|
def _build_archive(archive: Path) -> None:
|
||||||
|
with zipfile.ZipFile(archive, "w") as zf:
|
||||||
|
zf.writestr("loop-skill/SKILL.md", _SKILL_MD)
|
||||||
|
zf.writestr("loop-skill/references/usage.md", _SUPPORT_MD)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_install_skill_archive_does_not_block_event_loop(tmp_path: Path, monkeypatch) -> None:
|
||||||
|
archive = tmp_path / "loop-skill.skill"
|
||||||
|
await asyncio.to_thread(_build_archive, archive)
|
||||||
|
|
||||||
|
async def _allow_scan(content: str, *, executable: bool = False, location: str = "SKILL.md", app_config=None):
|
||||||
|
return SimpleNamespace(decision="allow", reason="anchor stub")
|
||||||
|
|
||||||
|
# External dependency boundary only: the security scanner is an LLM call.
|
||||||
|
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _allow_scan)
|
||||||
|
|
||||||
|
# Constructor resolves paths (one-time, cached in production via
|
||||||
|
# get_or_new_skill_storage); offloaded here so the anchor exercises only
|
||||||
|
# the install pipeline itself on the loop.
|
||||||
|
storage = await asyncio.to_thread(LocalSkillStorage, host_path=str(tmp_path / "skills"))
|
||||||
|
|
||||||
|
result = await storage.ainstall_skill_from_archive(archive)
|
||||||
|
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["skill_name"] == "loop-skill"
|
||||||
|
installed_md = tmp_path / "skills" / "custom" / "loop-skill" / "SKILL.md"
|
||||||
|
assert await asyncio.to_thread(installed_md.exists)
|
||||||
|
assert await asyncio.to_thread((tmp_path / "skills" / "custom" / "loop-skill" / "references" / "usage.md").exists)
|
||||||
Reference in New Issue
Block a user