fix(skills): scan skill archives before install (#2561)

* fix(skills): scan skill archives before install

Fixes #2536

* fix(skills): scan archive support files before install

* style(skills): format archive installer

* fix(skills): address archive install review comments
This commit is contained in:
DanielWalnut
2026-04-28 11:56:11 +08:00
committed by GitHub
parent f7dfb88a30
commit 707ed328dd
7 changed files with 400 additions and 9 deletions
+2 -2
View File
@@ -11,7 +11,7 @@ from app.gateway.path_utils import resolve_thread_virtual_path
from deerflow.agents.lead_agent.prompt import refresh_skills_system_prompt_cache_async from deerflow.agents.lead_agent.prompt import refresh_skills_system_prompt_cache_async
from deerflow.config.extensions_config import ExtensionsConfig, SkillStateConfig, get_extensions_config, reload_extensions_config from deerflow.config.extensions_config import ExtensionsConfig, SkillStateConfig, get_extensions_config, reload_extensions_config
from deerflow.skills import Skill, load_skills from deerflow.skills import Skill, load_skills
from deerflow.skills.installer import SkillAlreadyExistsError, install_skill_from_archive from deerflow.skills.installer import SkillAlreadyExistsError, ainstall_skill_from_archive
from deerflow.skills.manager import ( from deerflow.skills.manager import (
append_history, append_history,
atomic_write, atomic_write,
@@ -119,7 +119,7 @@ async def list_skills() -> SkillsListResponse:
async def install_skill(request: SkillInstallRequest) -> SkillInstallResponse: async def install_skill(request: SkillInstallRequest) -> SkillInstallResponse:
try: try:
skill_file_path = resolve_thread_virtual_path(request.thread_id, request.path) skill_file_path = resolve_thread_virtual_path(request.thread_id, request.path)
result = install_skill_from_archive(skill_file_path) result = await ainstall_skill_from_archive(skill_file_path)
await refresh_skills_system_prompt_cache_async() await refresh_skills_system_prompt_cache_async()
return SkillInstallResponse(**result) return SkillInstallResponse(**result)
except FileNotFoundError as e: except FileNotFoundError as e:
@@ -1,4 +1,4 @@
from .installer import SkillAlreadyExistsError, install_skill_from_archive from .installer import SkillAlreadyExistsError, SkillSecurityScanError, ainstall_skill_from_archive, install_skill_from_archive
from .loader import get_skills_root_path, load_skills from .loader import get_skills_root_path, load_skills
from .types import Skill from .types import Skill
from .validation import ALLOWED_FRONTMATTER_PROPERTIES, _validate_skill_frontmatter from .validation import ALLOWED_FRONTMATTER_PROPERTIES, _validate_skill_frontmatter
@@ -10,5 +10,7 @@ __all__ = [
"ALLOWED_FRONTMATTER_PROPERTIES", "ALLOWED_FRONTMATTER_PROPERTIES",
"_validate_skill_frontmatter", "_validate_skill_frontmatter",
"install_skill_from_archive", "install_skill_from_archive",
"ainstall_skill_from_archive",
"SkillAlreadyExistsError", "SkillAlreadyExistsError",
"SkillSecurityScanError",
] ]
@@ -4,6 +4,8 @@ Pure business logic — no FastAPI/HTTP dependencies.
Both Gateway and Client delegate to these functions. Both Gateway and Client delegate to these functions.
""" """
import asyncio
import concurrent.futures
import logging import logging
import posixpath import posixpath
import shutil import shutil
@@ -13,15 +15,23 @@ import zipfile
from pathlib import Path, PurePosixPath, PureWindowsPath from pathlib import Path, PurePosixPath, PureWindowsPath
from deerflow.skills.loader import get_skills_root_path from deerflow.skills.loader import get_skills_root_path
from deerflow.skills.security_scanner import scan_skill_content
from deerflow.skills.validation import _validate_skill_frontmatter from deerflow.skills.validation import _validate_skill_frontmatter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_PROMPT_INPUT_DIRS = {"references", "templates"}
_PROMPT_INPUT_SUFFIXES = frozenset({".json", ".markdown", ".md", ".rst", ".txt", ".yaml", ".yml"})
class SkillAlreadyExistsError(ValueError): class SkillAlreadyExistsError(ValueError):
"""Raised when a skill with the same name is already installed.""" """Raised when a skill with the same name is already installed."""
class SkillSecurityScanError(ValueError):
"""Raised when a skill archive fails security scanning."""
def is_unsafe_zip_member(info: zipfile.ZipInfo) -> bool: def is_unsafe_zip_member(info: zipfile.ZipInfo) -> bool:
"""Return True if the zip member path is absolute or attempts directory traversal.""" """Return True if the zip member path is absolute or attempts directory traversal."""
name = info.filename name = info.filename
@@ -114,7 +124,78 @@ def safe_extract_skill_archive(
dst.write(chunk) dst.write(chunk)
def install_skill_from_archive( def _is_script_support_file(rel_path: Path) -> bool:
return bool(rel_path.parts) and rel_path.parts[0] == "scripts"
def _should_scan_support_file(rel_path: Path) -> bool:
if _is_script_support_file(rel_path):
return True
return bool(rel_path.parts) and rel_path.parts[0] in _PROMPT_INPUT_DIRS and rel_path.suffix.lower() in _PROMPT_INPUT_SUFFIXES
def _move_staged_skill_into_reserved_target(staging_target: Path, target: Path) -> None:
installed = False
reserved = False
try:
target.mkdir(mode=0o700)
reserved = True
for child in staging_target.iterdir():
shutil.move(str(child), target / child.name)
installed = True
except FileExistsError as e:
raise SkillAlreadyExistsError(f"Skill '{target.name}' already exists") from e
finally:
if reserved and not installed and target.exists():
shutil.rmtree(target)
async def _scan_skill_file_or_raise(skill_dir: Path, path: Path, skill_name: str, *, executable: bool) -> None:
rel_path = path.relative_to(skill_dir).as_posix()
location = f"{skill_name}/{rel_path}"
try:
content = path.read_text(encoding="utf-8")
except UnicodeDecodeError as e:
raise SkillSecurityScanError(f"Security scan failed for skill '{skill_name}': {location} must be valid UTF-8") from e
try:
result = await scan_skill_content(content, executable=executable, location=location)
except Exception as e:
raise SkillSecurityScanError(f"Security scan failed for {location}: {e}") from e
decision = getattr(result, "decision", None)
reason = str(getattr(result, "reason", "") or "No reason provided.")
if decision == "block":
if rel_path == "SKILL.md":
raise SkillSecurityScanError(f"Security scan blocked skill '{skill_name}': {reason}")
raise SkillSecurityScanError(f"Security scan blocked {location}: {reason}")
if executable and decision != "allow":
raise SkillSecurityScanError(f"Security scan rejected executable {location}: {reason}")
if decision not in {"allow", "warn"}:
raise SkillSecurityScanError(f"Security scan failed for {location}: invalid scanner decision {decision!r}")
async def _scan_skill_archive_contents_or_raise(skill_dir: Path, skill_name: str) -> None:
"""Run the skill security scanner against all installable text and script files."""
skill_md = skill_dir / "SKILL.md"
await _scan_skill_file_or_raise(skill_dir, skill_md, skill_name, executable=False)
for path in sorted(skill_dir.rglob("*")):
if not path.is_file():
continue
rel_path = path.relative_to(skill_dir)
if rel_path == Path("SKILL.md"):
continue
if path.name == "SKILL.md":
raise SkillSecurityScanError(f"Security scan failed for skill '{skill_name}': nested SKILL.md is not allowed at {skill_name}/{rel_path.as_posix()}")
if not _should_scan_support_file(rel_path):
continue
await _scan_skill_file_or_raise(skill_dir, path, skill_name, executable=_is_script_support_file(rel_path))
async def ainstall_skill_from_archive(
zip_path: str | Path, zip_path: str | Path,
*, *,
skills_root: Path | None = None, skills_root: Path | None = None,
@@ -173,7 +254,12 @@ def install_skill_from_archive(
if target.exists(): if target.exists():
raise SkillAlreadyExistsError(f"Skill '{skill_name}' already exists") raise SkillAlreadyExistsError(f"Skill '{skill_name}' already exists")
shutil.copytree(skill_dir, target) await _scan_skill_archive_contents_or_raise(skill_dir, skill_name)
with tempfile.TemporaryDirectory(prefix=f".installing-{skill_name}-", dir=custom_dir) as staging_root:
staging_target = Path(staging_root) / skill_name
shutil.copytree(skill_dir, staging_target)
_move_staged_skill_into_reserved_target(staging_target, target)
logger.info("Skill %r installed to %s", skill_name, target) logger.info("Skill %r installed to %s", skill_name, target)
return { return {
@@ -181,3 +267,24 @@ def install_skill_from_archive(
"skill_name": skill_name, "skill_name": skill_name,
"message": f"Skill '{skill_name}' installed successfully", "message": f"Skill '{skill_name}' installed successfully",
} }
def _run_async_install(coro):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop is not None and loop.is_running():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
return executor.submit(asyncio.run, coro).result()
return asyncio.run(coro)
def install_skill_from_archive(
zip_path: str | Path,
*,
skills_root: Path | None = None,
) -> dict:
"""Install a skill from a .skill archive (ZIP)."""
return _run_async_install(ainstall_skill_from_archive(zip_path, skills_root=skills_root))
+15 -4
View File
@@ -49,6 +49,17 @@ def client(mock_app_config):
return DeerFlowClient() return DeerFlowClient()
@pytest.fixture
def allow_skill_security_scan():
async def _scan(*args, **kwargs):
from deerflow.skills.security_scanner import ScanResult
return ScanResult(decision="allow", reason="ok")
with patch("deerflow.skills.installer.scan_skill_content", _scan):
yield
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# __init__ # __init__
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -1195,7 +1206,7 @@ class TestSkillsManagement:
with pytest.raises(ValueError, match="not found"): with pytest.raises(ValueError, match="not found"):
client.update_skill("nonexistent", enabled=True) client.update_skill("nonexistent", enabled=True)
def test_install_skill(self, client): def test_install_skill(self, client, allow_skill_security_scan):
with tempfile.TemporaryDirectory() as tmp: with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp) tmp_path = Path(tmp)
@@ -2015,7 +2026,7 @@ class TestScenarioMemoryWorkflow:
class TestScenarioSkillInstallAndUse: class TestScenarioSkillInstallAndUse:
"""Scenario: Install a skill → verify it appears → toggle it.""" """Scenario: Install a skill → verify it appears → toggle it."""
def test_install_then_toggle(self, client): def test_install_then_toggle(self, client, allow_skill_security_scan):
"""Install .skill archive → list to verify → disable → verify disabled.""" """Install .skill archive → list to verify → disable → verify disabled."""
with tempfile.TemporaryDirectory() as tmp: with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp) tmp_path = Path(tmp)
@@ -2261,7 +2272,7 @@ class TestGatewayConformance:
parsed = SkillResponse(**result) parsed = SkillResponse(**result)
assert parsed.name == "web-search" assert parsed.name == "web-search"
def test_install_skill(self, client, tmp_path): def test_install_skill(self, client, tmp_path, allow_skill_security_scan):
skill_dir = tmp_path / "my-skill" skill_dir = tmp_path / "my-skill"
skill_dir.mkdir() skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text("---\nname: my-skill\ndescription: A test skill\n---\nBody\n") (skill_dir / "SKILL.md").write_text("---\nname: my-skill\ndescription: A test skill\n---\nBody\n")
@@ -2459,7 +2470,7 @@ class TestInstallSkillSecurity:
with pytest.raises(ValueError, match="unsafe"): with pytest.raises(ValueError, match="unsafe"):
client.install_skill(archive) client.install_skill(archive)
def test_symlinks_skipped_during_extraction(self, client): def test_symlinks_skipped_during_extraction(self, client, allow_skill_security_scan):
"""Symlink entries in the archive are skipped (never written to disk).""" """Symlink entries in the archive are skipped (never written to disk)."""
import stat as stat_mod import stat as stat_mod
+9
View File
@@ -522,6 +522,15 @@ class TestArtifactAccess:
class TestSkillInstallation: class TestSkillInstallation:
"""install_skill() with real ZIP handling and filesystem.""" """install_skill() with real ZIP handling and filesystem."""
@pytest.fixture(autouse=True)
def _allow_skill_security_scan(self, monkeypatch):
async def _scan(*args, **kwargs):
from deerflow.skills.security_scanner import ScanResult
return ScanResult(decision="allow", reason="ok")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def _isolate_skills_dir(self, tmp_path, monkeypatch): def _isolate_skills_dir(self, tmp_path, monkeypatch):
"""Redirect skill installation to a temp directory.""" """Redirect skill installation to a temp directory."""
@@ -1,5 +1,6 @@
import errno import errno
import json import json
import zipfile
from pathlib import Path from pathlib import Path
from types import SimpleNamespace from types import SimpleNamespace
@@ -35,6 +36,85 @@ def _make_skill(name: str, *, enabled: bool) -> Skill:
) )
def _make_skill_archive(tmp_path: Path, name: str, content: str | None = None) -> Path:
archive = tmp_path / f"{name}.skill"
skill_content = content or _skill_content(name)
with zipfile.ZipFile(archive, "w") as zf:
zf.writestr(f"{name}/SKILL.md", skill_content)
return archive
def test_install_skill_archive_runs_security_scan(monkeypatch, tmp_path):
skills_root = tmp_path / "skills"
(skills_root / "custom").mkdir(parents=True)
archive = _make_skill_archive(tmp_path, "archive-skill")
scan_calls = []
refresh_calls = []
async def _scan(content, *, executable, location):
from deerflow.skills.security_scanner import ScanResult
scan_calls.append({"content": content, "executable": executable, "location": location})
return ScanResult(decision="allow", reason="ok")
async def _refresh():
refresh_calls.append("refresh")
monkeypatch.setattr(skills_router, "resolve_thread_virtual_path", lambda thread_id, path: archive)
monkeypatch.setattr("deerflow.skills.installer.get_skills_root_path", lambda: skills_root)
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
monkeypatch.setattr(skills_router, "refresh_skills_system_prompt_cache_async", _refresh)
app = FastAPI()
app.include_router(skills_router.router)
with TestClient(app) as client:
response = client.post("/api/skills/install", json={"thread_id": "thread-1", "path": "mnt/user-data/outputs/archive-skill.skill"})
assert response.status_code == 200
assert response.json()["skill_name"] == "archive-skill"
assert (skills_root / "custom" / "archive-skill" / "SKILL.md").exists()
assert scan_calls == [
{
"content": _skill_content("archive-skill"),
"executable": False,
"location": "archive-skill/SKILL.md",
}
]
assert refresh_calls == ["refresh"]
def test_install_skill_archive_security_scan_block_returns_400(monkeypatch, tmp_path):
skills_root = tmp_path / "skills"
(skills_root / "custom").mkdir(parents=True)
archive = _make_skill_archive(tmp_path, "blocked-skill")
refresh_calls = []
async def _scan(*args, **kwargs):
from deerflow.skills.security_scanner import ScanResult
return ScanResult(decision="block", reason="prompt injection")
async def _refresh():
refresh_calls.append("refresh")
monkeypatch.setattr(skills_router, "resolve_thread_virtual_path", lambda thread_id, path: archive)
monkeypatch.setattr("deerflow.skills.installer.get_skills_root_path", lambda: skills_root)
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
monkeypatch.setattr(skills_router, "refresh_skills_system_prompt_cache_async", _refresh)
app = FastAPI()
app.include_router(skills_router.router)
with TestClient(app) as client:
response = client.post("/api/skills/install", json={"thread_id": "thread-1", "path": "mnt/user-data/outputs/blocked-skill.skill"})
assert response.status_code == 400
assert "Security scan blocked skill 'blocked-skill': prompt injection" in response.json()["detail"]
assert not (skills_root / "custom" / "blocked-skill").exists()
assert refresh_calls == []
def test_custom_skills_router_lifecycle(monkeypatch, tmp_path): def test_custom_skills_router_lifecycle(monkeypatch, tmp_path):
skills_root = tmp_path / "skills" skills_root = tmp_path / "skills"
custom_dir = skills_root / "custom" / "demo-skill" custom_dir = skills_root / "custom" / "demo-skill"
+182
View File
@@ -1,5 +1,6 @@
"""Tests for deerflow.skills.installer — shared skill installation logic.""" """Tests for deerflow.skills.installer — shared skill installation logic."""
import shutil
import stat import stat
import zipfile import zipfile
from pathlib import Path from pathlib import Path
@@ -7,6 +8,7 @@ from pathlib import Path
import pytest import pytest
from deerflow.skills.installer import ( from deerflow.skills.installer import (
SkillSecurityScanError,
install_skill_from_archive, install_skill_from_archive,
is_symlink_member, is_symlink_member,
is_unsafe_zip_member, is_unsafe_zip_member,
@@ -14,6 +16,7 @@ from deerflow.skills.installer import (
safe_extract_skill_archive, safe_extract_skill_archive,
should_ignore_archive_entry, should_ignore_archive_entry,
) )
from deerflow.skills.security_scanner import ScanResult
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# is_unsafe_zip_member # is_unsafe_zip_member
@@ -169,6 +172,13 @@ class TestSafeExtract:
class TestInstallSkillFromArchive: class TestInstallSkillFromArchive:
@pytest.fixture(autouse=True)
def _allow_security_scan(self, monkeypatch):
async def _scan(*args, **kwargs):
return ScanResult(decision="allow", reason="ok")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
def _make_skill_zip(self, tmp_path: Path, skill_name: str = "test-skill") -> Path: def _make_skill_zip(self, tmp_path: Path, skill_name: str = "test-skill") -> Path:
"""Create a valid .skill archive.""" """Create a valid .skill archive."""
zip_path = tmp_path / f"{skill_name}.skill" zip_path = tmp_path / f"{skill_name}.skill"
@@ -188,6 +198,178 @@ class TestInstallSkillFromArchive:
assert result["skill_name"] == "test-skill" assert result["skill_name"] == "test-skill"
assert (skills_root / "custom" / "test-skill" / "SKILL.md").exists() assert (skills_root / "custom" / "test-skill" / "SKILL.md").exists()
def test_scans_skill_markdown_before_install(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
skills_root.mkdir()
calls = []
async def _scan(content, *, executable, location):
calls.append({"content": content, "executable": executable, "location": location})
return ScanResult(decision="allow", reason="ok")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
install_skill_from_archive(zip_path, skills_root=skills_root)
assert calls == [
{
"content": "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n",
"executable": False,
"location": "test-skill/SKILL.md",
}
]
def test_scans_support_files_and_scripts_before_install(self, tmp_path, monkeypatch):
zip_path = tmp_path / "test-skill.skill"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("test-skill/SKILL.md", "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n")
zf.writestr("test-skill/references/guide.md", "# Guide\n")
zf.writestr("test-skill/templates/prompt.txt", "Use care.\n")
zf.writestr("test-skill/scripts/run.sh", "#!/bin/sh\necho ok\n")
zf.writestr("test-skill/assets/logo.png", b"\x89PNG\r\n\x1a\n")
zf.writestr("test-skill/references/.env", "TOKEN=secret\n")
zf.writestr("test-skill/templates/config.cfg", "TOKEN=secret\n")
skills_root = tmp_path / "skills"
skills_root.mkdir()
calls = []
async def _scan(content, *, executable, location):
calls.append({"content": content, "executable": executable, "location": location})
return ScanResult(decision="allow", reason="ok")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
install_skill_from_archive(zip_path, skills_root=skills_root)
assert calls == [
{
"content": "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n",
"executable": False,
"location": "test-skill/SKILL.md",
},
{
"content": "# Guide\n",
"executable": False,
"location": "test-skill/references/guide.md",
},
{
"content": "#!/bin/sh\necho ok\n",
"executable": True,
"location": "test-skill/scripts/run.sh",
},
{
"content": "Use care.\n",
"executable": False,
"location": "test-skill/templates/prompt.txt",
},
]
assert all("secret" not in call["content"] for call in calls)
def test_nested_skill_markdown_prevents_install(self, tmp_path):
zip_path = tmp_path / "test-skill.skill"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("test-skill/SKILL.md", "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n")
zf.writestr("test-skill/references/other/SKILL.md", "# Nested skill\n")
skills_root = tmp_path / "skills"
skills_root.mkdir()
with pytest.raises(SkillSecurityScanError, match="nested SKILL.md"):
install_skill_from_archive(zip_path, skills_root=skills_root)
assert not (skills_root / "custom" / "test-skill").exists()
def test_script_warn_prevents_install(self, tmp_path, monkeypatch):
zip_path = tmp_path / "test-skill.skill"
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr("test-skill/SKILL.md", "---\nname: test-skill\ndescription: A test skill\n---\n\n# test-skill\n")
zf.writestr("test-skill/scripts/run.sh", "#!/bin/sh\necho ok\n")
skills_root = tmp_path / "skills"
skills_root.mkdir()
async def _scan(*args, executable, **kwargs):
if executable:
return ScanResult(decision="warn", reason="script needs review")
return ScanResult(decision="allow", reason="ok")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
with pytest.raises(SkillSecurityScanError, match="rejected executable.*script needs review"):
install_skill_from_archive(zip_path, skills_root=skills_root)
assert not (skills_root / "custom" / "test-skill").exists()
def test_security_scan_block_prevents_install(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path, skill_name="blocked-skill")
skills_root = tmp_path / "skills"
skills_root.mkdir()
async def _scan(*args, **kwargs):
return ScanResult(decision="block", reason="prompt injection")
monkeypatch.setattr("deerflow.skills.installer.scan_skill_content", _scan)
with pytest.raises(SkillSecurityScanError, match="Security scan blocked.*prompt injection"):
install_skill_from_archive(zip_path, skills_root=skills_root)
assert not (skills_root / "custom" / "blocked-skill").exists()
def test_copy_failure_does_not_leave_partial_install(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
skills_root.mkdir()
def _copytree(src, dst):
partial = Path(dst)
partial.mkdir(parents=True)
(partial / "partial.txt").write_text("partial", encoding="utf-8")
raise OSError("copy failed")
monkeypatch.setattr("deerflow.skills.installer.shutil.copytree", _copytree)
with pytest.raises(OSError, match="copy failed"):
install_skill_from_archive(zip_path, skills_root=skills_root)
custom_dir = skills_root / "custom"
assert not (custom_dir / "test-skill").exists()
assert not [path for path in custom_dir.iterdir() if path.name.startswith(".installing-test-skill-")]
def test_concurrent_target_creation_does_not_get_clobbered(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
skills_root.mkdir()
target = skills_root / "custom" / "test-skill"
original_copytree = shutil.copytree
def _copytree(src, dst):
target.mkdir(parents=True)
(target / "marker.txt").write_text("external", encoding="utf-8")
return original_copytree(src, dst)
monkeypatch.setattr("deerflow.skills.installer.shutil.copytree", _copytree)
with pytest.raises(ValueError, match="already exists"):
install_skill_from_archive(zip_path, skills_root=skills_root)
assert (target / "marker.txt").read_text(encoding="utf-8") == "external"
assert not (target / "SKILL.md").exists()
def test_move_failure_cleans_reserved_target(self, tmp_path, monkeypatch):
zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills"
skills_root.mkdir()
def _move(src, dst):
Path(dst).write_text("partial", encoding="utf-8")
raise OSError("move failed")
monkeypatch.setattr("deerflow.skills.installer.shutil.move", _move)
with pytest.raises(OSError, match="move failed"):
install_skill_from_archive(zip_path, skills_root=skills_root)
assert not (skills_root / "custom" / "test-skill").exists()
def test_duplicate_raises(self, tmp_path): def test_duplicate_raises(self, tmp_path):
zip_path = self._make_skill_zip(tmp_path) zip_path = self._make_skill_zip(tmp_path)
skills_root = tmp_path / "skills" skills_root = tmp_path / "skills"