Merge remote-tracking branch 'origin/main' into codex/im-channel-connections

# Conflicts:
#	backend/app/channels/discord.py
#	backend/app/channels/manager.py
#	backend/app/channels/slack.py
#	backend/app/channels/telegram.py
This commit is contained in:
taohe
2026-06-10 21:13:02 +08:00
85 changed files with 5575 additions and 253 deletions
+127 -13
View File
@@ -8,6 +8,7 @@ import mimetypes
import re
import time
from collections.abc import Awaitable, Callable, Mapping
from dataclasses import dataclass
from pathlib import Path
from typing import Any
@@ -26,8 +27,13 @@ from app.channels.message_bus import (
from app.channels.store import ChannelStore
from app.gateway.csrf_middleware import CSRF_COOKIE_NAME, CSRF_HEADER_NAME, generate_csrf_token
from app.gateway.internal_auth import create_internal_auth_headers
from deerflow.config.agents_config import load_agent_config
from deerflow.config.paths import make_safe_user_id
from deerflow.runtime.user_context import get_effective_user_id
from deerflow.skills.slash import parse_slash_skill_reference
from deerflow.skills.storage import get_or_new_skill_storage
from deerflow.skills.storage.skill_storage import SkillStorage
from deerflow.utils.messages import ORIGINAL_USER_CONTENT_KEY
logger = logging.getLogger(__name__)
@@ -124,6 +130,16 @@ class InvalidChannelSessionConfigError(ValueError):
"""Raised when IM channel session overrides contain invalid agent config."""
class SlashSkillCommandResolutionError(RuntimeError):
"""Raised when IM slash-skill command resolution cannot complete safely."""
@dataclass(frozen=True, slots=True)
class _SlashSkillCommandResolution:
route_to_chat: bool = False
failure_message: str | None = None
def _is_thread_busy_error(exc: BaseException | None) -> bool:
if exc is None:
return False
@@ -410,6 +426,46 @@ def _format_artifact_text(artifacts: list[str]) -> str:
_OUTPUTS_VIRTUAL_PREFIX = "/mnt/user-data/outputs/"
def _unknown_command_reply(command: str | None = None) -> str:
available = " | ".join(sorted(KNOWN_CHANNEL_COMMANDS))
if command:
return f"Unknown command: /{command}. Available commands: {available}"
return f"Unknown command. Available commands: {available}"
def _human_input_message(content: str, *, original_content: str | None = None) -> dict[str, Any]:
message: dict[str, Any] = {"role": "human", "content": content}
if original_content is not None and original_content != content:
message["additional_kwargs"] = {ORIGINAL_USER_CONTENT_KEY: original_content}
return message
def _resolve_slash_skill_command(
text: str,
available_skills: set[str] | None = None,
storage: SkillStorage | Callable[[], SkillStorage] | None = None,
) -> _SlashSkillCommandResolution | None:
reference = parse_slash_skill_reference(text)
if reference is None:
return None
try:
resolved_storage = storage() if callable(storage) else storage or get_or_new_skill_storage()
skills = resolved_storage.load_skills(enabled_only=False)
skill = next((candidate for candidate in skills if candidate.name == reference.name), None)
if skill is None:
return None
if not skill.enabled:
return _SlashSkillCommandResolution(failure_message=f"Skill `/{reference.name}` is installed but disabled. Enable it before using slash activation.")
if available_skills is not None and reference.name not in available_skills:
return _SlashSkillCommandResolution(failure_message=f"Skill `/{reference.name}` is not available for this agent.")
return _SlashSkillCommandResolution(route_to_chat=True)
except Exception as exc:
logger.exception("[Manager] failed to resolve slash skill command")
raise SlashSkillCommandResolutionError("Failed to resolve slash skill command. Please check the skill configuration.") from exc
def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedAttachment]:
"""Resolve virtual artifact paths to host filesystem paths with metadata.
@@ -626,6 +682,7 @@ class ChannelManager:
self._channel_sessions = dict(channel_sessions or {})
self._connection_repo = connection_repo
self._client = None # lazy init — langgraph_sdk async client
self._skill_storage: SkillStorage | None = None
self._csrf_token = generate_csrf_token()
self._semaphore: asyncio.Semaphore | None = None
self._running = False
@@ -702,6 +759,21 @@ class ChannelManager:
return assistant_id, run_config, run_context
def _resolve_available_skill_names(self, msg: InboundMessage) -> set[str] | None:
thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id) or ""
_, _, run_context = self._resolve_run_params(msg, thread_id)
if run_context.get("is_bootstrap"):
return {"bootstrap"}
agent_name = run_context.get("agent_name")
if not isinstance(agent_name, str) or not agent_name.strip():
return None
agent_config = load_agent_config(_normalize_custom_agent_name(agent_name))
if agent_config and agent_config.skills is not None:
return set(agent_config.skills)
return None
# -- LangGraph SDK client (lazy) ----------------------------------------
def _get_client(self):
@@ -719,6 +791,11 @@ class ChannelManager:
)
return self._client
def _get_skill_storage(self) -> SkillStorage:
if self._skill_storage is None:
self._skill_storage = get_or_new_skill_storage()
return self._skill_storage
# -- lifecycle ---------------------------------------------------------
async def start(self) -> None:
@@ -788,6 +865,14 @@ class ChannelManager:
exc,
)
await self._send_error(msg, str(exc))
except SlashSkillCommandResolutionError as exc:
logger.warning(
"Slash skill command resolution failed for %s (chat=%s): %s",
msg.channel_name,
msg.chat_id,
exc,
)
await self._send_error(msg, str(exc))
except Exception:
logger.exception(
"Error handling message from %s (chat=%s)",
@@ -865,9 +950,11 @@ class ChannelManager:
if extra_context:
run_context.update(extra_context)
original_text = msg.text
uploaded = await _ingest_inbound_files(thread_id, msg)
if uploaded:
msg.text = f"{_format_uploaded_files_block(uploaded)}\n\n{msg.text}".strip()
human_message = _human_input_message(msg.text, original_content=original_text)
if self._channel_supports_streaming(msg.channel_name):
await self._handle_streaming_chat(
@@ -877,6 +964,7 @@ class ChannelManager:
assistant_id,
run_config,
run_context,
human_message,
)
return
@@ -885,7 +973,7 @@ class ChannelManager:
result = await client.runs.wait(
thread_id,
assistant_id,
input={"messages": [{"role": "human", "content": msg.text}]},
input={"messages": [human_message]},
config=run_config,
context=run_context,
multitask_strategy="reject",
@@ -940,6 +1028,7 @@ class ChannelManager:
assistant_id: str,
run_config: dict[str, Any],
run_context: dict[str, Any],
human_message: dict[str, Any],
) -> None:
logger.info("[Manager] invoking runs.stream(thread_id=%s, text=%r)", thread_id, msg.text[:100])
@@ -955,7 +1044,7 @@ class ChannelManager:
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"messages": [{"role": "human", "content": msg.text}]},
input={"messages": [human_message]},
config=run_config,
context=run_context,
stream_mode=["messages-tuple", "values"],
@@ -1046,11 +1135,20 @@ class ChannelManager:
# -- command handling --------------------------------------------------
async def _handle_command(self, msg: InboundMessage) -> None:
text = msg.text.strip()
raw_text = msg.text
text = raw_text.strip()
parts = text.split(maxsplit=1)
command = parts[0].lower().lstrip("/")
reply: str | None = None
if not parts:
command = None
reply = _unknown_command_reply()
else:
command = parts[0].lower().removeprefix("/")
if command == "bootstrap":
if reply is None and not raw_text.startswith("/"):
reply = _unknown_command_reply(command)
if reply is None and command == "bootstrap":
from dataclasses import replace as _dc_replace
chat_text = parts[1] if len(parts) > 1 else "Initialize workspace"
@@ -1058,21 +1156,21 @@ class ChannelManager:
await self._handle_chat(chat_msg, extra_context={"is_bootstrap": True})
return
if command == "new":
if reply is None and command == "new":
# Create a new thread through Gateway
client = self._get_client()
thread = await client.threads.create()
new_thread_id = thread["thread_id"]
await self._store_thread_id(msg, new_thread_id)
reply = "New conversation started."
elif command == "status":
elif reply is None and command == "status":
thread_id = await self._lookup_thread_id(msg)
reply = f"Active thread: {thread_id}" if thread_id else "No active conversation."
elif command == "models":
elif reply is None and command == "models":
reply = await self._fetch_gateway("/api/models", "models")
elif command == "memory":
elif reply is None and command == "memory":
reply = await self._fetch_gateway("/api/memory", "memory")
elif command == "help":
elif reply is None and command == "help":
reply = (
"Available commands:\n"
"/bootstrap — Start a bootstrap session (enables agent setup)\n"
@@ -1080,11 +1178,27 @@ class ChannelManager:
"/status — Show current thread info\n"
"/models — List available models\n"
"/memory — Show memory status\n"
"/<skill-name> <task> — Activate an enabled skill for one turn\n"
"/help — Show this help"
)
else:
available = " | ".join(sorted(KNOWN_CHANNEL_COMMANDS))
reply = f"Unknown command: /{command}. Available commands: {available}"
elif reply is None:
slash_resolution = await asyncio.to_thread(
lambda: _resolve_slash_skill_command(
raw_text,
self._resolve_available_skill_names(msg),
self._get_skill_storage,
)
)
if slash_resolution and slash_resolution.failure_message:
reply = slash_resolution.failure_message
elif slash_resolution and slash_resolution.route_to_chat:
from dataclasses import replace as _dc_replace
chat_msg = _dc_replace(msg, msg_type=InboundMessageType.CHAT)
await self._handle_chat(chat_msg)
return
else:
reply = _unknown_command_reply(command)
outbound = OutboundMessage(
channel_name=msg.channel_name,