// Source: deer-flow/frontend/src/core/threads/hooks.ts
// Snapshot: 2026-06-11 15:58:40 +08:00 · 1178 lines · 35 KiB · TypeScript

import type { AIMessage, Message, Run } from "@langchain/langgraph-sdk";
import { useStream } from "@langchain/langgraph-sdk/react";
import {
type QueryClient,
useMutation,
useQuery,
useQueryClient,
} from "@tanstack/react-query";
import { useCallback, useEffect, useRef, useState } from "react";
import { toast } from "sonner";
import type { PromptInputMessage } from "@/components/ai-elements/prompt-input";
import { getAPIClient } from "../api";
import { fetch } from "../api/fetcher";
import { getBackendBaseURL } from "../config";
import { useI18n } from "../i18n/hooks";
import { isHiddenFromUIMessage } from "../messages/utils";
import type { FileInMessage } from "../messages/utils";
import type { LocalSettings } from "../settings";
import { useUpdateSubtask } from "../tasks/context";
import type { UploadedFileInfo } from "../uploads";
import { promptInputFilePartToFile, uploadFiles } from "../uploads";
import { fetchThreadTokenUsage } from "./api";
import {
buildThreadsSearchQueryOptions,
DEFAULT_THREAD_SEARCH_PARAMS,
type ThreadSearchParams,
} from "./thread-search-query";
import { threadTokenUsageQueryKey } from "./token-usage";
import type {
AgentThread,
AgentThreadState,
RunMessage,
ThreadTokenUsageResponse,
} from "./types";
/**
 * Payload handed to `onToolEnd` listeners when the stream reports an
 * `on_tool_end` LangChain event.
 */
export type ToolEndEvent = {
  /** The `name` field of the originating event. */
  name: string;
  /** The `data` field of the originating event, forwarded without validation. */
  data: unknown;
};
/** Options accepted by `useThreadStream`. */
export type ThreadStreamOptions = {
  /** Thread to attach to; a missing/null id is treated as a brand new unsaved thread. */
  threadId?: string | null | undefined;
  /** Local settings context; spread into the run context on every submit. */
  context: LocalSettings["context"];
  /**
   * Forwarded to `getAPIClient`; when truthy, history loading and token-usage
   * invalidation are skipped as well.
   */
  isMock?: boolean;
  /** Called with the target thread id right after the optimistic messages are queued. */
  onSend?: (threadId: string) => void;
  /** Called with the thread id and run id the first time a run is created for the thread. */
  onStart?: (threadId: string, runId: string) => void;
  /** Called with the final state values when the stream finishes. */
  onFinish?: (state: AgentThreadState) => void;
  /** Called for every `on_tool_end` LangChain event seen on the stream. */
  onToolEnd?: (event: ToolEndEvent) => void;
};
/** Per-call options for the `sendMessage` function returned by `useThreadStream`. */
type SendMessageOptions = {
  /**
   * Extra keys merged into the human message's `additional_kwargs`.
   * `hide_from_ui: true` suppresses the optimistic messages for this send.
   */
  additionalKwargs?: Record<string, unknown>;
};
/**
 * Type guard: true only for strings that contain at least one character.
 * Used to strip `undefined`/empty identities out of mapped arrays.
 */
function isNonEmptyString(value: string | undefined): value is string {
  if (typeof value !== "string") {
    return false;
  }
  return value.length !== 0;
}
/**
 * Keys of a stream "update" payload whose value is treated as output of the
 * summarization middleware (see `getSummarizationMiddlewareMessages`).
 */
const SUMMARIZATION_MIDDLEWARE_UPDATE_KEYS = new Set([
  "SummarizationMiddleware.before_model",
  "DeerFlowSummarizationMiddleware.before_model",
]);
/**
 * Derives a stable identity string for a message.
 *
 * A non-empty `tool_call_id` wins and yields `tool:<id>`; otherwise a
 * non-empty `id` yields `message:<id>`. Messages with neither have no identity
 * and `undefined` is returned.
 */
function messageIdentity(message: Message): string | undefined {
  const toolCallId =
    "tool_call_id" in message ? message.tool_call_id : undefined;
  if (typeof toolCallId === "string" && toolCallId !== "") {
    return `tool:${toolCallId}`;
  }

  const { id } = message;
  return typeof id === "string" && id !== "" ? `message:${id}` : undefined;
}
/**
 * Collapses messages that share an identity down to a single entry while
 * keeping the original relative order.
 *
 * For each identity the surviving copy is the last *visible* one; if every
 * copy is hidden from the UI, the last copy survives. Messages without an
 * identity are always kept.
 *
 * This is a UI-display dedupe rule, not a general LangChain message-stream
 * contract: a hidden message sharing an identity with a visible one is treated
 * as a control message. Hidden messages carrying independent tracing/task
 * semantics should use a distinct id or a custom stream/state channel.
 */
function dedupeMessagesByIdentity(messages: Message[]): Message[] {
  const identities = messages.map((message) => messageIdentity(message));
  // identity -> index of the copy that survives, and whether that copy is visible
  const survivors = new Map<string, { index: number; visible: boolean }>();

  messages.forEach((message, index) => {
    const identity = identities[index];
    if (!identity) {
      return;
    }
    const visible = !isHiddenFromUIMessage(message);
    // A hidden copy may only replace another hidden copy, never a visible one.
    if (visible || survivors.get(identity)?.visible !== true) {
      survivors.set(identity, { index, visible });
    }
  });

  return messages.filter((_message, index) => {
    const identity = identities[index];
    return !identity || survivors.get(identity)?.index === index;
  });
}
/**
 * Returns the index of the first entry in `runs` whose `run_id` is not in
 * `loadedRunIds`, or `-1` when every run has been loaded (or `runs` is empty).
 *
 * NOTE(review): the name says "latest"; presumably `runs` is ordered
 * newest-first so the first match is the latest unloaded run — confirm
 * against the runs API ordering.
 */
