mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-10 09:25:57 +00:00
Merge branch 'main' into fix-2788
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
import base64
|
||||
import errno
|
||||
import logging
|
||||
import shlex
|
||||
import threading
|
||||
@@ -6,11 +7,14 @@ import uuid
|
||||
|
||||
from agent_sandbox import Sandbox as AioSandboxClient
|
||||
|
||||
from deerflow.config.paths import VIRTUAL_PATH_PREFIX
|
||||
from deerflow.sandbox.sandbox import Sandbox
|
||||
from deerflow.sandbox.search import GrepMatch, path_matches, should_ignore_path, truncate_line
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_MAX_DOWNLOAD_SIZE = 100 * 1024 * 1024 # 100 MB
|
||||
|
||||
_ERROR_OBSERVATION_SIGNATURE = "'ErrorObservation' object has no attribute 'exit_code'"
|
||||
|
||||
|
||||
@@ -102,6 +106,49 @@ class AioSandbox(Sandbox):
|
||||
logger.error(f"Failed to read file in sandbox: {e}")
|
||||
return f"Error: {e}"
|
||||
|
||||
def download_file(self, path: str) -> bytes:
    """Fetch the raw bytes of a file from the sandbox.

    The path is validated locally before any request is made: it must not
    contain a ``..`` segment and must be ``VIRTUAL_PATH_PREFIX`` itself or
    a path below it (backslashes count as separators for both checks).
    The content is then read chunk by chunk and capped at
    ``_MAX_DOWNLOAD_SIZE`` bytes.

    Args:
        path: Path of the file inside the sandbox.

    Returns:
        The concatenated file content.

    Raises:
        PermissionError: If the path contains '..' traversal segments or is
            outside ``VIRTUAL_PATH_PREFIX``.
        OSError: If the file exceeds ``_MAX_DOWNLOAD_SIZE`` (``errno.EFBIG``)
            or cannot be retrieved from the sandbox.
    """
    # The path is forwarded verbatim to the container API, so traversal must
    # be rejected here (LocalSandbox gets this implicitly via _resolve_path).
    unix_style = path.replace("\\", "/")
    if ".." in unix_style.split("/"):
        logger.error(f"Refused download due to path traversal: {path}")
        raise PermissionError(f"Access denied: path traversal detected in '{path}'")

    # Compare without leading slashes so "/prefix/x" and "prefix/x" are
    # treated alike; the trailing "/" stops "prefix-other" from matching.
    relative = unix_style.lstrip("/")
    root = VIRTUAL_PATH_PREFIX.lstrip("/")
    inside_root = relative == root or relative.startswith(f"{root}/")
    if not inside_root:
        logger.error("Refused download outside allowed directory: path=%s, allowed_prefix=%s", path, VIRTUAL_PATH_PREFIX)
        raise PermissionError(f"Access denied: path must be under '{VIRTUAL_PATH_PREFIX}': '{path}'")

    with self._lock:
        received: list[bytes] = []
        size = 0
        try:
            for piece in self._client.file.download_file(path=path):
                size += len(piece)
                # Stop as soon as the cap is crossed instead of buffering
                # the rest of an oversized file.
                if size > _MAX_DOWNLOAD_SIZE:
                    raise OSError(
                        errno.EFBIG,
                        f"File exceeds maximum download size of {_MAX_DOWNLOAD_SIZE} bytes",
                        path,
                    )
                received.append(piece)
            return b"".join(received)
        except OSError:
            # Already the documented exception type; pass it through as-is.
            raise
        except Exception as e:
            logger.error(f"Failed to download file in sandbox: {e}")
            raise OSError(f"Failed to download file '{path}' from sandbox: {e}") from e
