refactor(skills): Unified skill storage capability (#2613)

This commit is contained in:
Xun
2026-05-01 13:23:26 +08:00
committed by GitHub
parent eba3b9e18d
commit 1ad1420e31
29 changed files with 1031 additions and 968 deletions
@@ -0,0 +1,83 @@
"""SkillStorage singleton + reflection-based factory.
Mirrors the pattern used by ``deerflow/sandbox/sandbox_provider.py``.
"""
from __future__ import annotations
from deerflow.skills.storage.local_skill_storage import LocalSkillStorage
from deerflow.skills.storage.skill_storage import SkillStorage
# Process-level cache used by get_or_new_skill_storage(); stays None until a
# storage is built on the singleton path (or assigned directly, e.g. by tests).
_default_skill_storage: SkillStorage | None = None
# Compared by identity (``is``) against the current app config to decide
# whether the cached storage must be rebuilt.
_default_skill_storage_config: object | None = None # AppConfig identity the singleton was built from
def get_or_new_skill_storage(**kwargs) -> SkillStorage:
    """Return a ``SkillStorage`` instance — either a new one or the process singleton.

    Recognised keyword arguments (both optional, removed from ``kwargs`` before
    the remainder is forwarded to the storage constructor):

    - ``skills_path``: explicit host path for the skills root.
    - ``app_config``: an application config object exposing ``.skills``.

    **New instance** is created (never cached) when:

    - ``skills_path`` is provided — it is used as the ``host_path`` override.
      The storage class is resolved from ``app_config.skills`` when
      ``app_config`` is also given, otherwise from a default ``SkillsConfig()``
      so that no configuration file has to be read.
    - ``app_config`` is provided — constructs a storage from ``app_config.skills``
      so that per-request config (e.g. Gateway ``Depends(get_config)``) is respected
      without polluting the process-level singleton.

    **Singleton** is returned (created on first call, then reused) when neither
    ``skills_path`` nor ``app_config`` is given — uses ``get_app_config()`` to
    resolve the active configuration. The singleton is rebuilt whenever
    ``get_app_config()`` returns a different object than the one it was built from.
    Extra keyword arguments only take effect when the singleton is (re)built.
    """
    global _default_skill_storage, _default_skill_storage_config

    from deerflow.config import get_app_config
    from deerflow.config.skills_config import SkillsConfig

    def _make_storage(skills_config: SkillsConfig, *, host_path: str | None = None, **extra) -> SkillStorage:
        """Instantiate the storage class named by ``skills_config.use``."""
        from deerflow.reflection import resolve_class

        cls = resolve_class(skills_config.use, SkillStorage)
        return cls(
            host_path=host_path if host_path is not None else str(skills_config.get_skills_path()),
            container_path=skills_config.container_path,
            **extra,
        )

    skills_path = kwargs.pop("skills_path", None)
    app_config = kwargs.pop("app_config", None)

    if skills_path is not None:
        # Without an app_config fall back to a default SkillsConfig so that an
        # explicit host path never requires reading config.yaml.
        skills_config = app_config.skills if app_config is not None else SkillsConfig()
        return _make_storage(skills_config, host_path=str(skills_path), **kwargs)

    if app_config is not None:
        return _make_storage(app_config.skills, **kwargs)

    # If the singleton was manually injected (e.g. in tests) without a config
    # identity (_default_skill_storage_config is None), skip get_app_config()
    # entirely to avoid requiring a config.yaml on disk.
    if _default_skill_storage is not None and _default_skill_storage_config is None:
        return _default_skill_storage

    app_config_now = get_app_config()
    if _default_skill_storage is None or _default_skill_storage_config is not app_config_now:
        _default_skill_storage = _make_storage(app_config_now.skills, **kwargs)
        _default_skill_storage_config = app_config_now
    return _default_skill_storage
def reset_skill_storage() -> None:
    """Forget the cached storage singleton and the config identity it was built from.

    Intended for tests and hot-reload scenarios.
    """
    global _default_skill_storage, _default_skill_storage_config
    _default_skill_storage = _default_skill_storage_config = None
# Names exported by ``from <this module> import *``: the two storage classes
# imported above plus the factory and reset helpers defined in this module.
__all__ = [
    "LocalSkillStorage",
    "SkillStorage",
    "get_or_new_skill_storage",
    "reset_skill_storage",
]
@@ -0,0 +1,198 @@
"""Local-filesystem implementation of ``SkillStorage``."""
from __future__ import annotations
import errno
import json
import logging
import os
import shutil
import tempfile
from collections.abc import Iterable
from datetime import UTC, datetime
from pathlib import Path
from deerflow.config.skills_config import _default_repo_root
from deerflow.skills.storage.skill_storage import SKILL_MD_FILE, SkillStorage
from deerflow.skills.types import SkillCategory
logger = logging.getLogger(__name__)
DEFAULT_SKILLS_CONTAINER_PATH = "/mnt/skills"


class LocalSkillStorage(SkillStorage):
    """Skill storage backed by the local filesystem.

    Layout::

        <root>/public/<name>/SKILL.md
        <root>/custom/<name>/SKILL.md
        <root>/custom/.history/<name>.jsonl
    """

    def __init__(
        self,
        host_path: str | None = None,
        container_path: str = DEFAULT_SKILLS_CONTAINER_PATH,
        app_config=None,
    ) -> None:
        """Resolve the host-side skills root.

        Args:
            host_path: Skills root on the host. A relative path is joined onto
                ``_default_repo_root()``; the result is resolved to an absolute
                path. When ``None`` the root is taken from
                ``config.skills.get_skills_path()``.
            container_path: Skills root as seen inside the container; passed to
                the base class unchanged.
            app_config: Config object used when ``host_path`` is ``None``. A
                falsy value falls back to ``get_app_config()``.
        """
        super().__init__(container_path=container_path)
        if host_path is None:
            from deerflow.config import get_app_config

            config = app_config or get_app_config()
            self._host_root: Path = config.skills.get_skills_path()
        else:
            path = Path(host_path)
            if not path.is_absolute():
                path = _default_repo_root() / path
            self._host_root = path.resolve()

    # ------------------------------------------------------------------
    # Abstract operation implementations
    # ------------------------------------------------------------------
    def get_skills_root_path(self) -> Path:
        """Return the host-side skills root directory."""
        return self._host_root

    def custom_skill_exists(self, name: str) -> bool:
        """Return whether ``custom/<name>/SKILL.md`` exists under the root."""
        return self.get_custom_skill_file(name).exists()

    def public_skill_exists(self, name: str) -> bool:
        """Return whether ``public/<name>/SKILL.md`` exists under the root.

        Raises:
            ValueError: If ``name`` is not a valid skill name.
        """
        normalized_name = self.validate_skill_name(name)
        return (self._host_root / SkillCategory.PUBLIC.value / normalized_name / SKILL_MD_FILE).exists()

    def _iter_skill_files(self) -> Iterable[tuple[SkillCategory, Path, Path]]:
        """Yield ``(category, category_root, skill_md_path)`` for every SKILL.md found.

        Each category directory is walked recursively (following symlinks).
        Directories whose name starts with ``"."`` are pruned, which also
        keeps ``.history`` and in-progress ``.installing-*`` staging
        directories out of the results. Sub-directories are visited in sorted
        order so discovery is deterministic.
        """
        if not self._host_root.exists():
            return
        for category in SkillCategory:
            category_path = self._host_root / category.value
            if not category_path.is_dir():
                continue
            for current_root, dir_names, file_names in os.walk(category_path, followlinks=True):
                # In-place slice assignment is what tells os.walk to prune/reorder.
                dir_names[:] = sorted(name for name in dir_names if not name.startswith("."))
                if SKILL_MD_FILE not in file_names:
                    continue
                yield category, category_path, Path(current_root) / SKILL_MD_FILE

    def read_custom_skill(self, name: str) -> str:
        """Return the UTF-8 text of ``custom/<name>/SKILL.md``.

        Raises:
            ValueError: If ``name`` is not a valid skill name.
            FileNotFoundError: If the custom skill does not exist.
        """
        if not self.custom_skill_exists(name):
            raise FileNotFoundError(f"Custom skill '{name}' not found.")
        return (self.get_custom_skill_dir(name) / SKILL_MD_FILE).read_text(encoding="utf-8")

    def write_custom_skill(self, name: str, relative_path: str, content: str) -> None:
        """Atomically write ``content`` to ``custom/<name>/<relative_path>``.

        The text is first written to a temporary file created in the target's
        own directory and then moved over the target with ``Path.replace``.
        If writing or replacing fails, the temporary file is removed before
        the error propagates so no stray files are left in the skill directory.

        Raises:
            ValueError: If ``name`` is invalid or ``relative_path`` is empty or
                resolves outside the skill directory.
            OSError: If the file cannot be written or moved into place.
        """
        target = self.validate_relative_path(relative_path, self.get_custom_skill_dir(name))
        target.parent.mkdir(parents=True, exist_ok=True)
        tmp_path: Path | None = None
        try:
            with tempfile.NamedTemporaryFile(
                "w",
                encoding="utf-8",
                delete=False,
                dir=str(target.parent),
            ) as tmp_file:
                tmp_path = Path(tmp_file.name)
                tmp_file.write(content)
            tmp_path.replace(target)
        except BaseException:
            # delete=False means nobody else will clean this up; after a
            # successful replace() the temp name no longer exists, so this
            # only runs for genuinely failed writes.
            if tmp_path is not None:
                tmp_path.unlink(missing_ok=True)
            raise

    async def ainstall_skill_from_archive(self, archive_path: str | Path) -> dict:
        """Install a skill from a ``.skill`` ZIP archive into ``custom/``.

        The archive is extracted to a temporary directory, its frontmatter is
        validated, its contents are passed to the installer's scan helper, and
        the skill is then copied into a ``.installing-*`` staging directory
        inside ``custom/`` before being moved to its final location.

        Returns:
            ``{"success": True, "skill_name": ..., "message": ...}``.

        Raises:
            FileNotFoundError: If ``archive_path`` does not exist.
            ValueError: If the path is not a file, lacks the ``.skill`` suffix,
                is not a valid ZIP archive, or contains an invalid skill or
                skill name.
            SkillAlreadyExistsError: If ``custom/<skill_name>`` already exists.
        """
        import zipfile

        from deerflow.skills.installer import (
            SkillAlreadyExistsError,
            _move_staged_skill_into_reserved_target,
            _scan_skill_archive_contents_or_raise,
            resolve_skill_dir_from_archive,
            safe_extract_skill_archive,
        )
        from deerflow.skills.validation import _validate_skill_frontmatter

        logger.info("Installing skill from %s", archive_path)
        path = Path(archive_path)
        if not path.is_file():
            if not path.exists():
                raise FileNotFoundError(f"Skill file not found: {archive_path}")
            raise ValueError(f"Path is not a file: {archive_path}")
        if path.suffix != ".skill":
            raise ValueError("File must have .skill extension")

        custom_dir = self._host_root / "custom"
        custom_dir.mkdir(parents=True, exist_ok=True)

        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            # The file may vanish or change between the checks above and here,
            # so the same error mapping is repeated around the actual open.
            try:
                zf = zipfile.ZipFile(path, "r")
            except FileNotFoundError:
                raise FileNotFoundError(f"Skill file not found: {archive_path}") from None
            except (zipfile.BadZipFile, IsADirectoryError):
                raise ValueError("File is not a valid ZIP archive") from None
            with zf:
                safe_extract_skill_archive(zf, tmp_path)

            skill_dir = resolve_skill_dir_from_archive(tmp_path)
            is_valid, message, skill_name = _validate_skill_frontmatter(skill_dir)
            if not is_valid:
                raise ValueError(f"Invalid skill: {message}")
            # The name becomes a directory name: reject separators and "..".
            if not skill_name or "/" in skill_name or "\\" in skill_name or ".." in skill_name:
                raise ValueError(f"Invalid skill name: {skill_name}")

            target = custom_dir / skill_name
            if target.exists():
                raise SkillAlreadyExistsError(f"Skill '{skill_name}' already exists")

            await _scan_skill_archive_contents_or_raise(skill_dir, skill_name)

            # Stage inside custom/ (dot-prefixed, so skill discovery ignores it)
            # and only then move into the final location.
            with tempfile.TemporaryDirectory(prefix=f".installing-{skill_name}-", dir=custom_dir) as staging_root:
                staging_target = Path(staging_root) / skill_name
                shutil.copytree(skill_dir, staging_target)
                _move_staged_skill_into_reserved_target(staging_target, target)

        logger.info("Skill %r installed to %s", skill_name, target)
        return {
            "success": True,
            "skill_name": skill_name,
            "message": f"Skill '{skill_name}' installed successfully",
        }

    def delete_custom_skill(self, name: str, *, history_meta: dict | None = None) -> None:
        """Delete ``custom/<name>``, optionally recording a history entry first.

        When ``history_meta`` is given, the current SKILL.md content is stored
        as ``prev_content`` in a history record before removal. A history
        write that fails for read-only/permission reasons is logged and
        skipped; any other ``OSError`` propagates.

        Raises:
            ValueError: If ``name`` is invalid or refers to a built-in skill.
            FileNotFoundError: If no such custom skill exists.
        """
        self.validate_skill_name(name)
        self.ensure_custom_skill_is_editable(name)
        target = self.get_custom_skill_dir(name)
        if history_meta is not None:
            prev_content = self.read_custom_skill(name)
            try:
                self.append_history(name, {**history_meta, "prev_content": prev_content})
            except OSError as e:
                if not isinstance(e, PermissionError) and e.errno not in {errno.EACCES, errno.EPERM, errno.EROFS}:
                    raise
                logger.warning(
                    "Skipping delete history write for custom skill %s due to readonly/permission failure; continuing with skill directory removal: %s",
                    name,
                    e,
                )
        if target.exists():
            shutil.rmtree(target)

    def append_history(self, name: str, record: dict) -> None:
        """Append ``record`` as one JSON line to ``custom/.history/<name>.jsonl``.

        A ``ts`` key holding the current UTC time (ISO 8601) is added; a ``ts``
        key already present in ``record`` takes precedence.

        Raises:
            ValueError: If ``name`` is not a valid skill name.
        """
        self.validate_skill_name(name)
        payload = {"ts": datetime.now(UTC).isoformat(), **record}
        history_path = self.get_skill_history_file(name)
        history_path.parent.mkdir(parents=True, exist_ok=True)
        with history_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(payload, ensure_ascii=False))
            f.write("\n")

    def read_history(self, name: str) -> list[dict]:
        """Return all history records for ``name`` in file order.

        Returns an empty list when no history file exists. Blank lines are
        skipped.

        Raises:
            ValueError: If ``name`` is not a valid skill name.
            json.JSONDecodeError: If a non-blank line is not valid JSON.
        """
        self.validate_skill_name(name)
        history_path = self.get_skill_history_file(name)
        if not history_path.exists():
            return []
        records: list[dict] = []
        for line in history_path.read_text(encoding="utf-8").splitlines():
            if not line.strip():
                continue
            records.append(json.loads(line))
        return records