export function findLatestUnloadedRunIndex(
  runs: Run[],
  loadedRunIds: ReadonlySet<string>,
): number {
  return runs.findIndex(
    (candidate) => Boolean(candidate) && !loadedRunIds.has(candidate.run_id),
  );
}
/** Default cap on how many empty run loads in a row trigger another automatic load. */
export const MAX_CONSECUTIVE_EMPTY_RUN_LOADS = 5;

/**
 * Decides whether the history loader should immediately fetch the next run
 * after a run yielded no displayable messages.
 *
 * @param fetchedMessageCount Number of messages obtained from the run just loaded.
 * @param consecutiveEmptyLoads How many empty runs have already been skipped in a row.
 * @param maxConsecutiveEmptyLoads Upper bound for `consecutiveEmptyLoads`.
 * @returns `true` only when the run was empty and the cap has not been reached.
 */
export function shouldAutoContinueOnEmptyRun(
  fetchedMessageCount: number,
  consecutiveEmptyLoads: number,
  maxConsecutiveEmptyLoads: number = MAX_CONSECUTIVE_EMPTY_RUN_LOADS,
): boolean {
  if (fetchedMessageCount !== 0) {
    return false;
  }
  return consecutiveEmptyLoads < maxConsecutiveEmptyLoads;
}
/**
 * One page of messages for a run, as read from the run-messages endpoint.
 * The "more pages" flag is accepted in either snake_case or camelCase; see
 * `runMessagesPageHasMore` for the precedence.
 */
type RunMessagesPageResponse = {
  /** Messages contained in this page. */
  data: RunMessage[];
  /** Snake-case "more pages available" flag. */
  has_more?: boolean;
  /** Camel-case "more pages available" flag. */
  hasMore?: boolean;
};
/**
 * Reads the "more pages available" flag from a run-messages page.
 * `has_more` takes precedence over `hasMore`; when neither is set the page is
 * treated as the last one.
 */
export function runMessagesPageHasMore(result: RunMessagesPageResponse) {
  const { has_more: snakeCaseFlag, hasMore: camelCaseFlag } = result;
  if (snakeCaseFlag != null) {
    return snakeCaseFlag;
  }
  if (camelCaseFlag != null) {
    return camelCaseFlag;
  }
  return false;
}
/**
 * Finds the smallest numeric `seq` among the given run messages.
 * Entries whose `seq` is not a number are ignored.
 *
 * @returns The minimum `seq`, or `null` when no message carries a numeric `seq`.
 */
export function getOldestRunMessageSeq(messages: RunMessage[]) {
  return messages.reduce<number | null>((oldest, message) => {
    const seq = message.seq;
    if (typeof seq !== "number") {
      return oldest;
    }
    return oldest === null ? seq : Math.min(oldest, seq);
  }, null);
}
/**
 * Computes the `before_seq` cursor for the next page of a run's messages.
 *
 * @returns `null` when the page reports no further pages; the oldest `seq`
 *   on the page when there are more; `undefined` when more pages are reported
 *   but no message on this page carries a numeric `seq`.
 */
export function getNextRunMessagesBeforeSeq(
  result: RunMessagesPageResponse,
): number | null | undefined {
  if (runMessagesPageHasMore(result)) {
    const oldestSeq = getOldestRunMessageSeq(result.data);
    return oldestSeq === null ? undefined : oldestSeq;
  }
  return null;
}
/**
 * Builds the URL of the run-messages endpoint for a thread/run pair.
 *
 * A single trailing slash is stripped from `baseUrl`. Thread and run ids are
 * percent-encoded. When `beforeSeq` is given it is sent as the `before_seq`
 * query parameter.
 *
 * @returns An absolute URL string when `baseUrl` is non-empty after
 *   normalization; otherwise just the path plus query string.
 */
export function buildRunMessagesUrl(
  baseUrl: string,
  threadId: string,
  runId: string,
  beforeSeq?: number,
) {
  const trimmedBase = baseUrl.replace(/\/$/, "");
  // Fallback origin keeps `new URL` usable when there is no `window`.
  const origin =
    typeof window === "undefined" ? "http://localhost" : window.location.origin;
  const segments = [
    "api",
    "threads",
    encodeURIComponent(threadId),
    "runs",
    encodeURIComponent(runId),
    "messages",
  ];
  const target = new URL(`${trimmedBase}/${segments.join("/")}`, origin);

  if (beforeSeq !== undefined) {
    target.searchParams.set("before_seq", String(beforeSeq));
  }

  if (!trimmedBase) {
    return `${target.pathname}${target.search}`;
  }
  return target.toString();
}
/**
 * Combines paged history, live stream messages, and optimistic messages into
 * the single list shown in the UI.
 *
 * The trailing part of `historyMessages` that already appears (by identity)
 * among the *visible* live messages is dropped, then the three lists are
 * concatenated and de-duplicated by identity.
 *
 * Only visible live messages trim overlapping history. Hidden messages are UI
 * control messages in this path, not observability records; a hidden message
 * that must survive as task/tracing data should use custom events or a
 * separate state channel instead of participating in this overlap heuristic.
 */
