refactor(harness): update modules to use new runtime imports

Update import paths across harness modules:
- agents/lead_agent/prompt.py
- agents/middlewares/ (memory, thread_data, uploads)
- client.py - enhanced with new capabilities
- community/aio_sandbox/
- config/app_config.py - remove deprecated configs
- sandbox/tools.py
- tools/builtins/ (invoke_acp_agent, present_file)
- uploads/manager.py

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
rayhpeng
2026-04-22 11:29:12 +08:00
parent 5f2f1941e9
commit e3e00af51d
11 changed files with 121 additions and 31 deletions
@@ -519,7 +519,7 @@ def _get_memory_context(agent_name: str | None = None) -> str:
try: try:
from deerflow.agents.memory import format_memory_for_injection, get_memory_data from deerflow.agents.memory import format_memory_for_injection, get_memory_data
from deerflow.config.memory_config import get_memory_config from deerflow.config.memory_config import get_memory_config
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
config = get_memory_config() config = get_memory_config()
if not config.enabled or not config.injection_enabled: if not config.enabled or not config.injection_enabled:
@@ -11,7 +11,7 @@ from langgraph.runtime import Runtime
from deerflow.agents.memory.queue import get_memory_queue from deerflow.agents.memory.queue import get_memory_queue
from deerflow.config.memory_config import get_memory_config from deerflow.config.memory_config import get_memory_config
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -10,7 +10,7 @@ from langgraph.runtime import Runtime
from deerflow.agents.thread_state import ThreadDataState from deerflow.agents.thread_state import ThreadDataState
from deerflow.config.paths import Paths, get_paths from deerflow.config.paths import Paths, get_paths
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -10,7 +10,7 @@ from langchain_core.messages import HumanMessage
from langgraph.runtime import Runtime from langgraph.runtime import Runtime
from deerflow.config.paths import Paths, get_paths from deerflow.config.paths import Paths, get_paths
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
from deerflow.utils.file_conversion import extract_outline from deerflow.utils.file_conversion import extract_outline
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
+112 -14
View File
@@ -26,6 +26,7 @@ from collections.abc import Generator, Sequence
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Literal from typing import Any, Literal
from typing_extensions import TypedDict
from langchain.agents import create_agent from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware from langchain.agents.middleware import AgentMiddleware
@@ -40,7 +41,7 @@ from deerflow.config.app_config import get_app_config, reload_app_config
from deerflow.config.extensions_config import ExtensionsConfig, SkillStateConfig, get_extensions_config, reload_extensions_config from deerflow.config.extensions_config import ExtensionsConfig, SkillStateConfig, get_extensions_config, reload_extensions_config
from deerflow.config.paths import get_paths from deerflow.config.paths import get_paths
from deerflow.models import create_chat_model from deerflow.models import create_chat_model
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
from deerflow.skills.installer import install_skill_from_archive from deerflow.skills.installer import install_skill_from_archive
from deerflow.uploads.manager import ( from deerflow.uploads.manager import (
claim_unique_filename, claim_unique_filename,
@@ -59,6 +60,14 @@ logger = logging.getLogger(__name__)
StreamEventType = Literal["values", "messages-tuple", "custom", "end"] StreamEventType = Literal["values", "messages-tuple", "custom", "end"]
class AgentContext(TypedDict, total=False):
"""Typed runtime context passed into LangGraph agent execution."""
thread_id: str
agent_name: str
sandbox_id: str
@dataclass @dataclass
class StreamEvent: class StreamEvent:
"""A single event from the streaming agent response. """A single event from the streaming agent response.
@@ -143,12 +152,17 @@ class DeerFlowClient:
""" """
if config_path is not None: if config_path is not None:
reload_app_config(config_path) reload_app_config(config_path)
from store.config.app_config import reload_app_config as reload_storage_app_config
reload_storage_app_config(config_path)
self._app_config = get_app_config() self._app_config = get_app_config()
if agent_name is not None and not AGENT_NAME_PATTERN.match(agent_name): if agent_name is not None and not AGENT_NAME_PATTERN.match(agent_name):
raise ValueError(f"Invalid agent name '{agent_name}'. Must match pattern: {AGENT_NAME_PATTERN.pattern}") raise ValueError(f"Invalid agent name '{agent_name}'. Must match pattern: {AGENT_NAME_PATTERN.pattern}")
self._checkpointer = checkpointer self._checkpointer = checkpointer
self._default_checkpointer = None
self._default_checkpointer_resource = None
self._model_name = model_name self._model_name = model_name
self._thinking_enabled = thinking_enabled self._thinking_enabled = thinking_enabled
self._subagent_enabled = subagent_enabled self._subagent_enabled = subagent_enabled
@@ -207,6 +221,97 @@ class DeerFlowClient:
recursion_limit=overrides.get("recursion_limit", 100), recursion_limit=overrides.get("recursion_limit", 100),
) )
@staticmethod
def _resolve_sqlite_conn_str(connection_string: str) -> str:
"""Normalize sqlite connection strings to a filesystem path string."""
if connection_string == ":memory:":
return connection_string
return str(Path(connection_string).expanduser())
@staticmethod
def _ensure_sqlite_parent_dir(connection_string: str) -> None:
"""Create parent directory for a sqlite file if needed."""
if connection_string == ":memory:":
return
Path(connection_string).expanduser().parent.mkdir(parents=True, exist_ok=True)
def _get_configured_checkpointer(self):
"""Build or reuse the client fallback checkpointer from storage config."""
if self._default_checkpointer is not None:
return self._default_checkpointer
config = self._resolve_checkpointer_config()
self._default_checkpointer = self._build_configured_checkpointer(config)
return self._default_checkpointer
@staticmethod
def _resolve_checkpointer_config() -> dict[str, str]:
"""Resolve checkpointer backend from the unified storage config entry."""
from store.config.app_config import get_app_config as get_storage_app_config
storage = get_storage_app_config().storage
driver = storage.driver
if driver == "sqlite":
return {"backend": "sqlite", "connection_string": storage.sqlite_storage_path}
if driver == "postgres":
return {
"backend": "postgres",
"connection_string": (
f"postgresql://{storage.username}:{storage.password}@{storage.host}:{storage.port}/{storage.db_name}"
),
}
if driver == "mysql":
raise ValueError("DeerFlowClient does not support a MySQL checkpointer")
raise ValueError(f"Unsupported storage driver for checkpointer: {driver}")
def _build_configured_checkpointer(self, config: dict[str, str]):
"""Build a sync checkpointer using the embedded client's config."""
backend = config["backend"]
connection_string = config.get("connection_string", "")
if backend == "memory":
from langgraph.checkpoint.memory import InMemorySaver
return InMemorySaver()
if backend == "sqlite":
try:
from langgraph.checkpoint.sqlite import SqliteSaver
except ImportError as exc:
raise ImportError("SQLite checkpointer requires langgraph-checkpoint-sqlite") from exc
if not connection_string:
raise ValueError("connection_string is required for sqlite checkpointer")
conn_str = self._resolve_sqlite_conn_str(connection_string)
self._ensure_sqlite_parent_dir(conn_str)
resource = SqliteSaver.from_conn_string(conn_str)
saver = resource.__enter__()
saver.setup()
self._default_checkpointer_resource = resource
return saver
if backend == "postgres":
try:
from langgraph.checkpoint.postgres import PostgresSaver
except ImportError as exc:
raise ImportError("Postgres checkpointer requires langgraph-checkpoint-postgres") from exc
if not connection_string:
raise ValueError("connection_string is required for postgres checkpointer")
resource = PostgresSaver.from_conn_string(connection_string)
saver = resource.__enter__()
saver.setup()
self._default_checkpointer_resource = resource
return saver
raise ValueError(f"Unsupported checkpointer type: {backend}")
def _get_active_checkpointer(self):
"""Return the explicitly injected or config-derived checkpointer."""
if self._checkpointer is not None:
return self._checkpointer
return self._get_configured_checkpointer()
def _ensure_agent(self, config: RunnableConfig): def _ensure_agent(self, config: RunnableConfig):
"""Create (or recreate) the agent when config-dependent params change.""" """Create (or recreate) the agent when config-dependent params change."""
cfg = config.get("configurable", {}) cfg = config.get("configurable", {})
@@ -238,12 +343,9 @@ class DeerFlowClient:
available_skills=self._available_skills, available_skills=self._available_skills,
), ),
"state_schema": ThreadState, "state_schema": ThreadState,
"context_schema": AgentContext,
} }
checkpointer = self._checkpointer checkpointer = self._get_active_checkpointer()
if checkpointer is None:
from deerflow.runtime.checkpointer import get_checkpointer
checkpointer = get_checkpointer()
if checkpointer is not None: if checkpointer is not None:
kwargs["checkpointer"] = checkpointer kwargs["checkpointer"] = checkpointer
@@ -373,11 +475,9 @@ class DeerFlowClient:
Dict with "thread_list" key containing list of thread info dicts, Dict with "thread_list" key containing list of thread info dicts,
sorted by thread creation time descending. sorted by thread creation time descending.
""" """
checkpointer = self._checkpointer checkpointer = self._get_active_checkpointer()
if checkpointer is None: if checkpointer is None:
from deerflow.runtime.checkpointer.provider import get_checkpointer return {"thread_list": []}
checkpointer = get_checkpointer()
thread_info_map = {} thread_info_map = {}
@@ -428,11 +528,9 @@ class DeerFlowClient:
Returns: Returns:
Dict containing the thread's full checkpoint history. Dict containing the thread's full checkpoint history.
""" """
checkpointer = self._checkpointer checkpointer = self._get_active_checkpointer()
if checkpointer is None: if checkpointer is None:
from deerflow.runtime.checkpointer.provider import get_checkpointer return {"thread_id": thread_id, "checkpoints": []}
checkpointer = get_checkpointer()
config = {"configurable": {"thread_id": thread_id}} config = {"configurable": {"thread_id": thread_id}}
checkpoints = [] checkpoints = []
@@ -27,7 +27,7 @@ except ImportError: # pragma: no cover - Windows fallback
from deerflow.config import get_app_config from deerflow.config import get_app_config
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
from deerflow.sandbox.sandbox import Sandbox from deerflow.sandbox.sandbox import Sandbox
from deerflow.sandbox.sandbox_provider import SandboxProvider from deerflow.sandbox.sandbox_provider import SandboxProvider
@@ -9,8 +9,6 @@ from dotenv import load_dotenv
from pydantic import BaseModel, ConfigDict, Field from pydantic import BaseModel, ConfigDict, Field
from deerflow.config.acp_config import load_acp_config_from_dict from deerflow.config.acp_config import load_acp_config_from_dict
from deerflow.config.checkpointer_config import CheckpointerConfig, load_checkpointer_config_from_dict
from deerflow.config.database_config import DatabaseConfig
from deerflow.config.extensions_config import ExtensionsConfig from deerflow.config.extensions_config import ExtensionsConfig
from deerflow.config.guardrails_config import GuardrailsConfig, load_guardrails_config_from_dict from deerflow.config.guardrails_config import GuardrailsConfig, load_guardrails_config_from_dict
from deerflow.config.memory_config import MemoryConfig, load_memory_config_from_dict from deerflow.config.memory_config import MemoryConfig, load_memory_config_from_dict
@@ -58,9 +56,7 @@ class AppConfig(BaseModel):
subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration") subagents: SubagentsAppConfig = Field(default_factory=SubagentsAppConfig, description="Subagent runtime configuration")
guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration") guardrails: GuardrailsConfig = Field(default_factory=GuardrailsConfig, description="Guardrail middleware configuration")
model_config = ConfigDict(extra="allow", frozen=False) model_config = ConfigDict(extra="allow", frozen=False)
database: DatabaseConfig = Field(default_factory=DatabaseConfig, description="Unified database backend configuration")
run_events: RunEventsConfig = Field(default_factory=RunEventsConfig, description="Run event storage configuration") run_events: RunEventsConfig = Field(default_factory=RunEventsConfig, description="Run event storage configuration")
checkpointer: CheckpointerConfig | None = Field(default=None, description="Checkpointer configuration")
stream_bridge: StreamBridgeConfig | None = Field(default=None, description="Stream bridge configuration") stream_bridge: StreamBridgeConfig | None = Field(default=None, description="Stream bridge configuration")
@classmethod @classmethod
@@ -133,10 +129,6 @@ class AppConfig(BaseModel):
if "guardrails" in config_data: if "guardrails" in config_data:
load_guardrails_config_from_dict(config_data["guardrails"]) load_guardrails_config_from_dict(config_data["guardrails"])
# Load checkpointer config if present
if "checkpointer" in config_data:
load_checkpointer_config_from_dict(config_data["checkpointer"])
# Load stream bridge config if present # Load stream bridge config if present
if "stream_bridge" in config_data: if "stream_bridge" in config_data:
load_stream_bridge_config_from_dict(config_data["stream_bridge"]) load_stream_bridge_config_from_dict(config_data["stream_bridge"])
@@ -200,7 +200,7 @@ def _get_acp_workspace_host_path(thread_id: str | None = None) -> str | None:
if thread_id is not None: if thread_id is not None:
try: try:
from deerflow.config.paths import get_paths from deerflow.config.paths import get_paths
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
host_path = get_paths().acp_workspace_dir(thread_id, user_id=get_effective_user_id()) host_path = get_paths().acp_workspace_dir(thread_id, user_id=get_effective_user_id())
if host_path.exists(): if host_path.exists():
@@ -33,7 +33,7 @@ def _get_work_dir(thread_id: str | None) -> str:
An absolute physical filesystem path to use as the working directory. An absolute physical filesystem path to use as the working directory.
""" """
from deerflow.config.paths import get_paths from deerflow.config.paths import get_paths
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
paths = get_paths() paths = get_paths()
if thread_id: if thread_id:
@@ -8,7 +8,7 @@ from langgraph.typing import ContextT
from deerflow.agents.thread_state import ThreadState from deerflow.agents.thread_state import ThreadState
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
OUTPUTS_VIRTUAL_PREFIX = f"{VIRTUAL_PATH_PREFIX}/outputs" OUTPUTS_VIRTUAL_PREFIX = f"{VIRTUAL_PATH_PREFIX}/outputs"
@@ -10,7 +10,7 @@ from pathlib import Path
from urllib.parse import quote from urllib.parse import quote
from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths
from deerflow.runtime.user_context import get_effective_user_id from deerflow.runtime.actor_context import get_effective_user_id
class PathTraversalError(ValueError): class PathTraversalError(ValueError):