fix(persistence): address 22 review comments from CodeQL, Copilot, and Code Quality

Bug fixes:
- Sanitize log params to prevent log injection (CodeQL)
- Reset threads_meta.status to idle/error when run completes
- Attach messages only to latest checkpoint in /history response
- Write threads_meta on POST /threads so new threads appear in search

Lint fixes:
- Remove unused imports (journal.py, migrations/env.py, test_converters.py)
- Convert lambda to named function (engine.py, Ruff E731)
- Remove unused logger definitions in repos (Ruff F841)
- Add logging to JSONL decode errors and empty except blocks
- Separate assert side-effects in tests (CodeQL)
- Remove unused local variables in tests (Ruff F841)
- Fix max_trace_content truncation to use byte length, not char length

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rayhpeng
2026-04-05 22:49:26 +08:00
parent 32f69674a5
commit b94383c93a
15 changed files with 94 additions and 55 deletions
+39 -23
View File
@@ -35,6 +35,11 @@ logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/threads", tags=["threads"])
def _sanitize_log_param(value: str) -> str:
"""Strip control characters to prevent log injection."""
return value.replace("\n", "").replace("\r", "").replace("\x00", "")
# ---------------------------------------------------------------------------
# Response / request models
# ---------------------------------------------------------------------------
@@ -136,13 +141,13 @@ def _delete_thread_data(thread_id: str, paths: Paths | None = None) -> ThreadDel
raise HTTPException(status_code=422, detail=str(exc)) from exc
except FileNotFoundError:
# Not critical — thread data may not exist on disk
logger.debug("No local thread data to delete for %s", thread_id)
logger.debug("No local thread data to delete for %s", _sanitize_log_param(thread_id))
return ThreadDeleteResponse(success=True, message=f"No local data for {thread_id}")
except Exception as exc:
logger.exception("Failed to delete thread data for %s", thread_id)
logger.exception("Failed to delete thread data for %s", _sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to delete local thread data.") from exc
logger.info("Deleted local thread data for %s", thread_id)
logger.info("Deleted local thread data for %s", _sanitize_log_param(thread_id))
return ThreadDeleteResponse(success=True, message=f"Deleted local thread data for {thread_id}")
@@ -231,7 +236,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
try:
await store.adelete(THREADS_NS, thread_id)
except Exception:
logger.debug("Could not delete store record for thread %s (not critical)", thread_id)
logger.debug("Could not delete store record for thread %s (not critical)", _sanitize_log_param(thread_id))
# Remove checkpoints (best-effort)
checkpointer = getattr(request.app.state, "checkpointer", None)
@@ -240,7 +245,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
if hasattr(checkpointer, "adelete_thread"):
await checkpointer.adelete_thread(thread_id)
except Exception:
logger.debug("Could not delete checkpoints for thread %s (not critical)", thread_id)
logger.debug("Could not delete checkpoints for thread %s (not critical)", _sanitize_log_param(thread_id))
return response
@@ -284,7 +289,7 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
},
)
except Exception:
logger.exception("Failed to write thread %s to store", thread_id)
logger.exception("Failed to write thread %s to store", _sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to create thread")
# Write an empty checkpoint so state endpoints work immediately
@@ -302,10 +307,24 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
}
await checkpointer.aput(config, empty_checkpoint(), ckpt_metadata, {})
except Exception:
logger.exception("Failed to create checkpoint for thread %s", thread_id)
logger.exception("Failed to create checkpoint for thread %s", _sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to create thread")
logger.info("Thread created: %s", thread_id)
# Write thread_meta so the thread appears in /threads/search immediately
from app.gateway.deps import get_thread_meta_repo
thread_meta_repo = get_thread_meta_repo(request)
if thread_meta_repo is not None:
try:
await thread_meta_repo.create(
thread_id,
assistant_id=getattr(body, "assistant_id", None),
metadata=body.metadata,
)
except Exception:
logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", _sanitize_log_param(thread_id))
logger.info("Thread created: %s", _sanitize_log_param(thread_id))
return ThreadResponse(
thread_id=thread_id,
status="idle",
@@ -372,7 +391,7 @@ async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Reques
try:
await _store_put(store, updated)
except Exception:
logger.exception("Failed to patch thread %s", thread_id)
logger.exception("Failed to patch thread %s", _sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to update thread")
return ThreadResponse(
@@ -404,7 +423,7 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
try:
checkpoint_tuple = await checkpointer.aget_tuple(config)
except Exception:
logger.exception("Failed to get checkpoint for thread %s", thread_id)
logger.exception("Failed to get checkpoint for thread %s", _sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to get thread")
if record is None and checkpoint_tuple is None:
@@ -452,7 +471,7 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo
try:
checkpoint_tuple = await checkpointer.aget_tuple(config)
except Exception:
logger.exception("Failed to get state for thread %s", thread_id)
logger.exception("Failed to get state for thread %s", _sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to get thread state")
if checkpoint_tuple is None:
@@ -514,7 +533,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
try:
checkpoint_tuple = await checkpointer.aget_tuple(read_config)
except Exception:
logger.exception("Failed to get state for thread %s", thread_id)
logger.exception("Failed to get state for thread %s", _sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to get thread state")
if checkpoint_tuple is None:
@@ -548,7 +567,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
try:
new_config = await checkpointer.aput(write_config, checkpoint, metadata, {})
except Exception:
logger.exception("Failed to update state for thread %s", thread_id)
logger.exception("Failed to update state for thread %s", _sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to update thread state")
new_checkpoint_id: str | None = None
@@ -560,7 +579,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
try:
await _store_upsert(store, thread_id, values={"title": body.values["title"]})
except Exception:
logger.debug("Failed to sync title to store for thread %s (non-fatal)", thread_id)
logger.debug("Failed to sync title to store for thread %s (non-fatal)", _sanitize_log_param(thread_id))
return ThreadStateResponse(
values=serialize_channel_values(channel_values),
@@ -594,16 +613,12 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
try:
all_messages = await event_store.list_messages(thread_id, limit=10_000)
except Exception:
logger.warning("Failed to load messages from event store for thread %s", thread_id, exc_info=True)
logger.warning("Failed to load messages from event store for thread %s", _sanitize_log_param(thread_id), exc_info=True)
all_messages = []
# Group messages by run_id for per-checkpoint assembly
messages_by_run: dict[str, list[dict]] = {}
for msg in all_messages:
run_id = msg.get("run_id", "")
messages_by_run.setdefault(run_id, []).append(msg.get("content", {}))
entries: list[HistoryEntry] = []
is_latest_checkpoint = True
try:
async for checkpoint_tuple in checkpointer.alist(config, limit=body.limit):
ckpt_config = getattr(checkpoint_tuple, "config", {})
@@ -625,9 +640,10 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
if thread_data := channel_values.get("thread_data"):
values["thread_data"] = thread_data
# Attach all messages from event store (not just this checkpoint's run)
if all_messages:
# Attach all messages only to the latest (first) checkpoint entry
if is_latest_checkpoint and all_messages:
values["messages"] = [m.get("content", {}) for m in all_messages]
is_latest_checkpoint = False
# Derive next tasks
tasks_raw = getattr(checkpoint_tuple, "tasks", []) or []
@@ -650,7 +666,7 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
)
)
except Exception:
logger.exception("Failed to get history for thread %s", thread_id)
logger.exception("Failed to get history for thread %s", _sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to get thread history")
return entries
+3 -2
View File
@@ -18,6 +18,7 @@ from fastapi import HTTPException, Request
from langchain_core.messages import HumanMessage
from app.gateway.deps import get_checkpointer, get_run_event_store, get_run_manager, get_run_store, get_store, get_stream_bridge, get_thread_meta_repo
from app.gateway.routers.threads import _sanitize_log_param
from deerflow.runtime import (
END_SENTINEL,
HEARTBEAT_SENTINEL,
@@ -184,7 +185,7 @@ async def _upsert_thread_in_store(store, thread_id: str, metadata: dict | None)
try:
await _store_upsert(store, thread_id, metadata=metadata)
except Exception:
logger.warning("Failed to upsert thread %s in store (non-fatal)", thread_id)
logger.warning("Failed to upsert thread %s in store (non-fatal)", _sanitize_log_param(thread_id))
async def _sync_thread_title_after_run(
@@ -312,7 +313,7 @@ async def start_run(
else:
await thread_meta_repo.update_status(thread_id, "running")
except Exception:
logger.warning("Failed to upsert thread_meta for %s (non-fatal)", thread_id)
logger.warning("Failed to upsert thread_meta for %s (non-fatal)", _sanitize_log_param(thread_id))
agent_factory = resolve_agent_factory(body.assistant_id)
graph_input = normalize_input(body.input)