export function mergeMessages(
  historyMessages: Message[],
  threadMessages: Message[],
  optimisticMessages: Message[],
): Message[] {
  const liveIdentities = new Set<string>();
  for (const message of threadMessages) {
    if (isHiddenFromUIMessage(message)) {
      continue;
    }
    const identity = messageIdentity(message);
    if (isNonEmptyString(identity)) {
      liveIdentities.add(identity);
    }
  }

  // The overlap is a contiguous suffix of history (newest history == oldest
  // live message). Walk backwards, pulling the cutoff down while entries are
  // already live, and stop at the first entry that is not.
  let cutoff = historyMessages.length;
  let cursor = historyMessages.length - 1;
  while (cursor >= 0) {
    const candidate = historyMessages[cursor];
    if (candidate) {
      const identity = messageIdentity(candidate);
      if (!identity || !liveIdentities.has(identity)) {
        break;
      }
      cutoff = cursor;
    }
    cursor -= 1;
  }

  const nonOverlappingHistory = historyMessages.slice(0, cutoff);
  return dedupeMessagesByIdentity([
    ...nonOverlappingHistory,
    ...threadMessages,
    ...optimisticMessages,
  ]);
}
/**
 * Returns the messages that are not part of the recorded baseline.
 * A message is kept when it has no identity or when its identity is absent
 * from `baselineMessageIds`.
 */
function getMessagesAfterBaseline(
  messages: Message[],
  baselineMessageIds: ReadonlySet<string>,
): Message[] {
  const fresh: Message[] = [];
  for (const message of messages) {
    const identity = messageIdentity(message);
    if (identity && baselineMessageIds.has(identity)) {
      continue;
    }
    fresh.push(message);
  }
  return fresh;
}
/**
 * Decides which optimistic messages should still be rendered.
 *
 * Once the human-message count has grown past the value captured before
 * sending, and the optimistic batch contains a human message, the whole batch
 * is hidden (an empty array is returned). In every other case the input array
 * is returned unchanged.
 */
export function getVisibleOptimisticMessages(
  optimisticMessages: Message[],
  previousHumanMessageCount: number,
  currentHumanMessageCount: number,
): Message[] {
  const containsHumanMessage = optimisticMessages.some(
    (message) => message.type === "human",
  );
  const humanCountGrew = currentHumanMessageCount > previousHumanMessageCount;
  return containsHumanMessage && humanCountGrew ? [] : optimisticMessages;
}
/**
 * Extracts the `messages` array emitted by a summarization middleware from a
 * stream update payload.
 *
 * The payload is scanned for the first entry whose key is a known
 * summarization middleware key and whose value is an object with an array
 * `messages` property; a shallow copy of that array is returned.
 *
 * @returns The copied messages, or `undefined` when `data` is not an object or
 *   contains no matching entry.
 */
export function getSummarizationMiddlewareMessages(
  data: unknown,
): Message[] | undefined {
  if (data === null || typeof data !== "object") {
    return undefined;
  }

  for (const [key, update] of Object.entries(data)) {
    const isSummarizationUpdate =
      SUMMARIZATION_MIDDLEWARE_UPDATE_KEYS.has(key) &&
      typeof update === "object" &&
      update !== null;
    if (isSummarizationUpdate) {
      const candidate = Reflect.get(update, "messages");
      if (Array.isArray(candidate)) {
        return Array.from(candidate) as Message[];
      }
    }
  }

  return undefined;
}
/**
 * Inserts `thread` into every cached `["threads", "search", ...]` query, or
 * merges it into the existing entry with the same `thread_id`.
 *
 * - Query with no data yet: the data becomes `[thread]`.
 * - Thread not present: it is prepended to the list.
 * - Thread already present: fields are merged with the *cached* entry taking
 *   precedence over `thread` (top-level fields, `metadata`, and `values`).
 */
export function upsertThreadInSearchCache(
  queryClient: QueryClient,
  thread: AgentThread,
) {
  const mergeIntoCache = (cached: Array<AgentThread> | undefined) => {
    if (!cached) {
      return [thread];
    }

    const position = cached.findIndex(
      (entry) => entry.thread_id === thread.thread_id,
    );
    if (position === -1) {
      return [thread, ...cached];
    }

    return cached.map((entry, index) =>
      index === position
        ? {
            ...thread,
            ...entry,
            metadata: {
              ...(thread.metadata ?? {}),
              ...(entry.metadata ?? {}),
            },
            values: {
              ...thread.values,
              ...entry.values,
            },
          }
        : entry,
    );
  };

  queryClient.setQueriesData(
    {
      queryKey: ["threads", "search"],
      exact: false,
    },
    mergeIntoCache,
  );
}
/**
 * Produces a human-readable message for an error reported by the stream.
 *
 * Candidates are tried in order and the first non-blank one wins: the value
 * itself when it is a string, `Error#message`, an object's `message` property,
 * then the object's nested `error` (as an `Error` or a string). Falls back to
 * `"Request failed."`.
 */
function getStreamErrorMessage(error: unknown): string {
  const hasText = (value: unknown): value is string =>
    typeof value === "string" && value.trim() !== "";

  if (hasText(error)) {
    return error;
  }
  if (error instanceof Error && error.message.trim() !== "") {
    return error.message;
  }
  if (error !== null && typeof error === "object") {
    const ownMessage: unknown = Reflect.get(error, "message");
    if (hasText(ownMessage)) {
      return ownMessage;
    }
    const inner: unknown = Reflect.get(error, "error");
    if (inner instanceof Error) {
      if (inner.message.trim() !== "") {
        return inner.message;
      }
    } else if (hasText(inner)) {
      return inner;
    }
  }
  return "Request failed.";
}
/** Collects the identities of all messages that have one (private to `useThreadStream`). */
function collectMessageIdentities(messages: Message[]): Set<string> {
  return new Set(
    messages.map(messageIdentity).filter((id): id is string => Boolean(id)),
  );
}

