Merge branch 'main' into codex/im-channel-connections

This commit is contained in:
DanielWalnut
2026-06-11 22:32:36 +08:00
committed by GitHub
17 changed files with 816 additions and 77 deletions
+6 -7
View File
@@ -1,7 +1,7 @@
import Link from "next/link";
import { redirect } from "next/navigation";
import { type ReactNode } from "react";
import { GatewayOfflineFallback } from "@/components/workspace/gateway-offline-fallback";
import { AuthProvider } from "@/core/auth/AuthProvider";
import { getServerSideUser } from "@/core/auth/server";
import { assertNever } from "@/core/auth/types";
@@ -25,18 +25,17 @@ export default async function AuthLayout({
case "unauthenticated":
return <AuthProvider initialUser={null}>{children}</AuthProvider>;
case "gateway_unavailable":
// Auth pages have no banner of their own, so render one here. The
// fallback's AuthProvider replaces the bare-HTML branch that
// previously locked users out without any logout/retry capability.
return (
<GatewayOfflineFallback renderBanner>
<div className="flex h-screen flex-col items-center justify-center gap-4">
<p className="text-muted-foreground">
Service temporarily unavailable.
</p>
<Link
href="/login"
className="bg-primary text-primary-foreground hover:bg-primary/90 rounded-md px-4 py-2 text-sm"
>
Retry
</Link>
</div>
</GatewayOfflineFallback>
);
case "config_error":
throw new Error(result.message);
@@ -72,20 +72,21 @@ export default function AgentChatPage() {
loadMoreHistory,
} = useThreadStream({
threadId: isNewThread ? undefined : threadId,
displayThreadId: threadId,
context: { ...settings.context, agent_name: agent_name },
isMock,
onSend: () => {
setIsWelcomeMode(false);
},
onStart: (createdThreadId) => {
setThreadId(createdThreadId);
setIsNewThread(false);
// ! Important: Never use next.js router for navigation in this case, otherwise it will cause the thread to re-mount and lose all states. Use native history API instead.
history.replaceState(
null,
"",
`/workspace/agents/${agent_name}/chats/${createdThreadId}`,
);
setThreadId(createdThreadId);
setIsNewThread(false);
},
onFinish: (state) => {
if (document.hidden || !document.hasFocus()) {
@@ -75,6 +75,7 @@ export default function ChatPage() {
loadMoreHistory,
} = useThreadStream({
threadId: isNewThread ? undefined : threadId,
displayThreadId: threadId,
context: settings.context,
isMock,
// onSend only animates the UI; do NOT flip `isNewThread` here — the
@@ -84,10 +85,10 @@ export default function ChatPage() {
setIsWelcomeMode(false);
},
onStart: (createdThreadId) => {
setThreadId(createdThreadId);
setIsNewThread(false);
// ! Important: Never use next.js router for navigation in this case, otherwise it will cause the thread to re-mount and lose all states. Use native history API instead.
history.replaceState(null, "", `/workspace/chats/${createdThreadId}`);
setThreadId(createdThreadId);
setIsNewThread(false);
},
onFinish: (state) => {
if (document.hidden || !document.hasFocus()) {
+7 -25
View File
@@ -1,6 +1,6 @@
import Link from "next/link";
import { redirect } from "next/navigation";
import { GatewayOfflineFallback } from "@/components/workspace/gateway-offline-fallback";
import { AuthProvider } from "@/core/auth/AuthProvider";
import { getServerSideUser } from "@/core/auth/server";
import { assertNever } from "@/core/auth/types";
@@ -28,31 +28,13 @@ export default async function WorkspaceLayout({
case "unauthenticated":
redirect("/login");
case "gateway_unavailable":
// GatewayOfflineFallback supplies the AuthProvider; WorkspaceContent
// already mounts the banner inside its sidebar layout, so renderBanner
// stays false here to avoid double-mounting.
return (
<div className="flex h-screen flex-col items-center justify-center gap-4">
<p className="text-muted-foreground">
Service temporarily unavailable.
</p>
<p className="text-muted-foreground text-xs">
The backend may be restarting. Please wait a moment and try again.
</p>
<div className="flex gap-3">
<Link
href="/workspace"
className="bg-primary text-primary-foreground hover:bg-primary/90 rounded-md px-4 py-2 text-sm"
>
Retry
</Link>
<form action="/api/v1/auth/logout" method="post">
<button
type="submit"
className="text-muted-foreground hover:bg-muted rounded-md border px-4 py-2 text-sm"
>
Logout &amp; Reset
</button>
</form>
</div>
</div>
<GatewayOfflineFallback>
<WorkspaceContent gatewayUnavailable>{children}</WorkspaceContent>
</GatewayOfflineFallback>
);
case "config_error":
throw new Error(result.message);
@@ -4,6 +4,7 @@ import { Toaster } from "sonner";
import { QueryClientProvider } from "@/components/query-client-provider";
import { SidebarInset, SidebarProvider } from "@/components/ui/sidebar";
import { CommandPalette } from "@/components/workspace/command-palette";
import { GatewayOfflineBanner } from "@/components/workspace/gateway-offline-banner";
import { WorkspaceSidebar } from "@/components/workspace/workspace-sidebar";
function parseSidebarOpenCookie(
@@ -16,7 +17,11 @@ function parseSidebarOpenCookie(
export async function WorkspaceContent({
children,
}: Readonly<{ children: React.ReactNode }>) {
gatewayUnavailable = false,
}: Readonly<{
children: React.ReactNode;
gatewayUnavailable?: boolean;
}>) {
const cookieStore = await cookies();
const initialSidebarOpen = parseSidebarOpenCookie(
cookieStore.get("sidebar_state")?.value,
@@ -26,7 +31,10 @@ export async function WorkspaceContent({
<QueryClientProvider>
<SidebarProvider className="h-screen" defaultOpen={initialSidebarOpen}>
<WorkspaceSidebar />
<SidebarInset className="min-w-0">{children}</SidebarInset>
<SidebarInset className="min-w-0">
<GatewayOfflineBanner gatewayUnavailable={gatewayUnavailable} />
{children}
</SidebarInset>
</SidebarProvider>
<CommandPalette />
<Toaster position="top-center" />
@@ -1,29 +1,44 @@
"use client";
import { useParams, usePathname, useSearchParams } from "next/navigation";
import { useEffect, useState } from "react";
import { useCallback, useEffect, useRef, useState } from "react";
import { uuid } from "@/core/utils/uuid";
export function useThreadChat() {
const { thread_id: threadIdFromPath } = useParams<{ thread_id: string }>();
const pathname = usePathname();
const actualPathname =
typeof window === "undefined" ? pathname : window.location.pathname;
const isNewPath = actualPathname.endsWith("/new");
const newThreadIdRef = useRef<string | null>(
threadIdFromPath === "new" ? uuid() : null,
);
if (isNewPath && !newThreadIdRef.current) {
newThreadIdRef.current = uuid();
}
const searchParams = useSearchParams();
const [threadId, setThreadId] = useState(() => {
return threadIdFromPath === "new" ? uuid() : threadIdFromPath;
const [threadId, setThreadIdState] = useState(() => {
return threadIdFromPath === "new"
? (newThreadIdRef.current ?? uuid())
: threadIdFromPath;
});
const [isNewThread, setIsNewThread] = useState(
const [isNewThreadState, setIsNewThreadState] = useState(
() => threadIdFromPath === "new",
);
useEffect(() => {
if (pathname.endsWith("/new")) {
setIsNewThread(true);
setThreadId(uuid());
if (isNewPath) {
const nextThreadId = newThreadIdRef.current ?? uuid();
newThreadIdRef.current = nextThreadId;
setIsNewThreadState(true);
setThreadIdState(nextThreadId);
return;
}
newThreadIdRef.current = null;
// Guard: after history.replaceState updates the URL from /chats/new to
// /chats/{UUID}, Next.js useParams may still return the stale "new" value
// because replaceState does not trigger router updates. Avoid propagating
@@ -32,9 +47,28 @@ export function useThreadChat() {
if (threadIdFromPath === "new") {
return;
}
setIsNewThread(false);
setThreadId(threadIdFromPath);
}, [pathname, threadIdFromPath]);
const isMock = searchParams.get("mock") === "true";
return { threadId, setThreadId, isNewThread, setIsNewThread, isMock };
setIsNewThreadState(false);
setThreadIdState(threadIdFromPath);
}, [isNewPath, threadIdFromPath]);
const setThreadId = useCallback((nextThreadId: string) => {
newThreadIdRef.current = null;
setThreadIdState(nextThreadId);
}, []);
const setIsNewThread = useCallback((nextIsNewThread: boolean) => {
if (!nextIsNewThread) {
newThreadIdRef.current = null;
}
setIsNewThreadState(nextIsNewThread);
}, []);
const isMock = searchParams.get("mock") === "true";
return {
threadId: isNewPath ? (newThreadIdRef.current ?? threadId) : threadId,
setThreadId,
isNewThread: isNewPath ? true : isNewThreadState,
setIsNewThread,
isMock,
};
}
@@ -0,0 +1,87 @@
export const OFFLINE_BANNER_RETRY_INTERVAL_MS = 10_000;
/**
* Number of consecutive 401 responses before treating the session as
* expired and delegating to AuthProvider.refreshUser() for /login redirect.
*
* Threshold > 1 absorbs transient 401s that may occur in the first few
* milliseconds after a gateway becomes ready again, without indefinitely
* masking a genuinely expired cookie.
*/
export const OFFLINE_BANNER_AUTH_FAILURE_THRESHOLD = 3;
import type { User } from "@/core/auth/types";
export function shouldShowOfflineBanner(
user: User | null,
gatewayUnavailable: boolean,
): boolean {
return gatewayUnavailable && user === null;
}
/** Categorised outcome of a single /auth/me probe. */
export type ProbeOutcome =
| { kind: "ok"; user: User } // 2xx with parsed body
| { kind: "unauthorized" } // 401
| { kind: "transient" }; // 5xx, network, abort, malformed body, etc.
/** Next action the banner effect should take after a probe. */
export type ProbeAction =
| { type: "apply-user"; user: User }
| { type: "delegate-refresh"; reason: "session-expired" }
| { type: "noop"; nextFailureCount: number };
/**
* Pure: classify an HTTP probe outcome into ProbeOutcome.
*
* Extracted from the banner effect so it can be unit-tested independently.
* `parsedUser` is the JSON body of a 2xx response (or null if absent/malformed);
* surfacing it through ProbeOutcome lets the caller apply it directly instead
* of paying for a second /auth/me round-trip via refreshUser().
*/
export function classifyProbe(
res: Response | null,
errored: boolean,
parsedUser: User | null = null,
): ProbeOutcome {
if (errored || res === null) return { kind: "transient" };
if (res.ok && parsedUser !== null) return { kind: "ok", user: parsedUser };
if (res.ok) return { kind: "transient" }; // 2xx but body unusable
if (res.status === 401) return { kind: "unauthorized" };
return { kind: "transient" };
}
/**
* Pure state machine for what to do after a probe lands.
*
* Inputs: how many consecutive 401s we've seen so far + the new outcome.
* Outputs: either "apply the user body we just fetched", "delegate to
* refreshUser() for /login redirect", or "do nothing, update counter".
*
* Transient outcomes (5xx / network / abort) decrement the auth-failure
* streak by 1 (floored at 0) rather than resetting it. This prevents a
* flapping gateway that alternates 401 ↔ 5xx from indefinitely masking a
* genuinely expired session: the streak still converges on the threshold.
*/
export function decideProbeAction(
consecutiveAuthFailures: number,
outcome: ProbeOutcome,
threshold: number = OFFLINE_BANNER_AUTH_FAILURE_THRESHOLD,
): ProbeAction {
if (outcome.kind === "ok") {
return { type: "apply-user", user: outcome.user };
}
if (outcome.kind === "unauthorized") {
const next = consecutiveAuthFailures + 1;
if (next >= threshold) {
return { type: "delegate-refresh", reason: "session-expired" };
}
return { type: "noop", nextFailureCount: next };
}
// transient: decrement rather than reset so a flapping gateway
// (alternating 401 ↔ 5xx) still converges on session-expired.
return {
type: "noop",
nextFailureCount: Math.max(0, consecutiveAuthFailures - 1),
};
}
@@ -0,0 +1,130 @@
"use client";
import { useEffect, useRef } from "react";
import { useAuth } from "@/core/auth/AuthProvider";
import { userSchema, type User } from "@/core/auth/types";
import { useI18n } from "@/core/i18n/hooks";
import {
OFFLINE_BANNER_RETRY_INTERVAL_MS,
classifyProbe,
decideProbeAction,
shouldShowOfflineBanner,
} from "./gateway-offline-banner-helpers";
interface GatewayOfflineBannerProps {
/**
* True when the server-side auth probe at `/api/v1/auth/me` could not
* reach the gateway. The banner stays mounted until a client-side probe
* confirms the gateway is healthy and `user` becomes populated.
*/
gatewayUnavailable: boolean;
}
export function GatewayOfflineBanner({
gatewayUnavailable,
}: GatewayOfflineBannerProps) {
const { t } = useI18n();
const { user, applyUser, refreshUser, logout } = useAuth();
// Guard against piling up probe calls while the gateway is still slow.
const inFlightRef = useRef(false);
// Count consecutive 401s so we can distinguish "transient warm-up 401"
// from "session actually expired" and avoid lying with the banner.
const authFailuresRef = useRef(0);
useEffect(() => {
if (!gatewayUnavailable) return;
// Once AuthProvider has a user again the banner has served its
// purpose; tear down the polling so we don't keep probing every 10s
// for the entire lifetime of the page (gatewayUnavailable is a
// server-rendered prop and stays true until a full reload).
if (user !== null) return;
const probe = async () => {
if (inFlightRef.current) return;
inFlightRef.current = true;
let res: Response | null = null;
let errored = false;
let parsedUser: User | null = null;
try {
res = await fetch("/api/v1/auth/me", {
credentials: "include",
cache: "no-store",
});
// Reuse the probe's own response body instead of triggering a
// second /auth/me request via refreshUser() — halves the recovery
// burst against an already-struggling gateway.
if (res.ok) {
try {
const data = await res.json();
const parsed = userSchema.safeParse(data);
if (parsed.success) parsedUser = parsed.data;
} catch (err) {
console.warn(
"[gateway-offline-banner] probe body parse failed:",
err,
);
}
}
} catch (err) {
console.warn("[gateway-offline-banner] probe failed:", err);
errored = true;
} finally {
inFlightRef.current = false;
}
const action = decideProbeAction(
authFailuresRef.current,
classifyProbe(res, errored, parsedUser),
);
if (action.type === "apply-user") {
authFailuresRef.current = 0;
applyUser(action.user);
return;
}
if (action.type === "delegate-refresh") {
// Hand off to AuthProvider, which on 401 will /login-redirect.
authFailuresRef.current = 0;
await refreshUser();
return;
}
authFailuresRef.current = action.nextFailureCount;
};
void probe();
const handle = window.setInterval(() => {
void probe();
}, OFFLINE_BANNER_RETRY_INTERVAL_MS);
return () => {
window.clearInterval(handle);
};
}, [gatewayUnavailable, user, applyUser, refreshUser]);
if (!shouldShowOfflineBanner(user, gatewayUnavailable)) {
return null;
}
return (
<div
role="status"
aria-live="polite"
className="bg-muted text-muted-foreground flex items-center justify-between gap-3 border-b px-4 py-2 text-sm"
>
<span>
{t.workspace.gatewayUnavailable}{" "}
{t.workspace.gatewayUnavailableRetrying}
</span>
<button
type="button"
onClick={() => {
void logout();
}}
className="hover:bg-background rounded-md border px-3 py-1 text-xs"
>
{t.workspace.logout}
</button>
</div>
);
}
@@ -0,0 +1,36 @@
"use client";
import { AuthProvider } from "@/core/auth/AuthProvider";
import { GatewayOfflineBanner } from "./gateway-offline-banner";
interface GatewayOfflineFallbackProps {
/**
* When true, this component renders its own banner. The workspace layout
* sets this to false because WorkspaceContent already mounts the banner
* inside its sidebar layout. The (auth) layout sets it to true because
* its plain children have no banner of their own.
*/
renderBanner?: boolean;
children?: React.ReactNode;
}
/**
* Shared fallback shown by both the workspace and (auth) layouts when the
* server-side auth probe could not reach the gateway. Wraps the children
* with an AuthProvider so the banner's probe / logout / refresh hooks work
* — fixing the `(auth)/layout.tsx` lockup where the bare static HTML had
* no AuthProvider / QueryClientProvider and the user could not recover
* without a manual reload.
*/
export function GatewayOfflineFallback({
renderBanner = false,
children,
}: GatewayOfflineFallbackProps) {
return (
<AuthProvider initialUser={null}>
{renderBanner && <GatewayOfflineBanner gatewayUnavailable />}
{children}
</AuthProvider>
);
}
+29 -2
View File
@@ -26,6 +26,7 @@ interface AuthContextType {
isLoading: boolean;
logout: () => Promise<void>;
refreshUser: () => Promise<void>;
applyUser: (user: User | null) => void;
}
const AuthContext = createContext<AuthContextType | undefined>(undefined);
@@ -52,6 +53,15 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
const isAuthenticated = user !== null;
/**
* Apply a user value supplied by a caller (e.g. banner probe) that has
* already fetched it. Equivalent to setUser, exposed with a stable name
* so consumers don't reach into React internals.
*/
const applyUser = useCallback((next: User | null) => {
setUser(next);
}, []);
/**
* Fetch current user from FastAPI
* Used when initialUser might be stale (e.g., after tab was inactive)
@@ -87,6 +97,13 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
/**
* Logout - call FastAPI logout endpoint and clear local state
* Per RFC-001: Immediately clear local state, don't wait for server confirmation
*
* When the gateway is unreachable the fetch silently fails — the SPA
* router.push("/") would leave the user on "/" still holding stale
* React state and any in-flight SSE / fetch / query subscriptions.
* We therefore fall back to a hard navigation (window.location.href),
* which discards all client state the same way the legacy form-POST
* logout used to.
*/
const logout = useCallback(async () => {
// Immediately clear local state to prevent UI flicker
@@ -97,14 +114,23 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
return;
}
let logoutFailed = false;
try {
await fetch("/api/v1/auth/logout", {
const res = await fetch("/api/v1/auth/logout", {
method: "POST",
credentials: "include",
});
if (!res.ok) logoutFailed = true;
} catch (err) {
console.error("Logout request failed:", err);
// Still redirect even if logout request fails
logoutFailed = true;
}
if (logoutFailed && typeof window !== "undefined") {
// Hard navigation ensures every in-flight subscription is torn down,
// matching the legacy form-POST logout behaviour during a gateway outage.
window.location.href = "/";
return;
}
// Redirect to home page
@@ -140,6 +166,7 @@ export function AuthProvider({ children, initialUser }: AuthProviderProps) {
isLoading,
logout,
refreshUser,
applyUser,
};
return <AuthContext.Provider value={value}>{children}</AuthContext.Provider>;
+2
View File
@@ -242,6 +242,8 @@ export const enUS: Translations = {
contactUs: "Contact us",
about: "About DeerFlow",
logout: "Log out",
gatewayUnavailable: "Gateway is temporarily unavailable.",
gatewayUnavailableRetrying: "Retrying in the background…",
},
// Conversation
+2
View File
@@ -173,6 +173,8 @@ export interface Translations {
contactUs: string;
about: string;
logout: string;
gatewayUnavailable: string;
gatewayUnavailableRetrying: string;
};
// Conversation
+2
View File
@@ -230,6 +230,8 @@ export const zhCN: Translations = {
contactUs: "联系我们",
about: "关于 DeerFlow",
logout: "退出登录",
gatewayUnavailable: "网关暂时不可用。",
gatewayUnavailableRetrying: "正在后台重试…",
},
// Conversation
+95 -20
View File
@@ -9,7 +9,7 @@ import {
useQuery,
useQueryClient,
} from "@tanstack/react-query";
import { useCallback, useEffect, useRef, useState } from "react";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { toast } from "sonner";
import type { PromptInputMessage } from "@/components/ai-elements/prompt-input";
@@ -46,6 +46,7 @@ export type ToolEndEvent = {
export type ThreadStreamOptions = {
threadId?: string | null | undefined;
displayThreadId?: string | null | undefined;
context: LocalSettings["context"];
isMock?: boolean;
onSend?: (threadId: string) => void;
@@ -58,6 +59,13 @@ type SendMessageOptions = {
additionalKwargs?: Record<string, unknown>;
};
const EMPTY_THREAD_VALUES: AgentThreadState = {
title: "",
messages: [],
artifacts: [],
todos: [],
};
function isNonEmptyString(value: string | undefined): value is string {
return typeof value === "string" && value.length > 0;
}
@@ -393,6 +401,7 @@ function getStreamErrorMessage(error: unknown): string {
export function useThreadStream({
threadId,
displayThreadId,
context,
isMock,
onSend,
@@ -401,6 +410,18 @@ export function useThreadStream({
onToolEnd,
}: ThreadStreamOptions) {
const { t } = useI18n();
const currentViewThreadId = displayThreadId ?? threadId ?? null;
const currentViewThreadIdRef = useRef(currentViewThreadId);
currentViewThreadIdRef.current = currentViewThreadId;
// Optimistic messages shown before the server stream responds.
const [optimisticMessages, setOptimisticMessages] = useState<Message[]>([]);
const [optimisticThreadId, setOptimisticThreadId] = useState<string | null>(
null,
);
const [liveMessagesThreadId, setLiveMessagesThreadId] = useState<
string | null
>(null);
const [isUploading, setIsUploading] = useState(false);
// Track the thread ID that is currently streaming to handle thread changes during streaming
const [onStreamThreadId, setOnStreamThreadId] = useState(() => threadId);
// Ref to track current thread ID across async callbacks without causing re-renders,
@@ -442,6 +463,28 @@ export function useThreadStream({
const handleStreamStart = useCallback((_threadId: string, _runId: string) => {
threadIdRef.current = _threadId;
setOptimisticThreadId((currentOptimisticThreadId) => {
const currentView = currentViewThreadIdRef.current;
if (
currentOptimisticThreadId &&
(currentOptimisticThreadId === currentView ||
currentOptimisticThreadId === _threadId)
) {
return _threadId;
}
return currentOptimisticThreadId;
});
setLiveMessagesThreadId((currentLiveMessagesThreadId) => {
const currentView = currentViewThreadIdRef.current;
if (
currentLiveMessagesThreadId &&
(currentLiveMessagesThreadId === currentView ||
currentLiveMessagesThreadId === _threadId)
) {
return _threadId;
}
return currentLiveMessagesThreadId;
});
if (!startedRef.current) {
listeners.current.onStart?.(_threadId, _runId);
startedRef.current = true;
@@ -613,6 +656,8 @@ export function useThreadStream({
},
onError(error) {
setOptimisticMessages([]);
setOptimisticThreadId(null);
setLiveMessagesThreadId(null);
toast.error(getStreamErrorMessage(error));
pendingUsageBaselineMessageIdsRef.current = new Set(
messagesRef.current
@@ -644,10 +689,17 @@ export function useThreadStream({
},
});
// Optimistic messages shown before the server stream responds
const [optimisticMessages, setOptimisticMessages] = useState<Message[]>([]);
const [isUploading, setIsUploading] = useState(false);
const humanMessageCount = thread.messages.filter(
const hasVisibleStreamState =
Boolean(threadId) || liveMessagesThreadId === currentViewThreadId;
const persistedMessages = useMemo(
() => (hasVisibleStreamState ? thread.messages : []),
[hasVisibleStreamState, thread.messages],
);
const visibleHistory = useMemo(
() => (threadId ? history : []),
[history, threadId],
);
const humanMessageCount = persistedMessages.filter(
(m) => m.type === "human",
).length;
const latestMessageCountsRef = useRef({ humanMessageCount });
@@ -668,15 +720,23 @@ export function useThreadStream({
useEffect(() => {
startedRef.current = false;
sendInFlightRef.current = false;
pendingUsageBaselineMessageIdsRef.current = new Set(
messagesRef.current
.map(messageIdentity)
.filter((id): id is string => Boolean(id)),
);
messagesRef.current = [];
summarizedRef.current = new Set<string>();
pendingUsageBaselineMessageIdsRef.current = new Set();
prevHumanMsgCountRef.current =
latestMessageCountsRef.current.humanMessageCount;
}, [threadId]);
useEffect(() => {
if (optimisticThreadId && optimisticThreadId !== currentViewThreadId) {
setOptimisticMessages([]);
setOptimisticThreadId(null);
}
if (liveMessagesThreadId && liveMessagesThreadId !== currentViewThreadId) {
setLiveMessagesThreadId(null);
}
}, [currentViewThreadId, liveMessagesThreadId, optimisticThreadId]);
// When streaming starts without a baseline (e.g. reconnection, run started
// from another client, or page reload mid-stream), snapshot the current
// messages so only *new* messages are treated as "pending" for token usage.
@@ -686,12 +746,12 @@ export function useThreadStream({
pendingUsageBaselineMessageIdsRef.current.size === 0
) {
pendingUsageBaselineMessageIdsRef.current = new Set(
thread.messages
persistedMessages
.map(messageIdentity)
.filter((id): id is string => Boolean(id)),
);
}
}, [thread.isLoading, thread.messages]);
}, [persistedMessages, thread.isLoading]);
// Clear optimistic when server messages arrive.
// For messages with a human optimistic message, wait until the server's
@@ -707,6 +767,7 @@ export function useThreadStream({
if (!hasHumanOptimistic || newHumanMsgArrived) {
setOptimisticMessages([]);
setOptimisticThreadId(null);
}
}, [hasHumanOptimistic, humanMessageCount, optimisticMessageCount]);
@@ -728,7 +789,7 @@ export function useThreadStream({
// messages so we can wait for the server's copy of the user input.
prevHumanMsgCountRef.current = humanMessageCount;
pendingUsageBaselineMessageIdsRef.current = new Set(
thread.messages
persistedMessages
.map(messageIdentity)
.filter((id): id is string => Boolean(id)),
);
@@ -767,6 +828,8 @@ export function useThreadStream({
additional_kwargs: { element: "task" },
});
}
setOptimisticThreadId(threadId);
setLiveMessagesThreadId(threadId);
setOptimisticMessages(newOptimistic);
listeners.current.onSend?.(threadId);
@@ -832,6 +895,8 @@ export function useThreadStream({
: "Failed to upload files.";
toast.error(errorMessage);
setOptimisticMessages([]);
setOptimisticThreadId(null);
setLiveMessagesThreadId(null);
throw error;
} finally {
setIsUploading(false);
@@ -900,35 +965,44 @@ export function useThreadStream({
});
} catch (error) {
setOptimisticMessages([]);
setOptimisticThreadId(null);
setLiveMessagesThreadId(null);
setIsUploading(false);
throw error;
} finally {
sendInFlightRef.current = false;
}
},
[thread, t.uploads.uploadingFiles, context, queryClient, humanMessageCount],
[
thread,
t.uploads.uploadingFiles,
context,
queryClient,
humanMessageCount,
persistedMessages,
],
);
// Cache the latest thread messages in a ref to compare against incoming history messages for deduplication,
// and to allow access to the full message list in onUpdateEvent without causing re-renders.
if (thread.messages.length >= messagesRef.current.length) {
messagesRef.current = thread.messages;
if (persistedMessages.length >= messagesRef.current.length) {
messagesRef.current = persistedMessages;
}
const visibleOptimisticMessages = getVisibleOptimisticMessages(
optimisticMessages,
optimisticThreadId === currentViewThreadId ? optimisticMessages : [],
prevHumanMsgCountRef.current,
humanMessageCount,
);
const mergedMessages = mergeMessages(
history,
thread.messages,
visibleHistory,
persistedMessages,
visibleOptimisticMessages,
);
const pendingUsageMessages = thread.isLoading
? getMessagesAfterBaseline(
thread.messages,
persistedMessages,
pendingUsageBaselineMessageIdsRef.current,
)
: [];
@@ -937,6 +1011,7 @@ export function useThreadStream({
// History messages may overlap with thread.messages; thread.messages take precedence
const mergedThread = {
...thread,
values: hasVisibleStreamState ? thread.values : EMPTY_THREAD_VALUES,
messages: mergedMessages,
} as typeof thread;
+107 -1
View File
@@ -1,4 +1,4 @@
import { expect, test } from "@playwright/test";
import { expect, test, type Route } from "@playwright/test";
import {
mockLangGraphAPI,
@@ -19,6 +19,9 @@ const THREADS = [
},
];
const DEMO_THREAD_ID = "7cfa5f8f-a2f8-47ad-acbd-da7137baf990";
const SVG_PROMPT_THREAD_ID = "00000000-0000-0000-0000-000000000777";
const SVG_PROMPT_MARKER = "LEAK-STRICT-SVG-PROMPT-SHOULD-DISAPPEAR";
const OPTIMISTIC_PROMPT_MARKER = "LEAK-OPTIMISTIC-SVG-PROMPT-SHOULD-DISAPPEAR";
test.describe("Thread history", () => {
test("sidebar shows existing threads", async ({ page }) => {
@@ -62,6 +65,109 @@ test.describe("Thread history", () => {
).toBeVisible({ timeout: 15_000 });
});
test("new chat does not show previous thread messages after client-side navigation", async ({
page,
}) => {
mockLangGraphAPI(page, {
threads: [
{
thread_id: SVG_PROMPT_THREAD_ID,
title: "SVG artifact prompt",
updated_at: "2025-06-03T12:00:00Z",
messages: [
{
type: "human",
id: "msg-human-svg-prompt",
content: [
{
type: "text",
text: `请严格执行:\n1. 使用 write_file 创建 /mnt/user-data/outputs/shared.svg,内容包含 ${SVG_PROMPT_MARKER}\n2. 最终回复只输出 Markdown 图片。`,
},
],
},
{
type: "ai",
id: "msg-ai-svg-prompt",
content: "![shared artifact](/mnt/user-data/outputs/shared.svg)",
},
],
},
],
});
await page.goto(`/workspace/chats/${SVG_PROMPT_THREAD_ID}`);
await expect(page.getByText(SVG_PROMPT_MARKER)).toBeVisible({
timeout: 15_000,
});
await page.getByRole("link", { name: /new chat/i }).click();
await page.waitForURL("**/workspace/chats/new");
await expect(page.getByText(SVG_PROMPT_MARKER)).toBeHidden();
await expect(page.getByPlaceholder(/how can i assist you/i)).toBeVisible();
});
test("new chat does not show previous optimistic user message after client-side navigation", async ({
page,
}) => {
mockLangGraphAPI(page, {
threads: [
{
thread_id: MOCK_THREAD_ID_2,
title: "Destination conversation",
updated_at: "2025-06-04T12:00:00Z",
},
],
});
const metadataOnlyStream = async (route: Route) => {
const body = [
{
event: "metadata",
data: {
run_id: "00000000-0000-0000-0000-000000000778",
thread_id: MOCK_THREAD_ID,
},
},
{ event: "end", data: {} },
]
.map((e) => `event: ${e.event}\ndata: ${JSON.stringify(e.data)}\n\n`)
.join("");
await route.fulfill({
status: 200,
contentType: "text/event-stream",
body,
});
};
await page.route("**/api/langgraph/runs/stream", metadataOnlyStream);
await page.route(
"**/api/langgraph/threads/*/runs/stream",
metadataOnlyStream,
);
await page.goto("/workspace/chats/new");
const textarea = page.getByPlaceholder(/how can i assist you/i);
await expect(textarea).toBeVisible({ timeout: 15_000 });
await textarea.fill(
`请严格执行:使用 write_file 创建 shared.svg,内容包含 ${OPTIMISTIC_PROMPT_MARKER}`,
);
await textarea.press("Enter");
await expect(page.getByText(OPTIMISTIC_PROMPT_MARKER)).toBeVisible();
await page.getByText("Destination conversation").click();
await page.waitForURL(`**/workspace/chats/${MOCK_THREAD_ID_2}`);
await expect(page.getByText(OPTIMISTIC_PROMPT_MARKER)).toHaveCount(0);
await page.getByRole("link", { name: /new chat/i }).click();
await page.waitForURL("**/workspace/chats/new");
await expect(page.getByText(OPTIMISTIC_PROMPT_MARKER)).toHaveCount(0);
await expect(page.getByPlaceholder(/how can i assist you/i)).toBeVisible();
});
test("mock thread does not load real backend run history", async ({
page,
}) => {
@@ -0,0 +1,185 @@
import { describe, expect, it } from "vitest";
import {
OFFLINE_BANNER_AUTH_FAILURE_THRESHOLD,
OFFLINE_BANNER_RETRY_INTERVAL_MS,
classifyProbe,
decideProbeAction,
shouldShowOfflineBanner,
} from "@/components/workspace/gateway-offline-banner-helpers";
import type { User } from "@/core/auth/types";
const fakeUser: User = {
id: "u1",
email: "user@example.com",
system_role: "user",
needs_setup: false,
};
function makeResponse(status: number, ok = status >= 200 && status < 300) {
return { status, ok } as Response;
}
describe("shouldShowOfflineBanner", () => {
it("hides when the gateway is reachable", () => {
expect(shouldShowOfflineBanner(null, false)).toBe(false);
expect(shouldShowOfflineBanner(fakeUser, false)).toBe(false);
});
it("shows when the gateway is unavailable and the client has no user yet", () => {
expect(shouldShowOfflineBanner(null, true)).toBe(true);
});
it("hides as soon as the client recovers an authenticated user", () => {
expect(shouldShowOfflineBanner(fakeUser, true)).toBe(false);
});
});
describe("OFFLINE_BANNER_RETRY_INTERVAL_MS", () => {
it("is a positive finite number", () => {
expect(OFFLINE_BANNER_RETRY_INTERVAL_MS).toBeGreaterThan(0);
expect(Number.isFinite(OFFLINE_BANNER_RETRY_INTERVAL_MS)).toBe(true);
});
});
describe("OFFLINE_BANNER_AUTH_FAILURE_THRESHOLD", () => {
it("is an integer greater than 1 so a single transient 401 cannot expire the session", () => {
expect(Number.isInteger(OFFLINE_BANNER_AUTH_FAILURE_THRESHOLD)).toBe(true);
expect(OFFLINE_BANNER_AUTH_FAILURE_THRESHOLD).toBeGreaterThan(1);
});
});
describe("classifyProbe", () => {
it("returns transient when fetch errored", () => {
expect(classifyProbe(null, true)).toEqual({ kind: "transient" });
});
it("returns transient when response is null with no error flag", () => {
expect(classifyProbe(null, false)).toEqual({ kind: "transient" });
});
it("returns ok with parsed user for a 2xx response with body", () => {
expect(classifyProbe(makeResponse(200), false, fakeUser)).toEqual({
kind: "ok",
user: fakeUser,
});
});
it("returns transient for a 2xx response whose body failed to parse", () => {
// Defensive: a 200 with malformed JSON / schema mismatch should not be
// treated as 'ok' because the caller has no user to apply.
expect(classifyProbe(makeResponse(200), false, null)).toEqual({
kind: "transient",
});
});
it("returns unauthorized for a 401 response", () => {
expect(classifyProbe(makeResponse(401), false)).toEqual({
kind: "unauthorized",
});
});
it("returns transient for 5xx responses", () => {
expect(classifyProbe(makeResponse(503), false)).toEqual({
kind: "transient",
});
expect(classifyProbe(makeResponse(500), false)).toEqual({
kind: "transient",
});
});
it("returns transient for unexpected non-401 4xx responses", () => {
expect(classifyProbe(makeResponse(429), false)).toEqual({
kind: "transient",
});
});
});
describe("decideProbeAction", () => {
it("returns apply-user with the body on a 2xx response", () => {
expect(decideProbeAction(0, { kind: "ok", user: fakeUser })).toEqual({
type: "apply-user",
user: fakeUser,
});
// Even if we'd accumulated some 401s, a 200 wins immediately.
expect(decideProbeAction(2, { kind: "ok", user: fakeUser })).toEqual({
type: "apply-user",
user: fakeUser,
});
});
it("treats a single 401 as transient noise and only bumps the counter", () => {
expect(decideProbeAction(0, { kind: "unauthorized" })).toEqual({
type: "noop",
nextFailureCount: 1,
});
});
it("treats consecutive 401s below the threshold as still transient", () => {
expect(decideProbeAction(1, { kind: "unauthorized" })).toEqual({
type: "noop",
nextFailureCount: 2,
});
});
it("delegates to refreshUser as 'session-expired' once 401s reach the threshold", () => {
expect(decideProbeAction(2, { kind: "unauthorized" })).toEqual({
type: "delegate-refresh",
reason: "session-expired",
});
});
it("honours a custom threshold (parameterised for safer tests)", () => {
expect(decideProbeAction(0, { kind: "unauthorized" }, 2)).toEqual({
type: "noop",
nextFailureCount: 1,
});
expect(decideProbeAction(1, { kind: "unauthorized" }, 2)).toEqual({
type: "delegate-refresh",
reason: "session-expired",
});
});
it("decrements (not resets) the auth-failure streak on a transient outcome", () => {
// Was 2 → 1, so a flapping gateway (401↔5xx) still converges on the
// threshold instead of indefinitely masking session expiry.
expect(decideProbeAction(2, { kind: "transient" })).toEqual({
type: "noop",
nextFailureCount: 1,
});
// Floored at 0; never goes negative.
expect(decideProbeAction(0, { kind: "transient" })).toEqual({
type: "noop",
nextFailureCount: 0,
});
expect(decideProbeAction(1, { kind: "transient" })).toEqual({
type: "noop",
nextFailureCount: 0,
});
});
it("convergence: alternating 401/transient still triggers session-expired", () => {
// Simulate the exact scenario from #3493 CR: flapping gateway alternates
// 401 (session gone) and 503 (overloaded). With decrement-by-1, the
// counter still nets +1 per 401/transient pair and reaches threshold.
let count = 0;
const seq: Array<"unauthorized" | "transient"> = [
"unauthorized", // count -> 1
"transient", // count -> 0
"unauthorized", // count -> 1
"unauthorized", // count -> 2
"transient", // count -> 1
"unauthorized", // count -> 2
];
for (const kind of seq) {
const action = decideProbeAction(count, { kind });
expect(action.type).toBe("noop");
if (action.type === "noop") count = action.nextFailureCount;
}
// Next 401 should trip the wire (2 -> 3 == threshold).
expect(decideProbeAction(count, { kind: "unauthorized" })).toEqual({
type: "delegate-refresh",
reason: "session-expired",
});
});
});
@@ -106,3 +106,65 @@ describe("getServerSideUser", () => {
expect(isAuthDisabledMode()).toBe(false);
});
});
describe("getServerSideUser — gateway_unavailable contract (issue #3493)", () => {
let saved: EnvSnapshot;
beforeEach(() => {
saved = snapshotEnv();
setEnv("DEER_FLOW_AUTH_DISABLED", undefined);
setEnv("NEXT_PUBLIC_STATIC_WEBSITE_ONLY", undefined);
});
afterEach(() => {
restoreEnv(saved);
vi.unstubAllGlobals();
vi.doUnmock("next/headers");
});
test("returns gateway_unavailable when /auth/me fetch rejects (e.g. AbortError)", async () => {
vi.doMock("next/headers", () => ({
cookies: vi.fn(async () => ({
get: (name: string) =>
name === "access_token" ? { value: "stub-token" } : undefined,
})),
}));
const abortErr = new DOMException("Aborted", "AbortError");
vi.stubGlobal(
"fetch",
vi.fn(() => Promise.reject(abortErr)),
);
const { getServerSideUser } = await loadFreshServerAuth();
await expect(getServerSideUser()).resolves.toEqual({
tag: "gateway_unavailable",
});
});
test("returns gateway_unavailable when /auth/me responds with a 5xx", async () => {
vi.doMock("next/headers", () => ({
cookies: vi.fn(async () => ({
get: (name: string) =>
name === "access_token" ? { value: "stub-token" } : undefined,
})),
}));
vi.stubGlobal(
"fetch",
vi.fn(() =>
Promise.resolve(
new Response("upstream error", {
status: 503,
statusText: "Service Unavailable",
}),
),
),
);
const { getServerSideUser } = await loadFreshServerAuth();
await expect(getServerSideUser()).resolves.toEqual({
tag: "gateway_unavailable",
});
});
});