mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-12 02:15:58 +00:00
Address channel connection review comments
This commit is contained in:
+1
-1
@@ -407,7 +407,7 @@ Bridges external messaging platforms (Feishu, Slack, Telegram, Discord, DingTalk
|
||||
- Telegram uses a deep-link `/start <code>` flow over the existing long-polling worker. Slack, Discord, Feishu/Lark, DingTalk, WeChat, and WeCom use `/connect <code>` over their existing outbound channel workers.
|
||||
- Frontend APIs: `GET /api/channels/providers`, `GET /api/channels/connections`, `POST /api/channels/{provider}/connect`, and `DELETE /api/channels/connections/{connection_id}`.
|
||||
- Browser APIs remain protected by normal Gateway auth/CSRF. Provider messages arrive through the already-configured channel workers.
|
||||
- Slack replies use the configured operator bot token from `channels.slack` unless a future provider-token flow stores per-connection credentials.
|
||||
- Slack replies use the configured operator bot token from `channels.slack` unless per-connection credentials are present; unreadable or corrupt stored credentials are treated as unavailable.
|
||||
- Telegram, Slack, Discord, Feishu/Lark, DingTalk, WeChat, and WeCom workers resolve incoming platform identities to connection records before reaching `ChannelManager`.
|
||||
- See `backend/docs/IM_CHANNEL_CONNECTIONS.md` for provider setup and operational notes.
|
||||
|
||||
|
||||
@@ -100,7 +100,7 @@ Feishu/Lark, DingTalk, WeChat, and WeCom:
|
||||
- The UI shows `Send /connect <code> to the DeerFlow <Provider> bot.`
|
||||
- The already-running long-connection or polling worker receives the message and binds the platform user/workspace identity to the current DeerFlow user.
|
||||
|
||||
Codes expire after 10 minutes and are single-use.
|
||||
Codes use 128 bits of randomness, expire after 10 minutes, and are single-use.
|
||||
|
||||
## Runtime Model
|
||||
|
||||
@@ -116,6 +116,7 @@ Incoming messages that resolve to a connection carry `connection_id`, `owner_use
|
||||
## Security Notes
|
||||
|
||||
- Browser APIs remain authenticated and CSRF-protected.
|
||||
- Connect codes are random, short-lived, and single-use.
|
||||
- Connect codes are 128-bit random, short-lived, and single-use.
|
||||
- Provider bot tokens remain in `channels.*` and are never returned to the browser.
|
||||
- Stored per-connection credentials are encrypted. If stored credential material cannot be decrypted, DeerFlow treats it as unavailable instead of using corrupt secrets.
|
||||
- This implementation does not add public provider callback or webhook routes.
|
||||
|
||||
@@ -5,11 +5,12 @@ from __future__ import annotations
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
from sqlalchemy import delete, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
||||
|
||||
@@ -21,6 +22,8 @@ from deerflow.persistence.channel_connections.model import (
|
||||
)
|
||||
from deerflow.utils.time import coerce_iso
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChannelCredentialCipher:
|
||||
"""Encrypts provider credentials before they are persisted."""
|
||||
@@ -195,6 +198,7 @@ class ChannelConnectionRepository:
|
||||
row = await session.get(ChannelCredentialRow, connection_id)
|
||||
if row is None:
|
||||
return None
|
||||
try:
|
||||
extra_raw = self._cipher.decrypt_text(row.encrypted_extra_json)
|
||||
return {
|
||||
"connection_id": row.connection_id,
|
||||
@@ -205,6 +209,12 @@ class ChannelConnectionRepository:
|
||||
"refresh_expires_at": self._coerce_datetime(row.refresh_expires_at),
|
||||
"extra": json.loads(extra_raw) if extra_raw else {},
|
||||
}
|
||||
except (InvalidToken, UnicodeError, json.JSONDecodeError):
|
||||
logger.warning(
|
||||
"Unable to decrypt channel connection credentials; treating credentials as unavailable",
|
||||
exc_info=True,
|
||||
)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def hash_state(state: str) -> str:
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from datetime import UTC, datetime, timedelta
|
||||
|
||||
import pytest
|
||||
@@ -122,6 +123,26 @@ class TestChannelConnectionRepository:
|
||||
assert credentials["expires_at"] == expires_at
|
||||
assert credentials["extra"] == {"bot_user_id": "B123"}
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_get_credentials_returns_none_when_decryption_fails(self, repo, caplog):
|
||||
connection = await repo.upsert_connection(
|
||||
owner_user_id="alice",
|
||||
provider="slack",
|
||||
external_account_id="U-alice",
|
||||
workspace_id="T1",
|
||||
)
|
||||
await repo.store_credentials(connection["id"], access_token="xoxb-secret-access-token")
|
||||
wrong_key_repo = ChannelConnectionRepository(
|
||||
repo.session_factory,
|
||||
cipher=ChannelCredentialCipher.from_key("wrong-encryption-key"),
|
||||
)
|
||||
|
||||
with caplog.at_level(logging.WARNING, logger="deerflow.persistence.channel_connections.sql"):
|
||||
credentials = await wrong_key_repo.get_credentials(connection["id"])
|
||||
|
||||
assert credentials is None
|
||||
assert any("Unable to decrypt channel connection credentials" in record.message for record in caplog.records)
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_conversations_are_scoped_by_connection(self, repo):
|
||||
alice = await repo.upsert_connection(
|
||||
|
||||
@@ -188,6 +188,20 @@ def test_get_providers_uses_existing_channels_config(tmp_path):
|
||||
anyio.run(repo.close)
|
||||
|
||||
|
||||
def test_get_providers_degrades_when_persistence_is_unavailable(monkeypatch):
|
||||
monkeypatch.setattr(channel_connections, "get_session_factory", lambda: None)
|
||||
app = _make_app(_enabled_connections_config(), None, _channels_config())
|
||||
|
||||
with TestClient(app) as client:
|
||||
response = client.get("/api/channels/providers")
|
||||
|
||||
assert response.status_code == 200
|
||||
by_provider = {item["provider"]: item for item in response.json()["providers"]}
|
||||
assert by_provider["slack"]["configured"] is True
|
||||
assert by_provider["slack"]["connectable"] is True
|
||||
assert by_provider["slack"]["connection_status"] == "connected"
|
||||
|
||||
|
||||
def test_get_providers_reports_unconfigured_when_runtime_channel_is_missing(tmp_path):
|
||||
import anyio
|
||||
|
||||
|
||||
@@ -48,6 +48,12 @@ def test_format_sse_no_event_id():
|
||||
assert "id:" not in frame
|
||||
|
||||
|
||||
def test_sanitize_log_param_strips_control_characters():
|
||||
from app.gateway.utils import sanitize_log_param
|
||||
|
||||
assert sanitize_log_param("thread\nid\rwith\x00controls") == "threadidwithcontrols"
|
||||
|
||||
|
||||
def test_normalize_stream_modes_none():
|
||||
from app.gateway.services import normalize_stream_modes
|
||||
|
||||
|
||||
Reference in New Issue
Block a user