/**
 * Drives one chat thread: live streaming via `useStream`, paged history via
 * `useThreadHistory`, optimistic messages while a send is in flight, and
 * file uploads that precede a submit.
 *
 * @returns An object with:
 *   - `thread`: the `useStream` result with `messages` replaced by the merged
 *     history + live + optimistic list;
 *   - `pendingUsageMessages`: live messages that appeared after the recorded
 *     baseline while the stream is loading (empty otherwise);
 *   - `sendMessage(threadId, message, extraContext?, options?)`: uploads any
 *     attachments, then submits a human message. Rejects when upload or
 *     submit fails; silently returns when another send is already in flight;
 *   - `isUploading`, `isHistoryLoading`, `hasMoreHistory`, `loadMoreHistory`.
 */
export function useThreadStream({
  threadId,
  context,
  isMock,
  onSend,
  onStart,
  onFinish,
  onToolEnd,
}: ThreadStreamOptions) {
  const { t } = useI18n();
  // Track the thread ID that is currently streaming to handle thread changes during streaming
  const [onStreamThreadId, setOnStreamThreadId] = useState(() => threadId);
  // Ref to track current thread ID across async callbacks without causing re-renders,
  // and to allow access to the current thread id in onUpdateEvent
  const threadIdRef = useRef<string | null>(threadId ?? null);
  // Ensures `onStart` fires at most once per thread.
  const startedRef = useRef(false);
  // Identities of messages that existed before the current run; used to
  // compute `pendingUsageMessages`.
  const pendingUsageBaselineMessageIdsRef = useRef<Set<string>>(new Set());
  const listeners = useRef({
    onSend,
    onStart,
    onFinish,
    onToolEnd,
  });
  const {
    messages: history,
    hasMore: hasMoreHistory,
    loadMore: loadMoreHistory,
    loading: isHistoryLoading,
    appendMessages,
  } = useThreadHistory(onStreamThreadId ?? "", { enabled: !isMock });

  // Keep listeners ref updated with latest callbacks
  useEffect(() => {
    listeners.current = { onSend, onStart, onFinish, onToolEnd };
  }, [onSend, onStart, onFinish, onToolEnd]);

  useEffect(() => {
    const normalizedThreadId = threadId ?? null;
    if (!normalizedThreadId) {
      // Reset when the UI moves back to a brand new unsaved thread.
      startedRef.current = false;
    }
    setOnStreamThreadId(normalizedThreadId);
    threadIdRef.current = normalizedThreadId;
  }, [threadId]);

  const handleStreamStart = useCallback((_threadId: string, _runId: string) => {
    threadIdRef.current = _threadId;
    if (!startedRef.current) {
      listeners.current.onStart?.(_threadId, _runId);
      startedRef.current = true;
    }
    setOnStreamThreadId(_threadId);
  }, []);

  const queryClient = useQueryClient();
  const updateSubtask = useUpdateSubtask();

  const thread = useStream<AgentThreadState>({
    client: getAPIClient(isMock),
    assistantId: "lead_agent",
    threadId: onStreamThreadId,
    reconnectOnMount: true,
    fetchStateHistory: { limit: 1 },
    onCreated(meta) {
      handleStreamStart(meta.thread_id, meta.run_id);
      // Show the new thread in the sidebar immediately with a placeholder title.
      const now = new Date().toISOString();
      upsertThreadInSearchCache(queryClient, {
        thread_id: meta.thread_id,
        created_at: now,
        updated_at: now,
        metadata: context.agent_name ? { agent_name: context.agent_name } : {},
        status: "busy",
        values: {
          title: t.pages.newChat,
          messages: [],
          artifacts: [],
        },
        interrupts: {},
      });
      if (context.agent_name && !isMock) {
        // Best-effort: a failure to tag the thread must not break the stream.
        void getAPIClient()
          .threads.update(meta.thread_id, {
            metadata: { agent_name: context.agent_name },
          })
          .catch(() => ({}));
      }
    },
    onLangChainEvent(event) {
      if (event.event === "on_tool_end") {
        listeners.current.onToolEnd?.({
          name: event.name,
          data: event.data,
        });
      }
    },
    onUpdateEvent(data) {
      const _messages = getSummarizationMiddlewareMessages(data);
      if (_messages && _messages.length >= 2) {
        // Remember summary messages so they are never moved into history.
        for (const m of _messages) {
          if (m.name === "summary" && m.type === "human") {
            summarizedRef.current?.add(m.id ?? "");
          }
        }
        const firstRetainedVisibleIdentity = _messages
          .filter((message) => message.type !== "remove")
          .filter((message) => !isHiddenFromUIMessage(message))
          .map(messageIdentity)
          .find(isNonEmptyString);
        // Everything before the first retained visible message is moved from
        // the live list into history.
        const _currentMessages = [...messagesRef.current];
        const _movedMessages: Message[] = [];
        for (const m of _currentMessages) {
          if (
            firstRetainedVisibleIdentity &&
            messageIdentity(m) === firstRetainedVisibleIdentity
          ) {
            break;
          }
          if (!summarizedRef.current?.has(m.id ?? "")) {
            _movedMessages.push(m);
          }
        }
        appendMessages(_movedMessages);
        messagesRef.current = [];
      }
      const updates: Array<Partial<AgentThreadState> | null> = Object.values(
        data || {},
      );
      for (const update of updates) {
        if (update && "title" in update && update.title) {
          // Mirror a server-generated title into the cached thread lists.
          void queryClient.setQueriesData(
            {
              queryKey: ["threads", "search"],
              exact: false,
            },
            (oldData: Array<AgentThread> | undefined) => {
              return oldData?.map((t) => {
                if (t.thread_id === threadIdRef.current) {
                  return {
                    ...t,
                    values: {
                      ...t.values,
                      title: update.title,
                    },
                  };
                }
                return t;
              });
            },
          );
        }
      }
    },
    onCustomEvent(event: unknown) {
      if (
        typeof event === "object" &&
        event !== null &&
        "type" in event &&
        event.type === "task_running"
      ) {
        const e = event as {
          type: "task_running";
          task_id: string;
          message: AIMessage;
        };
        updateSubtask({ id: e.task_id, latestMessage: e.message });
        return;
      }
      if (
        typeof event === "object" &&
        event !== null &&
        "type" in event &&
        event.type === "llm_retry" &&
        "message" in event &&
        typeof event.message === "string" &&
        event.message.trim()
      ) {
        const e = event as { type: "llm_retry"; message: string };
        toast(e.message);
      }
    },
    onError(error) {
      setOptimisticMessages([]);
      toast.error(getStreamErrorMessage(error));
      pendingUsageBaselineMessageIdsRef.current = collectMessageIdentities(
        messagesRef.current,
      );
      if (threadIdRef.current && !isMock) {
        void queryClient.invalidateQueries({
          queryKey: threadTokenUsageQueryKey(threadIdRef.current),
        });
      }
    },
    onFinish(state) {
      listeners.current.onFinish?.(state.values);
      pendingUsageBaselineMessageIdsRef.current = collectMessageIdentities(
        messagesRef.current,
      );
      void queryClient.invalidateQueries({ queryKey: ["threads", "search"] });
      if (threadIdRef.current && !isMock) {
        void queryClient.invalidateQueries({
          queryKey: threadTokenUsageQueryKey(threadIdRef.current),
        });
      }
    },
  });

  // Optimistic messages shown before the server stream responds
  const [optimisticMessages, setOptimisticMessages] = useState<Message[]>([]);
  const [isUploading, setIsUploading] = useState(false);
  const humanMessageCount = thread.messages.filter(
    (m) => m.type === "human",
  ).length;
  const latestMessageCountsRef = useRef({ humanMessageCount });
  // Guards against overlapping sends.
  const sendInFlightRef = useRef(false);
  const messagesRef = useRef<Message[]>([]);
  const summarizedRef = useRef<Set<string>>(null);
  // Track human message count before sending to prevent clearing optimistic
  // messages before the server's human message arrives (e.g. when AI messages
  // from "messages-tuple" events arrive before the input human message from
  // "values" events).
  const prevHumanMsgCountRef = useRef(humanMessageCount);
  latestMessageCountsRef.current = { humanMessageCount };
  summarizedRef.current ??= new Set<string>();

  // Reset thread-local pending UI state when switching between threads so
  // optimistic messages and in-flight guards do not leak across chat views.
  useEffect(() => {
    startedRef.current = false;
    sendInFlightRef.current = false;
    pendingUsageBaselineMessageIdsRef.current = collectMessageIdentities(
      messagesRef.current,
    );
    prevHumanMsgCountRef.current =
      latestMessageCountsRef.current.humanMessageCount;
  }, [threadId]);

  // When streaming starts without a baseline (e.g. reconnection, run started
  // from another client, or page reload mid-stream), snapshot the current
  // messages so only *new* messages are treated as "pending" for token usage.
  useEffect(() => {
    if (
      thread.isLoading &&
      pendingUsageBaselineMessageIdsRef.current.size === 0
    ) {
      pendingUsageBaselineMessageIdsRef.current = collectMessageIdentities(
        thread.messages,
      );
    }
  }, [thread.isLoading, thread.messages]);

  // Clear optimistic when server messages arrive.
  // For messages with a human optimistic message, wait until the server's
  // human message has arrived to avoid clearing before the input message
  // appears in the stream (the input message may arrive via "values" events
  // after individual "messages-tuple" events for AI messages).
  const optimisticMessageCount = optimisticMessages.length;
  const hasHumanOptimistic = optimisticMessages.some((m) => m.type === "human");
  useEffect(() => {
    if (optimisticMessageCount === 0) return;
    const newHumanMsgArrived = humanMessageCount > prevHumanMsgCountRef.current;
    if (!hasHumanOptimistic || newHumanMsgArrived) {
      setOptimisticMessages([]);
    }
  }, [hasHumanOptimistic, humanMessageCount, optimisticMessageCount]);

  const sendMessage = useCallback(
    async (
      threadId: string,
      message: PromptInputMessage,
      extraContext?: Record<string, unknown>,
      options?: SendMessageOptions,
    ) => {
      if (sendInFlightRef.current) {
        return;
      }
      sendInFlightRef.current = true;
      const text = message.text.trim();
      // Capture the current human message count before showing optimistic
      // messages so we can wait for the server's copy of the user input.
      prevHumanMsgCountRef.current = humanMessageCount;
      pendingUsageBaselineMessageIdsRef.current = collectMessageIdentities(
        thread.messages,
      );
      // Build optimistic files list with uploading status
      const optimisticFiles: FileInMessage[] = (message.files ?? []).map(
        (f) => ({
          filename: f.filename ?? "",
          size: 0,
          status: "uploading" as const,
        }),
      );
      const hideFromUI = options?.additionalKwargs?.hide_from_ui === true;
      const optimisticAdditionalKwargs = {
        ...options?.additionalKwargs,
        ...(optimisticFiles.length > 0 ? { files: optimisticFiles } : {}),
      };
      const newOptimistic: Message[] = [];
      if (!hideFromUI) {
        newOptimistic.push({
          type: "human",
          id: `opt-human-${Date.now()}`,
          content: text ? [{ type: "text", text }] : "",
          additional_kwargs: optimisticAdditionalKwargs,
        });
      }
      if (optimisticFiles.length > 0 && !hideFromUI) {
        // Mock AI message while files are being uploaded
        newOptimistic.push({
          type: "ai",
          id: `opt-ai-${Date.now()}`,
          content: t.uploads.uploadingFiles,
          additional_kwargs: { element: "task" },
        });
      }
      setOptimisticMessages(newOptimistic);
      listeners.current.onSend?.(threadId);
      let uploadedFileInfo: UploadedFileInfo[] = [];
      try {
        // Upload files first if any
        if (message.files && message.files.length > 0) {
          setIsUploading(true);
          try {
            const filePromises = message.files.map((fileUIPart) =>
              promptInputFilePartToFile(fileUIPart),
            );
            const conversionResults = await Promise.all(filePromises);
            const files = conversionResults.filter(
              (file): file is File => file !== null,
            );
            const failedConversions = conversionResults.length - files.length;
            if (failedConversions > 0) {
              throw new Error(
                `Failed to prepare ${failedConversions} attachment(s) for upload. Please retry.`,
              );
            }
            if (!threadId) {
              throw new Error("Thread is not ready for file upload.");
            }
            if (files.length > 0) {
              const uploadResponse = await uploadFiles(threadId, files);
              uploadedFileInfo = uploadResponse.files;
              // Update optimistic human message with uploaded status + paths
              const uploadedFiles: FileInMessage[] = uploadedFileInfo.map(
                (info) => ({
                  filename: info.filename,
                  size: info.size,
                  path: info.virtual_path,
                  status: "uploaded" as const,
                }),
              );
              setOptimisticMessages((messages) => {
                if (messages.length > 1 && messages[0]) {
                  const humanMessage: Message = messages[0];
                  return [
                    {
                      ...humanMessage,
                      // Keep the caller-supplied kwargs that were on the
                      // optimistic message; only the file list changes.
                      additional_kwargs: {
                        ...humanMessage.additional_kwargs,
                        files: uploadedFiles,
                      },
                    },
                    ...messages.slice(1),
                  ];
                }
                return messages;
              });
            }
          } catch (error) {
            const errorMessage =
              error instanceof Error
                ? error.message
                : "Failed to upload files.";
            toast.error(errorMessage);
            setOptimisticMessages([]);
            throw error;
          } finally {
            setIsUploading(false);
          }
        }
        // Build files metadata for submission (included in additional_kwargs)
        const filesForSubmit: FileInMessage[] = uploadedFileInfo.map(
          (info) => ({
            filename: info.filename,
            size: info.size,
            path: info.virtual_path,
            status: "uploaded" as const,
          }),
        );
        await thread.submit(
          {
            messages: [
              {
                type: "human",
                content: [
                  {
                    type: "text",
                    text,
                  },
                ],
                additional_kwargs: {
                  ...options?.additionalKwargs,
                  ...(filesForSubmit.length > 0
                    ? { files: filesForSubmit }
                    : {}),
                },
              },
            ],
          },
          {
            threadId: threadId,
            streamSubgraphs: true,
            streamResumable: true,
            config: {
              recursion_limit: 1000,
            },
            context: {
              // Settings context overrides per-call extras; the derived flags
              // below override both.
              ...extraContext,
              ...context,
              thinking_enabled: context.mode !== "flash",
              is_plan_mode: context.mode === "pro" || context.mode === "ultra",
              subagent_enabled: context.mode === "ultra",
              reasoning_effort:
                context.reasoning_effort ??
                (context.mode === "ultra"
                  ? "high"
                  : context.mode === "pro"
                    ? "medium"
                    : context.mode === "thinking"
                      ? "low"
                      : undefined),
              thread_id: threadId,
            },
          },
        );
        void queryClient.invalidateQueries({ queryKey: ["threads", "search"] });
      } catch (error) {
        setOptimisticMessages([]);
        setIsUploading(false);
        throw error;
      } finally {
        sendInFlightRef.current = false;
      }
    },
    [thread, t.uploads.uploadingFiles, context, queryClient, humanMessageCount],
  );

  // Cache the latest thread messages in a ref to compare against incoming history messages for deduplication,
  // and to allow access to the full message list in onUpdateEvent without causing re-renders.
  if (thread.messages.length >= messagesRef.current.length) {
    messagesRef.current = thread.messages;
  }

  const visibleOptimisticMessages = getVisibleOptimisticMessages(
    optimisticMessages,
    prevHumanMsgCountRef.current,
    humanMessageCount,
  );
  const mergedMessages = mergeMessages(
    history,
    thread.messages,
    visibleOptimisticMessages,
  );
  const pendingUsageMessages = thread.isLoading
    ? getMessagesAfterBaseline(
        thread.messages,
        pendingUsageBaselineMessageIdsRef.current,
      )
    : [];

  // Merge history, live stream, and optimistic messages for display
  // History messages may overlap with thread.messages; thread.messages take precedence
  const mergedThread = {
    ...thread,
    messages: mergedMessages,
  } as typeof thread;

  return {
    thread: mergedThread,
    pendingUsageMessages,
    sendMessage,
    isUploading,
    isHistoryLoading,
    hasMoreHistory,
    loadMoreHistory,
  } as const;
}
/** Options accepted by `useThreadHistory`. */
type ThreadHistoryOptions = {
  /** When false, no runs or messages are fetched and loaded state is reset. Defaults to true. */
  enabled?: boolean;
};
/**
 * Loads a thread's past messages run by run, one page at a time.
 *
 * Runs come from `useThreadRuns`. Each call to `loadMore` fetches the next
 * page of the first run that is not fully loaded and prepends its messages
 * to `messages`. Runs that yield no displayable messages are skipped
 * automatically, up to `MAX_CONSECUTIVE_EMPTY_RUN_LOADS` in a row.
 *
 * All state is reset when `threadId` changes or `enabled` becomes false;
 * responses belonging to a previous thread/generation are discarded.
 *
 * @returns `runs`, the accumulated `messages`, a `loading` flag,
 *   `appendMessages` (appends to the end, de-duplicated by identity),
 *   `hasMore`, and `loadMore`. `loadMore` never rejects: fetch failures are
 *   logged with `console.error`.
 */
