mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-12 02:15:58 +00:00
Merge remote-tracking branch 'origin/main' into codex/im-channel-connections
This commit is contained in:
@@ -470,14 +470,32 @@ class AioSandboxProvider(SandboxProvider):
|
||||
|
||||
existing_id = self._thread_sandboxes[thread_id]
|
||||
if existing_id in self._sandboxes:
|
||||
suffix = " (post-lock check)" if post_lock else ""
|
||||
logger.info(f"Reusing in-process sandbox {existing_id} for thread {thread_id}{suffix}")
|
||||
self._last_activity[existing_id] = time.time()
|
||||
return existing_id
|
||||
info = self._sandbox_infos.get(existing_id)
|
||||
else:
|
||||
del self._thread_sandboxes[thread_id]
|
||||
return None
|
||||
|
||||
del self._thread_sandboxes[thread_id]
|
||||
alive = self._check_tracked_sandbox_alive(existing_id, info) if info is not None else True
|
||||
if alive is False:
|
||||
self._drop_unhealthy_sandbox(
|
||||
existing_id,
|
||||
"in-process cache failed health check",
|
||||
expected_info=info,
|
||||
)
|
||||
return None
|
||||
|
||||
with self._lock:
|
||||
if self._thread_sandboxes.get(thread_id) != existing_id:
|
||||
return None
|
||||
if existing_id not in self._sandboxes:
|
||||
self._thread_sandboxes.pop(thread_id, None)
|
||||
return None
|
||||
|
||||
suffix = " (post-lock check)" if post_lock else ""
|
||||
logger.info(f"Reusing in-process sandbox {existing_id} for thread {thread_id}{suffix}")
|
||||
self._last_activity[existing_id] = time.time()
|
||||
return existing_id
|
||||
|
||||
def _reclaim_warm_pool_sandbox(self, thread_id: str | None, sandbox_id: str, *, post_lock: bool = False) -> str | None:
|
||||
"""Promote a warm-pool sandbox back to active tracking if available."""
|
||||
if thread_id is None:
|
||||
@@ -487,7 +505,22 @@ class AioSandboxProvider(SandboxProvider):
|
||||
if sandbox_id not in self._warm_pool:
|
||||
return None
|
||||
|
||||
info, _ = self._warm_pool.pop(sandbox_id)
|
||||
info, _ = self._warm_pool[sandbox_id]
|
||||
|
||||
alive = self._check_tracked_sandbox_alive(sandbox_id, info)
|
||||
if alive is False:
|
||||
self._drop_unhealthy_sandbox(
|
||||
sandbox_id,
|
||||
"warm-pool cache failed health check",
|
||||
expected_info=info,
|
||||
)
|
||||
return None
|
||||
|
||||
with self._lock:
|
||||
warm_item = self._warm_pool.pop(sandbox_id, None)
|
||||
if warm_item is None:
|
||||
return None
|
||||
info, _ = warm_item
|
||||
sandbox = AioSandbox(id=sandbox_id, base_url=info.sandbox_url)
|
||||
self._sandboxes[sandbox_id] = sandbox
|
||||
self._sandbox_infos[sandbox_id] = info
|
||||
@@ -527,6 +560,70 @@ class AioSandboxProvider(SandboxProvider):
|
||||
logger.info(f"Created sandbox {sandbox_id} for thread {thread_id} at {info.sandbox_url}")
|
||||
return sandbox_id
|
||||
|
||||
def _check_tracked_sandbox_alive(self, sandbox_id: str, info: SandboxInfo) -> bool | None:
|
||||
"""Return whether a tracked sandbox appears alive, or None if unknown."""
|
||||
try:
|
||||
return self._backend.is_alive(info)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to check sandbox {sandbox_id} health: {e}")
|
||||
return None
|
||||
|
||||
def _remove_tracked_sandbox(
|
||||
self,
|
||||
sandbox_id: str,
|
||||
*,
|
||||
expected_info: SandboxInfo | None = None,
|
||||
) -> tuple[Sandbox | None, SandboxInfo | None, bool]:
|
||||
"""Remove a sandbox from in-process tracking maps.
|
||||
|
||||
When expected_info is provided, removal only happens if the currently
|
||||
tracked active or warm-pool entry is the exact info object that was
|
||||
checked. This prevents a stale health-check result from deleting a
|
||||
freshly recreated sandbox with the same deterministic id.
|
||||
"""
|
||||
thread_ids_to_remove: list[str] = []
|
||||
|
||||
with self._lock:
|
||||
active_info = self._sandbox_infos.get(sandbox_id)
|
||||
warm_item = self._warm_pool.get(sandbox_id)
|
||||
warm_info = warm_item[0] if warm_item is not None else None
|
||||
if expected_info is not None and active_info is not expected_info and warm_info is not expected_info:
|
||||
return None, None, False
|
||||
|
||||
sandbox = self._sandboxes.pop(sandbox_id, None)
|
||||
info = self._sandbox_infos.pop(sandbox_id, None)
|
||||
thread_ids_to_remove = [tid for tid, sid in self._thread_sandboxes.items() if sid == sandbox_id]
|
||||
for tid in thread_ids_to_remove:
|
||||
del self._thread_sandboxes[tid]
|
||||
self._last_activity.pop(sandbox_id, None)
|
||||
if info is None and sandbox_id in self._warm_pool:
|
||||
info, _ = self._warm_pool.pop(sandbox_id)
|
||||
else:
|
||||
self._warm_pool.pop(sandbox_id, None)
|
||||
|
||||
return sandbox, info, True
|
||||
|
||||
def _drop_unhealthy_sandbox(self, sandbox_id: str, reason: str, *, expected_info: SandboxInfo | None = None) -> None:
|
||||
"""Remove and destroy a sandbox after a definitive failed health check."""
|
||||
sandbox, info, removed = self._remove_tracked_sandbox(sandbox_id, expected_info=expected_info)
|
||||
if not removed:
|
||||
logger.info(f"Skipped dropping sandbox {sandbox_id}: tracked info changed after health check")
|
||||
return
|
||||
|
||||
if sandbox is not None:
|
||||
try:
|
||||
sandbox.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing unhealthy sandbox {sandbox_id}: {e}")
|
||||
|
||||
if info is not None:
|
||||
try:
|
||||
self._backend.destroy(info)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error destroying unhealthy sandbox {sandbox_id}: {e}")
|
||||
|
||||
logger.warning(f"Dropped unhealthy sandbox {sandbox_id}: {reason}")
|
||||
|
||||
def _replica_count(self) -> tuple[int, int]:
|
||||
"""Return configured replicas and currently tracked sandbox count."""
|
||||
replicas = self._config.get("replicas", DEFAULT_REPLICAS)
|
||||
@@ -617,7 +714,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
|
||||
async def _acquire_internal_async(self, thread_id: str | None) -> str:
|
||||
"""Async counterpart to ``_acquire_internal``."""
|
||||
cached_id = self._reuse_in_process_sandbox(thread_id)
|
||||
cached_id = await asyncio.to_thread(self._reuse_in_process_sandbox, thread_id)
|
||||
if cached_id is not None:
|
||||
return cached_id
|
||||
|
||||
@@ -625,7 +722,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
sandbox_id = self._sandbox_id_for_thread(thread_id)
|
||||
|
||||
# ── Layer 1.5: Warm pool (container still running, no cold-start) ──
|
||||
reclaimed_id = self._reclaim_warm_pool_sandbox(thread_id, sandbox_id)
|
||||
reclaimed_id = await asyncio.to_thread(self._reclaim_warm_pool_sandbox, thread_id, sandbox_id)
|
||||
if reclaimed_id is not None:
|
||||
return reclaimed_id
|
||||
|
||||
@@ -681,7 +778,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
locked = True
|
||||
# Re-check in-process caches under the file lock in case another
|
||||
# thread in this process won the race while we were waiting.
|
||||
cached_id = self._recheck_cached_sandbox(thread_id, sandbox_id)
|
||||
cached_id = await asyncio.to_thread(self._recheck_cached_sandbox, thread_id, sandbox_id)
|
||||
if cached_id is not None:
|
||||
return cached_id
|
||||
|
||||
@@ -837,22 +934,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
Args:
|
||||
sandbox_id: The ID of the sandbox to destroy.
|
||||
"""
|
||||
info = None
|
||||
sandbox = None
|
||||
thread_ids_to_remove: list[str] = []
|
||||
|
||||
with self._lock:
|
||||
sandbox = self._sandboxes.pop(sandbox_id, None)
|
||||
info = self._sandbox_infos.pop(sandbox_id, None)
|
||||
thread_ids_to_remove = [tid for tid, sid in self._thread_sandboxes.items() if sid == sandbox_id]
|
||||
for tid in thread_ids_to_remove:
|
||||
del self._thread_sandboxes[tid]
|
||||
self._last_activity.pop(sandbox_id, None)
|
||||
# Also pull from warm pool if it was parked there
|
||||
if info is None and sandbox_id in self._warm_pool:
|
||||
info, _ = self._warm_pool.pop(sandbox_id)
|
||||
else:
|
||||
self._warm_pool.pop(sandbox_id, None)
|
||||
sandbox, info, _ = self._remove_tracked_sandbox(sandbox_id)
|
||||
|
||||
if sandbox is not None:
|
||||
# Defense-in-depth: close() already swallows its own errors; this
|
||||
|
||||
@@ -169,6 +169,24 @@ def _resolve_docker_bind_host(sandbox_host: str | None = None, bind_host: str |
|
||||
return "0.0.0.0"
|
||||
|
||||
|
||||
def _is_no_such_container_error(stderr: str, container_name: str) -> bool:
|
||||
"""Return True only when stderr definitively says the container does not exist.
|
||||
|
||||
Docker reports "No such object" / "No such container". Apple Container
|
||||
reports a generic "not found", so that phrase is only trusted when the
|
||||
message also names the inspected container (or refers to a
|
||||
container/object); transient failures whose text happens to contain
|
||||
"not found" (e.g. "command not found", "context not found") must stay on
|
||||
the raise path instead of being misread as a dead container.
|
||||
"""
|
||||
message = stderr.lower()
|
||||
if "no such object" in message or "no such container" in message:
|
||||
return True
|
||||
if "not found" not in message:
|
||||
return False
|
||||
return container_name.lower() in message or "container" in message or "object" in message
|
||||
|
||||
|
||||
class LocalContainerBackend(SandboxBackend):
|
||||
"""Backend that manages sandbox containers locally using Docker or Apple Container.
|
||||
|
||||
@@ -335,11 +353,21 @@ class LocalContainerBackend(SandboxBackend):
|
||||
sandbox_id: The deterministic sandbox ID (determines container name).
|
||||
|
||||
Returns:
|
||||
SandboxInfo if container found and healthy, None otherwise.
|
||||
SandboxInfo if container found and healthy, None otherwise. A
|
||||
failed runtime check (e.g. transient daemon error) also returns
|
||||
None — discovery must not adopt a container it cannot verify, and
|
||||
falling through to create keeps acquire recoverable instead of
|
||||
hard-failing on a hiccup.
|
||||
"""
|
||||
container_name = f"{self._container_prefix}-{sandbox_id}"
|
||||
|
||||
if not self._is_container_running(container_name):
|
||||
try:
|
||||
running = self._is_container_running(container_name)
|
||||
except RuntimeError as e:
|
||||
logger.warning(f"Could not verify container {container_name} during discovery; not adopting it: {e}")
|
||||
return None
|
||||
|
||||
if not running:
|
||||
return None
|
||||
|
||||
port = self._get_container_port(container_name)
|
||||
@@ -582,6 +610,13 @@ class LocalContainerBackend(SandboxBackend):
|
||||
|
||||
This enables cross-process container discovery — any process can detect
|
||||
containers started by another process via the deterministic container name.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If the container runtime cannot answer the inspect
|
||||
query. A failed check is intentionally distinct from a
|
||||
definitive "container does not exist" result so callers do not
|
||||
destroy healthy containers during transient Docker/Container
|
||||
daemon failures.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
@@ -590,9 +625,14 @@ class LocalContainerBackend(SandboxBackend):
|
||||
text=True,
|
||||
timeout=5,
|
||||
)
|
||||
return result.returncode == 0 and result.stdout.strip().lower() == "true"
|
||||
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
|
||||
except subprocess.TimeoutExpired as exc:
|
||||
raise RuntimeError(f"Timed out checking container {container_name}") from exc
|
||||
|
||||
if result.returncode == 0:
|
||||
return result.stdout.strip().lower() == "true"
|
||||
if _is_no_such_container_error(result.stderr, container_name):
|
||||
return False
|
||||
raise RuntimeError(f"Failed to inspect container {container_name}: {result.stderr.strip()}")
|
||||
|
||||
def _get_container_port(self, container_name: str) -> int | None:
|
||||
"""Get the host port of a running container.
|
||||
|
||||
@@ -176,12 +176,16 @@ class RemoteSandboxBackend(SandboxBackend):
|
||||
f"{self._provisioner_url}/api/sandboxes/{sandbox_id}",
|
||||
timeout=10,
|
||||
)
|
||||
if resp.ok:
|
||||
data = resp.json()
|
||||
return data.get("status") == "Running"
|
||||
return False
|
||||
except requests.RequestException:
|
||||
except requests.RequestException as exc:
|
||||
raise RuntimeError(f"Provisioner health check failed for {sandbox_id}: {exc}") from exc
|
||||
|
||||
if resp.status_code == 404:
|
||||
return False
|
||||
if not resp.ok:
|
||||
raise RuntimeError(f"Provisioner health check failed for {sandbox_id}: HTTP {resp.status_code} {resp.text}")
|
||||
|
||||
data = resp.json()
|
||||
return data.get("status") == "Running"
|
||||
|
||||
def _provisioner_discover(self, sandbox_id: str) -> SandboxInfo | None:
|
||||
"""GET /api/sandboxes/{sandbox_id} → discover existing sandbox."""
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from collections.abc import Awaitable, Callable
|
||||
from dataclasses import replace as dc_replace
|
||||
from typing import NotRequired, override
|
||||
|
||||
from langchain.agents import AgentState
|
||||
from langchain.agents.middleware import AgentMiddleware
|
||||
from langchain_core.messages import ToolMessage
|
||||
from langgraph.prebuilt.tool_node import ToolCallRequest
|
||||
from langgraph.runtime import Runtime
|
||||
from langgraph.types import Command
|
||||
|
||||
from deerflow.agents.thread_state import SandboxState, ThreadDataState
|
||||
from deerflow.sandbox import get_sandbox_provider
|
||||
@@ -126,3 +131,87 @@ class SandboxMiddleware(AgentMiddleware[SandboxMiddlewareState]):
|
||||
|
||||
# No sandbox to release
|
||||
return await super().aafter_agent(state, runtime)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Tool-call wrappers: persist lazily-acquired sandbox state into the
|
||||
# graph state via Command(update=...).
|
||||
#
|
||||
# Background:
|
||||
# ``ensure_sandbox_initialized*`` in ``deerflow.sandbox.tools`` mutates
|
||||
# ``runtime.state["sandbox"]`` directly. That mutation is local to the
|
||||
# current tool invocation and is NOT picked up by LangGraph's channel
|
||||
# reducer, so subsequent graph steps (and downstream consumers such as
|
||||
# ``ToolOutputBudgetMiddleware`` and the sub-agent ``task_tool``)
|
||||
# cannot observe the sandbox id. Wrapping the tool call lets us detect
|
||||
# a fresh lazy init by diffing the state snapshot before/after the
|
||||
# handler and emit a proper state update via ``Command``.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _read_sandbox_id_from_state(state: object) -> str | None:
|
||||
if not isinstance(state, dict):
|
||||
return None
|
||||
sandbox_state = state.get("sandbox")
|
||||
if not isinstance(sandbox_state, dict):
|
||||
return None
|
||||
sandbox_id = sandbox_state.get("sandbox_id")
|
||||
return sandbox_id if isinstance(sandbox_id, str) else None
|
||||
|
||||
@staticmethod
|
||||
def _attach_sandbox_update(result: ToolMessage | Command, sandbox_id: str) -> ToolMessage | Command:
|
||||
"""Wrap or merge ``result`` so that ``sandbox.sandbox_id`` is persisted.
|
||||
|
||||
- ``ToolMessage`` -> ``Command(update={"sandbox": ..., "messages": [msg]})``
|
||||
- ``Command`` with dict update -> merge ``sandbox`` key, preserve all
|
||||
existing fields (``messages``, ``goto``, ``graph``, ``resume``, ...).
|
||||
- ``Command`` with non-dict / None update -> leave it untouched to
|
||||
avoid silent data loss on unknown update shapes.
|
||||
"""
|
||||
sandbox_update = {"sandbox": {"sandbox_id": sandbox_id}}
|
||||
|
||||
if isinstance(result, ToolMessage):
|
||||
return Command(update={**sandbox_update, "messages": [result]})
|
||||
|
||||
existing_update = result.update
|
||||
if isinstance(existing_update, dict):
|
||||
merged_update = {**existing_update, **sandbox_update}
|
||||
return dc_replace(result, update=merged_update)
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def _read_sandbox_id_from_request(request: ToolCallRequest) -> str | None:
|
||||
"""Read sandbox_id from runtime.state (where ensure_sandbox_initialized writes)."""
|
||||
runtime = request.runtime
|
||||
if runtime is None or runtime.state is None:
|
||||
return None
|
||||
return SandboxMiddleware._read_sandbox_id_from_state(runtime.state)
|
||||
|
||||
@override
|
||||
def wrap_tool_call(
|
||||
self,
|
||||
request: ToolCallRequest,
|
||||
handler: Callable[[ToolCallRequest], ToolMessage | Command],
|
||||
) -> ToolMessage | Command:
|
||||
prev_sandbox_id = self._read_sandbox_id_from_request(request)
|
||||
result = handler(request)
|
||||
if prev_sandbox_id is not None:
|
||||
return result
|
||||
curr_sandbox_id = self._read_sandbox_id_from_request(request)
|
||||
if curr_sandbox_id is None:
|
||||
return result
|
||||
return self._attach_sandbox_update(result, curr_sandbox_id)
|
||||
|
||||
@override
|
||||
async def awrap_tool_call(
|
||||
self,
|
||||
request: ToolCallRequest,
|
||||
handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
|
||||
) -> ToolMessage | Command:
|
||||
prev_sandbox_id = self._read_sandbox_id_from_request(request)
|
||||
result = await handler(request)
|
||||
if prev_sandbox_id is not None:
|
||||
return result
|
||||
curr_sandbox_id = self._read_sandbox_id_from_request(request)
|
||||
if curr_sandbox_id is None:
|
||||
return result
|
||||
return self._attach_sandbox_update(result, curr_sandbox_id)
|
||||
|
||||
Reference in New Issue
Block a user