fix(channels): make runtime provider state authoritative (#3580)

* fix(channels): make runtime provider state authoritative

* make format

* fix(channels): close runtime provider config races and status gaps

Address review findings on the runtime-provider-state change:

- configure/disconnect now re-read the live app.state.channels_config
  after the worker await and mutate only the affected provider key in
  place, so a concurrent mutation for a different provider is no longer
  clobbered by a stale pre-await snapshot.
- disconnect revokes DB connection rows before committing the store and
  cache, so a repo failure cannot leave the store/cache "disconnected"
  while the DB keeps "connected" rows a later re-configure would
  silently reactivate.
- _provider_response preserves non-connected statuses (e.g. revoked)
  when the provider is unavailable, only masking a stale "connected"
  row as not_connected.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Nan Gao
2026-06-17 01:45:46 +02:00
committed by GitHub
parent 43dba448ad
commit 926406e0d6
5 changed files with 289 additions and 27 deletions
@@ -412,7 +412,16 @@ def _provider_response(
from app.gateway.auth_disabled import is_auth_disabled
status, unavailable_reason = _provider_status(config, channels_config, provider)
if connection:
if unavailable_reason is not None:
# The runtime provider is unavailable, so a stale "connected" row must
# not be reported as connected. Other statuses (e.g. "revoked") are
# preserved so consumers can still distinguish a revoked binding from a
# never-connected one.
if connection and connection["status"] != "connected":
connection_status = connection["status"]
else:
connection_status = "not_connected"
elif connection:
connection_status = connection["status"]
elif is_auth_disabled() and status["configured"] and unavailable_reason is None:
# Auth-disabled local mode routes every channel message to the default
@@ -561,7 +570,6 @@ async def disconnect_channel_provider_runtime(provider: str, request: Request) -
if not provider_config.enabled:
raise HTTPException(status_code=400, detail="Channel provider is not enabled")
owner_user_id = _get_user_id(request)
try:
repo = _get_repository(request, config)
except HTTPException as exc:
@@ -569,25 +577,33 @@ async def disconnect_channel_provider_runtime(provider: str, request: Request) -
raise
repo = None
if repo is not None:
for connection in await repo.list_connections(owner_user_id):
if connection["provider"] == provider and connection["status"] != "revoked":
await repo.disconnect_connection(
connection_id=connection["id"],
owner_user_id=owner_user_id,
)
current_channels_config = await _get_channels_config(request)
candidate_channels_config = dict(current_channels_config)
candidate_channels_config.pop(provider, None)
store = await _get_runtime_config_store(request)
await asyncio.to_thread(store.set_provider_disconnected, provider)
channels_config = await _load_channels_config(request, config)
request.app.state.channels_config = channels_config
stopped = await _sync_runtime_channel_after_removal(provider, channels_config)
stopped = await _sync_runtime_channel_after_removal(provider, candidate_channels_config)
if stopped is False:
display_name = _PROVIDER_META[provider]["display_name"]
raise HTTPException(status_code=400, detail=f"Failed to stop {display_name} channel. Try again.")
return _provider_response(config, channels_config, provider, _PROVIDER_META[provider])
# Revoke the DB connection rows before committing the store/cache so a repo
# failure cannot leave the store and cache saying "disconnected" while the
# DB still holds "connected" rows that a later re-configure would silently
# reactivate.
if repo is not None:
await repo.disconnect_provider_connections(provider=provider)
store = await _get_runtime_config_store(request)
await asyncio.to_thread(store.set_provider_disconnected, provider)
# Re-read the live cached config and drop only this provider so a concurrent
# mutation for a different provider is not clobbered. No await may occur
# between this read and the reassignment.
live_channels_config = await _get_channels_config(request)
live_channels_config.pop(provider, None)
request.app.state.channels_config = live_channels_config
return _provider_response(config, live_channels_config, provider, _PROVIDER_META[provider])
@router.post("/{provider}/connect", response_model=ChannelConnectResponse)
@@ -656,8 +672,8 @@ async def configure_channel_provider_runtime(
# cached by get_app_config().
runtime_config["bot_username"] = values["bot_username"]
channels_config[provider] = runtime_config
request.app.state.channels_config = channels_config
candidate_channels_config = dict(channels_config)
candidate_channels_config[provider] = runtime_config
started = await _restart_runtime_channel_if_available(provider, runtime_config)
if started is False:
@@ -667,4 +683,11 @@ async def configure_channel_provider_runtime(
store = await _get_runtime_config_store(request)
await asyncio.to_thread(store.set_provider_config, provider, runtime_config)
return _provider_response(config, channels_config, provider, _PROVIDER_META[provider])
# Re-read the live cached config and apply only this provider's change so a
# concurrent mutation for a different provider is not clobbered. No await
# may occur between this read and the reassignment.
live_channels_config = await _get_channels_config(request)
live_channels_config[provider] = runtime_config
request.app.state.channels_config = live_channels_config
return _provider_response(config, live_channels_config, provider, _PROVIDER_META[provider])
@@ -177,6 +177,24 @@ class ChannelConnectionRepository:
await session.commit()
return True
async def disconnect_provider_connections(self, *, provider: str) -> int:
"""Revoke all active user connections for an instance-wide provider removal."""
async with self.session_factory() as session:
result = await session.execute(
select(ChannelConnectionRow.id).where(
ChannelConnectionRow.provider == provider,
ChannelConnectionRow.status != "revoked",
)
)
connection_ids = [row_id for row_id in result.scalars()]
if not connection_ids:
return 0
await session.execute(update(ChannelConnectionRow).where(ChannelConnectionRow.id.in_(connection_ids)).values(status="revoked"))
await session.execute(delete(ChannelCredentialRow).where(ChannelCredentialRow.connection_id.in_(connection_ids)))
await session.commit()
return len(connection_ids)
async def store_credentials(
self,
connection_id: str,
@@ -294,6 +294,41 @@ def test_get_providers_reports_configured_channel_not_running(tmp_path, monkeypa
anyio.run(repo.close)
def test_get_providers_provider_unavailable_overrides_stale_connected_row(tmp_path):
import anyio
repo = anyio.run(_make_repo, tmp_path)
async def seed_connection():
await repo.upsert_connection(
owner_user_id=str(_user().id),
provider="slack",
external_account_id="U123",
status="connected",
)
anyio.run(seed_connection)
config = ChannelConnectionsConfig.model_validate(
{
"enabled": True,
"slack": {"enabled": True},
}
)
app = _make_app(config, repo, {})
with TestClient(app) as client:
response = client.get("/api/channels/providers")
assert response.status_code == 200
by_provider = {item["provider"]: item for item in response.json()["providers"]}
assert by_provider["slack"]["configured"] is False
assert by_provider["slack"]["connectable"] is False
assert by_provider["slack"]["connection_status"] == "not_connected"
assert "Slack credentials" in by_provider["slack"]["unavailable_reason"]
anyio.run(repo.close)
def test_get_providers_restarts_configured_channel_when_service_can_reconcile(tmp_path, monkeypatch):
import anyio
@@ -614,6 +649,54 @@ def test_runtime_config_endpoints_require_admin(tmp_path):
anyio.run(repo.close)
def test_configure_provider_runtime_rolls_back_visible_state_when_start_fails(tmp_path, monkeypatch):
import anyio
repo = anyio.run(_make_repo, tmp_path)
config = ChannelConnectionsConfig.model_validate(
{
"enabled": True,
"slack": {"enabled": True},
}
)
existing_runtime_config = {
"enabled": True,
"bot_token": "xoxb-old",
"app_token": "xapp-old",
}
runtime_config_store = ChannelRuntimeConfigStore(tmp_path / "channels" / "runtime-config.json")
runtime_config_store.set_provider_config("slack", existing_runtime_config)
service = SimpleNamespace(configure_channel=AsyncMock(return_value=False))
monkeypatch.setattr("app.channels.service.get_channel_service", lambda: service)
app = _make_app(
config,
repo,
{"slack": dict(existing_runtime_config)},
runtime_config_store=runtime_config_store,
)
with TestClient(app) as client:
response = client.post(
"/api/channels/slack/runtime-config",
json={"values": {"bot_token": "xoxb-new", "app_token": "xapp-new"}},
)
assert response.status_code == 400
assert "Failed to start Slack channel" in response.json()["detail"]
service.configure_channel.assert_awaited_once_with(
"slack",
{
"enabled": True,
"bot_token": "xoxb-new",
"app_token": "xapp-new",
},
)
assert app.state.channels_config["slack"] == existing_runtime_config
assert runtime_config_store.get_provider_config("slack") == existing_runtime_config
anyio.run(repo.close)
def test_configure_telegram_runtime_uses_new_bot_username_for_deep_link_without_mutating_config(tmp_path):
import anyio
@@ -862,7 +945,7 @@ def test_disconnect_provider_runtime_config_suppresses_file_config_and_stops_cha
anyio.run(repo.close)
def test_disconnect_provider_runtime_config_revokes_current_user_provider_connections(tmp_path):
def test_disconnect_provider_runtime_config_revokes_all_provider_connections(tmp_path):
import anyio
repo = anyio.run(_make_repo, tmp_path)
@@ -874,6 +957,18 @@ def test_disconnect_provider_runtime_config_revokes_current_user_provider_connec
external_account_id="U123",
status="connected",
)
await repo.upsert_connection(
owner_user_id="other-user",
provider="slack",
external_account_id="U456",
status="connected",
)
await repo.upsert_connection(
owner_user_id="other-user",
provider="telegram",
external_account_id="42",
status="connected",
)
anyio.run(seed_connection)
config = ChannelConnectionsConfig.model_validate(
@@ -895,10 +990,132 @@ def test_disconnect_provider_runtime_config_revokes_current_user_provider_connec
assert configure_response.status_code == 200
assert disconnect_response.status_code == 200
async def get_connection_status():
return (await repo.list_connections(str(_user().id)))[0]["status"]
async def get_connection_statuses():
return {
"admin_slack": (await repo.list_connections(str(_user().id)))[0]["status"],
"other": {item["provider"]: item["status"] for item in await repo.list_connections("other-user")},
}
assert anyio.run(get_connection_status) == "revoked"
statuses = anyio.run(get_connection_statuses)
assert statuses["admin_slack"] == "revoked"
assert statuses["other"]["slack"] == "revoked"
assert statuses["other"]["telegram"] == "connected"
anyio.run(repo.close)
def test_get_providers_preserves_revoked_status_when_provider_unavailable(tmp_path):
import anyio
repo = anyio.run(_make_repo, tmp_path)
async def seed_connection():
await repo.upsert_connection(
owner_user_id=str(_user().id),
provider="slack",
external_account_id="U123",
status="revoked",
)
anyio.run(seed_connection)
config = ChannelConnectionsConfig.model_validate(
{
"enabled": True,
"slack": {"enabled": True},
}
)
# No runtime channels_config -> the slack provider is unavailable.
app = _make_app(config, repo, {})
with TestClient(app) as client:
response = client.get("/api/channels/providers")
assert response.status_code == 200
by_provider = {item["provider"]: item for item in response.json()["providers"]}
assert by_provider["slack"]["connectable"] is False
assert by_provider["slack"]["unavailable_reason"] is not None
# A revoked binding must stay distinguishable from a never-connected one,
# even when the runtime provider is currently unavailable.
assert by_provider["slack"]["connection_status"] == "revoked"
anyio.run(repo.close)
def test_configure_provider_runtime_does_not_clobber_concurrent_config_update(tmp_path, monkeypatch):
import anyio
repo = anyio.run(_make_repo, tmp_path)
config = ChannelConnectionsConfig.model_validate(
{
"enabled": True,
"slack": {"enabled": True},
"telegram": {"enabled": True, "bot_username": "deerflow_bot"},
}
)
runtime_config_store = ChannelRuntimeConfigStore(tmp_path / "channels" / "runtime-config.json")
app = _make_app(config, repo, {}, runtime_config_store=runtime_config_store)
async def configure_channel(provider, runtime_config):
# Simulate a concurrent admin request for a *different* provider whose
# write to app.state lands while this request awaits the worker restart.
app.state.channels_config = {
**app.state.channels_config,
"telegram": {"enabled": True, "bot_token": "tg-token"},
}
return True
service = SimpleNamespace(configure_channel=configure_channel)
monkeypatch.setattr("app.channels.service.get_channel_service", lambda: service)
with TestClient(app) as client:
response = client.post(
"/api/channels/slack/runtime-config",
json={"values": {"bot_token": "xoxb-ui", "app_token": "xapp-ui"}},
)
assert response.status_code == 200
# The concurrent telegram write must survive alongside the slack write.
assert app.state.channels_config["slack"]["bot_token"] == "xoxb-ui"
assert app.state.channels_config["telegram"]["bot_token"] == "tg-token"
anyio.run(repo.close)
def test_disconnect_provider_runtime_keeps_state_consistent_when_revoke_fails(tmp_path):
import anyio
repo = anyio.run(_make_repo, tmp_path)
config = ChannelConnectionsConfig.model_validate(
{
"enabled": True,
"slack": {"enabled": True},
}
)
runtime_config_store = ChannelRuntimeConfigStore(tmp_path / "channels" / "runtime-config.json")
app = _make_app(config, repo, {}, runtime_config_store=runtime_config_store)
with TestClient(app) as client:
configure_response = client.post(
"/api/channels/slack/runtime-config",
json={"values": {"bot_token": "xoxb-ui", "app_token": "xapp-ui"}},
)
assert configure_response.status_code == 200
repo.disconnect_provider_connections = AsyncMock(side_effect=RuntimeError("db down"))
with TestClient(app, raise_server_exceptions=False) as client:
disconnect_response = client.delete("/api/channels/slack/runtime-config")
assert disconnect_response.status_code == 500
# When the DB revoke fails, the store/cache must not be left diverged from
# the DB: the provider stays configured so a later re-configure cannot
# silently reactivate un-revoked connection rows.
assert app.state.channels_config["slack"]["bot_token"] == "xoxb-ui"
assert runtime_config_store.get_provider_config("slack") == {
"enabled": True,
"bot_token": "xoxb-ui",
"app_token": "xapp-ui",
}
anyio.run(repo.close)
@@ -117,7 +117,9 @@ export function WorkspaceChannelsList() {
<SidebarMenu>
{visibleProviders.map((provider) => {
const canEditRuntimeConfig = providerCanEditRuntimeConfig(provider);
const isConnected = provider.connection_status === "connected";
const isConnected =
!provider.unavailable_reason &&
provider.connection_status === "connected";
const isPending =
(connectMutation.isPending &&
connectMutation.variables === provider.provider) ||
@@ -117,9 +117,11 @@ function ChannelProviderItem({
const configureMutation = useConfigureChannelProvider();
const disconnectProviderMutation = useDisconnectChannelProvider();
const [setupOpen, setSetupOpen] = useState(false);
const runtimeAvailable = provider.configured && !provider.unavailable_reason;
const isConnected =
connection?.status === "connected" ||
provider.connection_status === "connected";
runtimeAvailable &&
(connection?.status === "connected" ||
provider.connection_status === "connected");
const canEditRuntimeConfig = providerCanEditRuntimeConfig(provider);
const canConnect =
(provider.connectable ?? (provider.enabled && provider.configured)) &&
@@ -186,7 +188,7 @@ function ChannelProviderItem({
</ItemTitle>
<ItemDescription className="line-clamp-none">
{getProviderDescription(provider, t.channels.descriptions)}
{connectionLabel
{isConnected && connectionLabel
? ` ${t.channels.connectedAs(connectionLabel)}`
: ""}
{!isConnected && provider.unavailable_reason