export function useThreadHistory(
  threadId: string,
  { enabled = true }: ThreadHistoryOptions = {},
) {
  const runs = useThreadRuns(threadId, { enabled });
  const threadIdRef = useRef(threadId);
  const runsRef = useRef(runs.data ?? []);
  // Index of the next run to load, or -1 when nothing is left.
  const indexRef = useRef(-1);
  const loadingRef = useRef(false);
  // Set when another pass of the load loop is required.
  const pendingLoadRef = useRef(false);
  const loadingRunIdRef = useRef<string | null>(null);
  const loadedRunIdsRef = useRef<Set<string>>(new Set());
  // Per-run pagination cursor (`before_seq`) for runs that have more pages.
  const runBeforeSeqRef = useRef<Map<string, number>>(new Map());
  // Bumped on every reset so in-flight loads can detect they are stale.
  const loadGenerationRef = useRef(0);
  const [loading, setLoading] = useState(false);
  const [messages, setMessages] = useState<Message[]>([]);

  const loadMessages = useCallback(async () => {
    if (!enabled) {
      return;
    }
    const loadGeneration = loadGenerationRef.current;
    if (loadingRef.current) {
      // A load is already running; only queue a follow-up pass when the next
      // unloaded run differs from the one currently being fetched.
      const pendingRunIndex = findLatestUnloadedRunIndex(
        runsRef.current,
        loadedRunIdsRef.current,
      );
      const pendingRun = runsRef.current[pendingRunIndex];
      if (pendingRun && pendingRun.run_id !== loadingRunIdRef.current) {
        pendingLoadRef.current = true;
      }
      return;
    }
    if (runsRef.current.length === 0) {
      return;
    }
    loadingRef.current = true;
    setLoading(true);
    try {
      let consecutiveEmptyLoads = 0;
      do {
        pendingLoadRef.current = false;
        const nextRunIndex = findLatestUnloadedRunIndex(
          runsRef.current,
          loadedRunIdsRef.current,
        );
        indexRef.current = nextRunIndex;
        const run = runsRef.current[nextRunIndex];
        if (!run) {
          indexRef.current = -1;
          return;
        }
        const requestThreadId = threadIdRef.current;
        const isStale = () =>
          loadGenerationRef.current !== loadGeneration ||
          threadIdRef.current !== requestThreadId;
        loadingRunIdRef.current = run.run_id;
        const beforeSeq = runBeforeSeqRef.current.get(run.run_id);
        const url = buildRunMessagesUrl(
          getBackendBaseURL(),
          requestThreadId,
          run.run_id,
          beforeSeq,
        );
        const response = await fetch(url, {
          method: "GET",
          headers: {
            "Content-Type": "application/json",
          },
          credentials: "include",
        });
        if (isStale()) {
          return;
        }
        if (!response.ok) {
          // Surface HTTP failures explicitly instead of failing later on a
          // missing `data` field of an error body.
          throw new Error(
            `Failed to load messages for run ${run.run_id}: HTTP ${response.status}`,
          );
        }
        const result: RunMessagesPageResponse = await response.json();
        if (isStale()) {
          return;
        }
        // Messages emitted by middleware callers are not shown in history.
        const _messages = result.data
          .filter((m) => !m.metadata.caller?.startsWith("middleware:"))
          .map((m) => m.content);
        setMessages((prev) =>
          dedupeMessagesByIdentity([..._messages, ...prev]),
        );
        const nextBeforeSeq = getNextRunMessagesBeforeSeq(result);
        if (typeof nextBeforeSeq === "number") {
          // More pages in this run: remember the cursor and keep going.
          runBeforeSeqRef.current.set(run.run_id, nextBeforeSeq);
          pendingLoadRef.current = true;
        } else if (nextBeforeSeq === undefined) {
          console.warn(
            `Run ${run.run_id} returned has_more without message seq values; leaving it pending for retry.`,
          );
        } else {
          // Run fully loaded.
          runBeforeSeqRef.current.delete(run.run_id);
          loadedRunIdsRef.current.add(run.run_id);
          if (
            shouldAutoContinueOnEmptyRun(
              _messages.length,
              consecutiveEmptyLoads,
            )
          ) {
            consecutiveEmptyLoads += 1;
            pendingLoadRef.current = true;
          } else {
            consecutiveEmptyLoads = 0;
          }
        }
        indexRef.current = findLatestUnloadedRunIndex(
          runsRef.current,
          loadedRunIdsRef.current,
        );
      } while (pendingLoadRef.current);
    } catch (err) {
      console.error(err);
    } finally {
      // A stale load must not clear the loading flags of its successor.
      if (loadGenerationRef.current === loadGeneration) {
        loadingRef.current = false;
        loadingRunIdRef.current = null;
        setLoading(false);
      }
    }
  }, [enabled]);

  useEffect(() => {
    const threadChanged = threadIdRef.current !== threadId;
    threadIdRef.current = threadId;
    if (!enabled || threadChanged) {
      loadGenerationRef.current += 1;
      runsRef.current = [];
      indexRef.current = -1;
      pendingLoadRef.current = false;
      loadingRunIdRef.current = null;
      loadedRunIdsRef.current = new Set();
      runBeforeSeqRef.current = new Map();
      loadingRef.current = false;
      setLoading(false);
      setMessages([]);
    }
    if (!enabled) {
      return;
    }
    if (runs.data && runs.data.length > 0) {
      runsRef.current = runs.data ?? [];
      indexRef.current = findLatestUnloadedRunIndex(
        runs.data,
        loadedRunIdsRef.current,
      );
    }
    // `loadMessages` handles its own errors; this handler is only a safeguard.
    loadMessages().catch(() => {
      toast.error("Failed to load thread history.");
    });
  }, [enabled, threadId, runs.data, loadMessages]);

  const appendMessages = useCallback((_messages: Message[]) => {
    setMessages((prev) => {
      return dedupeMessagesByIdentity([...prev, ..._messages]);
    });
  }, []);

  // Read from a ref during render: the value reflects the last completed load.
  const hasMore =
    enabled && Boolean(threadId) && (indexRef.current >= 0 || !runs.data);

  return {
    runs: runs.data,
    messages,
    loading,
    appendMessages,
    hasMore,
    loadMore: loadMessages,
  };
}
/**
 * Query hook for the thread search list.
 * The query options come from `buildThreadsSearchQueryOptions` for the given
 * search parameters (defaults to `DEFAULT_THREAD_SEARCH_PARAMS`).
 */
