mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-10 17:35:57 +00:00
fix(mcp): close stdio sessions on their owning loop to avoid cross-task cancel-scope error (#3379) (#3392)
* fix(mcp): close stdio sessions on their owning loop to avoid cross-task cancel-scope error (#3379) Adopt an owner-task lifecycle for pooled MCP ClientSessions so each session is entered, initialized, and exited within a single asyncio task on its owning event loop. This eliminates the anyio "Attempted to exit cancel scope in a different task than it was entered in" RuntimeError that surfaced when stdio MCP tools were used via the sync tool wrapper (which spins up and tears down event loops across tasks). Also harden the pool lifecycle: - track in-flight session creation per (server, scope) to dedupe concurrent get_session() calls for the same key - make close_scope/close_server/close_all/close_all_sync cover both established entries and in-flight creations so sessions cannot be resurrected or leaked after close - handle cross-loop preemption of an in-flight creation by cancelling the stale owner task instead of only signalling it - define close_all_sync() semantics for a running loop on the current thread (signal-only, async completion) and route reset_mcp_tools_cache through a deterministic async close in that case * fix(mcp): avoid reset deadlock on running loop cache reset * fix(mcp): address session pool review feedback
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
"""MCP (Model Context Protocol) integration using langchain-mcp-adapters."""
|
||||
|
||||
from .cache import get_cached_mcp_tools, initialize_mcp_tools, reset_mcp_tools_cache
|
||||
from .cache import (
|
||||
get_cached_mcp_tools,
|
||||
initialize_mcp_tools,
|
||||
reset_mcp_tools_cache,
|
||||
)
|
||||
from .client import build_server_params, build_servers_config
|
||||
from .tools import get_mcp_tools
|
||||
|
||||
|
||||
@@ -143,11 +143,20 @@ def reset_mcp_tools_cache() -> None:
|
||||
|
||||
# Close persistent sessions – they will be recreated by the next
|
||||
# get_mcp_tools() call with the (possibly updated) connection config.
|
||||
#
|
||||
# close_all_sync() already picks the correct strategy per owning loop:
|
||||
# * sessions owned by the *current* running loop are only *signalled*
|
||||
# (their owner task runs __aexit__ once the loop regains control –
|
||||
# this is correct and leak-free, since the loop keeps the task alive),
|
||||
# * sessions on other threads' loops are torn down deterministically,
|
||||
# * idle/closed loops are handled or skipped.
|
||||
# We deliberately do NOT try to synchronously wait for the current running
|
||||
# loop to finish teardown here: that is a self-deadlock (the loop can only
|
||||
# run the teardown after this synchronous call returns control to it).
|
||||
try:
|
||||
from deerflow.mcp.session_pool import get_session_pool
|
||||
|
||||
pool = get_session_pool()
|
||||
pool.close_all_sync()
|
||||
get_session_pool().close_all_sync()
|
||||
except Exception:
|
||||
logger.debug("Could not close MCP session pool on cache reset", exc_info=True)
|
||||
|
||||
|
||||
@@ -8,6 +8,27 @@ This module provides a session pool that maintains persistent MCP sessions,
|
||||
scoped by ``(server_name, scope_key)`` — typically scope_key is the thread_id —
|
||||
so that consecutive tool calls share the same session and server-side state.
|
||||
Sessions are evicted in LRU order when the pool reaches capacity.
|
||||
|
||||
Lifecycle model (owner task)
|
||||
----------------------------
|
||||
An MCP ``ClientSession`` is implemented on top of an ``anyio`` task group, and
|
||||
anyio enforces that a cancel scope must be exited from the *same task* that
|
||||
entered it. Calling ``cm.__aexit__`` from any task other than the one that ran
|
||||
``cm.__aenter__`` raises::
|
||||
|
||||
RuntimeError: Attempted to exit cancel scope in a different task than it
|
||||
was entered in
|
||||
|
||||
The sync-tool path (``make_sync_tool_wrapper``) drives each call through a fresh
|
||||
``asyncio.run`` event loop, so a session entered while answering one call would
|
||||
otherwise be exited while answering another — from a different task — and crash
|
||||
(GitHub issue #3379).
|
||||
|
||||
To make this impossible, every pooled session is owned by a dedicated
|
||||
``_run_session`` task. That task enters the context manager, hands the live
|
||||
session back to the caller, and then *waits* on a close event. All shutdown
|
||||
paths only ever **signal** that event; the owner task performs ``__aexit__``
|
||||
itself, guaranteeing enter and exit always happen in the same task.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -27,18 +48,81 @@ class MCPSessionPool:
|
||||
"""Manages persistent MCP sessions scoped by ``(server_name, scope_key)``."""
|
||||
|
||||
MAX_SESSIONS = 256
|
||||
SESSION_CLOSE_TIMEOUT = 5.0 # seconds to wait when closing a session via run_coroutine_threadsafe
|
||||
SESSION_CLOSE_TIMEOUT = 5.0 # seconds to wait when closing a session on a foreign loop
|
||||
|
||||
def __init__(self) -> None:
|
||||
# Each entry: (session, owning_loop, owner_task, close_event).
|
||||
self._entries: OrderedDict[
|
||||
tuple[str, str],
|
||||
tuple[ClientSession, asyncio.AbstractEventLoop],
|
||||
tuple[
|
||||
ClientSession,
|
||||
asyncio.AbstractEventLoop,
|
||||
asyncio.Task[Any],
|
||||
asyncio.Event,
|
||||
],
|
||||
] = OrderedDict()
|
||||
self._context_managers: dict[tuple[str, str], Any] = {}
|
||||
# In-flight creations, keyed by (server, scope). Lets concurrent callers
|
||||
# on the same loop share a single creation instead of each spawning a
|
||||
# duplicate session. Value: (loop, ready_future, owner_task, close_event).
|
||||
self._inflight: dict[
|
||||
tuple[str, str],
|
||||
tuple[
|
||||
asyncio.AbstractEventLoop,
|
||||
asyncio.Future[ClientSession],
|
||||
asyncio.Task[Any],
|
||||
asyncio.Event,
|
||||
],
|
||||
] = {}
|
||||
# threading.Lock is not bound to any event loop, so it is safe to
|
||||
# acquire from both async paths and sync/worker-thread paths.
|
||||
self._lock = threading.Lock()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Session owner task
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _run_session(
|
||||
self,
|
||||
connection: dict[str, Any],
|
||||
ready: asyncio.Future[ClientSession],
|
||||
close_evt: asyncio.Event,
|
||||
) -> None:
|
||||
"""Own a single MCP session for its entire lifetime.
|
||||
|
||||
Enters the session context manager, initializes it, publishes the live
|
||||
session via ``ready``, then blocks until ``close_evt`` is set. The
|
||||
context manager is *always* exited from this task, satisfying anyio's
|
||||
cancel-scope same-task requirement.
|
||||
"""
|
||||
from langchain_mcp_adapters.sessions import create_session
|
||||
|
||||
cm = create_session(connection)
|
||||
try:
|
||||
session = await cm.__aenter__()
|
||||
except BaseException as e:
|
||||
# Never entered the cancel scope, so there is nothing to exit.
|
||||
if not ready.done():
|
||||
ready.set_exception(e)
|
||||
return
|
||||
|
||||
# The context manager is now entered. From here on __aexit__ MUST run in
|
||||
# this task — on init failure, on cancellation, or on the close signal —
|
||||
# to satisfy anyio's same-task cancel-scope requirement and to avoid
|
||||
# leaking the session/subprocess.
|
||||
try:
|
||||
await session.initialize()
|
||||
if not ready.done():
|
||||
ready.set_result(session)
|
||||
await close_evt.wait()
|
||||
except BaseException as e:
|
||||
if not ready.done():
|
||||
ready.set_exception(e)
|
||||
finally:
|
||||
try:
|
||||
await cm.__aexit__(None, None, None)
|
||||
except Exception:
|
||||
logger.warning("Error closing MCP session", exc_info=True)
|
||||
|
||||
async def get_session(
|
||||
self,
|
||||
server_name: str,
|
||||
@@ -47,9 +131,9 @@ class MCPSessionPool:
|
||||
) -> ClientSession:
|
||||
"""Get or create a persistent MCP session.
|
||||
|
||||
If an existing session was created in a different event loop (e.g.
|
||||
the sync-wrapper path), it is closed and replaced with a fresh one
|
||||
in the current loop.
|
||||
If an existing session was created in a different (or closed) event
|
||||
loop, it is evicted and replaced with a fresh one owned by a task on
|
||||
the current loop.
|
||||
|
||||
Args:
|
||||
server_name: MCP server name.
|
||||
@@ -63,44 +147,118 @@ class MCPSessionPool:
|
||||
current_loop = asyncio.get_running_loop()
|
||||
|
||||
# Phase 1: inspect/mutate the registry under the thread lock (no awaits).
|
||||
cms_to_close: list[tuple[tuple[str, str], Any]] = []
|
||||
# Decide one of three outcomes atomically: return an existing session,
|
||||
# join an in-flight creation, or become the creator for this key.
|
||||
# Each item: (loop, owner_task, close_event, cancel). ``cancel`` is True
|
||||
# for in-flight creations, whose owner may be blocked inside
|
||||
# ``initialize()`` where close_evt cannot wake it — it must be cancelled.
|
||||
evicted: list[tuple[asyncio.AbstractEventLoop, asyncio.Task[Any], asyncio.Event, bool]] = []
|
||||
join: asyncio.Future[ClientSession] | None = None
|
||||
ready: asyncio.Future[ClientSession] | None = None
|
||||
close_evt: asyncio.Event | None = None
|
||||
task: asyncio.Task[Any] | None = None
|
||||
with self._lock:
|
||||
if key in self._entries:
|
||||
session, loop = self._entries[key]
|
||||
if loop is current_loop:
|
||||
session, loop, ent_task, ent_close = self._entries[key]
|
||||
if loop is current_loop and not loop.is_closed():
|
||||
self._entries.move_to_end(key)
|
||||
return session
|
||||
# Session belongs to a different event loop – evict it.
|
||||
cm = self._context_managers.pop(key, None)
|
||||
# Session belongs to a different/closed event loop – evict it.
|
||||
self._entries.pop(key)
|
||||
if cm is not None:
|
||||
cms_to_close.append((key, cm))
|
||||
evicted.append((loop, ent_task, ent_close, False))
|
||||
|
||||
inflight = self._inflight.get(key)
|
||||
if inflight is not None and inflight[0] is current_loop and not inflight[0].is_closed():
|
||||
# Another caller on this loop is already creating the session;
|
||||
# wait for the same result instead of building a duplicate.
|
||||
join = inflight[1]
|
||||
else:
|
||||
if inflight is not None:
|
||||
# Stale in-flight creation owned by a different/closed loop.
|
||||
# Drop the record and tear its owner down; because that owner
|
||||
# may be blocked inside initialize() (where close_evt cannot
|
||||
# wake it), it must be cancelled. We then create a fresh
|
||||
# session here.
|
||||
self._inflight.pop(key)
|
||||
evicted.append((inflight[0], inflight[2], inflight[3], True))
|
||||
# Become the creator: publish an in-flight record before any
|
||||
# await so concurrent callers join us instead of racing.
|
||||
ready = current_loop.create_future()
|
||||
close_evt = asyncio.Event()
|
||||
task = current_loop.create_task(self._run_session(connection, ready, close_evt))
|
||||
self._inflight[key] = (current_loop, ready, task, close_evt)
|
||||
|
||||
# Evict LRU entries when at capacity.
|
||||
while len(self._entries) >= self.MAX_SESSIONS:
|
||||
oldest_key = next(iter(self._entries))
|
||||
cm = self._context_managers.pop(oldest_key, None)
|
||||
oldest_key, (_, loop, ent_task, ent_close) = next(iter(self._entries.items()))
|
||||
self._entries.pop(oldest_key)
|
||||
if cm is not None:
|
||||
cms_to_close.append((oldest_key, cm))
|
||||
evicted.append((loop, ent_task, ent_close, False))
|
||||
|
||||
# Phase 2: async cleanup outside the lock so we never await while holding it.
|
||||
for close_key, cm in cms_to_close:
|
||||
# Phase 2: shut down evicted sessions/creations. Same-loop owners are
|
||||
# awaited so they finish deterministically; foreign-loop owners are
|
||||
# routed to their own loop. In every case the owner task — never this
|
||||
# one — runs __aexit__. In-flight owners are cancelled (cancel=True) so a
|
||||
# blocking initialize() cannot leave them hung.
|
||||
for loop, ent_task, ent_close, cancel in evicted:
|
||||
if loop is current_loop and not loop.is_closed():
|
||||
await self._shutdown(ent_close, ent_task, cancel)
|
||||
elif cancel:
|
||||
await self._shutdown_entry(loop, ent_task, ent_close, cancel=True)
|
||||
else:
|
||||
self._signal_close(loop, ent_close)
|
||||
|
||||
# Phase 2b: a concurrent creation for this key is already in progress on
|
||||
# this loop — share its result rather than create a duplicate session.
|
||||
if join is not None:
|
||||
return await asyncio.shield(join)
|
||||
|
||||
assert ready is not None and close_evt is not None and task is not None
|
||||
|
||||
# Phase 3: wait for our owner task to publish the initialized session.
|
||||
try:
|
||||
session = await asyncio.shield(ready)
|
||||
except BaseException:
|
||||
# Two distinct cases reach here:
|
||||
#
|
||||
# 1. The owner task failed (e.g. connect/initialize error) and
|
||||
# reported it via ready.set_exception(). It is *already* in its
|
||||
# finally block running cm.__aexit__ in its own task, so we must
|
||||
# NOT cancel it — doing so would interrupt that cleanup. We only
|
||||
# wait for it to finish unwinding.
|
||||
# 2. This call itself was cancelled (CancelledError). Because of the
|
||||
# shield, `ready` is still pending and the owner task is alive and
|
||||
# blocked. We signal close and cancel it so it exits the cancel
|
||||
# scope in its own task, then wait for it to finish.
|
||||
#
|
||||
# The session is never registered yet, so nobody else can close it;
|
||||
# waiting here guarantees we never leak a session or owner task.
|
||||
owner_already_failed = ready.done() and not ready.cancelled() and ready.exception() is not None
|
||||
if not owner_already_failed:
|
||||
close_evt.set()
|
||||
task.cancel()
|
||||
try:
|
||||
await cm.__aexit__(None, None, None)
|
||||
except Exception:
|
||||
logger.warning("Error closing MCP session %s", close_key, exc_info=True)
|
||||
await asyncio.shield(task)
|
||||
except BaseException:
|
||||
logger.debug("Owner task ended during get_session unwind", exc_info=True)
|
||||
with self._lock:
|
||||
if self._inflight.get(key) == (current_loop, ready, task, close_evt):
|
||||
self._inflight.pop(key)
|
||||
raise
|
||||
|
||||
from langchain_mcp_adapters.sessions import create_session
|
||||
|
||||
cm = create_session(connection)
|
||||
session = await cm.__aenter__()
|
||||
await session.initialize()
|
||||
|
||||
# Phase 3: register the new session under the lock.
|
||||
# Phase 4: promote the in-flight creation to a registered entry — but
|
||||
# only if our in-flight record is still the live one. A concurrent
|
||||
# close_* / close_all may have removed it while we were initializing; in
|
||||
# that case we must NOT resurrect the session into _entries. Instead we
|
||||
# own the teardown: signal our owner task and wait for it to run
|
||||
# __aexit__ in its own task, then surface the cancellation.
|
||||
with self._lock:
|
||||
self._entries[key] = (session, current_loop)
|
||||
self._context_managers[key] = cm
|
||||
still_ours = self._inflight.get(key) == (current_loop, ready, task, close_evt)
|
||||
if still_ours:
|
||||
self._inflight.pop(key)
|
||||
self._entries[key] = (session, current_loop, task, close_evt)
|
||||
if not still_ours:
|
||||
await self._shutdown(close_evt, task)
|
||||
raise asyncio.CancelledError("MCP session pool was closed while the session was being created")
|
||||
logger.info("Created persistent MCP session for %s/%s", server_name, scope_key)
|
||||
return session
|
||||
|
||||
@@ -108,70 +266,169 @@ class MCPSessionPool:
|
||||
# Cleanup helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _close_cm(self, key: tuple[str, str], cm: Any) -> None:
|
||||
"""Close a single context manager (must be called WITHOUT the lock)."""
|
||||
@staticmethod
|
||||
def _signal_close(loop: asyncio.AbstractEventLoop, close_evt: asyncio.Event) -> None:
|
||||
"""Ask an owner task to shut down without waiting.
|
||||
|
||||
``asyncio.Event.set`` is not thread-safe, so it is scheduled on the
|
||||
owning loop. A closed loop means the owner task is already gone.
|
||||
"""
|
||||
if loop.is_closed():
|
||||
return
|
||||
try:
|
||||
await cm.__aexit__(None, None, None)
|
||||
except Exception:
|
||||
logger.warning("Error closing MCP session %s", key, exc_info=True)
|
||||
loop.call_soon_threadsafe(close_evt.set)
|
||||
except RuntimeError:
|
||||
# Loop was closed between the is_closed() check and now.
|
||||
pass
|
||||
|
||||
async def _shutdown(
|
||||
self,
|
||||
close_evt: asyncio.Event,
|
||||
task: asyncio.Task[Any],
|
||||
cancel: bool = False,
|
||||
) -> None:
|
||||
"""Signal an owner task and wait for it to finish (runs on its loop).
|
||||
|
||||
``cancel=True`` is used for in-flight creations: the owner task may be
|
||||
blocked inside ``initialize()`` where ``close_evt`` cannot wake it, so it
|
||||
must be cancelled. Its ``finally`` block still runs ``__aexit__`` in its
|
||||
own task, satisfying anyio's same-task cancel-scope requirement.
|
||||
"""
|
||||
close_evt.set()
|
||||
if cancel:
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except (Exception, asyncio.CancelledError):
|
||||
logger.debug("Owner task ended during shutdown", exc_info=True)
|
||||
|
||||
async def _shutdown_entry(
|
||||
self,
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
task: asyncio.Task[Any],
|
||||
close_evt: asyncio.Event,
|
||||
cancel: bool = False,
|
||||
) -> None:
|
||||
"""Shut down one entry, routing the close to its owning loop."""
|
||||
if loop.is_closed():
|
||||
return
|
||||
current_loop = asyncio.get_running_loop()
|
||||
if loop is current_loop:
|
||||
await self._shutdown(close_evt, task, cancel)
|
||||
elif loop.is_running():
|
||||
future = asyncio.run_coroutine_threadsafe(self._shutdown(close_evt, task, cancel), loop)
|
||||
try:
|
||||
await asyncio.wrap_future(future)
|
||||
except Exception:
|
||||
logger.warning("Error closing MCP session on owning loop", exc_info=True)
|
||||
else:
|
||||
# Owning loop exists but is neither the current loop nor running.
|
||||
# We are inside an async context here, so run_until_complete() would
|
||||
# raise "Cannot run the event loop while another loop is running";
|
||||
# and the loop may belong to another thread, where driving it from
|
||||
# here is unsafe. This branch is not expected in practice — a
|
||||
# session's owning loop is either the long-lived gateway loop (which
|
||||
# is running) or a short-lived asyncio.run loop (which is closed and
|
||||
# caught above). Fall back to a best-effort thread-safe signal so the
|
||||
# owner task tears down if/when its loop runs again.
|
||||
logger.warning("Owning loop for MCP session is idle; signalling close best-effort. Session may leak until the loop runs again.")
|
||||
self._signal_close(loop, close_evt)
|
||||
if cancel:
|
||||
try:
|
||||
loop.call_soon_threadsafe(task.cancel)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
async def close_scope(self, scope_key: str) -> None:
|
||||
"""Close all sessions for a given scope (e.g. thread_id)."""
|
||||
with self._lock:
|
||||
keys = [k for k in self._entries if k[1] == scope_key]
|
||||
cms = [(k, self._context_managers.pop(k, None)) for k in keys]
|
||||
for k in keys:
|
||||
self._entries.pop(k, None)
|
||||
for key, cm in cms:
|
||||
if cm is not None:
|
||||
await self._close_cm(key, cm)
|
||||
entries = [(self._entries.pop(k)) for k in keys]
|
||||
inflight_keys = [k for k in self._inflight if k[1] == scope_key]
|
||||
inflight = [self._inflight.pop(k) for k in inflight_keys]
|
||||
for _session, loop, task, close_evt in entries:
|
||||
await self._shutdown_entry(loop, task, close_evt)
|
||||
for loop, _ready, task, close_evt in inflight:
|
||||
await self._shutdown_entry(loop, task, close_evt, cancel=True)
|
||||
|
||||
async def close_server(self, server_name: str) -> None:
|
||||
"""Close all sessions for a given server."""
|
||||
with self._lock:
|
||||
keys = [k for k in self._entries if k[0] == server_name]
|
||||
cms = [(k, self._context_managers.pop(k, None)) for k in keys]
|
||||
for k in keys:
|
||||
self._entries.pop(k, None)
|
||||
for key, cm in cms:
|
||||
if cm is not None:
|
||||
await self._close_cm(key, cm)
|
||||
entries = [(self._entries.pop(k)) for k in keys]
|
||||
inflight_keys = [k for k in self._inflight if k[0] == server_name]
|
||||
inflight = [self._inflight.pop(k) for k in inflight_keys]
|
||||
for _session, loop, task, close_evt in entries:
|
||||
await self._shutdown_entry(loop, task, close_evt)
|
||||
for loop, _ready, task, close_evt in inflight:
|
||||
await self._shutdown_entry(loop, task, close_evt, cancel=True)
|
||||
|
||||
async def close_all(self) -> None:
|
||||
"""Close every managed session."""
|
||||
with self._lock:
|
||||
cms = list(self._context_managers.items())
|
||||
self._context_managers.clear()
|
||||
entries = list(self._entries.values())
|
||||
self._entries.clear()
|
||||
for key, cm in cms:
|
||||
await self._close_cm(key, cm)
|
||||
inflight = list(self._inflight.values())
|
||||
self._inflight.clear()
|
||||
for _session, loop, task, close_evt in entries:
|
||||
await self._shutdown_entry(loop, task, close_evt)
|
||||
for loop, _ready, task, close_evt in inflight:
|
||||
await self._shutdown_entry(loop, task, close_evt, cancel=True)
|
||||
|
||||
def close_all_sync(self) -> None:
|
||||
"""Close all sessions using their owning event loops (synchronous).
|
||||
"""Close all sessions on their owning event loops (synchronous).
|
||||
|
||||
Each session is closed on the loop it was created in, avoiding
|
||||
cross-loop resource leaks. Safe to call from any thread without an
|
||||
active event loop.
|
||||
Each session is closed by its owner task on the loop it was created in,
|
||||
avoiding cross-loop and cross-task errors. Safe to call from any thread
|
||||
without an active event loop.
|
||||
|
||||
Closing semantics differ by where the owning loop runs:
|
||||
|
||||
* Owning loop is idle, or running on another thread — this call blocks
|
||||
until teardown completes (or ``SESSION_CLOSE_TIMEOUT`` elapses).
|
||||
* Owning loop is the one currently running on *this* thread — we cannot
|
||||
block on it without deadlocking, so teardown is only *signalled* here
|
||||
and completes asynchronously once control returns to that loop. The
|
||||
caller must therefore keep that loop running afterwards; if it stops
|
||||
the loop immediately, the owner task's ``__aexit__`` may not run. When
|
||||
a deterministic close is required from inside a running loop, ``await
|
||||
close_all()`` instead.
|
||||
"""
|
||||
with self._lock:
|
||||
entries = list(self._entries.items())
|
||||
cms = dict(self._context_managers)
|
||||
entries = list(self._entries.values())
|
||||
self._entries.clear()
|
||||
self._context_managers.clear()
|
||||
inflight = list(self._inflight.values())
|
||||
self._inflight.clear()
|
||||
|
||||
for key, (_, loop) in entries:
|
||||
cm = cms.get(key)
|
||||
if cm is None or loop.is_closed():
|
||||
# Entries are initialized (gentle close_evt path). In-flight creations
|
||||
# may be blocked mid-init, so they are cancelled to unblock teardown.
|
||||
owners = [(loop, task, close_evt, False) for _s, loop, task, close_evt in entries]
|
||||
owners += [(loop, task, close_evt, True) for loop, _r, task, close_evt in inflight]
|
||||
try:
|
||||
current_running_loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
current_running_loop = None
|
||||
for loop, task, close_evt, cancel in owners:
|
||||
if loop.is_closed():
|
||||
continue
|
||||
try:
|
||||
if loop.is_running():
|
||||
# Schedule on the owning loop from this (different) thread.
|
||||
future = asyncio.run_coroutine_threadsafe(cm.__aexit__(None, None, None), loop)
|
||||
if loop is current_running_loop:
|
||||
# We are executing inside this loop's thread, so synchronously
|
||||
# waiting on run_coroutine_threadsafe(...).result() would
|
||||
# deadlock until timeout. Signal the owner task directly and
|
||||
# let it finish once this synchronous call returns control to
|
||||
# the running loop.
|
||||
close_evt.set()
|
||||
if cancel:
|
||||
task.cancel()
|
||||
elif loop.is_running():
|
||||
# Schedule the shutdown on the owning loop from this thread.
|
||||
future = asyncio.run_coroutine_threadsafe(self._shutdown(close_evt, task, cancel), loop)
|
||||
future.result(timeout=self.SESSION_CLOSE_TIMEOUT)
|
||||
else:
|
||||
loop.run_until_complete(cm.__aexit__(None, None, None))
|
||||
loop.run_until_complete(self._shutdown(close_evt, task, cancel))
|
||||
except Exception:
|
||||
logger.debug("Error closing MCP session %s during sync close", key, exc_info=True)
|
||||
logger.debug("Error closing MCP session during sync close", exc_info=True)
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user