@@ -0,0 +1,254 @@
"""Abstract SkillStorage base class with template-method flows."""
from __future__ import annotations
import logging
import re
from abc import ABC, abstractmethod
from collections.abc import Iterable
from pathlib import Path
from deerflow.skills.types import SKILL_MD_FILE, Skill, SkillCategory # noqa: F401
logger = logging.getLogger(__name__)
_SKILL_NAME_PATTERN = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")


class SkillStorage(ABC):
    """Abstract base for skill storage backends.

    Subclasses implement a small set of storage-medium-specific atomic
    operations; this base class provides final template-method flows
    (load_skills, history serialisation, path helpers, validation) that
    compose them with protocol-level helpers.
    """

    # Top-level directories inside a skill that may hold supporting files.
    _ALLOWED_SUPPORT_SUBDIRS = frozenset({"references", "templates", "scripts", "assets"})

    def __init__(self, container_path: str = "/mnt/skills") -> None:
        """Remember the container-side skills root (returned by ``get_container_root``)."""
        self._container_root = container_path

    # ------------------------------------------------------------------
    # Static protocol helpers (not storage-specific)
    # ------------------------------------------------------------------
    @staticmethod
    def validate_skill_name(name: str) -> str:
        """Validate and normalise a skill name; return the normalised form.

        Surrounding whitespace is stripped. The result must be hyphen-case
        (lowercase letters and digits, single hyphens between groups) and at
        most 64 characters long.

        Raises:
            ValueError: If ``name`` is not a string or violates the rules above.
        """
        if not isinstance(name, str):
            # Report bad input the same way as every other validation failure
            # instead of failing with AttributeError on ``.strip()``.
            raise ValueError("Skill name must be a string.")
        normalized = name.strip()
        if not _SKILL_NAME_PATTERN.fullmatch(normalized):
            raise ValueError("Skill name must be hyphen-case using lowercase letters, digits, and hyphens only.")
        if len(normalized) > 64:
            raise ValueError("Skill name must be 64 characters or fewer.")
        return normalized

    @staticmethod
    def validate_relative_path(relative_path: str, base_dir: Path) -> Path:
        """Validate *relative_path* against *base_dir* and return the resolved target.

        Checks that *relative_path* is non-empty, then joins it with *base_dir*
        and resolves the result (following symlinks). Raises ``ValueError`` if
        the resolved target does not lie within *base_dir*.
        """
        if not relative_path:
            raise ValueError("relative_path must not be empty.")
        resolved_base = base_dir.resolve()
        target = (resolved_base / relative_path).resolve()
        try:
            target.relative_to(resolved_base)
        except ValueError as exc:
            raise ValueError("relative_path must resolve within the skill directory.") from exc
        return target

    @staticmethod
    def validate_skill_markdown_content(name: str, content: str) -> None:
        """Validate SKILL.md content: parse frontmatter and check name matches.

        The content is written to a throw-away directory and handed to the
        frontmatter validator. The parsed name is compared against the
        *normalised* form of ``name`` so that surrounding whitespace is
        treated the same way as in every other name-based helper.

        Raises:
            ValueError: If ``name`` is invalid, the frontmatter is invalid, or
                the frontmatter name differs from the requested name.
        """
        import tempfile

        from deerflow.skills.validation import _validate_skill_frontmatter

        normalized_name = SkillStorage.validate_skill_name(name)
        with tempfile.TemporaryDirectory() as tmp_dir:
            temp_skill_dir = Path(tmp_dir) / normalized_name
            temp_skill_dir.mkdir(parents=True, exist_ok=True)
            (temp_skill_dir / SKILL_MD_FILE).write_text(content, encoding="utf-8")
            is_valid, message, parsed_name = _validate_skill_frontmatter(temp_skill_dir)
        if not is_valid:
            raise ValueError(message)
        if parsed_name != normalized_name:
            raise ValueError(f"Frontmatter name '{parsed_name}' must match requested skill name '{normalized_name}'.")

    def ensure_safe_support_path(self, name: str, relative_path: str) -> Path:
        """Validate and return the resolved absolute path for a support file.

        ``relative_path`` must be relative, name a file, contain no ``..``
        component, start with one of the allowed support sub-directories, and
        still resolve inside that sub-directory.

        Raises:
            ValueError: If ``name`` or ``relative_path`` fails any of the checks.
        """
        skill_dir = self.get_custom_skill_dir(self.validate_skill_name(name)).resolve()
        if not relative_path or relative_path.endswith("/"):
            raise ValueError("Supporting file path must include a filename.")
        relative = Path(relative_path)
        if relative.is_absolute():
            raise ValueError("Supporting file path must be relative.")
        if any(part in {"..", ""} for part in relative.parts):
            raise ValueError("Supporting file path must not contain parent-directory traversal.")
        top_level = relative.parts[0] if relative.parts else ""
        if top_level not in self._ALLOWED_SUPPORT_SUBDIRS:
            raise ValueError(f"Supporting files must live under one of: {', '.join(sorted(self._ALLOWED_SUPPORT_SUBDIRS))}.")
        # Resolve both sides so a symlink cannot lead outside the chosen sub-directory.
        target = (skill_dir / relative).resolve()
        allowed_root = (skill_dir / top_level).resolve()
        try:
            target.relative_to(allowed_root)
        except ValueError as exc:
            raise ValueError("Supporting file path must stay within the selected support directory.") from exc
        return target

    # ------------------------------------------------------------------
    # Abstract atomic operations (storage-medium specific)
    # ------------------------------------------------------------------
    @abstractmethod
    def get_skills_root_path(self) -> Path:
        """Absolute host path to the skills root, used for sandbox mounts.

        Origin: ``deerflow.skills.loader.get_skills_root_path``.
        """

    @abstractmethod
    def _iter_skill_files(self) -> Iterable[tuple[SkillCategory, Path, Path]]:
        """Yield ``(category, category_root, skill_md_path)`` for every SKILL.md.

        Origin: extracted from directory-walk logic inside
        ``deerflow.skills.loader.load_skills``.
        """

    @abstractmethod
    def read_custom_skill(self, name: str) -> str:
        """Read SKILL.md content for a custom skill.

        Origin: ``deerflow.skills.manager.read_custom_skill_content``.
        """

    @abstractmethod
    def write_custom_skill(self, name: str, relative_path: str, content: str) -> None:
        """Atomically write a text file under ``custom/<name>/<relative_path>``.

        Origin: ``deerflow.skills.manager.atomic_write``.
        """

    @abstractmethod
    async def ainstall_skill_from_archive(self, archive_path: str | Path) -> dict:
        """Async install of a skill from a ``.skill`` ZIP archive.

        Origin: ``deerflow.skills.installer.ainstall_skill_from_archive``.
        """

    def install_skill_from_archive(self, archive_path: str | Path) -> dict:
        """Sync wrapper — delegates to :meth:`ainstall_skill_from_archive`."""
        from deerflow.skills.installer import _run_async_install

        return _run_async_install(self.ainstall_skill_from_archive(archive_path))

    @abstractmethod
    def delete_custom_skill(self, name: str, *, history_meta: dict | None = None) -> None:
        """Delete a custom skill (validation + optional history + directory removal).

        Origin: ``app.gateway.routers.skills.delete_custom_skill`` + ``skill_manage_tool``.
        """

    @abstractmethod
    def custom_skill_exists(self, name: str) -> bool:
        """Origin: ``deerflow.skills.manager.custom_skill_exists``."""

    @abstractmethod
    def public_skill_exists(self, name: str) -> bool:
        """Origin: ``deerflow.skills.manager.public_skill_exists``."""

    @abstractmethod
    def append_history(self, name: str, record: dict) -> None:
        """Append a JSONL history entry for ``name``.

        Origin: ``deerflow.skills.manager.append_history``.
        """

    @abstractmethod
    def read_history(self, name: str) -> list[dict]:
        """Return all history records for ``name``, oldest first.

        Origin: ``deerflow.skills.manager.read_history``.
        """

    # ------------------------------------------------------------------
    # Concrete path helpers (layout is part of the SKILL.md protocol)
    # ------------------------------------------------------------------
    def get_container_root(self) -> str:
        """Origin: ``deerflow.config.skills_config.SkillsConfig.container_path`` accessor."""
        return self._container_root

    def get_custom_skill_dir(self, name: str) -> Path:
        """Path to ``custom/<name>``. Does not create the directory.

        Origin: ``deerflow.skills.manager.get_custom_skill_dir``.

        Raises:
            ValueError: If ``name`` is not a valid skill name.
        """
        normalized_name = self.validate_skill_name(name)
        return self.get_skills_root_path() / SkillCategory.CUSTOM.value / normalized_name

    def get_custom_skill_file(self, name: str) -> Path:
        """Path to ``custom/<name>/SKILL.md``.

        Origin: ``deerflow.skills.manager.get_custom_skill_file``.

        Raises:
            ValueError: If ``name`` is not a valid skill name.
        """
        # get_custom_skill_dir validates and normalises the name itself.
        return self.get_custom_skill_dir(name) / SKILL_MD_FILE

    def get_skill_history_file(self, name: str) -> Path:
        """Path to ``custom/.history/<name>.jsonl``. Does not create parents.

        Origin: ``deerflow.skills.manager.get_skill_history_file``.

        Raises:
            ValueError: If ``name`` is not a valid skill name.
        """
        normalized_name = self.validate_skill_name(name)
        return self.get_skills_root_path() / SkillCategory.CUSTOM.value / ".history" / f"{normalized_name}.jsonl"

    # ------------------------------------------------------------------
    # Final template-method flows
    # ------------------------------------------------------------------
    def load_skills(self, *, enabled_only: bool = False) -> list[Skill]:
        """Discover all skills, merge enabled state, sort and optionally filter.

        Skills are keyed by name, so when several SKILL.md files declare the
        same name the one yielded last by ``_iter_skill_files`` wins. If the
        extensions config cannot be loaded, a warning is logged and the
        skills keep whatever ``enabled`` value parsing gave them.

        Origin: ``deerflow.skills.loader.load_skills``.
        """
        from deerflow.skills.parser import parse_skill_file

        skills_by_name: dict[str, Skill] = {}
        for category, category_root, md_path in self._iter_skill_files():
            skill = parse_skill_file(
                md_path,
                category=category,
                relative_path=md_path.parent.relative_to(category_root),
            )
            if skill:
                skills_by_name[skill.name] = skill
        skills = list(skills_by_name.values())

        # Merge enabled state from extensions config (re-read every call so
        # changes made by another process are picked up immediately).
        try:
            from deerflow.config.extensions_config import ExtensionsConfig

            extensions_config = ExtensionsConfig.from_file()
            for skill in skills:
                skill.enabled = extensions_config.is_skill_enabled(skill.name, skill.category)
        except Exception as e:
            # Deliberately best-effort: a broken extensions config must not
            # prevent skills from being listed.
            logger.warning("Failed to load extensions config: %s", e)

        if enabled_only:
            skills = [s for s in skills if s.enabled]
        skills.sort(key=lambda s: s.name)
        return skills

    def ensure_custom_skill_is_editable(self, name: str) -> None:
        """Raise unless ``name`` refers to an existing custom skill.

        Origin: ``deerflow.skills.manager.ensure_custom_skill_is_editable``.

        Raises:
            ValueError: If ``name`` exists only as a built-in (public) skill.
            FileNotFoundError: If no skill with that name exists at all.
        """
        if self.custom_skill_exists(name):
            return
        if self.public_skill_exists(name):
            raise ValueError(f"'{name}' is a built-in skill. To customise it, create a new skill with the same name under skills/custom/.")
        raise FileNotFoundError(f"Custom skill '{name}' not found.")