export function useThreads(
  params: ThreadSearchParams = DEFAULT_THREAD_SEARCH_PARAMS,
) {
  const client = getAPIClient();
  const searchQueryOptions = buildThreadsSearchQueryOptions(client, params);
  return useQuery<AgentThread[]>({ ...searchQueryOptions });
}
/**
 * Query hook that lists the runs of a thread.
 * The query is disabled while `threadId` is empty or `enabled` is false, and
 * resolves to an empty list if it is ever executed without a thread id.
 */
export function useThreadRuns(
  threadId?: string,
  { enabled = true }: { enabled?: boolean } = {},
) {
  const apiClient = getAPIClient();
  const hasThread = Boolean(threadId);
  const listRuns = async (): Promise<Run[]> =>
    threadId ? apiClient.runs.list(threadId) : [];

  return useQuery<Run[]>({
    queryKey: ["thread", threadId],
    queryFn: listRuns,
    enabled: enabled && hasThread,
    refetchOnWindowFocus: false,
  });
}
/**
 * Query hook for a thread's token usage.
 * Disabled while `threadId` is empty or `enabled` is false; failures are not
 * retried. Resolves to `null` if executed without a thread id.
 */
export function useThreadTokenUsage(
  threadId?: string | null,
  { enabled = true }: { enabled?: boolean } = {},
) {
  const hasThread = Boolean(threadId);
  const loadUsage = async (): Promise<ThreadTokenUsageResponse | null> =>
    threadId ? fetchThreadTokenUsage(threadId) : null;

  return useQuery<ThreadTokenUsageResponse | null>({
    queryKey: threadTokenUsageQueryKey(threadId),
    queryFn: loadUsage,
    enabled: enabled && hasThread,
    retry: false,
    refetchOnWindowFocus: false,
  });
}
/** Query hook that fetches a single run of a thread. */
export function useRunDetail(threadId: string, runId: string) {
  const client = getAPIClient();
  const loadRun = async () => client.runs.get(threadId, runId);

  return useQuery<Run>({
    queryKey: ["thread", threadId, "run", runId],
    queryFn: loadRun,
    refetchOnWindowFocus: false,
  });
}
/**
 * Mutation hook that deletes a thread.
 *
 * The thread is first deleted through the API client, then a `DELETE` request
 * is sent to the backend's `/api/threads/<id>` endpoint. On success the thread
 * is removed from every cached thread search list; the lists are invalidated
 * whether or not the mutation succeeded.
 */
