Align IM connections with local channels

This commit is contained in:
taohe
2026-06-10 22:16:47 +08:00
parent 92c185b90d
commit d06643d8a2
33 changed files with 588 additions and 1536 deletions
@@ -1,22 +1,16 @@
"""Browser-facing APIs for user-owned IM channel connections."""
"""Browser-facing APIs for user-owned IM channel bindings."""
from __future__ import annotations
import hashlib
import json
import secrets
from datetime import UTC, datetime, timedelta
from typing import Any
from urllib.parse import urlencode
from fastapi import APIRouter, HTTPException, Request, Response
from pydantic import BaseModel, Field
from starlette.responses import PlainTextResponse, RedirectResponse
from app.channels.message_bus import InboundMessage, InboundMessageType
from app.channels.providers import discord_connect, slack_connect
from deerflow.config.channel_connections_config import ChannelConnectionsConfig
from deerflow.persistence.channel_connections import ChannelConnectionRepository, ChannelCredentialCipher
from deerflow.persistence.channel_connections import ChannelConnectionRepository
from deerflow.persistence.engine import get_session_factory
router = APIRouter(prefix="/api/channels", tags=["channel-connections"])
@@ -59,14 +53,22 @@ class ChannelConnectionsResponse(BaseModel):
class ChannelConnectResponse(BaseModel):
provider: str
mode: str
url: str
url: str | None = None
code: str
instruction: str
expires_in: int
_PROVIDER_META: dict[str, dict[str, str]] = {
"telegram": {"display_name": "Telegram", "auth_mode": "deep_link"},
"slack": {"display_name": "Slack", "auth_mode": "oauth"},
"discord": {"display_name": "Discord", "auth_mode": "oauth_and_bot_install"},
"slack": {"display_name": "Slack", "auth_mode": "binding_code"},
"discord": {"display_name": "Discord", "auth_mode": "binding_code"},
}
_RUNTIME_REQUIREMENTS: dict[str, tuple[str, ...]] = {
"telegram": ("bot_token",),
"slack": ("bot_token", "app_token"),
"discord": ("bot_token",),
}
@@ -77,14 +79,28 @@ def _get_user_id(request: Request) -> str:
return str(user.id)
def _get_app_config():
from deerflow.config.app_config import get_app_config
return get_app_config()
def _get_channel_connections_config(request: Request) -> ChannelConnectionsConfig:
config = getattr(request.app.state, "channel_connections_config", None)
if isinstance(config, ChannelConnectionsConfig):
return config
return _get_app_config().channel_connections
from deerflow.config.app_config import get_app_config
return get_app_config().channel_connections
def _get_channels_config(request: Request) -> dict[str, Any]:
state_config = getattr(request.app.state, "channels_config", None)
if isinstance(state_config, dict):
return state_config
app_config = _get_app_config()
extra = app_config.model_extra or {}
channels_config = extra.get("channels")
return dict(channels_config) if isinstance(channels_config, dict) else {}
def _get_repository(request: Request, config: ChannelConnectionsConfig) -> ChannelConnectionRepository:
@@ -96,8 +112,7 @@ def _get_repository(request: Request, config: ChannelConnectionsConfig) -> Chann
if sf is None:
raise HTTPException(status_code=503, detail="Channel connection persistence is not available")
cipher = ChannelCredentialCipher.from_key(config.encryption_key) if config.encryption_key else None
repo = ChannelConnectionRepository(sf, cipher=cipher)
repo = ChannelConnectionRepository(sf)
request.app.state.channel_connection_repo = repo
return repo
@@ -109,41 +124,48 @@ def _provider_config(config: ChannelConnectionsConfig, provider: str):
return provider_config
def _provider_unavailable_reason(config: ChannelConnectionsConfig, provider: str) -> str | None:
def _runtime_channel_configured(provider: str, channels_config: dict[str, Any]) -> bool:
runtime_config = channels_config.get(provider)
if not isinstance(runtime_config, dict) or not runtime_config.get("enabled", False):
return False
return all(str(runtime_config.get(key) or "").strip() for key in _RUNTIME_REQUIREMENTS[provider])
def _runtime_unavailable_reason(provider: str) -> str:
keys = " and ".join(f"channels.{provider}.{key}" for key in _RUNTIME_REQUIREMENTS[provider])
return f"Enable and configure channels.{provider} with {keys}."
def _provider_unavailable_reason(
config: ChannelConnectionsConfig,
channels_config: dict[str, Any],
provider: str,
) -> str | None:
provider_config = _provider_config(config, provider)
if not provider_config.enabled or not provider_config.configured:
if not provider_config.enabled:
return None
if provider == "telegram" and getattr(provider_config, "delivery", "polling") == "webhook":
if not provider_config.webhook_secret:
return "Telegram webhook delivery requires channel_connections.telegram.webhook_secret"
if not config.public_base_url:
return "Telegram webhook delivery requires channel_connections.public_base_url; use polling for local/private deployments"
if provider == "slack" and getattr(provider_config, "event_delivery", "http") == "http" and not config.public_base_url:
return "Slack HTTP Events require channel_connections.public_base_url; use a public URL/tunnel or Slack Socket Mode for private deployments"
if provider in {"slack", "discord"} and not config.encryption_key:
display_name = _PROVIDER_META[provider]["display_name"]
return f"{display_name} connections require channel_connections.encryption_key to store OAuth credentials"
if not provider_config.configured:
if provider == "telegram":
return "Configure channel_connections.telegram.bot_username for Telegram deep links."
return f"Configure channel_connections.{provider}."
if not _runtime_channel_configured(provider, channels_config):
return _runtime_unavailable_reason(provider)
return None
def _require_provider_connectable(config: ChannelConnectionsConfig, provider: str) -> None:
reason = _provider_unavailable_reason(config, provider)
if reason:
raise HTTPException(status_code=400, detail=reason)
def _provider_status(
config: ChannelConnectionsConfig,
channels_config: dict[str, Any],
provider: str,
) -> tuple[dict[str, bool], str | None]:
declared = config.provider_status(provider)
unavailable_reason = _provider_unavailable_reason(config, channels_config, provider)
configured = declared["configured"] and _runtime_channel_configured(provider, channels_config)
return {"enabled": declared["enabled"], "configured": configured}, unavailable_reason
def _callback_base_url(config: ChannelConnectionsConfig, request: Request) -> str:
if config.public_base_url:
return config.public_base_url.rstrip("/")
return str(request.base_url).rstrip("/")
def _callback_redirect_uri(config: ChannelConnectionsConfig, request: Request, provider: str) -> str:
return f"{_callback_base_url(config, request)}/api/channels/{provider}/callback"
def _new_binding_code() -> str:
return secrets.token_hex(4)
async def _create_state(
@@ -151,135 +173,40 @@ async def _create_state(
*,
owner_user_id: str,
provider: str,
requested_scopes: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> str:
state = secrets.token_urlsafe(32)
state = _new_binding_code()
await repo.create_oauth_state(
owner_user_id=owner_user_id,
provider=provider,
state=state,
requested_scopes=requested_scopes,
metadata=metadata,
expires_at=datetime.now(UTC) + timedelta(seconds=_STATE_TTL_SECONDS),
)
return state
def _build_connect_url(config: ChannelConnectionsConfig, request: Request, provider: str, state: str) -> str:
provider_config = _provider_config(config, provider)
def _connect_instruction(provider: str, code: str) -> str:
if provider == "telegram":
return f"https://t.me/{provider_config.bot_username}?start={state}"
redirect_uri = _callback_redirect_uri(config, request, provider)
return f"Send /start {code} to the DeerFlow Telegram bot."
if provider == "slack":
query = urlencode(
{
"client_id": provider_config.client_id,
"scope": ",".join(provider_config.scopes),
"redirect_uri": redirect_uri,
"state": state,
}
)
return f"https://slack.com/oauth/v2/authorize?{query}"
return f"Send /connect {code} to the DeerFlow Slack bot."
if provider == "discord":
scopes = "identify guilds bot applications.commands"
query = urlencode(
{
"client_id": provider_config.client_id,
"response_type": "code",
"redirect_uri": redirect_uri,
"scope": scopes,
"state": state,
"permissions": provider_config.permissions,
}
)
return f"https://discord.com/oauth2/authorize?{query}"
return f"Send /connect {code} to the DeerFlow Discord bot."
raise HTTPException(status_code=404, detail="Unknown channel provider")
def _callback_redirect(provider: str, state_data: dict[str, Any]) -> RedirectResponse:
redirect_after = state_data.get("redirect_after")
if isinstance(redirect_after, str) and redirect_after:
return RedirectResponse(redirect_after)
return RedirectResponse(f"/workspace?channel_connected={provider}")
def _get_message_bus(request: Request):
bus = getattr(request.app.state, "channel_message_bus", None)
if bus is not None:
return bus
try:
from app.channels.service import get_channel_service
except Exception:
def _connect_url(config: ChannelConnectionsConfig, provider: str, code: str) -> str | None:
if provider == "telegram":
provider_config = _provider_config(config, provider)
return f"https://t.me/{provider_config.bot_username}?start={code}"
if provider in {"slack", "discord"}:
return None
service = get_channel_service()
return service.bus if service is not None else None
def _get_channel_instance(request: Request, name: str):
channel_instances = getattr(request.app.state, "channel_instances", None)
if isinstance(channel_instances, dict) and name in channel_instances:
return channel_instances[name]
try:
from app.channels.service import get_channel_service
except Exception:
return None
service = get_channel_service()
return service.get_channel(name) if service is not None else None
async def _publish_slack_event(
*,
repo: ChannelConnectionRepository,
bus: Any,
payload: dict[str, Any],
) -> bool:
event = payload.get("event") or {}
event_type = event.get("type")
if event_type not in {"message", "app_mention"}:
return False
if event.get("bot_id") or event.get("subtype"):
return False
text = str(event.get("text") or "").strip()
user_id = str(event.get("user") or "")
channel_id = str(event.get("channel") or "")
team_id = str(payload.get("team_id") or event.get("team") or event.get("team_id") or "")
if not text or not user_id or not channel_id or not team_id:
return False
connection = await repo.find_connection_by_external_identity(
provider="slack",
external_account_id=user_id,
workspace_id=team_id,
)
if connection is None:
return False
thread_ts = str(event.get("thread_ts") or event.get("ts") or "")
inbound = InboundMessage(
channel_name="slack",
chat_id=channel_id,
user_id=user_id,
text=text,
msg_type=InboundMessageType.COMMAND if text.startswith("/") else InboundMessageType.CHAT,
thread_ts=thread_ts,
metadata={"team_id": team_id, "event_id": payload.get("event_id")},
connection_id=connection["id"],
owner_user_id=connection["owner_user_id"],
workspace_id=team_id,
)
inbound.topic_id = thread_ts or None
await bus.publish_inbound(inbound)
return True
raise HTTPException(status_code=404, detail="Unknown channel provider")
@router.get("/providers", response_model=ChannelProvidersResponse)
async def get_channel_providers(request: Request) -> ChannelProvidersResponse:
config = _get_channel_connections_config(request)
channels_config = _get_channels_config(request)
repo = None
if config.enabled:
try:
@@ -293,9 +220,8 @@ async def get_channel_providers(request: Request) -> ChannelProvidersResponse:
providers: list[ChannelProviderResponse] = []
for provider, meta in _PROVIDER_META.items():
status = config.provider_status(provider)
status, unavailable_reason = _provider_status(config, channels_config, provider)
connection = by_provider.get(provider)
unavailable_reason = _provider_unavailable_reason(config, provider)
providers.append(
ChannelProviderResponse(
provider=provider,
@@ -337,199 +263,32 @@ async def disconnect_channel_connection(connection_id: str, request: Request) ->
return Response(status_code=204)
@router.get("/slack/callback")
async def slack_oauth_callback(request: Request, code: str | None = None, state: str | None = None, error: str | None = None):
if error:
raise HTTPException(status_code=400, detail=f"Slack OAuth failed: {error}")
if not code or not state:
raise HTTPException(status_code=400, detail="Slack OAuth callback is missing code or state")
config = _get_channel_connections_config(request)
provider_config = _provider_config(config, "slack")
if not config.enabled or not provider_config.enabled or not provider_config.configured:
raise HTTPException(status_code=400, detail="Channel provider is not configured")
repo = _get_repository(request, config)
state_data = await repo.consume_oauth_state(provider="slack", state=state)
if state_data is None:
raise HTTPException(status_code=400, detail="Invalid or expired OAuth state")
redirect_uri = _callback_redirect_uri(config, request, "slack")
install = await slack_connect.exchange_slack_oauth_code(
client_id=provider_config.client_id,
client_secret=provider_config.client_secret,
code=code,
redirect_uri=redirect_uri,
)
connection = await repo.upsert_connection(
owner_user_id=state_data["owner_user_id"],
provider="slack",
external_account_id=install.authed_user_id,
workspace_id=install.team_id,
workspace_name=install.team_name,
bot_user_id=install.bot_user_id,
scopes=install.scopes or state_data.get("requested_scopes", []),
metadata={"team_id": install.team_id, "team_name": install.team_name},
status="connected",
)
await repo.store_credentials(
connection["id"],
access_token=install.bot_access_token,
token_type="Bearer",
extra={"bot_user_id": install.bot_user_id, "team_id": install.team_id},
)
return _callback_redirect("slack", state_data)
@router.get("/discord/callback")
async def discord_oauth_callback(request: Request, code: str | None = None, state: str | None = None, error: str | None = None):
if error:
raise HTTPException(status_code=400, detail=f"Discord OAuth failed: {error}")
if not code or not state:
raise HTTPException(status_code=400, detail="Discord OAuth callback is missing code or state")
config = _get_channel_connections_config(request)
provider_config = _provider_config(config, "discord")
if not config.enabled or not provider_config.enabled or not provider_config.configured:
raise HTTPException(status_code=400, detail="Channel provider is not configured")
repo = _get_repository(request, config)
state_data = await repo.consume_oauth_state(provider="discord", state=state)
if state_data is None:
raise HTTPException(status_code=400, detail="Invalid or expired OAuth state")
redirect_uri = _callback_redirect_uri(config, request, "discord")
identity = await discord_connect.complete_discord_oauth(
client_id=provider_config.client_id,
client_secret=provider_config.client_secret,
code=code,
redirect_uri=redirect_uri,
)
connection = await repo.upsert_connection(
owner_user_id=state_data["owner_user_id"],
provider="discord",
external_account_id=identity.user_id,
external_account_name=identity.display_name or identity.username,
scopes=identity.scopes or state_data.get("requested_scopes", []),
capabilities={"message_content_intent_required": provider_config.require_message_content_intent},
metadata={"username": identity.username, "guilds": identity.guilds},
status="connected",
)
await repo.store_credentials(
connection["id"],
access_token=identity.access_token,
refresh_token=identity.refresh_token,
token_type=identity.token_type,
expires_at=identity.expires_at,
extra={"guilds": identity.guilds},
)
return _callback_redirect("discord", state_data)
@router.post("/webhooks/slack/events")
async def slack_events_webhook(request: Request):
config = _get_channel_connections_config(request)
provider_config = _provider_config(config, "slack")
if not config.enabled or not provider_config.enabled or not provider_config.configured:
raise HTTPException(status_code=400, detail="Channel provider is not configured")
body = await request.body()
if not slack_connect.verify_slack_signature(
signing_secret=provider_config.signing_secret,
timestamp=request.headers.get("X-Slack-Request-Timestamp"),
body=body,
signature=request.headers.get("X-Slack-Signature"),
):
raise HTTPException(status_code=401, detail="Invalid Slack signature")
try:
payload = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as exc:
raise HTTPException(status_code=400, detail="Invalid Slack payload") from exc
if payload.get("type") == "url_verification":
challenge = payload.get("challenge")
if not isinstance(challenge, str):
raise HTTPException(status_code=400, detail="Slack challenge is missing")
return PlainTextResponse(challenge)
repo = _get_repository(request, config)
delivery_id = str(payload.get("event_id") or hashlib.sha256(body).hexdigest())
payload_hash = hashlib.sha256(body).hexdigest()
event = payload.get("event") or {}
is_new = await repo.record_webhook_delivery(
provider="slack",
delivery_id=delivery_id,
payload_sha256=payload_hash,
event_type=event.get("type"),
)
if not is_new:
return {"ok": True, "duplicate": True, "processed": False}
bus = _get_message_bus(request)
processed = False
if bus is not None:
processed = await _publish_slack_event(repo=repo, bus=bus, payload=payload)
return {"ok": True, "processed": processed}
@router.post("/webhooks/telegram")
async def telegram_webhook(request: Request):
config = _get_channel_connections_config(request)
provider_config = _provider_config(config, "telegram")
if not config.enabled or not provider_config.enabled or not provider_config.configured:
raise HTTPException(status_code=400, detail="Channel provider is not configured")
secret_header = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
if not secret_header or not secrets.compare_digest(secret_header, provider_config.webhook_secret):
raise HTTPException(status_code=401, detail="Invalid Telegram webhook secret")
body = await request.body()
try:
payload = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as exc:
raise HTTPException(status_code=400, detail="Invalid Telegram payload") from exc
repo = _get_repository(request, config)
delivery_id = str(payload.get("update_id") or hashlib.sha256(body).hexdigest())
is_new = await repo.record_webhook_delivery(
provider="telegram",
delivery_id=delivery_id,
payload_sha256=hashlib.sha256(body).hexdigest(),
event_type="update",
)
if not is_new:
return {"ok": True, "duplicate": True, "processed": False}
processed = False
channel = _get_channel_instance(request, "telegram")
process_update = getattr(channel, "process_webhook_update", None)
if process_update is not None:
processed = bool(await process_update(payload))
return {"ok": True, "processed": processed}
@router.post("/{provider}/connect", response_model=ChannelConnectResponse)
async def connect_channel_provider(provider: str, request: Request) -> ChannelConnectResponse:
config = _get_channel_connections_config(request)
channels_config = _get_channels_config(request)
if not config.enabled:
raise HTTPException(status_code=400, detail="Channel connections are disabled")
provider_config = _provider_config(config, provider)
if not provider_config.enabled or not provider_config.configured:
status, unavailable_reason = _provider_status(config, channels_config, provider)
if not status["enabled"]:
raise HTTPException(status_code=400, detail="Channel provider is not enabled")
if unavailable_reason:
raise HTTPException(status_code=400, detail=unavailable_reason)
if not status["configured"]:
raise HTTPException(status_code=400, detail="Channel provider is not configured")
_require_provider_connectable(config, provider)
repo = _get_repository(request, config)
state = await _create_state(
code = await _create_state(
repo,
owner_user_id=_get_user_id(request),
provider=provider,
requested_scopes=getattr(provider_config, "scopes", []),
)
return ChannelConnectResponse(
provider=provider,
mode=_PROVIDER_META[provider]["auth_mode"],
url=_build_connect_url(config, request, provider, state),
url=_connect_url(config, provider, code),
code=code,
instruction=_connect_instruction(provider, code),
expires_in=_STATE_TTL_SECONDS,
)