Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e5c7328cf5 |
@@ -0,0 +1,49 @@
|
|||||||
|
import type { Message } from "@langchain/langgraph-sdk";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deduplicate incoming messages against an existing history.
|
||||||
|
* A message is considered a duplicate if its `id` or `tool_call_id`
|
||||||
|
* (for tool messages) already appears in the existing list.
|
||||||
|
*/
|
||||||
|
export function deduplicateHistoryMessages(
|
||||||
|
existing: Message[],
|
||||||
|
incoming: Message[],
|
||||||
|
): Message[] {
|
||||||
|
const existingIds = new Set(
|
||||||
|
existing
|
||||||
|
.map((m) => ("tool_call_id" in m ? m.tool_call_id : m.id))
|
||||||
|
.filter(Boolean),
|
||||||
|
);
|
||||||
|
|
||||||
|
return incoming.filter((m) => {
|
||||||
|
if (m.id && existingIds.has(m.id)) return false;
|
||||||
|
if (
|
||||||
|
"tool_call_id" in m &&
|
||||||
|
m.tool_call_id &&
|
||||||
|
existingIds.has(m.tool_call_id)
|
||||||
|
) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compute the new history-loading index when the runs list grows.
|
||||||
|
*
|
||||||
|
* - `currentIndex < 0` means all previously-known runs have been loaded;
|
||||||
|
* reset to the last run so the user can scroll up to load new runs.
|
||||||
|
* - `currentIndex >= 0` means some runs haven't been loaded yet;
|
||||||
|
* shift the index by the number of newly-added runs.
|
||||||
|
* - If no new runs were added, return `currentIndex` unchanged.
|
||||||
|
*/
|
||||||
|
export function adjustHistoryIndex(
|
||||||
|
currentIndex: number,
|
||||||
|
prevRunsLength: number,
|
||||||
|
newRunsLength: number,
|
||||||
|
): number {
|
||||||
|
const added = newRunsLength - prevRunsLength;
|
||||||
|
if (added <= 0) return currentIndex;
|
||||||
|
if (currentIndex < 0) return newRunsLength - 1;
|
||||||
|
return currentIndex + added;
|
||||||
|
}
|
||||||
@@ -18,6 +18,10 @@ import type { UploadedFileInfo } from "../uploads";
|
|||||||
import { promptInputFilePartToFile, uploadFiles } from "../uploads";
|
import { promptInputFilePartToFile, uploadFiles } from "../uploads";
|
||||||
|
|
||||||
import { fetchThreadTokenUsage } from "./api";
|
import { fetchThreadTokenUsage } from "./api";
|
||||||
|
import {
|
||||||
|
adjustHistoryIndex,
|
||||||
|
deduplicateHistoryMessages,
|
||||||
|
} from "./history-utils";
|
||||||
import { threadTokenUsageQueryKey } from "./token-usage";
|
import { threadTokenUsageQueryKey } from "./token-usage";
|
||||||
import type {
|
import type {
|
||||||
AgentThread,
|
AgentThread,
|
||||||
@@ -316,6 +320,9 @@ export function useThreadStream({
|
|||||||
);
|
);
|
||||||
void queryClient.invalidateQueries({ queryKey: ["threads", "search"] });
|
void queryClient.invalidateQueries({ queryKey: ["threads", "search"] });
|
||||||
if (threadIdRef.current && !isMock) {
|
if (threadIdRef.current && !isMock) {
|
||||||
|
void queryClient.invalidateQueries({
|
||||||
|
queryKey: ["thread", threadIdRef.current],
|
||||||
|
});
|
||||||
void queryClient.invalidateQueries({
|
void queryClient.invalidateQueries({
|
||||||
queryKey: threadTokenUsageQueryKey(threadIdRef.current),
|
queryKey: threadTokenUsageQueryKey(threadIdRef.current),
|
||||||
});
|
});
|
||||||
@@ -629,6 +636,7 @@ export function useThreadHistory(threadId: string) {
|
|||||||
const loadingRef = useRef(false);
|
const loadingRef = useRef(false);
|
||||||
const [loading, setLoading] = useState(false);
|
const [loading, setLoading] = useState(false);
|
||||||
const [messages, setMessages] = useState<Message[]>([]);
|
const [messages, setMessages] = useState<Message[]>([]);
|
||||||
|
const initialLoadDoneRef = useRef(false);
|
||||||
|
|
||||||
loadingRef.current = loading;
|
loadingRef.current = loading;
|
||||||
const loadMessages = useCallback(async () => {
|
const loadMessages = useCallback(async () => {
|
||||||
@@ -656,7 +664,10 @@ export function useThreadHistory(threadId: string) {
|
|||||||
const _messages = result.data
|
const _messages = result.data
|
||||||
.filter((m) => !m.metadata.caller?.startsWith("middleware:"))
|
.filter((m) => !m.metadata.caller?.startsWith("middleware:"))
|
||||||
.map((m) => m.content);
|
.map((m) => m.content);
|
||||||
setMessages((prev) => [..._messages, ...prev]);
|
setMessages((prev) => {
|
||||||
|
const deduped = deduplicateHistoryMessages(prev, _messages);
|
||||||
|
return [...deduped, ...prev];
|
||||||
|
});
|
||||||
indexRef.current -= 1;
|
indexRef.current -= 1;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(err);
|
console.error(err);
|
||||||
@@ -664,15 +675,39 @@ export function useThreadHistory(threadId: string) {
|
|||||||
setLoading(false);
|
setLoading(false);
|
||||||
}
|
}
|
||||||
}, []);
|
}, []);
|
||||||
|
|
||||||
|
// Reset state when threadId changes
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
threadIdRef.current = threadId;
|
threadIdRef.current = threadId;
|
||||||
|
runsRef.current = [];
|
||||||
|
indexRef.current = -1;
|
||||||
|
initialLoadDoneRef.current = false;
|
||||||
|
setMessages([]);
|
||||||
|
}, [threadId]);
|
||||||
|
|
||||||
|
// Load/update history when runs data changes
|
||||||
|
useEffect(() => {
|
||||||
if (runs.data && runs.data.length > 0) {
|
if (runs.data && runs.data.length > 0) {
|
||||||
runsRef.current = runs.data ?? [];
|
const prevLength = runsRef.current.length;
|
||||||
indexRef.current = runs.data.length - 1;
|
runsRef.current = runs.data;
|
||||||
|
|
||||||
|
if (!initialLoadDoneRef.current) {
|
||||||
|
// Initial load: start from the most recent run
|
||||||
|
initialLoadDoneRef.current = true;
|
||||||
|
indexRef.current = runs.data.length - 1;
|
||||||
|
loadMessages().catch(() => {
|
||||||
|
toast.error("Failed to load thread history.");
|
||||||
|
});
|
||||||
|
} else if (runs.data.length > prevLength) {
|
||||||
|
// New runs added (e.g., after query invalidation): adjust indexRef
|
||||||
|
// so the user can load older history by scrolling up
|
||||||
|
indexRef.current = adjustHistoryIndex(
|
||||||
|
indexRef.current,
|
||||||
|
prevLength,
|
||||||
|
runs.data.length,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
loadMessages().catch(() => {
|
|
||||||
toast.error("Failed to load thread history.");
|
|
||||||
});
|
|
||||||
}, [threadId, runs.data, loadMessages]);
|
}, [threadId, runs.data, loadMessages]);
|
||||||
|
|
||||||
const appendMessages = useCallback((_messages: Message[]) => {
|
const appendMessages = useCallback((_messages: Message[]) => {
|
||||||
|
|||||||
@@ -0,0 +1,136 @@
|
|||||||
|
import type { Message } from "@langchain/langgraph-sdk";
|
||||||
|
import { expect, test } from "vitest";
|
||||||
|
|
||||||
|
import {
|
||||||
|
adjustHistoryIndex,
|
||||||
|
deduplicateHistoryMessages,
|
||||||
|
} from "@/core/threads/history-utils";
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// deduplicateHistoryMessages
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
test("returns all incoming messages when existing history is empty", () => {
|
||||||
|
const existing: Message[] = [];
|
||||||
|
const incoming: Message[] = [
|
||||||
|
{ type: "human", id: "m1", content: "hello" },
|
||||||
|
{ type: "ai", id: "m2", content: "hi" },
|
||||||
|
];
|
||||||
|
|
||||||
|
const result = deduplicateHistoryMessages(existing, incoming);
|
||||||
|
expect(result).toHaveLength(2);
|
||||||
|
expect(result.map((m) => m.id)).toEqual(["m1", "m2"]);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("filters out messages whose id already exists in history", () => {
|
||||||
|
const existing: Message[] = [
|
||||||
|
{ type: "human", id: "m1", content: "hello" },
|
||||||
|
{ type: "ai", id: "m2", content: "hi" },
|
||||||
|
];
|
||||||
|
const incoming: Message[] = [
|
||||||
|
{ type: "human", id: "m1", content: "hello" }, // duplicate
|
||||||
|
{ type: "ai", id: "m3", content: "new" },
|
||||||
|
];
|
||||||
|
|
||||||
|
const result = deduplicateHistoryMessages(existing, incoming);
|
||||||
|
expect(result).toHaveLength(1);
|
||||||
|
expect(result[0]!.id).toBe("m3");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("filters out tool messages by tool_call_id", () => {
|
||||||
|
const existing: Message[] = [
|
||||||
|
{
|
||||||
|
type: "tool",
|
||||||
|
id: "t1",
|
||||||
|
tool_call_id: "tc-1",
|
||||||
|
content: "tool result",
|
||||||
|
name: "search",
|
||||||
|
} as unknown as Message,
|
||||||
|
];
|
||||||
|
const incoming: Message[] = [
|
||||||
|
{
|
||||||
|
type: "tool",
|
||||||
|
id: "t1-dup",
|
||||||
|
tool_call_id: "tc-1",
|
||||||
|
content: "tool result",
|
||||||
|
name: "search",
|
||||||
|
} as unknown as Message,
|
||||||
|
{
|
||||||
|
type: "tool",
|
||||||
|
id: "t2",
|
||||||
|
tool_call_id: "tc-2",
|
||||||
|
content: "other result",
|
||||||
|
name: "search",
|
||||||
|
} as unknown as Message,
|
||||||
|
];
|
||||||
|
|
||||||
|
const result = deduplicateHistoryMessages(existing, incoming);
|
||||||
|
expect(result).toHaveLength(1);
|
||||||
|
expect(result[0]!.id).toBe("t2");
|
||||||
|
});
|
||||||
|
|
||||||
|
test("keeps messages with no id or tool_call_id", () => {
|
||||||
|
const existing: Message[] = [
|
||||||
|
{ type: "human", id: "m1", content: "existing" },
|
||||||
|
];
|
||||||
|
const incoming: Message[] = [
|
||||||
|
// Message without id — should be kept (not considered a duplicate)
|
||||||
|
{ type: "ai", content: "no id" } as Message,
|
||||||
|
];
|
||||||
|
|
||||||
|
const result = deduplicateHistoryMessages(existing, incoming);
|
||||||
|
expect(result).toHaveLength(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("deduplicates against tool_call_id from existing messages", () => {
|
||||||
|
// Existing message has tool_call_id stored in the id set
|
||||||
|
const existing: Message[] = [
|
||||||
|
{
|
||||||
|
type: "tool",
|
||||||
|
id: "t0",
|
||||||
|
tool_call_id: "tc-x",
|
||||||
|
content: "result",
|
||||||
|
name: "tool",
|
||||||
|
} as unknown as Message,
|
||||||
|
];
|
||||||
|
// Incoming AI message references the same id — should be filtered
|
||||||
|
const incoming: Message[] = [{ type: "ai", id: "tc-x", content: "response" }];
|
||||||
|
|
||||||
|
const result = deduplicateHistoryMessages(existing, incoming);
|
||||||
|
expect(result).toHaveLength(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// adjustHistoryIndex
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
test("returns unchanged index when no new runs were added", () => {
|
||||||
|
expect(adjustHistoryIndex(2, 5, 5)).toBe(2);
|
||||||
|
expect(adjustHistoryIndex(-1, 3, 3)).toBe(-1);
|
||||||
|
expect(adjustHistoryIndex(0, 1, 0)).toBe(0); // shouldn't happen, but safe
|
||||||
|
});
|
||||||
|
|
||||||
|
test("resets to last run when all previous runs were loaded", () => {
|
||||||
|
// 3 runs existed, all loaded (index = -1), now 5 runs
|
||||||
|
const result = adjustHistoryIndex(-1, 3, 5);
|
||||||
|
expect(result).toBe(4); // last index of new runs list
|
||||||
|
});
|
||||||
|
|
||||||
|
test("shifts index by number of added runs when some are unloaded", () => {
|
||||||
|
// 3 runs, currently at index 1 (run at index 2 loaded), now 6 runs
|
||||||
|
const result = adjustHistoryIndex(1, 3, 6);
|
||||||
|
// 3 new runs added, shift: 1 + (6 - 3) = 4
|
||||||
|
expect(result).toBe(4);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("handles single new run when all previous were loaded", () => {
|
||||||
|
// 4 runs, all loaded (index = -1), now 5 runs
|
||||||
|
const result = adjustHistoryIndex(-1, 4, 5);
|
||||||
|
expect(result).toBe(4);
|
||||||
|
});
|
||||||
|
|
||||||
|
test("handles transition from empty runs to populated", () => {
|
||||||
|
// 0 runs → 3 runs, all loaded (index = -1)
|
||||||
|
const result = adjustHistoryIndex(-1, 0, 3);
|
||||||
|
expect(result).toBe(2);
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user