export function useDeleteThread() {
  const queryClient = useQueryClient();
  const apiClient = getAPIClient();

  const removeThread = async ({ threadId }: { threadId: string }) => {
    await apiClient.threads.delete(threadId);

    const endpoint = `${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}`;
    const response = await fetch(endpoint, { method: "DELETE" });
    if (response.ok) {
      return;
    }

    // Prefer the server-provided detail; fall back when the body is not JSON.
    const error = await response
      .json()
      .catch(() => ({ detail: "Failed to delete local thread data." }));
    throw new Error(error.detail ?? "Failed to delete local thread data.");
  };

  const dropFromCache = (threadId: string) => {
    queryClient.setQueriesData(
      {
        queryKey: ["threads", "search"],
        exact: false,
      },
      (oldData: Array<AgentThread> | undefined) =>
        oldData == null
          ? oldData
          : oldData.filter((entry) => entry.thread_id !== threadId),
    );
  };

  return useMutation({
    mutationFn: removeThread,
    onSuccess(_, { threadId }) {
      dropFromCache(threadId);
    },
    onSettled() {
      void queryClient.invalidateQueries({ queryKey: ["threads", "search"] });
    },
  });
}
/**
 * Mutation hook that renames a thread by writing `title` into its state
 * values, then mirrors the new title into every cached thread search list.
 */
export function useRenameThread() {
  const queryClient = useQueryClient();
  const apiClient = getAPIClient();
  return useMutation({
    mutationFn: async ({
      threadId,
      title,
    }: {
      threadId: string;
      title: string;
    }) => {
      await apiClient.threads.updateState(threadId, {
        values: { title },
      });
    },
    onSuccess(_, { threadId, title }) {
      queryClient.setQueriesData(
        {
          queryKey: ["threads", "search"],
          exact: false,
        },
        // A matched query may have no data yet (e.g. still loading), in which
        // case the updater receives `undefined`; leave such queries untouched.
        (oldData: Array<AgentThread> | undefined) => {
          return oldData?.map((t) => {
            if (t.thread_id === threadId) {
              return {
                ...t,
                values: {
                  ...t.values,
                  title,
                },
              };
            }
            return t;
          });
        },
      );
    },
  });
}