mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-06-11 18:05:58 +00:00
Address channel connection review comments
This commit is contained in:
@@ -136,27 +136,25 @@ class ChannelService:
|
|||||||
has_creds = any(not isinstance(channel_config.get(k), bool) and channel_config.get(k) is not None and str(channel_config[k]).strip() for k in cred_keys)
|
has_creds = any(not isinstance(channel_config.get(k), bool) and channel_config.get(k) is not None and str(channel_config[k]).strip() for k in cred_keys)
|
||||||
if has_creds:
|
if has_creds:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Channel '%s' has credentials configured but is disabled. Set enabled: true under channels.%s in config.yaml to activate it.",
|
"A configured channel has credentials configured but is disabled. Set enabled: true under its channels entry in config.yaml to activate it.",
|
||||||
name,
|
|
||||||
name,
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logger.info("Channel %s is disabled, skipping", name)
|
logger.info("A configured channel is disabled, skipping")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
await self._start_channel(name, channel_config)
|
await self._start_channel(name, channel_config)
|
||||||
|
|
||||||
self._running = True
|
self._running = True
|
||||||
logger.info("ChannelService started with channels: %s", list(self._channels.keys()))
|
logger.info("ChannelService started with %d channels", len(self._channels))
|
||||||
|
|
||||||
async def stop(self) -> None:
|
async def stop(self) -> None:
|
||||||
"""Stop all channels and the manager."""
|
"""Stop all channels and the manager."""
|
||||||
for name, channel in list(self._channels.items()):
|
for name, channel in list(self._channels.items()):
|
||||||
try:
|
try:
|
||||||
await channel.stop()
|
await channel.stop()
|
||||||
logger.info("Channel %s stopped", name)
|
logger.info("Channel stopped")
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error stopping channel %s", name)
|
logger.exception("Error stopping channel")
|
||||||
self._channels.clear()
|
self._channels.clear()
|
||||||
|
|
||||||
await self.manager.stop()
|
await self.manager.stop()
|
||||||
@@ -169,12 +167,12 @@ class ChannelService:
|
|||||||
try:
|
try:
|
||||||
await self._channels[name].stop()
|
await self._channels[name].stop()
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error stopping channel %s for restart", name)
|
logger.exception("Error stopping channel for restart")
|
||||||
del self._channels[name]
|
del self._channels[name]
|
||||||
|
|
||||||
config = self._config.get(name)
|
config = self._config.get(name)
|
||||||
if not config or not isinstance(config, dict):
|
if not config or not isinstance(config, dict):
|
||||||
logger.warning("No config for channel %s", name)
|
logger.warning("No config for requested channel")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
return await self._start_channel(name, config)
|
return await self._start_channel(name, config)
|
||||||
@@ -194,17 +192,17 @@ class ChannelService:
|
|||||||
return True
|
return True
|
||||||
try:
|
try:
|
||||||
await channel.stop()
|
await channel.stop()
|
||||||
logger.info("Channel %s stopped and removed", name)
|
logger.info("Channel stopped and removed")
|
||||||
return True
|
return True
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error stopping channel %s for removal", name)
|
logger.exception("Error stopping channel for removal")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def _start_channel(self, name: str, config: dict[str, Any]) -> bool:
|
async def _start_channel(self, name: str, config: dict[str, Any]) -> bool:
|
||||||
"""Instantiate and start a single channel."""
|
"""Instantiate and start a single channel."""
|
||||||
import_path = _CHANNEL_REGISTRY.get(name)
|
import_path = _CHANNEL_REGISTRY.get(name)
|
||||||
if not import_path:
|
if not import_path:
|
||||||
logger.warning("Unknown channel type: %s", name)
|
logger.warning("Unknown channel type")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -212,7 +210,7 @@ class ChannelService:
|
|||||||
|
|
||||||
channel_cls = resolve_class(import_path, base_class=None)
|
channel_cls = resolve_class(import_path, base_class=None)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to import channel class for %s", name)
|
logger.exception("Failed to import channel class")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -225,13 +223,13 @@ class ChannelService:
|
|||||||
await channel.start()
|
await channel.start()
|
||||||
if not channel.is_running:
|
if not channel.is_running:
|
||||||
self._channels.pop(name, None)
|
self._channels.pop(name, None)
|
||||||
logger.error("Channel %s did not enter a running state after start()", name)
|
logger.error("Channel did not enter a running state after start()")
|
||||||
return False
|
return False
|
||||||
logger.info("Channel %s started", name)
|
logger.info("Channel started")
|
||||||
return True
|
return True
|
||||||
except Exception:
|
except Exception:
|
||||||
self._channels.pop(name, None)
|
self._channels.pop(name, None)
|
||||||
logger.exception("Failed to start channel %s", name)
|
logger.exception("Failed to start channel")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_status(self) -> dict[str, Any]:
|
def get_status(self) -> dict[str, Any]:
|
||||||
|
|||||||
@@ -414,7 +414,7 @@ async def _restart_runtime_channel_if_available(provider: str, runtime_config: d
|
|||||||
try:
|
try:
|
||||||
from app.channels.service import get_channel_service
|
from app.channels.service import get_channel_service
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to import channel service while configuring %s", provider)
|
logger.exception("Failed to import channel service while configuring a runtime channel")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
service = get_channel_service()
|
service = get_channel_service()
|
||||||
@@ -427,7 +427,7 @@ async def _sync_runtime_channel_after_removal(provider: str, channels_config: di
|
|||||||
try:
|
try:
|
||||||
from app.channels.service import get_channel_service
|
from app.channels.service import get_channel_service
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Failed to import channel service while disconnecting %s", provider)
|
logger.exception("Failed to import channel service while disconnecting a runtime channel")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
service = get_channel_service()
|
service = get_channel_service()
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ def make_safe_user_id(raw: str) -> str:
|
|||||||
sanitized = _UNSAFE_USER_ID_CHAR_RE.sub("-", raw)
|
sanitized = _UNSAFE_USER_ID_CHAR_RE.sub("-", raw)
|
||||||
if sanitized == raw:
|
if sanitized == raw:
|
||||||
return raw
|
return raw
|
||||||
digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:_SAFE_USER_ID_DIGEST_HEX_LEN]
|
digest = hashlib.sha1(raw.encode("utf-8")).hexdigest()[:_SAFE_USER_ID_DIGEST_HEX_LEN]
|
||||||
return f"{sanitized}-{digest}"
|
return f"{sanitized}-{digest}"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -3532,7 +3532,8 @@ class TestChannelService:
|
|||||||
await service.stop()
|
await service.stop()
|
||||||
|
|
||||||
_run(go())
|
_run(go())
|
||||||
assert any("wecom" in r.message and r.levelno == logging.WARNING for r in caplog.records)
|
assert any("credentials configured but is disabled" in r.message and r.levelno == logging.WARNING for r in caplog.records)
|
||||||
|
assert all("wecom" not in r.message for r in caplog.records)
|
||||||
|
|
||||||
def test_disabled_channel_with_int_creds_emits_warning(self, caplog):
|
def test_disabled_channel_with_int_creds_emits_warning(self, caplog):
|
||||||
"""Warning is emitted even when YAML-parsed integer credentials are present."""
|
"""Warning is emitted even when YAML-parsed integer credentials are present."""
|
||||||
@@ -3552,7 +3553,8 @@ class TestChannelService:
|
|||||||
await service.stop()
|
await service.stop()
|
||||||
|
|
||||||
_run(go())
|
_run(go())
|
||||||
assert any("telegram" in r.message and r.levelno == logging.WARNING for r in caplog.records)
|
assert any("credentials configured but is disabled" in r.message and r.levelno == logging.WARNING for r in caplog.records)
|
||||||
|
assert all("telegram" not in r.message for r in caplog.records)
|
||||||
|
|
||||||
def test_disabled_channel_without_creds_emits_info(self, caplog):
|
def test_disabled_channel_without_creds_emits_info(self, caplog):
|
||||||
"""Only an info log (no warning) is emitted when a channel is disabled with no credentials."""
|
"""Only an info log (no warning) is emitted when a channel is disabled with no credentials."""
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ class TestMakeSafeUserId:
|
|||||||
# Sanitized prefix plus a stable digest of the original.
|
# Sanitized prefix plus a stable digest of the original.
|
||||||
assert result.startswith("user-example-com-")
|
assert result.startswith("user-example-com-")
|
||||||
assert len(result.rsplit("-", 1)[1]) == 16
|
assert len(result.rsplit("-", 1)[1]) == 16
|
||||||
|
assert result == "user-example-com-63a710569261a24b"
|
||||||
assert make_safe_user_id("user@example.com") == result
|
assert make_safe_user_id("user@example.com") == result
|
||||||
|
|
||||||
def test_sanitized_id_passes_validation(self, paths: Paths):
|
def test_sanitized_id_passes_validation(self, paths: Paths):
|
||||||
|
|||||||
+13
-95
@@ -29,6 +29,14 @@ set -e
|
|||||||
REPO_ROOT="$(builtin cd "$(dirname "${BASH_SOURCE[0]}")/.." >/dev/null 2>&1 && pwd -P)"
|
REPO_ROOT="$(builtin cd "$(dirname "${BASH_SOURCE[0]}")/.." >/dev/null 2>&1 && pwd -P)"
|
||||||
cd "$REPO_ROOT"
|
cd "$REPO_ROOT"
|
||||||
|
|
||||||
|
# ── Load .env ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
if [ -f "$REPO_ROOT/.env" ]; then
|
||||||
|
set -a
|
||||||
|
source "$REPO_ROOT/.env"
|
||||||
|
set +a
|
||||||
|
fi
|
||||||
|
|
||||||
_pick_python() {
|
_pick_python() {
|
||||||
local candidate
|
local candidate
|
||||||
for candidate in python3 python py; do
|
for candidate in python3 python py; do
|
||||||
@@ -40,61 +48,6 @@ _pick_python() {
|
|||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
_load_dotenv_file() {
|
|
||||||
local env_file=$1
|
|
||||||
local python_bin
|
|
||||||
|
|
||||||
[ -f "$env_file" ] || return 0
|
|
||||||
|
|
||||||
if ! python_bin="$(_pick_python)"; then
|
|
||||||
echo "Python is required to load $env_file safely."
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
eval "$("$python_bin" - "$env_file" <<'PY'
|
|
||||||
import re
|
|
||||||
import shlex
|
|
||||||
import sys
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
env_path = Path(sys.argv[1])
|
|
||||||
assign_re = re.compile(r"^(?:export\s+)?([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.*)$")
|
|
||||||
|
|
||||||
|
|
||||||
def strip_unquoted_comment(value: str) -> str:
|
|
||||||
for index, char in enumerate(value):
|
|
||||||
if char == "#" and (index == 0 or value[index - 1].isspace()):
|
|
||||||
return value[:index].rstrip()
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
for raw_line in env_path.read_text(encoding="utf-8").splitlines():
|
|
||||||
line = raw_line.strip()
|
|
||||||
if not line or line.startswith("#"):
|
|
||||||
continue
|
|
||||||
|
|
||||||
match = assign_re.match(line)
|
|
||||||
if not match:
|
|
||||||
continue
|
|
||||||
|
|
||||||
key, value = match.groups()
|
|
||||||
value = value.strip()
|
|
||||||
try:
|
|
||||||
parsed = shlex.split(value, comments=True, posix=True)
|
|
||||||
except ValueError:
|
|
||||||
value = strip_unquoted_comment(value)
|
|
||||||
else:
|
|
||||||
value = parsed[0] if parsed else ""
|
|
||||||
|
|
||||||
print(f"export {key}={shlex.quote(value)}")
|
|
||||||
PY
|
|
||||||
)"
|
|
||||||
}
|
|
||||||
|
|
||||||
# ── Load .env ────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
_load_dotenv_file "$REPO_ROOT/.env"
|
|
||||||
|
|
||||||
# ── Argument parsing ─────────────────────────────────────────────────────────
|
# ── Argument parsing ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
DEV_MODE=true
|
DEV_MODE=true
|
||||||
@@ -226,10 +179,7 @@ _is_port_listening() {
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
if command -v netstat >/dev/null 2>&1; then
|
if command -v netstat >/dev/null 2>&1; then
|
||||||
if netstat -ltn 2>/dev/null | awk -v port="$port" '
|
if netstat -ltn 2>/dev/null | awk '{print $4}' | grep -Eq "(^|[.:])${port}$"; then
|
||||||
toupper($NF) == "LISTEN" && $4 ~ "(^|[.:])" port "$" { found = 1 }
|
|
||||||
END { exit found ? 0 : 1 }
|
|
||||||
'; then
|
|
||||||
return 0
|
return 0
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
@@ -237,21 +187,6 @@ _is_port_listening() {
|
|||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
_wait_for_port_free() {
|
|
||||||
local port=$1
|
|
||||||
local timeout=${2:-10}
|
|
||||||
local elapsed=0
|
|
||||||
|
|
||||||
while _is_port_listening "$port"; do
|
|
||||||
if [ "$elapsed" -ge "$timeout" ]; then
|
|
||||||
echo " ⚠ Port $port is still in use after ${timeout}s"
|
|
||||||
return 1
|
|
||||||
fi
|
|
||||||
sleep 1
|
|
||||||
elapsed=$((elapsed + 1))
|
|
||||||
done
|
|
||||||
}
|
|
||||||
|
|
||||||
_is_repo_nginx_pid() {
|
_is_repo_nginx_pid() {
|
||||||
local pid=$1
|
local pid=$1
|
||||||
local command
|
local command
|
||||||
@@ -304,12 +239,9 @@ stop_all() {
|
|||||||
echo "Stopping all services..."
|
echo "Stopping all services..."
|
||||||
_report_reclaimed_ports
|
_report_reclaimed_ports
|
||||||
_kill_repo_processes "uvicorn app.gateway.app:app"
|
_kill_repo_processes "uvicorn app.gateway.app:app"
|
||||||
_kill_repo_processes "pnpm .*run dev"
|
|
||||||
_kill_repo_processes "next dev"
|
_kill_repo_processes "next dev"
|
||||||
_kill_repo_processes "next start"
|
_kill_repo_processes "next start"
|
||||||
_kill_repo_processes "next-server"
|
_kill_repo_processes "next-server"
|
||||||
_kill_repo_processes "next/dist"
|
|
||||||
_kill_repo_processes "turbopack"
|
|
||||||
nginx -c "$REPO_ROOT/docker/nginx/nginx.local.conf" -p "$REPO_ROOT" -s quit 2>/dev/null || true
|
nginx -c "$REPO_ROOT/docker/nginx/nginx.local.conf" -p "$REPO_ROOT" -s quit 2>/dev/null || true
|
||||||
sleep 1
|
sleep 1
|
||||||
_kill_repo_nginx
|
_kill_repo_nginx
|
||||||
@@ -320,9 +252,6 @@ stop_all() {
|
|||||||
_kill_repo_port 8001
|
_kill_repo_port 8001
|
||||||
_kill_repo_port 3000
|
_kill_repo_port 3000
|
||||||
_kill_repo_port 2026
|
_kill_repo_port 2026
|
||||||
_wait_for_port_free 8001 30 || true
|
|
||||||
_wait_for_port_free 3000 30 || true
|
|
||||||
_wait_for_port_free 2026 30 || true
|
|
||||||
./scripts/cleanup-containers.sh deer-flow-sandbox 2>/dev/null || true
|
./scripts/cleanup-containers.sh deer-flow-sandbox 2>/dev/null || true
|
||||||
echo "✓ All services stopped"
|
echo "✓ All services stopped"
|
||||||
}
|
}
|
||||||
@@ -485,7 +414,6 @@ trap 'cleanup 143' TERM
|
|||||||
# In daemon mode, wraps with nohup. Waits for port to be ready.
|
# In daemon mode, wraps with nohup. Waits for port to be ready.
|
||||||
run_service() {
|
run_service() {
|
||||||
local name="$1" cmd="$2" port="$3" timeout="$4"
|
local name="$1" cmd="$2" port="$3" timeout="$4"
|
||||||
local service_pid
|
|
||||||
|
|
||||||
if _is_port_listening "$port"; then
|
if _is_port_listening "$port"; then
|
||||||
echo "✗ $name cannot start because port $port is already in use."
|
echo "✗ $name cannot start because port $port is already in use."
|
||||||
@@ -495,14 +423,10 @@ run_service() {
|
|||||||
|
|
||||||
echo "Starting $name..."
|
echo "Starting $name..."
|
||||||
if $DAEMON_MODE; then
|
if $DAEMON_MODE; then
|
||||||
nohup sh -c "$cmd" < /dev/null > /dev/null 2>&1 &
|
nohup sh -c "$cmd" > /dev/null 2>&1 &
|
||||||
else
|
else
|
||||||
sh -c "$cmd" &
|
sh -c "$cmd" &
|
||||||
fi
|
fi
|
||||||
service_pid=$!
|
|
||||||
if $DAEMON_MODE; then
|
|
||||||
disown "$service_pid" 2>/dev/null || true
|
|
||||||
fi
|
|
||||||
|
|
||||||
./scripts/wait-for-port.sh "$port" "$timeout" "$name" || {
|
./scripts/wait-for-port.sh "$port" "$timeout" "$name" || {
|
||||||
local logfile="logs/$(echo "$name" | tr '[:upper:]' '[:lower:]' | tr ' ' '-').log"
|
local logfile="logs/$(echo "$name" | tr '[:upper:]' '[:lower:]' | tr ' ' '-').log"
|
||||||
@@ -510,12 +434,6 @@ run_service() {
|
|||||||
[ -f "$logfile" ] && tail -20 "$logfile"
|
[ -f "$logfile" ] && tail -20 "$logfile"
|
||||||
cleanup 1
|
cleanup 1
|
||||||
}
|
}
|
||||||
if ! kill -0 "$service_pid" 2>/dev/null; then
|
|
||||||
local logfile="logs/$(echo "$name" | tr '[:upper:]' '[:lower:]' | tr ' ' '-').log"
|
|
||||||
echo "✗ $name process exited after port $port became available."
|
|
||||||
[ -f "$logfile" ] && tail -20 "$logfile"
|
|
||||||
cleanup 1
|
|
||||||
fi
|
|
||||||
echo "✓ $name started on localhost:$port"
|
echo "✓ $name started on localhost:$port"
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -526,17 +444,17 @@ mkdir -p temp/client_body_temp temp/proxy_temp temp/fastcgi_temp temp/uwsgi_temp
|
|||||||
|
|
||||||
# 1. Gateway API
|
# 1. Gateway API
|
||||||
run_service "Gateway" \
|
run_service "Gateway" \
|
||||||
"cd backend && exec env PYTHONPATH=. uv run uvicorn app.gateway.app:app --host 0.0.0.0 --port 8001 $GATEWAY_EXTRA_FLAGS > ../logs/gateway.log 2>&1" \
|
"cd backend && PYTHONPATH=. uv run uvicorn app.gateway.app:app --host 0.0.0.0 --port 8001 $GATEWAY_EXTRA_FLAGS > ../logs/gateway.log 2>&1" \
|
||||||
8001 30
|
8001 30
|
||||||
|
|
||||||
# 2. Frontend
|
# 2. Frontend
|
||||||
run_service "Frontend" \
|
run_service "Frontend" \
|
||||||
"cd frontend && exec $FRONTEND_CMD > ../logs/frontend.log 2>&1" \
|
"cd frontend && $FRONTEND_CMD > ../logs/frontend.log 2>&1" \
|
||||||
3000 120
|
3000 120
|
||||||
|
|
||||||
# 3. Nginx
|
# 3. Nginx
|
||||||
run_service "Nginx" \
|
run_service "Nginx" \
|
||||||
"exec nginx -g 'daemon off;' -c '$REPO_ROOT/docker/nginx/nginx.local.conf' -p '$REPO_ROOT' > logs/nginx.log 2>&1" \
|
"nginx -g 'daemon off;' -c '$REPO_ROOT/docker/nginx/nginx.local.conf' -p '$REPO_ROOT' > logs/nginx.log 2>&1" \
|
||||||
2026 10
|
2026 10
|
||||||
|
|
||||||
# ── Ready ────────────────────────────────────────────────────────────────────
|
# ── Ready ────────────────────────────────────────────────────────────────────
|
||||||
|
|||||||
Reference in New Issue
Block a user