Compare commits

36 Commits

Author SHA1 Message Date
rayhpeng d1bcae69b9 docs: add run-level API endpoints to CLAUDE.md routers table 2026-04-12 16:17:09 +08:00
rayhpeng 597fb0e5f9 feat(api): retrofit cursor pagination onto GET /threads/{tid}/runs/{rid}/messages
Replace bare list[dict] response with {data: [...], has_more: bool} envelope,
forwarding limit/before_seq/after_seq query params to the event store.
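
A minimal sketch of the `{data, has_more}` envelope described above. The helper name `paginate_envelope` and the in-memory event list are assumptions for illustration; the actual router and event store code are not shown in this log. The sketch reads one extra row to learn whether older rows exist.

```python
from typing import Any


def paginate_envelope(events: list[dict[str, Any]], limit: int) -> dict[str, Any]:
    """Wrap the newest `limit` events in a {data, has_more} envelope.

    Hypothetical helper: `events` is assumed to be sorted by ascending seq.
    """
    window = events[-(limit + 1):]  # newest limit+1 rows, oldest first
    has_more = len(window) > limit  # the extra row proves older data exists
    data = window[-limit:] if limit > 0 else []
    return {"data": data, "has_more": has_more}


events = [{"seq": n} for n in range(1, 6)]
page = paginate_envelope(events, limit=2)
full = paginate_envelope(events, limit=5)
```

A client would keep requesting older pages while `has_more` is true.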

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 16:14:14 +08:00
rayhpeng c38b3a9280 feat(api): add GET /api/runs/{run_id}/feedback
Delegates to FeedbackRepository.list_by_run via the existing _resolve_run
helper; includes tests for success, 404, empty list, and 503 (no DB).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 16:11:21 +08:00
rayhpeng cbbe39d28c feat(api): add GET /api/runs/{run_id}/messages with cursor pagination
New endpoint resolves thread_id from the run record and delegates to
RunEventStore.list_messages_by_run for cursor-based pagination.
Ownership is enforced implicitly via RunStore.get() user filtering.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 16:09:34 +08:00
rayhpeng 82374eb18c feat(events): add pagination to list_messages_by_run on all store backends
Replicates the existing before_seq/after_seq/limit cursor-pagination pattern
from list_messages onto list_messages_by_run across the abstract interface,
MemoryRunEventStore, JsonlRunEventStore, and DbRunEventStore.
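
The cursor pattern named above can be sketched over a plain list. The semantics here are assumptions inferred from this log (no cursor returns the newest `limit` rows, `before_seq` pages backward, `after_seq` pages forward); the real store methods may differ in detail.

```python
def list_messages(rows, *, limit=50, before_seq=None, after_seq=None):
    """Return up to `limit` rows in ascending seq order (assumed semantics)."""
    if after_seq is not None:
        # Page forward: the oldest rows strictly after the cursor.
        return [r for r in rows if r["seq"] > after_seq][:limit]
    if before_seq is not None:
        # Page backward: the newest rows strictly before the cursor.
        older = [r for r in rows if r["seq"] < before_seq]
        return older[-limit:] if limit > 0 else []
    # No cursor: the newest `limit` rows.
    return rows[-limit:] if limit > 0 else []


rows = [{"seq": n} for n in range(1, 11)]
newest = list_messages(rows, limit=3)
previous = list_messages(rows, limit=3, before_seq=newest[0]["seq"])
following = list_messages(rows, limit=3, after_seq=3)
```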

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 15:58:33 +08:00
rayhpeng a36186cf54 docs: update CLAUDE.md and config docs for per-user isolation 2026-04-12 15:32:02 +08:00
rayhpeng 9f28115889 feat(migration): add idempotent script for per-user data migration
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 15:27:32 +08:00
rayhpeng 7ce9333200 feat(isolation): wire user_id through all Paths and memory callsites
Pass user_id=get_effective_user_id() at every callsite that invokes
Paths methods or memory functions, enabling per-user filesystem isolation
throughout the harness and app layers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 15:16:23 +08:00
rayhpeng 9af2f3e73c feat(memory): capture user_id at enqueue time for async-safe thread isolation
Add user_id field to ConversationContext and MemoryUpdateQueue.add() so the
user identity is stored explicitly at request time, before threading.Timer
fires on a different thread where ContextVar values do not propagate.
MemoryMiddleware.after_agent() now calls get_effective_user_id() at enqueue
time and passes the value through to updater.update_memory().
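
The reason for capturing the id at enqueue time can be reproduced with the standard library alone. This is a sketch, not the project's code: `current_user_id` and both callbacks are hypothetical names, and it assumes a default CPython build in which a new thread starts with an empty context.

```python
import threading
from contextvars import ContextVar

# Hypothetical stand-in for the request-scoped user context.
current_user_id: ContextVar[str] = ContextVar("current_user_id", default="default")

seen: dict[str, str] = {}


def read_from_context() -> None:
    # Runs on the timer thread, which does not see the caller's ContextVar value.
    seen["from_context"] = current_user_id.get()


def read_captured(user_id: str) -> None:
    # The value was captured at enqueue time and passed explicitly.
    seen["captured"] = user_id


current_user_id.set("alice")
timers = [
    threading.Timer(0.01, read_from_context),
    threading.Timer(0.01, read_captured, args=(current_user_id.get(),)),
]
for timer in timers:
    timer.start()
for timer in timers:
    timer.join()
```

Only the explicitly passed value survives the hop to the timer thread.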

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 14:59:51 +08:00
rayhpeng dfa9fc47b3 feat(memory): thread user_id through memory updater layer
Add `user_id` keyword-only parameter to all public updater functions
(_save_memory_to_file, get_memory_data, reload_memory_data, import_memory_data,
clear_memory_data, create/delete/update_memory_fact) and regular keyword param
to MemoryUpdater.update_memory + update_memory_from_conversation, propagating
it to every storage load/save/reload call.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 13:37:08 +08:00
rayhpeng 3877aabcfd feat(memory): add user_id to MemoryStorage interface for per-user isolation
Thread user_id through MemoryStorage.load/reload/save abstract methods and
FileMemoryStorage, re-keying the in-memory cache from bare agent_name to a
(user_id, agent_name) tuple to prevent cross-user cache collisions.
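
A small sketch of the re-keyed cache, assuming a hypothetical `MemoryCache` class rather than the real `FileMemoryStorage`. It shows why a bare `agent_name` key would let two users overwrite each other.

```python
class MemoryCache:
    """Hypothetical stand-in for the storage cache described above."""

    def __init__(self) -> None:
        # Keyed by (user_id, agent_name) instead of agent_name alone.
        self._cache: dict = {}

    def save(self, agent_name: str, data: dict, *, user_id=None) -> None:
        self._cache[(user_id, agent_name)] = data

    def load(self, agent_name: str, *, user_id=None) -> dict:
        return self._cache.get((user_id, agent_name), {})


cache = MemoryCache()
cache.save("researcher", {"facts": ["likes tea"]}, user_id="alice")
cache.save("researcher", {"facts": ["likes coffee"]}, user_id="bob")
```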

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 12:37:25 +08:00
rayhpeng e8f087cb37 feat(paths): add user-aware path methods with optional user_id parameter
Add _validate_user_id(), user_dir(), user_memory_file(), user_agent_memory_file()
and optional keyword-only user_id parameter to all thread-related path methods.
When user_id is provided, paths resolve under users/{user_id}/threads/{thread_id}/;
when omitted, legacy layout is preserved for backward compatibility.
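
A sketch of the optional `user_id` behaviour, under stated assumptions: the function name `thread_dir`, the validation regex, and the legacy layout `threads/{thread_id}` are illustrative guesses, since the log only gives the per-user layout.

```python
import re
from pathlib import Path

# Assumed validation rule; the real _validate_user_id() is not shown in this log.
_SAFE_USER_ID = re.compile(r"[A-Za-z0-9_-]+")


def thread_dir(base: Path, thread_id: str, *, user_id=None) -> Path:
    if user_id is None:
        return base / "threads" / thread_id  # assumed legacy layout
    if not _SAFE_USER_ID.fullmatch(user_id):
        # Reject ids that could escape the users/ directory.
        raise ValueError(f"invalid user_id: {user_id!r}")
    return base / "users" / user_id / "threads" / thread_id


base = Path("/data")
per_user = thread_dir(base, "t1", user_id="alice")
legacy = thread_dir(base, "t1")

try:
    thread_dir(base, "t1", user_id="../etc")
    rejected = False
except ValueError:
    rejected = True
```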

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 10:50:11 +08:00
rayhpeng 3540e157f1 feat(user-context): add DEFAULT_USER_ID and get_effective_user_id helper
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-12 10:37:55 +08:00
rayhpeng 8f7eb28c0d fix(rebase): restore FeedbackButtons component and enrichment lost during rebase
The FeedbackButtons component (defined inline in message-list-item.tsx)
was introduced in commit 95df8d13 but lost during rebase. The previous
rebase cleanup commit incorrectly removed the feedback/runId props and
enrichment hook as "orphaned code" instead of restoring the missing
component. This commit restores:

- FeedbackButtons component with thumbs up/down toggle and optimistic state
- FeedbackData/upsertFeedback/deleteFeedback imports
- feedback and runId props on MessageListItem
- useThreadMessageEnrichment hook and entry lookup in message-list.tsx

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 09:43:36 +08:00
rayhpeng 500cdfc8e4 fix(rebase): remove duplicate definitions and update stale module paths
Rebase left duplicate function blocks in worker.py (triple human_message
write causing 3x user messages in /history), deps.py, and prompt.py.
Also update checkpointer imports from the old deerflow.agents.checkpointer
path to deerflow.runtime.checkpointer, and clean up orphaned feedback
props in the frontend message components.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 09:30:39 +08:00
rayhpeng 3580897c56 refactor(feedback): inline feedback on history and drop positional mapping
The old ``useThreadFeedback`` hook loaded ``GET /api/threads/{id}/messages?limit=200``
and built two parallel lookup tables: ``runIdByAiIndex`` (an ordinal array of
run_ids for every ``ai_message``-typed event) and ``feedbackByRunId``. The render
loop in ``message-list.tsx`` walked the AI messages in order, incrementing
``aiMessageIndex`` on each non-human message, and used that ordinal to look up
the run_id and feedback.

This shape had three latent bugs we could observe on real threads:

1. **Fetch was capped at 200 messages.** Long or tool-heavy threads silently
   dropped earlier entries from the map, so feedback buttons could be missing
   on messages they should own.
2. **Ordinal mismatch.** The render loop counted every non-human message
   (including each intermediate ``ai_tool_call``), but ``runIdByAiIndex`` only
   pushed entries for ``event_type == "ai_message"``. A run with 3 tool_calls
   + 1 final AI message would push 1 entry while the render consumed 4
   positions, so buttons mapped to the wrong positions across multi-run
   threads.
3. **Two parallel data paths.** The ``/history`` render path and the
   ``/messages`` feedback-lookup path could drift between an
   ``invalidateQueries`` call and the next refetch, producing transient
   mismaps.
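
The ordinal mismatch in item 2 can be reproduced in a few lines. The frontend code is TypeScript; this Python sketch only models the logic, and the event dicts are made-up sample data.

```python
events = [
    {"id": "h1", "type": "human_message", "run_id": "run-1"},
    {"id": "a1", "type": "ai_tool_call", "run_id": "run-1"},
    {"id": "a2", "type": "ai_tool_call", "run_id": "run-1"},
    {"id": "a3", "type": "ai_tool_call", "run_id": "run-1"},
    {"id": "a4", "type": "ai_message", "run_id": "run-1"},
    {"id": "h2", "type": "human_message", "run_id": "run-2"},
    {"id": "b1", "type": "ai_message", "run_id": "run-2"},
]

# Old shape: one entry per ai_message, consumed by a counter over every non-human message.
run_id_by_ai_index = [e["run_id"] for e in events if e["type"] == "ai_message"]
non_human = [e for e in events if e["type"] != "human_message"]
positional = {
    e["id"]: (run_id_by_ai_index[i] if i < len(run_id_by_ai_index) else None)
    for i, e in enumerate(non_human)
}

# Keying by message id avoids the counter entirely.
by_id = {e["id"]: e["run_id"] for e in events}
```

With this data the positional lookup assigns `run-2` to a tool call from `run-1` and finds nothing for the final message of `run-2`, while the id-keyed map is correct for every message.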

The previous commit moved the authoritative message source for history to
the event store and added ``run_id`` + ``feedback`` inline on each message
dict returned by ``_get_event_store_messages``. This commit aligns the
frontend with that contract:

- **Delete** ``useThreadFeedback``, ``ThreadFeedbackData``,
  ``runIdByAiIndex``, ``feedbackByRunId``, and ``fetchAllThreadMessages``.
- **Introduce** ``useThreadMessageEnrichment`` that fetches
  ``POST /history?limit=1`` once, indexes the returned messages by
  ``message.id`` into a ``Map<id, {run_id, feedback?}>``, and invalidates
  on stream completion (``onFinish`` in ``useThreadStream``). Keying by
  ``message.id`` is stable across runs, tool_call chains, and summarize.
- **Simplify** ``message-list.tsx`` to drop the ``aiMessageIndex``
  counter and read ``enrichment?.get(msg.id)`` at each render step.
- **Rewire** ``message-list-item.tsx`` so the feedback button renders
  when ``feedback !== undefined`` rather than when the message happens
  to be non-human. ``feedback`` is ``undefined`` for non-eligible
  messages (humans, non-final AI, tools), ``null`` for the final
  ai_message of an unrated run, and a ``FeedbackData`` object once
  rated — cleanly distinguishing "not eligible" from "eligible but
  unrated".

``/api/threads/{id}/messages`` is kept as a debug/export surface; no
frontend code calls it anymore but the backend router is untouched.

Validation:
- ``pnpm check`` clean (0 errors, 1 pre-existing unrelated warning)
- Live test on thread ``3d5dea4a`` after gateway restart confirmed the
  original user query is restored to position 0 and the feedback
  button behaves correctly on the final AI message.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:40:02 +08:00
rayhpeng 229c8095be fix(threads): load history messages from event store, immune to summarize
``get_thread_history`` and ``get_thread_state`` in Gateway mode read
messages from ``checkpoint.channel_values["messages"]``. After
SummarizationMiddleware runs mid-run, that list is rewritten in-place:
pre-summarize messages are dropped and a synthetic summary-as-human
message takes position 0. The frontend then renders a chat history that
starts with ``"Here is a summary of the conversation to date:..."``
instead of the user's original query, and all earlier turns are gone.

The event store (``RunEventStore``) is append-only and never rewritten,
so it retains the full transcript. This commit adds a helper
``_get_event_store_messages`` that loads the event store's message
stream and overrides ``values["messages"]`` in both endpoints; the
checkpoint fallback kicks in only when the event store is unavailable.

Behavior contract of the helper:

- **Full pagination.** ``list_messages`` returns the newest ``limit``
  records when no cursor is given, so a fixed limit silently drops
  older messages on long threads. The helper sizes the read from
  ``count_messages()`` and pages forward with ``after_seq`` cursors.
- **Copy-on-read.** Each content dict is copied before ``id`` is
  patched so the live store object (``MemoryRunEventStore`` returns
  references) is never mutated.
- **Stable ids.** Messages with ``id=None`` (human + tool_result,
  which don't receive an id until checkpoint persistence) get a
  deterministic ``uuid5(NAMESPACE_URL, f"{thread_id}:{seq}")`` so
  React keys stay stable across requests. AI messages keep their
  LLM-assigned ``lc_run--*`` ids.
- **Legacy ``Command`` repr sanitization.** Rows captured before the
  ``journal.py`` ``on_tool_end`` fix (previous commit) stored
  ``str(Command(update={'messages': [ToolMessage(content='X', ...)]}))``
  as the tool_result content. ``_sanitize_legacy_command_repr``
  regex-extracts the inner text so old threads render cleanly.
- **Inline feedback.** When loading the stream, the helper also pulls
  ``feedback_repo.list_by_thread_grouped`` and attaches ``run_id`` to
  every message plus ``feedback`` to the final ``ai_message`` of each
  run. This removes the frontend's need to fetch a second endpoint
  and positional-index-map its way back to the right run. When the
  feedback subsystem is unavailable, the ``feedback`` field is left
  absent entirely so the frontend hides the button rather than
  rendering it over a broken write path.
- **User context.** ``DbRunEventStore`` is user-scoped by default via
  ``resolve_user_id(AUTO)``. The helper relies on the ``@require_permission``
  decorator having populated the user contextvar on both callers; the
  docstring documents this dependency explicitly so nobody wires it
  into a CLI or migration script without passing ``user_id=None``.
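
The copy-on-read and stable-id rules above can be sketched as follows. `patch_ids` and the row shape (`seq` plus a `content` dict) are assumptions for illustration; only the `uuid5(NAMESPACE_URL, f"{thread_id}:{seq}")` derivation comes from the commit message.

```python
import uuid


def patch_ids(thread_id: str, stored: list[dict]) -> list[dict]:
    """Return copies of each content dict with a deterministic id filled in."""
    out = []
    for row in stored:
        content = dict(row["content"])  # shallow copy; the store's dict is never mutated
        if content.get("id") is None:
            content["id"] = str(
                uuid.uuid5(uuid.NAMESPACE_URL, f"{thread_id}:{row['seq']}")
            )
        out.append(content)
    return out


store_rows = [
    {"seq": 1, "content": {"type": "human", "id": None, "content": "hello"}},
    {"seq": 2, "content": {"type": "ai", "id": "lc_run--abc", "content": "hi"}},
]
first = patch_ids("thread-1", store_rows)
second = patch_ids("thread-1", store_rows)
```

Because the id depends only on the thread id and the sequence number, repeated requests yield the same key for the same message.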

Real data verification against thread
``6d30913e-dcd4-41c8-8941-f66c716cf359``: checkpoint showed 12 messages
(summarize-corrupted), event store had 16. The original human message
``"最新伊美局势"`` was preserved as seq=1 in the event store and
correctly restored to position 0 in the helper output. Helper output
for AI messages was byte-identical to checkpoint for every overlapping
message; only tool_result ids differed (patched to uuid5) and the
legacy Command repr at seq=48 was sanitized.

Tests:
- ``test_thread_state_event_store.py`` — 18 tests covering
  ``_sanitize_legacy_command_repr`` (passthrough, single/double-quote
  extraction, unparseable fallback), helper happy path (all message
  types, stable uuid5, store non-mutation), multi-page pagination,
  summarize regression (recovers pre-summarize messages), feedback
  attachment (per-run, multi-run threads, repo failure graceful),
  and dependency failure fallback to ``None``.

Docs:
- ``docs/superpowers/plans/2026-04-10-event-store-history.md`` — the
  implementation plan this commit realizes, with Task 1 revised after
  the evaluation findings (pagination, copy-on-read, Command wrap
  already landed in journal.py, frontend feedback pagination in the
  follow-up commit, Standard-mode follow-up noted).
- ``docs/superpowers/specs/2026-04-11-runjournal-history-evaluation.md``
  — the Claude + second-opinion evaluation document that drove the
  plan revisions (pagination bug, dict-mutation bug, feedback hidden
  bug, Command bug).
- ``docs/superpowers/specs/2026-04-11-summarize-marker-design.md`` —
  design for a follow-up PR that visually marks summarize events in
  history, based on a verified ``adispatch_custom_event`` experiment
  (``trace=False`` middleware nodes can still forward the Pregel task
  config via explicit signature injection).

Scope: Gateway mode only (``make dev-pro``). Standard mode
(``make dev``) hits LangGraph Server directly and bypasses these
endpoints; the summarize symptom is still present there and is
tracked as a separate follow-up in the plan.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:53 +08:00
rayhpeng ce24424449 fix(journal): unwrap Command tool results in on_tool_end
Tools that update graph state (e.g. ``present_files``) return
``Command(update={'messages': [ToolMessage(...)], 'artifacts': [...]})``.
LangGraph later unwraps the inner ``ToolMessage`` into checkpoint state,
but ``RunJournal.on_tool_end`` was receiving the ``Command`` object
directly via the LangChain callback chain and storing
``str(Command(update={...}))`` as the tool_result content.

This produced a visible divergence between the event-store and the
checkpoint for any thread that used a Command-returning tool, blocking
the event-store-backed history fix in the follow-up commit. Concrete
example from thread ``6d30913e-dcd4-41c8-8941-f66c716cf359`` (seq=48):
checkpoint had ``'Successfully presented files'`` while event_store
stored the full Command repr.

The fix detects ``Command`` in ``on_tool_end``, extracts the first
``ToolMessage`` from ``update['messages']``, and lets the existing
ToolMessage branch handle the ``model_dump()`` path. Legacy rows still
containing the Command repr are separately cleaned up by the history
helper in the follow-up commit.
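
A sketch of the unwrap step, using small stand-in dataclasses in place of the real `Command` and `ToolMessage` types so that it runs without LangGraph installed. The function name `unwrap_tool_output` is hypothetical; the actual change lives inside `RunJournal.on_tool_end`.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToolMessage:  # stand-in for the LangChain ToolMessage type
    content: str


@dataclass
class Command:  # stand-in for the LangGraph Command type
    update: dict = field(default_factory=dict)


def unwrap_tool_output(output: Any) -> Any:
    """Return the first inner ToolMessage of a Command, else the output unchanged."""
    if isinstance(output, Command):
        for message in output.update.get("messages", []):
            if isinstance(message, ToolMessage):
                return message
    return output


wrapped = Command(
    update={
        "messages": [ToolMessage("Successfully presented files")],
        "artifacts": ["report.md"],
    }
)
unwrapped = unwrap_tool_output(wrapped)
passthrough = unwrap_tool_output("plain string result")
```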

Tests:
- ``test_tool_end_unwraps_command_with_inner_tool_message`` — unit test
  of the unwrap branch with a constructed Command
- ``test_tool_invoke_end_to_end_unwraps_command`` — end-to-end via
  ``CallbackManager`` + ``tool.invoke`` to exercise the real LangChain
  dispatch path that production uses, matching the repro shape from
  ``present_files``
- Counter-proof: temporarily reverted the patch, both tests failed with
  the exact ``Command(update={...})`` repr that was stored in the
  production SQLite row at seq=48, confirming LangChain does pass the
  ``Command`` through callbacks (the unwrap is load-bearing)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:53 +08:00
rayhpeng 4810898cfa chore(persistence): drop redundant busy_timeout PRAGMA
Python's sqlite3 driver defaults to a 5-second busy timeout via the
``timeout`` kwarg of ``sqlite3.connect``, and aiosqlite + SQLAlchemy's
aiosqlite dialect inherit that default. Setting ``PRAGMA busy_timeout=5000``
explicitly was a no-op — verified by reading back the PRAGMA on a fresh
connection (it already reports 5000ms without our PRAGMA).
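
The read-back check can be repeated with the standard library alone. This sketch covers only the stdlib `sqlite3` driver on an in-memory database, not the aiosqlite and SQLAlchemy stack used by the project.

```python
import sqlite3

# sqlite3.connect() uses timeout=5.0 seconds unless told otherwise.
conn = sqlite3.connect(":memory:")
busy_timeout_ms = conn.execute("PRAGMA busy_timeout").fetchone()[0]
conn.close()
```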

Concurrent stress test (50 checkpoint writes + 20 event batches + 50
thread_meta updates on the same deerflow.db) still completes with zero
errors and 200/200 rows after removing the explicit PRAGMA.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:53 +08:00
rayhpeng 10cc651578 fix(persistence): stream hang when run_events.backend=db
DbRunEventStore._user_id_from_context() returned user.id without
coercing it to str. User.id is a Pydantic UUID, and aiosqlite cannot
bind a raw UUID object to a VARCHAR column, so the INSERT for the
initial human_message event silently rolled back and raised out of
the worker task. Because that put() sat outside the worker's try
block, the finally-clause that publishes end-of-stream never ran
and the SSE stream hung forever.

jsonl mode was unaffected because json.dumps(default=str) coerces
UUID objects transparently.

Fixes:
- db.py: coerce user.id to str at the context-read boundary (matches
  what resolve_user_id already does for the other repositories)
- worker.py: move RunJournal init + human_message put inside the try
  block so any failure flows through the finally/publish_end path
  instead of hanging the subscriber
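
Both fixes can be illustrated together in a self-contained sketch. The names `put` and `worker` and the one-column table are assumptions, and the stdlib `sqlite3` driver stands in for aiosqlite; the point is that a raw `UUID` cannot be bound as a parameter, and that the end-of-stream marker is only guaranteed when the write sits inside the `try` block.

```python
import asyncio
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE run_events (user_id VARCHAR(64))")


async def put(user_id) -> None:
    # Stand-in for the event store write; sqlite3 cannot bind a raw UUID object.
    conn.execute("INSERT INTO run_events (user_id) VALUES (?)", (user_id,))


async def worker(user_id, published: list) -> None:
    try:
        await put(user_id)  # inside the try block, so a failure reaches finally
        published.append("message")
    except sqlite3.Error:
        published.append("error")
    finally:
        published.append("end")  # subscribers always learn the stream is over


uid = uuid.UUID(int=1)
raw: list = []
coerced: list = []
asyncio.run(worker(uid, raw))
asyncio.run(worker(str(uid), coerced))
```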

Defense-in-depth:
- engine.py: add PRAGMA busy_timeout=5000 so checkpointer and event
  store wait for each other on the shared deerflow.db file instead
  of failing immediately under write-lock contention
- journal.py: skip fire-and-forget _flush_sync when a previous flush
  task is still in flight, to avoid piling up concurrent put_batch
  writes on the same SQLAlchemy engine during streaming; flush() now
  waits for pending tasks before draining the buffer
- database_config.py: doc-only update clarifying WAL + busy_timeout
  keep the unified deerflow.db safe for both workloads

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:53 +08:00
rayhpeng 20f64bbf4f style(feedback): always show toolbar buttons without hover
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:53 +08:00
rayhpeng e1cb78fecf style(feedback): group copy and feedback buttons together on the left
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:53 +08:00
rayhpeng 6476eabdf5 fix(feedback): use real threadId and refresh after stream
- Pass threadId prop to MessageListItem instead of reading "new" from URL params
- Invalidate thread-feedback query on stream finish so buttons appear immediately
- Keep feedback buttons always visible; show the copy button on hover only

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:53 +08:00
rayhpeng 95d5c156a1 fix(feedback): correct run_id mapping for feedback echo
The feedbackMap was keyed by run_id but looked up by LangGraph message ID.
Fixed by tracking AI message ordinal index to correlate event store
run_ids with LangGraph SDK messages.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:51 +08:00
rayhpeng 18393b55d1 feat(feedback): wire feedback data into message rendering for history echo
Adds useThreadFeedback hook that fetches run-level feedback from the
messages API and builds a runId->FeedbackData map. MessageList now calls
this hook and passes feedback and runId to each MessageListItem so
previously-submitted thumbs are pre-filled when revisiting a thread.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 23:38:24 +08:00
rayhpeng 77491f2801 feat(feedback): add frontend feedback API client
Adds upsertFeedback and deleteFeedback API functions backed by
fetchWithAuth, targeting the /api/threads/{id}/runs/{id}/feedback
endpoint.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 23:38:20 +08:00
rayhpeng 8d3cb6da72 feat(feedback): enrich messages endpoint with per-run feedback data
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 23:38:20 +08:00
rayhpeng d1cf3f09b2 feat(feedback): add PUT upsert and DELETE-by-run endpoints
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 23:38:19 +08:00
rayhpeng 0d5b3a0ece feat(feedback): add delete_by_run() and list_by_thread_grouped()
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 23:38:19 +08:00
rayhpeng 4184d5ed2c feat(feedback): add upsert() method with UNIQUE enforcement
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-11 23:38:19 +08:00
rayhpeng 60a5ad7279 feat(feedback): add UNIQUE(thread_id, run_id, user_id) constraint
Add UNIQUE constraint to FeedbackRow to enforce one feedback per user per run,
enabling upsert behavior in Task 2. Update tests to use distinct user_ids for
multiple feedback records per run, and pass user_id=None to list_by_run for
admin-style queries that bypass user isolation.
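
How the constraint enables upsert can be sketched with plain SQLite. The table below is a simplified assumption (the `rating` column and the `NOT NULL` markers are illustrative); the real `FeedbackRow` is a SQLAlchemy model that this log does not show in full.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE feedback (
        thread_id TEXT NOT NULL,
        run_id    TEXT NOT NULL,
        user_id   TEXT NOT NULL,
        rating    INTEGER NOT NULL,
        UNIQUE (thread_id, run_id, user_id)
    )
    """
)


def upsert(thread_id: str, run_id: str, user_id: str, rating: int) -> None:
    # The UNIQUE constraint is the conflict target that turns a second insert into an update.
    conn.execute(
        """
        INSERT INTO feedback (thread_id, run_id, user_id, rating)
        VALUES (?, ?, ?, ?)
        ON CONFLICT (thread_id, run_id, user_id)
        DO UPDATE SET rating = excluded.rating
        """,
        (thread_id, run_id, user_id, rating),
    )


upsert("t1", "r1", "alice", 1)
upsert("t1", "r1", "alice", -1)  # same user and run: replaces the earlier rating
upsert("t1", "r1", "bob", 1)     # different user: a separate row
rows = conn.execute(
    "SELECT user_id, rating FROM feedback ORDER BY user_id"
).fetchall()
```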

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:19 +08:00
rayhpeng b2ec1f99b9 feat(persistence): unify ThreadMetaStore interface with user isolation and factory
Add user_id parameter to all ThreadMetaStore abstract methods. Implement
owner isolation in MemoryThreadMetaStore with _get_owned_record helper.
Add check_access to base class and memory implementation. Add
make_thread_store factory to simplify deps.py initialization. Add
memory-backend isolation tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:19 +08:00
rayhpeng 8da1903168 refactor(persistence): rename owner_id to user_id and thread_meta_repo to thread_store
Rename owner_id to user_id across all persistence models, repositories,
stores, routers, and tests for clearer semantics. Rename thread_meta_repo
to thread_store for consistency with run_store/run_event_store naming.
Add ThreadMetaStore return type annotation to get_thread_store().

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:38:17 +08:00
rayhpeng 03952eca53 refactor(persistence): unify SQLite to single deerflow.db and move checkpointer to runtime
Merge checkpoints.db and app.db into a single deerflow.db file (WAL mode
handles concurrent access safely). Move checkpointer module from
agents/checkpointer to runtime/checkpointer to better reflect its role
as a runtime infrastructure concern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 23:37:25 +08:00
greatmengqi 9197000690 feat(auth): release-validation pass for 2.0-rc — 12 blockers + simplify follow-ups (#2008)
* feat(auth): introduce backend auth module

Port RFC-001 authentication core from PR #1728:
- JWT token handling (create_access_token, decode_token, TokenPayload)
- Password hashing (bcrypt) with verify_password
- SQLite UserRepository with base interface
- Provider Factory pattern (LocalAuthProvider)
- CLI reset_admin tool
- Auth-specific errors (AuthErrorCode, TokenError, AuthErrorResponse)

Deps:
- bcrypt>=4.0.0
- pyjwt>=2.9.0
- email-validator>=2.0.0
- backend/uv.toml pins public PyPI index

Tests: 12 pure unit tests (test_auth_config.py, test_auth_errors.py).

Scope note: authz.py, test_auth.py, and test_auth_type_system.py are
deferred to commit 2 because they depend on middleware and deps wiring
that is not yet in place. Commit 1 stays "pure new files only" as the
spec mandates.

* feat(auth): wire auth end-to-end (middleware + frontend replacement)

Backend:
- Port auth_middleware, csrf_middleware, langgraph_auth, routers/auth
- Port authz decorator (owner_filter_key defaults to 'owner_id')
- Merge app.py: register AuthMiddleware + CSRFMiddleware + CORS, add
  _ensure_admin_user lifespan hook, _migrate_orphaned_threads helper,
  register auth router
- Merge deps.py: add get_local_provider, get_current_user_from_request,
  get_optional_user_from_request; keep get_current_user as thin str|None
  adapter for feedback router
- langgraph.json: add auth path pointing to langgraph_auth.py:auth
- Rename metadata['user_id'] -> metadata['owner_id'] in langgraph_auth
  (both metadata write and LangGraph filter dict) + test fixtures

Frontend:
- Delete better-auth library and api catch-all route
- Remove better-auth npm dependency and env vars (BETTER_AUTH_SECRET,
  BETTER_AUTH_GITHUB_*) from env.js
- Port frontend/src/core/auth/* (AuthProvider, gateway-config,
  proxy-policy, server-side getServerSideUser, types)
- Port frontend/src/core/api/fetcher.ts
- Port (auth)/layout, (auth)/login, (auth)/setup pages
- Rewrite workspace/layout.tsx as server component that calls
  getServerSideUser and wraps in AuthProvider
- Port workspace/workspace-content.tsx for the client-side sidebar logic

Tests:
- Port 5 auth test files (test_auth, test_auth_middleware,
  test_auth_type_system, test_ensure_admin, test_langgraph_auth)
- 176 auth tests PASS

After this commit: the login/logout/registration flow works, but the
persistence layer does not yet filter by owner_id. Commit 4 closes that gap.

* feat(auth): account settings page + i18n

- Port account-settings-page.tsx (change password, change email, logout)
- Wire into settings-dialog.tsx as new "account" section with UserIcon,
  rendered first in the section list
- Add i18n keys:
  - en-US/zh-CN: settings.sections.account ("Account" / "账号")
  - en-US/zh-CN: button.logout ("Log out" / "退出登录")
  - types.ts: matching type declarations

* feat(auth): enforce owner_id across 2.0-rc persistence layer

Add request-scoped contextvar-based owner filtering to threads_meta,
runs, run_events, and feedback repositories. Router code is unchanged
— isolation is enforced at the storage layer so that any caller that
forgets to pass owner_id still gets filtered results, and new routes
cannot accidentally leak data.

Core infrastructure
-------------------
- deerflow/runtime/user_context.py (new):
  - ContextVar[CurrentUser | None] with default None
  - runtime_checkable CurrentUser Protocol (structural subtype with .id)
  - set/reset/get/require helpers
  - AUTO sentinel + resolve_owner_id(value, method_name) for sentinel
    three-state resolution: AUTO reads contextvar, explicit str
    overrides, explicit None bypasses the filter (for migration/CLI)
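
The three-state resolution can be sketched as below. This is a simplification under stated assumptions: the contextvar here holds a plain id string instead of a `CurrentUser` object, and the error message is invented; only the `AUTO` sentinel, the `resolve_owner_id(value, method_name)` signature, and the `RuntimeError` on a missing user come from this log.

```python
from contextvars import ContextVar


class _AutoSentinel:
    def __repr__(self) -> str:
        return "AUTO"


AUTO = _AutoSentinel()

# Simplified: the real contextvar is described as holding a CurrentUser object.
_current_user_id: ContextVar = ContextVar("current_user_id", default=None)


def resolve_owner_id(value, method_name: str):
    if value is AUTO:
        user_id = _current_user_id.get()
        if user_id is None:
            raise RuntimeError(f"{method_name}: no user in context")
        return user_id
    # An explicit str overrides the context; an explicit None bypasses the filter.
    return value


token = _current_user_id.set("alice")
from_context = resolve_owner_id(AUTO, "list_messages")
explicit = resolve_owner_id("bob", "list_messages")
bypass = resolve_owner_id(None, "list_messages")
_current_user_id.reset(token)

try:
    resolve_owner_id(AUTO, "list_messages")
    raised = False
except RuntimeError:
    raised = True
```

A sentinel is needed because `None` already carries a meaning (skip the filter), so it cannot also serve as "not provided".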

Repository changes
------------------
- ThreadMetaRepository: create/get/search/update_*/delete gain
  owner_id=AUTO kwarg; read paths filter by owner, writes stamp it,
  mutations check ownership before applying
- RunRepository: put/get/list_by_thread/delete gain owner_id=AUTO kwarg
- FeedbackRepository: create/get/list_by_run/list_by_thread/delete
  gain owner_id=AUTO kwarg
- DbRunEventStore: list_messages/list_events/list_messages_by_run/
  count_messages/delete_by_thread/delete_by_run gain owner_id=AUTO
  kwarg. Write paths (put/put_batch) read contextvar softly: when a
  request-scoped user is available, owner_id is stamped; background
  worker writes without a user context pass None which is valid
  (orphan row to be bound by migration)

Schema
------
- persistence/models/run_event.py: RunEventRow.owner_id: Mapped[str | None]
  = mapped_column(String(64), nullable=True, index=True)
- No alembic migration needed: 2.0 ships fresh, Base.metadata.create_all
  picks up the new column automatically

Middleware
----------
- auth_middleware.py: after cookie check, call get_optional_user_from_
  request to load the real User, stamp it into request.state.user AND
  the contextvar via set_current_user, reset in a try/finally. Public
  paths and unauthenticated requests continue without contextvar, and
  @require_auth handles the strict 401 path

Test infrastructure
-------------------
- tests/conftest.py: @pytest.fixture(autouse=True) _auto_user_context
  sets a default SimpleNamespace(id="test-user-autouse") on every test
  unless marked @pytest.mark.no_auto_user. Keeps existing 20+
  persistence tests passing without modification
- pyproject.toml [tool.pytest.ini_options]: register no_auto_user
  marker so pytest does not emit warnings for opt-out tests
- tests/test_user_context.py: 6 tests covering three-state semantics,
  Protocol duck typing, and require/optional APIs
- tests/test_thread_meta_repo.py: one test updated to pass owner_id=
  None explicitly where it was previously relying on the old default

Test results
------------
- test_user_context.py: 6 passed
- test_auth*.py + test_langgraph_auth.py + test_ensure_admin.py: 127 passed
- test_run_event_store / test_run_repository / test_thread_meta_repo
  / test_feedback: 92 passed
- Full backend suite: 1905 passed, 2 failed (both @requires_llm flaky
  integration tests unrelated to auth), 1 skipped

* feat(auth): extend orphan migration to 2.0-rc persistence tables

_ensure_admin_user now runs a three-step pipeline on every boot:

  Step 1 (fatal):     admin user exists / is created / password is reset
  Step 2 (non-fatal): LangGraph store orphan threads → admin
  Step 3 (non-fatal): SQL persistence tables → admin
    - threads_meta
    - runs
    - run_events
    - feedback

Each step is idempotent. The fatal/non-fatal split mirrors PR #1728's
original philosophy: admin creation failure blocks startup (the system
is unusable without an admin), whereas migration failures log a warning
and let the service proceed (a partial migration is recoverable; a
missing admin is not).

Key helpers
-----------
- _iter_store_items(store, namespace, *, page_size=500):
  async generator that cursor-paginates across LangGraph store pages.
  Fixes PR #1728's hardcoded limit=1000 bug that would silently lose
  orphans beyond the first page.

- _migrate_orphaned_threads(store, admin_user_id):
  Rewritten to use _iter_store_items. Returns the migrated count so the
  caller can log it; raises only on unhandled exceptions.

- _migrate_orphan_sql_tables(admin_user_id):
  Imports the 4 ORM models lazily, grabs the shared session factory,
  runs one UPDATE per table in a single transaction, commits once.
  No-op when no persistence backend is configured (in-memory dev).
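
A paging generator in the spirit of `_iter_store_items` might look like the sketch below. `FakeStore` is a hypothetical stand-in, and the sketch pages by offset, which is an assumption; the commit message only says the helper paginates across store pages instead of relying on a single fixed limit.

```python
import asyncio


class FakeStore:
    """Hypothetical stand-in; only an offset/limit search is modelled."""

    def __init__(self, items: list) -> None:
        self._items = items

    async def asearch(self, namespace, *, limit: int, offset: int) -> list:
        return self._items[offset:offset + limit]


async def iter_store_items(store, namespace, *, page_size: int = 500):
    offset = 0
    while True:
        page = await store.asearch(namespace, limit=page_size, offset=offset)
        for item in page:
            yield item
        if len(page) < page_size:
            return  # a short page means the store is exhausted
        offset += len(page)


async def collect() -> list:
    store = FakeStore(list(range(1234)))
    return [
        item
        async for item in iter_store_items(store, ("threads",), page_size=500)
    ]


migrated = asyncio.run(collect())
```

With 1234 items and a page size of 500, the generator issues three searches and yields every item, which a single capped query would not.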

Tests: test_ensure_admin.py (8 passed)

* test(auth): port AUTH test plan docs + lint/format pass

- Port backend/docs/AUTH_TEST_PLAN.md and AUTH_UPGRADE.md from PR #1728
- Rename metadata.user_id → metadata.owner_id in AUTH_TEST_PLAN.md
  (4 occurrences from the original PR doc)
- ruff auto-fix UP037 in sentinel type annotations: drop quotes around
  "str | None | _AutoSentinel" now that from __future__ import
  annotations makes them implicit string forms
- ruff format: 2 files (app/gateway/app.py, runtime/user_context.py)

Note on test coverage additions:
- conftest.py autouse fixture was already added in commit 4 (had to
  be co-located with the repository changes to keep pre-existing
  persistence tests passing)
- cross-user isolation E2E tests (test_owner_isolation.py) deferred
  — enforcement is already proven by the 98-test repository suite
  via the autouse fixture + explicit _AUTO sentinel exercises
- New test cases (TC-API-17..20, TC-ATK-13, TC-MIG-01..07) listed
  in AUTH_TEST_PLAN.md are deferred to a follow-up PR — they are
  manual-QA test cases rather than pytest code, and the spec-level
  coverage is already met by test_user_context.py + the 98-test
  repository suite.

Final test results:
- Auth suite (test_auth*, test_langgraph_auth, test_ensure_admin,
  test_user_context): 186 passed
- Persistence suite (test_run_event_store, test_run_repository,
  test_thread_meta_repo, test_feedback): 98 passed
- Lint: ruff check + ruff format both clean

* test(auth): add cross-user isolation test suite

10 tests exercising the storage-layer owner filter by manually
switching the user_context contextvar between two users. Verifies
the safety invariant:

  After a repository write with owner_id=A, a subsequent read with
  owner_id=B must not return the row, and vice versa.

Covers all 4 tables that own user-scoped data:

TC-API-17  threads_meta  — read, search, update, delete cross-user
TC-API-18  runs          — get, list_by_thread, delete cross-user
TC-API-19  run_events    — list_messages, list_events, count_messages,
                           delete_by_thread (CRITICAL: raw conversation
                           content leak vector)
TC-API-20  feedback      — get, list_by_run, delete cross-user

Plus two meta-tests verifying the sentinel pattern itself:
- AUTO + unset contextvar raises RuntimeError
- explicit owner_id=None bypasses the filter (migration escape hatch)

Architecture note
-----------------
These tests bypass the HTTP layer by design. The full chain
(cookie → middleware → contextvar → repository) is covered piecewise:

- test_auth_middleware.py: middleware sets contextvar from cookies
- test_owner_isolation.py: repositories enforce isolation when
  contextvar is set to different users

Together they prove the end-to-end safety property without the
ceremony of spinning up a full TestClient + in-memory DB for every
router endpoint.

Tests pass: 231 (full auth + persistence + isolation suite)
Lint: clean

* refactor(auth): migrate user repository to SQLAlchemy ORM

Move the users table into the shared persistence engine so auth
matches the pattern of threads_meta, runs, run_events, and feedback —
one engine, one session factory, one schema init codepath.

New files
---------
- persistence/user/__init__.py, persistence/user/model.py: UserRow
  ORM class with partial unique index on (oauth_provider, oauth_id)
- Registered in persistence/models/__init__.py so
  Base.metadata.create_all() picks it up

Modified
--------
- auth/repositories/sqlite.py: rewritten as async SQLAlchemy,
  identical constructor pattern to the other four repositories
  (def __init__(self, session_factory) + self._sf = session_factory)
- auth/config.py: drop users_db_path field — storage is configured
  through config.database like every other table
- deps.py/get_local_provider: construct SQLiteUserRepository with
  the shared session factory, fail fast if engine is not initialised
- tests/test_auth.py: rewrite test_sqlite_round_trip_new_fields to
  use the shared engine (init_engine + close_engine in a tempdir)
- tests/test_auth_type_system.py: add per-test autouse fixture that
  spins up a scratch engine and resets deps._cached_* singletons

* refactor(auth): remove SQL orphan migration (unused in supported scenarios)

The _migrate_orphan_sql_tables helper existed to bind NULL owner_id
rows in threads_meta, runs, run_events, and feedback to the admin on
first boot. But in every supported upgrade path, it's a no-op:

  1. Fresh install: create_all builds fresh tables, no legacy rows
  2. No-auth → with-auth (no existing persistence DB): persistence
     tables are created fresh by create_all, no legacy rows
  3. No-auth → with-auth (has existing persistence DB from #1930):
     NOT a supported upgrade path; "existing DB to existing DB" schema
     evolution is out of scope, so users wipe the DB or run a manual ALTER

So the SQL orphan migration never has anything to do in the
supported matrix. Delete the function, simplify _ensure_admin_user
from a 3-step pipeline to a 2-step one (admin creation + LangGraph
store orphan migration only).

LangGraph store orphan migration stays: it serves the real
"no-auth → with-auth" upgrade path where a user's existing LangGraph
thread metadata has no owner_id field and needs to be stamped with
the newly-created admin's id.

Tests: 284 passed (auth + persistence + isolation)
Lint: clean

* security(auth): write initial admin password to 0600 file instead of logs

CodeQL py/clear-text-logging-sensitive-data flagged 3 call sites that
logged the auto-generated admin password to stdout via logger.info().
Production log aggregators (ELK/Splunk/etc) would have captured those
cleartext secrets. Replace with a shared helper that writes to
.deer-flow/admin_initial_credentials.txt with mode 0600, and log only
the path.

New file
--------
- app/gateway/auth/credential_file.py: write_initial_credentials()
  helper. Takes email, password, and an "initial"/"reset" label.
  Creates .deer-flow/ if missing, writes a header comment plus the
  email+password, chmods 0o600, returns the absolute Path.
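
A minimal sketch of such a helper, assuming a POSIX filesystem. The function name and file name follow the description above; the `base_dir` parameter, the header wording, and creating the file with `os.open(..., 0o600)` before the `chmod` are assumptions made for illustration.

```python
import os
from pathlib import Path
from typing import Union


def write_initial_credentials(
    email: str,
    password: str,
    label: str = "initial",
    base_dir: Union[str, Path] = ".deer-flow",  # assumed default location
) -> Path:
    """Write the credentials to an owner-only file and return its absolute path."""
    directory = Path(base_dir)
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / "admin_initial_credentials.txt"

    # Create the file with 0600 up front so it is never briefly world-readable.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(f"# DeerFlow admin credentials ({label})\n")
        handle.write(f"email: {email}\n")
        handle.write(f"password: {password}\n")

    # Also tighten permissions if the file already existed with a looser mode.
    os.chmod(path, 0o600)
    return path.resolve()
```

A caller would then log only the returned path, never the password itself.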

Modified
--------
- app/gateway/app.py: both _ensure_admin_user paths (fresh creation
  + needs_setup password reset) now write to file and log the path
- app/gateway/auth/reset_admin.py: rewritten to use the shared ORM
  repo (SQLiteUserRepository with session_factory) and the
  credential_file helper. The previous implementation was broken
  after the earlier ORM refactor — it still imported _get_users_conn
  and constructed SQLiteUserRepository() without a session factory.

No tests changed — the three password-log sites are all exercised
via existing test_ensure_admin.py which checks that startup
succeeds, not that a specific string appears in logs.

CodeQL alerts 272, 283, 284: all resolved.

* security(auth): strict JWT validation in middleware (fix junk cookie bypass)

AUTH_TEST_PLAN test 7.5.8 expects junk cookies to be rejected with
401. The previous middleware behaviour was "presence-only": check
that some access_token cookie exists, then pass through. In
combination with my Task-12 decision to skip @require_auth
decorators on routes, this created a gap where a request with any
cookie-shaped string (e.g. access_token=not-a-jwt) would bypass
authentication on routes that do not touch the repository
(/api/models, /api/mcp/config, /api/memory, /api/skills, …).

Fix: middleware now calls get_current_user_from_request() strictly
and catches the resulting HTTPException to render a 401 with the
proper fine-grained error code (token_invalid, token_expired,
user_not_found, …). On success it stamps request.state.user and
the contextvar so repository-layer owner filters work downstream.

The 4 old "_with_cookie_passes" tests in test_auth_middleware.py
were written for the presence-only behaviour; they asserted that
a junk cookie would make the handler return 200. They are renamed
to "_with_junk_cookie_rejected" and their assertions flipped to
401. The negative path (no cookie → 401 not_authenticated)
is unchanged.

Verified:
  no cookie       → 401 not_authenticated
  junk cookie     → 401 token_invalid     (the fixed bug)
  expired cookie  → 401 token_expired

Tests: 284 passed (auth + persistence + isolation)
Lint: clean

* security(auth): wire @require_permission(owner_check=True) on isolation routes

Apply the require_permission decorator to all 28 routes that take a
{thread_id} path parameter. Combined with the strict middleware
(previous commit), this gives the double-layer protection that
AUTH_TEST_PLAN test 7.5.9 documents:

  Layer 1 (AuthMiddleware): cookie + JWT validation, rejects junk
                            cookies and stamps request.state.user
  Layer 2 (@require_permission with owner_check=True): per-resource
                            ownership verification via
                            ThreadMetaStore.check_access — returns
                            404 if a different user owns the thread

The decorator's owner_check branch is rewritten to use the SQL
thread_meta_repo (the 2.0-rc persistence layer) instead of the
LangGraph store path that PR #1728 used (_store_get / get_store
in routers/threads.py). The inject_record convenience is dropped
— no caller in 2.0 needs the LangGraph blob, and the SQL repo has
a different shape.

Routes decorated (28 total):
- threads.py: delete, patch, get, get-state, post-state, post-history
- thread_runs.py: post-runs, post-runs-stream, post-runs-wait,
  list_runs, get_run, cancel_run, join_run, stream_existing_run,
  list_thread_messages, list_run_messages, list_run_events,
  thread_token_usage
- feedback.py: create, list, stats, delete
- uploads.py: upload (added Request param), list, delete
- artifacts.py: get_artifact
- suggestions.py: generate (renamed body parameter to avoid
  conflict with FastAPI Request)

Test fixes:
- test_suggestions_router.py: bypass the decorator via __wrapped__
  (the unit tests cover parsing logic, not auth — no point spinning
  up a thread_meta_repo just to test JSON unwrapping)
- test_auth_middleware.py 4 fake-cookie tests: already updated in
  the previous commit (745bf432)

Tests: 293 passed (auth + persistence + isolation + suggestions)
Lint: clean

* security(auth): defense-in-depth fixes from release validation pass

Eight findings caught while running the AUTH_TEST_PLAN end-to-end against
the deployed sg_dev stack. Each is a pre-condition for shipping
release/2.0-rc that the previous PRs missed.

Backend hardening
- routers/auth.py: rate limiter X-Real-IP now requires AUTH_TRUSTED_PROXIES
  whitelist (CIDR/IP allowlist). Without nginx in front, the previous code
  honored arbitrary X-Real-IP, letting an attacker rotate the header to
  fully bypass the per-IP login lockout.
- routers/auth.py: 36-entry common-password blocklist via Pydantic
  field_validator on RegisterRequest + ChangePasswordRequest. The shared
  _validate_strong_password helper keeps the constraint in one place.
- routers/threads.py: ThreadCreateRequest + ThreadPatchRequest strip
  server-reserved metadata keys (owner_id, user_id) via Pydantic
  field_validator so a forged value can never round-trip back to other
  clients reading the same thread. The actual ownership invariant stays
  on the threads_meta row; this closes the metadata-blob echo gap.
- authz.py + thread_meta/sql.py: require_permission gains a require_existing
  flag plumbed through check_access(require_existing=True). Destructive
  routes (DELETE/PATCH/state-update/runs/feedback) now treat a missing
  thread_meta row as 404 instead of "untracked legacy thread, allow",
  closing the cross-user delete-idempotence gap where any user could
  successfully DELETE another user's deleted thread.
- repositories/sqlite.py + base.py: update_user raises UserNotFoundError
  on a vanished row instead of silently returning the input. Concurrent
  delete during password reset can no longer look like a successful update.
- runtime/user_context.py: resolve_owner_id() coerces User.id (UUID) to
  str at the contextvar boundary so SQLAlchemy String(64) columns can
  bind it. The whole 2.0-rc isolation pipeline was previously broken
  end-to-end (POST /api/threads → 500 "type 'UUID' is not supported").
- persistence/engine.py: SQLAlchemy listener enables PRAGMA journal_mode=WAL,
  synchronous=NORMAL, foreign_keys=ON on every new SQLite connection.
  TC-UPG-06 in the test plan expects WAL; previous code shipped with the
  default 'delete' journal.
- auth_middleware.py: stamp request.state.auth = AuthContext(...) so
  @require_permission's short-circuit fires; previously every isolation
  request did a duplicate JWT decode + users SELECT. Also unifies the
  401 payload through AuthErrorResponse(...).model_dump().
- app.py: _ensure_admin_user restructure removes the noqa F821 scoping
  bug where 'password' was referenced outside the branch that defined it.
  New _announce_credentials helper absorbs the duplicate log block in
  the fresh-admin and reset-admin branches.
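
The trusted-proxy rule for X-Real-IP in the first bullet can be illustrated with the standard `ipaddress` module. This is a sketch under assumptions: the function names, the comma-separated allowlist format, and the choice to skip invalid entries are hypothetical, not taken from routers/auth.py.

```python
import ipaddress


def parse_trusted_proxies(raw: str) -> list:
    """Parse a comma-separated CIDR/IP allowlist, skipping invalid entries."""
    networks = []
    for item in raw.split(","):
        item = item.strip()
        if not item:
            continue
        try:
            # strict=False lets a bare host such as 10.0.0.1 act as a /32.
            networks.append(ipaddress.ip_network(item, strict=False))
        except ValueError:
            continue  # assumption: a bad entry is ignored rather than fatal
    return networks


def get_client_ip(peer_ip, x_real_ip, trusted):
    """Honor X-Real-IP only when the direct peer is an allowlisted proxy."""
    if peer_ip is None:
        return "unknown"
    try:
        peer = ipaddress.ip_address(peer_ip)
    except ValueError:
        return peer_ip
    if x_real_ip and any(peer in network for network in trusted):
        return x_real_ip.strip()
    # Untrusted peer: the header is attacker-controlled, so ignore it.
    return peer_ip
```

With an empty allowlist the header is never consulted, so rotating X-Real-IP no longer changes the key used for the per-IP lockout.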

* fix(frontend+nginx): roll out CSRF on every state-changing client path

The frontend was 100% broken in gateway-pro mode for any user trying to
open a specific chat thread. Three cumulative bugs each silently
masked the next.

LangGraph SDK CSRF gap (api-client.ts)
- The Client constructor took only apiUrl, no defaultHeaders, no fetch
  interceptor. The SDK's internal fetch never sent X-CSRF-Token, so
  every state-changing /api/langgraph-compat/* call (runs/stream,
  threads/search, threads/{tid}/history, ...) hit CSRFMiddleware and
  got 403 before reaching the auth check. UI symptom: empty thread page
  with no error message; the SPA's hooks swallowed the rejection.
- Fix: pass an onRequest hook that injects X-CSRF-Token from the
  csrf_token cookie per request. Reading the cookie per call (not at
  construction time) handles login / logout / password-change cookie
  rotation transparently. The SDK's prepareFetchOptions calls
  onRequest for both regular requests AND streaming/SSE/reconnect, so
  the same hook covers runs.stream and runs.joinStream.

Raw fetch CSRF gap (7 files)
- Audit: 11 frontend fetch sites, only 2 included CSRF (login/setup +
  account-settings change-password). The other 7 routed through raw
  fetch() with no header — suggestions, memory, agents, mcp, skills,
  uploads, and the local thread cleanup hook all 403'd silently.
- Fix: enhance fetcher.ts:fetchWithAuth to auto-inject X-CSRF-Token on
  POST/PUT/DELETE/PATCH from a single shared readCsrfCookie() helper.
  Convert all 7 raw fetch() callers to fetchWithAuth so the contract
  is centrally enforced. api-client.ts and fetcher.ts share
  readCsrfCookie + STATE_CHANGING_METHODS to avoid drift.

nginx routing + buffering (nginx.local.conf)
- The auth feature shipped without updating the nginx config: per-API
  explicit location blocks but no /api/v1/auth/, /api/feedback, /api/runs.
  The frontend's client-side fetches to /api/v1/auth/login/local 404'd
  from the Next.js side because nginx routed /api/* to the frontend.
- Fix: add catch-all `location /api/` that proxies to the gateway.
  nginx longest-prefix matching keeps the explicit blocks (/api/models,
  /api/threads regex, /api/langgraph/, ...) winning for their paths.
- Fix: disable proxy_buffering + proxy_request_buffering for the
  frontend `location /` block. Without it, nginx tries to spool large
  Next.js chunks into /var/lib/nginx/proxy (root-owned) and fails with
  Permission denied → ERR_INCOMPLETE_CHUNKED_ENCODING → ChunkLoadError.

* test(auth): release-validation test infra and new coverage

Test fixtures and unit tests added during the validation pass.

Router test helpers (NEW: tests/_router_auth_helpers.py)
- make_authed_test_app(): builds a FastAPI test app with a stub
  middleware that stamps request.state.user + request.state.auth and a
  permissive thread_meta_repo mock. TestClient-based router tests
  (test_artifacts_router, test_threads_router) use it instead of bare
  FastAPI() so the new @require_permission(owner_check=True) decorators
  short-circuit cleanly.
- call_unwrapped(): walks the __wrapped__ chain to invoke the underlying
  handler without going through the authz wrappers. Direct-call tests
  (test_uploads_router) use it. Typed with ParamSpec so the wrapped
  signature flows through.

Backend test additions
- test_auth.py: 7 tests for the new _get_client_ip trust model (no
  proxy / trusted proxy / untrusted peer / XFF rejection / invalid
  CIDR / no client). 5 tests for the password blocklist (literal,
  case-insensitive, strong password accepted, change-password binding,
  short-password length-check still fires before blocklist).
  test_update_user_raises_when_row_concurrently_deleted: closes a
  shipped-without-coverage gap on the new UserNotFoundError contract.
- test_thread_meta_repo.py: 4 tests for check_access(require_existing=True)
  — strict missing-row denial, strict owner match, strict owner mismatch,
  strict null-owner still allowed (shared rows survive the tightening).
- test_ensure_admin.py: 3 tests for _migrate_orphaned_threads /
  _iter_store_items pagination, covering the TC-UPG-02 upgrade story
  end-to-end via mock store. Closes the gap where the cursor pagination
  was untested even though the previous PR rewrote it.
- test_threads_router.py: 5 tests for _strip_reserved_metadata
  (owner_id removal, user_id removal, safe-keys passthrough, empty
  input, both-stripped).
- test_auth_type_system.py: replace "password123" fixtures with
  Tr0ub4dor3a / AnotherStr0ngPwd! so the new password blocklist
  doesn't reject the test data.
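
The four `check_access(require_existing=True)` cases listed for test_thread_meta_repo.py correspond to a decision rule along these lines. The sketch models a row as an optional dict; the real method queries the threads_meta table, so the signature here is an assumption.

```python
from typing import Optional


def check_access(row: Optional[dict], user_id: str, *, require_existing: bool = False) -> bool:
    """Decide whether user_id may touch the thread described by row."""
    if row is None:
        # Lenient mode treats a missing row as an untracked legacy thread.
        # Strict mode (destructive routes) denies, which the router maps to 404.
        return not require_existing
    owner = row.get("owner_id")
    if owner is None:
        return True  # shared rows with no owner stay accessible in both modes
    return owner == user_id
```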

* docs(auth): refresh TC-DOCKER-05 + document Docker validation gap

- AUTH_TEST_PLAN.md TC-DOCKER-05: the previous expectation
  ("admin password visible in docker logs") was stale after the simplify
  pass that moved credentials to a 0600 file. The grep "Password:" check
  would have silently failed and given a false sense of coverage. New
  expectation matches the actual file-based path: 0600 file in
  DEER_FLOW_HOME, log shows the path (not the secret), reverse-grep
  asserts no leaked password in container logs.
- NEW: docs/AUTH_TEST_DOCKER_GAP.md documents the only un-executed
  block in the test plan (TC-DOCKER-01..06). Reason: sg_dev validation
  host has no Docker daemon installed. The doc maps each Docker case
  to an already-validated bare-metal equivalent (TC-1.1, TC-REENT-01,
  TC-API-02 etc.) so the gap is auditable, and includes pre-flight
  reproduction steps for whoever has Docker available.

---------

Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com>
2026-04-11 23:37:23 +08:00
rayhpeng 36fb1c7804 feat(persistence): add unified persistence layer with event store, token tracking, and feedback (#1930)
* feat(persistence): add SQLAlchemy 2.0 async ORM scaffold

Introduce a unified database configuration (DatabaseConfig) that
controls both the LangGraph checkpointer and the DeerFlow application
persistence layer from a single `database:` config section.

New modules:
- deerflow.config.database_config — Pydantic config with memory/sqlite/postgres backends
- deerflow.persistence — async engine lifecycle, DeclarativeBase with to_dict mixin, Alembic skeleton
- deerflow.runtime.runs.store — RunStore ABC + MemoryRunStore implementation

Gateway integration initializes/tears down the persistence engine in
the existing langgraph_runtime() context manager. Legacy checkpointer
config is preserved for backward compatibility.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(persistence): add RunEventStore ABC + MemoryRunEventStore

Phase 2-A prerequisite for event storage: adds the unified run event
stream interface (RunEventStore) with an in-memory implementation,
RunEventsConfig, gateway integration, and comprehensive tests (27 cases).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(persistence): add ORM models, repositories, DB/JSONL event stores, RunJournal, and API endpoints

Phase 2-B: run persistence + event storage + token tracking.

- ORM models: RunRow (with token fields), ThreadMetaRow, RunEventRow
- RunRepository implements RunStore ABC via SQLAlchemy ORM
- ThreadMetaRepository with owner access control
- DbRunEventStore with trace content truncation and cursor pagination
- JsonlRunEventStore with per-run files and seq recovery from disk
- RunJournal (BaseCallbackHandler) captures LLM/tool/lifecycle events,
  accumulates token usage by caller type, buffers and flushes to store
- RunManager now accepts optional RunStore for persistent backing
- Worker creates RunJournal, writes human_message, injects callbacks
- Gateway deps use factory functions (RunRepository when DB available)
- New endpoints: messages, run messages, run events, token-usage
- ThreadCreateRequest gains assistant_id field
- 92 tests pass (33 new), zero regressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(persistence): add user feedback + follow-up run association

Phase 2-C: feedback and follow-up tracking.

- FeedbackRow ORM model (rating +1/-1, optional message_id, comment)
- FeedbackRepository with CRUD, list_by_run/thread, aggregate stats
- Feedback API endpoints: create, list, stats, delete
- follow_up_to_run_id in RunCreateRequest (explicit or auto-detected
  from latest successful run on the thread)
- Worker writes follow_up_to_run_id into human_message event metadata
- Gateway deps: feedback_repo factory + getter
- 17 new tests (14 FeedbackRepository + 3 follow-up association)
- 109 total tests pass, zero regressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test+config: comprehensive Phase 2 test coverage + deprecate checkpointer config

- config.example.yaml: deprecate standalone checkpointer section, activate
  unified database:sqlite as default (drives both checkpointer + app data)
- New: test_thread_meta_repo.py (14 tests) — full ThreadMetaRepository coverage
  including check_access owner logic, list_by_owner pagination
- Extended test_run_repository.py (+4 tests) — completion preserves fields,
  list ordering desc, limit, owner_none returns all
- Extended test_run_journal.py (+8 tests) — on_chain_error, track_tokens=false,
  middleware no ai_message, unknown caller tokens, convenience fields,
  tool_error, non-summarization custom event
- Extended test_run_event_store.py (+7 tests) — DB batch seq continuity,
  make_run_event_store factory (memory/db/jsonl/fallback/unknown)
- Extended test_phase2b_integration.py (+4 tests) — create_or_reject persists,
  follow-up metadata, summarization in history, full DB-backed lifecycle
- Fixed DB integration test to use proper fake objects (not MagicMock)
  for JSON-serializable metadata
- 157 total Phase 2 tests pass, zero regressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* config: move default sqlite_dir to .deer-flow/data

Keep SQLite databases alongside other DeerFlow-managed data
(threads, memory) under the .deer-flow/ directory instead of a
top-level ./data folder.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(persistence): remove UTFJSON, use engine-level json_serializer + datetime.now()

- Replace custom UTFJSON type with standard sqlalchemy.JSON in all ORM
  models. Add json_serializer=json.dumps(ensure_ascii=False) to all
  create_async_engine calls so non-ASCII text (Chinese etc.) is stored
  as-is in both SQLite and Postgres.
- Change ORM datetime defaults from datetime.now(UTC) to datetime.now(),
  remove UTC imports.
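
For illustration, the effect of `ensure_ascii=False` can be seen with the standard library alone. The `dumps_utf8` name is hypothetical; a callable of this shape (for example a `functools.partial`) is the kind of value an engine-level `json_serializer` argument expects.

```python
import functools
import json

# A callable that serializes without escaping non-ASCII characters.
dumps_utf8 = functools.partial(json.dumps, ensure_ascii=False)

payload = {"title": "你好"}

escaped = json.dumps(payload)   # default: non-ASCII becomes \uXXXX escapes
readable = dumps_utf8(payload)  # text is kept as-is
```

Both strings decode back to the same dict with `json.loads`; only the stored representation differs.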

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(gateway): simplify deps.py with getter factory + inline repos

- Replace 6 identical getter functions with _require() factory.
- Inline 3 _make_*_repo() factories into langgraph_runtime(), call
  get_session_factory() once instead of 3 times.
- Add thread_meta upsert in start_run (services.py).
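
The getter-factory idea can be sketched like this. It is an assumption-laden illustration: the real `_require()` presumably reads from the FastAPI app state and raises an HTTP error, whereas this version takes any object with attributes and raises `RuntimeError`.

```python
from types import SimpleNamespace


def _require(attr: str, label: str):
    """Build a getter that returns state.<attr> or fails if it is not set."""

    def getter(state):
        value = getattr(state, attr, None)
        if value is None:
            raise RuntimeError(f"{label} is not available")
        return value

    return getter


# One line per dependency instead of one hand-written function each.
get_run_store = _require("run_store", "Run store")
get_feedback_repo = _require("feedback_repo", "Feedback repository")

state = SimpleNamespace(run_store=object(), feedback_repo=None)
```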

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(docker): add UV_EXTRAS build arg for optional dependencies

Support installing optional dependency groups (e.g. postgres) at
Docker build time via UV_EXTRAS build arg:
  UV_EXTRAS=postgres docker compose build

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(journal): fix flush, token tracking, and consolidate tests

RunJournal fixes:
- _flush_sync: retain events in buffer when no event loop instead of
  dropping them; worker's finally block flushes via async flush().
- on_llm_end: add tool_calls filter and caller=="lead_agent" guard for
  ai_message events; mark message IDs for dedup with record_llm_usage.
- worker.py: persist completion data (tokens, message count) to RunStore
  in finally block.

Model factory:
- Auto-inject stream_usage=True for BaseChatOpenAI subclasses with
  custom api_base, so usage_metadata is populated in streaming responses.

Test consolidation:
- Delete test_phase2b_integration.py (redundant with existing tests).
- Move DB-backed lifecycle test into test_run_journal.py.
- Add tests for stream_usage injection in test_model_factory.py.
- Clean up executor/task_tool dead journal references.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): widen content type to str|dict in all store backends

Allow event content to be a dict (for structured OpenAI-format messages)
in addition to plain strings. Dict values are JSON-serialized for the DB
backend and deserialized on read; memory and JSONL backends handle dicts
natively. Trace truncation now serializes dicts to JSON before measuring.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(events): use metadata flag instead of heuristic for dict content detection

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(converters): add LangChain-to-OpenAI message format converters

Pure functions langchain_to_openai_message, langchain_to_openai_completion,
langchain_messages_to_openai, and _infer_finish_reason for converting
LangChain BaseMessage objects to OpenAI Chat Completions format, used by
RunJournal for event storage. 15 unit tests added.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(converters): handle empty list content as null, clean up test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): human_message content uses OpenAI user message format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(events): ai_message uses OpenAI format, add ai_tool_call message event

- ai_message content now uses {"role": "assistant", "content": "..."} format
- New ai_tool_call message event emitted when lead_agent LLM responds with tool_calls
- ai_tool_call uses langchain_to_openai_message converter for consistent format
- Both events include finish_reason in metadata ("stop" or "tool_calls")

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): add tool_result message event with OpenAI tool message format

Cache tool_call_id from on_tool_start keyed by run_id as fallback for on_tool_end,
then emit a tool_result message event (role=tool, tool_call_id, content) after each
successful tool completion.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(events): summary content uses OpenAI system message format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): replace llm_start/llm_end with llm_request/llm_response in OpenAI format

Add on_chat_model_start to capture structured prompt messages as llm_request events.
Replace llm_end trace events with llm_response using OpenAI Chat Completions format.
Track llm_call_index to pair request/response events.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): add record_middleware method for middleware trace events

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test(events): add full run sequence integration test for OpenAI content format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(events): align message events with checkpoint format and add middleware tag injection

- Message events (ai_message, ai_tool_call, tool_result, human_message) now use
  BaseMessage.model_dump() format, matching LangGraph checkpoint values.messages
- on_tool_end extracts tool_call_id/name/status from ToolMessage objects
- on_tool_error now emits tool_result message events with error status
- record_middleware uses middleware:{tag} event_type and middleware category
- Summarization custom events use middleware:summarize category
- TitleMiddleware injects middleware:title tag via get_config() inheritance
- SummarizationMiddleware model bound with middleware:summarize tag
- Worker writes human_message using HumanMessage.model_dump()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(threads): switch search endpoint to threads_meta table and sync title

- POST /api/threads/search now queries threads_meta table directly,
  removing the two-phase Store + Checkpointer scan approach
- Add ThreadMetaRepository.search() with metadata/status filters
- Add ThreadMetaRepository.update_display_name() for title sync
- Worker syncs checkpoint title to threads_meta.display_name on run completion
- Map display_name to values.title in search response for API compatibility

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(threads): history endpoint reads messages from event store

- POST /api/threads/{thread_id}/history now combines two data sources:
  checkpointer for checkpoint_id, metadata, title, thread_data;
  event store for messages (complete history, not truncated by summarization)
- Strip internal LangGraph metadata keys from response
- Remove full channel_values serialization in favor of selective fields

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: remove duplicate optional-dependencies header in pyproject.toml

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(middleware): pass tagged config to TitleMiddleware ainvoke call

Without the config, the middleware:title tag was not injected,
causing the LLM response to be recorded as a lead_agent ai_message
in run_events.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: resolve merge conflict in .env.example

Keep both DATABASE_URL (from persistence-scaffold) and WECOM
credentials (from main) after the merge.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(persistence): address review feedback on PR #1851

- Fix naive datetime.now() → datetime.now(UTC) in all ORM models
- Fix seq race condition in DbRunEventStore.put() with FOR UPDATE
  and UNIQUE(thread_id, seq) constraint
- Encapsulate _store access in RunManager.update_run_completion()
- Deduplicate _store.put() logic in RunManager via _persist_to_store()
- Add update_run_completion to RunStore ABC + MemoryRunStore
- Wire follow_up_to_run_id through the full create path
- Add error recovery to RunJournal._flush_sync() lost-event scenario
- Add migration note for search_threads breaking change
- Fix test_checkpointer_none_fix mock to set database=None

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore: update uv.lock

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(persistence): address 22 review comments from CodeQL, Copilot, and Code Quality

Bug fixes:
- Sanitize log params to prevent log injection (CodeQL)
- Reset threads_meta.status to idle/error when run completes
- Attach messages only to latest checkpoint in /history response
- Write threads_meta on POST /threads so new threads appear in search

Lint fixes:
- Remove unused imports (journal.py, migrations/env.py, test_converters.py)
- Convert lambda to named function (engine.py, Ruff E731)
- Remove unused logger definitions in repos (Ruff F841)
- Add logging to JSONL decode errors and empty except blocks
- Separate assert side-effects in tests (CodeQL)
- Remove unused local variables in tests (Ruff F841)
- Fix max_trace_content truncation to use byte length, not char length
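
The byte-versus-character distinction in the last item matters for multi-byte text. A minimal sketch of byte-length truncation, assuming UTF-8 storage; `truncate_utf8` is a hypothetical name, not the helper used in the event store.

```python
def truncate_utf8(text: str, max_bytes: int) -> str:
    """Truncate text so its UTF-8 encoding is at most max_bytes long."""
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    # errors="ignore" drops a multi-byte character that was cut in half.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")
```

A four-character Chinese string occupies 12 bytes in UTF-8, so a character-count limit would let it through a budget that a byte limit correctly rejects.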

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: apply ruff format to persistence and runtime files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Potential fix for pull request finding 'Statement has no effect'

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>

* refactor(runtime): introduce RunContext to reduce run_agent parameter bloat

Extract checkpointer, store, event_store, run_events_config, thread_meta_repo,
and follow_up_to_run_id into a frozen RunContext dataclass. Add get_run_context()
in deps.py to build the base context from app.state singletons. start_run() uses
dataclasses.replace() to enrich per-run fields before passing ctx to run_agent.
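
The pattern can be sketched as below. The field names come from the description above; the `Any` types and the defaults are assumptions, since the real dataclass holds concrete store and repository objects.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Any, Optional


@dataclass(frozen=True)
class RunContext:
    """Immutable bundle of the dependencies a run needs."""

    checkpointer: Any = None
    store: Any = None
    event_store: Any = None
    run_events_config: Any = None
    thread_meta_repo: Any = None
    follow_up_to_run_id: Optional[str] = None


# The base context is built once from shared singletons ...
base = RunContext(store="shared-store", event_store="shared-events")

# ... and each run derives its own copy without mutating the base.
per_run = replace(base, follow_up_to_run_id="run-123")
```

Because the dataclass is frozen, assigning to a field raises `FrozenInstanceError`, so per-run values cannot leak into the shared base context.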

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(gateway): move sanitize_log_param to app/gateway/utils.py

Extract the log-injection sanitizer from routers/threads.py into a shared
utils module and rename to sanitize_log_param (public API). Eliminates the
reverse service → router import in services.py.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* perf: use SQL aggregation for feedback stats and thread token usage

Replace Python-side counting in FeedbackRepository.aggregate_by_run with
a single SELECT COUNT/SUM query. Add RunStore.aggregate_tokens_by_thread
abstract method with SQL GROUP BY implementation in RunRepository and
Python fallback in MemoryRunStore. Simplify the thread_token_usage
endpoint to delegate to the new method, eliminating the limit=10000
truncation risk.
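
As an illustration of the feedback half of this change, the single-query aggregation could look roughly like the following, using `sqlite3` in place of the async ORM. The table layout, the column names, and the returned dict shape are assumptions.

```python
import sqlite3


def aggregate_by_run(conn: sqlite3.Connection, run_id: str) -> dict:
    """Count total, positive, and negative feedback for a run in one query."""
    row = conn.execute(
        """
        SELECT COUNT(*),
               COALESCE(SUM(CASE WHEN rating > 0 THEN 1 ELSE 0 END), 0),
               COALESCE(SUM(CASE WHEN rating < 0 THEN 1 ELSE 0 END), 0)
        FROM feedback
        WHERE run_id = ?
        """,
        (run_id,),
    ).fetchone()
    return {"total": row[0], "positive": row[1], "negative": row[2]}
```

The database returns three numbers regardless of how many feedback rows exist, so no row limit is needed on the Python side.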

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* docs: annotate DbRunEventStore.put() as low-frequency path

Add docstring clarifying that put() opens a per-call transaction with
FOR UPDATE and should only be used for infrequent writes (currently
just the initial human_message event). High-throughput callers should
use put_batch() instead.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(threads): fall back to Store search when ThreadMetaRepository is unavailable

When database.backend=memory (default) or no SQL session factory is
configured, search_threads now queries the LangGraph Store instead of
returning 503. Returns empty list if neither Store nor repo is available.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(persistence): introduce ThreadMetaStore ABC for backend-agnostic thread metadata

Add ThreadMetaStore abstract base class with create/get/search/update/delete
interface. ThreadMetaRepository (SQL) now inherits from it. New
MemoryThreadMetaStore wraps LangGraph BaseStore for memory-mode deployments.

deps.py now always provides a non-None thread_meta_repo, eliminating all
`if thread_meta_repo is not None` guards in services.py, worker.py, and
routers/threads.py. search_threads no longer needs a Store fallback branch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(history): read messages from checkpointer instead of RunEventStore

The /history endpoint now reads messages directly from the
checkpointer's channel_values (the authoritative source) instead of
querying RunEventStore.list_messages(). The RunEventStore API is
preserved for other consumers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(persistence): address new Copilot review comments

- feedback.py: validate thread_id/run_id before deleting feedback
- jsonl.py: add path traversal protection with ID validation
- run_repo.py: parse `before` to datetime for PostgreSQL compat
- thread_meta_repo.py: fix pagination when metadata filter is active
- database_config.py: use resolve_path for sqlite_dir consistency
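
The path traversal protection noted for jsonl.py could look roughly like the sketch below. The allowed character set, the helper name `safe_jsonl_path`, and the file naming are assumptions made for illustration; the actual validation rule is not shown in this log:

```python
import re
from pathlib import Path

# Assumed rule: IDs may contain only letters, digits, underscore and hyphen.
_SAFE_ID_RE = re.compile(r"[A-Za-z0-9_-]+")


def safe_jsonl_path(base_dir: Path, thread_id: str) -> Path:
    """Reject IDs that could escape base_dir, then build the file path."""
    if not _SAFE_ID_RE.fullmatch(thread_id):
        raise ValueError(f"invalid thread id: {thread_id!r}")
    return base_dir / f"{thread_id}.jsonl"
```

Validating the ID before it is joined into a path means values such as `../secret` never reach the filesystem layer.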

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Implement skill self-evolution and skill_manage flow (#1874)

* chore: ignore .worktrees directory

* Add skill_manage self-evolution flow

* Fix CI regressions for skill_manage

* Address PR review feedback for skill evolution

* fix(skill-evolution): preserve history on delete

* fix(skill-evolution): tighten scanner fallbacks

* docs: add skill_manage e2e evidence screenshot

* fix(skill-manage): avoid blocking fs ops in session runtime

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>

* fix(config): resolve sqlite_dir relative to CWD, not Paths.base_dir

resolve_path() resolves relative to Paths.base_dir (.deer-flow),
which double-nested the path to .deer-flow/.deer-flow/data/app.db.
Use Path.resolve() (CWD-relative) instead.
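
The double nesting can be reproduced with pure path arithmetic. The directory `/srv/app` and the configured value `.deer-flow/data` are assumed here for illustration:

```python
from pathlib import PurePosixPath

cwd = PurePosixPath("/srv/app")            # assumed working directory
base_dir = cwd / ".deer-flow"              # what Paths.base_dir points at
configured = PurePosixPath(".deer-flow/data")  # assumed sqlite_dir setting

# Resolving relative to base_dir repeats the .deer-flow segment.
nested = base_dir / configured
# Resolving relative to the working directory gives the intended location.
correct = cwd / configured
```

Here `nested` contains `.deer-flow` twice, while `correct` contains it once.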

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Feature/feishu receive file (#1608)

* feat(feishu): add channel file materialization hook for inbound messages

- Introduce Channel.receive_file(msg, thread_id) as a base method for file materialization; default is no-op.
- Implement FeishuChannel.receive_file to download files/images from Feishu messages, save to sandbox, and inject virtual paths into msg.text.
- Update ChannelManager to call receive_file for any channel if msg.files is present, enabling downstream model access to user-uploaded files.
- No impact on Slack/Telegram or other channels (they inherit the default no-op).

* style(backend): format code with ruff for lint compliance

- Auto-formatted packages/harness/deerflow/agents/factory.py and tests/test_create_deerflow_agent.py using `ruff format`
- Ensured both files conform to project linting standards
- Fixes CI lint check failures caused by code style issues

* fix(feishu): handle file write operation asynchronously to prevent blocking

* fix(feishu): rename GetMessageResourceRequest to _GetMessageResourceRequest and remove redundant code

* test(feishu): add tests for receive_file method and placeholder replacement

* fix(manager): remove unnecessary type casting for channel retrieval

* fix(feishu): update logging messages to reflect resource handling instead of image

* fix(feishu): sanitize filename by replacing invalid characters in file uploads

* fix(feishu): improve filename sanitization and reorder image key handling in message processing

* fix(feishu): add thread lock to prevent filename conflicts during file downloads

* fix(test): correct bad merge in test_feishu_parser.py

* chore: run ruff and apply formatting cleanup
fix(feishu): preserve rich-text attachment order and improve fallback filename handling

* fix(docker): restore gateway env vars and fix langgraph empty arg issue (#1915)

Two production docker-compose.yaml bugs prevent `make up` from working:

1. Gateway missing DEER_FLOW_CONFIG_PATH and DEER_FLOW_EXTENSIONS_CONFIG_PATH
   environment overrides. Added in fb2d99f (#1836) but accidentally reverted
   by ca2fb95 (#1847). Without them, gateway reads host paths from .env via
   env_file, causing FileNotFoundError inside the container.

2. Langgraph command fails when LANGGRAPH_ALLOW_BLOCKING is unset (default).
   Empty $${allow_blocking} inserts a bare space between flags, causing
   ' --no-reload' to be parsed as an unexpected extra argument. Fix by building
   the args string first and conditionally appending --allow-blocking.

Co-authored-by: cooper <cooperfu@tencent.com>

* fix(frontend): resolve invalid HTML nesting and tabnabbing vulnerabilities (#1904)

* fix(frontend): resolve invalid HTML nesting and tabnabbing vulnerabilities

Fix `<button>` inside `<a>` invalid HTML in artifact components and add
missing `noopener,noreferrer` to `window.open` calls to prevent reverse
tabnabbing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(frontend): address Copilot review on tabnabbing and double-tab-open

Remove redundant parent onClick on web_fetch ChainOfThoughtStep to
prevent opening two tabs on link click, and explicitly null out
window.opener after window.open() for defensive tabnabbing hardening.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

* refactor(persistence): organize entities into per-entity directories

Restructure the persistence layer from a horizontal "models/ + repositories/"
split into vertical entity-aligned directories. Each entity (thread_meta,
run, feedback) now owns its ORM model, abstract interface (where applicable),
and concrete implementations under a single directory with an aggregating
__init__.py for one-line imports.

Layout:
  persistence/thread_meta/{base,model,sql,memory}.py
  persistence/run/{model,sql}.py
  persistence/feedback/{model,sql}.py

models/__init__.py is kept as a facade so Alembic autogenerate continues to
discover all ORM tables via Base.metadata. RunEventRow remains under
models/run_event.py because its storage implementation lives in
runtime/events/store/db.py and has no matching repository directory.

The repositories/ directory is removed entirely. All call sites in
gateway/deps.py and tests are updated to import from the new entity
packages, e.g.:

    from deerflow.persistence.thread_meta import ThreadMetaRepository
    from deerflow.persistence.run import RunRepository
    from deerflow.persistence.feedback import FeedbackRepository

Full test suite passes (1690 passed, 14 skipped).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(gateway): sync thread rename and delete through ThreadMetaStore

The POST /threads/{id}/state endpoint previously synced title changes
only to the LangGraph Store via _store_upsert. In sqlite mode the search
endpoint reads from the ThreadMetaRepository SQL table, so renames never
appeared in /threads/search until the next agent run completed (worker.py
syncs title from checkpoint to thread_meta in its finally block).

Likewise the DELETE /threads/{id} endpoint cleaned up the filesystem,
Store, and checkpointer but left the threads_meta row orphaned in sqlite,
so deleted threads kept appearing in /threads/search.

Fix both endpoints by routing through the ThreadMetaStore abstraction
which already has the correct sqlite/memory implementations wired up by
deps.py. The rename path now calls update_display_name() and the delete
path calls delete() — both work uniformly across backends.

Verified end-to-end with curl in gateway mode against sqlite backend.
Existing test suite (1690 passed) and focused router/repo tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(gateway): route all thread metadata access through ThreadMetaStore

Following the rename/delete bug fix in PR1, migrate the remaining direct
LangGraph Store reads/writes in the threads router and services to the
ThreadMetaStore abstraction so that the sqlite and memory backends behave
identically and the legacy dual-write paths can be removed.

Migrated endpoints (threads.py):
- create_thread: idempotency check + write now use thread_meta_repo.get/create
  instead of dual-writing the LangGraph Store and the SQL row.
- get_thread: reads from thread_meta_repo.get; the checkpoint-only fallback
  for legacy threads is preserved.
- patch_thread: replaced _store_get/_store_put with thread_meta_repo.update_metadata.
- delete_thread_data: dropped the legacy store.adelete; thread_meta_repo.delete
  already covers it.

Removed dead code (services.py):
- _upsert_thread_in_store — redundant with the immediately following
  thread_meta_repo.create() call.
- _sync_thread_title_after_run — worker.py's finally block already syncs
  the title via thread_meta_repo.update_display_name() after each run.

Removed dead code (threads.py):
- _store_get / _store_put / _store_upsert helpers (no remaining callers).
- THREADS_NS constant.
- get_store import (router no longer touches the LangGraph Store directly).

New abstract method:
- ThreadMetaStore.update_metadata(thread_id, metadata) merges metadata into
  the thread's metadata field. Implemented in both ThreadMetaRepository (SQL,
  read-modify-write inside one session) and MemoryThreadMetaStore. Three new
  unit tests cover merge / empty / nonexistent behaviour.

Net change: -134 lines. Full test suite: 1693 passed, 14 skipped.
Verified end-to-end with curl in gateway mode against sqlite backend
(create / patch / get / rename / search / delete).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: DanielWalnut <45447813+hetaoBackend@users.noreply.github.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: JilongSun <965640067@qq.com>
Co-authored-by: jie <49781832+stan-fu@users.noreply.github.com>
Co-authored-by: cooper <cooperfu@tencent.com>
Co-authored-by: yangzheli <43645580+yangzheli@users.noreply.github.com>
2026-04-11 23:36:19 +08:00
125 changed files with 3221 additions and 1295 deletions
+18 -4
@@ -40,16 +40,18 @@ logger = logging.getLogger(__name__)
async def _ensure_admin_user(app: FastAPI) -> None:
"""Startup hook: handle first boot and migrate orphan threads otherwise.
"""Startup hook: generate init token on first boot; migrate orphan threads otherwise.
After admin creation, migrate orphan threads from the LangGraph
store (metadata.user_id unset) to the admin account. This is the
"no-auth → with-auth" upgrade path: users who ran DeerFlow without
authentication have existing LangGraph thread data that needs an
owner assigned.
First boot (no admin exists):
- Does NOT create any user accounts automatically.
- The operator must visit ``/setup`` to create the first admin.
First boot (no admin exists):
- Generates a one-time ``init_token`` stored in ``app.state.init_token``
- Logs the token to stdout so the operator can copy-paste it into the
``/setup`` form to create the first admin account interactively.
- Does NOT create any user accounts automatically.
Subsequent boots (admin already exists):
- Runs the one-time "no-auth → with-auth" orphan thread migration for
@@ -60,6 +62,8 @@ async def _ensure_admin_user(app: FastAPI) -> None:
alongside the auth module via create_all, so freshly created tables
never contain NULL-owner rows.
"""
import secrets
from sqlalchemy import select
from app.gateway.deps import get_local_provider
@@ -70,8 +74,13 @@ async def _ensure_admin_user(app: FastAPI) -> None:
admin_count = await provider.count_admin_users()
if admin_count == 0:
init_token = secrets.token_urlsafe(32)
app.state.init_token = init_token
logger.info("=" * 60)
logger.info(" First boot detected — no admin account exists.")
logger.info(" Use the one-time token below to create the admin account.")
logger.info(" Copy it into the /setup form when prompted.")
logger.info(" INIT TOKEN: %s", init_token)
logger.info(" Visit /setup to complete admin account creation.")
logger.info("=" * 60)
return
@@ -356,6 +365,11 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an
"""
return {"status": "healthy", "service": "deer-flow-gateway"}
# Ensure init_token always exists on app.state (None until lifespan sets it
# if no admin is found). This prevents AttributeError in tests that don't
# run the full lifespan.
app.state.init_token = None
return app
+1
@@ -21,6 +21,7 @@ class AuthErrorCode(StrEnum):
PROVIDER_NOT_FOUND = "provider_not_found"
NOT_AUTHENTICATED = "not_authenticated"
SYSTEM_ALREADY_INITIALIZED = "system_already_initialized"
INVALID_INIT_TOKEN = "invalid_init_token"
class TokenError(StrEnum):
+17 -20
@@ -8,17 +8,13 @@ Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`.
from __future__ import annotations
from collections.abc import AsyncGenerator, Callable
from collections.abc import AsyncGenerator
from contextlib import AsyncExitStack, asynccontextmanager
from typing import TYPE_CHECKING, TypeVar, cast
from typing import TYPE_CHECKING
from fastapi import FastAPI, HTTPException, Request
from langgraph.types import Checkpointer
from deerflow.persistence.feedback import FeedbackRepository
from deerflow.runtime import RunContext, RunManager, StreamBridge
from deerflow.runtime.events.store.base import RunEventStore
from deerflow.runtime.runs.store.base import RunStore
from deerflow.runtime import RunContext, RunManager
if TYPE_CHECKING:
from app.gateway.auth.local_provider import LocalAuthProvider
@@ -26,9 +22,6 @@ if TYPE_CHECKING:
from deerflow.persistence.thread_meta.base import ThreadMetaStore
T = TypeVar("T")
@asynccontextmanager
async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
"""Bootstrap and tear down all LangGraph runtime singletons.
@@ -91,25 +84,25 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
# ---------------------------------------------------------------------------
def _require(attr: str, label: str) -> Callable[[Request], T]:
def _require(attr: str, label: str):
"""Create a FastAPI dependency that returns ``app.state.<attr>`` or 503."""
def dep(request: Request) -> T:
def dep(request: Request):
val = getattr(request.app.state, attr, None)
if val is None:
raise HTTPException(status_code=503, detail=f"{label} not available")
return cast(T, val)
return val
dep.__name__ = dep.__qualname__ = f"get_{attr}"
return dep
get_stream_bridge: Callable[[Request], StreamBridge] = _require("stream_bridge", "Stream bridge")
get_run_manager: Callable[[Request], RunManager] = _require("run_manager", "Run manager")
get_checkpointer: Callable[[Request], Checkpointer] = _require("checkpointer", "Checkpointer")
get_run_event_store: Callable[[Request], RunEventStore] = _require("run_event_store", "Run event store")
get_feedback_repo: Callable[[Request], FeedbackRepository] = _require("feedback_repo", "Feedback")
get_run_store: Callable[[Request], RunStore] = _require("run_store", "Run store")
get_stream_bridge = _require("stream_bridge", "Stream bridge")
get_run_manager = _require("run_manager", "Run manager")
get_checkpointer = _require("checkpointer", "Checkpointer")
get_run_event_store = _require("run_event_store", "Run event store")
get_feedback_repo = _require("feedback_repo", "Feedback")
get_run_store = _require("run_store", "Run store")
def get_store(request: Request):
@@ -128,7 +121,10 @@ def get_thread_store(request: Request) -> ThreadMetaStore:
def get_run_context(request: Request) -> RunContext:
"""Build a :class:`RunContext` from ``app.state`` singletons.
Returns a *base* context with infrastructure dependencies.
Returns a *base* context with infrastructure dependencies. Callers that
need per-run fields (e.g. ``follow_up_to_run_id``) should use
``dataclasses.replace(ctx, follow_up_to_run_id=...)`` before passing it
to :func:`run_agent`.
"""
from deerflow.config import get_app_config
@@ -141,6 +137,7 @@ def get_run_context(request: Request) -> RunContext:
)
# ---------------------------------------------------------------------------
# Auth helpers (used by authz.py and auth middleware)
# ---------------------------------------------------------------------------
+28 -3
@@ -2,6 +2,7 @@
import logging
import os
import secrets
import time
from ipaddress import ip_address, ip_network
@@ -388,6 +389,7 @@ class InitializeAdminRequest(BaseModel):
email: EmailStr
password: str = Field(..., min_length=8)
init_token: str | None = Field(default=None, description="One-time initialization token printed to server logs on first boot")
_strong_password = field_validator("password")(classmethod(lambda cls, v: _validate_strong_password(v)))
@@ -397,13 +399,31 @@ async def initialize_admin(request: Request, response: Response, body: Initializ
"""Create the first admin account on initial system setup.
Only callable when no admin exists. Returns 409 Conflict if an admin
already exists.
already exists. Requires the one-time ``init_token`` that is logged to
stdout at startup whenever the system has no admin account.
On success, the admin account is created with ``needs_setup=False`` and
the session cookie is set.
On success the token is consumed (one-time use), the admin account is
created with ``needs_setup=False``, and the session cookie is set.
"""
# Validate the one-time initialization token. The token is generated
# at startup and stored in app.state.init_token; it is consumed here on
# the first successful call so it cannot be replayed.
# Using str | None allows a missing/null token to return 403 (not 422),
# giving a consistent error response regardless of whether the token is
# absent or incorrect.
stored_token: str | None = getattr(request.app.state, "init_token", None)
provided_token: str = body.init_token or ""
if stored_token is None or not secrets.compare_digest(stored_token, provided_token):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=AuthErrorResponse(code=AuthErrorCode.INVALID_INIT_TOKEN, message="Invalid or expired initialization token").model_dump(),
)
admin_count = await get_local_provider().count_admin_users()
if admin_count > 0:
# Do NOT consume the token on this error path — consuming it here
# would allow an attacker to exhaust the token by calling with the
# correct token when admin already exists (denial-of-service).
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=AuthErrorResponse(code=AuthErrorCode.SYSTEM_ALREADY_INITIALIZED, message="System already initialized").model_dump(),
@@ -413,11 +433,16 @@ async def initialize_admin(request: Request, response: Response, body: Initializ
user = await get_local_provider().create_user(email=body.email, password=body.password, system_role="admin", needs_setup=False)
except ValueError:
# DB unique-constraint race: another concurrent request beat us.
# Do NOT consume the token here for the same reason as above.
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=AuthErrorResponse(code=AuthErrorCode.SYSTEM_ALREADY_INITIALIZED, message="System already initialized").model_dump(),
)
# Consume the token only after successful initialization — this is the
# single place where one-time use is enforced.
request.app.state.init_token = None
token = create_access_token(str(user.id), token_version=user.token_version)
_set_session_cookie(response, token, request)
+1 -2
@@ -123,8 +123,7 @@ async def run_messages(
run = await _resolve_run(run_id, request)
event_store = get_run_event_store(request)
rows = await event_store.list_messages_by_run(
run["thread_id"],
run_id,
run["thread_id"], run_id,
limit=limit + 1,
before_seq=before_seq,
after_seq=after_seq,
+7 -11
@@ -54,6 +54,7 @@ class RunCreateRequest(BaseModel):
after_seconds: float | None = Field(default=None, description="Delayed execution")
if_not_exists: Literal["reject", "create"] = Field(default="create", description="Thread creation policy")
feedback_keys: list[str] | None = Field(default=None, description="LangSmith feedback keys")
follow_up_to_run_id: str | None = Field(default=None, description="Run ID this message follows up on. Auto-detected from latest successful run if not provided.")
class RunResponse(BaseModel):
@@ -311,15 +312,11 @@ async def list_thread_messages(
if i in last_ai_indices:
run_id = msg["run_id"]
fb = feedback_map.get(run_id)
msg["feedback"] = (
{
"feedback_id": fb["feedback_id"],
"rating": fb["rating"],
"comment": fb.get("comment"),
}
if fb
else None
)
msg["feedback"] = {
"feedback_id": fb["feedback_id"],
"rating": fb["rating"],
"comment": fb.get("comment"),
} if fb else None
else:
msg["feedback"] = None
@@ -342,8 +339,7 @@ async def list_run_messages(
"""
event_store = get_run_event_store(request)
rows = await event_store.list_messages_by_run(
thread_id,
run_id,
thread_id, run_id,
limit=limit + 1,
before_seq=before_seq,
after_seq=after_seq,
+179 -5
@@ -22,7 +22,7 @@ from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel, Field, field_validator
from app.gateway.authz import require_permission
from app.gateway.deps import get_checkpointer
from app.gateway.deps import get_checkpointer, get_current_user, get_feedback_repo, get_run_event_store
from app.gateway.utils import sanitize_log_param
from deerflow.config.paths import Paths, get_paths
from deerflow.runtime import serialize_channel_values
@@ -405,6 +405,164 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse:
# ---------------------------------------------------------------------------
# Event-store-backed message loader
# ---------------------------------------------------------------------------
_LEGACY_CMD_INNER_CONTENT_RE = re.compile(
r"ToolMessage\(content=(?P<q>['\"])(?P<inner>.*?)(?P=q)",
re.DOTALL,
)
def _sanitize_legacy_command_repr(content_field: Any) -> Any:
"""Recover the inner ToolMessage text from a legacy ``str(Command(...))`` repr.
Runs captured before the ``on_tool_end`` fix in ``journal.py`` stored
``str(Command(update={'messages':[ToolMessage(content='X', ...)]}))`` as the
tool_result content. New runs store ``'X'`` directly. For legacy rows, try
to extract ``'X'`` defensively; return the original string if extraction
fails (still no worse than the checkpoint fallback for summarized threads).
"""
if not isinstance(content_field, str) or not content_field.startswith("Command(update="):
return content_field
match = _LEGACY_CMD_INNER_CONTENT_RE.search(content_field)
return match.group("inner") if match else content_field
async def _get_event_store_messages(request: Request, thread_id: str) -> list[dict] | None:
"""Load the full message stream for ``thread_id`` from the event store.
The event store is append-only and unaffected by summarization — the
checkpoint's ``channel_values["messages"]`` is rewritten in-place when the
SummarizationMiddleware runs, which drops all pre-summarize messages. The
event store retains the full transcript, so callers in Gateway mode should
prefer it for rendering the conversation history.
In addition to the core message content, this helper attaches two extra
fields to every returned dict:
- ``run_id``: the ``run_id`` of the event that produced this message.
Always present.
- ``feedback``: thumbs-up/down data. Present only on the **final
``ai_message`` of each run** (matching the per-run feedback semantics
of ``POST /api/threads/{id}/runs/{run_id}/feedback``). The frontend uses
the presence of this field to decide whether to render the feedback
button, which sidesteps the positional-index mapping bug that an
out-of-band ``/messages`` fetch exhibited.
Behaviour contract:
- **Full pagination.** ``RunEventStore.list_messages`` returns the newest
``limit`` records when no cursor is given, so a fixed limit silently
drops older messages on long threads. We size the read from
``count_messages()`` and then page forward with ``after_seq`` cursors.
- **Copy-on-read.** Each content dict is copied before ``id`` is patched
so the live store object is never mutated; ``MemoryRunEventStore``
returns live references.
- **Stable ids.** Messages with ``id=None`` (human + tool_result) receive
a deterministic ``uuid5(NAMESPACE_URL, f"{thread_id}:{seq}")`` so React
keys are stable across requests without altering stored data. AI messages
retain their LLM-assigned ``lc_run--*`` ids.
- **Legacy Command repr.** Rows captured before the ``journal.py``
``on_tool_end`` fix stored ``str(Command(update={...}))`` as the tool
result content. ``_sanitize_legacy_command_repr`` extracts the inner
ToolMessage text.
- **User context.** ``DbRunEventStore`` is user-scoped by default via
``resolve_user_id(AUTO)`` in ``runtime/user_context.py``. This helper
must run inside a request where ``@require_permission`` has populated
the user contextvar. Both callers below are decorated appropriately.
Do not call this helper from CLI or migration scripts without passing
``user_id=None`` explicitly to the underlying store methods.
Returns ``None`` when the event store is not configured or has no message
events for this thread, so callers fall back to checkpoint messages.
"""
try:
event_store = get_run_event_store(request)
except Exception:
return None
try:
total = await event_store.count_messages(thread_id)
except Exception:
logger.exception("count_messages failed for thread %s", sanitize_log_param(thread_id))
return None
if not total:
return None
# Batch by page_size to keep memory bounded for very long threads.
page_size = 500
collected: list[dict] = []
after_seq: int | None = None
while True:
try:
page = await event_store.list_messages(thread_id, limit=page_size, after_seq=after_seq)
except Exception:
logger.exception("list_messages failed for thread %s", sanitize_log_param(thread_id))
return None
if not page:
break
collected.extend(page)
if len(page) < page_size:
break
next_cursor = page[-1].get("seq")
if next_cursor is None or (after_seq is not None and next_cursor <= after_seq):
break
after_seq = next_cursor
# Build the message list; track the final ``ai_message`` index per run so
# feedback can be attached at the right position (matches thread_runs.py).
messages: list[dict] = []
last_ai_per_run: dict[str, int] = {}
for evt in collected:
raw = evt.get("content")
if not isinstance(raw, dict) or "type" not in raw:
continue
content = dict(raw)
if content.get("id") is None:
content["id"] = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{thread_id}:{evt['seq']}"))
if content.get("type") == "tool":
content["content"] = _sanitize_legacy_command_repr(content.get("content"))
run_id = evt.get("run_id")
if run_id:
content["run_id"] = run_id
if evt.get("event_type") == "ai_message" and run_id:
last_ai_per_run[run_id] = len(messages)
messages.append(content)
if not messages:
return None
# Attach feedback to the final ai_message of each run. If the feedback
# subsystem is unavailable, leave the ``feedback`` field absent entirely
# so the frontend hides the button rather than showing it over a broken
# write path.
feedback_available = False
feedback_map: dict[str, dict] = {}
try:
feedback_repo = get_feedback_repo(request)
user_id = await get_current_user(request)
feedback_map = await feedback_repo.list_by_thread_grouped(thread_id, user_id=user_id)
feedback_available = True
except Exception:
logger.exception("feedback lookup failed for thread %s", sanitize_log_param(thread_id))
if feedback_available:
for run_id, idx in last_ai_per_run.items():
fb = feedback_map.get(run_id)
messages[idx]["feedback"] = (
{
"feedback_id": fb["feedback_id"],
"rating": fb["rating"],
"comment": fb.get("comment"),
}
if fb
else None
)
return messages
@router.get("/{thread_id}/state", response_model=ThreadStateResponse)
@require_permission("threads", "read", owner_check=True)
async def get_thread_state(thread_id: str, request: Request) -> ThreadStateResponse:
@@ -445,6 +603,11 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo
values = serialize_channel_values(channel_values)
# Prefer event-store messages: append-only, immune to summarization.
es_messages = await _get_event_store_messages(request, thread_id)
if es_messages is not None:
values["messages"] = es_messages
return ThreadStateResponse(
values=values,
next=next_tasks,
@@ -564,6 +727,11 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
if body.before:
config["configurable"]["checkpoint_id"] = body.before
# Load the full event-store message stream once; attach to the latest
# checkpoint entry only (matching the prior semantics). The event store
# is append-only and immune to summarization.
es_messages = await _get_event_store_messages(request, thread_id)
entries: list[HistoryEntry] = []
is_latest_checkpoint = True
try:
@@ -587,11 +755,17 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request
if thread_data := channel_values.get("thread_data"):
values["thread_data"] = thread_data
# Attach messages only to the latest checkpoint entry.
# Attach messages only to the latest checkpoint. Prefer the
# event-store stream (complete and unaffected by summarization);
# fall back to checkpoint channel_values when the event store is
# unavailable or empty.
if is_latest_checkpoint:
messages = channel_values.get("messages")
if messages:
values["messages"] = serialize_channel_values({"messages": messages}).get("messages", [])
if es_messages is not None:
values["messages"] = es_messages
else:
messages = channel_values.get("messages")
if messages:
values["messages"] = serialize_channel_values({"messages": messages}).get("messages", [])
is_latest_checkpoint = False
# Derive next tasks
+1 -1
@@ -56,7 +56,7 @@ def _make_file_sandbox_writable(file_path: os.PathLike[str] | str) -> None:
@router.post("", response_model=UploadResponse)
@require_permission("threads", "write", owner_check=True, require_existing=False)
@require_permission("threads", "write", owner_check=True, require_existing=True)
async def upload_files(
thread_id: str,
request: Request,
+16
@@ -195,6 +195,21 @@ async def start_run(
disconnect = DisconnectMode.cancel if body.on_disconnect == "cancel" else DisconnectMode.continue_
# Resolve follow_up_to_run_id: explicit from request, or auto-detect from latest successful run
follow_up_to_run_id = getattr(body, "follow_up_to_run_id", None)
if follow_up_to_run_id is None:
run_store = get_run_store(request)
try:
recent_runs = await run_store.list_by_thread(thread_id, limit=1)
if recent_runs and recent_runs[0].get("status") == "success":
follow_up_to_run_id = recent_runs[0]["run_id"]
except Exception:
pass # Don't block run creation
# Enrich base context with per-run field
if follow_up_to_run_id:
run_ctx = dataclasses.replace(run_ctx, follow_up_to_run_id=follow_up_to_run_id)
try:
record = await run_mgr.create_or_reject(
thread_id,
@@ -203,6 +218,7 @@ async def start_run(
metadata=body.metadata or {},
kwargs={"input": body.input, "config": body.config},
multitask_strategy=body.multitask_strategy,
follow_up_to_run_id=follow_up_to_run_id,
)
except ConflictError as exc:
raise HTTPException(status_code=409, detail=str(exc)) from exc
@@ -1,7 +1,7 @@
import logging
from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
from langchain.agents.middleware import AgentMiddleware, SummarizationMiddleware
from langchain_core.runnables import RunnableConfig
from deerflow.agents.lead_agent.prompt import apply_prompt_template
@@ -9,7 +9,6 @@ from deerflow.agents.middlewares.clarification_middleware import ClarificationMi
from deerflow.agents.middlewares.loop_detection_middleware import LoopDetectionMiddleware
from deerflow.agents.middlewares.memory_middleware import MemoryMiddleware
from deerflow.agents.middlewares.subagent_limit_middleware import SubagentLimitMiddleware
from deerflow.agents.middlewares.summarization_middleware import SummarizationMiddleware
from deerflow.agents.middlewares.title_middleware import TitleMiddleware
from deerflow.agents.middlewares.todo_middleware import TodoMiddleware
from deerflow.agents.middlewares.token_usage_middleware import TokenUsageMiddleware
@@ -283,7 +283,7 @@ class LoopDetectionMiddleware(AgentMiddleware[AgentState]):
# the conversation; injecting one mid-conversation crashes
# langchain_anthropic's _format_messages(). HumanMessage works
# with all providers. See #1299.
return {"messages": [HumanMessage(content=warning, name="loop_warning")]}
return {"messages": [HumanMessage(content=warning)]}
return None
@@ -1,13 +0,0 @@
from typing import override
from langchain.agents.middleware import SummarizationMiddleware as BaseSummarizationMiddleware
from langchain_core.messages.human import HumanMessage
class SummarizationMiddleware(BaseSummarizationMiddleware):
@override
def _build_new_messages(self, summary: str) -> list[HumanMessage]:
"""Override the base implementation so the summary is emitted as a human message with the special name 'summary'.
The frontend does not display this message, but the model can still use it as context.
"""
return [HumanMessage(content=f"Here is a summary of the conversation to date:\n\n{summary}", name="summary")]
@@ -1,10 +1,8 @@
import logging
from datetime import UTC, datetime
from typing import NotRequired, override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langgraph.config import get_config
from langgraph.runtime import Runtime
@@ -99,20 +97,8 @@ class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
paths = self._create_thread_directories(thread_id, user_id=user_id)
logger.debug("Created thread data directories for thread %s", thread_id)
messages = list(state.get("messages", []))
last_message = messages[-1] if messages else None
if last_message and isinstance(last_message, HumanMessage):
messages[-1] = HumanMessage(
content=last_message.content,
id=last_message.id,
name=last_message.name or "user-input",
additional_kwargs={**last_message.additional_kwargs, "run_id": runtime.context.get("run_id"), "timestamp": datetime.now(UTC).isoformat()},
)
return {
"thread_data": {
**paths,
},
"messages": messages,
}
}
@@ -279,7 +279,6 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
updated_message = HumanMessage(
content=f"{files_message}\n\n{original_content}",
id=last_message.id,
name=last_message.name,
additional_kwargs=last_message.additional_kwargs,
)
@@ -6,10 +6,7 @@ handles token usage accumulation.
Key design decisions:
- on_llm_new_token is NOT implemented -- only complete messages via on_llm_end
- on_chat_model_start captures structured prompts as llm_request (OpenAI format) and
extracts the first human message for run.input, because it is more reliable than
on_chain_start (which fires on every node); messages here are fully structured.
- on_chain_start with parent_run_id=None emits a run.start trace marking root invocation.
- on_chat_model_start captures structured prompts as llm_request (OpenAI format)
- on_llm_end emits llm_response in OpenAI Chat Completions format
- Token usage accumulated in memory, written to RunRow on run completion
- Caller identification via tags injection (lead_agent / subagent:{name} / middleware:{name})
@@ -21,12 +18,10 @@ import asyncio
import logging
import time
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any
from uuid import UUID
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.messages import AnyMessage, BaseMessage, HumanMessage, ToolMessage
from langgraph.types import Command
if TYPE_CHECKING:
from deerflow.runtime.events.store.base import RunEventStore
@@ -77,39 +72,34 @@ class RunJournal(BaseCallbackHandler):
# LLM request/response tracking
self._llm_call_index = 0
self._cached_prompts: dict[str, list[dict]] = {} # langchain run_id -> OpenAI messages
self._cached_models: dict[str, str] = {} # langchain run_id -> model name
# Tool call ID cache
self._tool_call_ids: dict[str, str] = {} # langchain run_id -> tool_call_id
# -- Lifecycle callbacks --
def on_chain_start(
self,
serialized: dict[str, Any],
inputs: dict[str, Any],
*,
run_id: UUID,
parent_run_id: UUID | None = None,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
caller = self._identify_caller(tags)
if parent_run_id is None:
# Root graph invocation — emit a single trace event for the run start.
chain_name = (serialized or {}).get("name", "unknown")
self._put(
event_type="run.start",
category="trace",
content={"chain": chain_name},
metadata={"caller": caller, **(metadata or {})},
)
def on_chain_start(self, serialized: dict, inputs: Any, *, run_id: UUID, **kwargs: Any) -> None:
if kwargs.get("parent_run_id") is not None:
return
self._put(
event_type="run_start",
category="lifecycle",
metadata={"input_preview": str(inputs)[:500]},
)
def on_chain_end(self, outputs: Any, *, run_id: UUID, **kwargs: Any) -> None:
self._put(event_type="run.end", category="outputs", content=outputs, metadata={"status": "success"})
if kwargs.get("parent_run_id") is not None:
return
self._put(event_type="run_end", category="lifecycle", metadata={"status": "success"})
self._flush_sync()
def on_chain_error(self, error: BaseException, *, run_id: UUID, **kwargs: Any) -> None:
if kwargs.get("parent_run_id") is not None:
return
self._put(
event_type="run.error",
category="error",
event_type="run_error",
category="lifecycle",
content=str(error),
metadata={"error_type": type(error).__name__},
)
@@ -117,132 +107,266 @@ class RunJournal(BaseCallbackHandler):
# -- LLM callbacks --
def on_chat_model_start(
self,
serialized: dict,
messages: list[list[BaseMessage]],
*,
run_id: UUID,
tags: list[str] | None = None,
**kwargs: Any,
) -> None:
"""Capture structured prompt messages for llm_request event.
def on_chat_model_start(self, serialized: dict, messages: list[list], *, run_id: UUID, **kwargs: Any) -> None:
"""Capture structured prompt messages for llm_request event."""
from deerflow.runtime.converters import langchain_messages_to_openai
This is also the canonical place to extract the first human message:
messages are fully structured here, it fires only on real LLM calls,
and the content is never compressed by checkpoint trimming.
"""
rid = str(run_id)
self._llm_start_times[rid] = time.monotonic()
self._llm_call_index += 1
# Mark this run_id as seen so on_llm_end knows not to increment again.
self._cached_prompts[rid] = []
logger.info(f"on_chat_model_start {run_id}: tags={tags} serialized={serialized} messages={messages}")
model_name = serialized.get("name", "")
self._cached_models[rid] = model_name
# Capture the first human message sent to any LLM in this run.
if not self._first_human_msg:
for batch in reversed(messages):
for m in reversed(batch):
if isinstance(m, HumanMessage) and m.name != "summary":
caller = self._identify_caller(tags)
self.set_first_human_message(m.text)
self._put(
event_type="llm.human.input",
category="message",
content=m.model_dump(),
metadata={"caller": caller},
)
break
if self._first_human_msg:
break
# Convert the first message list (LangChain passes list-of-lists)
prompt_msgs = messages[0] if messages else []
openai_msgs = langchain_messages_to_openai(prompt_msgs)
self._cached_prompts[rid] = openai_msgs
def on_llm_start(self, serialized: dict, prompts: list[str], *, run_id: UUID, parent_run_id: UUID | None = None, tags: list[str] | None = None, metadata: dict[str, Any] | None = None, **kwargs: Any) -> None:
caller = self._identify_caller(kwargs)
self._put(
event_type="llm_request",
category="trace",
content={"model": model_name, "messages": openai_msgs},
metadata={"caller": caller, "llm_call_index": self._llm_call_index},
)
def on_llm_start(self, serialized: dict, prompts: list[str], *, run_id: UUID, **kwargs: Any) -> None:
# Fallback: on_chat_model_start is preferred. This just tracks latency.
self._llm_start_times[str(run_id)] = time.monotonic()
def on_llm_end(self, response, *, run_id, parent_run_id, tags, **kwargs) -> None:
messages: list[AnyMessage] = []
logger.info(f"on_llm_end {run_id}: response: {tags} {kwargs}")
for generation in response.generations:
for gen in generation:
if hasattr(gen, "message"):
messages.append(gen.message)
else:
logger.warning(f"on_llm_end {run_id}: generation has no message attribute: {gen}")
def on_llm_end(self, response: Any, *, run_id: UUID, **kwargs: Any) -> None:
from deerflow.runtime.converters import langchain_to_openai_completion
for message in messages:
caller = self._identify_caller(tags)
try:
message = response.generations[0][0].message
except (IndexError, AttributeError):
logger.debug("on_llm_end: could not extract message from response")
return
# Latency
rid = str(run_id)
start = self._llm_start_times.pop(rid, None)
latency_ms = int((time.monotonic() - start) * 1000) if start else None
caller = self._identify_caller(kwargs)
# Token usage from message
usage = getattr(message, "usage_metadata", None)
usage_dict = dict(usage) if usage else {}
# Latency
rid = str(run_id)
start = self._llm_start_times.pop(rid, None)
latency_ms = int((time.monotonic() - start) * 1000) if start else None
# Resolve call index
# Token usage from message
usage = getattr(message, "usage_metadata", None)
usage_dict = dict(usage) if usage else {}
# Resolve call index
call_index = self._llm_call_index
if rid not in self._cached_prompts:
# Fallback: on_chat_model_start was not called
self._llm_call_index += 1
call_index = self._llm_call_index
if rid not in self._cached_prompts:
# Fallback: on_chat_model_start was not called
self._llm_call_index += 1
call_index = self._llm_call_index
# Trace event: llm_response (OpenAI completion format)
self._put(
event_type="llm.ai.response",
category="message",
content=message.model_dump(),
metadata={
"caller": caller,
"usage": usage_dict,
"latency_ms": latency_ms,
"llm_call_index": call_index,
},
)
# Clean up caches
self._cached_prompts.pop(rid, None)
self._cached_models.pop(rid, None)
# Token accumulation
if self._track_tokens:
input_tk = usage_dict.get("input_tokens", 0) or 0
output_tk = usage_dict.get("output_tokens", 0) or 0
total_tk = usage_dict.get("total_tokens", 0) or 0
if total_tk == 0:
total_tk = input_tk + output_tk
if total_tk > 0:
self._total_input_tokens += input_tk
self._total_output_tokens += output_tk
self._total_tokens += total_tk
self._llm_call_count += 1
# Trace event: llm_response (OpenAI completion format)
content = getattr(message, "content", "")
self._put(
event_type="llm_response",
category="trace",
content=langchain_to_openai_completion(message),
metadata={
"caller": caller,
"usage": usage_dict,
"latency_ms": latency_ms,
"llm_call_index": call_index,
},
)
# Message events: only lead_agent gets message-category events.
# Content uses message.model_dump() to align with checkpoint format.
tool_calls = getattr(message, "tool_calls", None) or []
if caller == "lead_agent":
resp_meta = getattr(message, "response_metadata", None) or {}
model_name = resp_meta.get("model_name") if isinstance(resp_meta, dict) else None
if tool_calls:
# ai_tool_call: agent decided to use tools
self._put(
event_type="ai_tool_call",
category="message",
content=message.model_dump(),
metadata={"model_name": model_name, "finish_reason": "tool_calls"},
)
elif isinstance(content, str) and content:
# ai_message: final text reply
self._put(
event_type="ai_message",
category="message",
content=message.model_dump(),
metadata={"model_name": model_name, "finish_reason": "stop"},
)
self._last_ai_msg = content
self._msg_count += 1
# Token accumulation
if self._track_tokens:
input_tk = usage_dict.get("input_tokens", 0) or 0
output_tk = usage_dict.get("output_tokens", 0) or 0
total_tk = usage_dict.get("total_tokens", 0) or 0
if total_tk == 0:
total_tk = input_tk + output_tk
if total_tk > 0:
self._total_input_tokens += input_tk
self._total_output_tokens += output_tk
self._total_tokens += total_tk
self._llm_call_count += 1
if caller.startswith("subagent:"):
self._subagent_tokens += total_tk
elif caller.startswith("middleware:"):
self._middleware_tokens += total_tk
else:
self._lead_agent_tokens += total_tk
def on_llm_error(self, error: BaseException, *, run_id: UUID, **kwargs: Any) -> None:
self._llm_start_times.pop(str(run_id), None)
self._put(event_type="llm.error", category="trace", content=str(error))
self._put(event_type="llm_error", category="trace", content=str(error))
def on_tool_start(self, serialized, input_str, *, run_id, parent_run_id=None, tags=None, metadata=None, inputs=None, **kwargs):
"""Handle tool start event, cache tool call ID for later correlation"""
tool_call_id = str(run_id)
logger.info(f"Tool start for node {run_id}, tool_call_id={tool_call_id}, tags={tags}, metadata={metadata}")
# -- Tool callbacks --
def on_tool_end(self, output, *, run_id, parent_run_id=None, **kwargs):
"""Handle tool end event, append message and clear node data"""
try:
if isinstance(output, ToolMessage):
msg = cast(ToolMessage, output)
self._put(event_type="llm.tool.result", category="message", content=msg.model_dump())
elif isinstance(output, Command):
cmd = cast(Command, output)
messages = cmd.update.get("messages", [])
for message in messages:
if isinstance(message, BaseMessage):
self._put(event_type="llm.tool.result", category="message", content=message.model_dump())
else:
logger.warning(f"on_tool_end {run_id}: command update message is not BaseMessage: {type(message)}")
else:
logger.warning(f"on_tool_end {run_id}: output is not ToolMessage: {type(output)}")
finally:
logger.info(f"Tool end for node {run_id}")
def on_tool_start(self, serialized: dict, input_str: str, *, run_id: UUID, **kwargs: Any) -> None:
tool_call_id = kwargs.get("tool_call_id")
if tool_call_id:
self._tool_call_ids[str(run_id)] = tool_call_id
self._put(
event_type="tool_start",
category="trace",
metadata={
"tool_name": serialized.get("name", ""),
"tool_call_id": tool_call_id,
"args": str(input_str)[:2000],
},
)
def on_tool_end(self, output: Any, *, run_id: UUID, **kwargs: Any) -> None:
from langchain_core.messages import ToolMessage
from langgraph.types import Command
# Tools that update graph state return a ``Command`` (e.g.
# ``present_files``). LangGraph later unwraps the inner ToolMessage
# into checkpoint state, so to stay checkpoint-aligned we must
# extract it here rather than storing ``str(Command(...))``.
if isinstance(output, Command):
update = getattr(output, "update", None) or {}
inner_msgs = update.get("messages") if isinstance(update, dict) else None
if isinstance(inner_msgs, list):
inner_tool_msg = next((m for m in inner_msgs if isinstance(m, ToolMessage)), None)
if inner_tool_msg is not None:
output = inner_tool_msg
# Extract fields from ToolMessage object when LangChain provides one.
# LangChain's _format_output wraps tool results into a ToolMessage
# with tool_call_id, name, status, and artifact — more complete than
# what kwargs alone provides.
if isinstance(output, ToolMessage):
tool_call_id = output.tool_call_id or kwargs.get("tool_call_id") or self._tool_call_ids.pop(str(run_id), None)
tool_name = output.name or kwargs.get("name", "")
status = getattr(output, "status", "success") or "success"
content_str = output.content if isinstance(output.content, str) else str(output.content)
# Use model_dump() for checkpoint-aligned message content.
# Override tool_call_id if it was resolved from cache.
msg_content = output.model_dump()
if msg_content.get("tool_call_id") != tool_call_id:
msg_content["tool_call_id"] = tool_call_id
else:
tool_call_id = kwargs.get("tool_call_id") or self._tool_call_ids.pop(str(run_id), None)
tool_name = kwargs.get("name", "")
status = "success"
content_str = str(output)
# Construct checkpoint-aligned dict when output is a plain string.
msg_content = ToolMessage(
content=content_str,
tool_call_id=tool_call_id or "",
name=tool_name,
status=status,
).model_dump()
# Trace event (always)
self._put(
event_type="tool_end",
category="trace",
content=content_str,
metadata={
"tool_name": tool_name,
"tool_call_id": tool_call_id,
"status": status,
},
)
# Message event: tool_result (checkpoint-aligned model_dump format)
self._put(
event_type="tool_result",
category="message",
content=msg_content,
metadata={"tool_name": tool_name, "status": status},
)
def on_tool_error(self, error: BaseException, *, run_id: UUID, **kwargs: Any) -> None:
from langchain_core.messages import ToolMessage
tool_call_id = kwargs.get("tool_call_id") or self._tool_call_ids.pop(str(run_id), None)
tool_name = kwargs.get("name", "")
# Trace event
self._put(
event_type="tool_error",
category="trace",
content=str(error),
metadata={
"tool_name": tool_name,
"tool_call_id": tool_call_id,
},
)
# Message event: tool_result with error status (checkpoint-aligned)
msg_content = ToolMessage(
content=str(error),
tool_call_id=tool_call_id or "",
name=tool_name,
status="error",
).model_dump()
self._put(
event_type="tool_result",
category="message",
content=msg_content,
metadata={"tool_name": tool_name, "status": "error"},
)
# -- Custom event callback --
def on_custom_event(self, name: str, data: Any, *, run_id: UUID, **kwargs: Any) -> None:
from deerflow.runtime.serialization import serialize_lc_object
if name == "summarization":
data_dict = data if isinstance(data, dict) else {}
self._put(
event_type="summarization",
category="trace",
content=data_dict.get("summary", ""),
metadata={
"replaced_message_ids": data_dict.get("replaced_message_ids", []),
"replaced_count": data_dict.get("replaced_count", 0),
},
)
self._put(
event_type="middleware:summarize",
category="middleware",
content={"role": "system", "content": data_dict.get("summary", "")},
metadata={"replaced_count": data_dict.get("replaced_count", 0)},
)
else:
event_data = serialize_lc_object(data) if not isinstance(data, dict) else data
self._put(
event_type=name,
category="trace",
metadata=event_data if isinstance(event_data, dict) else {"data": event_data},
)
# -- Internal methods --
@@ -307,9 +431,8 @@ class RunJournal(BaseCallbackHandler):
if exc:
logger.warning("Journal flush task failed: %s", exc)
def _identify_caller(self, tags: list[str] | None, **kwargs) -> str:
_tags = tags or kwargs.get("tags", [])
for tag in _tags:
def _identify_caller(self, kwargs: dict) -> str:
for tag in kwargs.get("tags") or []:
if isinstance(tag, str) and (tag.startswith("subagent:") or tag.startswith("middleware:") or tag == "lead_agent"):
return tag
# Default to lead_agent: the main agent graph does not inject
@@ -54,7 +54,7 @@ class RunManager:
self._lock = asyncio.Lock()
self._store = store
async def _persist_to_store(self, record: RunRecord) -> None:
async def _persist_to_store(self, record: RunRecord, *, follow_up_to_run_id: str | None = None) -> None:
"""Best-effort persist run record to backing store."""
if self._store is None:
return
@@ -68,6 +68,7 @@ class RunManager:
metadata=record.metadata or {},
kwargs=record.kwargs or {},
created_at=record.created_at,
follow_up_to_run_id=follow_up_to_run_id,
)
except Exception:
logger.warning("Failed to persist run %s to store", record.run_id, exc_info=True)
@@ -89,6 +90,7 @@ class RunManager:
metadata: dict | None = None,
kwargs: dict | None = None,
multitask_strategy: str = "reject",
follow_up_to_run_id: str | None = None,
) -> RunRecord:
"""Create a new pending run and register it."""
run_id = str(uuid.uuid4())
@@ -107,7 +109,7 @@ class RunManager:
)
async with self._lock:
self._runs[run_id] = record
await self._persist_to_store(record)
await self._persist_to_store(record, follow_up_to_run_id=follow_up_to_run_id)
logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id)
return record
@@ -120,7 +122,7 @@ class RunManager:
async with self._lock:
# Dict insertion order matches creation order, so reversing it gives
# us deterministic newest-first results even when timestamps tie.
return [r for r in self._runs.values() if r.thread_id == thread_id]
return [r for r in reversed(self._runs.values()) if r.thread_id == thread_id]
async def set_status(self, run_id: str, status: RunStatus, *, error: str | None = None) -> None:
"""Transition a run to a new status."""
@@ -174,6 +176,7 @@ class RunManager:
metadata: dict | None = None,
kwargs: dict | None = None,
multitask_strategy: str = "reject",
follow_up_to_run_id: str | None = None,
) -> RunRecord:
"""Atomically check for inflight runs and create a new one.
@@ -227,7 +230,7 @@ class RunManager:
)
self._runs[run_id] = record
await self._persist_to_store(record)
await self._persist_to_store(record, follow_up_to_run_id=follow_up_to_run_id)
logger.info("Run created: run_id=%s thread_id=%s", run_id, thread_id)
return record
@@ -29,6 +29,7 @@ class RunStore(abc.ABC):
kwargs: dict[str, Any] | None = None,
error: str | None = None,
created_at: str | None = None,
follow_up_to_run_id: str | None = None,
) -> None:
pass
@@ -28,6 +28,7 @@ class MemoryRunStore(RunStore):
kwargs=None,
error=None,
created_at=None,
follow_up_to_run_id=None,
):
now = datetime.now(UTC).isoformat()
self._runs[run_id] = {
@@ -40,6 +41,7 @@ class MemoryRunStore(RunStore):
"metadata": metadata or {},
"kwargs": kwargs or {},
"error": error,
"follow_up_to_run_id": follow_up_to_run_id,
"created_at": created_at or now,
"updated_at": now,
}
@@ -51,6 +51,7 @@ class RunContext:
event_store: Any | None = field(default=None)
run_events_config: Any | None = field(default=None)
thread_store: Any | None = field(default=None)
follow_up_to_run_id: str | None = field(default=None)
async def run_agent(
@@ -75,6 +76,7 @@ async def run_agent(
event_store = ctx.event_store
run_events_config = ctx.run_events_config
thread_store = ctx.thread_store
follow_up_to_run_id = ctx.follow_up_to_run_id
run_id = record.run_id
thread_id = record.thread_id
@@ -85,8 +87,6 @@ async def run_agent(
journal = None
journal = None
# Track whether "events" was requested but skipped
if "events" in requested_modes:
logger.info(
@@ -111,6 +111,22 @@ async def run_agent(
track_token_usage=getattr(run_events_config, "track_token_usage", True),
)
human_msg = _extract_human_message(graph_input)
if human_msg is not None:
msg_metadata = {}
if follow_up_to_run_id:
msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id
await event_store.put(
thread_id=thread_id,
run_id=run_id,
event_type="human_message",
category="message",
content=human_msg.model_dump(),
metadata=msg_metadata or None,
)
content = human_msg.content
journal.set_first_human_message(content if isinstance(content, str) else str(content))
# 1. Mark running
await run_manager.set_status(run_id, RunStatus.running)
@@ -148,13 +164,12 @@ async def run_agent(
# Inject runtime context so middlewares can access thread_id
# (langgraph-cli does this automatically; we must do it manually)
runtime = Runtime(context={"thread_id": thread_id, "run_id": run_id}, store=store)
runtime = Runtime(context={"thread_id": thread_id}, store=store)
# If the caller already set a ``context`` key (LangGraph >= 0.6.0
# prefers it over ``configurable`` for thread-level data), make
# sure ``thread_id`` is available there too.
if "context" in config and isinstance(config["context"], dict):
config["context"].setdefault("thread_id", thread_id)
config["context"].setdefault("run_id", run_id)
config.setdefault("configurable", {})["__pregel_runtime"] = runtime
# Inject RunJournal as a LangChain callback handler.
+7 -2
@@ -63,13 +63,14 @@ def _make_session_factory(admin_row=None):
return sf
# ── First boot: no admin → return early ──────────────────────────────────
# ── First boot: no admin → generate init_token, return early ─────────────
def test_first_boot_does_not_create_admin():
"""admin_count==0 → do NOT create admin automatically."""
"""admin_count==0 → generate init_token, do NOT create admin automatically."""
provider = _make_provider(admin_count=0)
app = _make_app_stub()
app.state.init_token = None # lifespan sets this
with patch("app.gateway.deps.get_local_provider", return_value=provider):
from app.gateway.app import _ensure_admin_user
@@ -77,6 +78,9 @@ def test_first_boot_does_not_create_admin():
asyncio.run(_ensure_admin_user(app))
provider.create_user.assert_not_called()
# init_token must have been set on app.state
assert app.state.init_token is not None
assert len(app.state.init_token) > 10
def test_first_boot_skips_migration():
@@ -85,6 +89,7 @@ def test_first_boot_skips_migration():
store = AsyncMock()
store.asearch = AsyncMock(return_value=[])
app = _make_app_stub(store=store)
app.state.init_token = None # lifespan sets this
with patch("app.gateway.deps.get_local_provider", return_value=provider):
from app.gateway.app import _ensure_admin_user
+67 -3
@@ -1,7 +1,7 @@
"""Tests for the POST /api/v1/auth/initialize endpoint.
Covers: first-boot admin creation, rejection when system already
initialized, password strength validation,
initialized, invalid/missing init_token, password strength validation,
and public accessibility (no auth cookie required).
"""
@@ -16,6 +16,7 @@ os.environ.setdefault("AUTH_JWT_SECRET", "test-secret-key-initialize-admin-min-3
from app.gateway.auth.config import AuthConfig, set_auth_config
_TEST_SECRET = "test-secret-key-initialize-admin-min-32"
_INIT_TOKEN = "test-init-token-for-initialization-tests"
@pytest.fixture(autouse=True)
@@ -44,6 +45,9 @@ def client(_setup_auth):
set_auth_config(AuthConfig(jwt_secret=_TEST_SECRET))
app = create_app()
# Pre-set the init token on app.state (normally done by the lifespan on
# first boot; tests don't run the lifespan because it requires config.yaml).
app.state.init_token = _INIT_TOKEN
# Do NOT use TestClient as a context manager — that would trigger the
# full lifespan which requires config.yaml. The auth endpoints work
# without the lifespan (persistence engine is set up by _setup_auth).
@@ -51,10 +55,11 @@ def client(_setup_auth):
def _init_payload(**extra):
"""Build a valid /initialize payload."""
"""Build a valid /initialize payload with the test init_token."""
return {
"email": "admin@example.com",
"password": "Str0ng!Pass99",
"init_token": _INIT_TOKEN,
**extra,
}
@@ -80,12 +85,53 @@ def test_initialize_needs_setup_false(client):
assert me.json()["needs_setup"] is False
# ── Token validation ──────────────────────────────────────────────────────
def test_initialize_rejects_wrong_token(client):
"""Wrong init_token → 403 invalid_init_token."""
resp = client.post(
"/api/v1/auth/initialize",
json={**_init_payload(), "init_token": "wrong-token"},
)
assert resp.status_code == 403
assert resp.json()["detail"]["code"] == "invalid_init_token"
def test_initialize_rejects_empty_token(client):
"""Empty init_token → 403 (fails constant-time comparison against stored token)."""
resp = client.post(
"/api/v1/auth/initialize",
json={**_init_payload(), "init_token": ""},
)
assert resp.status_code == 403
def test_initialize_token_consumed_after_success(client):
"""After a successful /initialize the token is consumed and cannot be reused."""
client.post("/api/v1/auth/initialize", json=_init_payload())
# The token is now None; any subsequent call with the old token must be rejected (403)
resp2 = client.post(
"/api/v1/auth/initialize",
json={**_init_payload(), "email": "other@example.com"},
)
assert resp2.status_code == 403
# ── Rejection when already initialized ───────────────────────────────────
def test_initialize_rejected_when_admin_exists(client):
"""Second call to /initialize after admin exists → 409 system_already_initialized."""
"""Second call to /initialize after admin exists → 409 system_already_initialized.
The first call consumes the token. Re-setting it on app.state simulates
what would happen if the operator somehow restarted or manually refreshed
the token (e.g., in testing).
"""
client.post("/api/v1/auth/initialize", json=_init_payload())
# Re-set the token so the second attempt can pass token validation
# and reach the admin-exists check.
client.app.state.init_token = _INIT_TOKEN
resp2 = client.post(
"/api/v1/auth/initialize",
json={**_init_payload(), "email": "other@example.com"},
@@ -95,6 +141,24 @@ def test_initialize_rejected_when_admin_exists(client):
assert body["detail"]["code"] == "system_already_initialized"
def test_initialize_token_not_consumed_on_admin_exists(client):
"""Token is NOT consumed when the admin-exists guard rejects the request.
This prevents a DoS where an attacker calls with the correct token when
admin already exists and permanently burns the init token.
"""
client.post("/api/v1/auth/initialize", json=_init_payload())
# Token consumed by success above; re-simulate the scenario:
# admin exists, token is still valid (re-set), call should 409 and NOT consume token.
client.app.state.init_token = _INIT_TOKEN
client.post(
"/api/v1/auth/initialize",
json={**_init_payload(), "email": "other@example.com"},
)
# Token must still be set (not consumed) after the 409 rejection.
assert client.app.state.init_token == _INIT_TOKEN
def test_initialize_register_does_not_block_initialization(client):
"""/register creating a user before /initialize doesn't block admin creation."""
# Register a regular user first
File diff suppressed because it is too large
+5 -5
@@ -75,27 +75,27 @@ async def test_cancel_not_inflight(manager: RunManager):
@pytest.mark.anyio
async def test_list_by_thread(manager: RunManager):
"""Same thread should return multiple runs."""
"""Same thread should return multiple runs, newest first."""
r1 = await manager.create("thread-1")
r2 = await manager.create("thread-1")
await manager.create("thread-2")
runs = await manager.list_by_thread("thread-1")
assert len(runs) == 2
assert runs[0].run_id == r1.run_id
assert runs[1].run_id == r2.run_id
assert runs[0].run_id == r2.run_id
assert runs[1].run_id == r1.run_id
@pytest.mark.anyio
async def test_list_by_thread_is_stable_when_timestamps_tie(manager: RunManager, monkeypatch: pytest.MonkeyPatch):
"""Ordering should be stable (insertion order) even when timestamps tie."""
"""Newest-first ordering should not depend on timestamp precision."""
monkeypatch.setattr("deerflow.runtime.runs.manager._now_iso", lambda: "2026-01-01T00:00:00+00:00")
r1 = await manager.create("thread-1")
r2 = await manager.create("thread-1")
runs = await manager.list_by_thread("thread-1")
assert [run.run_id for run in runs] == [r1.run_id, r2.run_id]
assert [run.run_id for run in runs] == [r2.run_id, r1.run_id]
@pytest.mark.anyio
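The newest-first ordering these tests assert relies on dict insertion order rather than timestamps. A minimal sketch, assuming a hypothetical `Run` record and a plain dict in place of `RunManager._runs`:

```python
from dataclasses import dataclass


@dataclass
class Run:
    """Hypothetical stand-in for RunRecord."""
    run_id: str
    thread_id: str
    created_at: str


def list_by_thread(runs: dict[str, Run], thread_id: str) -> list[Run]:
    # Dicts preserve insertion order, and reversed() over values() is
    # supported from Python 3.8, so this is newest-first by creation.
    return [r for r in reversed(runs.values()) if r.thread_id == thread_id]
```

Because the order comes from insertion rather than from `created_at`, two runs created within the same clock tick still come back in a deterministic order.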
@@ -0,0 +1,439 @@
"""Tests for event-store-backed message loading in thread state/history endpoints.
Covers the helper functions added to ``app/gateway/routers/threads.py``:
- ``_sanitize_legacy_command_repr`` extracts inner ToolMessage text from
legacy ``str(Command(...))`` strings captured before the ``journal.py``
fix for state-updating tools like ``present_files``.
- ``_get_event_store_messages`` loads the full message stream with full
pagination, copy-on-read id patching, legacy Command sanitization, and
a clean fallback to ``None`` when the event store is unavailable.
"""
from __future__ import annotations
import uuid
from types import SimpleNamespace
from typing import Any
import pytest
from app.gateway.routers.threads import (
_get_event_store_messages,
_sanitize_legacy_command_repr,
)
from deerflow.runtime.events.store.memory import MemoryRunEventStore
@pytest.fixture()
def event_store() -> MemoryRunEventStore:
return MemoryRunEventStore()
class _FakeFeedbackRepo:
"""Minimal ``FeedbackRepository`` stand-in that returns a configured map."""
def __init__(self, by_run: dict[str, dict] | None = None) -> None:
self._by_run = by_run or {}
async def list_by_thread_grouped(self, thread_id: str, *, user_id: str | None) -> dict[str, dict]:
return dict(self._by_run)
def _make_request(
event_store: MemoryRunEventStore,
feedback_repo: _FakeFeedbackRepo | None = None,
) -> Any:
"""Build a minimal FastAPI-like Request object.
``get_run_event_store(request)`` reads ``request.app.state.run_event_store``.
``get_feedback_repo(request)`` reads ``request.app.state.feedback_repo``.
``get_current_user`` is monkey-patched separately in tests that need it.
"""
state = SimpleNamespace(
run_event_store=event_store,
feedback_repo=feedback_repo or _FakeFeedbackRepo(),
)
app = SimpleNamespace(state=state)
return SimpleNamespace(app=app)
@pytest.fixture(autouse=True)
def _stub_current_user(monkeypatch):
"""Stub out ``get_current_user`` so tests don't need real auth context."""
import app.gateway.routers.threads as threads_mod
async def _fake(_request):
return None
monkeypatch.setattr(threads_mod, "get_current_user", _fake)
async def _seed_simple_run(store: MemoryRunEventStore, thread_id: str, run_id: str) -> None:
"""Seed one run: human + ai_tool_call + tool_result + final ai_message, plus a trace."""
await store.put(
thread_id=thread_id, run_id=run_id,
event_type="human_message", category="message",
content={
"type": "human", "id": None,
"content": [{"type": "text", "text": "hello"}],
"additional_kwargs": {}, "response_metadata": {}, "name": None,
},
)
await store.put(
thread_id=thread_id, run_id=run_id,
event_type="ai_tool_call", category="message",
content={
"type": "ai", "id": "lc_run--tc1",
"content": "",
"tool_calls": [{"name": "search", "args": {"q": "x"}, "id": "call_1", "type": "tool_call"}],
"invalid_tool_calls": [],
"additional_kwargs": {}, "response_metadata": {}, "name": None,
"usage_metadata": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15},
},
)
await store.put(
thread_id=thread_id, run_id=run_id,
event_type="tool_result", category="message",
content={
"type": "tool", "id": None,
"content": "results",
"tool_call_id": "call_1", "name": "search",
"artifact": None, "status": "success",
"additional_kwargs": {}, "response_metadata": {},
},
)
await store.put(
thread_id=thread_id, run_id=run_id,
event_type="ai_message", category="message",
content={
"type": "ai", "id": "lc_run--final1",
"content": "done",
"tool_calls": [], "invalid_tool_calls": [],
"additional_kwargs": {}, "response_metadata": {"finish_reason": "stop"}, "name": None,
"usage_metadata": {"input_tokens": 20, "output_tokens": 10, "total_tokens": 30},
},
)
# Non-message trace — must be filtered out.
await store.put(
thread_id=thread_id, run_id=run_id,
event_type="llm_request", category="trace",
content={"model": "test"},
)
class TestSanitizeLegacyCommandRepr:
def test_passthrough_non_string(self):
assert _sanitize_legacy_command_repr(None) is None
assert _sanitize_legacy_command_repr(42) == 42
assert _sanitize_legacy_command_repr([{"type": "text", "text": "x"}]) == [{"type": "text", "text": "x"}]
def test_passthrough_plain_string(self):
assert _sanitize_legacy_command_repr("Successfully presented files") == "Successfully presented files"
assert _sanitize_legacy_command_repr("") == ""
def test_extracts_inner_content_single_quotes(self):
legacy = (
"Command(update={'artifacts': ['/mnt/user-data/outputs/report.md'], "
"'messages': [ToolMessage(content='Successfully presented files', "
"tool_call_id='call_abc')]})"
)
assert _sanitize_legacy_command_repr(legacy) == "Successfully presented files"
def test_extracts_inner_content_double_quotes(self):
legacy = 'Command(update={"messages": [ToolMessage(content="ok", tool_call_id="x")]})'
assert _sanitize_legacy_command_repr(legacy) == "ok"
def test_unparseable_command_returns_original(self):
legacy = "Command(update={'something_else': 1})"
assert _sanitize_legacy_command_repr(legacy) == legacy
class TestGetEventStoreMessages:
@pytest.mark.anyio
async def test_returns_none_when_store_empty(self, event_store):
request = _make_request(event_store)
assert await _get_event_store_messages(request, "t_missing") is None
@pytest.mark.anyio
async def test_extracts_all_message_types_in_order(self, event_store):
await _seed_simple_run(event_store, "t1", "r1")
request = _make_request(event_store)
messages = await _get_event_store_messages(request, "t1")
assert messages is not None
types = [m["type"] for m in messages]
assert types == ["human", "ai", "tool", "ai"]
# Trace events must not appear
for m in messages:
assert m.get("type") in {"human", "ai", "tool"}
@pytest.mark.anyio
async def test_null_ids_get_deterministic_uuid5(self, event_store):
await _seed_simple_run(event_store, "t1", "r1")
request = _make_request(event_store)
messages = await _get_event_store_messages(request, "t1")
assert messages is not None
# AI messages keep their LLM ids
assert messages[1]["id"] == "lc_run--tc1"
assert messages[3]["id"] == "lc_run--final1"
# Human (seq=1) + tool (seq=3) get deterministic uuid5
expected_human_id = str(uuid.uuid5(uuid.NAMESPACE_URL, "t1:1"))
expected_tool_id = str(uuid.uuid5(uuid.NAMESPACE_URL, "t1:3"))
assert messages[0]["id"] == expected_human_id
assert messages[2]["id"] == expected_tool_id
# Re-running produces the same ids (stability across requests)
messages2 = await _get_event_store_messages(request, "t1")
assert [m["id"] for m in messages2] == [m["id"] for m in messages]
@pytest.mark.anyio
async def test_helper_does_not_mutate_store(self, event_store):
"""Helper must copy content dicts; the live store must stay unchanged."""
await _seed_simple_run(event_store, "t1", "r1")
request = _make_request(event_store)
_ = await _get_event_store_messages(request, "t1")
# Raw store records still have id=None for human/tool
raw = await event_store.list_messages("t1", limit=500)
human = next(e for e in raw if e["content"]["type"] == "human")
tool = next(e for e in raw if e["content"]["type"] == "tool")
assert human["content"]["id"] is None
assert tool["content"]["id"] is None
@pytest.mark.anyio
async def test_legacy_command_repr_sanitized(self, event_store):
"""A tool_result whose content is a legacy ``str(Command(...))`` is cleaned."""
legacy = (
"Command(update={'artifacts': ['/mnt/user-data/outputs/x.md'], "
"'messages': [ToolMessage(content='Successfully presented files', "
"tool_call_id='call_p')]})"
)
await event_store.put(
thread_id="t2", run_id="r1",
event_type="tool_result", category="message",
content={
"type": "tool", "id": None,
"content": legacy,
"tool_call_id": "call_p", "name": "present_files",
"artifact": None, "status": "success",
"additional_kwargs": {}, "response_metadata": {},
},
)
request = _make_request(event_store)
messages = await _get_event_store_messages(request, "t2")
assert messages is not None and len(messages) == 1
assert messages[0]["content"] == "Successfully presented files"
@pytest.mark.anyio
async def test_pagination_covers_more_than_one_page(self, event_store, monkeypatch):
"""Simulate a long thread that exceeds a single page to exercise the loop."""
thread_id = "t_long"
# Seed 12 human messages
for i in range(12):
await event_store.put(
thread_id=thread_id, run_id="r1",
event_type="human_message", category="message",
content={
"type": "human", "id": None,
"content": [{"type": "text", "text": f"msg {i}"}],
"additional_kwargs": {}, "response_metadata": {}, "name": None,
},
)
# Keep a handle on the helper; the spy below records how list_messages is paged.
import app.gateway.routers.threads as threads_mod
original = threads_mod._get_event_store_messages
# Monkeypatch MemoryRunEventStore.list_messages to assert it's called with cursor pagination
calls: list[dict] = []
real_list = event_store.list_messages
async def spy_list_messages(tid, *, limit=50, before_seq=None, after_seq=None):
calls.append({"limit": limit, "after_seq": after_seq})
return await real_list(tid, limit=limit, before_seq=before_seq, after_seq=after_seq)
monkeypatch.setattr(event_store, "list_messages", spy_list_messages)
request = _make_request(event_store)
messages = await original(request, thread_id)
assert messages is not None
assert len(messages) == 12
assert [m["content"][0]["text"] for m in messages] == [f"msg {i}" for i in range(12)]
# At least one call was made with after_seq=None (the initial page)
assert any(c["after_seq"] is None for c in calls)
@pytest.mark.anyio
async def test_summarize_regression_recovers_pre_summarize_messages(self, event_store):
"""The exact bug: checkpoint would have only post-summarize messages;
event store must surface the original pre-summarize human query."""
# Run 1 (pre-summarize)
await event_store.put(
thread_id="t_sum", run_id="r1",
event_type="human_message", category="message",
content={
"type": "human", "id": None,
"content": [{"type": "text", "text": "original question"}],
"additional_kwargs": {}, "response_metadata": {}, "name": None,
},
)
await event_store.put(
thread_id="t_sum", run_id="r1",
event_type="ai_message", category="message",
content={
"type": "ai", "id": "lc_run--r1",
"content": "first answer",
"tool_calls": [], "invalid_tool_calls": [],
"additional_kwargs": {}, "response_metadata": {}, "name": None,
"usage_metadata": {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0},
},
)
# Run 2 (post-summarize — what the checkpoint still has)
await event_store.put(
thread_id="t_sum", run_id="r2",
event_type="human_message", category="message",
content={
"type": "human", "id": None,
"content": [{"type": "text", "text": "follow up"}],
"additional_kwargs": {}, "response_metadata": {}, "name": None,
},
)
await event_store.put(
thread_id="t_sum", run_id="r2",
event_type="ai_message", category="message",
content={
"type": "ai", "id": "lc_run--r2",
"content": "second answer",
"tool_calls": [], "invalid_tool_calls": [],
"additional_kwargs": {}, "response_metadata": {}, "name": None,
"usage_metadata": {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0},
},
)
request = _make_request(event_store)
messages = await _get_event_store_messages(request, "t_sum")
assert messages is not None
# 4 messages, not 2 (which is what the summarized checkpoint would yield)
assert len(messages) == 4
assert messages[0]["content"][0]["text"] == "original question"
assert messages[1]["id"] == "lc_run--r1"
assert messages[3]["id"] == "lc_run--r2"
@pytest.mark.anyio
async def test_run_id_attached_to_every_message(self, event_store):
await _seed_simple_run(event_store, "t1", "r1")
request = _make_request(event_store)
messages = await _get_event_store_messages(request, "t1")
assert messages is not None
assert all(m.get("run_id") == "r1" for m in messages)
@pytest.mark.anyio
async def test_feedback_attached_only_to_final_ai_message_per_run(self, event_store):
await _seed_simple_run(event_store, "t1", "r1")
feedback_repo = _FakeFeedbackRepo(
{"r1": {"feedback_id": "fb1", "rating": 1, "comment": "great"}}
)
request = _make_request(event_store, feedback_repo=feedback_repo)
messages = await _get_event_store_messages(request, "t1")
assert messages is not None
# human (0), ai_tool_call (1), tool (2), ai_message (3)
final_ai = messages[3]
assert final_ai["feedback"] == {
"feedback_id": "fb1",
"rating": 1,
"comment": "great",
}
# Non-final messages must NOT have a feedback key at all — the
# frontend keys button visibility off of this.
assert "feedback" not in messages[0]
assert "feedback" not in messages[1]
assert "feedback" not in messages[2]
@pytest.mark.anyio
async def test_feedback_none_when_no_row_for_run(self, event_store):
await _seed_simple_run(event_store, "t1", "r1")
request = _make_request(event_store, feedback_repo=_FakeFeedbackRepo({}))
messages = await _get_event_store_messages(request, "t1")
assert messages is not None
# Final ai_message gets an explicit ``None`` — distinguishes "eligible
# but unrated" from "not eligible" (field absent).
assert messages[3]["feedback"] is None
@pytest.mark.anyio
async def test_feedback_per_run_for_multi_run_thread(self, event_store):
"""A thread with two runs: each final ai_message should get its own feedback."""
# Run 1
await event_store.put(
thread_id="t_multi", run_id="r1",
event_type="human_message", category="message",
content={"type": "human", "id": None, "content": "q1",
"additional_kwargs": {}, "response_metadata": {}, "name": None},
)
await event_store.put(
thread_id="t_multi", run_id="r1",
event_type="ai_message", category="message",
content={"type": "ai", "id": "lc_run--a1", "content": "a1",
"tool_calls": [], "invalid_tool_calls": [],
"additional_kwargs": {}, "response_metadata": {}, "name": None,
"usage_metadata": None},
)
# Run 2
await event_store.put(
thread_id="t_multi", run_id="r2",
event_type="human_message", category="message",
content={"type": "human", "id": None, "content": "q2",
"additional_kwargs": {}, "response_metadata": {}, "name": None},
)
await event_store.put(
thread_id="t_multi", run_id="r2",
event_type="ai_message", category="message",
content={"type": "ai", "id": "lc_run--a2", "content": "a2",
"tool_calls": [], "invalid_tool_calls": [],
"additional_kwargs": {}, "response_metadata": {}, "name": None,
"usage_metadata": None},
)
feedback_repo = _FakeFeedbackRepo({
"r1": {"feedback_id": "fb_r1", "rating": 1, "comment": None},
"r2": {"feedback_id": "fb_r2", "rating": -1, "comment": "meh"},
})
request = _make_request(event_store, feedback_repo=feedback_repo)
messages = await _get_event_store_messages(request, "t_multi")
assert messages is not None
# human[r1], ai[r1], human[r2], ai[r2]
assert messages[1]["feedback"]["feedback_id"] == "fb_r1"
assert messages[1]["feedback"]["rating"] == 1
assert messages[3]["feedback"]["feedback_id"] == "fb_r2"
assert messages[3]["feedback"]["rating"] == -1
# Humans don't get feedback
assert "feedback" not in messages[0]
assert "feedback" not in messages[2]
@pytest.mark.anyio
async def test_feedback_repo_failure_does_not_break_helper(self, monkeypatch, event_store):
"""If feedback lookup throws, messages still come back without feedback."""
await _seed_simple_run(event_store, "t1", "r1")
class _BoomRepo:
async def list_by_thread_grouped(self, *a, **kw):
raise RuntimeError("db down")
request = _make_request(event_store, feedback_repo=_BoomRepo())
messages = await _get_event_store_messages(request, "t1")
assert messages is not None
assert len(messages) == 4
for m in messages:
assert "feedback" not in m
@pytest.mark.anyio
async def test_returns_none_when_dep_raises(self, monkeypatch, event_store):
"""When ``get_run_event_store`` is not configured, helper returns None."""
import app.gateway.routers.threads as threads_mod
def boom(_request):
raise RuntimeError("no store")
monkeypatch.setattr(threads_mod, "get_run_event_store", boom)
request = _make_request(event_store)
assert await threads_mod._get_event_store_messages(request, "t1") is None
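The tests above describe three behaviours of the helper: paging through the event store with an `after_seq` cursor, patching null ids with a deterministic `uuid5` on a copy of each record, and unwrapping legacy `str(Command(...))` tool content. The sketch below shows one way those pieces could fit together. It is not the code in `app/gateway/routers/threads.py`: `FakeStore`, `load_all_messages`, `sanitize_legacy_command_repr`, the regex, and the `page_size` parameter are all hypothetical names chosen for illustration. Only the `uuid5(NAMESPACE_URL, "<thread_id>:<seq>")` id scheme is taken from the assertions in the tests.

```python
import asyncio
import copy
import re
import uuid
from typing import Any, Optional

# Assumed pattern: grab the quoted text right after "ToolMessage(content=".
_TOOL_MESSAGE_CONTENT = re.compile(r"ToolMessage\(content=(['\"])(.*?)\1")


def sanitize_legacy_command_repr(content: Any) -> Any:
    """Hypothetical stand-in: unwrap ToolMessage text from a str(Command(...))."""
    if not isinstance(content, str) or not content.startswith("Command("):
        return content  # non-strings and plain strings pass through unchanged
    match = _TOOL_MESSAGE_CONTENT.search(content)
    return match.group(2) if match else content


class FakeStore:
    """Tiny in-memory store with after_seq/limit paging (assumed shape)."""

    def __init__(self) -> None:
        self.events: list = []

    async def put(self, thread_id: str, content: dict) -> None:
        # seq is 1-based and counted per thread in this sketch.
        seq = sum(1 for e in self.events if e["thread_id"] == thread_id) + 1
        self.events.append({"thread_id": thread_id, "seq": seq, "content": content})

    async def list_messages(
        self, thread_id: str, *, limit: int = 50, after_seq: Optional[int] = None
    ) -> list:
        rows = [e for e in self.events if e["thread_id"] == thread_id]
        if after_seq is not None:
            rows = [e for e in rows if e["seq"] > after_seq]
        return rows[:limit]


async def load_all_messages(store: FakeStore, thread_id: str, page_size: int = 5) -> list:
    """Walk every page, returning patched copies and leaving the store untouched."""
    messages: list = []
    cursor: Optional[int] = None  # None requests the first page
    while True:
        page = await store.list_messages(thread_id, limit=page_size, after_seq=cursor)
        if not page:
            break
        for event in page:
            content = copy.deepcopy(event["content"])  # copy-on-read
            if content.get("id") is None:
                name = f"{thread_id}:{event['seq']}"
                content["id"] = str(uuid.uuid5(uuid.NAMESPACE_URL, name))
            content["content"] = sanitize_legacy_command_repr(content.get("content"))
            messages.append(content)
        cursor = page[-1]["seq"]  # advance the cursor past the last row seen
        if len(page) < page_size:
            break  # a short page means there is nothing left to fetch
    return messages
```

Because the id is derived only from the thread id and sequence number, repeated calls yield the same ids without writing anything back to the store, which is the property `test_null_ids_get_deterministic_uuid5` and `test_helper_does_not_mutate_store` check.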
+2 -2
@@ -5,7 +5,7 @@
}
],
"settings": {
"js/ts.tsdk.path": "frontend/node_modules/typescript/lib",
"typescript.tsdk": "frontend/node_modules/typescript/lib",
"python-envs.pythonProjects": [
{
"path": "backend",
@@ -44,4 +44,4 @@
}
]
}
}
}
+21
@@ -26,6 +26,9 @@ export default function SetupPage() {
const [error, setError] = useState("");
const [loading, setLoading] = useState(false);
// --- Init-admin mode only ---
const [initToken, setInitToken] = useState("");
// --- Change-password mode only ---
const [currentPassword, setCurrentPassword] = useState("");
@@ -79,6 +82,7 @@ export default function SetupPage() {
body: JSON.stringify({
email,
password: newPassword,
init_token: initToken,
}),
});
@@ -186,6 +190,23 @@ export default function SetupPage() {
required
/>
</div>
<div className="flex flex-col space-y-1">
<label htmlFor="initToken" className="text-sm font-medium">
Initialization Token
</label>
<Input
id="initToken"
type="text"
placeholder="Copy from server startup logs"
value={initToken}
onChange={(e) => setInitToken(e.target.value)}
required
autoComplete="off"
/>
<p className="text-muted-foreground text-xs">
Find the <code>INIT TOKEN</code> printed in the server startup logs.
</p>
</div>
<div className="flex flex-col space-y-1">
<label htmlFor="password" className="text-sm font-medium">
Password
+2 -2
@@ -34,14 +34,14 @@ export default async function DocLayout({ children, params }) {
<Layout
navbar={
<Header
className="sticky max-w-full px-10"
className="relative max-w-full px-10"
homeURL="/"
locale={locale}
/>
}
pageMap={pageMap}
docsRepositoryBase="https://github.com/bytedance/deerflow/tree/main/frontend/src/content"
footer={<Footer className="mt-0" />}
footer={<Footer />}
i18n={i18n}
// ... Your additional layout options
>
@@ -46,13 +46,7 @@ export default function AgentChatPage() {
const [settings, setSettings] = useThreadSettings(threadId);
const { showNotification } = useNotification();
const {
thread,
sendMessage,
isHistoryLoading,
hasMoreHistory,
loadMoreHistory,
} = useThreadStream({
const [thread, sendMessage] = useThreadStream({
threadId: isNewThread ? undefined : threadId,
context: { ...settings.context, agent_name: agent_name },
onStart: (createdThreadId) => {
@@ -147,9 +141,6 @@ export default function AgentChatPage() {
threadId={threadId}
thread={thread}
paddingBottom={messageListPaddingBottom}
hasMoreHistory={hasMoreHistory}
loadMoreHistory={loadMoreHistory}
isHistoryLoading={isHistoryLoading}
/>
</div>
@@ -1,6 +1,6 @@
"use client";
import { useCallback, useEffect, useRef, useState } from "react";
import { useCallback, useEffect, useState } from "react";
import { type PromptInputMessage } from "@/components/ai-elements/prompt-input";
import { ArtifactTrigger } from "@/components/workspace/artifacts";
@@ -35,30 +35,19 @@ export default function ChatPage() {
const { threadId, setThreadId, isNewThread, setIsNewThread, isMock } =
useThreadChat();
const [settings, setSettings] = useThreadSettings(threadId);
const mountedRef = useRef(false);
const [mounted, setMounted] = useState(false);
useSpecificChatMode();
useEffect(() => {
mountedRef.current = true;
setMounted(true);
}, []);
const { showNotification } = useNotification();
const {
thread,
sendMessage,
isUploading,
isHistoryLoading,
hasMoreHistory,
loadMoreHistory,
} = useThreadStream({
const [thread, sendMessage, isUploading] = useThreadStream({
threadId: isNewThread ? undefined : threadId,
context: settings.context,
isMock,
onSend: (_threadId) => {
setThreadId(_threadId);
setIsNewThread(false);
},
onStart: (createdThreadId) => {
setThreadId(createdThreadId);
setIsNewThread(false);
@@ -126,9 +115,6 @@ export default function ChatPage() {
threadId={threadId}
thread={thread}
paddingBottom={messageListPaddingBottom}
hasMoreHistory={hasMoreHistory}
loadMoreHistory={loadMoreHistory}
isHistoryLoading={isHistoryLoading}
/>
</div>
<div className="absolute right-0 bottom-0 left-0 z-30 flex justify-center px-4">
@@ -152,7 +138,7 @@ export default function ChatPage() {
/>
</div>
</div>
{mountedRef.current ? (
{mounted ? (
<InputBox
className={cn("bg-background/5 w-full -translate-y-4")}
isNewThread={isNewThread}
@@ -184,7 +170,7 @@ export default function ChatPage() {
<div
aria-hidden="true"
className={cn(
"bg-background/5 h-32 w-full -translate-y-4 rounded-2xl",
"bg-background/5 h-32 w-full -translate-y-4 rounded-2xl border",
)}
/>
)}
+2 -13
@@ -1,20 +1,9 @@
import { useMemo } from "react";
import { cn } from "@/lib/utils";
export type FooterProps = {
className?: string;
};
export function Footer({ className }: FooterProps) {
export function Footer() {
const year = useMemo(() => new Date().getFullYear(), []);
return (
<footer
className={cn(
"container-md mx-auto mt-32 flex flex-col items-center justify-center",
className,
)}
>
<footer className="container-md mx-auto mt-32 flex flex-col items-center justify-center">
<hr className="from-border/0 to-border/0 m-0 h-px w-full border-none bg-linear-to-r via-white/20" />
<div className="text-muted-foreground container flex h-20 flex-col items-center justify-center text-sm">
<p className="text-center font-serif text-lg md:text-xl">
+17 -21
@@ -55,7 +55,7 @@ import {
DropdownMenuLabel,
DropdownMenuSeparator,
} from "@/components/ui/dropdown-menu";
import { fetch } from "@/core/api/fetcher";
import { fetchWithAuth } from "@/core/api/fetcher";
import { getBackendBaseURL } from "@/core/config";
import { useI18n } from "@/core/i18n/hooks";
import { useModels } from "@/core/models/hooks";
@@ -155,7 +155,6 @@ export function InputBox({
const [followupsLoading, setFollowupsLoading] = useState(false);
const lastGeneratedForAiIdRef = useRef<string | null>(null);
const wasStreamingRef = useRef(false);
const messagesRef = useRef(thread.messages);
const [confirmOpen, setConfirmOpen] = useState(false);
const [pendingSuggestion, setPendingSuggestion] = useState<string | null>(
@@ -355,10 +354,6 @@ export function InputBox({
followupsVisibilityChangeRef.current?.(showFollowups);
}, [showFollowups]);
useEffect(() => {
messagesRef.current = thread.messages;
}, [thread.messages]);
useEffect(() => {
return () => followupsVisibilityChangeRef.current?.(false);
}, []);
@@ -375,16 +370,14 @@ export function InputBox({
return;
}
const lastAi = [...messagesRef.current]
.reverse()
.find((m) => m.type === "ai");
const lastAi = [...thread.messages].reverse().find((m) => m.type === "ai");
const lastAiId = lastAi?.id ?? null;
if (!lastAiId || lastAiId === lastGeneratedForAiIdRef.current) {
return;
}
lastGeneratedForAiIdRef.current = lastAiId;
const recent = messagesRef.current
const recent = thread.messages
.filter((m) => m.type === "human" || m.type === "ai")
.map((m) => {
const role = m.type === "human" ? "user" : "assistant";
@@ -403,16 +396,19 @@ export function InputBox({
setFollowupsLoading(true);
setFollowups([]);
fetch(`${getBackendBaseURL()}/api/threads/${threadId}/suggestions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: recent,
n: 3,
model_name: context.model_name ?? undefined,
}),
signal: controller.signal,
})
fetchWithAuth(
`${getBackendBaseURL()}/api/threads/${threadId}/suggestions`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: recent,
n: 3,
model_name: context.model_name ?? undefined,
}),
signal: controller.signal,
},
)
.then(async (res) => {
if (!res.ok) {
return { suggestions: [] as string[] };
@@ -434,7 +430,7 @@ export function InputBox({
});
return () => controller.abort();
}, [context.model_name, disabled, isMock, status, threadId]);
}, [context.model_name, disabled, isMock, status, thread.messages, threadId]);
return (
<div ref={promptRootRef} className="relative flex flex-col gap-4">
@@ -1,12 +1,9 @@
import type { BaseStream } from "@langchain/langgraph-sdk/react";
import { ChevronUpIcon, Loader2Icon } from "lucide-react";
import { useCallback, useEffect, useRef } from "react";
import {
Conversation,
ConversationContent,
} from "@/components/ai-elements/conversation";
import { Button } from "@/components/ui/button";
import { useI18n } from "@/core/i18n/hooks";
import {
extractContentFromMessage,
@@ -21,6 +18,7 @@ import { useRehypeSplitWordsIntoSpans } from "@/core/rehype";
import type { Subtask } from "@/core/tasks";
import { useUpdateSubtask } from "@/core/tasks/context";
import type { AgentThreadState } from "@/core/threads";
import { useThreadMessageEnrichment } from "@/core/threads/hooks";
import { cn } from "@/lib/utils";
import { ArtifactFileList } from "../artifacts/artifact-file-list";
@@ -35,134 +33,22 @@ import { SubtaskCard } from "./subtask-card";
export const MESSAGE_LIST_DEFAULT_PADDING_BOTTOM = 160;
export const MESSAGE_LIST_FOLLOWUPS_EXTRA_PADDING_BOTTOM = 80;
const LOAD_MORE_HISTORY_THROTTLE_MS = 1200;
function LoadMoreHistoryIndicator({
isLoading,
hasMore,
loadMore,
}: {
isLoading?: boolean;
hasMore?: boolean;
loadMore?: () => void;
}) {
const { t } = useI18n();
const sentinelRef = useRef<HTMLDivElement | null>(null);
const timeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const lastLoadRef = useRef(0);
const throttledLoadMore = useCallback(() => {
if (!hasMore || isLoading) {
return;
}
const now = Date.now();
const remaining =
LOAD_MORE_HISTORY_THROTTLE_MS - (now - lastLoadRef.current);
if (remaining <= 0) {
lastLoadRef.current = now;
loadMore?.();
return;
}
if (timeoutRef.current) {
return;
}
timeoutRef.current = setTimeout(() => {
timeoutRef.current = null;
if (!hasMore || isLoading) {
return;
}
lastLoadRef.current = Date.now();
loadMore?.();
}, remaining);
}, [hasMore, isLoading, loadMore]);
useEffect(() => {
const element = sentinelRef.current;
if (!element || !hasMore) {
return;
}
const observer = new IntersectionObserver(
([entry]) => {
if (entry?.isIntersecting) {
throttledLoadMore();
}
},
{
rootMargin: "120px 0px 0px 0px",
},
);
observer.observe(element);
return () => {
observer.disconnect();
};
}, [hasMore, throttledLoadMore]);
useEffect(() => {
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
}
};
}, []);
if (!hasMore && !isLoading) {
return null;
}
return (
<div ref={sentinelRef} className="flex w-full justify-center">
<Button
type="button"
variant="ghost"
size="sm"
className="text-muted-foreground hover:text-foreground rounded-full px-3"
disabled={(isLoading ?? false) || !hasMore}
onClick={throttledLoadMore}
>
{isLoading ? (
<>
<Loader2Icon className="mr-2 size-4 animate-spin" />
{t.common.loading}
</>
) : (
<>
<ChevronUpIcon className="mr-2 size-4" />
{t.common.loadMore}
</>
)}
</Button>
</div>
);
}
export function MessageList({
className,
threadId,
thread,
paddingBottom = MESSAGE_LIST_DEFAULT_PADDING_BOTTOM,
hasMoreHistory,
loadMoreHistory,
isHistoryLoading,
}: {
className?: string;
threadId: string;
thread: BaseStream<AgentThreadState>;
paddingBottom?: number;
hasMoreHistory?: boolean;
loadMoreHistory?: () => void;
isHistoryLoading?: boolean;
}) {
const { t } = useI18n();
const rehypePlugins = useRehypeSplitWordsIntoSpans(thread.isLoading);
const updateSubtask = useUpdateSubtask();
const messages = thread.messages;
const { data: enrichment } = useThreadMessageEnrichment(threadId);
if (thread.isThreadLoading && messages.length === 0) {
return <MessageListSkeleton />;
@@ -171,21 +57,19 @@ export function MessageList({
<Conversation
className={cn("flex size-full flex-col justify-center", className)}
>
<ConversationContent className="mx-auto w-full max-w-(--container-width-md) gap-8 pt-8">
<LoadMoreHistoryIndicator
isLoading={isHistoryLoading}
hasMore={hasMoreHistory}
loadMore={loadMoreHistory}
/>
<ConversationContent className="mx-auto w-full max-w-(--container-width-md) gap-8 pt-12">
{groupMessages(messages, (group) => {
if (group.type === "human" || group.type === "assistant") {
return group.messages.map((msg) => {
const entry = msg.id ? enrichment?.get(msg.id) : undefined;
return (
<MessageListItem
key={`${group.id}/${msg.id}`}
threadId={threadId}
message={msg}
isLoading={thread.isLoading}
runId={entry?.run_id}
feedback={entry?.feedback}
/>
);
});
@@ -5,7 +5,7 @@ import { useState } from "react";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { fetch, getCsrfHeaders } from "@/core/api/fetcher";
import { fetchWithAuth, getCsrfHeaders } from "@/core/api/fetcher";
import { useAuth } from "@/core/auth/AuthProvider";
import { parseAuthError } from "@/core/auth/types";
@@ -36,7 +36,7 @@ export function AccountSettingsPage() {
setLoading(true);
try {
const res = await fetch("/api/v1/auth/change-password", {
const res = await fetchWithAuth("/api/v1/auth/change-password", {
method: "POST",
headers: {
"Content-Type": "application/json",
-9
@@ -25,15 +25,6 @@ const meta: MetaRecord = {
blog: {
type: "page",
},
posts: {
type: "page",
},
login: {
type: "page",
},
setup: {
type: "page",
},
};
export default meta;
@@ -1,8 +1,3 @@
---
title: Agents and Threads
description: DeerFlow App supports multiple named agents and maintains conversation state across sessions through threads and checkpointing.
---
import { Callout, Cards, Steps } from "nextra/components";
# Agents and Threads
@@ -1,8 +1,3 @@
---
title: Configuration
description: DeerFlow App is configured through two files and a set of environment variables. This page covers the application-level configuration that most operators need to set up before deploying.
---
import { Callout, Cards, Tabs } from "nextra/components";
# Configuration
@@ -1,8 +1,3 @@
---
title: Deployment Guide
description: "This guide covers all supported deployment methods for DeerFlow App: local development, Docker Compose, and production with Kubernetes-managed sandboxes."
---
import { Callout, Cards, Steps, Tabs } from "nextra/components";
# Deployment Guide
@@ -21,15 +16,14 @@ make dev
Services started:
| Service | Port | Description |
| ----------- | ---- | ------------------------ |
| LangGraph | 2024 | DeerFlow Harness runtime |
| Gateway API | 8001 | FastAPI backend |
| Frontend | 3000 | Next.js UI |
| nginx | 2026 | Unified reverse proxy |
| Service | Port | Description |
|---|---|---|
| LangGraph | 2024 | DeerFlow Harness runtime |
| Gateway API | 8001 | FastAPI backend |
| Frontend | 3000 | Next.js UI |
| nginx | 2026 | Unified reverse proxy |
Access the app at **http://localhost:2026**.
</Tabs.Tab>
<Tabs.Tab>
```bash
@@ -37,7 +31,6 @@ make stop
```
Stops all four services. Safe to run even if a service is not running.
</Tabs.Tab>
<Tabs.Tab>
```
@@ -48,11 +41,9 @@ logs/nginx.log # nginx access/error logs
```
Tail a log in real time:
```bash
tail -f logs/langgraph.log
```
</Tabs.Tab>
</Tabs>
@@ -112,11 +103,11 @@ For production, use a named volume or a Persistent Volume Claim (PVC) instead of
### Sandbox mode selection
| Sandbox | Use case |
| -------------------------------------- | ------------------------------------------ |
| `LocalSandboxProvider` | Single-user, trusted local workflows |
| `AioSandboxProvider` (Docker) | Multi-user, moderate isolation requirement |
| `AioSandboxProvider` + K8s Provisioner | Production, strong isolation, multi-user |
| Sandbox | Use case |
|---|---|
| `LocalSandboxProvider` | Single-user, trusted local workflows |
| `AioSandboxProvider` (Docker) | Multi-user, moderate isolation requirement |
| `AioSandboxProvider` + K8s Provisioner | Production, strong isolation, multi-user |
For any deployment with more than one concurrent user, use a container-based sandbox to prevent users from interfering with each other's execution environments.
@@ -163,10 +154,10 @@ When `USERDATA_PVC_NAME` is set, the provisioner automatically uses subPath (`th
nginx routes all traffic. Key environment variables that control routing:
| Variable | Default | Description |
| -------------------- | ---------------- | --------------------------------------- |
| `LANGGRAPH_UPSTREAM` | `langgraph:2024` | LangGraph service address |
| `LANGGRAPH_REWRITE` | `/` | URL rewrite prefix for LangGraph routes |
| Variable | Default | Description |
|---|---|---|
| `LANGGRAPH_UPSTREAM` | `langgraph:2024` | LangGraph service address |
| `LANGGRAPH_REWRITE` | `/` | URL rewrite prefix for LangGraph routes |
These are set in the Docker Compose environment and processed by `envsubst` at container startup.
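
As a rough illustration of the substitution step, the Python sketch below approximates what `envsubst` does when it is restricted to the two routing variables. It is not the project's startup script: `render_nginx_template` and the template fragment are hypothetical, and the real nginx template may use these variables differently. The defaults are the ones listed in the table above and are applied here only to keep the example self-contained.

```python
from string import Template

# Defaults copied from the routing table above.
DEFAULTS = {
    "LANGGRAPH_UPSTREAM": "langgraph:2024",
    "LANGGRAPH_REWRITE": "/",
}


def render_nginx_template(template_text: str, env: dict) -> str:
    """Replace only the routing variables; leave nginx's own $variables alone."""
    values = {name: env.get(name, default) for name, default in DEFAULTS.items()}
    # safe_substitute keeps unknown placeholders such as $host untouched.
    return Template(template_text).safe_substitute(values)


# Hypothetical template fragment, not taken from the repository.
EXAMPLE_TEMPLATE = (
    "proxy_pass http://${LANGGRAPH_UPSTREAM}${LANGGRAPH_REWRITE}; "
    "proxy_set_header Host $host;"
)

rendered_default = render_nginx_template(EXAMPLE_TEMPLATE, {})
rendered_custom = render_nginx_template(
    EXAMPLE_TEMPLATE, {"LANGGRAPH_UPSTREAM": "127.0.0.1:2024"}
)
```

Limiting the substitution to a known set of names matters because nginx configuration has its own `$variable` syntax, and those references need to pass through unchanged.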
@@ -184,12 +175,12 @@ openssl rand -base64 32
### Resource recommendations
| Service | Minimum | Recommended |
| ------------------------------- | ---------------- | ---------------- |
| LangGraph (agent runtime) | 2 vCPU, 4 GB RAM | 4 vCPU, 8 GB RAM |
| Gateway | 0.5 vCPU, 512 MB | 1 vCPU, 1 GB |
| Frontend | 0.5 vCPU, 512 MB | 1 vCPU, 1 GB |
| Sandbox container (per session) | 1 vCPU, 1 GB | 2 vCPU, 2 GB |
| Service | Minimum | Recommended |
|---|---|---|
| LangGraph (agent runtime) | 2 vCPU, 4 GB RAM | 4 vCPU, 8 GB RAM |
| Gateway | 0.5 vCPU, 512 MB | 1 vCPU, 1 GB |
| Frontend | 0.5 vCPU, 512 MB | 1 vCPU, 1 GB |
| Sandbox container (per session) | 1 vCPU, 1 GB | 2 vCPU, 2 GB |
## Deployment verification
@@ -210,8 +201,5 @@ A working deployment returns a `200` response from each endpoint. The `/api/mode
<Cards num={2}>
<Cards.Card title="Configuration" href="/docs/application/configuration" />
<Cards.Card
title="Operations & Troubleshooting"
href="/docs/application/operations-and-troubleshooting"
/>
<Cards.Card title="Operations & Troubleshooting" href="/docs/application/operations-and-troubleshooting" />
</Cards>
@@ -1,8 +1,3 @@
---
title: DeerFlow App
description: DeerFlow App is the reference implementation of what a production DeerFlow experience looks like. It assembles the Harness runtime, a web-based conversation workspace, an API gateway, and a reverse proxy into a single deployable system.
---
import { Callout, Cards } from "nextra/components";
# DeerFlow App
@@ -1,8 +1,3 @@
---
title: Operations and Troubleshooting
description: This page covers day-to-day operational tasks and solutions to common problems when running DeerFlow App.
---
import { Callout, Cards } from "nextra/components";
# Operations and Troubleshooting
@@ -1,8 +1,3 @@
---
title: Quick Start
description: This guide walks you through starting DeerFlow App on your local machine using the `make dev` workflow. All four services (LangGraph, Gateway, Frontend, nginx) start together and are accessible through a single URL.
---
import { Callout, Cards, Steps } from "nextra/components";
# Quick Start
@@ -1,8 +1,3 @@
---
title: Workspace Usage
description: The DeerFlow App workspace is a browser-based interface for having multi-turn conversations with the agent, tracking task progress, viewing artifacts, and managing files.
---
import { Callout, Cards } from "nextra/components";
# Workspace Usage
@@ -1,8 +1,3 @@
---
title: Configuration
description: "DeerFlow's configuration system is designed around one goal: every meaningful behavior should be expressible in a config file, not hardcoded in the application. This makes deployments reproducible, auditable, and easy to customize per environment."
---
import { Callout, Cards } from "nextra/components";
# Configuration
@@ -1,8 +1,3 @@
---
title: Customization
description: DeerFlow's pluggable architecture means most parts of the system can be replaced or extended without forking the core. This page maps the extension points and explains how to use each one.
---
import { Callout, Cards } from "nextra/components";
# Customization
@@ -1,8 +1,3 @@
---
title: Design Principles
description: Understanding the design principles behind DeerFlow Harness helps you use it effectively, extend it confidently, and reason about how your agents will behave in production.
---
import { Callout, Cards } from "nextra/components";
# Design Principles
@@ -1,8 +1,3 @@
---
title: Install DeerFlow Harness
description: The DeerFlow Harness is the Python SDK and runtime foundation for building your own Super Agent systems.
---
import { Callout, Cards } from "nextra/components";
# Install DeerFlow Harness
@@ -1,8 +1,3 @@
---
title: Integration Guide
description: DeerFlow Harness is not only a standalone application. It is a Python library you can import and use inside your own backend, API server, automation system, or multi-agent orchestrator.
---
import { Callout, Cards } from "nextra/components";
# Integration Guide
@@ -1,8 +1,3 @@
---
title: Lead Agent
description: The Lead Agent is the central executor in a DeerFlow thread. Every conversation, task, and workflow flows through it. Understanding how it works helps you configure it effectively and extend it when needed.
---
import { Callout, Cards, Steps } from "nextra/components";
# Lead Agent
-5
@@ -1,8 +1,3 @@
---
title: MCP Integration
description: The **Model Context Protocol (MCP)** is an open standard for connecting language models to external tools and data sources. DeerFlow's MCP integration allows you to extend the agent with any tool server that implements the MCP protocol — without modifying the harness itself.
---
import { Callout, Cards, Steps } from "nextra/components";
# MCP Integration
@@ -1,8 +1,3 @@
---
title: Memory
description: Memory is a runtime feature of the DeerFlow Harness. It is not a simple conversation log — it is a structured store of facts and context summaries that persist across separate sessions and inform the agent's behavior in future conversations.
---
import { Callout, Cards } from "nextra/components";
# Memory
@@ -1,8 +1,3 @@
---
title: Middlewares
description: Every time the Lead Agent calls the LLM, it runs through a **middleware chain** before and after the model call. Middlewares can read and modify the agent's state, inject content into the system prompt, intercept tool calls, and react to model outputs.
---
import { Callout } from "nextra/components";
# Middlewares
+104 -64
@@ -1,18 +1,14 @@
---
title: Quick Start
description: Learn how to create and run a DeerFlow agent with create_deerflow_agent, from model setup to streaming responses.
---
import { Callout, Cards, Steps } from "nextra/components";
# Quick Start
<Callout type="info" emoji="🚀">
This guide shows you how to build and run a DeerFlow agent in Python with
<code>create_deerflow_agent</code>.
This guide shows you how to use the DeerFlow Harness programmatically — not
through the App UI, but by importing and calling the harness directly in
Python.
</Callout>
The fastest way to understand DeerFlow Harness is to create an agent directly in code. This quick start walks through model setup, agent creation, and streaming a response.
The DeerFlow Harness is the Python SDK and runtime foundation. This quick start walks you through the key APIs for running an agent, streaming its output, and working with threads.
## Prerequisites
@@ -25,86 +21,130 @@ cd backend
uv sync
```
You will also need a chat model instance from the LangChain provider package you want to use.
## Configuration
## Create your first agent
All harness behaviors are driven by `config.yaml`. At minimum, you need one model configured:
```yaml
# config.yaml
config_version: 6
models:
- name: gpt-4o
use: langchain_openai:ChatOpenAI
model: gpt-4o
api_key: $OPENAI_API_KEY
request_timeout: 600.0
max_retries: 2
sandbox:
use: deerflow.sandbox.local:LocalSandboxProvider
tools:
- use: deerflow.community.ddg_search.tools:web_search_tool
- use: deerflow.community.jina_ai.tools:web_fetch_tool
- use: deerflow.sandbox.tools:ls_tool
- use: deerflow.sandbox.tools:read_file_tool
- use: deerflow.sandbox.tools:write_file_tool
- use: deerflow.sandbox.tools:bash_tool
```
Copy `config.example.yaml` to `config.yaml` and fill in your API key.
## Running the harness
The primary entry point for the DeerFlow Harness is `DeerFlowClient`. It manages thread state, invokes the Lead Agent, and streams the response.
<Steps>
### Import the factory and model
### Import and configure
```python
from deerflow.agents import create_deerflow_agent
from langchain_openai import ChatOpenAI
import asyncio
from deerflow.client import DeerFlowClient
from deerflow.config import load_config
# Load config.yaml from the current directory or DEER_FLOW_CONFIG_PATH
load_config()
client = DeerFlowClient()
```
### Create a model
### Create a thread
```python
model = ChatOpenAI(
model="gpt-4o",
api_key="YOUR_OPENAI_API_KEY",
)
thread_id = "my-thread-001"
```
### Create an agent
Thread IDs are arbitrary strings. Reusing the same ID continues the existing conversation (if a checkpointer is configured).
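Because thread IDs are arbitrary strings, one option is to generate a fresh ID per conversation and keep it for follow-up turns. This is a minimal sketch; the `new_thread_id` helper and the `thread` prefix are illustrative assumptions, not part of DeerFlow.

```python
import uuid


def new_thread_id(prefix: str = "thread") -> str:
    """Return a unique, readable thread ID (hypothetical helper)."""
    return f"{prefix}-{uuid.uuid4().hex}"


# Generate once per conversation, then reuse the same value for follow-up turns.
thread_id = new_thread_id()
```

Storing the generated value alongside your own user or session record makes it possible to resume the same conversation later.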
### Send a message and stream the response
```python
agent = create_deerflow_agent(model)
```
async def run():
async for event in client.astream(
thread_id=thread_id,
message="Research the top 3 open-source LLM frameworks and summarize them.",
config={
"configurable": {
"model_name": "gpt-4o",
"thinking_enabled": False,
"is_plan_mode": True,
"subagent_enabled": True,
}
},
):
print(event)
This returns a compiled LangGraph agent with DeerFlow's default middleware chain.
### Stream a response
```python
for event in agent.stream(
{"messages": [{"role": "user", "content": "Explain what DeerFlow Harness is."}]},
stream_mode=["messages", "values"],
):
print(event)
asyncio.run(run())
```
</Steps>
## Add tools or behavior
## Configurable options
You can customize the agent by passing tools, a system prompt, runtime features, middleware, or a checkpointer.
The `config.configurable` dict controls per-request behavior:
| Key | Type | Default | Description |
|---|---|---|---|
| `model_name` | `str \| None` | first model in config | Model to use for this request |
| `thinking_enabled` | `bool` | `True` | Enable extended thinking mode (if supported) |
| `reasoning_effort` | `str \| None` | `None` | Reasoning effort level (model-specific) |
| `is_plan_mode` | `bool` | `False` | Enable TodoList middleware for task tracking |
| `subagent_enabled` | `bool` | `False` | Allow the agent to delegate subtasks |
| `max_concurrent_subagents` | `int` | `3` | Maximum parallel subagent calls per turn |
| `agent_name` | `str \| None` | `None` | Name of a custom agent to load |
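To keep these keys and defaults in one place, the table can be mirrored in a small dataclass that builds the `config` argument. This is a sketch only: `RequestOptions` and `to_config` are hypothetical names, and dropping `None` values is an assumption about how unset options should be passed.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class RequestOptions:
    """Per-request options mirroring the table above (hypothetical helper)."""

    model_name: Optional[str] = None  # None: fall back to the first configured model
    thinking_enabled: bool = True
    reasoning_effort: Optional[str] = None
    is_plan_mode: bool = False
    subagent_enabled: bool = False
    max_concurrent_subagents: int = 3
    agent_name: Optional[str] = None

    def to_config(self) -> dict:
        """Build the `config` argument, omitting keys left as None."""
        configurable = {k: v for k, v in asdict(self).items() if v is not None}
        return {"configurable": configurable}


config = RequestOptions(model_name="gpt-4o", is_plan_mode=True).to_config()
```

The resulting `config` dict has the same shape as the one passed to `client.astream()` in the steps above.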
## Streaming event types
`client.astream()` yields events from the LangGraph runtime. The key event types are:
| Event type | Description |
|---|---|
| `messages` | Individual message chunks (text, thinking, tool calls) |
| `thread_state` | Thread state updates (title, artifacts, todo list) |
Message chunks contain the token stream as the agent generates its response.
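One way to consume the two event types is to accumulate message chunks into text while keeping the latest thread state. The sketch below assumes each event is a plain dict shaped like `{"type": ..., "data": ...}`; that shape is an assumption made for illustration, so adjust the field access to whatever your DeerFlow version actually yields.

```python
from typing import Any, Iterable


def summarize_events(events: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Fold a stream of events into the final text and the latest thread state.

    Assumes events look like {"type": ..., "data": ...} (hypothetical shape).
    """
    text_parts: list[str] = []
    state: dict[str, Any] = {}
    for event in events:
        if event["type"] == "messages":
            # Message chunks carry a piece of the token stream.
            text_parts.append(str(event["data"]))
        elif event["type"] == "thread_state":
            # Later state updates override earlier ones key by key.
            state.update(event["data"])
    return {"text": "".join(text_parts), "state": state}


sample = [
    {"type": "messages", "data": "Hello, "},
    {"type": "thread_state", "data": {"title": "Greeting"}},
    {"type": "messages", "data": "world."},
]
result = summarize_events(sample)
```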
## Working with a custom agent
If you have defined a custom agent, pass its name as `agent_name` in `config.configurable`:
```python
from deerflow.agents import RuntimeFeatures, create_deerflow_agent
agent = create_deerflow_agent(
model,
system_prompt="You are a concise research assistant.",
features=RuntimeFeatures(subagent=True, memory=False),
plan_mode=True,
name="research-agent",
)
async for event in client.astream(
thread_id="thread-002",
message="Analyze the attached CSV and generate a summary chart.",
config={
"configurable": {
"agent_name": "data-analyst",
"subagent_enabled": True,
}
},
):
...
```
Common parameters:
| Parameter | Description |
|---|---|
| `tools` | Additional tools available to the agent |
| `system_prompt` | Custom system prompt |
| `features` | Enable or replace built-in runtime features |
| `extra_middleware` | Insert custom middleware into the default chain |
| `plan_mode` | Enable Todo-style task tracking |
| `checkpointer` | Persist agent state across runs |
| `name` | Logical agent name |
## When to use DeerFlowClient instead
`create_deerflow_agent()` is the low-level SDK factory when you want to work directly with the compiled agent graph.
Use `DeerFlowClient` when you want the higher-level embedded app interface, such as:
- thread-oriented chat helpers,
- model / skills / memory management APIs,
- file uploads and artifacts,
- Gateway-like response formats.
The custom agent's configuration (model, skills, tool groups) is loaded automatically from `agents/data-analyst/config.yaml`.
## Next steps
@@ -1,8 +1,3 @@
---
title: Sandbox
description: The sandbox gives the Lead Agent a controlled environment where it can read files, write outputs, run shell commands, and produce artifacts. Without a sandbox, the agent can only generate text. With a sandbox, it can write and execute code, process data files, generate charts, and build deliverables.
---
import { Callout, Cards, Tabs } from "nextra/components";
# Sandbox
@@ -1,8 +1,3 @@
---
title: Skills
description: A skill is more than a prompt. It is a self-contained capability package that can include structured instructions, step-by-step workflows, domain-specific best practices, supporting resources, and tool configurations. Skills are loaded on demand — they inject their content when a task calls for them and stay out of the context otherwise.
---
import { Callout, Cards, FileTree, Steps } from "nextra/components";
# Skills
@@ -1,8 +1,3 @@
---
title: Subagents
description: When a task is too broad for a single reasoning thread, or when parts of it can be done in parallel, the Lead Agent delegates work to **subagents**. A subagent is a self-contained agent invocation that receives a specific task, executes it, and returns the result.
---
import { Callout, Cards } from "nextra/components";
# Subagents
@@ -1,8 +1,3 @@
---
title: Tools
description: "The Lead Agent is a tool-calling agent. Tools are how it interacts with the world: searching the web, reading and writing files, running commands, delegating tasks, and presenting outputs to the user."
---
import { Callout, Cards, Tabs } from "nextra/components";
# Tools
@@ -1,8 +1,3 @@
---
title: Core Concepts
description: Before you go deeper into DeerFlow, it helps to anchor on a few concepts that appear throughout the system. These concepts explain what DeerFlow is optimizing for and why its architecture looks the way it does.
---
import { Callout, Cards } from "nextra/components";
# Core Concepts
@@ -1,8 +1,3 @@
---
title: Harness vs App
description: "DeerFlow has two layers that are closely related but serve different purposes."
---
import { Callout, Cards } from "nextra/components";
# Harness vs App
@@ -1,8 +1,3 @@
---
title: Why DeerFlow
description: DeerFlow exists because modern agent systems need more than a chat loop. A useful agent must plan over long horizons, break work into sub-tasks, use tools, manipulate files, run code safely, and preserve enough context to stay coherent across a complex task. DeerFlow was built to provide that runtime foundation.
---
import { Callout, Cards } from "nextra/components";
# Why DeerFlow
+14 -2
@@ -1,8 +1,20 @@
import type { MetaRecord } from "nextra";
const meta: MetaRecord = {
"model-providers": {
title: "Model providers",
"concepts-glossary": {
title: "Concepts Glossary",
},
"configuration-reference": {
title: "Configuration Reference",
},
"api-gateway-reference": {
title: "API / Gateway Reference",
},
"runtime-flags-and-modes": {
title: "Runtime Flags and Modes",
},
"source-map": {
title: "Source Map",
},
};
@@ -0,0 +1,69 @@
import { Callout } from "nextra/components";
# API / Gateway Reference
<Callout type="info">
DeerFlow Gateway is built on FastAPI and provides interactive API
documentation at <code>http://localhost:8001/docs</code>.
</Callout>
## Base URL
```
http://localhost:8001
```
Via nginx proxy:
```
http://localhost:2026/api
```
## Core endpoints
### System
| Method | Path | Description |
| ------ | ------------- | --------------------------------- |
| `GET` | `/health` | Service health check |
| `GET` | `/api/models` | Get the list of configured models |
### Agent management
| Method | Path | Description |
| -------- | -------------------- | ------------------------------- |
| `GET` | `/api/agents` | List all agents |
| `POST` | `/api/agents` | Create a custom agent |
| `GET` | `/api/agents/{name}` | Get agent configuration |
| `PUT` | `/api/agents/{name}` | Update agent configuration |
| `DELETE` | `/api/agents/{name}` | Delete an agent |
| `POST` | `/api/agents/check` | Check/suggest unique agent slug |
### Threads and memory
| Method | Path | Description |
| -------- | -------------------------- | ------------------------- |
| `GET` | `/api/threads` | List threads |
| `DELETE` | `/api/threads/{thread_id}` | Delete a thread |
| `GET` | `/api/memory` | Get global memory |
| `GET` | `/api/memory/{agent_name}` | Get agent-specific memory |
| `DELETE` | `/api/memory` | Clear global memory |
### Extensions
| Method | Path | Description |
| ------ | --------------------------------------- | -------------------------------------------- |
| `GET` | `/api/extensions` | List all extensions (MCP servers and skills) |
| `POST` | `/api/extensions/mcp/{name}/enable` | Enable an MCP server |
| `POST` | `/api/extensions/mcp/{name}/disable` | Disable an MCP server |
| `POST` | `/api/extensions/skills/{name}/enable` | Enable a skill |
| `POST` | `/api/extensions/skills/{name}/disable` | Disable a skill |
### File uploads
| Method | Path | Description |
| ------ | ------------------------------------- | ------------------------------------- |
| `POST` | `/api/uploads/{thread_id}` | Upload a file to the thread workspace |
| `GET` | `/api/uploads/{thread_id}/{filename}` | Retrieve an uploaded file |
For the full interactive API documentation, visit `http://localhost:8001/docs` (Swagger UI).
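The endpoints above can be called from any HTTP client. The following standard-library sketch builds endpoint URLs against the direct Gateway address and fetches a response; `endpoint` and `get_json` are hypothetical helpers, and decoding the body as JSON is an assumption about the response format.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

GATEWAY_URL = "http://localhost:8001"  # direct Gateway address from this page


def endpoint(path: str, *segments: str, base_url: str = GATEWAY_URL) -> str:
    """Join a base URL, an endpoint path, and URL-encoded path segments."""
    parts = [base_url.rstrip("/"), path.strip("/")]
    parts += [quote(segment, safe="") for segment in segments]
    return "/".join(parts)


def get_json(url: str, timeout: float = 5.0):
    """Send a GET request and decode the response body as JSON (assumed format)."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)
```

With a Gateway running locally, `get_json(endpoint("/api/models"))` would request the configured model list, and `endpoint("/api/memory", agent_name)` builds the agent-specific memory URL.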
@@ -0,0 +1,67 @@
import { Callout } from "nextra/components";
# Concepts Glossary
This glossary defines the core terms used throughout the DeerFlow documentation.
---
## Agent
In DeerFlow, an agent is the primary processing unit that receives user messages, decides what actions to take (tool calls or direct responses), and generates output. DeerFlow uses a two-tier architecture with a **Lead Agent** and **Subagents**.
## Artifact
A file produced by the agent — a report, chart, code file, or other deliverable. Artifacts are exposed via the `present_files` tool and persisted in the thread's user-data directory.
## Checkpoint
A persisted snapshot of thread state, saved after each agent turn. Checkpoints allow conversations to resume after server restarts and support state management for long-horizon tasks.
## Context Engineering
The practice of controlling what the agent sees, remembers, and ignores at each step — through summarization, scoped subagent contexts, and external file memory — to keep the agent effective over long-horizon tasks.
## Harness
An opinionated agent runtime that packages tool access, skill loading, sandbox execution, memory, subagent coordination, and context management — rather than just exposing abstractions.
## Lead Agent
The primary executor in each DeerFlow thread, responsible for planning, tool calls, and response generation. Built on LangGraph + LangChain Agent, augmented by the middleware chain.
## Long-horizon Agent
An agent that remains useful across a chain of actions — making plans, calling tools many times, managing intermediate files, and producing a final artifact — rather than producing only a single answer.
## Memory
Structured facts and user context that persist across independent conversation sessions, injected into the agent's system prompt in subsequent sessions.
## Middleware
A plugin that wraps every LLM call, able to read and modify agent state before and after the model invocation. DeerFlow uses middleware for memory, summarization, title generation, and other cross-cutting behaviors.
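The wrapping idea can be illustrated with a generic chain in which each middleware receives the state and a `call_next` function. This is a conceptual sketch, not DeerFlow's or LangChain's actual middleware interface; every name in it is hypothetical.

```python
from typing import Callable

State = dict
ModelCall = Callable[[State], State]
Middleware = Callable[[State, ModelCall], State]


def build_chain(middlewares: list[Middleware], model_call: ModelCall) -> ModelCall:
    """Wrap `model_call` so the first middleware in the list runs outermost."""
    wrapped = model_call
    for middleware in reversed(middlewares):
        # Bind current values as defaults; a plain closure would see only the last ones.
        wrapped = lambda state, mw=middleware, nxt=wrapped: mw(state, nxt)
    return wrapped


def add_memory(state: State, call_next: ModelCall) -> State:
    # Before the call: inject context the model should see.
    state = {**state, "system_prompt": state["system_prompt"] + " [memory]"}
    return call_next(state)


def add_title(state: State, call_next: ModelCall) -> State:
    # After the call: react to the model output.
    result = call_next(state)
    return {**result, "title": result["reply"][:10]}


def fake_model(state: State) -> State:
    # Stand-in for the LLM call.
    return {**state, "reply": f"echo: {state['system_prompt']}"}


run = build_chain([add_memory, add_title], fake_model)
final_state = run({"system_prompt": "base"})
```

Here `add_memory` modifies state before the model runs and `add_title` reacts afterward, which corresponds to the before and after hooks described in this entry.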
## MCP (Model Context Protocol)
An open standard for connecting language models to external tools and data sources. DeerFlow's MCP integration allows connection to any compatible tool server.
## Sandbox
The isolated execution environment where the agent performs file and command-based work. DeerFlow supports local (`LocalSandboxProvider`) and container-based (`AioSandboxProvider`) sandbox modes.
## Skill
A task-oriented capability pack containing structured instructions, workflows, and best practices, loaded into the agent's context on demand. Skills provide specialization without polluting the general agent context.
## Subagent
A focused worker that handles a delegated subtask, running with an isolated context that contains only the information needed to complete its assigned work.
## Thread
The complete encapsulation of a conversation and all its associated state — message history, artifacts, todo list, and checkpoint data.
## ThreadState
The LangGraph-managed state object in DeerFlow, containing `messages`, `artifacts`, `todo_list`, and runtime metadata.
@@ -0,0 +1,123 @@
import { Callout } from "nextra/components";
# Configuration Reference
This page is the complete reference for all top-level fields in `config.yaml`.
<Callout type="info">
See <code>config.example.yaml</code> in the repository root for a fully
commented example with all available options.
</Callout>
## Top-level fields
| Field | Type | Description |
| ---------------------- | ------------- | ----------------------------------------------- |
| `config_version` | `int` | Config schema version (current: 6) |
| `log_level` | `str` | Log verbosity: `debug`/`info`/`warning`/`error` |
| `models` | `list` | Available LLM model configurations |
| `image_generate_model` | `str \| list` | Model name to use for image generation |
| `token_usage` | `object` | Token usage tracking config |
| `tools` | `list` | Available tool configurations |
| `tool_groups` | `list` | Named groupings of tools |
| `tool_search` | `object` | Deferred tool loading config |
| `sandbox` | `object` | Sandbox provider and options |
| `skills` | `object` | Skills directory and container path |
| `skill_evolution` | `object` | Agent-managed skill creation |
| `subagents` | `object` | Subagent timeouts and max turns |
| `acp_agents` | `dict` | External ACP agent configurations |
| `memory` | `object` | Cross-session memory storage |
| `summarization` | `object` | Conversation summarization |
| `title` | `object` | Automatic thread title generation |
| `checkpointer` | `object` | Thread state persistence |
| `guardrails` | `object` | Tool call authorization |
| `uploads` | `object` | File upload settings |
| `channels` | `list` | IM channel integrations |
## models
```yaml
models:
- name: gpt-4o # Model identifier (referenced in requests)
use: langchain_openai:ChatOpenAI # Python class path
model: gpt-4o # Model name passed to the LLM
api_key: $OPENAI_API_KEY # API key (env var interpolation supported)
base_url: null # Optional: custom endpoint URL
request_timeout: 600.0 # Request timeout in seconds
max_retries: 2 # Number of retries on failure
supports_vision: true # Whether to enable vision capabilities
thinking_enabled: false # Whether to enable extended thinking
# Any additional fields are passed through to the model constructor
```
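The `$OPENAI_API_KEY` value relies on environment variable interpolation. As an illustration of the idea only (not DeerFlow's actual loader), the sketch below replaces string values of the form `$NAME` with the matching environment variable; the `interpolate` helper and its error behavior are assumptions.

```python
import os
import re

# Matches a whole string of the form "$NAME".
_ENV_REF = re.compile(r"^\$([A-Za-z_][A-Za-z0-9_]*)$")


def interpolate(value, env=os.environ):
    """Recursively replace "$NAME" strings with the value of env["NAME"]."""
    if isinstance(value, dict):
        return {key: interpolate(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [interpolate(item, env) for item in value]
    if isinstance(value, str):
        match = _ENV_REF.match(value)
        if match:
            name = match.group(1)
            if name not in env:
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
    return value


model = {"name": "gpt-4o", "api_key": "$OPENAI_API_KEY", "max_retries": 2}
resolved = interpolate(model, env={"OPENAI_API_KEY": "sk-example"})
```

Failing loudly on an unset variable is a design choice in this sketch; it surfaces a missing key at load time rather than at the first model call.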
## sandbox
```yaml
sandbox:
# Local (default, no container isolation)
use: deerflow.sandbox.local:LocalSandboxProvider
allow_host_bash: false
bash_output_max_chars: 20000
read_file_output_max_chars: 50000
ls_output_max_chars: 20000
# Container-based
# use: deerflow.community.aio_sandbox:AioSandboxProvider
# image: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest
# replicas: 3
# idle_timeout: 600
```
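The `use:` fields in `models` and `sandbox` are Python class paths in `module:attribute` form. A path of that form can be resolved with `importlib`, as in the sketch below; `resolve_class_path` is a hypothetical helper, and DeerFlow's own resolver may differ.

```python
import importlib


def resolve_class_path(path: str):
    """Import "package.module:attribute" and return the attribute."""
    module_name, sep, attr_name = path.partition(":")
    if not sep or not module_name or not attr_name:
        raise ValueError(f"expected 'module:attribute', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Standard-library example so the sketch runs without DeerFlow installed.
ordered_dict_cls = resolve_class_path("collections:OrderedDict")
```

With the harness installed, the same call on `deerflow.sandbox.local:LocalSandboxProvider` would import the provider class named in the config.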
## memory
```yaml
memory:
enabled: true
storage_path: memory.json
debounce_seconds: 30
max_facts: 100
fact_confidence_threshold: 0.7
injection_enabled: true
max_injection_tokens: 2000
model_name: null
```
## summarization
```yaml
summarization:
enabled: true
model_name: null
trigger:
- type: tokens
value: 15564
keep:
type: messages
value: 10
trim_tokens_to_summarize: 15564
summary_prompt: null
```
## checkpointer
```yaml
checkpointer:
type: sqlite # sqlite | redis | postgres
connection_string: .deer-flow/checkpoints.db
```
## subagents
```yaml
subagents:
timeout_seconds: 900
agents:
general-purpose:
timeout_seconds: 1800
max_turns: 160
bash:
timeout_seconds: 300
max_turns: 80
```
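The layout suggests that per-agent entries override the top-level `timeout_seconds`. Assuming that precedence (this page does not state it explicitly), the effective timeout could be computed as in this sketch; `effective_timeout` is a hypothetical helper.

```python
subagents = {
    "timeout_seconds": 900,
    "agents": {
        "general-purpose": {"timeout_seconds": 1800, "max_turns": 160},
        "bash": {"timeout_seconds": 300, "max_turns": 80},
    },
}


def effective_timeout(config: dict, agent: str) -> int:
    """Return the per-agent timeout if set, else the top-level default."""
    per_agent = config.get("agents", {}).get(agent, {})
    return per_agent.get("timeout_seconds", config["timeout_seconds"])
```

Under this assumption, an agent without its own entry would inherit the 900-second default.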
See the dedicated documentation page for each feature for full field descriptions.
@@ -1,9 +0,0 @@
import type { MetaRecord } from "nextra";
const meta: MetaRecord = {
ark: {
title: "Volcano Ark",
},
};
export default meta;
@@ -1,8 +0,0 @@
---
title: Volcano Ark
description: Integration guide for the Volcano Ark model provider.
---
# Volcano Ark
## Coding Plan
@@ -1,7 +0,0 @@
---
title: Model providers
description: Integration references for supported model provider services.
asIndexPage: true
---
# Model providers
@@ -0,0 +1,36 @@
# Runtime Flags and Modes
This page documents the runtime flags and modes that affect DeerFlow Harness and agent runtime behavior.
## Per-request configurable options
These options are passed via `config.configurable` (for programmatic use) or selected in the web UI (for application use):
| Flag | Type | Default | Description |
| -------------------------- | ------------- | ---------------------- | ------------------------------------------------ |
| `model_name` | `str \| None` | First configured model | Model to use for the request |
| `agent_name` | `str \| None` | `None` | Load a custom agent configuration |
| `thinking_enabled` | `bool` | `True` | Enable extended thinking (model must support it) |
| `reasoning_effort` | `str \| None` | `None` | Reasoning effort level (model-specific) |
| `is_plan_mode` | `bool` | `False` | Enable TodoList middleware |
| `subagent_enabled` | `bool` | `False` | Allow subagent delegation |
| `max_concurrent_subagents` | `int` | `3` | Maximum parallel subagent calls per turn |
## Environment variables
| Variable | Default | Description |
| ----------------------- | --------------- | ------------------------------------------------ |
| `DEER_FLOW_CONFIG_PATH` | Auto-discovered | Absolute path to `config.yaml` |
| `LOG_LEVEL` | `info` | Log level override |
| `DEER_FLOW_ROOT` | Repo root | Base path for Docker bind mounts |
| `BETTER_AUTH_SECRET` | — | Frontend session secret (required in production) |
| `BETTER_AUTH_URL` | — | Public URL (for callbacks and CORS) |
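As a rough picture of how `DEER_FLOW_CONFIG_PATH` might interact with auto-discovery, the sketch below prefers the environment variable and otherwise looks for `config.yaml` in the starting directory and its parents. The upward search is an assumption made for illustration; the actual discovery rules live in the harness and may differ.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_config(
    env: Mapping[str, str] = os.environ,
    start: Optional[Path] = None,
) -> Optional[Path]:
    """Return the path from DEER_FLOW_CONFIG_PATH, else search upward from `start`."""
    explicit = env.get("DEER_FLOW_CONFIG_PATH")
    if explicit:
        return Path(explicit)
    current = (start or Path.cwd()).resolve()
    for directory in (current, *current.parents):
        candidate = directory / "config.yaml"
        if candidate.is_file():
            return candidate
    return None  # nothing found; the caller decides how to report it
```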
## Model capability flags
Set in the model configuration in `config.yaml`:
| Flag | Type | Description |
| ------------------ | ------ | ------------------------------------- |
| `supports_vision` | `bool` | Model accepts image inputs |
| `thinking_enabled` | `bool` | Model supports extended thinking mode |
@@ -0,0 +1,88 @@
# Source Map
This page maps DeerFlow's core concepts to where they are implemented in the codebase, helping you quickly locate specific features.
## Backend core paths
```
backend/
├── app/
│ └── gateway/ # FastAPI Gateway API
│ ├── routers/
│ │ ├── agents.py # Custom agent CRUD
│ │ ├── extensions.py # MCP/skill enable/disable
│ │ ├── memory.py # Memory read/clear
│ │ ├── threads.py # Thread management
│ │ └── uploads.py # File uploads
│ └── main.py # FastAPI app entry point
└── packages/harness/deerflow/
├── agents/
│ ├── lead_agent/
│ │ ├── agent.py # make_lead_agent() factory
│ │ └── prompt.py # System prompt templates
│ ├── middlewares/ # All middleware implementations
│ ├── memory/
│ │ ├── middleware.py # MemoryMiddleware
│ │ └── storage.py # Memory file storage
│ └── thread_state.py # ThreadState dataclass
├── config/
│ ├── app_config.py # AppConfig (top-level config)
│ ├── model_config.py # ModelConfig
│ ├── paths.py # Path resolution utilities
│ └── *.py # Per-module config classes
├── mcp/
│ ├── cache.py # mtime-based tool cache
│ └── oauth.py # MCP OAuth support
├── models/
│ └── factory.py # create_chat_model() LLM factory
├── sandbox/
│ ├── local/ # LocalSandboxProvider
│ └── sandbox.py # Sandbox base class
├── skills/
│ ├── loader.py # load_skills() (hot reload)
│ ├── parser.py # SKILL.md parsing
│ ├── installer.py # Dependency installation
│ └── manager.py # Skill lifecycle management
├── subagents/
│ └── registry.py # Subagent lookup and config override
└── tools/
└── builtins/ # Built-in tool implementations
```
## Frontend core paths
```
frontend/src/
├── app/ # Next.js routes and pages
├── components/
│ └── workspace/ # Workspace UI components
├── core/
│ ├── agents/ # Agent types and API client
│ ├── messages/ # Message types and tool call handling
│ └── threads/ # Thread state management
└── content/ # Documentation content (MDX)
├── en/ # English documentation
└── zh/ # Chinese documentation
```
## Quick index
| Goal | File |
| -------------------------- | ----------------------------- |
| Lead agent creation | `agents/lead_agent/agent.py` |
| System prompt template | `agents/lead_agent/prompt.py` |
| All middleware | `agents/middlewares/` |
| Config loading | `config/app_config.py` |
| Model factory | `models/factory.py` |
| Skill loading (hot reload) | `skills/loader.py` |
| MCP tool cache | `mcp/cache.py` |
| File upload handling | `uploads/manager.py` |
| Gateway main router | `app/gateway/main.py` |
@@ -1,8 +1,3 @@
---
title: Create Your First Harness
description: This tutorial shows you how to use the DeerFlow Harness programmatically — importing and using DeerFlow directly in your Python code rather than through the web interface.
---
import { Callout, Steps } from "nextra/components";
# Create Your First Harness
@@ -1,8 +1,3 @@
---
title: Deploy Your Own DeerFlow
description: This tutorial guides you through deploying DeerFlow to a production environment using Docker Compose for multi-user access.
---
import { Callout, Steps } from "nextra/components";
# Deploy Your Own DeerFlow
@@ -1,8 +1,3 @@
---
title: First Conversation
description: This tutorial walks you through your first complete agent conversation in DeerFlow — from launching the app to getting meaningful work done with the agent.
---
import { Callout, Steps } from "nextra/components";
# First Conversation
@@ -1,8 +1,3 @@
---
title: Use Tools and Skills
description: This tutorial shows you how to configure and use tools and skills in DeerFlow to give the agent access to web search, file operations, and domain-specific capabilities.
---
import { Callout } from "nextra/components";
# Use Tools and Skills
@@ -1,8 +1,3 @@
---
title: Work with Memory
description: This tutorial shows you how to enable and use DeerFlow's memory system so the agent remembers important information about you across multiple sessions.
---
import { Callout } from "nextra/components";
# Work with Memory
-9
@@ -25,15 +25,6 @@ const meta: MetaRecord = {
blog: {
type: "page",
},
posts: {
type: "page",
},
login: {
type: "page",
},
setup: {
type: "page",
},
};
export default meta;
@@ -1,8 +1,3 @@
---
title: Agents and Threads
description: Learn how agents and threads relate in DeerFlow, and how to manage custom agents and conversation threads.
---
import { Callout, Cards, Steps } from "nextra/components";
# Agents and Threads
@@ -1,8 +1,3 @@
---
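title: Configuration
description: This page covers every configuration layer of DeerFlow App, including `config.yaml`, frontend environment variables, `extensions_config.json`, and runtime environment variables.
---
import { Callout, Cards, Tabs } from "nextra/components";
# Configuration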
@@ -1,8 +1,3 @@
---
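title: Deployment Guide
description: This guide covers all supported ways to deploy DeerFlow App, from local development and Docker Compose to production environments with Kubernetes-managed sandboxes.
---
import { Callout, Cards, Steps, Tabs } from "nextra/components";
# Deployment Guide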
@@ -1,8 +1,3 @@
---
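title: DeerFlow App
description: DeerFlow App is the reference implementation of what a production DeerFlow experience looks like. It assembles the Harness runtime, a web-based conversation workspace, an API Gateway, and a reverse proxy into a single deployable system.
---
import { Callout, Cards } from "nextra/components";
# DeerFlow App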
@@ -1,8 +1,3 @@
---
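title: Operations and Troubleshooting
description: This page covers operational information for running DeerFlow App, including logging, common issues, and maintenance tasks.
---
import { Callout, Cards } from "nextra/components";
# Operations and Troubleshooting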
@@ -1,8 +1,3 @@
---
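title: Quick Start
description: This guide walks you through starting DeerFlow App on your local machine using the `make dev` workflow. All four services (LangGraph, Gateway, Frontend, nginx) start together and are accessible through a single URL.
---
import { Callout, Cards, Steps } from "nextra/components";
# Quick Start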
@@ -1,8 +1,3 @@
---
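title: Workspace Usage
description: The DeerFlow workspace is a browser-based conversation interface where you can send messages to the agent, upload files, view intermediate steps, and download generated artifacts.
---
import { Callout, Cards } from "nextra/components";
# Workspace Usage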
@@ -1,8 +1,3 @@
---
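title: Configuration
description: "DeerFlow's configuration system is designed around one goal: every meaningful behavior should be expressible in a config file, not hardcoded in the application. This makes deployments reproducible, auditable, and easy to customize per environment."
---
import { Callout, Cards } from "nextra/components";
# Configuration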
@@ -1,8 +1,3 @@
---
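title: Customization and Extension
description: DeerFlow's pluggable architecture means most parts of the system can be replaced or extended without forking the core. This page lists the extension points and explains how to use each one.
---
import { Callout, Cards } from "nextra/components";
# Customization and Extension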
@@ -1,8 +1,3 @@
---
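title: Design Principles
description: Understanding the design principles behind DeerFlow Harness helps you use it effectively, extend it confidently, and reason about how your agents will behave in production.
---
import { Callout, Cards } from "nextra/components";
# Design Principles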
@@ -1,8 +1,3 @@
---
title: Install DeerFlow Harness
description: DeerFlow Harness is the Python SDK and runtime foundation for building your own Super Agent systems.
---
import { Callout, Cards } from "nextra/components";
# Install DeerFlow Harness
@@ -1,8 +1,3 @@
---
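title: Integration Guide
description: DeerFlow Harness is not only a standalone application. It is a Python library you can import and use inside your own backend, API server, automation system, or multi-agent orchestrator.
---
import { Callout, Cards } from "nextra/components";
# Integration Guide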
@@ -1,8 +1,3 @@
---
title: Lead Agent
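description: The Lead Agent is the central executor in a DeerFlow thread. Every conversation, task, and workflow flows through it. Understanding how it works helps you configure it effectively and extend it when needed.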
---
import { Callout, Cards, Steps } from "nextra/components";
# Lead Agent
+1 -9
@@ -1,15 +1,9 @@
---
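title: MCP Integration
description: The Model Context Protocol (MCP) is an open standard for connecting language models to external tools and data sources. DeerFlow's MCP integration lets you extend the agent with any tool server that implements the MCP protocol, without modifying the Harness itself.
---
import { Callout, Cards, Steps } from "nextra/components";
# MCP Integration
<Callout type="info" emoji="🔌">
The Model Context Protocol (MCP) lets DeerFlow
connect to any external tool server. Once connected, MCP tools are available to the Lead Agent just like built-in tools.
The Model Context Protocol (MCP) lets DeerFlow connect to any external tool server. Once connected, MCP tools are available to the Lead Agent just like built-in tools.
</Callout>
The **Model Context Protocol (MCP)** is an open standard for connecting language models to external tools and data sources. DeerFlow's MCP integration lets you extend the agent with any tool server that implements the MCP protocol, without modifying the Harness itself.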
@@ -43,7 +37,6 @@ MCP servers are configured in `extensions_config.json`, a file independent of `con
```
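Each server entry supports:
- `command`: the executable to run (e.g. `npx`, `uvx`, `python`)
- `args`: array of command arguments
- `enabled`: whether the server is active (can be toggled without deleting the entry)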
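As a rough illustration of how the three fields above fit together, the sketch below filters a set of server entries down to the enabled ones and assembles each launch command. The `servers` mapping, the server names, and the helper `enabled_launch_commands` are all hypothetical; the actual layout of `extensions_config.json` and the way DeerFlow reads it are not shown in this excerpt. Treating a missing `enabled` key as disabled is also an assumption.

```python
# Hypothetical helper, not part of DeerFlow. It only illustrates how the
# `command`, `args`, and `enabled` fields of a server entry relate.
def enabled_launch_commands(servers: dict[str, dict]) -> dict[str, list[str]]:
    """Return {server_name: [command, *args]} for entries with enabled=True."""
    commands = {}
    for name, entry in servers.items():
        # Assumption: an entry without `enabled` is treated as disabled.
        if not entry.get("enabled", False):
            continue  # disabled entries stay in the config but are skipped
        commands[name] = [entry["command"], *entry.get("args", [])]
    return commands


# Made-up entries for illustration only.
servers = {
    "files": {"command": "npx", "args": ["-y", "example-mcp-server"], "enabled": True},
    "legacy": {"command": "python", "args": ["server.py"], "enabled": False},
}
print(enabled_launch_commands(servers))
```

Keeping disabled entries in the mapping mirrors the point made for `enabled`: a server can be switched off without losing its configuration.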
@@ -88,7 +81,6 @@ tool_search:
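Some MCP servers require OAuth authentication. DeerFlow's `mcp/oauth.py` handles the OAuth flow for servers that declare an OAuth requirement.
When connecting to an OAuth-protected MCP server, DeerFlow:
1. Detects the OAuth requirement from the server's capability headers
2. Builds the appropriate authorization headers with `get_initial_oauth_headers()`
3. Wraps tool calls with an OAuth interceptor via `build_oauth_tool_interceptor()`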
@@ -1,8 +1,3 @@
---
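title: Memory System
description: Memory is a runtime feature of the DeerFlow Harness. Rather than a simple conversation log, it is a store of structured facts and context summaries that persists across independent sessions and influences Agent behavior in future conversations.
---
import { Callout, Cards } from "nextra/components";
# Memory System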
@@ -1,8 +1,3 @@
---
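title: Middleware
description: Each time the Lead Agent calls the LLM, a **middleware chain** runs in sequence. Middleware can read and modify the Agent's state, inject content into the system prompt, intercept tool calls, and react to model output.
---
import { Callout } from "nextra/components";
# Middleware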
+102 -64
@@ -1,18 +1,12 @@
---
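title: Quick Start
description: Learn how to create and run a DeerFlow Agent with create_deerflow_agent, from model initialization to streaming responses.
---
import { Callout, Cards, Steps } from "nextra/components";
# Quick Start
<Callout type="info" emoji="🚀">
This guide shows how to create and run a DeerFlow Agent in Python
with <code>create_deerflow_agent</code>.
This guide shows how to use the DeerFlow Harness programmatically: not through the application UI, but by importing and calling the Harness directly in Python.
</Callout>
The fastest way to understand the DeerFlow Harness is to create an Agent directly in code. This quick start walks you through model initialization, Agent creation, and streaming the response.
DeerFlow Harness is the Python SDK and runtime foundation. This quick start walks you through the core APIs for running an Agent, streaming output, and working with threads.
## Prerequisites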
@@ -25,86 +19,130 @@ cd backend
uv sync
```
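You also need a chat model instance from the corresponding LangChain provider package.
## Configuration
## Create Your First Agent
All Harness behavior is driven by `config.yaml`. At least one model must be configured: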
```yaml
# config.yaml
config_version: 6
models:
  - name: gpt-4o
    use: langchain_openai:ChatOpenAI
    model: gpt-4o
    api_key: $OPENAI_API_KEY
    request_timeout: 600.0
    max_retries: 2
sandbox:
  use: deerflow.sandbox.local:LocalSandboxProvider
tools:
  - use: deerflow.community.ddg_search.tools:web_search_tool
  - use: deerflow.community.jina_ai.tools:web_fetch_tool
  - use: deerflow.sandbox.tools:ls_tool
  - use: deerflow.sandbox.tools:read_file_tool
  - use: deerflow.sandbox.tools:write_file_tool
  - use: deerflow.sandbox.tools:bash_tool
```
Copy `config.example.yaml` to `config.yaml` and fill in your API key.
## Running the Harness
The main entry point of the DeerFlow Harness is `DeerFlowClient`. It manages thread state, invokes the Lead Agent, and streams responses.
<Steps>
### Import the factory function and model class
### Import and configure
```python
from deerflow.agents import create_deerflow_agent
from langchain_openai import ChatOpenAI
import asyncio
from deerflow.client import DeerFlowClient
from deerflow.config import load_config
# Load config.yaml from the current directory or DEER_FLOW_CONFIG_PATH
load_config()
client = DeerFlowClient()
```
### Create the model
### Create a thread
```python
model = ChatOpenAI(
    model="gpt-4o",
    api_key="YOUR_OPENAI_API_KEY",
)
thread_id = "my-thread-001"
```
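### Create the Agent
A thread ID is an arbitrary string. Reusing the same ID continues an existing conversation (this requires checkpointing to be configured).
### Send a message and stream the response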
```python
agent = create_deerflow_agent(model)
```
async def run():
    async for event in client.astream(
        thread_id=thread_id,
        message="Research the top three open-source LLM frameworks and summarize them.",
        config={
            "configurable": {
                "model_name": "gpt-4o",
                "thinking_enabled": False,
                "is_plan_mode": True,
                "subagent_enabled": True,
            }
        },
    ):
        print(event)
This returns a compiled LangGraph Agent with DeerFlow's default middleware chain.
### Stream the response
```python
for event in agent.stream(
    {"messages": [{"role": "user", "content": "Explain what the DeerFlow Harness is."}]},
    stream_mode=["messages", "values"],
):
    print(event)
asyncio.run(run())
```
</Steps>
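## Adding tools or behavior
## Configurable options
You can customize the Agent by passing tools, a system prompt, runtime features, middleware, or a checkpointer.
The `config.configurable` dictionary controls per-request behavior:
| Key | Type | Default | Description |
|---|---|---|---|
| `model_name` | `str \| None` | First model in the config | Model used for this request |
| `thinking_enabled` | `bool` | `True` | Enable extended thinking mode (if supported) |
| `reasoning_effort` | `str \| None` | `None` | Reasoning effort level (model-specific parameter) |
| `is_plan_mode` | `bool` | `False` | Enable the TodoList middleware for task tracking |
| `subagent_enabled` | `bool` | `False` | Allow the Agent to delegate subtasks |
| `max_concurrent_subagents` | `int` | `3` | Maximum number of parallel subagent calls per turn |
| `agent_name` | `str \| None` | `None` | Name of the custom Agent to load |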
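One way to keep per-request options consistent with the table above is to build the `configurable` dictionary from a single set of defaults. The sketch below is a hypothetical convenience helper, not part of the Harness; `build_config` and `DEFAULTS` are invented names, and representing "first model in the config" as `None` is an assumption.

```python
# Defaults copied from the table above. The helper itself is hypothetical.
DEFAULTS = {
    "model_name": None,  # assumption: None stands for "first model in the config"
    "thinking_enabled": True,
    "reasoning_effort": None,
    "is_plan_mode": False,
    "subagent_enabled": False,
    "max_concurrent_subagents": 3,
    "agent_name": None,
}


def build_config(**overrides) -> dict:
    """Return a {"configurable": {...}} dict, rejecting keys not in the table."""
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        raise KeyError(f"unknown configurable keys: {sorted(unknown)}")
    return {"configurable": {**DEFAULTS, **overrides}}


config = build_config(is_plan_mode=True, subagent_enabled=True)
print(config["configurable"]["max_concurrent_subagents"])
```

Rejecting unknown keys catches typos such as `plan_mode` early instead of letting them pass through silently.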
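## Stream event types
`client.astream()` yields events from the LangGraph runtime. The main event types are:
| Event type | Description |
|---|---|
| `messages` | Message chunks (text, thinking, tool calls) |
| `thread_state` | Thread state updates (title, artifacts, todo list) |
Message chunks carry the token stream as the Agent generates its response.
## Using a custom Agent
If you have defined a custom Agent, pass its `name` in configurable: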
```python
from deerflow.agents import RuntimeFeatures, create_deerflow_agent
agent = create_deerflow_agent(
    model,
    system_prompt="You are a concise research assistant.",
    features=RuntimeFeatures(subagent=True, memory=False),
    plan_mode=True,
    name="research-agent",
)
async for event in client.astream(
    thread_id="thread-002",
    message="Analyze the uploaded CSV and generate summary charts.",
    config={
        "configurable": {
            "agent_name": "data-analyst",
            "subagent_enabled": True,
        }
    },
):
    ...
```
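Common parameters:
| Parameter | Description |
|---|---|
| `tools` | Additional tools made available to the Agent |
| `system_prompt` | Custom system prompt |
| `features` | Enable or replace built-in runtime capabilities |
| `extra_middleware` | Insert custom middleware into the default chain |
| `plan_mode` | Enable Todo-style task tracking |
| `checkpointer` | Persist state across multi-turn runs |
| `name` | Logical name of the Agent |
## When to use DeerFlowClient
Use `create_deerflow_agent()` if you want to work directly with the underlying compiled Agent graph.
Use `DeerFlowClient` if you want the higher-level embedded application interface, for example:
- thread-oriented conversation wrappers,
- model / skill / memory management APIs,
- file uploads and artifacts,
- response formats consistent with the Gateway.
The custom Agent's configuration (model, skills, tool groups) is loaded automatically from `agents/data-analyst/config.yaml`.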
<Cards num={3}>
<Cards.Card title="Design Principles" href="/docs/harness/design-principles" />
@@ -1,8 +1,3 @@
---
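title: Sandbox
description: The sandbox gives the Lead Agent a controlled environment in which it can read files, write output, run shell commands, and produce artifacts. Without a sandbox, the Agent can only generate text. With one, it can write and execute code, process data files, generate charts, and build deliverables.
---
import { Callout, Cards, Tabs } from "nextra/components";
# Sandbox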
@@ -1,8 +1,3 @@
---
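title: Skills
description: A skill is more than a prompt. It is a self-contained capability package that can include structured instructions, step-by-step workflows, domain best practices, supporting resources, and tool configuration. Skills load on demand, injecting content when a task needs it and otherwise leaving the context untouched.
---
import { Callout, Cards, FileTree, Steps } from "nextra/components";
# Skills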
@@ -1,8 +1,3 @@
---
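title: Subagents
description: When a task is too broad for a single reasoning thread, or parts of it can run in parallel, the Lead Agent delegates work to **subagents**. A subagent is an independent Agent invocation that receives a specific task, executes it, and returns the result.
---
import { Callout, Cards } from "nextra/components";
# Subagents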
@@ -1,8 +1,3 @@
---
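title: Tools
description: The Lead Agent is a tool-calling Agent. Tools are how it interacts with the world, letting it search the web, read and write files, run commands, delegate tasks, and present output to the user.
---
import { Callout, Cards, Tabs } from "nextra/components";
# Tools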
@@ -1,8 +1,3 @@
---
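title: Core Concepts
description: Before diving into DeerFlow, it helps to establish a few core concepts that run through the whole system. These concepts explain what DeerFlow optimizes for and why its architecture is designed the way it is.
---
import { Callout, Cards } from "nextra/components";
# Core Concepts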
@@ -1,8 +1,3 @@
---
title: Harness and Application
description: DeerFlow has two closely related layers that serve different purposes.
---
import { Callout, Cards } from "nextra/components";
# Harness and Application
@@ -1,8 +1,3 @@
---
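title: Why DeerFlow
description: DeerFlow exists because modern Agent systems need more than a chat loop. A truly useful Agent must be able to plan over long horizons, break tasks into subtasks, use tools, work with files, run code safely, and stay coherent across complex tasks. DeerFlow is built to provide that runtime foundation.
---
import { Callout, Cards } from "nextra/components";
# Why DeerFlow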
+14 -2
@@ -1,8 +1,20 @@
import type { MetaRecord } from "nextra";
const meta: MetaRecord = {
  "model-providers": {
    title: "Model Providers",
  "concepts-glossary": {
    title: "Concepts Glossary",
  },
  "configuration-reference": {
    title: "Configuration Reference",
  },
  "api-gateway-reference": {
    title: "API / Gateway Reference",
  },
  "runtime-flags-and-modes": {
    title: "Runtime Flags and Modes",
  },
  "source-map": {
    title: "Source Map",
  },
};
@@ -0,0 +1,68 @@
import { Callout } from "nextra/components";
# API / Gateway Reference
<Callout type="info">
DeerFlow Gateway is built on FastAPI and serves interactive API documentation at <code>http://localhost:8001/docs</code>.
</Callout>
## Base URL
```
http://localhost:8001
```
Via the nginx proxy:
```
http://localhost:2026/api
```
## Core endpoints
### System
| Method | Path | Description |
|---|---|---|
| `GET` | `/health` | Service health check |
| `GET` | `/api/models` | List configured models |
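As a minimal sketch of calling the system endpoints above from Python, the snippet below uses only the standard library and the direct Gateway base URL. The helper names `endpoint` and `get_json` are hypothetical, and the assumption that these endpoints return JSON bodies is not confirmed by this page.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8001"  # direct Gateway URL listed under "Base URL"


def endpoint(path: str, base_url: str = BASE_URL) -> str:
    """Join the base URL and an endpoint path with exactly one slash."""
    return base_url.rstrip("/") + "/" + path.lstrip("/")


def get_json(path: str, timeout: float = 5.0):
    """GET an endpoint and decode the body, assuming it is JSON."""
    with urllib.request.urlopen(endpoint(path), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# With a Gateway running locally, usage would look like:
#   get_json("/health")
#   get_json("/api/models")
```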
### Agent management
| Method | Path | Description |
|---|---|---|
| `GET` | `/api/agents` | List all Agents |
| `POST` | `/api/agents` | Create a custom Agent |
| `GET` | `/api/agents/{name}` | Get an Agent's configuration |
| `PUT` | `/api/agents/{name}` | Update an Agent's configuration |
| `DELETE` | `/api/agents/{name}` | Delete an Agent |
| `POST` | `/api/agents/check` | Check or suggest Agent slug uniqueness |
### Threads and memory
| Method | Path | Description |
|---|---|---|
| `GET` | `/api/threads` | List threads |
| `DELETE` | `/api/threads/{thread_id}` | Delete a thread |
| `GET` | `/api/memory` | Get global memory |
| `GET` | `/api/memory/{agent_name}` | Get Agent-specific memory |
| `DELETE` | `/api/memory` | Clear global memory |
### Extensions
| Method | Path | Description |
|---|---|---|
| `GET` | `/api/extensions` | List all extensions (MCP servers and skills) |
| `POST` | `/api/extensions/mcp/{name}/enable` | Enable an MCP server |
| `POST` | `/api/extensions/mcp/{name}/disable` | Disable an MCP server |
| `POST` | `/api/extensions/skills/{name}/enable` | Enable a skill |
| `POST` | `/api/extensions/skills/{name}/disable` | Disable a skill |
### File uploads
| Method | Path | Description |
|---|---|---|
| `POST` | `/api/uploads/{thread_id}` | Upload a file to the thread workspace |
| `GET` | `/api/uploads/{thread_id}/{filename}` | Retrieve an uploaded file |
For the full interactive API documentation, visit `http://localhost:8001/docs` (Swagger UI).
@@ -0,0 +1,67 @@
import { Callout } from "nextra/components";
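# Concepts Glossary
This glossary defines the core terms used in the DeerFlow documentation.
---
## Agent
In DeerFlow, an Agent is the primary processing unit that receives user messages, decides what action to take (a tool call or a direct response), and produces output. DeerFlow uses a two-level architecture of a **Lead Agent** and **subagents**.
## Artifact
Files generated by the Agent, such as reports, charts, code, or other deliverables. Artifacts are exposed through the `present_files` tool and persisted in the thread's user data directory.
## Checkpoint
A persisted snapshot of thread state, saved after each Agent turn. Checkpoints allow conversations to resume after a server restart and support state management for long-horizon tasks.
## Context Engineering
The practice of keeping the Agent effective on long-horizon tasks by controlling what it can see at each step, through summarization, subagent context isolation, and external file-based memory.
## Harness
An opinionated Agent runtime that packages tool access, skill loading, sandboxed execution, memory, subagent coordination, and context management, rather than merely exposing abstract interfaces.
## Lead Agent
The primary executor in every DeerFlow thread, responsible for planning, tool calls, and response generation. Built on LangGraph + LangChain Agent and enhanced by a middleware chain.
## Long-horizon Agent
An Agent that stays effective across a series of actions (planning, calling tools multiple times, managing intermediate files, and producing final artifacts) rather than producing a single answer.
## Memory
A store of structured facts and user context that persists across independent conversation sessions and is injected into the Agent's system prompt in later sessions.
## Middleware
A plugin that wraps each LLM call and can read and modify Agent state before and after the model call. DeerFlow uses middleware to implement cross-cutting behaviors such as memory, summarization, and title generation.
## MCP (Model Context Protocol)
An open standard for connecting language models to external tools and data sources. DeerFlow's MCP integration allows connecting any compatible tool server.
## Sandbox
An isolated execution environment where the Agent performs file and command operations. DeerFlow supports two sandbox modes: local (`LocalSandboxProvider`) and container-based (`AioSandboxProvider`).
## Skill
A task-oriented capability package containing structured instructions, workflows, and best practices, loaded into the Agent's context on demand. Skills provide specialization without polluting the general Agent context.
## Subagent
A focused executor that takes on a delegated subtask and runs in an isolated context, receiving only the information needed to complete its assigned task.
## Thread
The complete encapsulation of a conversation and all of its associated state: message history, artifacts, todo list, and checkpoint data.
## ThreadState
The LangGraph-managed state object in DeerFlow, containing `messages`, `artifacts`, `todo_list`, and runtime metadata.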
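To make the shape of such a state object concrete, here is a hedged sketch using a `TypedDict`. Only the three field names come from the glossary entry; the class name `ThreadStateSketch`, the field types, and the `summarize` helper are assumptions for illustration and do not reflect DeerFlow's actual definition.

```python
from typing import Any, TypedDict


class ThreadStateSketch(TypedDict, total=False):
    """Illustrative stand-in; field types are assumptions, not DeerFlow's."""
    messages: list[Any]              # conversation history
    artifacts: list[str]             # e.g. paths of generated files
    todo_list: list[dict[str, Any]]  # task-tracking entries


def summarize(state: ThreadStateSketch) -> dict[str, int]:
    """Count the entries in each of the three named fields."""
    return {key: len(state.get(key, [])) for key in ("messages", "artifacts", "todo_list")}


state: ThreadStateSketch = {"messages": ["hi", "hello"], "artifacts": ["report.md"]}
print(summarize(state))
```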

Some files were not shown because too many files have changed in this diff.