mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-21 15:36:48 +00:00
c881d95898
* fix(mcp): persist MCP sessions across tool calls for stateful servers MCP tools loaded via langchain-mcp-adapters created a new session on every call, causing stateful servers like Playwright to lose browser state (pages, forms) between consecutive tool invocations within the same thread. Add MCPSessionPool that maintains persistent sessions scoped by (server_name, thread_id). Tool calls within the same thread now reuse the same MCP session, preserving server-side state. Sessions are evicted in LRU order (max 256) and cleaned up on cache invalidation. Fixes #3054 * fix(sandbox): add group/other read permissions to uploaded files for Docker sandbox (#3127) When using AIO sandbox with LocalContainerBackend, uploaded files are created with 0o600 (owner-only) permissions by the gateway process running as root. The sandbox process inside the Docker container runs as a non-root user and cannot read these bind-mounted files, causing a "Permission denied" error on read_file. Add `needs_upload_permission_adjustment` attribute to SandboxProvider (default True) to indicate that uploaded files need chmod adjustment. LocalSandboxProvider opts out (same user). A new `_make_file_sandbox_readable` function adds S_IRGRP | S_IROTH bits after files are written, changing permissions from 0o600 to 0o644 so the sandbox can read the uploads. 
* fix(mcp): address review comments on session pool and tools - _extract_thread_id: return "default" instead of stringifying None when get_config() returns no thread_id - call_with_persistent_session: fix **arguments annotation from dict[str,Any] to Any - Replace private _convert_call_tool_result import with a local implementation that handles all MCP content block types - _make_session_pool_tool: accept tool_interceptors and apply the configured interceptor chain on every call (preserving OAuth and custom interceptors) - MCPSessionPool: replace asyncio.Lock with threading.Lock; restructure get/close methods to never await while holding the lock; add close_all_sync() that closes sessions on their owning event loops - reset_mcp_tools_cache: use pool.close_all_sync() instead of asyncio.run-in-thread to close sessions deterministically - test: add test_session_pool_tool_sync_wrapper_path_is_safe covering tool invocation via the sync wrapper (tool.func) path Agent-Logs-Url: https://github.com/bytedance/deer-flow/sessions/9e7f9e7f-1d2b-464a-b3b7-7f1649b74122 Co-authored-by: WillemJiang <219644+WillemJiang@users.noreply.github.com> * fix(mcp): extract SESSION_CLOSE_TIMEOUT to class constant Agent-Logs-Url: https://github.com/bytedance/deer-flow/sessions/9e7f9e7f-1d2b-464a-b3b7-7f1649b74122 Co-authored-by: WillemJiang <219644+WillemJiang@users.noreply.github.com> * Potential fix for pull request finding 'Empty except' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
159 lines
5.2 KiB
Python
159 lines
5.2 KiB
Python
"""Cache for MCP tools to avoid repeated loading."""
|
||
|
||
import asyncio
|
||
import logging
|
||
import os
|
||
|
||
from langchain_core.tools import BaseTool
|
||
|
||
logger = logging.getLogger(__name__)

# Cached list of MCP tools; None until initialize_mcp_tools() has stored a
# result, and set back to None by reset_mcp_tools_cache().
_mcp_tools_cache: list[BaseTool] | None = None
# True once initialize_mcp_tools() has populated the cache.
_cache_initialized = False
# Held by initialize_mcp_tools() (via ``async with``) while it loads the tools.
_initialization_lock = asyncio.Lock()
# Modification time of the extensions config file recorded when the cache was
# filled; compared against the current mtime to detect a stale cache.
_config_mtime: float | None = None  # Track config file modification time
|
||
|
||
|
||
def _get_config_mtime() -> float | None:
    """Get the modification time of the extensions config file.

    The path is obtained from ``ExtensionsConfig.resolve_config_path()``.
    The file is stat'ed directly rather than being checked with ``exists()``
    first, so a file that is removed or replaced between a check and the stat
    cannot make this helper raise.

    Returns:
        The modification time as a float, or None if no config path is
        resolved or the file cannot be stat'ed (for example because it does
        not exist).
    """
    # Imported inside the function rather than at module import time.
    from deerflow.config.extensions_config import ExtensionsConfig

    config_path = ExtensionsConfig.resolve_config_path()
    if not config_path:
        return None

    try:
        return os.path.getmtime(config_path)
    except OSError:
        # Missing or inaccessible file: callers treat None as "mtime unknown".
        return None