|
||||
|
||||
def list_dir(self, path: str, max_depth: int = 2) -> list[str]:
|
||||
"""List the contents of a directory in the sandbox.
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ The provider itself handles:
|
||||
- Mount computation (thread-specific, skills)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import atexit
|
||||
import hashlib
|
||||
import logging
|
||||
@@ -18,6 +19,7 @@ import signal
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
try:
|
||||
import fcntl
|
||||
@@ -32,7 +34,7 @@ from deerflow.sandbox.sandbox import Sandbox
|
||||
from deerflow.sandbox.sandbox_provider import SandboxProvider
|
||||
|
||||
from .aio_sandbox import AioSandbox
|
||||
from .backend import SandboxBackend, wait_for_sandbox_ready
|
||||
from .backend import SandboxBackend, wait_for_sandbox_ready, wait_for_sandbox_ready_async
|
||||
from .local_backend import LocalContainerBackend
|
||||
from .remote_backend import RemoteSandboxBackend
|
||||
from .sandbox_info import SandboxInfo
|
||||
@@ -46,6 +48,9 @@ DEFAULT_CONTAINER_PREFIX = "deer-flow-sandbox"
|
||||
DEFAULT_IDLE_TIMEOUT = 10 * 60  # 10 minutes in seconds
DEFAULT_REPLICAS = 3  # Maximum concurrent sandbox containers
IDLE_CHECK_INTERVAL = 60  # Check every 60 seconds

# Dedicated pool for blocking ``threading.Lock`` waits issued from async code.
# Sized as min(32, cpu_count + 4).
THREAD_LOCK_EXECUTOR_WORKERS = min(32, (os.cpu_count() or 1) + 4)
_THREAD_LOCK_EXECUTOR = ThreadPoolExecutor(
    max_workers=THREAD_LOCK_EXECUTOR_WORKERS,
    thread_name_prefix="sandbox-lock-wait",
)
# At interpreter exit, shut the pool down without waiting and drop queued waits.
atexit.register(_THREAD_LOCK_EXECUTOR.shutdown, wait=False, cancel_futures=True)
|
||||
|
||||
|
||||
def _lock_file_exclusive(lock_file) -> None:
|
||||
@@ -66,6 +71,40 @@ def _unlock_file(lock_file) -> None:
|
||||
msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
|
||||
|
||||
def _open_lock_file(lock_path):
|
||||
return open(lock_path, "a", encoding="utf-8")
|
||||
|
||||
|
||||
async def _acquire_thread_lock_async(lock: threading.Lock) -> None:
    """Acquire ``lock`` from a coroutine without blocking the event loop.

    The blocking ``lock.acquire(True)`` call runs on ``_THREAD_LOCK_EXECUTOR``
    rather than the loop's default executor, and the coroutine simply awaits
    its completion (no polling).

    Args:
        lock: The thread lock to acquire.

    Raises:
        asyncio.CancelledError: If the awaiting coroutine is cancelled. The
            executor call keeps running; if it later obtains the lock, the
            lock is released again by a done-callback.
        RuntimeError: If the acquire call reports failure.
    """
    pending = asyncio.get_running_loop().run_in_executor(_THREAD_LOCK_EXECUTOR, lock.acquire, True)

    def _release_if_acquired_late(done):
        # Runs once the executor call finishes after we were cancelled.
        _release_cancelled_lock_acquire(lock, done)

    try:
        # shield() keeps our cancellation from cancelling the executor future,
        # so its eventual outcome can still be inspected.
        got_lock = await asyncio.shield(pending)
    except asyncio.CancelledError:
        pending.add_done_callback(_release_if_acquired_late)
        raise

    if got_lock:
        return
    raise RuntimeError("Failed to acquire sandbox thread lock")
|
||||
|
||||
|
||||
def _release_cancelled_lock_acquire(lock: threading.Lock, task: asyncio.Future[bool]) -> None:
|
||||
"""Release a lock acquired after its awaiting coroutine was cancelled."""
|
||||
if task.cancelled():
|
||||
return
|
||||
|
||||
try:
|
||||
acquired = task.result()
|
||||
except Exception as e:
|
||||
logger.warning(f"Cancelled sandbox lock acquisition finished with error: {e}")
|
||||
return
|
||||
|
||||
if acquired:
|
||||
lock.release()
|
||||
|
||||
|
||||
class AioSandboxProvider(SandboxProvider):
|
||||
"""Sandbox provider that manages containers running the AIO sandbox.
|
||||
|
||||
@@ -419,6 +458,96 @@ class AioSandboxProvider(SandboxProvider):
|
||||
self._thread_locks[thread_id] = threading.Lock()
|
||||
return self._thread_locks[thread_id]
|
||||
|
||||
def _sandbox_id_for_thread(self, thread_id: str | None) -> str:
|
||||
"""Return deterministic IDs for thread sandboxes and random IDs otherwise."""
|
||||
return self._deterministic_sandbox_id(thread_id) if thread_id else str(uuid.uuid4())[:8]
|
||||
|
||||
def _reuse_in_process_sandbox(self, thread_id: str | None, *, post_lock: bool = False) -> str | None:
|
||||
"""Reuse an active in-process sandbox for a thread if one is still tracked."""
|
||||
if thread_id is None:
|
||||
return None
|
||||
|
||||
with self._lock:
|
||||
if thread_id not in self._thread_sandboxes:
|
||||
return None
|
||||
|
||||
existing_id = self._thread_sandboxes[thread_id]
|
||||
if existing_id in self._sandboxes:
|
||||
suffix = " (post-lock check)" if post_lock else ""
|
||||
logger.info(f"Reusing in-process sandbox {existing_id} for thread {thread_id}{suffix}")
|
||||
self._last_activity[existing_id] = time.time()
|
||||
return existing_id
|
||||
|
||||
del self._thread_sandboxes[thread_id]
|
||||
return None
|
||||
|
||||
def _reclaim_warm_pool_sandbox(self, thread_id: str | None, sandbox_id: str, *, post_lock: bool = False) -> str | None:
|
||||
"""Promote a warm-pool sandbox back to active tracking if available."""
|
||||
if thread_id is None:
|
||||
return None
|
||||
|
||||
with self._lock:
|
||||
if sandbox_id not in self._warm_pool:
|
||||
return None
|
||||
|
||||
info, _ = self._warm_pool.pop(sandbox_id)
|
||||
sandbox = AioSandbox(id=sandbox_id, base_url=info.sandbox_url)
|
||||
self._sandboxes[sandbox_id] = sandbox
|
||||
self._sandbox_infos[sandbox_id] = info
|
||||
self._last_activity[sandbox_id] = time.time()
|
||||
self._thread_sandboxes[thread_id] = sandbox_id
|
||||
|
||||
suffix = " (post-lock check)" if post_lock else f" at {info.sandbox_url}"
|
||||
logger.info(f"Reclaimed warm-pool sandbox {sandbox_id} for thread {thread_id}{suffix}")
|
||||
return sandbox_id
|
||||
|
||||
def _recheck_cached_sandbox(self, thread_id: str, sandbox_id: str) -> str | None:
|
||||
"""Re-check in-memory caches after acquiring the cross-process file lock."""
|
||||
return self._reuse_in_process_sandbox(thread_id, post_lock=True) or self._reclaim_warm_pool_sandbox(thread_id, sandbox_id, post_lock=True)
|
||||
|
||||
def _register_discovered_sandbox(self, thread_id: str, info: SandboxInfo) -> str:
    """Start tracking a sandbox that the backend reported as already existing.

    Args:
        thread_id: Thread the sandbox is bound to.
        info: Backend description of the sandbox (ID and URL are used).

    Returns:
        ``info.sandbox_id``.
    """
    found_id = info.sandbox_id
    client = AioSandbox(id=found_id, base_url=info.sandbox_url)

    # Update all four maps in one critical section.
    with self._lock:
        self._sandboxes[found_id] = client
        self._sandbox_infos[found_id] = info
        self._last_activity[found_id] = time.time()
        self._thread_sandboxes[thread_id] = found_id

    logger.info(f"Discovered existing sandbox {info.sandbox_id} for thread {thread_id} at {info.sandbox_url}")
    return found_id
