diff --git a/frontend/src/core/threads/history-utils.ts b/frontend/src/core/threads/history-utils.ts new file mode 100644 index 000000000..d562f98ff --- /dev/null +++ b/frontend/src/core/threads/history-utils.ts @@ -0,0 +1,49 @@ +import type { Message } from "@langchain/langgraph-sdk"; + +/** + * Deduplicate incoming messages against an existing history. + * A message is considered a duplicate if its `id` or `tool_call_id` + * (for tool messages) already appears in the existing list. + */ +export function deduplicateHistoryMessages( + existing: Message[], + incoming: Message[], +): Message[] { + const existingIds = new Set( + existing + .map((m) => ("tool_call_id" in m ? m.tool_call_id : m.id)) + .filter(Boolean), + ); + + return incoming.filter((m) => { + if (m.id && existingIds.has(m.id)) return false; + if ( + "tool_call_id" in m && + m.tool_call_id && + existingIds.has(m.tool_call_id) + ) { + return false; + } + return true; + }); +} + +/** + * Compute the new history-loading index when the runs list grows. + * + * - `currentIndex < 0` means all previously-known runs have been loaded; + * reset to the last run so the user can scroll up to load new runs. + * - `currentIndex >= 0` means some runs haven't been loaded yet; + * shift the index by the number of newly-added runs. + * - If no new runs were added, return `currentIndex` unchanged. + */ +export function adjustHistoryIndex( + currentIndex: number, + prevRunsLength: number, + newRunsLength: number, +): number { + const added = newRunsLength - prevRunsLength; + if (added <= 0) return currentIndex; + if (currentIndex < 0) return newRunsLength - 1; + return currentIndex + added; +} diff --git a/frontend/src/core/threads/hooks.ts b/frontend/src/core/threads/hooks.ts index adf9dbbb6..aa16e51a0 100644 --- a/frontend/src/core/threads/hooks.ts +++ b/frontend/src/core/threads/hooks.ts @@ -18,6 +18,10 @@ import type { UploadedFileInfo } from "../uploads"; import { promptInputFilePartToFile, uploadFiles } from "../uploads"; import { fetchThreadTokenUsage } from "./api"; +import { + adjustHistoryIndex, + deduplicateHistoryMessages, +} from "./history-utils"; import { threadTokenUsageQueryKey } from "./token-usage"; import type { AgentThread, @@ -316,6 +320,9 @@ export function useThreadStream({ ); void queryClient.invalidateQueries({ queryKey: ["threads", "search"] }); if (threadIdRef.current && !isMock) { + void queryClient.invalidateQueries({ + queryKey: ["thread", threadIdRef.current], + }); void queryClient.invalidateQueries({ queryKey: threadTokenUsageQueryKey(threadIdRef.current), }); @@ -629,6 +636,7 @@ export function useThreadHistory(threadId: string) { const loadingRef = useRef(false); const [loading, setLoading] = useState(false); const [messages, setMessages] = useState([]); + const initialLoadDoneRef = useRef(false); loadingRef.current = loading; const loadMessages = useCallback(async () => { @@ -656,7 +664,10 @@ export function useThreadHistory(threadId: string) { const _messages = result.data .filter((m) => !m.metadata.caller?.startsWith("middleware:")) .map((m) => m.content); - setMessages((prev) => [..._messages, ...prev]); + setMessages((prev) => { + const deduped = deduplicateHistoryMessages(prev, _messages); + return [...deduped, ...prev]; + }); indexRef.current -= 1; } catch (err) { console.error(err); @@ -664,15 +675,39 @@ export function useThreadHistory(threadId: string) { setLoading(false); } }, []); + + // Reset state when threadId changes useEffect(() => { threadIdRef.current = threadId; + runsRef.current = []; + indexRef.current = -1; + initialLoadDoneRef.current = false; + setMessages([]); + }, [threadId]); + + // Load/update history when runs data changes + useEffect(() => { if (runs.data && runs.data.length > 0) { - runsRef.current = runs.data ?? []; - indexRef.current = runs.data.length - 1; + const prevLength = runsRef.current.length; + runsRef.current = runs.data; + + if (!initialLoadDoneRef.current) { + // Initial load: start from the most recent run + initialLoadDoneRef.current = true; + indexRef.current = runs.data.length - 1; + loadMessages().catch(() => { + toast.error("Failed to load thread history."); + }); + } else if (runs.data.length > prevLength) { + // New runs added (e.g., after query invalidation): adjust indexRef + // so the user can load older history by scrolling up + indexRef.current = adjustHistoryIndex( + indexRef.current, + prevLength, + runs.data.length, + ); + } } - loadMessages().catch(() => { - toast.error("Failed to load thread history."); - }); }, [threadId, runs.data, loadMessages]); const appendMessages = useCallback((_messages: Message[]) => { diff --git a/frontend/tests/unit/core/threads/history-utils.test.ts b/frontend/tests/unit/core/threads/history-utils.test.ts new file mode 100644 index 000000000..3cf5fa1d3 --- /dev/null +++ b/frontend/tests/unit/core/threads/history-utils.test.ts @@ -0,0 +1,136 @@ +import type { Message } from "@langchain/langgraph-sdk"; +import { expect, test } from "vitest"; + +import { + adjustHistoryIndex, + deduplicateHistoryMessages, +} from "@/core/threads/history-utils"; + +// --------------------------------------------------------------------------- +// deduplicateHistoryMessages +// --------------------------------------------------------------------------- + +test("returns all incoming messages when existing history is empty", () => { + const existing: Message[] = []; + const incoming: Message[] = [ + { type: "human", id: "m1", content: "hello" }, + { type: "ai", id: "m2", content: "hi" }, + ]; + + const result = deduplicateHistoryMessages(existing, incoming); + expect(result).toHaveLength(2); + expect(result.map((m) => m.id)).toEqual(["m1", "m2"]); +}); + +test("filters out messages whose id already exists in history", () => { + const existing: Message[] = [ + { type: "human", id: "m1", content: "hello" }, + { type: "ai", id: "m2", content: "hi" }, + ]; + const incoming: Message[] = [ + { type: "human", id: "m1", content: "hello" }, // duplicate + { type: "ai", id: "m3", content: "new" }, + ]; + + const result = deduplicateHistoryMessages(existing, incoming); + expect(result).toHaveLength(1); + expect(result[0]!.id).toBe("m3"); +}); + +test("filters out tool messages by tool_call_id", () => { + const existing: Message[] = [ + { + type: "tool", + id: "t1", + tool_call_id: "tc-1", + content: "tool result", + name: "search", + } as unknown as Message, + ]; + const incoming: Message[] = [ + { + type: "tool", + id: "t1-dup", + tool_call_id: "tc-1", + content: "tool result", + name: "search", + } as unknown as Message, + { + type: "tool", + id: "t2", + tool_call_id: "tc-2", + content: "other result", + name: "search", + } as unknown as Message, + ]; + + const result = deduplicateHistoryMessages(existing, incoming); + expect(result).toHaveLength(1); + expect(result[0]!.id).toBe("t2"); +}); + +test("keeps messages with no id or tool_call_id", () => { + const existing: Message[] = [ + { type: "human", id: "m1", content: "existing" }, + ]; + const incoming: Message[] = [ + // Message without id — should be kept (not considered a duplicate) + { type: "ai", content: "no id" } as Message, + ]; + + const result = deduplicateHistoryMessages(existing, incoming); + expect(result).toHaveLength(1); +}); + +test("deduplicates against tool_call_id from existing messages", () => { + // Existing message has tool_call_id stored in the id set + const existing: Message[] = [ + { + type: "tool", + id: "t0", + tool_call_id: "tc-x", + content: "result", + name: "tool", + } as unknown as Message, + ]; + // Incoming AI message references the same id — should be filtered + const incoming: Message[] = [{ type: "ai", id: "tc-x", content: "response" }]; + + const result = deduplicateHistoryMessages(existing, incoming); + expect(result).toHaveLength(0); +}); + +// --------------------------------------------------------------------------- +// adjustHistoryIndex +// --------------------------------------------------------------------------- + +test("returns unchanged index when no new runs were added", () => { + expect(adjustHistoryIndex(2, 5, 5)).toBe(2); + expect(adjustHistoryIndex(-1, 3, 3)).toBe(-1); + expect(adjustHistoryIndex(0, 1, 0)).toBe(0); // shouldn't happen, but safe +}); + +test("resets to last run when all previous runs were loaded", () => { + // 3 runs existed, all loaded (index = -1), now 5 runs + const result = adjustHistoryIndex(-1, 3, 5); + expect(result).toBe(4); // last index of new runs list +}); + +test("shifts index by number of added runs when some are unloaded", () => { + // 3 runs, currently at index 1 (run at index 2 loaded), now 6 runs + const result = adjustHistoryIndex(1, 3, 6); + // 3 new runs added, shift: 1 + (6 - 3) = 4 + expect(result).toBe(4); +}); + +test("handles single new run when all previous were loaded", () => { + // 4 runs, all loaded (index = -1), now 5 runs + const result = adjustHistoryIndex(-1, 4, 5); + expect(result).toBe(4); +}); + +test("handles transition from empty runs to populated", () => { + // 0 runs → 3 runs, all loaded (index = -1) + const result = adjustHistoryIndex(-1, 0, 3); + expect(result).toBe(2); +});