From 4f564370308d49fe7e73a0ab03bd73d8ccf14ca7 Mon Sep 17 00:00:00 2001 From: taohe Date: Thu, 11 Jun 2026 16:51:04 +0800 Subject: [PATCH] Show IM channel source on threads --- backend/app/channels/manager.py | 36 ++++++++++-- backend/tests/test_channels.py | 34 ++++++++++- frontend/src/app/workspace/chats/page.tsx | 46 ++++++++++----- .../components/workspace/recent-chat-list.tsx | 26 ++++++++- .../workspace/thread-channel-source.tsx | 56 +++++++++++++++++++ frontend/src/core/threads/utils.ts | 45 +++++++++++++++ frontend/tests/e2e/thread-history.spec.ts | 41 ++++++++++++++ frontend/tests/e2e/utils/mock-api.ts | 6 +- .../tests/unit/core/threads/utils.test.ts | 39 ++++++++++++- 9 files changed, 303 insertions(+), 26 deletions(-) create mode 100644 frontend/src/components/workspace/thread-channel-source.tsx diff --git a/backend/app/channels/manager.py b/backend/app/channels/manager.py index be8a145bc..1b2a6bbc8 100644 --- a/backend/app/channels/manager.py +++ b/backend/app/channels/manager.py @@ -274,6 +274,22 @@ def _response_metadata(base_metadata: dict[str, Any], *, pending_clarification: return metadata +def _thread_channel_metadata(msg: InboundMessage) -> dict[str, Any]: + channel_source: dict[str, Any] = { + "type": "im_channel", + "provider": msg.channel_name, + "chat_id": msg.chat_id, + } + if msg.topic_id: + channel_source["topic_id"] = msg.topic_id + if msg.thread_ts: + channel_source["thread_ts"] = msg.thread_ts + if msg.connection_id: + channel_source["connection_id"] = msg.connection_id + + return {"channel_source": channel_source} + + def _extract_text_content(content: Any) -> str: """Extract text from a streaming payload content field.""" if isinstance(content, str): @@ -943,16 +959,27 @@ class ChannelManager: async def _create_thread(self, client, msg: InboundMessage) -> str: """Create a new thread through Gateway and store the mapping.""" + metadata = _thread_channel_metadata(msg) owner_headers = _owner_headers(msg) if owner_headers: - thread = await client.threads.create(headers=owner_headers) + thread = await client.threads.create(metadata=metadata, headers=owner_headers) else: - thread = await client.threads.create() + thread = await client.threads.create(metadata=metadata) thread_id = thread["thread_id"] await self._store_thread_id(msg, thread_id) logger.info("[Manager] new thread created through Gateway: thread_id=%s for chat_id=%s topic_id=%s", thread_id, msg.chat_id, msg.topic_id) return thread_id + async def _update_thread_channel_metadata(self, client, msg: InboundMessage, thread_id: str) -> None: + """Best-effort source metadata backfill for existing IM-created threads.""" + update_kwargs: dict[str, Any] = {"metadata": _thread_channel_metadata(msg)} + if owner_headers := _owner_headers(msg): + update_kwargs["headers"] = owner_headers + try: + await client.threads.update(thread_id, **update_kwargs) + except Exception: + logger.debug("[Manager] failed to update channel metadata for thread_id=%s", thread_id, exc_info=True) + async def _handle_chat(self, msg: InboundMessage, extra_context: dict[str, Any] | None = None) -> None: client = self._get_client() @@ -962,6 +989,7 @@ class ChannelManager: thread_id = await self._lookup_thread_id(msg) if thread_id: logger.info("[Manager] reusing thread: thread_id=%s for topic_id=%s", thread_id, msg.topic_id) + await self._update_thread_channel_metadata(client, msg, thread_id) # No existing thread found — create a new one if thread_id is None: @@ -1202,9 +1230,7 @@ class ChannelManager: if reply is None and command == "new": # Create a new thread through Gateway client = self._get_client() - thread = await client.threads.create() - new_thread_id = thread["thread_id"] - await self._store_thread_id(msg, new_thread_id) + await self._create_thread(client, msg) reply = "New conversation started." elif reply is None and command == "status": thread_id = await self._lookup_thread_id(msg) diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index b3616fb01..c72c46fad 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -487,6 +487,7 @@ def _make_mock_langgraph_client(thread_id="test-thread-123", run_result=None): # threads.create() returns a Thread-like dict mock_client.threads.create = AsyncMock(return_value={"thread_id": thread_id}) + mock_client.threads.update = AsyncMock(return_value={"thread_id": thread_id}) # threads.get() returns thread info (succeeds by default) mock_client.threads.get = AsyncMock(return_value={"thread_id": thread_id}) @@ -667,16 +668,34 @@ class TestChannelManager: await manager.start() - inbound = InboundMessage(channel_name="test", chat_id="chat1", user_id="user1", text="hi") + inbound = InboundMessage( + channel_name="test", + chat_id="chat1", + user_id="user1", + text="hi", + topic_id="topic1", + thread_ts="msg1", + connection_id="conn1", + ) await bus.publish_inbound(inbound) await _wait_for(lambda: len(outbound_received) >= 1) await manager.stop() # Thread should be created through Gateway mock_client.threads.create.assert_called_once() + assert mock_client.threads.create.call_args.kwargs["metadata"] == { + "channel_source": { + "type": "im_channel", + "provider": "test", + "chat_id": "chat1", + "topic_id": "topic1", + "thread_ts": "msg1", + "connection_id": "conn1", + } + } # Thread ID should be stored - thread_id = store.get_thread_id("test", "chat1") + thread_id = store.get_thread_id("test", "chat1", topic_id="topic1") assert thread_id == "test-thread-123" # runs.wait should be called with the thread_id @@ -2003,6 +2022,17 @@ class TestChannelManager: # threads.create should be called only ONCE (second message reuses the thread) mock_client.threads.create.assert_called_once() + mock_client.threads.update.assert_called_once_with( + "topic-thread-1", + metadata={ + "channel_source": { + "type": "im_channel", + "provider": "test", + "chat_id": "chat1", + "topic_id": "topic-root-123", + } + }, + ) # Both runs.wait calls should use the same thread_id assert mock_client.runs.wait.call_count == 2 diff --git a/frontend/src/app/workspace/chats/page.tsx b/frontend/src/app/workspace/chats/page.tsx index 43d661225..a3cea55ed 100644 --- a/frontend/src/app/workspace/chats/page.tsx +++ b/frontend/src/app/workspace/chats/page.tsx @@ -5,6 +5,10 @@ import { useEffect, useMemo, useState } from "react"; import { Input } from "@/components/ui/input"; import { ScrollArea } from "@/components/ui/scroll-area"; +import { + ThreadChannelBadge, + ThreadChannelIcon, +} from "@/components/workspace/thread-channel-source"; import { WorkspaceBody, WorkspaceContainer, @@ -12,7 +16,11 @@ import { } from "@/components/workspace/workspace-container"; import { useI18n } from "@/core/i18n/hooks"; import { useThreads } from "@/core/threads/hooks"; -import { pathOfThread, titleOfThread } from "@/core/threads/utils"; +import { + channelSourceOfThread, + pathOfThread, + titleOfThread, +} from "@/core/threads/utils"; import { formatTimeAgo } from "@/core/utils/datetime"; export default function ChatsPage() { @@ -47,20 +55,30 @@ export default function ChatsPage() {
- {filteredThreads?.map((thread) => ( - -
-
-
{titleOfThread(thread)}
-
- {thread.updated_at && ( -
- {formatTimeAgo(thread.updated_at)} + {filteredThreads?.map((thread) => { + const channelSource = channelSourceOfThread(thread); + return ( + +
+
+ +
+ {titleOfThread(thread)} +
+
- )} -
- - ))} + {thread.updated_at && ( +
+ {formatTimeAgo(thread.updated_at)} +
+ )} +
+ + ); + })}
diff --git a/frontend/src/components/workspace/recent-chat-list.tsx b/frontend/src/components/workspace/recent-chat-list.tsx index f110506e4..42b374dd0 100644 --- a/frontend/src/components/workspace/recent-chat-list.tsx +++ b/frontend/src/components/workspace/recent-chat-list.tsx @@ -55,10 +55,16 @@ import { useThreads, } from "@/core/threads/hooks"; import type { AgentThread, AgentThreadState } from "@/core/threads/types"; -import { pathOfThread, titleOfThread } from "@/core/threads/utils"; +import { + channelSourceOfThread, + pathOfThread, + titleOfThread, +} from "@/core/threads/utils"; import { env } from "@/env"; import { isIMEComposing } from "@/lib/ime"; +import { ThreadChannelIcon } from "./thread-channel-source"; + export function RecentChatList() { const { t } = useI18n(); const router = useRouter(); @@ -182,6 +188,7 @@ export function RecentChatList() {
{threads.map((thread) => { const isActive = pathOfThread(thread) === pathname; + const channelSource = channelSourceOfThread(thread); return (
- {titleOfThread(thread)} + + + {titleOfThread(thread)} + + {channelSource && ( + + + {channelSource.label} + + + )} {env.NEXT_PUBLIC_STATIC_WEBSITE_ONLY !== "true" && ( diff --git a/frontend/src/components/workspace/thread-channel-source.tsx b/frontend/src/components/workspace/thread-channel-source.tsx new file mode 100644 index 000000000..f6b44c8bf --- /dev/null +++ b/frontend/src/components/workspace/thread-channel-source.tsx @@ -0,0 +1,56 @@ +"use client"; + +import { ChannelProviderIcon } from "@/components/workspace/channels/channel-provider-icon"; +import type { ChannelThreadSource } from "@/core/threads/utils"; +import { cn } from "@/lib/utils"; + +type ThreadChannelIconProps = { + source: ChannelThreadSource | null; + className?: string; +}; + +export function ThreadChannelIcon({ + source, + className, +}: ThreadChannelIconProps) { + if (!source) { + return null; + } + + return ( + + + + ); +} + +type ThreadChannelBadgeProps = { + source: ChannelThreadSource | null; + className?: string; +}; + +export function ThreadChannelBadge({ + source, + className, +}: ThreadChannelBadgeProps) { + if (!source) { + return null; + } + + return ( + + + {source.label} + + ); +} diff --git a/frontend/src/core/threads/utils.ts b/frontend/src/core/threads/utils.ts index 901f1d07f..7a5b048c0 100644 --- a/frontend/src/core/threads/utils.ts +++ b/frontend/src/core/threads/utils.ts @@ -2,6 +2,12 @@ import type { Message } from "@langchain/langgraph-sdk"; import type { AgentThread, AgentThreadContext } from "./types"; +export type ChannelThreadSource = { + type: "im_channel"; + provider: string; + label: string; +}; + type ThreadRouteTarget = | string | { @@ -49,3 +55,42 @@ export function textOfMessage(message: Message) { export function titleOfThread(thread: AgentThread) { return thread.values?.title ?? "Untitled"; } + +const CHANNEL_PROVIDER_LABELS: Record = { + dingtalk: "DingTalk", + discord: "Discord", + feishu: "Feishu", + slack: "Slack", + telegram: "Telegram", + wechat: "WeChat", + wecom: "WeCom", +}; + +function labelOfChannelProvider(provider: string) { + return CHANNEL_PROVIDER_LABELS[provider] ?? provider; +} + +export function channelSourceOfThread( + thread: Pick, +): ChannelThreadSource | null { + const source = thread.metadata?.channel_source; + if (!source || typeof source !== "object" || Array.isArray(source)) { + return null; + } + + if (Reflect.get(source, "type") !== "im_channel") { + return null; + } + + const provider = Reflect.get(source, "provider"); + if (typeof provider !== "string" || provider.trim().length === 0) { + return null; + } + + const normalizedProvider = provider.trim().toLowerCase(); + return { + type: "im_channel", + provider: normalizedProvider, + label: labelOfChannelProvider(normalizedProvider), + }; +} diff --git a/frontend/tests/e2e/thread-history.spec.ts b/frontend/tests/e2e/thread-history.spec.ts index 9476ca4ab..e56aa52c2 100644 --- a/frontend/tests/e2e/thread-history.spec.ts +++ b/frontend/tests/e2e/thread-history.spec.ts @@ -152,4 +152,45 @@ test.describe("Thread history", () => { }); await expect(main.getByText("Second conversation")).toBeVisible(); }); + + test("IM channel threads show their source in thread lists", async ({ + page, + }) => { + mockLangGraphAPI(page, { + threads: [ + { + thread_id: MOCK_THREAD_ID, + title: "Feishu conversation", + updated_at: "2025-06-03T12:00:00Z", + metadata: { + channel_source: { + type: "im_channel", + provider: "feishu", + chat_id: "oc_mock", + }, + }, + }, + ], + }); + + await page.goto("/workspace/chats/new"); + + const sidebarThread = page.locator( + `a[href='/workspace/chats/${MOCK_THREAD_ID}']`, + ); + await expect(sidebarThread).toBeVisible({ timeout: 15_000 }); + await expect( + sidebarThread.getByLabel("Feishu channel"), + ).toBeVisible(); + + await page.goto("/workspace/chats"); + + const mainThread = page.locator("main").locator( + `a[href='/workspace/chats/${MOCK_THREAD_ID}']`, + ); + await expect(mainThread.getByText("Feishu conversation")).toBeVisible({ + timeout: 15_000, + }); + await expect(mainThread.getByText("Feishu", { exact: true })).toBeVisible(); + }); }); diff --git a/frontend/tests/e2e/utils/mock-api.ts b/frontend/tests/e2e/utils/mock-api.ts index 888b066b0..1fbe3f348 100644 --- a/frontend/tests/e2e/utils/mock-api.ts +++ b/frontend/tests/e2e/utils/mock-api.ts @@ -25,6 +25,7 @@ export type MockThread = { title?: string; updated_at?: string; agent_name?: string; + metadata?: Record; messages?: unknown[]; artifacts?: string[]; }; @@ -90,7 +91,10 @@ export function mockLangGraphAPI(page: Page, options?: MockAPIOptions) { thread_id: t.thread_id, created_at: "2025-01-01T00:00:00Z", updated_at: t.updated_at ?? "2025-01-01T00:00:00Z", - metadata: t.agent_name ? { agent_name: t.agent_name } : {}, + metadata: { + ...(t.metadata ?? {}), + ...(t.agent_name ? { agent_name: t.agent_name } : {}), + }, status: "idle", values: { title: t.title ?? "Untitled" }, })); diff --git a/frontend/tests/unit/core/threads/utils.test.ts b/frontend/tests/unit/core/threads/utils.test.ts index c138f3d96..bc600fa3b 100644 --- a/frontend/tests/unit/core/threads/utils.test.ts +++ b/frontend/tests/unit/core/threads/utils.test.ts @@ -1,6 +1,6 @@ import { expect, test } from "vitest"; -import { pathOfThread } from "@/core/threads/utils"; +import { channelSourceOfThread, pathOfThread } from "@/core/threads/utils"; test("uses standard chat route when thread has no agent context", () => { expect(pathOfThread("thread-123")).toBe("/workspace/chats/thread-123"); @@ -44,3 +44,40 @@ test("prefers context.agent_name over metadata.agent_name", () => { }), ).toBe("/workspace/agents/from-context/chats/thread-789"); }); + +test("reads IM channel source metadata", () => { + expect( + channelSourceOfThread({ + metadata: { + channel_source: { + type: "im_channel", + provider: "feishu", + chat_id: "oc_123", + }, + }, + }), + ).toEqual({ + type: "im_channel", + provider: "feishu", + label: "Feishu", + }); +}); + +test("ignores threads without valid IM channel source metadata", () => { + expect(channelSourceOfThread({ metadata: {} })).toBeNull(); + expect( + channelSourceOfThread({ + metadata: { channel_source: { provider: "" } }, + }), + ).toBeNull(); + expect( + channelSourceOfThread({ + metadata: { + channel_source: { + type: "other", + provider: "feishu", + }, + }, + }), + ).toBeNull(); +});