Compare commits

...

1 Commits

Author SHA1 Message Date
Willem Jiang e5c7328cf5 fix(chat): refresh run list after each turn so scroll-to-load history works
After context compression in an active conversation, scrolling to the top
  would not load earlier messages because useThreadRuns was never invalidated
  once new runs completed. The stale runs list left hasMore permanently false.

  - Invalidate the ["thread", threadId] query in onFinish so the runs list
    stays current as new runs are created
  - Split the useThreadHistory effect into separate reset (threadId change)
    and incremental-update (runs.data change) paths so indexRef is adjusted
    without resetting already-loaded history
  - Add message deduplication when prepending loaded run messages to guard
    against overlap with the live stream
  - Extract deduplicateHistoryMessages and adjustHistoryIndex into a pure
    utility module with 10 unit tests

  Closes #2965
2026-05-15 10:11:56 +08:00
3 changed files with 226 additions and 6 deletions
@@ -0,0 +1,49 @@
import type { Message } from "@langchain/langgraph-sdk";
/**
* Deduplicate incoming messages against an existing history.
* A message is considered a duplicate if its `id` or `tool_call_id`
* (for tool messages) already appears in the existing list.
*/
export function deduplicateHistoryMessages(
existing: Message[],
incoming: Message[],
): Message[] {
const existingIds = new Set(
existing
.map((m) => ("tool_call_id" in m ? m.tool_call_id : m.id))
.filter(Boolean),
);
return incoming.filter((m) => {
if (m.id && existingIds.has(m.id)) return false;
if (
"tool_call_id" in m &&
m.tool_call_id &&
existingIds.has(m.tool_call_id)
) {
return false;
}
return true;
});
}
/**
* Compute the new history-loading index when the runs list grows.
*
* - `currentIndex < 0` means all previously-known runs have been loaded;
* reset to the last run so the user can scroll up to load new runs.
* - `currentIndex >= 0` means some runs haven't been loaded yet;
* shift the index by the number of newly-added runs.
* - If no new runs were added, return `currentIndex` unchanged.
*/
export function adjustHistoryIndex(
currentIndex: number,
prevRunsLength: number,
newRunsLength: number,
): number {
const added = newRunsLength - prevRunsLength;
if (added <= 0) return currentIndex;
if (currentIndex < 0) return newRunsLength - 1;
return currentIndex + added;
}
+41 -6
View File
@@ -18,6 +18,10 @@ import type { UploadedFileInfo } from "../uploads";
import { promptInputFilePartToFile, uploadFiles } from "../uploads";
import { fetchThreadTokenUsage } from "./api";
import {
adjustHistoryIndex,
deduplicateHistoryMessages,
} from "./history-utils";
import { threadTokenUsageQueryKey } from "./token-usage";
import type {
AgentThread,
@@ -316,6 +320,9 @@ export function useThreadStream({
);
void queryClient.invalidateQueries({ queryKey: ["threads", "search"] });
if (threadIdRef.current && !isMock) {
void queryClient.invalidateQueries({
queryKey: ["thread", threadIdRef.current],
});
void queryClient.invalidateQueries({
queryKey: threadTokenUsageQueryKey(threadIdRef.current),
});
@@ -629,6 +636,7 @@ export function useThreadHistory(threadId: string) {
const loadingRef = useRef(false);
const [loading, setLoading] = useState(false);
const [messages, setMessages] = useState<Message[]>([]);
const initialLoadDoneRef = useRef(false);
loadingRef.current = loading;
const loadMessages = useCallback(async () => {
@@ -656,7 +664,10 @@ export function useThreadHistory(threadId: string) {
const _messages = result.data
.filter((m) => !m.metadata.caller?.startsWith("middleware:"))
.map((m) => m.content);
setMessages((prev) => [..._messages, ...prev]);
setMessages((prev) => {
const deduped = deduplicateHistoryMessages(prev, _messages);
return [...deduped, ...prev];
});
indexRef.current -= 1;
} catch (err) {
console.error(err);
@@ -664,15 +675,39 @@ export function useThreadHistory(threadId: string) {
setLoading(false);
}
}, []);
// Reset state when threadId changes
useEffect(() => {
threadIdRef.current = threadId;
runsRef.current = [];
indexRef.current = -1;
initialLoadDoneRef.current = false;
setMessages([]);
}, [threadId]);
// Load/update history when runs data changes
useEffect(() => {
if (runs.data && runs.data.length > 0) {
runsRef.current = runs.data ?? [];
indexRef.current = runs.data.length - 1;
const prevLength = runsRef.current.length;
runsRef.current = runs.data;
if (!initialLoadDoneRef.current) {
// Initial load: start from the most recent run
initialLoadDoneRef.current = true;
indexRef.current = runs.data.length - 1;
loadMessages().catch(() => {
toast.error("Failed to load thread history.");
});
} else if (runs.data.length > prevLength) {
// New runs added (e.g., after query invalidation): adjust indexRef
// so the user can load older history by scrolling up
indexRef.current = adjustHistoryIndex(
indexRef.current,
prevLength,
runs.data.length,
);
}
}
loadMessages().catch(() => {
toast.error("Failed to load thread history.");
});
}, [threadId, runs.data, loadMessages]);
const appendMessages = useCallback((_messages: Message[]) => {
@@ -0,0 +1,136 @@
import type { Message } from "@langchain/langgraph-sdk";
import { expect, test } from "vitest";
import {
adjustHistoryIndex,
deduplicateHistoryMessages,
} from "@/core/threads/history-utils";
// ---------------------------------------------------------------------------
// deduplicateHistoryMessages
// ---------------------------------------------------------------------------
test("returns all incoming messages when existing history is empty", () => {
const existing: Message[] = [];
const incoming: Message[] = [
{ type: "human", id: "m1", content: "hello" },
{ type: "ai", id: "m2", content: "hi" },
];
const result = deduplicateHistoryMessages(existing, incoming);
expect(result).toHaveLength(2);
expect(result.map((m) => m.id)).toEqual(["m1", "m2"]);
});
test("filters out messages whose id already exists in history", () => {
const existing: Message[] = [
{ type: "human", id: "m1", content: "hello" },
{ type: "ai", id: "m2", content: "hi" },
];
const incoming: Message[] = [
{ type: "human", id: "m1", content: "hello" }, // duplicate
{ type: "ai", id: "m3", content: "new" },
];
const result = deduplicateHistoryMessages(existing, incoming);
expect(result).toHaveLength(1);
expect(result[0]!.id).toBe("m3");
});
test("filters out tool messages by tool_call_id", () => {
const existing: Message[] = [
{
type: "tool",
id: "t1",
tool_call_id: "tc-1",
content: "tool result",
name: "search",
} as unknown as Message,
];
const incoming: Message[] = [
{
type: "tool",
id: "t1-dup",
tool_call_id: "tc-1",
content: "tool result",
name: "search",
} as unknown as Message,
{
type: "tool",
id: "t2",
tool_call_id: "tc-2",
content: "other result",
name: "search",
} as unknown as Message,
];
const result = deduplicateHistoryMessages(existing, incoming);
expect(result).toHaveLength(1);
expect(result[0]!.id).toBe("t2");
});
test("keeps messages with no id or tool_call_id", () => {
const existing: Message[] = [
{ type: "human", id: "m1", content: "existing" },
];
const incoming: Message[] = [
// Message without id — should be kept (not considered a duplicate)
{ type: "ai", content: "no id" } as Message,
];
const result = deduplicateHistoryMessages(existing, incoming);
expect(result).toHaveLength(1);
});
test("deduplicates against tool_call_id from existing messages", () => {
// Existing message has tool_call_id stored in the id set
const existing: Message[] = [
{
type: "tool",
id: "t0",
tool_call_id: "tc-x",
content: "result",
name: "tool",
} as unknown as Message,
];
// Incoming AI message references the same id — should be filtered
const incoming: Message[] = [{ type: "ai", id: "tc-x", content: "response" }];
const result = deduplicateHistoryMessages(existing, incoming);
expect(result).toHaveLength(0);
});
// ---------------------------------------------------------------------------
// adjustHistoryIndex
// ---------------------------------------------------------------------------
test("returns unchanged index when no new runs were added", () => {
expect(adjustHistoryIndex(2, 5, 5)).toBe(2);
expect(adjustHistoryIndex(-1, 3, 3)).toBe(-1);
expect(adjustHistoryIndex(0, 1, 0)).toBe(0); // shouldn't happen, but safe
});
test("resets to last run when all previous runs were loaded", () => {
// 3 runs existed, all loaded (index = -1), now 5 runs
const result = adjustHistoryIndex(-1, 3, 5);
expect(result).toBe(4); // last index of new runs list
});
test("shifts index by number of added runs when some are unloaded", () => {
// 3 runs, currently at index 1 (run at index 2 loaded), now 6 runs
const result = adjustHistoryIndex(1, 3, 6);
// 3 new runs added, shift: 1 + (6 - 3) = 4
expect(result).toBe(4);
});
test("handles single new run when all previous were loaded", () => {
// 4 runs, all loaded (index = -1), now 5 runs
const result = adjustHistoryIndex(-1, 4, 5);
expect(result).toBe(4);
});
test("handles transition from empty runs to populated", () => {
// 0 runs → 3 runs, all loaded (index = -1)
const result = adjustHistoryIndex(-1, 0, 3);
expect(result).toBe(2);
});