diff --git a/backend/packages/harness/deerflow/skills/installer.py b/backend/packages/harness/deerflow/skills/installer.py index fe78ca633..2e28f8c13 100644 --- a/backend/packages/harness/deerflow/skills/installer.py +++ b/backend/packages/harness/deerflow/skills/installer.py @@ -153,7 +153,7 @@ async def _scan_skill_file_or_raise(skill_dir: Path, path: Path, skill_name: str rel_path = path.relative_to(skill_dir).as_posix() location = f"{skill_name}/{rel_path}" try: - content = path.read_text(encoding="utf-8") + content = await asyncio.to_thread(path.read_text, encoding="utf-8") except UnicodeDecodeError as e: raise SkillSecurityScanError(f"Security scan failed for skill '{skill_name}': {location} must be valid UTF-8") from e @@ -174,15 +174,17 @@ async def _scan_skill_file_or_raise(skill_dir: Path, path: Path, skill_name: str raise SkillSecurityScanError(f"Security scan failed for {location}: invalid scanner decision {decision!r}") +def _collect_scannable_files(skill_dir: Path) -> list[Path]: + """Enumerate archive files for scanning (blocking; run off the event loop).""" + return [candidate for candidate in sorted(skill_dir.rglob("*")) if candidate.is_file()] + + async def _scan_skill_archive_contents_or_raise(skill_dir: Path, skill_name: str) -> None: """Run the skill security scanner against all installable text and script files.""" skill_md = skill_dir / "SKILL.md" await _scan_skill_file_or_raise(skill_dir, skill_md, skill_name, executable=False) - for path in sorted(skill_dir.rglob("*")): - if not path.is_file(): - continue - + for path in await asyncio.to_thread(_collect_scannable_files, skill_dir): rel_path = path.relative_to(skill_dir) if rel_path == Path("SKILL.md"): continue diff --git a/backend/packages/harness/deerflow/skills/storage/local_skill_storage.py b/backend/packages/harness/deerflow/skills/storage/local_skill_storage.py index 69114d51f..f7e2959d5 100644 --- a/backend/packages/harness/deerflow/skills/storage/local_skill_storage.py +++ b/backend/packages/harness/deerflow/skills/storage/local_skill_storage.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import errno import json import logging @@ -21,6 +22,10 @@ logger = logging.getLogger(__name__) DEFAULT_SKILLS_CONTAINER_PATH = "/mnt/skills" +# Bound for the best-effort temp-dir cleanup so a stalled filesystem (e.g. NFS) +# cannot hold back the install outcome propagating out of the finally block. +_INSTALL_TMP_CLEANUP_TIMEOUT_SECONDS = 5.0 + class LocalSkillStorage(SkillStorage): """Skill storage backed by the local filesystem. @@ -94,19 +99,56 @@ class LocalSkillStorage(SkillStorage): make_skill_written_path_sandbox_readable(self.get_custom_skill_dir(name), target) async def ainstall_skill_from_archive(self, archive_path: str | Path) -> dict: + from deerflow.skills.installer import _scan_skill_archive_contents_or_raise + + logger.info("Installing skill from %s", archive_path) + path = Path(archive_path) + custom_dir = self._host_root / "custom" + + # The per-file security scan is an async LLM call and must stay on the + # event loop; every filesystem phase around it runs in a worker thread. + tmp = await asyncio.to_thread(tempfile.mkdtemp) + try: + skill_dir, skill_name, target = await asyncio.to_thread(self._prepare_skill_archive, path, Path(tmp), custom_dir, archive_path) + + await _scan_skill_archive_contents_or_raise(skill_dir, skill_name) + + await asyncio.to_thread(self._commit_skill_install, skill_dir, skill_name, custom_dir, target) + logger.info("Skill %r installed to %s", skill_name, target) + finally: + try: + await asyncio.wait_for( + asyncio.to_thread(self._cleanup_install_tmp, tmp), + timeout=_INSTALL_TMP_CLEANUP_TIMEOUT_SECONDS, + ) + except TimeoutError: + logger.warning("Timed out cleaning up skill install temp dir %s", tmp) + + return { + "success": True, + "skill_name": skill_name, + "message": f"Skill '{skill_name}' installed successfully", + } + + @staticmethod + def _cleanup_install_tmp(tmp: str) -> None: + """Best-effort removal that never masks the install outcome, but leaves a trace.""" + try: + shutil.rmtree(tmp) + except OSError: + logger.warning("Failed to clean up skill install temp dir %s", tmp, exc_info=True) + + def _prepare_skill_archive(self, path: Path, tmp_path: Path, custom_dir: Path, archive_path: str | Path) -> tuple[Path, str, Path]: + """Extract and validate the archive (blocking; runs off the event loop).""" import zipfile from deerflow.skills.installer import ( SkillAlreadyExistsError, - _move_staged_skill_into_reserved_target, - _scan_skill_archive_contents_or_raise, resolve_skill_dir_from_archive, safe_extract_skill_archive, ) from deerflow.skills.validation import _validate_skill_frontmatter - logger.info("Installing skill from %s", archive_path) - path = Path(archive_path) if not path.is_file(): if not path.exists(): raise FileNotFoundError(f"Skill file not found: {archive_path}") @@ -114,47 +156,40 @@ class LocalSkillStorage(SkillStorage): if path.suffix != ".skill": raise ValueError("File must have .skill extension") - custom_dir = self._host_root / "custom" custom_dir.mkdir(parents=True, exist_ok=True) - with tempfile.TemporaryDirectory() as tmp: - tmp_path = Path(tmp) + try: + zf = zipfile.ZipFile(path, "r") + except FileNotFoundError: + raise FileNotFoundError(f"Skill file not found: {archive_path}") from None + except (zipfile.BadZipFile, IsADirectoryError): + raise ValueError("File is not a valid ZIP archive") from None - try: - zf = zipfile.ZipFile(path, "r") - except FileNotFoundError: - raise FileNotFoundError(f"Skill file not found: {archive_path}") from None - except (zipfile.BadZipFile, IsADirectoryError): - raise ValueError("File is not a valid ZIP archive") from None + with zf: + safe_extract_skill_archive(zf, tmp_path) - with zf: - safe_extract_skill_archive(zf, tmp_path) + skill_dir = resolve_skill_dir_from_archive(tmp_path) - skill_dir = resolve_skill_dir_from_archive(tmp_path) + is_valid, message, skill_name = _validate_skill_frontmatter(skill_dir) + if not is_valid: + raise ValueError(f"Invalid skill: {message}") + if not skill_name or "/" in skill_name or "\\" in skill_name or ".." in skill_name: + raise ValueError(f"Invalid skill name: {skill_name}") - is_valid, message, skill_name = _validate_skill_frontmatter(skill_dir) - if not is_valid: - raise ValueError(f"Invalid skill: {message}") - if not skill_name or "/" in skill_name or "\\" in skill_name or ".." in skill_name: - raise ValueError(f"Invalid skill name: {skill_name}") + target = custom_dir / skill_name + if target.exists(): + raise SkillAlreadyExistsError(f"Skill '{skill_name}' already exists") - target = custom_dir / skill_name - if target.exists(): - raise SkillAlreadyExistsError(f"Skill '{skill_name}' already exists") + return skill_dir, skill_name, target - await _scan_skill_archive_contents_or_raise(skill_dir, skill_name) + def _commit_skill_install(self, skill_dir: Path, skill_name: str, custom_dir: Path, target: Path) -> None: + """Stage and move the validated skill into place (blocking; runs off the event loop).""" + from deerflow.skills.installer import _move_staged_skill_into_reserved_target - with tempfile.TemporaryDirectory(prefix=f".installing-{skill_name}-", dir=custom_dir) as staging_root: - staging_target = Path(staging_root) / skill_name - shutil.copytree(skill_dir, staging_target) - _move_staged_skill_into_reserved_target(staging_target, target) - logger.info("Skill %r installed to %s", skill_name, target) - - return { - "success": True, - "skill_name": skill_name, - "message": f"Skill '{skill_name}' installed successfully", - } + with tempfile.TemporaryDirectory(prefix=f".installing-{skill_name}-", dir=custom_dir) as staging_root: + staging_target = Path(staging_root) / skill_name + shutil.copytree(skill_dir, staging_target) + _move_staged_skill_into_reserved_target(staging_target, target) def delete_custom_skill(self, name: str, *, history_meta: dict | None = None) -> None: self.validate_skill_name(name) diff --git a/backend/tests/blocking_io/test_skills_install.py b/backend/tests/blocking_io/test_skills_install.py new file mode 100644 index 000000000..1f7f233af --- /dev/null +++ b/backend/tests/blocking_io/test_skills_install.py @@ -0,0 +1,73 @@ +"""Regression anchor: skill archive installation must not block the event loop. + +``LocalSkillStorage.ainstall_skill_from_archive`` is the async entry point the +gateway ``POST /skills/install`` route awaits. It extracts the archive, +validates frontmatter, security-scans every installable file, and stages the +skill into the custom directory — all filesystem work that previously ran +inline on the event loop (zip extract, ``rglob`` enumeration, ``read_text``, +``shutil.copytree``). The fix offloads those phases via ``asyncio.to_thread`` +while keeping the per-file LLM security scan as the only awaited work; if any +phase regresses back onto the loop, the strict Blockbuster gate raises +``BlockingError`` and this test fails. + +Only the external LLM boundary (``scan_skill_content``) is stubbed — the +archive, extraction, validation, and staging all run against the real local +filesystem. Test-side setup IO is itself offloaded with ``asyncio.to_thread`` +(matching ``test_agents_router``) so only the production path is exercised on +the loop. +""" + +from __future__ import annotations + +import asyncio +import zipfile +from pathlib import Path +from types import SimpleNamespace + +import pytest + +from deerflow.skills.storage.local_skill_storage import LocalSkillStorage + +pytestmark = pytest.mark.asyncio + +_SKILL_MD = """--- +name: loop-skill +description: Anchor fixture skill for the blocking-IO gate. +--- + +# Loop Skill + +Drives the full install pipeline under the Blockbuster gate. +""" + +_SUPPORT_MD = "Reference notes scanned by the per-file security pass.\n" + + +def _build_archive(archive: Path) -> None: + with zipfile.ZipFile(archive, "w") as zf: + zf.writestr("loop-skill/SKILL.md", _SKILL_MD) + zf.writestr("loop-skill/references/usage.md", _SUPPORT_MD) + + +async def test_install_skill_archive_does_not_block_event_loop(tmp_path: Path, monkeypatch) -> None: + archive = tmp_path / "loop-skill.skill" + await asyncio.to_thread(_build_archive, archive) + + async def _allow_scan(content: str, *, executable: bool = False, location: str = "SKILL.md", app_config=None): + return SimpleNamespace(decision="allow", reason="anchor stub") + + # External dependency boundary only: the security scanner is an LLM call. + monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _allow_scan) + + # Constructor resolves paths (one-time, cached in production via + # get_or_new_skill_storage); offloaded here so the anchor exercises only + # the install pipeline itself on the loop. + storage = await asyncio.to_thread(LocalSkillStorage, host_path=str(tmp_path / "skills")) + + result = await storage.ainstall_skill_from_archive(archive) + + assert result["success"] is True + assert result["skill_name"] == "loop-skill" + installed_md = tmp_path / "skills" / "custom" / "loop-skill" / "SKILL.md" + assert await asyncio.to_thread(installed_md.exists) + assert await asyncio.to_thread((tmp_path / "skills" / "custom" / "loop-skill" / "references" / "usage.md").exists)