mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-23 16:35:59 +00:00
refactor(gateway): move sanitize_log_param to app/gateway/utils.py
Extract the log-injection sanitizer from routers/threads.py into a shared utils module and rename to sanitize_log_param (public API). Eliminates the reverse service → router import in services.py. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -21,6 +21,7 @@ from fastapi import APIRouter, HTTPException, Request
|
|||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
from app.gateway.deps import get_checkpointer, get_store
|
from app.gateway.deps import get_checkpointer, get_store
|
||||||
|
from app.gateway.utils import sanitize_log_param
|
||||||
from deerflow.config.paths import Paths, get_paths
|
from deerflow.config.paths import Paths, get_paths
|
||||||
from deerflow.runtime import serialize_channel_values
|
from deerflow.runtime import serialize_channel_values
|
||||||
|
|
||||||
@@ -35,11 +36,6 @@ logger = logging.getLogger(__name__)
|
|||||||
router = APIRouter(prefix="/api/threads", tags=["threads"])
|
router = APIRouter(prefix="/api/threads", tags=["threads"])
|
||||||
|
|
||||||
|
|
||||||
def _sanitize_log_param(value: str) -> str:
|
|
||||||
"""Strip control characters to prevent log injection."""
|
|
||||||
return value.replace("\n", "").replace("\r", "").replace("\x00", "")
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Response / request models
|
# Response / request models
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -141,13 +137,13 @@ def _delete_thread_data(thread_id: str, paths: Paths | None = None) -> ThreadDel
|
|||||||
raise HTTPException(status_code=422, detail=str(exc)) from exc
|
raise HTTPException(status_code=422, detail=str(exc)) from exc
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
# Not critical — thread data may not exist on disk
|
# Not critical — thread data may not exist on disk
|
||||||
logger.debug("No local thread data to delete for %s", _sanitize_log_param(thread_id))
|
logger.debug("No local thread data to delete for %s", sanitize_log_param(thread_id))
|
||||||
return ThreadDeleteResponse(success=True, message=f"No local data for {thread_id}")
|
return ThreadDeleteResponse(success=True, message=f"No local data for {thread_id}")
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.exception("Failed to delete thread data for %s", _sanitize_log_param(thread_id))
|
logger.exception("Failed to delete thread data for %s", sanitize_log_param(thread_id))
|
||||||
raise HTTPException(status_code=500, detail="Failed to delete local thread data.") from exc
|
raise HTTPException(status_code=500, detail="Failed to delete local thread data.") from exc
|
||||||
|
|
||||||
logger.info("Deleted local thread data for %s", _sanitize_log_param(thread_id))
|
logger.info("Deleted local thread data for %s", sanitize_log_param(thread_id))
|
||||||
return ThreadDeleteResponse(success=True, message=f"Deleted local thread data for {thread_id}")
|
return ThreadDeleteResponse(success=True, message=f"Deleted local thread data for {thread_id}")
|
||||||
|
|
||||||
|
|
||||||
@@ -236,7 +232,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
|
|||||||
try:
|
try:
|
||||||
await store.adelete(THREADS_NS, thread_id)
|
await store.adelete(THREADS_NS, thread_id)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.debug("Could not delete store record for thread %s (not critical)", _sanitize_log_param(thread_id))
|
logger.debug("Could not delete store record for thread %s (not critical)", sanitize_log_param(thread_id))
|
||||||
|
|
||||||
# Remove checkpoints (best-effort)
|
# Remove checkpoints (best-effort)
|
||||||
checkpointer = getattr(request.app.state, "checkpointer", None)
|
checkpointer = getattr(request.app.state, "checkpointer", None)
|
||||||
@@ -245,7 +241,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
|
|||||||
if hasattr(checkpointer, "adelete_thread"):
|
if hasattr(checkpointer, "adelete_thread"):
|
||||||
await checkpointer.adelete_thread(thread_id)
|
await checkpointer.adelete_thread(thread_id)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.debug("Could not delete checkpoints for thread %s (not critical)", _sanitize_log_param(thread_id))
|
logger.debug("Could not delete checkpoints for thread %s (not critical)", sanitize_log_param(thread_id))
|
||||||
|
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@@ -289,7 +285,7 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to write thread %s to store", _sanitize_log_param(thread_id))
|
logger.exception("Failed to write thread %s to store", sanitize_log_param(thread_id))
|
||||||
raise HTTPException(status_code=500, detail="Failed to create thread")
|
raise HTTPException(status_code=500, detail="Failed to create thread")
|
||||||
|
|
||||||
# Write an empty checkpoint so state endpoints work immediately
|
# Write an empty checkpoint so state endpoints work immediately
|
||||||
@@ -307,7 +303,7 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
|
|||||||
}
|
}
|
||||||
await checkpointer.aput(config, empty_checkpoint(), ckpt_metadata, {})
|
await checkpointer.aput(config, empty_checkpoint(), ckpt_metadata, {})
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to create checkpoint for thread %s", _sanitize_log_param(thread_id))
|
logger.exception("Failed to create checkpoint for thread %s", sanitize_log_param(thread_id))
|
||||||
raise HTTPException(status_code=500, detail="Failed to create thread")
|
raise HTTPException(status_code=500, detail="Failed to create thread")
|
||||||
|
|
||||||
# Write thread_meta so the thread appears in /threads/search immediately
|
# Write thread_meta so the thread appears in /threads/search immediately
|
||||||
@@ -322,9 +318,9 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
|
|||||||
metadata=body.metadata,
|
metadata=body.metadata,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", _sanitize_log_param(thread_id))
|
logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", sanitize_log_param(thread_id))
|
||||||
|
|
||||||
logger.info("Thread created: %s", _sanitize_log_param(thread_id))
|
logger.info("Thread created: %s", sanitize_log_param(thread_id))
|
||||||
return ThreadResponse(
|
return ThreadResponse(
|
||||||
thread_id=thread_id,
|
thread_id=thread_id,
|
||||||
status="idle",
|
status="idle",
|
||||||
@@ -391,7 +387,7 @@ async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Reques
|
|||||||
try:
|
try:
|
||||||
await _store_put(store, updated)
|
await _store_put(store, updated)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to patch thread %s", _sanitize_log_param(thread_id))
|
logger.exception("Failed to patch thread %s", sanitize_log_param(thread_id))
|
||||||
raise HTTPException(status_code=500, detail="Failed to update thread")
|
raise HTTPException(status_code=500, detail="Failed to update thread")
|
||||||
|
|
||||||
return ThreadResponse(
|
return ThreadResponse(
|
||||||
@@ -423,7 +419,7 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
|
|||||||
try:
|
try:
|
||||||
checkpoint_tuple = await checkpointer.aget_tuple(config)
|
checkpoint_tuple = await checkpointer.aget_tuple(config)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to get checkpoint for thread %s", _sanitize_log_param(thread_id))
|
logger.exception("Failed to get checkpoint for thread %s", sanitize_log_param(thread_id))
|
||||||
raise HTTPException(status_code=500, detail="Failed to get thread")
|
raise HTTPException(status_code=500, detail="Failed to get thread")
|
||||||
|
|
||||||
if record is None and checkpoint_tuple is None:
|
if record is None and checkpoint_tuple is None:
|
||||||
@@ -471,7 +467,7 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo
|
|||||||
try:
|
try:
|
||||||
checkpoint_tuple = await checkpointer.aget_tuple(config)
|
checkpoint_tuple = await checkpointer.aget_tuple(config)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to get state for thread %s", _sanitize_log_param(thread_id))
|
logger.exception("Failed to get state for thread %s", sanitize_log_param(thread_id))
|
||||||
raise HTTPException(status_code=500, detail="Failed to get thread state")
|
raise HTTPException(status_code=500, detail="Failed to get thread state")
|
||||||
|
|
||||||
if checkpoint_tuple is None:
|
if checkpoint_tuple is None:
|
||||||
@@ -533,7 +529,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
|
|||||||
try:
|
try:
|
||||||
checkpoint_tuple = await checkpointer.aget_tuple(read_config)
|
checkpoint_tuple = await checkpointer.aget_tuple(read_config)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to get state for thread %s", _sanitize_log_param(thread_id))
|
logger.exception("Failed to get state for thread %s", sanitize_log_param(thread_id))
|
||||||
raise HTTPException(status_code=500, detail="Failed to get thread state")
|
raise HTTPException(status_code=500, detail="Failed to get thread state")
|
||||||
|
|
||||||
if checkpoint_tuple is None:
|
if checkpoint_tuple is None:
|
||||||
@@ -567,7 +563,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
|
|||||||
try:
|
try:
|
||||||
new_config = await checkpointer.aput(write_config, checkpoint, metadata, {})
|
new_config = await checkpointer.aput(write_config, checkpoint, metadata, {})
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to update state for thread %s", _sanitize_log_param(thread_id))
|
logger.exception("Failed to update state for thread %s", sanitize_log_param(thread_id))
|
||||||
raise HTTPException(status_code=500, detail="Failed to update thread state")
|
raise HTTPException(status_code=500, detail="Failed to update thread state")
|
||||||
|
|
||||||
new_checkpoint_id: str | None = None
|
new_checkpoint_id: str | None = None
|
||||||
@@ -579,7 +575,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
|
|||||||
try:
|
try:
|
||||||
await _store_upsert(store, thread_id, values={"title": body.values["title"]})
|
await _store_upsert(store, thread_id, values={"title": body.values["title"]})
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.debug("Failed to sync title to store for thread %s (non-fatal)", _sanitize_log_param(thread_id))
|
logger.debug("Failed to sync title to store for thread %s (non-fatal)", sanitize_log_param(thread_id))
|
||||||
|
|
||||||
return ThreadStateResponse(
|
return ThreadStateResponse(
|
||||||
values=serialize_channel_values(channel_values),
|
values=serialize_channel_values(channel_values),
|
||||||
@@ -613,7 +609,7 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
|
|||||||
try:
|
try:
|
||||||
all_messages = await event_store.list_messages(thread_id, limit=10_000)
|
all_messages = await event_store.list_messages(thread_id, limit=10_000)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warning("Failed to load messages from event store for thread %s", _sanitize_log_param(thread_id), exc_info=True)
|
logger.warning("Failed to load messages from event store for thread %s", sanitize_log_param(thread_id), exc_info=True)
|
||||||
all_messages = []
|
all_messages = []
|
||||||
|
|
||||||
entries: list[HistoryEntry] = []
|
entries: list[HistoryEntry] = []
|
||||||
@@ -665,7 +661,7 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to get history for thread %s", _sanitize_log_param(thread_id))
|
logger.exception("Failed to get history for thread %s", sanitize_log_param(thread_id))
|
||||||
raise HTTPException(status_code=500, detail="Failed to get thread history")
|
raise HTTPException(status_code=500, detail="Failed to get thread history")
|
||||||
|
|
||||||
return entries
|
return entries
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ from fastapi import HTTPException, Request
|
|||||||
from langchain_core.messages import HumanMessage
|
from langchain_core.messages import HumanMessage
|
||||||
|
|
||||||
from app.gateway.deps import get_run_context, get_run_manager, get_run_store, get_stream_bridge
|
from app.gateway.deps import get_run_context, get_run_manager, get_run_store, get_stream_bridge
|
||||||
from app.gateway.routers.threads import _sanitize_log_param
|
from app.gateway.utils import sanitize_log_param
|
||||||
from deerflow.runtime import (
|
from deerflow.runtime import (
|
||||||
END_SENTINEL,
|
END_SENTINEL,
|
||||||
HEARTBEAT_SENTINEL,
|
HEARTBEAT_SENTINEL,
|
||||||
@@ -186,7 +186,7 @@ async def _upsert_thread_in_store(store, thread_id: str, metadata: dict | None)
|
|||||||
try:
|
try:
|
||||||
await _store_upsert(store, thread_id, metadata=metadata)
|
await _store_upsert(store, thread_id, metadata=metadata)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warning("Failed to upsert thread %s in store (non-fatal)", _sanitize_log_param(thread_id))
|
logger.warning("Failed to upsert thread %s in store (non-fatal)", sanitize_log_param(thread_id))
|
||||||
|
|
||||||
|
|
||||||
async def _sync_thread_title_after_run(
|
async def _sync_thread_title_after_run(
|
||||||
@@ -309,7 +309,7 @@ async def start_run(
|
|||||||
else:
|
else:
|
||||||
await run_ctx.thread_meta_repo.update_status(thread_id, "running")
|
await run_ctx.thread_meta_repo.update_status(thread_id, "running")
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warning("Failed to upsert thread_meta for %s (non-fatal)", _sanitize_log_param(thread_id))
|
logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id))
|
||||||
|
|
||||||
agent_factory = resolve_agent_factory(body.assistant_id)
|
agent_factory = resolve_agent_factory(body.assistant_id)
|
||||||
graph_input = normalize_input(body.input)
|
graph_input = normalize_input(body.input)
|
||||||
|
|||||||
@@ -0,0 +1,6 @@
|
|||||||
|
"""Shared utility helpers for the Gateway layer."""
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_log_param(value: str) -> str:
|
||||||
|
"""Strip control characters to prevent log injection."""
|
||||||
|
return value.replace("\n", "").replace("\r", "").replace("\x00", "")
|
||||||
Reference in New Issue
Block a user