Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e5c7328cf5 |
@@ -0,0 +1,49 @@
|
||||
import type { Message } from "@langchain/langgraph-sdk";
|
||||
|
||||
/**
|
||||
* Deduplicate incoming messages against an existing history.
|
||||
* A message is considered a duplicate if its `id` or `tool_call_id`
|
||||
* (for tool messages) already appears in the existing list.
|
||||
*/
|
||||
export function deduplicateHistoryMessages(
|
||||
existing: Message[],
|
||||
incoming: Message[],
|
||||
): Message[] {
|
||||
const existingIds = new Set(
|
||||
existing
|
||||
.map((m) => ("tool_call_id" in m ? m.tool_call_id : m.id))
|
||||
.filter(Boolean),
|
||||
);
|
||||
|
||||
return incoming.filter((m) => {
|
||||
if (m.id && existingIds.has(m.id)) return false;
|
||||
if (
|
||||
"tool_call_id" in m &&
|
||||
m.tool_call_id &&
|
||||
existingIds.has(m.tool_call_id)
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the new history-loading index when the runs list grows.
|
||||
*
|
||||
* - `currentIndex < 0` means all previously-known runs have been loaded;
|
||||
* reset to the last run so the user can scroll up to load new runs.
|
||||
* - `currentIndex >= 0` means some runs haven't been loaded yet;
|
||||
* shift the index by the number of newly-added runs.
|
||||
* - If no new runs were added, return `currentIndex` unchanged.
|
||||
*/
|
||||
export function adjustHistoryIndex(
|
||||
currentIndex: number,
|
||||
prevRunsLength: number,
|
||||
newRunsLength: number,
|
||||
): number {
|
||||
const added = newRunsLength - prevRunsLength;
|
||||
if (added <= 0) return currentIndex;
|
||||
if (currentIndex < 0) return newRunsLength - 1;
|
||||
return currentIndex + added;
|
||||
}
|
||||
@@ -18,6 +18,10 @@ import type { UploadedFileInfo } from "../uploads";
|
||||
import { promptInputFilePartToFile, uploadFiles } from "../uploads";
|
||||
|
||||
import { fetchThreadTokenUsage } from "./api";
|
||||
import {
|
||||
adjustHistoryIndex,
|
||||
deduplicateHistoryMessages,
|
||||
} from "./history-utils";
|
||||
import { threadTokenUsageQueryKey } from "./token-usage";
|
||||
import type {
|
||||
AgentThread,
|
||||
@@ -316,6 +320,9 @@ export function useThreadStream({
|
||||
);
|
||||
void queryClient.invalidateQueries({ queryKey: ["threads", "search"] });
|
||||
if (threadIdRef.current && !isMock) {
|
||||
void queryClient.invalidateQueries({
|
||||
queryKey: ["thread", threadIdRef.current],
|
||||
});
|
||||
void queryClient.invalidateQueries({
|
||||
queryKey: threadTokenUsageQueryKey(threadIdRef.current),
|
||||
});
|
||||
@@ -629,6 +636,7 @@ export function useThreadHistory(threadId: string) {
|
||||
const loadingRef = useRef(false);
|
||||
const [loading, setLoading] = useState(false);
|
||||
const [messages, setMessages] = useState<Message[]>([]);
|
||||
const initialLoadDoneRef = useRef(false);
|
||||
|
||||
loadingRef.current = loading;
|
||||
const loadMessages = useCallback(async () => {
|
||||
@@ -656,7 +664,10 @@ export function useThreadHistory(threadId: string) {
|
||||
const _messages = result.data
|
||||
.filter((m) => !m.metadata.caller?.startsWith("middleware:"))
|
||||
.map((m) => m.content);
|
||||
setMessages((prev) => [..._messages, ...prev]);
|
||||
setMessages((prev) => {
|
||||
const deduped = deduplicateHistoryMessages(prev, _messages);
|
||||
return [...deduped, ...prev];
|
||||
});
|
||||
indexRef.current -= 1;
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
@@ -664,15 +675,39 @@ export function useThreadHistory(threadId: string) {
|
||||
setLoading(false);
|
||||
}
|
||||
}, []);
|
||||
|
||||
// Reset state when threadId changes
|
||||
useEffect(() => {
|
||||
threadIdRef.current = threadId;
|
||||
runsRef.current = [];
|
||||
indexRef.current = -1;
|
||||
initialLoadDoneRef.current = false;
|
||||
setMessages([]);
|
||||
}, [threadId]);
|
||||
|
||||
// Load/update history when runs data changes
|
||||
useEffect(() => {
|
||||
if (runs.data && runs.data.length > 0) {
|
||||
runsRef.current = runs.data ?? [];
|
||||
const prevLength = runsRef.current.length;
|
||||
runsRef.current = runs.data;
|
||||
|
||||
if (!initialLoadDoneRef.current) {
|
||||
// Initial load: start from the most recent run
|
||||
initialLoadDoneRef.current = true;
|
||||
indexRef.current = runs.data.length - 1;
|
||||
}
|
||||
loadMessages().catch(() => {
|
||||
toast.error("Failed to load thread history.");
|
||||
});
|
||||
} else if (runs.data.length > prevLength) {
|
||||
// New runs added (e.g., after query invalidation): adjust indexRef
|
||||
// so the user can load older history by scrolling up
|
||||
indexRef.current = adjustHistoryIndex(
|
||||
indexRef.current,
|
||||
prevLength,
|
||||
runs.data.length,
|
||||
);
|
||||
}
|
||||
}
|
||||
}, [threadId, runs.data, loadMessages]);
|
||||
|
||||
const appendMessages = useCallback((_messages: Message[]) => {
|
||||
|
||||
@@ -0,0 +1,136 @@
|
||||
import type { Message } from "@langchain/langgraph-sdk";
|
||||
import { expect, test } from "vitest";
|
||||
|
||||
import {
|
||||
adjustHistoryIndex,
|
||||
deduplicateHistoryMessages,
|
||||
} from "@/core/threads/history-utils";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// deduplicateHistoryMessages
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
test("returns all incoming messages when existing history is empty", () => {
|
||||
const existing: Message[] = [];
|
||||
const incoming: Message[] = [
|
||||
{ type: "human", id: "m1", content: "hello" },
|
||||
{ type: "ai", id: "m2", content: "hi" },
|
||||
];
|
||||
|
||||
const result = deduplicateHistoryMessages(existing, incoming);
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result.map((m) => m.id)).toEqual(["m1", "m2"]);
|
||||
});
|
||||
|
||||
test("filters out messages whose id already exists in history", () => {
|
||||
const existing: Message[] = [
|
||||
{ type: "human", id: "m1", content: "hello" },
|
||||
{ type: "ai", id: "m2", content: "hi" },
|
||||
];
|
||||
const incoming: Message[] = [
|
||||
{ type: "human", id: "m1", content: "hello" }, // duplicate
|
||||
{ type: "ai", id: "m3", content: "new" },
|
||||
];
|
||||
|
||||
const result = deduplicateHistoryMessages(existing, incoming);
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]!.id).toBe("m3");
|
||||
});
|
||||
|
||||
test("filters out tool messages by tool_call_id", () => {
|
||||
const existing: Message[] = [
|
||||
{
|
||||
type: "tool",
|
||||
id: "t1",
|
||||
tool_call_id: "tc-1",
|
||||
content: "tool result",
|
||||
name: "search",
|
||||
} as unknown as Message,
|
||||
];
|
||||
const incoming: Message[] = [
|
||||
{
|
||||
type: "tool",
|
||||
id: "t1-dup",
|
||||
tool_call_id: "tc-1",
|
||||
content: "tool result",
|
||||
name: "search",
|
||||
} as unknown as Message,
|
||||
{
|
||||
type: "tool",
|
||||
id: "t2",
|
||||
tool_call_id: "tc-2",
|
||||
content: "other result",
|
||||
name: "search",
|
||||
} as unknown as Message,
|
||||
];
|
||||
|
||||
const result = deduplicateHistoryMessages(existing, incoming);
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]!.id).toBe("t2");
|
||||
});
|
||||
|
||||
test("keeps messages with no id or tool_call_id", () => {
|
||||
const existing: Message[] = [
|
||||
{ type: "human", id: "m1", content: "existing" },
|
||||
];
|
||||
const incoming: Message[] = [
|
||||
// Message without id — should be kept (not considered a duplicate)
|
||||
{ type: "ai", content: "no id" } as Message,
|
||||
];
|
||||
|
||||
const result = deduplicateHistoryMessages(existing, incoming);
|
||||
expect(result).toHaveLength(1);
|
||||
});
|
||||
|
||||
test("deduplicates against tool_call_id from existing messages", () => {
|
||||
// Existing message has tool_call_id stored in the id set
|
||||
const existing: Message[] = [
|
||||
{
|
||||
type: "tool",
|
||||
id: "t0",
|
||||
tool_call_id: "tc-x",
|
||||
content: "result",
|
||||
name: "tool",
|
||||
} as unknown as Message,
|
||||
];
|
||||
// Incoming AI message references the same id — should be filtered
|
||||
const incoming: Message[] = [{ type: "ai", id: "tc-x", content: "response" }];
|
||||
|
||||
const result = deduplicateHistoryMessages(existing, incoming);
|
||||
expect(result).toHaveLength(0);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// adjustHistoryIndex
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
test("returns unchanged index when no new runs were added", () => {
|
||||
expect(adjustHistoryIndex(2, 5, 5)).toBe(2);
|
||||
expect(adjustHistoryIndex(-1, 3, 3)).toBe(-1);
|
||||
expect(adjustHistoryIndex(0, 1, 0)).toBe(0); // shouldn't happen, but safe
|
||||
});
|
||||
|
||||
test("resets to last run when all previous runs were loaded", () => {
|
||||
// 3 runs existed, all loaded (index = -1), now 5 runs
|
||||
const result = adjustHistoryIndex(-1, 3, 5);
|
||||
expect(result).toBe(4); // last index of new runs list
|
||||
});
|
||||
|
||||
test("shifts index by number of added runs when some are unloaded", () => {
|
||||
// 3 runs, currently at index 1 (run at index 2 loaded), now 6 runs
|
||||
const result = adjustHistoryIndex(1, 3, 6);
|
||||
// 3 new runs added, shift: 1 + (6 - 3) = 4
|
||||
expect(result).toBe(4);
|
||||
});
|
||||
|
||||
test("handles single new run when all previous were loaded", () => {
|
||||
// 4 runs, all loaded (index = -1), now 5 runs
|
||||
const result = adjustHistoryIndex(-1, 4, 5);
|
||||
expect(result).toBe(4);
|
||||
});
|
||||
|
||||
test("handles transition from empty runs to populated", () => {
|
||||
// 0 runs → 3 runs, all loaded (index = -1)
|
||||
const result = adjustHistoryIndex(-1, 0, 3);
|
||||
expect(result).toBe(2);
|
||||
});
|
||||
Reference in New Issue
Block a user