fix(skill): make skill prompt cache refresh nonblocking (#1924)

* fix: make skill prompt cache refresh nonblocking

* fix: harden skills prompt cache refresh

* chore: add timeout to skills cache warm-up
This commit is contained in:
DanielWalnut
2026-04-07 10:50:34 +08:00
committed by GitHub
parent c4da0e8ca9
commit 7643a46fca
8 changed files with 346 additions and 29 deletions
@@ -2,8 +2,14 @@ from .checkpointer import get_checkpointer, make_checkpointer, reset_checkpointe
from .factory import create_deerflow_agent
from .features import Next, Prev, RuntimeFeatures
from .lead_agent import make_lead_agent
from .lead_agent.prompt import prime_enabled_skills_cache
from .thread_state import SandboxState, ThreadState
# LangGraph imports deerflow.agents when registering the graph. Prime the
# enabled-skills cache here so the request path can usually read a warm cache
# without forcing synchronous filesystem work during prompt module import.
prime_enabled_skills_cache()
__all__ = [
"create_deerflow_agent",
"RuntimeFeatures",
@@ -1,20 +1,114 @@
import asyncio
import logging
import threading
from datetime import datetime
from functools import lru_cache
from deerflow.config.agents_config import load_agent_soul
from deerflow.skills import load_skills
from deerflow.skills.types import Skill
from deerflow.subagents import get_available_subagent_names
logger = logging.getLogger(__name__)
_ENABLED_SKILLS_REFRESH_WAIT_TIMEOUT_SECONDS = 5.0
_enabled_skills_lock = threading.Lock()
_enabled_skills_cache: list[Skill] | None = None
_enabled_skills_refresh_active = False
_enabled_skills_refresh_version = 0
_enabled_skills_refresh_event = threading.Event()
def _load_enabled_skills_sync() -> list[Skill]:
return list(load_skills(enabled_only=True))
def _start_enabled_skills_refresh_thread() -> None:
threading.Thread(
target=_refresh_enabled_skills_cache_worker,
name="deerflow-enabled-skills-loader",
daemon=True,
).start()
def _refresh_enabled_skills_cache_worker() -> None:
global _enabled_skills_cache, _enabled_skills_refresh_active
while True:
with _enabled_skills_lock:
target_version = _enabled_skills_refresh_version
try:
skills = _load_enabled_skills_sync()
except Exception:
logger.exception("Failed to load enabled skills for prompt injection")
skills = []
with _enabled_skills_lock:
if _enabled_skills_refresh_version == target_version:
_enabled_skills_cache = skills
_enabled_skills_refresh_active = False
_enabled_skills_refresh_event.set()
return
# A newer invalidation happened while loading. Keep the worker alive
# and loop again so the cache always converges on the latest version.
_enabled_skills_cache = None
def _ensure_enabled_skills_cache() -> threading.Event:
global _enabled_skills_refresh_active
with _enabled_skills_lock:
if _enabled_skills_cache is not None:
_enabled_skills_refresh_event.set()
return _enabled_skills_refresh_event
if _enabled_skills_refresh_active:
return _enabled_skills_refresh_event
_enabled_skills_refresh_active = True
_enabled_skills_refresh_event.clear()
_start_enabled_skills_refresh_thread()
return _enabled_skills_refresh_event
def _invalidate_enabled_skills_cache() -> threading.Event:
global _enabled_skills_cache, _enabled_skills_refresh_active, _enabled_skills_refresh_version
_get_cached_skills_prompt_section.cache_clear()
with _enabled_skills_lock:
_enabled_skills_cache = None
_enabled_skills_refresh_version += 1
_enabled_skills_refresh_event.clear()
if _enabled_skills_refresh_active:
return _enabled_skills_refresh_event
_enabled_skills_refresh_active = True
_start_enabled_skills_refresh_thread()
return _enabled_skills_refresh_event
def prime_enabled_skills_cache() -> None:
_ensure_enabled_skills_cache()
def warm_enabled_skills_cache(timeout_seconds: float = _ENABLED_SKILLS_REFRESH_WAIT_TIMEOUT_SECONDS) -> bool:
if _ensure_enabled_skills_cache().wait(timeout=timeout_seconds):
return True
logger.warning("Timed out waiting %.1fs for enabled skills cache warm-up", timeout_seconds)
return False
def _get_enabled_skills():
try:
return list(load_skills(enabled_only=True))
except Exception:
logger.exception("Failed to load enabled skills for prompt injection")
return []
with _enabled_skills_lock:
cached = _enabled_skills_cache
if cached is not None:
return list(cached)
_ensure_enabled_skills_cache()
return []
def _skill_mutability_label(category: str) -> str:
@@ -22,7 +116,36 @@ def _skill_mutability_label(category: str) -> str:
def clear_skills_system_prompt_cache() -> None:
_invalidate_enabled_skills_cache()
async def refresh_skills_system_prompt_cache_async() -> None:
await asyncio.to_thread(_invalidate_enabled_skills_cache().wait)
def _reset_skills_system_prompt_cache_state() -> None:
global _enabled_skills_cache, _enabled_skills_refresh_active, _enabled_skills_refresh_version
_get_cached_skills_prompt_section.cache_clear()
with _enabled_skills_lock:
_enabled_skills_cache = None
_enabled_skills_refresh_active = False
_enabled_skills_refresh_version = 0
_enabled_skills_refresh_event.clear()
def _refresh_enabled_skills_cache() -> None:
"""Backward-compatible test helper for direct synchronous reload."""
try:
skills = _load_enabled_skills_sync()
except Exception:
logger.exception("Failed to load enabled skills for prompt injection")
skills = []
with _enabled_skills_lock:
_enabled_skills_cache = skills
_enabled_skills_refresh_active = False
_enabled_skills_refresh_event.set()
def _build_skill_evolution_section(skill_evolution_enabled: bool) -> str:
@@ -11,7 +11,7 @@ from weakref import WeakValueDictionary
from langchain.tools import ToolRuntime, tool
from langgraph.typing import ContextT
from deerflow.agents.lead_agent.prompt import clear_skills_system_prompt_cache
from deerflow.agents.lead_agent.prompt import refresh_skills_system_prompt_cache_async
from deerflow.agents.thread_state import ThreadState
from deerflow.mcp.tools import _make_sync_tool_wrapper
from deerflow.skills.manager import (
@@ -115,7 +115,7 @@ async def _skill_manage_impl(
name,
_history_record(action="create", file_path="SKILL.md", prev_content=None, new_content=content, thread_id=thread_id, scanner=scan),
)
clear_skills_system_prompt_cache()
await refresh_skills_system_prompt_cache_async()
return f"Created custom skill '{name}'."
if action == "edit":
@@ -132,7 +132,7 @@ async def _skill_manage_impl(
name,
_history_record(action="edit", file_path="SKILL.md", prev_content=prev_content, new_content=content, thread_id=thread_id, scanner=scan),
)
clear_skills_system_prompt_cache()
await refresh_skills_system_prompt_cache_async()
return f"Updated custom skill '{name}'."
if action == "patch":
@@ -156,7 +156,7 @@ async def _skill_manage_impl(
name,
_history_record(action="patch", file_path="SKILL.md", prev_content=prev_content, new_content=new_content, thread_id=thread_id, scanner=scan),
)
clear_skills_system_prompt_cache()
await refresh_skills_system_prompt_cache_async()
return f"Patched custom skill '{name}' ({replacement_count} replacement(s) applied, {occurrences} match(es) found)."
if action == "delete":
@@ -169,7 +169,7 @@ async def _skill_manage_impl(
_history_record(action="delete", file_path="SKILL.md", prev_content=prev_content, new_content=None, thread_id=thread_id, scanner={"decision": "allow", "reason": "Deletion requested."}),
)
await _to_thread(shutil.rmtree, skill_dir)
clear_skills_system_prompt_cache()
await refresh_skills_system_prompt_cache_async()
return f"Deleted custom skill '{name}'."
if action == "write_file":