fix(threads): fall back to Store search when ThreadMetaRepository is unavailable

When database.backend=memory (default) or no SQL session factory is
configured, search_threads now queries the LangGraph Store instead of
returning 503. Returns empty list if neither Store nor repo is available.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rayhpeng
2026-04-06 17:30:00 +08:00
parent a5831d3abf
commit 51c68db376
+34 -11
View File
@@ -332,22 +332,15 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
@router.post("/search", response_model=list[ThreadResponse]) @router.post("/search", response_model=list[ThreadResponse])
async def search_threads(body: ThreadSearchRequest, request: Request) -> list[ThreadResponse]: async def search_threads(body: ThreadSearchRequest, request: Request) -> list[ThreadResponse]:
"""Search and list threads from the threads_meta table. """Search and list threads.
NOTE: Migration from pre-persistence-layer deployments: Uses ThreadMetaRepository (SQL) when available, otherwise falls back
Threads created via LangGraph Server before this change are NOT to the LangGraph Store for memory/lightweight deployments.
automatically indexed in threads_meta. They will not appear in
search results until a new run is created on them (which triggers
thread_meta upsert in services.py). For bulk migration, run:
python -m deerflow.persistence.migrate_threads_from_checkpointer
(migration script TBD in a follow-up PR)
""" """
from app.gateway.deps import get_thread_meta_repo from app.gateway.deps import get_thread_meta_repo
repo = get_thread_meta_repo(request) repo = get_thread_meta_repo(request)
if repo is None: if repo is not None:
raise HTTPException(status_code=503, detail="Thread metadata store not available")
rows = await repo.search( rows = await repo.search(
metadata=body.metadata or None, metadata=body.metadata or None,
status=body.status, status=body.status,
@@ -367,6 +360,36 @@ async def search_threads(body: ThreadSearchRequest, request: Request) -> list[Th
for r in rows for r in rows
] ]
# Fallback: search the LangGraph Store (memory / no-SQL deployments)
store = get_store(request)
if store is None:
return []
filter_dict: dict[str, Any] = {}
if body.metadata:
filter_dict.update(body.metadata)
if body.status:
filter_dict["status"] = body.status
items = await store.asearch(
THREADS_NS,
filter=filter_dict or None,
limit=body.limit,
offset=body.offset,
)
return [
ThreadResponse(
thread_id=item.key,
status=item.value.get("status", "idle"),
created_at=str(item.value.get("created_at", "")),
updated_at=str(item.value.get("updated_at", "")),
metadata=item.value.get("metadata", {}),
values=item.value.get("values", {}),
interrupts={},
)
for item in items
]
@router.patch("/{thread_id}", response_model=ThreadResponse) @router.patch("/{thread_id}", response_model=ThreadResponse)
async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Request) -> ThreadResponse: async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Request) -> ThreadResponse: