refactor(persistence): rename owner_id to user_id and thread_meta_repo to thread_store

Rename owner_id to user_id across all persistence models, repositories,
stores, routers, and tests for clearer semantics. Rename thread_meta_repo
to thread_store for consistency with run_store/run_event_store naming.
Add ThreadMetaStore return type annotation to get_thread_store().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rayhpeng
2026-04-10 15:05:10 +08:00
parent 03952eca53
commit 8da1903168
32 changed files with 256 additions and 276 deletions
+11 -4
View File
@@ -42,6 +42,11 @@ logger = logging.getLogger(__name__)
async def _ensure_admin_user(app: FastAPI) -> None:
"""Startup hook: generate init token on first boot; migrate orphan threads otherwise.
After admin creation, migrate orphan threads from the LangGraph
store (metadata.user_id unset) to the admin account. This is the
"no-auth → with-auth" upgrade path: users who ran DeerFlow without
authentication have existing LangGraph thread data that needs an
owner assigned.
First boot (no admin exists):
- Generates a one-time ``init_token`` stored in ``app.state.init_token``
- Logs the token to stdout so the operator can copy-paste it into the
@@ -52,7 +57,7 @@ async def _ensure_admin_user(app: FastAPI) -> None:
- Runs the one-time "no-auth → with-auth" orphan thread migration for
existing LangGraph thread metadata that has no owner_id.
No SQL persistence migration is needed: the four owner_id columns
No SQL persistence migration is needed: the four user_id columns
(threads_meta, runs, run_events, feedback) only come into existence
alongside the auth module via create_all, so freshly created tables
never contain NULL-owner rows.
@@ -96,6 +101,8 @@ async def _ensure_admin_user(app: FastAPI) -> None:
admin_id = str(row.id)
# LangGraph store orphan migration — non-fatal.
# This covers the "no-auth → with-auth" upgrade path for users
# whose existing LangGraph thread metadata has no user_id set.
store = getattr(app.state, "store", None)
if store is not None:
try:
@@ -127,7 +134,7 @@ async def _iter_store_items(store, namespace, *, page_size: int = 500):
async def _migrate_orphaned_threads(store, admin_user_id: str) -> int:
"""Migrate LangGraph store threads with no owner_id to the given admin.
"""Migrate LangGraph store threads with no user_id to the given admin.
Uses cursor pagination so all orphans are migrated regardless of
count. Returns the number of rows migrated.
@@ -135,8 +142,8 @@ async def _migrate_orphaned_threads(store, admin_user_id: str) -> int:
migrated = 0
async for item in _iter_store_items(store, ("threads",)):
metadata = item.value.get("metadata", {})
if not metadata.get("owner_id"):
metadata["owner_id"] = admin_user_id
if not metadata.get("user_id"):
metadata["user_id"] = admin_user_id
item.value["metadata"] = metadata
await store.aput(("threads",), item.key, item.value)
migrated += 1
+5 -5
View File
@@ -233,18 +233,18 @@ def require_permission(
# (``threads_meta`` table). We verify ownership via
# ``ThreadMetaStore.check_access``: it returns True for
# missing rows (untracked legacy thread) and for rows whose
# ``owner_id`` is NULL (shared / pre-auth data), so this is
# ``user_id`` is NULL (shared / pre-auth data), so this is
# strict-deny rather than strict-allow — only an *existing*
# row with a *different* owner_id triggers 404.
# row with a *different* user_id triggers 404.
if owner_check:
thread_id = kwargs.get("thread_id")
if thread_id is None:
raise ValueError("require_permission with owner_check=True requires 'thread_id' parameter")
from app.gateway.deps import get_thread_meta_repo
from app.gateway.deps import get_thread_store
thread_meta_repo = get_thread_meta_repo(request)
allowed = await thread_meta_repo.check_access(
thread_store = get_thread_store(request)
allowed = await thread_store.check_access(
thread_id,
str(auth.user.id),
require_existing=require_existing,
+14 -9
View File
@@ -1,8 +1,7 @@
"""Centralized accessors for singleton objects stored on ``app.state``.
**Getters** (used by routers): raise 503 when a required dependency is
missing, except ``get_store`` and ``get_thread_meta_repo`` which return
``None``.
missing, except ``get_store`` which returns ``None``.
Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`.
"""
@@ -20,6 +19,7 @@ from deerflow.runtime import RunContext, RunManager
if TYPE_CHECKING:
from app.gateway.auth.local_provider import LocalAuthProvider
from app.gateway.auth.repositories.sqlite import SQLiteUserRepository
from deerflow.persistence.thread_meta.base import ThreadMetaStore
@asynccontextmanager
@@ -31,10 +31,10 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
async with langgraph_runtime(app):
yield
"""
from deerflow.runtime.checkpointer.async_provider import make_checkpointer
from deerflow.config import get_app_config
from deerflow.persistence.engine import close_engine, get_session_factory, init_engine_from_config
from deerflow.runtime import make_store, make_stream_bridge
from deerflow.runtime.checkpointer.async_provider import make_checkpointer
from deerflow.runtime.events.store import make_run_event_store
async with AsyncExitStack() as stack:
@@ -53,18 +53,18 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
if sf is not None:
from deerflow.persistence.feedback import FeedbackRepository
from deerflow.persistence.run import RunRepository
from deerflow.persistence.thread_meta import ThreadMetaRepository
app.state.run_store = RunRepository(sf)
app.state.feedback_repo = FeedbackRepository(sf)
app.state.thread_meta_repo = ThreadMetaRepository(sf)
else:
from deerflow.persistence.thread_meta import MemoryThreadMetaStore
from deerflow.runtime.runs.store.memory import MemoryRunStore
app.state.run_store = MemoryRunStore()
app.state.feedback_repo = None
app.state.thread_meta_repo = MemoryThreadMetaStore(app.state.store)
from deerflow.persistence.thread_meta import make_thread_store
app.state.thread_store = make_thread_store(sf, app.state.store)
# Run event store (has its own factory with config-driven backend selection)
run_events_config = getattr(config, "run_events", None)
@@ -110,7 +110,12 @@ def get_store(request: Request):
return getattr(request.app.state, "store", None)
get_thread_meta_repo = _require("thread_meta_repo", "Thread metadata store")
def get_thread_store(request: Request) -> ThreadMetaStore:
"""Return the thread metadata store (SQL or memory-backed)."""
val = getattr(request.app.state, "thread_store", None)
if val is None:
raise HTTPException(status_code=503, detail="Thread metadata store not available")
return val
def get_run_context(request: Request) -> RunContext:
@@ -128,7 +133,7 @@ def get_run_context(request: Request) -> RunContext:
store=get_store(request),
event_store=get_run_event_store(request),
run_events_config=getattr(get_app_config(), "run_events", None),
thread_meta_repo=get_thread_meta_repo(request),
thread_store=get_thread_store(request),
)
+5 -5
View File
@@ -93,14 +93,14 @@ async def authenticate(request):
@auth.on
async def add_owner_filter(ctx: Auth.types.AuthContext, value: dict):
"""Inject owner_id metadata on writes; filter by owner_id on reads.
"""Inject user_id metadata on writes; filter by user_id on reads.
Gateway stores thread ownership as ``metadata.owner_id``.
Gateway stores thread ownership as ``metadata.user_id``.
This handler ensures LangGraph Server enforces the same isolation.
"""
# On create/update: stamp owner_id into metadata
# On create/update: stamp user_id into metadata
metadata = value.setdefault("metadata", {})
metadata["owner_id"] = ctx.user.identity
metadata["user_id"] = ctx.user.identity
# Return filter dict — LangGraph applies it to search/read/delete
return {"owner_id": ctx.user.identity}
return {"user_id": ctx.user.identity}
+2 -2
View File
@@ -34,7 +34,7 @@ class FeedbackResponse(BaseModel):
feedback_id: str
run_id: str
thread_id: str
owner_id: str | None = None
user_id: str | None = None
message_id: str | None = None
rating: int
comment: str | None = None
@@ -80,7 +80,7 @@ async def create_feedback(
run_id=run_id,
thread_id=thread_id,
rating=body.rating,
owner_id=user_id,
user_id=user_id,
message_id=body.message_id,
comment=body.comment,
)
+21 -21
View File
@@ -34,7 +34,7 @@ router = APIRouter(prefix="/api/threads", tags=["threads"])
# them. Pydantic ``@field_validator("metadata")`` strips them on every
# inbound model below so a malicious client cannot reflect a forged
# owner identity through the API surface. Defense-in-depth — the
# row-level invariant is still ``threads_meta.owner_id`` populated from
# row-level invariant is still ``threads_meta.user_id`` populated from
# the auth contextvar; this list closes the metadata-blob echo gap.
_SERVER_RESERVED_METADATA_KEYS: frozenset[str] = frozenset({"owner_id", "user_id"})
@@ -194,7 +194,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
and removes the thread_meta row from the configured ThreadMetaStore
(sqlite or memory).
"""
from app.gateway.deps import get_thread_meta_repo
from app.gateway.deps import get_thread_store
# Clean local filesystem
response = _delete_thread_data(thread_id)
@@ -211,8 +211,8 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe
# Remove thread_meta row (best-effort) — required for sqlite backend
# so the deleted thread no longer appears in /threads/search.
try:
thread_meta_repo = get_thread_meta_repo(request)
await thread_meta_repo.delete(thread_id)
thread_store = get_thread_store(request)
await thread_store.delete(thread_id)
except Exception:
logger.debug("Could not delete thread_meta for %s (not critical)", sanitize_log_param(thread_id))
@@ -227,17 +227,17 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
and an empty checkpoint (so state endpoints work immediately).
Idempotent: returns the existing record when ``thread_id`` already exists.
"""
from app.gateway.deps import get_thread_meta_repo
from app.gateway.deps import get_thread_store
checkpointer = get_checkpointer(request)
thread_meta_repo = get_thread_meta_repo(request)
thread_store = get_thread_store(request)
thread_id = body.thread_id or str(uuid.uuid4())
now = time.time()
# ``body.metadata`` is already stripped of server-reserved keys by
# ``ThreadCreateRequest._strip_reserved`` — see the model definition.
# Idempotency: return existing record when already present
existing_record = await thread_meta_repo.get(thread_id)
existing_record = await thread_store.get(thread_id)
if existing_record is not None:
return ThreadResponse(
thread_id=thread_id,
@@ -249,7 +249,7 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe
# Write thread_meta so the thread appears in /threads/search immediately
try:
await thread_meta_repo.create(
await thread_store.create(
thread_id,
assistant_id=getattr(body, "assistant_id", None),
metadata=body.metadata,
@@ -293,9 +293,9 @@ async def search_threads(body: ThreadSearchRequest, request: Request) -> list[Th
Delegates to the configured ThreadMetaStore implementation
(SQL-backed for sqlite/postgres, Store-backed for memory mode).
"""
from app.gateway.deps import get_thread_meta_repo
from app.gateway.deps import get_thread_store
repo = get_thread_meta_repo(request)
repo = get_thread_store(request)
rows = await repo.search(
metadata=body.metadata or None,
status=body.status,
@@ -320,22 +320,22 @@ async def search_threads(body: ThreadSearchRequest, request: Request) -> list[Th
@require_permission("threads", "write", owner_check=True, require_existing=True)
async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Request) -> ThreadResponse:
"""Merge metadata into a thread record."""
from app.gateway.deps import get_thread_meta_repo
from app.gateway.deps import get_thread_store
thread_meta_repo = get_thread_meta_repo(request)
record = await thread_meta_repo.get(thread_id)
thread_store = get_thread_store(request)
record = await thread_store.get(thread_id)
if record is None:
raise HTTPException(status_code=404, detail=f"Thread {thread_id} not found")
# ``body.metadata`` already stripped by ``ThreadPatchRequest._strip_reserved``.
try:
await thread_meta_repo.update_metadata(thread_id, body.metadata)
await thread_store.update_metadata(thread_id, body.metadata)
except Exception:
logger.exception("Failed to patch thread %s", sanitize_log_param(thread_id))
raise HTTPException(status_code=500, detail="Failed to update thread")
# Re-read to get the merged metadata + refreshed updated_at
record = await thread_meta_repo.get(thread_id) or record
record = await thread_store.get(thread_id) or record
return ThreadResponse(
thread_id=thread_id,
status=record.get("status", "idle"),
@@ -354,12 +354,12 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
execution status from the checkpointer. Falls back to the checkpointer
alone for threads that pre-date ThreadMetaStore adoption (backward compat).
"""
from app.gateway.deps import get_thread_meta_repo
from app.gateway.deps import get_thread_store
thread_meta_repo = get_thread_meta_repo(request)
thread_store = get_thread_store(request)
checkpointer = get_checkpointer(request)
record: dict | None = await thread_meta_repo.get(thread_id)
record: dict | None = await thread_store.get(thread_id)
# Derive accurate status from the checkpointer
config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
@@ -462,10 +462,10 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
ThreadMetaStore abstraction so that ``/threads/search`` reflects the
change immediately in both sqlite and memory backends.
"""
from app.gateway.deps import get_thread_meta_repo
from app.gateway.deps import get_thread_store
checkpointer = get_checkpointer(request)
thread_meta_repo = get_thread_meta_repo(request)
thread_store = get_thread_store(request)
# checkpoint_ns must be present in the config for aput — default to ""
# (the root graph namespace). checkpoint_id is optional; omitting it
@@ -529,7 +529,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re
new_title = body.values["title"]
if new_title: # Skip empty strings and None
try:
await thread_meta_repo.update_display_name(thread_id, new_title)
await thread_store.update_display_name(thread_id, new_title)
except Exception:
logger.debug("Failed to sync title to thread_meta for %s (non-fatal)", sanitize_log_param(thread_id))
+4 -4
View File
@@ -229,15 +229,15 @@ async def start_run(
# even for threads that were never explicitly created via POST /threads
# (e.g. stateless runs).
try:
existing = await run_ctx.thread_meta_repo.get(thread_id)
existing = await run_ctx.thread_store.get(thread_id)
if existing is None:
await run_ctx.thread_meta_repo.create(
await run_ctx.thread_store.create(
thread_id,
assistant_id=body.assistant_id,
metadata=body.metadata,
)
else:
await run_ctx.thread_meta_repo.update_status(thread_id, "running")
await run_ctx.thread_store.update_status(thread_id, "running")
except Exception:
logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id))
@@ -285,7 +285,7 @@ async def start_run(
record.task = task
# Title sync is handled by worker.py's finally block which reads the
# title from the checkpoint and calls thread_meta_repo.update_display_name
# title from the checkpoint and calls thread_store.update_display_name
# after the run completes.
return record