feat: enhance chat history loading with new hooks and UI components (#2338)

* Refactor API fetch calls to use a unified fetch function; enhance chat history loading with new hooks and UI components

- Replaced `fetchWithAuth` with a generic `fetch` function across various API modules for consistency.
- Updated `useThreadStream` and `useThreadHistory` hooks to manage chat history loading, including loading states and pagination.
- Introduced `LoadMoreHistoryIndicator` component for better user experience when loading more chat history.
- Enhanced message handling in `MessageList` to accommodate new loading states and history management.
- Added support for run messages in the thread context, improving the overall message handling logic.
- Updated translations for loading indicators in English and Chinese.

* Fix test assertions for run ordering in RunManager tests

- Updated assertions in `test_list_by_thread` to reflect correct ordering of runs.
- Modified `test_list_by_thread_is_stable_when_timestamps_tie` to ensure stable ordering when timestamps are tied.
This commit is contained in:
JeffJiang
2026-04-19 10:23:09 +08:00
parent 2e05f380c4
commit db5ad86381
34 changed files with 749 additions and 1441 deletions
@@ -48,7 +48,13 @@ export default function AgentChatPage() {
const { tokenUsageEnabled } = useModels();
const { showNotification } = useNotification();
const [thread, sendMessage] = useThreadStream({
const {
thread,
sendMessage,
isHistoryLoading,
hasMoreHistory,
loadMoreHistory,
} = useThreadStream({
threadId: isNewThread ? undefined : threadId,
context: { ...settings.context, agent_name: agent_name },
onStart: (createdThreadId) => {
@@ -147,6 +153,9 @@ export default function AgentChatPage() {
thread={thread}
paddingBottom={messageListPaddingBottom}
tokenUsageEnabled={tokenUsageEnabled}
hasMoreHistory={hasMoreHistory}
loadMoreHistory={loadMoreHistory}
isHistoryLoading={isHistoryLoading}
/>
</div>
@@ -1,6 +1,6 @@
"use client";
import { useCallback, useEffect, useState } from "react";
import { useCallback, useEffect, useRef, useState } from "react";
import { type PromptInputMessage } from "@/components/ai-elements/prompt-input";
import { ArtifactTrigger } from "@/components/workspace/artifacts";
@@ -36,20 +36,31 @@ export default function ChatPage() {
const { threadId, setThreadId, isNewThread, setIsNewThread, isMock } =
useThreadChat();
const [settings, setSettings] = useThreadSettings(threadId);
const [mounted, setMounted] = useState(false);
const { tokenUsageEnabled } = useModels();
const mountedRef = useRef(false);
useSpecificChatMode();
useEffect(() => {
setMounted(true);
mountedRef.current = true;
}, []);
const { showNotification } = useNotification();
const [thread, sendMessage, isUploading] = useThreadStream({
const {
thread,
sendMessage,
isUploading,
isHistoryLoading,
hasMoreHistory,
loadMoreHistory,
} = useThreadStream({
threadId: isNewThread ? undefined : threadId,
context: settings.context,
isMock,
onSend: (_threadId) => {
setThreadId(_threadId);
setIsNewThread(false);
},
onStart: (createdThreadId) => {
setThreadId(createdThreadId);
setIsNewThread(false);
@@ -121,6 +132,9 @@ export default function ChatPage() {
thread={thread}
paddingBottom={messageListPaddingBottom}
tokenUsageEnabled={tokenUsageEnabled}
hasMoreHistory={hasMoreHistory}
loadMoreHistory={loadMoreHistory}
isHistoryLoading={isHistoryLoading}
/>
</div>
<div className="absolute right-0 bottom-0 left-0 z-30 flex justify-center px-4">
@@ -144,7 +158,7 @@ export default function ChatPage() {
/>
</div>
</div>
{mounted ? (
{mountedRef.current ? (
<InputBox
className={cn("bg-background/5 w-full -translate-y-4")}
isNewThread={isNewThread}
@@ -176,7 +190,7 @@ export default function ChatPage() {
<div
aria-hidden="true"
className={cn(
"bg-background/5 h-32 w-full -translate-y-4 rounded-2xl border",
"bg-background/5 h-32 w-full -translate-y-4 rounded-2xl",
)}
/>
)}
+21 -17
View File
@@ -55,7 +55,7 @@ import {
DropdownMenuLabel,
DropdownMenuSeparator,
} from "@/components/ui/dropdown-menu";
import { fetchWithAuth } from "@/core/api/fetcher";
import { fetch } from "@/core/api/fetcher";
import { getBackendBaseURL } from "@/core/config";
import { useI18n } from "@/core/i18n/hooks";
import { useModels } from "@/core/models/hooks";
@@ -155,6 +155,7 @@ export function InputBox({
const [followupsLoading, setFollowupsLoading] = useState(false);
const lastGeneratedForAiIdRef = useRef<string | null>(null);
const wasStreamingRef = useRef(false);
const messagesRef = useRef(thread.messages);
const [confirmOpen, setConfirmOpen] = useState(false);
const [pendingSuggestion, setPendingSuggestion] = useState<string | null>(
@@ -354,6 +355,10 @@ export function InputBox({
followupsVisibilityChangeRef.current?.(showFollowups);
}, [showFollowups]);
useEffect(() => {
messagesRef.current = thread.messages;
}, [thread.messages]);
useEffect(() => {
return () => followupsVisibilityChangeRef.current?.(false);
}, []);
@@ -370,14 +375,16 @@ export function InputBox({
return;
}
const lastAi = [...thread.messages].reverse().find((m) => m.type === "ai");
const lastAi = [...messagesRef.current]
.reverse()
.find((m) => m.type === "ai");
const lastAiId = lastAi?.id ?? null;
if (!lastAiId || lastAiId === lastGeneratedForAiIdRef.current) {
return;
}
lastGeneratedForAiIdRef.current = lastAiId;
const recent = thread.messages
const recent = messagesRef.current
.filter((m) => m.type === "human" || m.type === "ai")
.map((m) => {
const role = m.type === "human" ? "user" : "assistant";
@@ -396,19 +403,16 @@ export function InputBox({
setFollowupsLoading(true);
setFollowups([]);
fetchWithAuth(
`${getBackendBaseURL()}/api/threads/${threadId}/suggestions`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: recent,
n: 3,
model_name: context.model_name ?? undefined,
}),
signal: controller.signal,
},
)
fetch(`${getBackendBaseURL()}/api/threads/${threadId}/suggestions`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages: recent,
n: 3,
model_name: context.model_name ?? undefined,
}),
signal: controller.signal,
})
.then(async (res) => {
if (!res.ok) {
return { suggestions: [] as string[] };
@@ -430,7 +434,7 @@ export function InputBox({
});
return () => controller.abort();
}, [context.model_name, disabled, isMock, status, thread.messages, threadId]);
}, [context.model_name, disabled, isMock, status, threadId]);
return (
<div ref={promptRootRef} className="relative flex flex-col gap-4">
@@ -1,9 +1,12 @@
import type { BaseStream } from "@langchain/langgraph-sdk/react";
import { ChevronUpIcon, Loader2Icon } from "lucide-react";
import { useCallback, useEffect, useRef } from "react";
import {
Conversation,
ConversationContent,
} from "@/components/ai-elements/conversation";
import { Button } from "@/components/ui/button";
import { useI18n } from "@/core/i18n/hooks";
import {
extractContentFromMessage,
@@ -19,7 +22,6 @@ import { useRehypeSplitWordsIntoSpans } from "@/core/rehype";
import type { Subtask } from "@/core/tasks";
import { useUpdateSubtask } from "@/core/tasks/context";
import type { AgentThreadState } from "@/core/threads";
import { useThreadMessageEnrichment } from "@/core/threads/hooks";
import { cn } from "@/lib/utils";
import { ArtifactFileList } from "../artifacts/artifact-file-list";
@@ -35,24 +37,136 @@ import { SubtaskCard } from "./subtask-card";
export const MESSAGE_LIST_DEFAULT_PADDING_BOTTOM = 160;
export const MESSAGE_LIST_FOLLOWUPS_EXTRA_PADDING_BOTTOM = 80;
const LOAD_MORE_HISTORY_THROTTLE_MS = 1200;
function LoadMoreHistoryIndicator({
isLoading,
hasMore,
loadMore,
}: {
isLoading?: boolean;
hasMore?: boolean;
loadMore?: () => void;
}) {
const { t } = useI18n();
const sentinelRef = useRef<HTMLDivElement | null>(null);
const timeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const lastLoadRef = useRef(0);
const throttledLoadMore = useCallback(() => {
if (!hasMore || isLoading) {
return;
}
const now = Date.now();
const remaining =
LOAD_MORE_HISTORY_THROTTLE_MS - (now - lastLoadRef.current);
if (remaining <= 0) {
lastLoadRef.current = now;
loadMore?.();
return;
}
if (timeoutRef.current) {
return;
}
timeoutRef.current = setTimeout(() => {
timeoutRef.current = null;
if (!hasMore || isLoading) {
return;
}
lastLoadRef.current = Date.now();
loadMore?.();
}, remaining);
}, [hasMore, isLoading, loadMore]);
useEffect(() => {
const element = sentinelRef.current;
if (!element || !hasMore) {
return;
}
const observer = new IntersectionObserver(
([entry]) => {
if (entry?.isIntersecting) {
throttledLoadMore();
}
},
{
rootMargin: "120px 0px 0px 0px",
},
);
observer.observe(element);
return () => {
observer.disconnect();
};
}, [hasMore, throttledLoadMore]);
useEffect(() => {
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
}
};
}, []);
if (!hasMore && !isLoading) {
return null;
}
return (
<div ref={sentinelRef} className="flex w-full justify-center">
<Button
type="button"
variant="ghost"
size="sm"
className="text-muted-foreground hover:text-foreground rounded-full px-3"
disabled={(isLoading ?? false) || !hasMore}
onClick={throttledLoadMore}
>
{isLoading ? (
<>
<Loader2Icon className="mr-2 size-4 animate-spin" />
{t.common.loading}
</>
) : (
<>
<ChevronUpIcon className="mr-2 size-4" />
{t.common.loadMore}
</>
)}
</Button>
</div>
);
}
export function MessageList({
className,
threadId,
thread,
paddingBottom = MESSAGE_LIST_DEFAULT_PADDING_BOTTOM,
tokenUsageEnabled = false,
hasMoreHistory,
loadMoreHistory,
isHistoryLoading,
}: {
className?: string;
threadId: string;
thread: BaseStream<AgentThreadState>;
paddingBottom?: number;
tokenUsageEnabled?: boolean;
hasMoreHistory?: boolean;
loadMoreHistory?: () => void;
isHistoryLoading?: boolean;
}) {
const { t } = useI18n();
const rehypePlugins = useRehypeSplitWordsIntoSpans(thread.isLoading);
const updateSubtask = useUpdateSubtask();
const messages = thread.messages;
const { data: enrichment } = useThreadMessageEnrichment(threadId);
if (thread.isThreadLoading && messages.length === 0) {
return <MessageListSkeleton />;
@@ -61,11 +175,15 @@ export function MessageList({
<Conversation
className={cn("flex size-full flex-col justify-center", className)}
>
<ConversationContent className="mx-auto w-full max-w-(--container-width-md) gap-8 pt-12">
<ConversationContent className="mx-auto w-full max-w-(--container-width-md) gap-8 pt-8">
<LoadMoreHistoryIndicator
isLoading={isHistoryLoading}
hasMore={hasMoreHistory}
loadMore={loadMoreHistory}
/>
{groupMessages(messages, (group) => {
if (group.type === "human" || group.type === "assistant") {
return group.messages.map((msg) => {
const entry = msg.id ? enrichment?.get(msg.id) : undefined;
return (
<MessageListItem
key={`${group.id}/${msg.id}`}
@@ -73,8 +191,6 @@ export function MessageList({
message={msg}
isLoading={thread.isLoading}
tokenUsageEnabled={tokenUsageEnabled}
runId={entry?.run_id}
feedback={entry?.feedback}
/>
);
});
@@ -5,7 +5,7 @@ import { useState } from "react";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { fetchWithAuth, getCsrfHeaders } from "@/core/api/fetcher";
import { fetch, getCsrfHeaders } from "@/core/api/fetcher";
import { useAuth } from "@/core/auth/AuthProvider";
import { parseAuthError } from "@/core/auth/types";
@@ -36,7 +36,7 @@ export function AccountSettingsPage() {
setLoading(true);
try {
const res = await fetchWithAuth("/api/v1/auth/change-password", {
const res = await fetch("/api/v1/auth/change-password", {
method: "POST",
headers: {
"Content-Type": "application/json",
+4 -4
View File
@@ -1,4 +1,4 @@
import { fetchWithAuth } from "@/core/api/fetcher";
import { fetch } from "@/core/api/fetcher";
import { getBackendBaseURL } from "@/core/config";
import type { Agent, CreateAgentRequest, UpdateAgentRequest } from "./types";
@@ -29,7 +29,7 @@ export async function getAgent(name: string): Promise<Agent> {
}
export async function createAgent(request: CreateAgentRequest): Promise<Agent> {
const res = await fetchWithAuth(`${getBackendBaseURL()}/api/agents`, {
const res = await fetch(`${getBackendBaseURL()}/api/agents`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(request),
@@ -45,7 +45,7 @@ export async function updateAgent(
name: string,
request: UpdateAgentRequest,
): Promise<Agent> {
const res = await fetchWithAuth(`${getBackendBaseURL()}/api/agents/${name}`, {
const res = await fetch(`${getBackendBaseURL()}/api/agents/${name}`, {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(request),
@@ -58,7 +58,7 @@ export async function updateAgent(
}
export async function deleteAgent(name: string): Promise<void> {
const res = await fetchWithAuth(`${getBackendBaseURL()}/api/agents/${name}`, {
const res = await fetch(`${getBackendBaseURL()}/api/agents/${name}`, {
method: "DELETE",
});
if (!res.ok) throw new Error(`Failed to delete agent: ${res.statusText}`);
+3 -3
View File
@@ -1,6 +1,6 @@
import { getBackendBaseURL } from "../config";
import { fetchWithAuth } from "./fetcher";
import { fetch } from "./fetcher";
export interface FeedbackData {
feedback_id: string;
@@ -14,7 +14,7 @@ export async function upsertFeedback(
rating: number,
comment?: string,
): Promise<FeedbackData> {
const res = await fetchWithAuth(
const res = await fetch(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}/runs/${encodeURIComponent(runId)}/feedback`,
{
method: "PUT",
@@ -32,7 +32,7 @@ export async function deleteFeedback(
threadId: string,
runId: string,
): Promise<void> {
const res = await fetchWithAuth(
const res = await fetch(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}/runs/${encodeURIComponent(runId)}/feedback`,
{ method: "DELETE" },
);
+2 -2
View File
@@ -53,7 +53,7 @@ export function readCsrfCookie(): string | null {
* preserved; the helper only ADDS the CSRF header when it isn't already
* present, so explicit overrides win.
*/
export async function fetchWithAuth(
export async function fetch(
input: RequestInfo | string,
init?: RequestInit,
): Promise<Response> {
@@ -74,7 +74,7 @@ export async function fetchWithAuth(
}
}
const res = await fetch(url, {
const res = await globalThis.fetch(url, {
...init,
headers,
credentials: "include",
+1
View File
@@ -29,6 +29,7 @@ export const enUS: Translations = {
close: "Close",
more: "More",
search: "Search",
loadMore: "Load more",
download: "Download",
thinking: "Thinking",
artifacts: "Artifacts",
+1
View File
@@ -18,6 +18,7 @@ export interface Translations {
close: string;
more: string;
search: string;
loadMore: string;
download: string;
thinking: string;
artifacts: string;
+1
View File
@@ -29,6 +29,7 @@ export const zhCN: Translations = {
close: "关闭",
more: "更多",
search: "搜索",
loadMore: "加载更多",
download: "下载",
thinking: "思考",
artifacts: "文件",
+7 -10
View File
@@ -1,4 +1,4 @@
import { fetchWithAuth } from "@/core/api/fetcher";
import { fetch } from "@/core/api/fetcher";
import { getBackendBaseURL } from "@/core/config";
import type { MCPConfig } from "./types";
@@ -9,15 +9,12 @@ export async function loadMCPConfig() {
}
export async function updateMCPConfig(config: MCPConfig) {
const response = await fetchWithAuth(
`${getBackendBaseURL()}/api/mcp/config`,
{
method: "PUT",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(config),
const response = await fetch(`${getBackendBaseURL()}/api/mcp/config`, {
method: "PUT",
headers: {
"Content-Type": "application/json",
},
);
body: JSON.stringify(config),
});
return response.json();
}
+16 -22
View File
@@ -1,4 +1,4 @@
import { fetchWithAuth } from "../api/fetcher";
import { fetch } from "../api/fetcher";
import { getBackendBaseURL } from "../config";
import type {
@@ -86,14 +86,14 @@ export async function loadMemory(): Promise<UserMemory> {
}
export async function clearMemory(): Promise<UserMemory> {
const response = await fetchWithAuth(`${getBackendBaseURL()}/api/memory`, {
const response = await fetch(`${getBackendBaseURL()}/api/memory`, {
method: "DELETE",
});
return readMemoryResponse(response, "Failed to clear memory");
}
export async function deleteMemoryFact(factId: string): Promise<UserMemory> {
const response = await fetchWithAuth(
const response = await fetch(
`${getBackendBaseURL()}/api/memory/facts/${encodeURIComponent(factId)}`,
{
method: "DELETE",
@@ -108,32 +108,26 @@ export async function exportMemory(): Promise<UserMemory> {
}
export async function importMemory(memory: UserMemory): Promise<UserMemory> {
const response = await fetchWithAuth(
`${getBackendBaseURL()}/api/memory/import`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(memory),
const response = await fetch(`${getBackendBaseURL()}/api/memory/import`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
);
body: JSON.stringify(memory),
});
return readMemoryResponse(response, "Failed to import memory");
}
export async function createMemoryFact(
input: MemoryFactInput,
): Promise<UserMemory> {
const response = await fetchWithAuth(
`${getBackendBaseURL()}/api/memory/facts`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(input),
const response = await fetch(`${getBackendBaseURL()}/api/memory/facts`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
);
body: JSON.stringify(input),
});
return readMemoryResponse(response, "Failed to create memory fact");
}
@@ -141,7 +135,7 @@ export async function updateMemoryFact(
factId: string,
input: MemoryFactPatchInput,
): Promise<UserMemory> {
const response = await fetchWithAuth(
const response = await fetch(
`${getBackendBaseURL()}/api/memory/facts/${encodeURIComponent(factId)}`,
{
method: "PATCH",
+5 -1
View File
@@ -328,7 +328,11 @@ export function findToolCallResult(toolCallId: string, messages: Message[]) {
}
export function isHiddenFromUIMessage(message: Message) {
return message.additional_kwargs?.hide_from_ui === true;
return (
message.additional_kwargs?.hide_from_ui === true ||
message.name === "summary" ||
message.name === "loop_warning"
);
}
/**
+8 -11
View File
@@ -1,4 +1,4 @@
import { fetchWithAuth } from "@/core/api/fetcher";
import { fetch } from "@/core/api/fetcher";
import { getBackendBaseURL } from "@/core/config";
import type { Skill } from "./type";
@@ -10,7 +10,7 @@ export async function loadSkills() {
}
export async function enableSkill(skillName: string, enabled: boolean) {
const response = await fetchWithAuth(
const response = await fetch(
`${getBackendBaseURL()}/api/skills/${skillName}`,
{
method: "PUT",
@@ -39,16 +39,13 @@ export interface InstallSkillResponse {
export async function installSkill(
request: InstallSkillRequest,
): Promise<InstallSkillResponse> {
const response = await fetchWithAuth(
`${getBackendBaseURL()}/api/skills/install`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(request),
const response = await fetch(`${getBackendBaseURL()}/api/skills/install`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
);
body: JSON.stringify(request),
});
if (!response.ok) {
// Handle HTTP error responses (4xx, 5xx)
+214 -189
View File
@@ -1,15 +1,14 @@
import type { AIMessage, Message } from "@langchain/langgraph-sdk";
import type { AIMessage, Message, Run } from "@langchain/langgraph-sdk";
import type { ThreadsClient } from "@langchain/langgraph-sdk/client";
import { useStream } from "@langchain/langgraph-sdk/react";
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import { useCallback, useEffect, useRef, useState } from "react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { toast } from "sonner";
import type { PromptInputMessage } from "@/components/ai-elements/prompt-input";
import { getAPIClient } from "../api";
import type { FeedbackData } from "../api/feedback";
import { fetchWithAuth } from "../api/fetcher";
import { fetch } from "../api/fetcher";
import { getBackendBaseURL } from "../config";
import { useI18n } from "../i18n/hooks";
import type { FileInMessage } from "../messages/utils";
@@ -18,7 +17,7 @@ import { useUpdateSubtask } from "../tasks/context";
import type { UploadedFileInfo } from "../uploads";
import { promptInputFilePartToFile, uploadFiles } from "../uploads";
import type { AgentThread, AgentThreadState } from "./types";
import type { AgentThread, AgentThreadState, RunMessage } from "./types";
export type ToolEndEvent = {
name: string;
@@ -29,7 +28,8 @@ export type ThreadStreamOptions = {
threadId?: string | null | undefined;
context: LocalSettings["context"];
isMock?: boolean;
onStart?: (threadId: string) => void;
onSend?: (threadId: string) => void;
onStart?: (threadId: string, runId: string) => void;
onFinish?: (state: AgentThreadState) => void;
onToolEnd?: (event: ToolEndEvent) => void;
};
@@ -38,79 +38,41 @@ type SendMessageOptions = {
additionalKwargs?: Record<string, unknown>;
};
function normalizeStoredRunId(runId: string | null): string | null {
if (!runId) {
return null;
}
function mergeMessages(
historyMessages: Message[],
threadMessages: Message[],
optimisticMessages: Message[],
): Message[] {
const threadMessageIds = new Set(
threadMessages
.map((m) => ("tool_call_id" in m ? m.tool_call_id : m.id))
.filter(Boolean),
);
const trimmed = runId.trim();
if (!trimmed) {
return null;
}
const queryIndex = trimmed.indexOf("?");
if (queryIndex >= 0) {
const params = new URLSearchParams(trimmed.slice(queryIndex + 1));
const queryRunId = params.get("run_id")?.trim();
if (queryRunId) {
return queryRunId;
// The overlap is a contiguous suffix of historyMessages (newest history == oldest thread).
// Scan from the end: shrink cutoff while messages are already in thread, stop as soon as
// we hit one that isn't — everything before that point is non-overlapping.
let cutoff = historyMessages.length;
for (let i = historyMessages.length - 1; i >= 0; i--) {
const msg = historyMessages[i];
if (!msg) {
continue;
}
if (
(msg?.id && threadMessageIds.has(msg.id)) ||
("tool_call_id" in msg && threadMessageIds.has(msg.tool_call_id))
) {
cutoff = i;
} else {
break;
}
}
const pathWithoutQueryOrHash = trimmed.split(/[?#]/, 1)[0]?.trim() ?? "";
if (!pathWithoutQueryOrHash) {
return null;
}
const runsMarker = "/runs/";
const runsIndex = pathWithoutQueryOrHash.lastIndexOf(runsMarker);
if (runsIndex >= 0) {
const runIdAfterMarker = pathWithoutQueryOrHash
.slice(runsIndex + runsMarker.length)
.split("/", 1)[0]
?.trim();
if (runIdAfterMarker) {
return runIdAfterMarker;
}
return null;
}
const segments = pathWithoutQueryOrHash
.split("/")
.map((segment) => segment.trim())
.filter(Boolean);
return segments.at(-1) ?? null;
}
function getRunMetadataStorage(): {
getItem(key: `lg:stream:${string}`): string | null;
setItem(key: `lg:stream:${string}`, value: string): void;
removeItem(key: `lg:stream:${string}`): void;
} {
return {
getItem(key) {
const normalized = normalizeStoredRunId(
window.sessionStorage.getItem(key),
);
if (normalized) {
window.sessionStorage.setItem(key, normalized);
return normalized;
}
window.sessionStorage.removeItem(key);
return null;
},
setItem(key, value) {
const normalized = normalizeStoredRunId(value);
if (normalized) {
window.sessionStorage.setItem(key, normalized);
return;
}
window.sessionStorage.removeItem(key);
},
removeItem(key) {
window.sessionStorage.removeItem(key);
},
};
return [
...historyMessages.slice(0, cutoff),
...threadMessages,
...optimisticMessages,
];
}
function getStreamErrorMessage(error: unknown): string {
@@ -140,6 +102,7 @@ export function useThreadStream({
threadId,
context,
isMock,
onSend,
onStart,
onFinish,
onToolEnd,
@@ -151,17 +114,25 @@ export function useThreadStream({
// and to allow access to the current thread id in onUpdateEvent
const threadIdRef = useRef<string | null>(threadId ?? null);
const startedRef = useRef(false);
const listeners = useRef({
onSend,
onStart,
onFinish,
onToolEnd,
});
const {
messages: history,
hasMore: hasMoreHistory,
loadMore: loadMoreHistory,
loading: isHistoryLoading,
appendMessages,
} = useThreadHistory(onStreamThreadId ?? "");
// Keep listeners ref updated with latest callbacks
useEffect(() => {
listeners.current = { onStart, onFinish, onToolEnd };
}, [onStart, onFinish, onToolEnd]);
listeners.current = { onSend, onStart, onFinish, onToolEnd };
}, [onSend, onStart, onFinish, onToolEnd]);
useEffect(() => {
const normalizedThreadId = threadId ?? null;
@@ -175,45 +146,26 @@ export function useThreadStream({
threadIdRef.current = normalizedThreadId;
}, [threadId]);
const _handleOnStart = useCallback((id: string) => {
const handleStreamStart = useCallback((_threadId: string, _runId: string) => {
threadIdRef.current = _threadId;
if (!startedRef.current) {
listeners.current.onStart?.(id);
listeners.current.onStart?.(_threadId, _runId);
startedRef.current = true;
}
setOnStreamThreadId(_threadId);
}, []);
const handleStreamStart = useCallback(
(_threadId: string) => {
threadIdRef.current = _threadId;
_handleOnStart(_threadId);
},
[_handleOnStart],
);
const queryClient = useQueryClient();
const updateSubtask = useUpdateSubtask();
const runMetadataStorageRef = useRef<
ReturnType<typeof getRunMetadataStorage> | undefined
>(undefined);
if (
typeof window !== "undefined" &&
runMetadataStorageRef.current === undefined
) {
runMetadataStorageRef.current = getRunMetadataStorage();
}
const thread = useStream<AgentThreadState>({
client: getAPIClient(isMock),
assistantId: "lead_agent",
threadId: onStreamThreadId,
reconnectOnMount: runMetadataStorageRef.current
? () => runMetadataStorageRef.current!
: false,
reconnectOnMount: true,
fetchStateHistory: { limit: 1 },
onCreated(meta) {
handleStreamStart(meta.thread_id);
setOnStreamThreadId(meta.thread_id);
handleStreamStart(meta.thread_id, meta.run_id);
if (context.agent_name && !isMock) {
void getAPIClient()
.threads.update(meta.thread_id, {
@@ -231,6 +183,34 @@ export function useThreadStream({
}
},
onUpdateEvent(data) {
if (data["SummarizationMiddleware.before_model"]) {
const _messages = [
...(data["SummarizationMiddleware.before_model"].messages ?? []),
];
if (_messages.length < 2) {
return;
}
for (const m of _messages) {
if (m.name === "summary" && m.type === "human") {
summarizedRef.current?.add(m.id ?? "");
}
}
const _lastKeepMessage = _messages[2];
const _currentMessages = [...messagesRef.current];
const _movedMessages: Message[] = [];
for (const m of _currentMessages) {
if (m.id !== undefined && m.id === _lastKeepMessage?.id) {
break;
}
if (!summarizedRef.current?.has(m.id ?? "")) {
_movedMessages.push(m);
}
}
appendMessages(_movedMessages);
messagesRef.current = [];
}
const updates: Array<Partial<AgentThreadState> | null> = Object.values(
data || {},
);
@@ -295,9 +275,6 @@ export function useThreadStream({
onFinish(state) {
listeners.current.onFinish?.(state.values);
void queryClient.invalidateQueries({ queryKey: ["threads", "search"] });
void queryClient.invalidateQueries({
queryKey: ["thread-message-enrichment"],
});
},
});
@@ -305,24 +282,25 @@ export function useThreadStream({
const [optimisticMessages, setOptimisticMessages] = useState<Message[]>([]);
const [isUploading, setIsUploading] = useState(false);
const sendInFlightRef = useRef(false);
const messagesRef = useRef<Message[]>([]);
const summarizedRef = useRef<Set<string>>(null);
// Track message count before sending so we know when server has responded
const prevMsgCountRef = useRef(thread.messages.length);
summarizedRef.current ??= new Set<string>();
// Reset thread-local pending UI state when switching between threads so
// optimistic messages and in-flight guards do not leak across chat views.
useEffect(() => {
startedRef.current = false;
sendInFlightRef.current = false;
prevMsgCountRef.current = 0;
setOptimisticMessages([]);
setIsUploading(false);
}, [threadId]);
// Clear optimistic when server messages arrive (count increases)
useEffect(() => {
if (
optimisticMessages.length > 0 &&
thread.messages.length > prevMsgCountRef.current + 1
thread.messages.length > prevMsgCountRef.current
) {
setOptimisticMessages([]);
}
@@ -381,12 +359,7 @@ export function useThreadStream({
}
setOptimisticMessages(newOptimistic);
// Only fire onStart immediately for an existing persisted thread.
// Brand-new chats should wait for onCreated(meta.thread_id) so URL sync
// uses the real server-generated thread id.
if (threadIdRef.current) {
_handleOnStart(threadId);
}
listeners.current.onSend?.(threadId);
let uploadedFileInfo: UploadedFileInfo[] = [];
@@ -520,19 +493,106 @@ export function useThreadStream({
sendInFlightRef.current = false;
}
},
[thread, _handleOnStart, t.uploads.uploadingFiles, context, queryClient],
[thread, t.uploads.uploadingFiles, context, queryClient],
);
// Merge thread with optimistic messages for display
const mergedThread =
optimisticMessages.length > 0
? ({
...thread,
messages: [...thread.messages, ...optimisticMessages],
} as typeof thread)
: thread;
// Cache the latest thread messages in a ref to compare against incoming history messages for deduplication,
// and to allow access to the full message list in onUpdateEvent without causing re-renders.
if (thread.messages.length >= messagesRef.current.length) {
messagesRef.current = thread.messages;
}
return [mergedThread, sendMessage, isUploading] as const;
const mergedMessages = mergeMessages(
history,
thread.messages,
optimisticMessages,
);
// Merge history, live stream, and optimistic messages for display
// History messages may overlap with thread.messages; thread.messages take precedence
const mergedThread = {
...thread,
messages: mergedMessages,
} as typeof thread;
return {
thread: mergedThread,
sendMessage,
isUploading,
isHistoryLoading,
hasMoreHistory,
loadMoreHistory,
} as const;
}
export function useThreadHistory(threadId: string) {
const runs = useThreadRuns(threadId);
const threadIdRef = useRef(threadId);
const runsRef = useRef(runs.data ?? []);
const indexRef = useRef(-1);
const loadingRef = useRef(false);
const [loading, setLoading] = useState(false);
const [messages, setMessages] = useState<Message[]>([]);
loadingRef.current = loading;
const loadMessages = useCallback(async () => {
if (runsRef.current.length === 0) {
return;
}
const run = runsRef.current[indexRef.current];
if (!run || loadingRef.current) {
return;
}
try {
setLoading(true);
const result: { data: RunMessage[]; hasMore: boolean } = await fetch(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadIdRef.current)}/runs/${encodeURIComponent(run.run_id)}/messages`,
{
method: "GET",
headers: {
"Content-Type": "application/json",
},
credentials: "include",
},
).then((res) => {
return res.json();
});
const _messages = result.data
.filter((m) => !m.metadata.caller?.startsWith("middleware:"))
.map((m) => m.content);
setMessages((prev) => [..._messages, ...prev]);
indexRef.current -= 1;
} catch (err) {
console.error(err);
} finally {
setLoading(false);
}
}, []);
useEffect(() => {
threadIdRef.current = threadId;
if (runs.data && runs.data.length > 0) {
runsRef.current = runs.data ?? [];
indexRef.current = runs.data.length - 1;
}
loadMessages().catch(() => {
toast.error("Failed to load thread history.");
});
}, [threadId, runs.data, loadMessages]);
const appendMessages = useCallback((_messages: Message[]) => {
setMessages((prev) => {
return [...prev, ..._messages];
});
}, []);
const hasMore = indexRef.current >= 0 || !runs.data;
return {
runs: runs.data,
messages,
loading,
appendMessages,
hasMore,
loadMore: loadMessages,
};
}
export function useThreads(
@@ -602,6 +662,33 @@ export function useThreads(
});
}
export function useThreadRuns(threadId?: string) {
const apiClient = getAPIClient();
return useQuery<Run[]>({
queryKey: ["thread", threadId],
queryFn: async () => {
if (!threadId) {
return [];
}
const response = await apiClient.runs.list(threadId);
return response;
},
refetchOnWindowFocus: false,
});
}
export function useRunDetail(threadId: string, runId: string) {
const apiClient = getAPIClient();
return useQuery<Run>({
queryKey: ["thread", threadId, "run", runId],
queryFn: async () => {
const response = await apiClient.runs.get(threadId, runId);
return response;
},
refetchOnWindowFocus: false,
});
}
export function useDeleteThread() {
const queryClient = useQueryClient();
const apiClient = getAPIClient();
@@ -609,7 +696,7 @@ export function useDeleteThread() {
mutationFn: async ({ threadId }: { threadId: string }) => {
await apiClient.threads.delete(threadId);
const response = await fetchWithAuth(
const response = await fetch(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}`,
{
method: "DELETE",
@@ -682,65 +769,3 @@ export function useRenameThread() {
},
});
}
/** Per-message enrichment data attached by the backend ``/history`` helper. */
export interface MessageEnrichment {
run_id: string;
/** ``undefined`` = not feedback-eligible; ``null`` = eligible but unrated. */
feedback?: FeedbackData | null;
}
/**
* Fetch ``/history`` once and index feedback + run_id by message id.
*
* Replaces the old ``useThreadFeedback`` hook which keyed by AI-message
* ordinal position — an inherently fragile mapping that broke whenever
* ``ai_tool_call`` messages were interleaved with ``ai_message`` messages.
* Keying by ``message.id`` is stable regardless of run count, tool-call
* chains, or summarization.
*
* The ``/history`` response is refreshed on every stream completion via
* ``invalidateQueries(["thread-message-enrichment"])`` in ``onFinish``.
*/
export function useThreadMessageEnrichment(
threadId: string | null | undefined,
) {
return useQuery({
queryKey: ["thread-message-enrichment", threadId],
queryFn: async (): Promise<Map<string, MessageEnrichment>> => {
const empty = new Map<string, MessageEnrichment>();
if (!threadId) return empty;
const res = await fetchWithAuth(
`${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}/history`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ limit: 1 }),
},
);
if (!res.ok) return empty;
const entries = (await res.json()) as Array<{
values?: {
messages?: Array<{
id?: string;
run_id?: string;
feedback?: FeedbackData | null;
}>;
};
}>;
const messages = entries[0]?.values?.messages ?? [];
const map = new Map<string, MessageEnrichment>();
for (const m of messages) {
if (!m.id || !m.run_id) continue;
const entry: MessageEnrichment = { run_id: m.run_id };
// Preserve presence: "feedback" key absent → ineligible; present with
// null → eligible but unrated; present with object → rated.
if ("feedback" in m) entry.feedback = m.feedback;
map.set(m.id, entry);
}
return map;
},
enabled: !!threadId,
staleTime: 30_000,
});
}
+9
View File
@@ -22,3 +22,12 @@ export interface AgentThreadContext extends Record<string, unknown> {
export interface AgentThread extends Thread<AgentThreadState> {
context?: AgentThreadContext;
}
export interface RunMessage {
run_id: string;
content: Message;
metadata: {
caller: string;
};
created_at: string;
}
+3 -3
View File
@@ -2,7 +2,7 @@
* API functions for file uploads
*/
import { fetchWithAuth } from "../api/fetcher";
import { fetch } from "../api/fetcher";
import { getBackendBaseURL } from "../config";
export interface UploadedFileInfo {
@@ -51,7 +51,7 @@ export async function uploadFiles(
formData.append("files", file);
});
const response = await fetchWithAuth(
const response = await fetch(
`${getBackendBaseURL()}/api/threads/${threadId}/uploads`,
{
method: "POST",
@@ -92,7 +92,7 @@ export async function deleteUploadedFile(
threadId: string,
filename: string,
): Promise<{ success: boolean; message: string }> {
const response = await fetchWithAuth(
const response = await fetch(
`${getBackendBaseURL()}/api/threads/${threadId}/uploads/${filename}`,
{
method: "DELETE",