diff --git a/backend/docs/Storage_Design.md b/backend/docs/Storage_Design.md new file mode 100644 index 000000000..b56ea4f80 --- /dev/null +++ b/backend/docs/Storage_Design.md @@ -0,0 +1,401 @@ +# Storage Package Design + +## Background + +DeerFlow currently has several persistence responsibilities spread across app, gateway, runtime, and legacy persistence modules. This makes the persistence boundary difficult to reason about and creates several migration risks: + +- Routers and runtime services can accidentally depend on concrete persistence implementations instead of stable contracts. +- User/auth, run metadata, thread metadata, feedback, run events, and checkpointer setup are initialized through different paths. +- Some persistence behavior is duplicated between memory, SQLite, and PostgreSQL-oriented code paths. +- Incremental migration is hard because app-level code and storage-level code are coupled. +- Adding or validating another SQL backend requires touching app/runtime code instead of a storage-owned package. + +The storage package is introduced to make application data persistence a package-level capability with explicit contracts, a clear boundary, and SQL backend compatibility. + +## Goals + +- Provide a standalone `packages/storage` package for durable application data. +- Support SQLite, PostgreSQL, and MySQL through a shared persistence construction flow. +- Keep LangGraph checkpointer initialization compatible with the same database backend. +- Expose repository contracts as the only package-level data access boundary. +- Let the app layer depend on app-owned adapters under `app.infra.storage`, not on storage DB implementation classes. +- Allow the app/gateway migration to happen in small steps without forcing a large rewrite. + +## Non-Goals + +- This design does not remove legacy persistence in the first PR. +- This design does not move routers directly onto storage package models. +- This design does not make app routers own SQLAlchemy sessions. +- Cron persistence is intentionally out of scope for the storage package foundation. +- Memory backend is not part of the durable storage package. Memory compatibility, if still needed by app runtime, belongs outside `packages/storage`. + +## Storage Design Principles + +### Package-Owned Durable Storage + +`packages/storage` owns durable application data persistence. It defines: + +- configuration shape for storage-backed persistence +- SQLAlchemy models +- repository contracts and DTOs +- SQL repository implementations +- persistence factory functions +- compatibility helpers for config-driven initialization + +The package should be usable without importing `app.gateway`, routers, auth providers, or runtime-specific gateway objects. + +### SQL Backend Compatibility + +The package supports three SQL backends: + +- SQLite for local/single-node deployments +- PostgreSQL for production multi-node deployments +- MySQL for deployments that standardize on MySQL + +Backend-specific differences are handled inside the storage package: + +- SQLAlchemy async engine URL construction +- LangGraph checkpointer connection-string compatibility +- JSON metadata filtering across SQLite/PostgreSQL/MySQL +- SQL dialect behavior around locking, aggregation, and JSON type semantics + +### Unified Persistence Bundle + +Storage initialization returns an `AppPersistence` bundle: + +```python +@dataclass(slots=True) +class AppPersistence: + checkpointer: Checkpointer + engine: AsyncEngine + session_factory: async_sessionmaker[AsyncSession] + setup: Callable[[], Awaitable[None]] + aclose: Callable[[], Awaitable[None]] +``` + +The app runtime can initialize persistence once, call `setup()`, and then inject: + +- `checkpointer` +- `session_factory` +- repository adapters + +This keeps checkpointer and application data aligned to the same backend without requiring routers to understand database configuration. + +## Package Layout + +```text +backend/packages/storage/ + store/ + config/ + storage_config.py + app_config.py + persistence/ + factory.py + types.py + base_model.py + json_compat.py + drivers/ + sqlite.py + postgres.py + mysql.py + repositories/ + contracts/ + user.py + run.py + thread_meta.py + feedback.py + run_event.py + models/ + user.py + run.py + thread_meta.py + feedback.py + run_event.py + db/ + user.py + run.py + thread_meta.py + feedback.py + run_event.py + factory.py +``` + +## Persistence Construction + +The primary storage entrypoint is: + +```python +from store.persistence import create_persistence_from_storage_config + +persistence = await create_persistence_from_storage_config(storage_config) +await persistence.setup() +``` + +For app-level compatibility with existing database config shape: + +```python +from store.persistence import create_persistence_from_database_config + +persistence = await create_persistence_from_database_config(config.database) +await persistence.setup() +``` + +Expected app startup flow: + +```python +persistence = await create_persistence_from_database_config(config.database) +await persistence.setup() + +app.state.persistence = persistence +app.state.checkpointer = persistence.checkpointer +app.state.session_factory = persistence.session_factory +``` + +Expected app shutdown flow: + +```python +await app.state.persistence.aclose() +``` + +## Repository Contract Design + +Repository contracts are the storage package's public data access boundary. They live under `store.repositories.contracts` and are re-exported from `store.repositories`. + +The key contract groups are: + +- `UserRepositoryProtocol` +- `RunRepositoryProtocol` +- `ThreadMetaRepositoryProtocol` +- `FeedbackRepositoryProtocol` +- `RunEventRepositoryProtocol` + +Each contract owns: + +- input DTOs, such as `UserCreate`, `RunCreate`, `ThreadMetaCreate` +- output DTOs, such as `User`, `Run`, `ThreadMeta` +- repository protocol methods +- domain-specific exceptions when needed, such as `InvalidMetadataFilterError` + +Repository construction is session-based: + +```python +from store.repositories import build_run_repository + +async with persistence.session_factory() as session: + repo = build_run_repository(session) + run = await repo.get_run(run_id) +``` + +This keeps transaction ownership explicit. The storage package does not hide commits or session lifecycle inside global singletons. + +## App/Infra Calling Contract + +The app layer should not call `store.repositories.db.*` directly. The intended app boundary is `app.infra.storage`. + +`app.infra.storage` is responsible for: + +- receiving `session_factory` from FastAPI runtime initialization +- owning session lifecycle for app-facing repository methods +- translating storage DTOs to app/gateway DTOs only when needed +- preserving the existing app-facing names during migration +- depending on storage repository protocols, not concrete DB classes + +Expected adapter pattern: + +```python +class StorageRunRepository(RunRepositoryProtocol): + def __init__(self, session_factory): + self._session_factory = session_factory + + async def get_run(self, run_id: str): + async with self._session_factory() as session: + repo = build_run_repository(session) + return await repo.get_run(run_id) +``` + +For gateway compatibility, app state can keep existing names while the implementation changes: + +```python +app.state.run_store = StorageRunStore(run_repository) +app.state.feedback_repo = StorageFeedbackStore(feedback_repository) +app.state.thread_store = StorageThreadMetaStore(thread_meta_repository) +app.state.run_event_store = StorageRunEventStore(run_event_repository) +app.state.checkpointer = persistence.checkpointer +app.state.session_factory = persistence.session_factory +``` + +The app-facing objects may expose legacy method names during migration, but their internal data access should go through storage contracts. + +## Boundary Rules + +### Allowed Calls + +Storage package callers may use: + +```python +from store.persistence import create_persistence_from_database_config +from store.persistence import create_persistence_from_storage_config +from store.repositories import build_run_repository +from store.repositories import build_user_repository +from store.repositories import build_thread_meta_repository +from store.repositories import build_feedback_repository +from store.repositories import build_run_event_repository +from store.repositories import RunRepositoryProtocol +from store.repositories import UserRepositoryProtocol +``` + +App layer callers should use: + +```python +from app.infra.storage import StorageRunRepository +from app.infra.storage import StorageUserDataRepository +from app.infra.storage import StorageThreadMetaRepository +from app.infra.storage import StorageFeedbackRepository +from app.infra.storage import StorageRunEventRepository +``` + +### Prohibited Calls + +App/gateway/router/auth code must not import: + +```python +from store.repositories.db import DbRunRepository +from store.repositories.models import Run +from store.persistence.base_model import MappedBase +``` + +Routers must not: + +- create SQLAlchemy engines +- create SQLAlchemy sessions directly +- call storage DB repository classes directly +- commit/rollback storage transactions directly unless explicitly scoped by an infra adapter +- depend on storage SQLAlchemy model classes + +Storage package code must not import: + +```python +import app.gateway +import app.infra +import deerflow.runtime +``` + +The dependency direction is: + +```text +app/gateway -> app.infra.storage -> packages/storage contracts/factories -> packages/storage db implementations +``` + +The reverse direction is forbidden. + +## Checkpointer Compatibility + +The storage persistence bundle initializes the LangGraph checkpointer alongside application data persistence. + +Backend-specific notes: + +- SQLite uses `langgraph-checkpoint-sqlite`. +- PostgreSQL uses `langgraph-checkpoint-postgres` and requires a string `postgresql://...` connection URL. +- MySQL uses `langgraph-checkpoint-mysql` and requires a string MySQL connection URL. + +SQLAlchemy may use async driver URLs such as `postgresql+asyncpg://...` or `mysql+aiomysql://...`, but LangGraph checkpointer constructors expect plain string connection URLs. This conversion belongs inside the storage driver implementation. + +## JSON Metadata Filtering + +Thread metadata search supports dialect-aware JSON filtering through `store.persistence.json_compat`. + +The matcher supports: + +- `None` +- `bool` +- `int` +- `float` +- `str` + +It rejects: + +- unsafe keys +- nested JSON path expressions +- dict/list values +- integers outside signed 64-bit range + +This prevents SQL/JSON path injection, avoids compiled-cache type drift, and preserves type semantics such as `True != 1` and explicit JSON `null` not matching a missing key. + +## Step-by-Step Implementation Plan + +### Step 1: Introduce Storage Package Foundation + +- Add `backend/packages/storage`. +- Add storage config models. +- Add `AppPersistence`. +- Add SQLite/PostgreSQL/MySQL persistence drivers. +- Add repository contracts, models, DB implementations, and factory helpers. +- Add package dependency wiring. +- Exclude cron persistence. + +### Step 2: Harden Storage Backend Compatibility + +- Validate SQLite setup and repository behavior. +- Validate PostgreSQL and MySQL with local E2E tests. +- Fix checkpointer connection-string compatibility. +- Fix PostgreSQL locking and aggregation differences. +- Add dialect-aware JSON metadata filtering. + +### Step 3: Add App Infra Adapters + +- Add `backend/app/infra/storage`. +- Implement app-facing repositories that own session lifecycle. +- Keep storage contracts as the only data access boundary. +- Add legacy compatibility adapters for existing app/gateway method shapes. +- Keep app/gateway imports out of `packages/storage`. + +### Step 4: Switch FastAPI Runtime Injection + +- Initialize storage persistence in FastAPI startup/lifespan. +- Attach `persistence`, `checkpointer`, and `session_factory` to `app.state`. +- Preserve existing external state names: + - `run_store` + - `feedback_repo` + - `thread_store` + - `run_event_store` + - `checkpointer` + - `session_factory` +- Start with user/auth provider construction, then migrate run/thread/feedback/run_event. + +### Step 5: Router and Auth Compatibility + +- Ensure routers consume app-facing adapters, not storage DB classes. +- Ensure auth providers depend on user repository contracts. +- Keep router response shapes unchanged. +- Add focused auth/admin/router regression tests. + +### Step 6: Cleanup Legacy Persistence + +- Compare old persistence usage after app/gateway migration. +- Remove unused old repository implementations only after all call sites move. +- Keep compatibility shims only where needed for a transition window. +- Delete memory backend paths from storage-owned durable persistence. + +## Testing Strategy + +Unit tests should cover: + +- config parsing +- persistence setup +- table creation +- repository CRUD/query behavior +- typed JSON metadata filtering +- dialect SQL compilation +- cron exclusion + +E2E tests should cover: + +- SQLite persistence setup +- PostgreSQL temporary database setup +- MySQL temporary database setup +- repository contract behavior across all supported SQL backends +- JSON/Unicode round trip +- rollback behavior +- persistence close/cleanup + +E2E tests may remain local-only if CI does not provide PostgreSQL/MySQL services. diff --git a/backend/docs/Storage_Design_ZH.md b/backend/docs/Storage_Design_ZH.md new file mode 100644 index 000000000..04096612b --- /dev/null +++ b/backend/docs/Storage_Design_ZH.md @@ -0,0 +1,401 @@ +# Storage Package 设计文档 + +## 背景 + +DeerFlow 当前有多类持久化职责分散在 app、gateway、runtime 和旧 persistence 模块中。这会带来几个问题: + +- routers 和 runtime services 容易依赖具体 persistence 实现,而不是稳定契约。 +- user/auth、run metadata、thread metadata、feedback、run events、checkpointer setup 的初始化路径不统一。 +- memory、SQLite、PostgreSQL 相关路径中存在部分重复逻辑。 +- app 层代码和 storage 层代码耦合,导致增量迁移困难。 +- 增加或验证新的 SQL backend 时,需要改动 app/runtime,而不是只改 storage package。 + +引入 storage package 的目标,是把应用数据持久化抽象成 package 级能力,并提供明确契约、清晰边界和 SQL backend 兼容性。 + +## 目标 + +- 新增独立的 `packages/storage`,负责 durable application data。 +- 通过统一 persistence 构造流程支持 SQLite、PostgreSQL、MySQL。 +- 保持 LangGraph checkpointer 与同一个数据库 backend 兼容。 +- 将 repository contracts 作为 package 对外唯一数据访问边界。 +- app 层通过 `app.infra.storage` 适配 storage,而不是直接依赖 storage DB 实现类。 +- 支持 app/gateway 后续小步迁移,避免一次性大重构。 + +## 非目标 + +- 第一阶段不删除旧 persistence。 +- 不让 routers 直接依赖 storage package models。 +- 不让 app routers 管理 SQLAlchemy sessions。 +- cron persistence 不属于 storage package 基础迁移范围。 +- memory backend 不属于 durable storage package。若 app runtime 仍需要 memory 兼容,应放在 `packages/storage` 之外。 + +## Storage 设计理念 + +### Package 自己负责 Durable Storage + +`packages/storage` 负责应用数据的 durable persistence,包括: + +- storage 持久化配置 +- SQLAlchemy models +- repository contracts 和 DTOs +- SQL repository 实现 +- persistence factory functions +- 面向现有 config 的兼容初始化入口 + +该 package 不应该 import `app.gateway`、routers、auth providers 或 runtime 中的 gateway 对象。 + +### SQL Backend 兼容 + +该 package 支持三种 SQL backend: + +- SQLite:本地或单节点部署 +- PostgreSQL:生产多节点部署 +- MySQL:使用 MySQL 作为标准数据库的部署 + +backend 差异在 storage package 内部处理: + +- SQLAlchemy async engine URL 构造 +- LangGraph checkpointer 连接串兼容 +- SQLite/PostgreSQL/MySQL 的 JSON metadata filter +- 不同 SQL 方言在 locking、aggregation、JSON 类型语义上的差异 + +### 统一 Persistence Bundle + +Storage 初始化返回 `AppPersistence` bundle: + +```python +@dataclass(slots=True) +class AppPersistence: + checkpointer: Checkpointer + engine: AsyncEngine + session_factory: async_sessionmaker[AsyncSession] + setup: Callable[[], Awaitable[None]] + aclose: Callable[[], Awaitable[None]] +``` + +app runtime 只需要初始化一次 persistence,调用 `setup()`,然后注入: + +- `checkpointer` +- `session_factory` +- repository adapters + +这样 checkpointer 和应用数据可以对齐到同一个 backend,同时 routers 不需要理解数据库配置。 + +## Package 结构 + +```text +backend/packages/storage/ + store/ + config/ + storage_config.py + app_config.py + persistence/ + factory.py + types.py + base_model.py + json_compat.py + drivers/ + sqlite.py + postgres.py + mysql.py + repositories/ + contracts/ + user.py + run.py + thread_meta.py + feedback.py + run_event.py + models/ + user.py + run.py + thread_meta.py + feedback.py + run_event.py + db/ + user.py + run.py + thread_meta.py + feedback.py + run_event.py + factory.py +``` + +## Persistence 构造 + +storage 的主要入口: + +```python +from store.persistence import create_persistence_from_storage_config + +persistence = await create_persistence_from_storage_config(storage_config) +await persistence.setup() +``` + +为了兼容现有 app database config,也提供: + +```python +from store.persistence import create_persistence_from_database_config + +persistence = await create_persistence_from_database_config(config.database) +await persistence.setup() +``` + +预期 app startup 流程: + +```python +persistence = await create_persistence_from_database_config(config.database) +await persistence.setup() + +app.state.persistence = persistence +app.state.checkpointer = persistence.checkpointer +app.state.session_factory = persistence.session_factory +``` + +预期 app shutdown 流程: + +```python +await app.state.persistence.aclose() +``` + +## Repository 契约设计 + +Repository contracts 是 storage package 对外公开的数据访问边界。它们位于 `store.repositories.contracts`,并通过 `store.repositories` re-export。 + +主要契约包括: + +- `UserRepositoryProtocol` +- `RunRepositoryProtocol` +- `ThreadMetaRepositoryProtocol` +- `FeedbackRepositoryProtocol` +- `RunEventRepositoryProtocol` + +每组契约包含: + +- 输入 DTO,例如 `UserCreate`、`RunCreate`、`ThreadMetaCreate` +- 输出 DTO,例如 `User`、`Run`、`ThreadMeta` +- repository protocol methods +- 必要的领域异常,例如 `InvalidMetadataFilterError` + +Repository 通过 session 构造: + +```python +from store.repositories import build_run_repository + +async with persistence.session_factory() as session: + repo = build_run_repository(session) + run = await repo.get_run(run_id) +``` + +这样可以让 transaction ownership 保持明确。storage package 不通过全局 singleton 隐式隐藏 commit 或 session 生命周期。 + +## App/Infra 调用契约 + +app 层不应该直接调用 `store.repositories.db.*`。预期的 app 边界是 `app.infra.storage`。 + +`app.infra.storage` 负责: + +- 从 FastAPI runtime 初始化中接收 `session_factory` +- 为 app-facing repository methods 管理 session 生命周期 +- 在必要时将 storage DTOs 转成 app/gateway DTOs +- 迁移期间保留现有 app-facing 名称 +- 依赖 storage repository protocols,而不是具体 DB classes + +预期 adapter 模式: + +```python +class StorageRunRepository(RunRepositoryProtocol): + def __init__(self, session_factory): + self._session_factory = session_factory + + async def get_run(self, run_id: str): + async with self._session_factory() as session: + repo = build_run_repository(session) + return await repo.get_run(run_id) +``` + +为了兼容 gateway,app state 可以暂时保持现有名字,只替换内部实现: + +```python +app.state.run_store = StorageRunStore(run_repository) +app.state.feedback_repo = StorageFeedbackStore(feedback_repository) +app.state.thread_store = StorageThreadMetaStore(thread_meta_repository) +app.state.run_event_store = StorageRunEventStore(run_event_repository) +app.state.checkpointer = persistence.checkpointer +app.state.session_factory = persistence.session_factory +``` + +app-facing objects 可以在迁移期间保留旧方法名,但内部数据访问必须经过 storage contracts。 + +## 边界规则 + +### 允许调用的范围 + +storage package 调用方可以使用: + +```python +from store.persistence import create_persistence_from_database_config +from store.persistence import create_persistence_from_storage_config +from store.repositories import build_run_repository +from store.repositories import build_user_repository +from store.repositories import build_thread_meta_repository +from store.repositories import build_feedback_repository +from store.repositories import build_run_event_repository +from store.repositories import RunRepositoryProtocol +from store.repositories import UserRepositoryProtocol +``` + +app 层应该使用: + +```python +from app.infra.storage import StorageRunRepository +from app.infra.storage import StorageUserDataRepository +from app.infra.storage import StorageThreadMetaRepository +from app.infra.storage import StorageFeedbackRepository +from app.infra.storage import StorageRunEventRepository +``` + +### 禁止调用的范围 + +app/gateway/router/auth 代码不应该 import: + +```python +from store.repositories.db import DbRunRepository +from store.repositories.models import Run +from store.persistence.base_model import MappedBase +``` + +routers 禁止: + +- 创建 SQLAlchemy engines +- 直接创建 SQLAlchemy sessions +- 直接调用 storage DB repository classes +- 直接 commit/rollback storage transactions,除非这是 infra adapter 明确管理的范围 +- 依赖 storage SQLAlchemy model classes + +storage package 禁止 import: + +```python +import app.gateway +import app.infra +import deerflow.runtime +``` + +依赖方向必须是: + +```text +app/gateway -> app.infra.storage -> packages/storage contracts/factories -> packages/storage db implementations +``` + +禁止反向依赖。 + +## Checkpointer 兼容 + +storage persistence bundle 会同时初始化 LangGraph checkpointer 和应用数据持久化。 + +backend 说明: + +- SQLite 使用 `langgraph-checkpoint-sqlite`。 +- PostgreSQL 使用 `langgraph-checkpoint-postgres`,需要字符串形式的 `postgresql://...` 连接串。 +- MySQL 使用 `langgraph-checkpoint-mysql`,需要字符串形式的 MySQL 连接串。 + +SQLAlchemy 可以使用 `postgresql+asyncpg://...` 或 `mysql+aiomysql://...` 这类 async driver URL,但 LangGraph checkpointer 构造函数需要普通字符串连接串。这个转换应该封装在 storage driver implementation 内部。 + +## JSON Metadata Filtering + +Thread metadata search 通过 `store.persistence.json_compat` 支持跨方言 JSON filtering。 + +支持的 filter value 类型: + +- `None` +- `bool` +- `int` +- `float` +- `str` + +拒绝: + +- unsafe keys +- nested JSON path expressions +- dict/list values +- 超出 signed 64-bit 范围的整数 + +这样可以避免 SQL/JSON path injection,避免 compiled-cache 类型漂移,并保留类型语义,例如 `True != 1`,显式 JSON `null` 不等于 missing key。 + +## 分步实现方案 + +### 第 1 步:新增 Storage Package 基础 + +- 新增 `backend/packages/storage`。 +- 增加 storage config models。 +- 增加 `AppPersistence`。 +- 增加 SQLite/PostgreSQL/MySQL persistence drivers。 +- 增加 repository contracts、models、DB implementations 和 factory helpers。 +- 接入 package dependency。 +- 排除 cron persistence。 + +### 第 2 步:补齐 Storage Backend 兼容性 + +- 验证 SQLite setup 和 repository 行为。 +- 使用本地 E2E 验证 PostgreSQL 和 MySQL。 +- 修复 checkpointer 连接串兼容。 +- 修复 PostgreSQL locking 和 aggregation 差异。 +- 增加跨方言 JSON metadata filtering。 + +### 第 3 步:新增 App Infra Adapters + +- 新增 `backend/app/infra/storage`。 +- 实现 app-facing repositories,由它们管理 session 生命周期。 +- 保持 storage contracts 作为唯一数据访问边界。 +- 为现有 app/gateway method shape 增加兼容 adapters。 +- 避免 `packages/storage` import app/gateway。 + +### 第 4 步:切换 FastAPI Runtime 注入 + +- 在 FastAPI startup/lifespan 中初始化 storage persistence。 +- 将 `persistence`、`checkpointer`、`session_factory` 注入 `app.state`。 +- 暂时保留现有对外 state 名称: + - `run_store` + - `feedback_repo` + - `thread_store` + - `run_event_store` + - `checkpointer` + - `session_factory` +- 先切 user/auth provider 构造,再逐步迁移 run/thread/feedback/run_event。 + +### 第 5 步:Router 和 Auth 兼容 + +- 确保 routers 消费 app-facing adapters,而不是 storage DB classes。 +- 确保 auth providers 依赖 user repository contracts。 +- 保持 router response shapes 不变。 +- 增加 auth/admin/router regression tests。 + +### 第 6 步:清理旧 Persistence + +- app/gateway 迁移完成后,再比较旧 persistence usage。 +- 所有 call sites 迁移完成后,再删除未使用的旧 repository implementations。 +- 只在必要时保留短期 compatibility shims。 +- 从 storage-owned durable persistence 中移除 memory backend 路径。 + +## 测试策略 + +单测应覆盖: + +- config parsing +- persistence setup +- table creation +- repository CRUD/query behavior +- typed JSON metadata filtering +- dialect SQL compilation +- cron exclusion + +E2E 应覆盖: + +- SQLite persistence setup +- PostgreSQL temporary database setup +- MySQL temporary database setup +- 所有支持 SQL backend 下的 repository contract 行为 +- JSON/Unicode round trip +- rollback behavior +- persistence close/cleanup + +如果 CI 暂时没有 PostgreSQL/MySQL services,E2E 可以先作为 local-only 验证保留。