feat: enhance chat history loading with new hooks and UI components (#2338)

* Refactor API fetch calls to use a unified fetch function; enhance chat history loading with new hooks and UI components

- Replaced `fetchWithAuth` with a generic `fetch` function across various API modules for consistency.
- Updated `useThreadStream` and `useThreadHistory` hooks to manage chat history loading, including loading states and pagination.
- Introduced `LoadMoreHistoryIndicator` component for better user experience when loading more chat history.
- Enhanced message handling in `MessageList` to accommodate new loading states and history management.
- Added support for run messages in the thread context, improving the overall message handling logic.
- Updated translations for loading indicators in English and Chinese.

* Fix test assertions for run ordering in RunManager tests

- Updated assertions in `test_list_by_thread` to reflect correct ordering of runs.
- Modified `test_list_by_thread_is_stable_when_timestamps_tie` to ensure stable ordering when timestamps are tied.
This commit is contained in:
JeffJiang
2026-04-19 10:23:09 +08:00
committed by GitHub
parent 7b9d224b3a
commit 3a99c4e81c
35 changed files with 755 additions and 1440 deletions
+20 -17
View File
@@ -8,13 +8,17 @@ Initialization is handled directly in ``app.py`` via :class:`AsyncExitStack`.
from __future__ import annotations
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Callable
from contextlib import AsyncExitStack, asynccontextmanager
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypeVar, cast
from fastapi import FastAPI, HTTPException, Request
from langgraph.types import Checkpointer
from deerflow.runtime import RunContext, RunManager
from deerflow.persistence.feedback import FeedbackRepository
from deerflow.runtime import RunContext, RunManager, StreamBridge
from deerflow.runtime.events.store.base import RunEventStore
from deerflow.runtime.runs.store.base import RunStore
if TYPE_CHECKING:
from app.gateway.auth.local_provider import LocalAuthProvider
@@ -22,6 +26,9 @@ if TYPE_CHECKING:
from deerflow.persistence.thread_meta.base import ThreadMetaStore
T = TypeVar("T")
@asynccontextmanager
async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
"""Bootstrap and tear down all LangGraph runtime singletons.
@@ -84,25 +91,25 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]:
# ---------------------------------------------------------------------------
def _require(attr: str, label: str):
def _require(attr: str, label: str) -> Callable[[Request], T]:
"""Create a FastAPI dependency that returns ``app.state.<attr>`` or 503."""
def dep(request: Request):
def dep(request: Request) -> T:
val = getattr(request.app.state, attr, None)
if val is None:
raise HTTPException(status_code=503, detail=f"{label} not available")
return val
return cast(T, val)
dep.__name__ = dep.__qualname__ = f"get_{attr}"
return dep
get_stream_bridge = _require("stream_bridge", "Stream bridge")
get_run_manager = _require("run_manager", "Run manager")
get_checkpointer = _require("checkpointer", "Checkpointer")
get_run_event_store = _require("run_event_store", "Run event store")
get_feedback_repo = _require("feedback_repo", "Feedback")
get_run_store = _require("run_store", "Run store")
get_stream_bridge: Callable[[Request], StreamBridge] = _require("stream_bridge", "Stream bridge")
get_run_manager: Callable[[Request], RunManager] = _require("run_manager", "Run manager")
get_checkpointer: Callable[[Request], Checkpointer] = _require("checkpointer", "Checkpointer")
get_run_event_store: Callable[[Request], RunEventStore] = _require("run_event_store", "Run event store")
get_feedback_repo: Callable[[Request], FeedbackRepository] = _require("feedback_repo", "Feedback")
get_run_store: Callable[[Request], RunStore] = _require("run_store", "Run store")
def get_store(request: Request):
@@ -121,10 +128,7 @@ def get_thread_store(request: Request) -> ThreadMetaStore:
def get_run_context(request: Request) -> RunContext:
"""Build a :class:`RunContext` from ``app.state`` singletons.
Returns a *base* context with infrastructure dependencies. Callers that
need per-run fields (e.g. ``follow_up_to_run_id``) should use
``dataclasses.replace(ctx, follow_up_to_run_id=...)`` before passing it
to :func:`run_agent`.
Returns a *base* context with infrastructure dependencies.
"""
from deerflow.config import get_app_config
@@ -137,7 +141,6 @@ def get_run_context(request: Request) -> RunContext:
)
# ---------------------------------------------------------------------------
# Auth helpers (used by authz.py and auth middleware)
# ---------------------------------------------------------------------------
+2 -1
View File
@@ -123,7 +123,8 @@ async def run_messages(
run = await _resolve_run(run_id, request)
event_store = get_run_event_store(request)
rows = await event_store.list_messages_by_run(
run["thread_id"], run_id,
run["thread_id"],
run_id,
limit=limit + 1,
before_seq=before_seq,
after_seq=after_seq,
+11 -7
View File
@@ -54,7 +54,6 @@ class RunCreateRequest(BaseModel):
after_seconds: float | None = Field(default=None, description="Delayed execution")
if_not_exists: Literal["reject", "create"] = Field(default="create", description="Thread creation policy")
feedback_keys: list[str] | None = Field(default=None, description="LangSmith feedback keys")
follow_up_to_run_id: str | None = Field(default=None, description="Run ID this message follows up on. Auto-detected from latest successful run if not provided.")
class RunResponse(BaseModel):
@@ -312,11 +311,15 @@ async def list_thread_messages(
if i in last_ai_indices:
run_id = msg["run_id"]
fb = feedback_map.get(run_id)
msg["feedback"] = {
"feedback_id": fb["feedback_id"],
"rating": fb["rating"],
"comment": fb.get("comment"),
} if fb else None
msg["feedback"] = (
{
"feedback_id": fb["feedback_id"],
"rating": fb["rating"],
"comment": fb.get("comment"),
}
if fb
else None
)
else:
msg["feedback"] = None
@@ -339,7 +342,8 @@ async def list_run_messages(
"""
event_store = get_run_event_store(request)
rows = await event_store.list_messages_by_run(
thread_id, run_id,
thread_id,
run_id,
limit=limit + 1,
before_seq=before_seq,
after_seq=after_seq,
+1 -1
View File
@@ -56,7 +56,7 @@ def _make_file_sandbox_writable(file_path: os.PathLike[str] | str) -> None:
@router.post("", response_model=UploadResponse)
@require_permission("threads", "write", owner_check=True, require_existing=True)
@require_permission("threads", "write", owner_check=True, require_existing=False)
async def upload_files(
thread_id: str,
request: Request,
-16
View File
@@ -195,21 +195,6 @@ async def start_run(
disconnect = DisconnectMode.cancel if body.on_disconnect == "cancel" else DisconnectMode.continue_
# Resolve follow_up_to_run_id: explicit from request, or auto-detect from latest successful run
follow_up_to_run_id = getattr(body, "follow_up_to_run_id", None)
if follow_up_to_run_id is None:
run_store = get_run_store(request)
try:
recent_runs = await run_store.list_by_thread(thread_id, limit=1)
if recent_runs and recent_runs[0].get("status") == "success":
follow_up_to_run_id = recent_runs[0]["run_id"]
except Exception:
pass # Don't block run creation
# Enrich base context with per-run field
if follow_up_to_run_id:
run_ctx = dataclasses.replace(run_ctx, follow_up_to_run_id=follow_up_to_run_id)
try:
record = await run_mgr.create_or_reject(
thread_id,
@@ -218,7 +203,6 @@ async def start_run(
metadata=body.metadata or {},
kwargs={"input": body.input, "config": body.config},
multitask_strategy=body.multitask_strategy,
follow_up_to_run_id=follow_up_to_run_id,
)
except ConflictError as exc:
raise HTTPException(status_code=409, detail=str(exc)) from exc