|
||||
|
||||
def _register_created_sandbox(self, thread_id: str | None, sandbox_id: str, info: SandboxInfo) -> str:
    """Record a freshly created sandbox in the active-tracking maps.

    Args:
        thread_id: Owning thread; the thread mapping is only written when
            this is truthy.
        sandbox_id: ID under which the sandbox is tracked.
        info: Backend description of the sandbox (its URL is used).

    Returns:
        ``sandbox_id``.
    """
    client = AioSandbox(id=sandbox_id, base_url=info.sandbox_url)

    with self._lock:
        self._sandboxes[sandbox_id] = client
        self._sandbox_infos[sandbox_id] = info
        self._last_activity[sandbox_id] = time.time()
        # Anonymous sandboxes are tracked but not bound to any thread.
        if thread_id:
            self._thread_sandboxes[thread_id] = sandbox_id

    logger.info(f"Created sandbox {sandbox_id} for thread {thread_id} at {info.sandbox_url}")
    return sandbox_id
|
||||
|
||||
def _replica_count(self) -> tuple[int, int]:
    """Report the replica budget alongside current usage.

    Returns:
        ``(replicas, total)`` where ``replicas`` is the configured value
        (``DEFAULT_REPLICAS`` when unset) and ``total`` is the number of
        active plus warm-pool sandboxes.
    """
    configured = self._config.get("replicas", DEFAULT_REPLICAS)
    # Read both maps under the lock so the sum is a consistent snapshot.
    with self._lock:
        active = len(self._sandboxes)
        warm = len(self._warm_pool)
    return configured, active + warm
