Files
deer-flow/backend/packages/harness/deerflow/runtime/runs/store/base.py
T
AochenShen99 66d6a6a4e8 fix: harden run finalization persistence (#3155)
* fix: harden run finalization persistence

* style: format gateway recovery test

* fix: align run repository return types

* fix: harden completion recovery follow-up
2026-05-23 00:09:06 +08:00

143 lines
4.0 KiB
Python

"""Abstract interface for run metadata storage.
RunManager depends on this interface. Implementations:
- MemoryRunStore: in-memory dict (development, tests)
- Future: RunRepository backed by SQLAlchemy ORM
Methods that take a ``user_id`` keyword (``put``, ``get``, ``list_by_thread``) use it for user isolation.
When user_id is None, no user filtering is applied (single-user mode).
"""
from __future__ import annotations
import abc
from typing import Any
class RunStore(abc.ABC):
@abc.abstractmethod
async def put(
self,
run_id: str,
*,
thread_id: str,
assistant_id: str | None = None,
user_id: str | None = None,
model_name: str | None = None,
status: str = "pending",
multitask_strategy: str = "reject",
metadata: dict[str, Any] | None = None,
kwargs: dict[str, Any] | None = None,
error: str | None = None,
created_at: str | None = None,
) -> None:
pass
@abc.abstractmethod
async def get(
self,
run_id: str,
*,
user_id: str | None = None,
) -> dict[str, Any] | None:
pass
@abc.abstractmethod
async def list_by_thread(
self,
thread_id: str,
*,
user_id: str | None = None,
limit: int = 100,
) -> list[dict[str, Any]]:
pass
@abc.abstractmethod
async def update_status(
self,
run_id: str,
status: str,
*,
error: str | None = None,
) -> bool | None:
"""Update a run status.
Returns ``False`` when the store can prove no row was updated. Older or
lightweight stores may return ``None`` when they cannot report rowcount.
"""
pass
@abc.abstractmethod
async def delete(self, run_id: str) -> None:
pass
@abc.abstractmethod
async def update_model_name(
self,
run_id: str,
model_name: str | None,
) -> None:
"""Update the model_name field for an existing run."""
pass
@abc.abstractmethod
async def update_run_completion(
self,
run_id: str,
*,
status: str,
total_input_tokens: int = 0,
total_output_tokens: int = 0,
total_tokens: int = 0,
llm_call_count: int = 0,
lead_agent_tokens: int = 0,
subagent_tokens: int = 0,
middleware_tokens: int = 0,
message_count: int = 0,
last_ai_message: str | None = None,
first_human_message: str | None = None,
error: str | None = None,
) -> bool | None:
"""Persist final completion fields.
Returns ``False`` when the store can prove no row was updated.
"""
pass
async def update_run_progress(
self,
run_id: str,
*,
total_input_tokens: int | None = None,
total_output_tokens: int | None = None,
total_tokens: int | None = None,
llm_call_count: int | None = None,
lead_agent_tokens: int | None = None,
subagent_tokens: int | None = None,
middleware_tokens: int | None = None,
message_count: int | None = None,
last_ai_message: str | None = None,
first_human_message: str | None = None,
) -> None:
"""Persist a best-effort running snapshot without changing run status."""
return None
@abc.abstractmethod
async def list_pending(self, *, before: str | None = None) -> list[dict[str, Any]]:
pass
@abc.abstractmethod
async def list_inflight(self, *, before: str | None = None) -> list[dict[str, Any]]:
"""Return persisted runs that are still ``pending`` or ``running``."""
pass
@abc.abstractmethod
async def aggregate_tokens_by_thread(self, thread_id: str, *, include_active: bool = False) -> dict[str, Any]:
"""Aggregate token usage for completed runs in a thread.
Returns a dict with keys: total_tokens, total_input_tokens,
total_output_tokens, total_runs, by_model (model_name → {tokens, runs}),
by_caller ({lead_agent, subagent, middleware}).
"""
pass