|
||
|
||
|
||
def _is_cache_stale() -> bool:
    """Report whether the config file changed after the cache was filled.

    Returns:
        True when the cache is initialized and the config file's current
        mtime is newer than the one recorded at initialization; False in
        every other case, including when either mtime is unavailable.
    """
    # Nothing has been cached yet, so there is nothing to invalidate.
    if not _cache_initialized:
        return False

    current_mtime = _get_config_mtime()

    # Without both timestamps no comparison is possible; keep the cache.
    if None in (_config_mtime, current_mtime):
        return False

    # Only a strictly newer file counts as a modification.
    modified = current_mtime > _config_mtime
    if modified:
        logger.info(f"MCP config file has been modified (mtime: {_config_mtime} -> {current_mtime}), cache is stale")
    return modified
|
||
|
||
|
||
async def initialize_mcp_tools() -> list[BaseTool]:
    """Load the MCP tools and store them in the module-level cache.

    Intended to be called once at application startup. Calls made after the
    cache has been initialized return the cached list without reloading.

    Returns:
        List of LangChain tools returned by ``get_mcp_tools()``, or the
        previously cached list (an empty list if that cache is empty).
    """
    global _mcp_tools_cache, _cache_initialized, _config_mtime

    async with _initialization_lock:
        if not _cache_initialized:
            from deerflow.mcp.tools import get_mcp_tools

            logger.info("Initializing MCP tools...")
            loaded_tools = await get_mcp_tools()
            _mcp_tools_cache = loaded_tools
            _cache_initialized = True
            # Remember the config mtime seen at load time so later calls can
            # compare against it.
            _config_mtime = _get_config_mtime()
            logger.info(f"MCP tools initialized: {len(_mcp_tools_cache)} tool(s) loaded (config mtime: {_config_mtime})")
            return loaded_tools

        # Already initialized: hand back what is cached.
        logger.info("MCP tools already initialized")
        return _mcp_tools_cache or []
|
||
|
||
|
||
def get_cached_mcp_tools() -> list[BaseTool]:
    """Get cached MCP tools with lazy initialization.

    If tools are not initialized, automatically initializes them.
    This ensures MCP tools work in both FastAPI and LangGraph Studio contexts.

    Also checks if the config file has been modified since last initialization,
    and re-initializes if needed. This ensures that changes made through the
    Gateway API (which runs in a separate process) are reflected in the
    LangGraph Server.

    Initialization is attempted exactly once per call. Any exception raised
    while initializing is logged and an empty list is returned; a later call
    will try again because the cache stays uninitialized.

    Returns:
        List of cached MCP tools, or an empty list if nothing is cached or
        lazy initialization failed.
    """
    # Check if cache is stale due to config file changes
    if _is_cache_stale():
        logger.info("MCP cache is stale, resetting for re-initialization...")
        reset_mcp_tools_cache()

    if not _cache_initialized:
        logger.info("MCP tools not initialized, performing lazy initialization...")

        # Only the loop lookup may signal "no event loop" via RuntimeError.
        # Keeping it separate from the initialization itself means a
        # RuntimeError raised while loading tools is not mistaken for a
        # missing loop and does not trigger a second initialization attempt.
        # Depending on the Python version and thread, get_event_loop() either
        # returns/creates a loop or raises RuntimeError when none is set.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None

        try:
            if loop is None or loop.is_closed():
                # No usable loop on this thread: let asyncio.run() manage one.
                asyncio.run(initialize_mcp_tools())
            elif loop.is_running():
                # The loop is already running (e.g., in LangGraph Studio), so
                # it cannot be blocked on here; run the coroutine on a fresh
                # loop in a worker thread and wait for it to finish.
                import concurrent.futures

                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                    executor.submit(asyncio.run, initialize_mcp_tools()).result()
            else:
                # An idle loop is available on this thread: reuse it.
                loop.run_until_complete(initialize_mcp_tools())
        except Exception:
            logger.exception("Failed to lazy-initialize MCP tools")
            return []

    return _mcp_tools_cache or []
|
||
|
||
|
||
def reset_mcp_tools_cache() -> None:
    """Clear the cached MCP tools and the associated session pool.

    Useful for tests or whenever the MCP tools should be reloaded. The cache
    globals are returned to their uninitialized values, ``close_all_sync()``
    is called on the current session pool, and the pool is then reset so it
    is recreated on the next tool load.
    """
    global _mcp_tools_cache, _cache_initialized, _config_mtime
    _mcp_tools_cache, _cache_initialized, _config_mtime = None, False, None

    # Closing the persistent sessions is best-effort: a failure here is only
    # logged at debug level and never prevents the reset below. The sessions
    # are expected to be recreated by the next get_mcp_tools() call with the
    # (possibly updated) connection config.
    try:
        from deerflow.mcp.session_pool import get_session_pool

        get_session_pool().close_all_sync()
    except Exception:
        logger.debug("Could not close MCP session pool on cache reset", exc_info=True)

    from deerflow.mcp.session_pool import reset_session_pool

    reset_session_pool()
    logger.info("MCP tools cache reset")
|