|
||||
|
||||
def _log_replicas_soft_cap(self, replicas: int, sandbox_id: str, evicted: str | None) -> None:
    """Log what happened when the replica budget was enforced.

    Args:
        replicas: The configured replica limit.
        sandbox_id: ID of the sandbox about to be created.
        evicted: ID of the warm-pool sandbox that was evicted, or a falsy
            value when nothing could be evicted.
    """
    if not evicted:
        # All slots are occupied by active sandboxes — proceed anyway and log.
        # The replicas limit is a soft cap; we never forcibly stop a container
        # that is actively serving a thread.
        logger.warning(f"All {replicas} replica slots are in active use; creating sandbox {sandbox_id} beyond the soft limit")
        return

    logger.info(f"Evicted warm-pool sandbox {evicted} to stay within replicas={replicas}")
|
||||
|
||||
# ── Core: acquire / get / release / shutdown ─────────────────────────
|
||||
|
||||
def acquire(self, thread_id: str | None = None) -> str:
|
||||
@@ -443,6 +572,23 @@ class AioSandboxProvider(SandboxProvider):
|
||||
else:
|
||||
return self._acquire_internal(thread_id)
|
||||
|
||||
async def acquire_async(self, thread_id: str | None = None) -> str:
|
||||
"""Acquire a sandbox environment without blocking the event loop.
|
||||
|
||||
Mirrors ``acquire()`` while keeping blocking backend operations off the
|
||||
event loop and using async-native readiness polling for newly created
|
||||
sandboxes.
|
||||
"""
|
||||
if thread_id:
|
||||
thread_lock = self._get_thread_lock(thread_id)
|
||||
await _acquire_thread_lock_async(thread_lock)
|
||||
try:
|
||||
return await self._acquire_internal_async(thread_id)
|
||||
finally:
|
||||
thread_lock.release()
|
||||
|
||||
return await self._acquire_internal_async(thread_id)
|
||||
|
||||
def _acquire_internal(self, thread_id: str | None) -> str:
|
||||
"""Internal sandbox acquisition with two-layer consistency.
|
||||
|
||||
@@ -451,33 +597,17 @@ class AioSandboxProvider(SandboxProvider):
|
||||
sandbox_id is deterministic from thread_id so no shared state file
|
||||
is needed — any process can derive the same container name)
|
||||
"""
|
||||
# ── Layer 1: In-process cache (fast path) ──
|
||||
if thread_id:
|
||||
with self._lock:
|
||||
if thread_id in self._thread_sandboxes:
|
||||
existing_id = self._thread_sandboxes[thread_id]
|
||||
if existing_id in self._sandboxes:
|
||||
logger.info(f"Reusing in-process sandbox {existing_id} for thread {thread_id}")
|
||||
self._last_activity[existing_id] = time.time()
|
||||
return existing_id
|
||||
else:
|
||||
del self._thread_sandboxes[thread_id]
|
||||
cached_id = self._reuse_in_process_sandbox(thread_id)
|
||||
if cached_id is not None:
|
||||
return cached_id
|
||||
|
||||
# Deterministic ID for thread-specific, random for anonymous
|
||||
sandbox_id = self._deterministic_sandbox_id(thread_id) if thread_id else str(uuid.uuid4())[:8]
|
||||
sandbox_id = self._sandbox_id_for_thread(thread_id)
|
||||
|
||||
# ── Layer 1.5: Warm pool (container still running, no cold-start) ──
|
||||
if thread_id:
|
||||
with self._lock:
|
||||
if sandbox_id in self._warm_pool:
|
||||
info, _ = self._warm_pool.pop(sandbox_id)
|
||||
sandbox = AioSandbox(id=sandbox_id, base_url=info.sandbox_url)
|
||||
self._sandboxes[sandbox_id] = sandbox
|
||||
self._sandbox_infos[sandbox_id] = info
|
||||
self._last_activity[sandbox_id] = time.time()
|
||||
self._thread_sandboxes[thread_id] = sandbox_id
|
||||
logger.info(f"Reclaimed warm-pool sandbox {sandbox_id} for thread {thread_id} at {info.sandbox_url}")
|
||||
return sandbox_id
|
||||
reclaimed_id = self._reclaim_warm_pool_sandbox(thread_id, sandbox_id)
|
||||
if reclaimed_id is not None:
|
||||
return reclaimed_id
|
||||
|
||||
# ── Layer 2: Backend discovery + create (protected by cross-process lock) ──
|
||||
# Use a file lock so that two processes racing to create the same sandbox
|
||||
@@ -488,6 +618,26 @@ class AioSandboxProvider(SandboxProvider):
|
||||
|
||||
return self._create_sandbox(thread_id, sandbox_id)
|
||||
|
||||
async def _acquire_internal_async(self, thread_id: str | None) -> str:
|
||||
"""Async counterpart to ``_acquire_internal``."""
|
||||
cached_id = self._reuse_in_process_sandbox(thread_id)
|
||||
if cached_id is not None:
|
||||
return cached_id
|
||||
|
||||
# Deterministic ID for thread-specific, random for anonymous
|
||||
sandbox_id = self._sandbox_id_for_thread(thread_id)
|
||||
|
||||
# ── Layer 1.5: Warm pool (container still running, no cold-start) ──
|
||||
reclaimed_id = self._reclaim_warm_pool_sandbox(thread_id, sandbox_id)
|
||||
if reclaimed_id is not None:
|
||||
return reclaimed_id
|
||||
|
||||
# ── Layer 2: Backend discovery + create (protected by cross-process lock) ──
|
||||
if thread_id:
|
||||
return await self._discover_or_create_with_lock_async(thread_id, sandbox_id)
|
||||
|
||||
return await self._create_sandbox_async(thread_id, sandbox_id)
|
||||
|
||||
def _discover_or_create_with_lock(self, thread_id: str, sandbox_id: str) -> str:
|
||||
"""Discover an existing sandbox or create a new one under a cross-process file lock.
|
||||
|
||||
@@ -506,40 +656,50 @@ class AioSandboxProvider(SandboxProvider):
|
||||
locked = True
|
||||
# Re-check in-process caches under the file lock in case another
|
||||
# thread in this process won the race while we were waiting.
|
||||
with self._lock:
|
||||
if thread_id in self._thread_sandboxes:
|
||||
existing_id = self._thread_sandboxes[thread_id]
|
||||
if existing_id in self._sandboxes:
|
||||
logger.info(f"Reusing in-process sandbox {existing_id} for thread {thread_id} (post-lock check)")
|
||||
self._last_activity[existing_id] = time.time()
|
||||
return existing_id
|
||||
if sandbox_id in self._warm_pool:
|
||||
info, _ = self._warm_pool.pop(sandbox_id)
|
||||
sandbox = AioSandbox(id=sandbox_id, base_url=info.sandbox_url)
|
||||
self._sandboxes[sandbox_id] = sandbox
|
||||
self._sandbox_infos[sandbox_id] = info
|
||||
self._last_activity[sandbox_id] = time.time()
|
||||
self._thread_sandboxes[thread_id] = sandbox_id
|
||||
logger.info(f"Reclaimed warm-pool sandbox {sandbox_id} for thread {thread_id} (post-lock check)")
|
||||
return sandbox_id
|
||||
cached_id = self._recheck_cached_sandbox(thread_id, sandbox_id)
|
||||
if cached_id is not None:
|
||||
return cached_id
|
||||
|
||||
# Backend discovery: another process may have created the container.
|
||||
discovered = self._backend.discover(sandbox_id)
|
||||
if discovered is not None:
|
||||
sandbox = AioSandbox(id=discovered.sandbox_id, base_url=discovered.sandbox_url)
|
||||
with self._lock:
|
||||
self._sandboxes[discovered.sandbox_id] = sandbox
|
||||
self._sandbox_infos[discovered.sandbox_id] = discovered
|
||||
self._last_activity[discovered.sandbox_id] = time.time()
|
||||
self._thread_sandboxes[thread_id] = discovered.sandbox_id
|
||||
logger.info(f"Discovered existing sandbox {discovered.sandbox_id} for thread {thread_id} at {discovered.sandbox_url}")
|
||||
return discovered.sandbox_id
|
||||
return self._register_discovered_sandbox(thread_id, discovered)
|
||||
|
||||
return self._create_sandbox(thread_id, sandbox_id)
|
||||
finally:
|
||||
if locked:
|
||||
_unlock_file(lock_file)
|
||||
|
||||
async def _discover_or_create_with_lock_async(self, thread_id: str, sandbox_id: str) -> str:
    """Async counterpart to ``_discover_or_create_with_lock``.

    Takes an exclusive lock on ``<thread dir>/<sandbox_id>.lock`` and, while
    holding it, returns the first of: a sandbox already cached in this
    process, a sandbox discovered through the backend, or a newly created
    one. All blocking work (directory creation, file open/lock/unlock/close,
    backend discovery) runs via ``asyncio.to_thread``.

    Args:
        thread_id: Thread the sandbox belongs to.
        sandbox_id: ID of the sandbox to discover or create.

    Returns:
        The ID of the reused, discovered or created sandbox.
    """
    paths = get_paths()
    user_id = get_effective_user_id()
    await asyncio.to_thread(paths.ensure_thread_dirs, thread_id, user_id=user_id)
    lock_path = paths.thread_dir(thread_id, user_id=user_id) / f"{sandbox_id}.lock"

    lock_file = await asyncio.to_thread(_open_lock_file, lock_path)
    try:
        locked = False
        try:
            await asyncio.to_thread(_lock_file_exclusive, lock_file)
            locked = True
            # Re-check in-process caches under the file lock in case another
            # thread in this process won the race while we were waiting.
            cached_id = self._recheck_cached_sandbox(thread_id, sandbox_id)
            if cached_id is not None:
                return cached_id

            # Backend discovery is sync because local discovery may inspect
            # Docker and perform a health check; keep it off the event loop.
            discovered = await asyncio.to_thread(self._backend.discover, sandbox_id)
            if discovered is not None:
                return self._register_discovered_sandbox(thread_id, discovered)

            return await self._create_sandbox_async(thread_id, sandbox_id)
        finally:
            if locked:
                await asyncio.to_thread(_unlock_file, lock_file)
    finally:
        # Close in an outer ``finally`` so the handle is not leaked when the
        # unlock itself fails.
        # NOTE(review): closing presumably also drops any OS-level lock still
        # held on the descriptor — confirm against _lock_file_exclusive.
        await asyncio.to_thread(lock_file.close)
|
||||
|
||||
def _evict_oldest_warm(self) -> str | None:
|
||||
"""Destroy the oldest container in the warm pool to free capacity.
|
||||
|
||||
@@ -577,18 +737,10 @@ class AioSandboxProvider(SandboxProvider):
|
||||
|
||||
# Enforce replicas: only warm-pool containers count toward eviction budget.
|
||||
# Active sandboxes are in use by live threads and must not be forcibly stopped.
|
||||
replicas = self._config.get("replicas", DEFAULT_REPLICAS)
|
||||
with self._lock:
|
||||
total = len(self._sandboxes) + len(self._warm_pool)
|
||||
replicas, total = self._replica_count()
|
||||
if total >= replicas:
|
||||
evicted = self._evict_oldest_warm()
|
||||
if evicted:
|
||||
logger.info(f"Evicted warm-pool sandbox {evicted} to stay within replicas={replicas}")
|
||||
else:
|
||||
# All slots are occupied by active sandboxes — proceed anyway and log.
|
||||
# The replicas limit is a soft cap; we never forcibly stop a container
|
||||
# that is actively serving a thread.
|
||||
logger.warning(f"All {replicas} replica slots are in active use; creating sandbox {sandbox_id} beyond the soft limit")
|
||||
self._log_replicas_soft_cap(replicas, sandbox_id, evicted)
|
||||
|
||||
info = self._backend.create(thread_id, sandbox_id, extra_mounts=extra_mounts or None)
|
||||
|
||||
@@ -597,16 +749,27 @@ class AioSandboxProvider(SandboxProvider):
|
||||
self._backend.destroy(info)
|
||||
raise RuntimeError(f"Sandbox {sandbox_id} failed to become ready within timeout at {info.sandbox_url}")
|
||||
|
||||
sandbox = AioSandbox(id=sandbox_id, base_url=info.sandbox_url)
|
||||
with self._lock:
|
||||
self._sandboxes[sandbox_id] = sandbox
|
||||
self._sandbox_infos[sandbox_id] = info
|
||||
self._last_activity[sandbox_id] = time.time()
|
||||
if thread_id:
|
||||
self._thread_sandboxes[thread_id] = sandbox_id
|
||||
return self._register_created_sandbox(thread_id, sandbox_id, info)
|
||||
|
||||
logger.info(f"Created sandbox {sandbox_id} for thread {thread_id} at {info.sandbox_url}")
|
||||
return sandbox_id
|
||||
async def _create_sandbox_async(self, thread_id: str | None, sandbox_id: str) -> str:
    """Async counterpart to ``_create_sandbox``.

    Computes extra mounts, applies the replica soft cap, creates the sandbox
    through the backend and waits up to 60 seconds for it to become ready.
    Blocking backend calls run via ``asyncio.to_thread``.

    Args:
        thread_id: Owning thread, or ``None`` for an anonymous sandbox.
        sandbox_id: ID to create the sandbox under.

    Returns:
        ``sandbox_id`` once the sandbox is ready and registered.

    Raises:
        RuntimeError: If the sandbox does not become ready in time. It is
            destroyed before the error is raised.
    """
    mounts = await asyncio.to_thread(self._get_extra_mounts, thread_id)

    # Enforce replicas: only warm-pool containers count toward eviction budget.
    # Active sandboxes are in use by live threads and must not be forcibly stopped.
    replicas, tracked = self._replica_count()
    if tracked >= replicas:
        evicted = await asyncio.to_thread(self._evict_oldest_warm)
        self._log_replicas_soft_cap(replicas, sandbox_id, evicted)

    info = await asyncio.to_thread(self._backend.create, thread_id, sandbox_id, extra_mounts=mounts or None)

    # Wait for sandbox to be ready without blocking the event loop.
    ready = await wait_for_sandbox_ready_async(info.sandbox_url, timeout=60)
    if ready:
        return self._register_created_sandbox(thread_id, sandbox_id, info)

    # Never became healthy: tear it down rather than leave it running.
    await asyncio.to_thread(self._backend.destroy, info)
    raise RuntimeError(f"Sandbox {sandbox_id} failed to become ready within timeout at {info.sandbox_url}")
