mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-23 16:35:59 +00:00
fix(gateway): sync thread rename and delete through ThreadMetaStore
The POST /threads/{id}/state endpoint previously synced title changes
only to the LangGraph Store via _store_upsert. In sqlite mode the search
endpoint reads from the ThreadMetaRepository SQL table, so renames never
appeared in /threads/search until the next agent run completed (worker.py
syncs title from checkpoint to thread_meta in its finally block).
Likewise the DELETE /threads/{id} endpoint cleaned up the filesystem,
Store, and checkpointer but left the threads_meta row orphaned in sqlite,
so deleted threads kept appearing in /threads/search.
Fix both endpoints by routing through the ThreadMetaStore abstraction
which already has the correct sqlite/memory implementations wired up by
deps.py. The rename path now calls update_display_name() and the delete
path calls delete() — both work uniformly across backends.
Verified end-to-end with curl in gateway mode against sqlite backend.
Existing test suite (1690 passed) and focused router/repo tests pass.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -221,12 +221,15 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
|
|||||||
"""Delete local persisted filesystem data for a thread.
|
"""Delete local persisted filesystem data for a thread.
|
||||||
|
|
||||||
Cleans DeerFlow-managed thread directories, removes checkpoint data,
|
Cleans DeerFlow-managed thread directories, removes checkpoint data,
|
||||||
and removes the thread record from the Store.
|
removes the thread record from the Store, and removes the thread_meta
|
||||||
|
row from the configured ThreadMetaStore (sqlite or memory).
|
||||||
"""
|
"""
|
||||||
|
from app.gateway.deps import get_thread_meta_repo
|
||||||
|
|
||||||
# Clean local filesystem
|
# Clean local filesystem
|
||||||
response = _delete_thread_data(thread_id)
|
response = _delete_thread_data(thread_id)
|
||||||
|
|
||||||
# Remove from Store (best-effort)
|
# Remove from Store (best-effort) — legacy in-memory thread record
|
||||||
store = get_store(request)
|
store = get_store(request)
|
||||||
if store is not None:
|
if store is not None:
|
||||||
try:
|
try:
|
||||||
@@ -243,6 +246,14 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
|
|||||||
except Exception:
|
except Exception:
|
||||||
logger.debug("Could not delete checkpoints for thread %s (not critical)", sanitize_log_param(thread_id))
|
logger.debug("Could not delete checkpoints for thread %s (not critical)", sanitize_log_param(thread_id))
|
||||||
|
|
||||||
|
# Remove thread_meta row (best-effort) — required for sqlite backend
|
||||||
|
# so the deleted thread no longer appears in /threads/search.
|
||||||
|
try:
|
||||||
|
thread_meta_repo = get_thread_meta_repo(request)
|
||||||
|
await thread_meta_repo.delete(thread_id)
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Could not delete thread_meta for %s (not critical)", sanitize_log_param(thread_id))
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
@@ -499,11 +510,14 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
|
|||||||
"""Update thread state (e.g. for human-in-the-loop resume or title rename).
|
"""Update thread state (e.g. for human-in-the-loop resume or title rename).
|
||||||
|
|
||||||
Writes a new checkpoint that merges *body.values* into the latest
|
Writes a new checkpoint that merges *body.values* into the latest
|
||||||
channel values, then syncs any updated ``title`` field back to the Store
|
channel values, then syncs any updated ``title`` field through the
|
||||||
so that ``/threads/search`` reflects the change immediately.
|
ThreadMetaStore abstraction so that ``/threads/search`` reflects the
|
||||||
|
change immediately in both sqlite and memory backends.
|
||||||
"""
|
"""
|
||||||
|
from app.gateway.deps import get_thread_meta_repo
|
||||||
|
|
||||||
checkpointer = get_checkpointer(request)
|
checkpointer = get_checkpointer(request)
|
||||||
store = get_store(request)
|
thread_meta_repo = get_thread_meta_repo(request)
|
||||||
|
|
||||||
# checkpoint_ns must be present in the config for aput — default to ""
|
# checkpoint_ns must be present in the config for aput — default to ""
|
||||||
# (the root graph namespace). checkpoint_id is optional; omitting it
|
# (the root graph namespace). checkpoint_id is optional; omitting it
|
||||||
@@ -561,12 +575,15 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
|
|||||||
if isinstance(new_config, dict):
|
if isinstance(new_config, dict):
|
||||||
new_checkpoint_id = new_config.get("configurable", {}).get("checkpoint_id")
|
new_checkpoint_id = new_config.get("configurable", {}).get("checkpoint_id")
|
||||||
|
|
||||||
# Sync title changes to the Store so /threads/search reflects them immediately.
|
# Sync title changes through the ThreadMetaStore abstraction so /threads/search
|
||||||
if store is not None and body.values and "title" in body.values:
|
# reflects them immediately in both sqlite and memory backends.
|
||||||
try:
|
if body.values and "title" in body.values:
|
||||||
await _store_upsert(store, thread_id, values={"title": body.values["title"]})
|
new_title = body.values["title"]
|
||||||
except Exception:
|
if new_title: # Skip empty strings and None
|
||||||
logger.debug("Failed to sync title to store for thread %s (non-fatal)", sanitize_log_param(thread_id))
|
try:
|
||||||
|
await thread_meta_repo.update_display_name(thread_id, new_title)
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Failed to sync title to thread_meta for %s (non-fatal)", sanitize_log_param(thread_id))
|
||||||
|
|
||||||
return ThreadStateResponse(
|
return ThreadStateResponse(
|
||||||
values=serialize_channel_values(channel_values),
|
values=serialize_channel_values(channel_values),
|
||||||
|
|||||||
Reference in New Issue
Block a user