mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-18 21:55:59 +00:00
fix(frontend): isolate new chat thread messages (#3508)
* fix(frontend): isolate new chat thread messages * fix(frontend): keep live messages visible in new chat * fix(frontend): reset thread-local message refs
This commit is contained in:
@@ -9,7 +9,7 @@ import {
|
||||
useQuery,
|
||||
useQueryClient,
|
||||
} from "@tanstack/react-query";
|
||||
import { useCallback, useEffect, useRef, useState } from "react";
|
||||
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
|
||||
import { toast } from "sonner";
|
||||
|
||||
import type { PromptInputMessage } from "@/components/ai-elements/prompt-input";
|
||||
@@ -41,6 +41,7 @@ export type ToolEndEvent = {
|
||||
|
||||
export type ThreadStreamOptions = {
|
||||
threadId?: string | null | undefined;
|
||||
displayThreadId?: string | null | undefined;
|
||||
context: LocalSettings["context"];
|
||||
isMock?: boolean;
|
||||
onSend?: (threadId: string) => void;
|
||||
@@ -53,6 +54,13 @@ type SendMessageOptions = {
|
||||
additionalKwargs?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
const EMPTY_THREAD_VALUES: AgentThreadState = {
|
||||
title: "",
|
||||
messages: [],
|
||||
artifacts: [],
|
||||
todos: [],
|
||||
};
|
||||
|
||||
function isNonEmptyString(value: string | undefined): value is string {
|
||||
return typeof value === "string" && value.length > 0;
|
||||
}
|
||||
@@ -388,6 +396,7 @@ function getStreamErrorMessage(error: unknown): string {
|
||||
|
||||
export function useThreadStream({
|
||||
threadId,
|
||||
displayThreadId,
|
||||
context,
|
||||
isMock,
|
||||
onSend,
|
||||
@@ -396,6 +405,18 @@ export function useThreadStream({
|
||||
onToolEnd,
|
||||
}: ThreadStreamOptions) {
|
||||
const { t } = useI18n();
|
||||
const currentViewThreadId = displayThreadId ?? threadId ?? null;
|
||||
const currentViewThreadIdRef = useRef(currentViewThreadId);
|
||||
currentViewThreadIdRef.current = currentViewThreadId;
|
||||
// Optimistic messages shown before the server stream responds.
|
||||
const [optimisticMessages, setOptimisticMessages] = useState<Message[]>([]);
|
||||
const [optimisticThreadId, setOptimisticThreadId] = useState<string | null>(
|
||||
null,
|
||||
);
|
||||
const [liveMessagesThreadId, setLiveMessagesThreadId] = useState<
|
||||
string | null
|
||||
>(null);
|
||||
const [isUploading, setIsUploading] = useState(false);
|
||||
// Track the thread ID that is currently streaming to handle thread changes during streaming
|
||||
const [onStreamThreadId, setOnStreamThreadId] = useState(() => threadId);
|
||||
// Ref to track current thread ID across async callbacks without causing re-renders,
|
||||
@@ -437,6 +458,28 @@ export function useThreadStream({
|
||||
|
||||
const handleStreamStart = useCallback((_threadId: string, _runId: string) => {
|
||||
threadIdRef.current = _threadId;
|
||||
setOptimisticThreadId((currentOptimisticThreadId) => {
|
||||
const currentView = currentViewThreadIdRef.current;
|
||||
if (
|
||||
currentOptimisticThreadId &&
|
||||
(currentOptimisticThreadId === currentView ||
|
||||
currentOptimisticThreadId === _threadId)
|
||||
) {
|
||||
return _threadId;
|
||||
}
|
||||
return currentOptimisticThreadId;
|
||||
});
|
||||
setLiveMessagesThreadId((currentLiveMessagesThreadId) => {
|
||||
const currentView = currentViewThreadIdRef.current;
|
||||
if (
|
||||
currentLiveMessagesThreadId &&
|
||||
(currentLiveMessagesThreadId === currentView ||
|
||||
currentLiveMessagesThreadId === _threadId)
|
||||
) {
|
||||
return _threadId;
|
||||
}
|
||||
return currentLiveMessagesThreadId;
|
||||
});
|
||||
if (!startedRef.current) {
|
||||
listeners.current.onStart?.(_threadId, _runId);
|
||||
startedRef.current = true;
|
||||
@@ -608,6 +651,8 @@ export function useThreadStream({
|
||||
},
|
||||
onError(error) {
|
||||
setOptimisticMessages([]);
|
||||
setOptimisticThreadId(null);
|
||||
setLiveMessagesThreadId(null);
|
||||
toast.error(getStreamErrorMessage(error));
|
||||
pendingUsageBaselineMessageIdsRef.current = new Set(
|
||||
messagesRef.current
|
||||
@@ -639,10 +684,17 @@ export function useThreadStream({
|
||||
},
|
||||
});
|
||||
|
||||
// Optimistic messages shown before the server stream responds
|
||||
const [optimisticMessages, setOptimisticMessages] = useState<Message[]>([]);
|
||||
const [isUploading, setIsUploading] = useState(false);
|
||||
const humanMessageCount = thread.messages.filter(
|
||||
const hasVisibleStreamState =
|
||||
Boolean(threadId) || liveMessagesThreadId === currentViewThreadId;
|
||||
const persistedMessages = useMemo(
|
||||
() => (hasVisibleStreamState ? thread.messages : []),
|
||||
[hasVisibleStreamState, thread.messages],
|
||||
);
|
||||
const visibleHistory = useMemo(
|
||||
() => (threadId ? history : []),
|
||||
[history, threadId],
|
||||
);
|
||||
const humanMessageCount = persistedMessages.filter(
|
||||
(m) => m.type === "human",
|
||||
).length;
|
||||
const latestMessageCountsRef = useRef({ humanMessageCount });
|
||||
@@ -663,15 +715,23 @@ export function useThreadStream({
|
||||
useEffect(() => {
|
||||
startedRef.current = false;
|
||||
sendInFlightRef.current = false;
|
||||
pendingUsageBaselineMessageIdsRef.current = new Set(
|
||||
messagesRef.current
|
||||
.map(messageIdentity)
|
||||
.filter((id): id is string => Boolean(id)),
|
||||
);
|
||||
messagesRef.current = [];
|
||||
summarizedRef.current = new Set<string>();
|
||||
pendingUsageBaselineMessageIdsRef.current = new Set();
|
||||
prevHumanMsgCountRef.current =
|
||||
latestMessageCountsRef.current.humanMessageCount;
|
||||
}, [threadId]);
|
||||
|
||||
useEffect(() => {
|
||||
if (optimisticThreadId && optimisticThreadId !== currentViewThreadId) {
|
||||
setOptimisticMessages([]);
|
||||
setOptimisticThreadId(null);
|
||||
}
|
||||
if (liveMessagesThreadId && liveMessagesThreadId !== currentViewThreadId) {
|
||||
setLiveMessagesThreadId(null);
|
||||
}
|
||||
}, [currentViewThreadId, liveMessagesThreadId, optimisticThreadId]);
|
||||
|
||||
// When streaming starts without a baseline (e.g. reconnection, run started
|
||||
// from another client, or page reload mid-stream), snapshot the current
|
||||
// messages so only *new* messages are treated as "pending" for token usage.
|
||||
@@ -681,12 +741,12 @@ export function useThreadStream({
|
||||
pendingUsageBaselineMessageIdsRef.current.size === 0
|
||||
) {
|
||||
pendingUsageBaselineMessageIdsRef.current = new Set(
|
||||
thread.messages
|
||||
persistedMessages
|
||||
.map(messageIdentity)
|
||||
.filter((id): id is string => Boolean(id)),
|
||||
);
|
||||
}
|
||||
}, [thread.isLoading, thread.messages]);
|
||||
}, [persistedMessages, thread.isLoading]);
|
||||
|
||||
// Clear optimistic when server messages arrive.
|
||||
// For messages with a human optimistic message, wait until the server's
|
||||
@@ -702,6 +762,7 @@ export function useThreadStream({
|
||||
|
||||
if (!hasHumanOptimistic || newHumanMsgArrived) {
|
||||
setOptimisticMessages([]);
|
||||
setOptimisticThreadId(null);
|
||||
}
|
||||
}, [hasHumanOptimistic, humanMessageCount, optimisticMessageCount]);
|
||||
|
||||
@@ -723,7 +784,7 @@ export function useThreadStream({
|
||||
// messages so we can wait for the server's copy of the user input.
|
||||
prevHumanMsgCountRef.current = humanMessageCount;
|
||||
pendingUsageBaselineMessageIdsRef.current = new Set(
|
||||
thread.messages
|
||||
persistedMessages
|
||||
.map(messageIdentity)
|
||||
.filter((id): id is string => Boolean(id)),
|
||||
);
|
||||
@@ -762,6 +823,8 @@ export function useThreadStream({
|
||||
additional_kwargs: { element: "task" },
|
||||
});
|
||||
}
|
||||
setOptimisticThreadId(threadId);
|
||||
setLiveMessagesThreadId(threadId);
|
||||
setOptimisticMessages(newOptimistic);
|
||||
|
||||
listeners.current.onSend?.(threadId);
|
||||
@@ -827,6 +890,8 @@ export function useThreadStream({
|
||||
: "Failed to upload files.";
|
||||
toast.error(errorMessage);
|
||||
setOptimisticMessages([]);
|
||||
setOptimisticThreadId(null);
|
||||
setLiveMessagesThreadId(null);
|
||||
throw error;
|
||||
} finally {
|
||||
setIsUploading(false);
|
||||
@@ -895,35 +960,44 @@ export function useThreadStream({
|
||||
});
|
||||
} catch (error) {
|
||||
setOptimisticMessages([]);
|
||||
setOptimisticThreadId(null);
|
||||
setLiveMessagesThreadId(null);
|
||||
setIsUploading(false);
|
||||
throw error;
|
||||
} finally {
|
||||
sendInFlightRef.current = false;
|
||||
}
|
||||
},
|
||||
[thread, t.uploads.uploadingFiles, context, queryClient, humanMessageCount],
|
||||
[
|
||||
thread,
|
||||
t.uploads.uploadingFiles,
|
||||
context,
|
||||
queryClient,
|
||||
humanMessageCount,
|
||||
persistedMessages,
|
||||
],
|
||||
);
|
||||
|
||||
// Cache the latest thread messages in a ref to compare against incoming history messages for deduplication,
|
||||
// and to allow access to the full message list in onUpdateEvent without causing re-renders.
|
||||
if (thread.messages.length >= messagesRef.current.length) {
|
||||
messagesRef.current = thread.messages;
|
||||
if (persistedMessages.length >= messagesRef.current.length) {
|
||||
messagesRef.current = persistedMessages;
|
||||
}
|
||||
|
||||
const visibleOptimisticMessages = getVisibleOptimisticMessages(
|
||||
optimisticMessages,
|
||||
optimisticThreadId === currentViewThreadId ? optimisticMessages : [],
|
||||
prevHumanMsgCountRef.current,
|
||||
humanMessageCount,
|
||||
);
|
||||
|
||||
const mergedMessages = mergeMessages(
|
||||
history,
|
||||
thread.messages,
|
||||
visibleHistory,
|
||||
persistedMessages,
|
||||
visibleOptimisticMessages,
|
||||
);
|
||||
const pendingUsageMessages = thread.isLoading
|
||||
? getMessagesAfterBaseline(
|
||||
thread.messages,
|
||||
persistedMessages,
|
||||
pendingUsageBaselineMessageIdsRef.current,
|
||||
)
|
||||
: [];
|
||||
@@ -932,6 +1006,7 @@ export function useThreadStream({
|
||||
// History messages may overlap with thread.messages; thread.messages take precedence
|
||||
const mergedThread = {
|
||||
...thread,
|
||||
values: hasVisibleStreamState ? thread.values : EMPTY_THREAD_VALUES,
|
||||
messages: mergedMessages,
|
||||
} as typeof thread;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user