|
||||
|
||||
def get(self, sandbox_id: str) -> Sandbox | None:
|
||||
"""Get a sandbox by ID. Updates last activity timestamp.
|
||||
|
||||
@@ -2,10 +2,12 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
import httpx
|
||||
import requests
|
||||
|
||||
from .sandbox_info import SandboxInfo
|
||||
@@ -35,6 +37,34 @@ def wait_for_sandbox_ready(sandbox_url: str, timeout: int = 30) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
async def wait_for_sandbox_ready_async(sandbox_url: str, timeout: int = 30, poll_interval: float = 1.0) -> bool:
    """Poll a sandbox until it answers, without blocking the event loop.

    Issues ``GET {sandbox_url}/v1/sandbox`` repeatedly until a 200 response
    arrives or ``timeout`` seconds have elapsed on the loop clock. The
    synchronous ``wait_for_sandbox_ready`` remains for synchronous call sites.

    Args:
        sandbox_url: Base URL of the sandbox.
        timeout: Overall time budget in seconds.
        poll_interval: Pause between attempts in seconds.

    Returns:
        ``True`` on the first 200 response, ``False`` if the budget runs out.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    probe_url = f"{sandbox_url}/v1/sandbox"

    async with httpx.AsyncClient(timeout=5) as client:
        while (remaining := deadline - loop.time()) > 0:
            try:
                # Cap each request at 5s, or less when the budget is nearly spent.
                response = await client.get(probe_url, timeout=min(5.0, remaining))
            except httpx.RequestError:
                # Request failed; treat it as "not ready yet" and keep polling.
                pass
            else:
                if response.status_code == 200:
                    return True

            # Re-read the clock: the request itself consumed part of the budget.
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            await asyncio.sleep(min(poll_interval, remaining))

    return False
