mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-22 07:56:48 +00:00
300e5a519a
* Adds Kubernetes sandbox provisioner support * Improves Docker dev setup by standardizing host paths Replaces hardcoded host paths with a configurable root directory, making the development environment more portable and easier to use across different machines. Automatically sets the root path if not already defined, reducing manual setup steps.
96 lines
3.3 KiB
Python
96 lines
3.3 KiB
Python
import os
|
|
from pathlib import Path
|
|
from typing import NotRequired, override
|
|
|
|
from langchain.agents import AgentState
|
|
from langchain.agents.middleware import AgentMiddleware
|
|
from langgraph.runtime import Runtime
|
|
|
|
from src.agents.thread_state import ThreadDataState
|
|
from src.sandbox.consts import THREAD_DATA_BASE_DIR
|
|
|
|
|
|
class ThreadDataMiddlewareState(AgentState):
|
|
"""Compatible with the `ThreadState` schema."""
|
|
|
|
thread_data: NotRequired[ThreadDataState | None]
|
|
|
|
|
|
class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
|
|
"""Create thread data directories for each thread execution.
|
|
|
|
Creates the following directory structure:
|
|
- backend/.deer-flow/threads/{thread_id}/user-data/workspace
|
|
- backend/.deer-flow/threads/{thread_id}/user-data/uploads
|
|
- backend/.deer-flow/threads/{thread_id}/user-data/outputs
|
|
|
|
Lifecycle Management:
|
|
- With lazy_init=True (default): Only compute paths, directories created on-demand
|
|
- With lazy_init=False: Eagerly create directories in before_agent()
|
|
"""
|
|
|
|
state_schema = ThreadDataMiddlewareState
|
|
|
|
def __init__(self, base_dir: str | None = None, lazy_init: bool = True):
|
|
"""Initialize the middleware.
|
|
|
|
Args:
|
|
base_dir: Base directory for thread data. Defaults to the current working directory.
|
|
lazy_init: If True, defer directory creation until needed.
|
|
If False, create directories eagerly in before_agent().
|
|
Default is True for optimal performance.
|
|
"""
|
|
super().__init__()
|
|
self._base_dir = base_dir or os.getcwd()
|
|
self._lazy_init = lazy_init
|
|
|
|
def _get_thread_paths(self, thread_id: str) -> dict[str, str]:
|
|
"""Get the paths for a thread's data directories.
|
|
|
|
Args:
|
|
thread_id: The thread ID.
|
|
|
|
Returns:
|
|
Dictionary with workspace_path, uploads_path, and outputs_path.
|
|
"""
|
|
thread_dir = Path(self._base_dir) / THREAD_DATA_BASE_DIR / thread_id / "user-data"
|
|
return {
|
|
"workspace_path": str(thread_dir / "workspace"),
|
|
"uploads_path": str(thread_dir / "uploads"),
|
|
"outputs_path": str(thread_dir / "outputs"),
|
|
}
|
|
|
|
def _create_thread_directories(self, thread_id: str) -> dict[str, str]:
|
|
"""Create the thread data directories.
|
|
|
|
Args:
|
|
thread_id: The thread ID.
|
|
|
|
Returns:
|
|
Dictionary with the created directory paths.
|
|
"""
|
|
paths = self._get_thread_paths(thread_id)
|
|
for path in paths.values():
|
|
os.makedirs(path, exist_ok=True)
|
|
return paths
|
|
|
|
@override
|
|
def before_agent(self, state: ThreadDataMiddlewareState, runtime: Runtime) -> dict | None:
|
|
thread_id = runtime.context.get("thread_id")
|
|
if thread_id is None:
|
|
raise ValueError("Thread ID is required in the context")
|
|
|
|
if self._lazy_init:
|
|
# Lazy initialization: only compute paths, don't create directories
|
|
paths = self._get_thread_paths(thread_id)
|
|
else:
|
|
# Eager initialization: create directories immediately
|
|
paths = self._create_thread_directories(thread_id)
|
|
print(f"Created thread data directories for thread {thread_id}")
|
|
|
|
return {
|
|
"thread_data": {
|
|
**paths,
|
|
}
|
|
}
|