|
||||
|
||||
|
||||
class SandboxBackend(ABC):
|
||||
"""Abstract base for sandbox provisioning backends.
|
||||
|
||||
@@ -44,7 +74,7 @@ class SandboxBackend(ABC):
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
|
||||
def create(self, thread_id: str | None, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
|
||||
"""Create/provision a new sandbox.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -241,7 +241,7 @@ class LocalContainerBackend(SandboxBackend):
|
||||
|
||||
# ── SandboxBackend interface ──────────────────────────────────────────
|
||||
|
||||
def create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
|
||||
def create(self, thread_id: str | None, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
|
||||
"""Start a new container and return its connection info.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -21,6 +21,8 @@ import logging
|
||||
|
||||
import requests
|
||||
|
||||
from deerflow.runtime.user_context import get_effective_user_id
|
||||
|
||||
from .backend import SandboxBackend
|
||||
from .sandbox_info import SandboxInfo
|
||||
|
||||
@@ -57,7 +59,7 @@ class RemoteSandboxBackend(SandboxBackend):
|
||||
|
||||
def create(
|
||||
self,
|
||||
thread_id: str,
|
||||
thread_id: str | None,
|
||||
sandbox_id: str,
|
||||
extra_mounts: list[tuple[str, str, bool]] | None = None,
|
||||
) -> SandboxInfo:
|
||||
@@ -130,7 +132,7 @@ class RemoteSandboxBackend(SandboxBackend):
|
||||
logger.warning("Provisioner list_running failed: %s", exc)
|
||||
return []
|
||||
|
||||
def _provisioner_create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
|
||||
def _provisioner_create(self, thread_id: str | None, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
|
||||
"""POST /api/sandboxes → create Pod + Service."""
|
||||
try:
|
||||
resp = requests.post(
|
||||
@@ -138,6 +140,7 @@ class RemoteSandboxBackend(SandboxBackend):
|
||||
json={
|
||||
"sandbox_id": sandbox_id,
|
||||
"thread_id": thread_id,
|
||||
"user_id": get_effective_user_id(),
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user