refactor(persistence): unify SQLite to single deerflow.db and move checkpointer to runtime

Merge checkpoints.db and app.db into a single deerflow.db file (WAL mode
handles concurrent access safely). Move checkpointer module from
agents/checkpointer to runtime/checkpointer to better reflect its role
as a runtime infrastructure concern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rayhpeng
2026-04-10 10:08:23 +08:00
parent 9197000690
commit 03952eca53
17 changed files with 54 additions and 43 deletions
@@ -1,4 +1,3 @@
from .checkpointer import get_checkpointer, make_checkpointer, reset_checkpointer
from .factory import create_deerflow_agent
from .features import Next, Prev, RuntimeFeatures
from .lead_agent import make_lead_agent
@@ -18,7 +17,4 @@ __all__ = [
"make_lead_agent",
"SandboxState",
"ThreadState",
"get_checkpointer",
"reset_checkpointer",
"make_checkpointer",
]
+1 -1
View File
@@ -240,7 +240,7 @@ class DeerFlowClient:
}
checkpointer = self._checkpointer
if checkpointer is None:
from deerflow.agents.checkpointer import get_checkpointer
from deerflow.runtime.checkpointer import get_checkpointer
checkpointer = get_checkpointer()
if checkpointer is not None:
@@ -4,8 +4,10 @@ Controls BOTH the LangGraph checkpointer and the DeerFlow application
persistence layer (runs, threads metadata, users, etc.). The user
configures one backend; the system handles physical separation details.
SQLite mode: checkpointer and app use different .db files in the same
directory to avoid write-lock contention. This is automatic.
SQLite mode: checkpointer and app share a single .db file
({sqlite_dir}/deerflow.db) with WAL journal mode enabled on every
connection. WAL allows concurrent readers and a single writer without
blocking, making a unified file safe for both workloads.
Postgres mode: both use the same database URL but maintain independent
connection pools with different lifecycles.
@@ -40,7 +42,7 @@ class DatabaseConfig(BaseModel):
)
sqlite_dir: str = Field(
default=".deer-flow/data",
description=("Directory for SQLite database files. Checkpointer uses {sqlite_dir}/checkpoints.db, application data uses {sqlite_dir}/app.db."),
description=("Directory for the SQLite database file. Both checkpointer and application data share {sqlite_dir}/deerflow.db."),
)
postgres_url: str = Field(
default="",
@@ -69,21 +71,27 @@ class DatabaseConfig(BaseModel):
return str(Path(self.sqlite_dir).resolve())
@property
def sqlite_path(self) -> str:
"""Unified SQLite file path shared by checkpointer and app."""
return os.path.join(self._resolved_sqlite_dir, "deerflow.db")
# Backward-compatible aliases
@property
def checkpointer_sqlite_path(self) -> str:
"""SQLite file path for the LangGraph checkpointer."""
return os.path.join(self._resolved_sqlite_dir, "checkpoints.db")
"""SQLite file path for the LangGraph checkpointer (alias for sqlite_path)."""
return self.sqlite_path
@property
def app_sqlite_path(self) -> str:
"""SQLite file path for application ORM data."""
return os.path.join(self._resolved_sqlite_dir, "app.db")
"""SQLite file path for application ORM data (alias for sqlite_path)."""
return self.sqlite_path
@property
def app_sqlalchemy_url(self) -> str:
"""SQLAlchemy async URL for the application ORM engine."""
if self.backend == "sqlite":
return f"sqlite+aiosqlite:///{self.app_sqlite_path}"
return f"sqlite+aiosqlite:///{self.sqlite_path}"
if self.backend == "postgres":
url = self.postgres_url
if url.startswith("postgresql://"):
@@ -2,7 +2,7 @@
script_location = %(here)s
# Default URL for offline mode / autogenerate.
# Runtime uses engine from DeerFlow config.
sqlalchemy.url = sqlite+aiosqlite:///./data/app.db
sqlalchemy.url = sqlite+aiosqlite:///./data/deerflow.db
[loggers]
keys = root,sqlalchemy,alembic
@@ -5,12 +5,18 @@ Re-exports the public API of :mod:`~deerflow.runtime.runs` and
directly from ``deerflow.runtime``.
"""
from .checkpointer import checkpointer_context, get_checkpointer, make_checkpointer, reset_checkpointer
from .runs import ConflictError, DisconnectMode, RunContext, RunManager, RunRecord, RunStatus, UnsupportedStrategyError, run_agent
from .serialization import serialize, serialize_channel_values, serialize_lc_object, serialize_messages_tuple
from .store import get_store, make_store, reset_store, store_context
from .stream_bridge import END_SENTINEL, HEARTBEAT_SENTINEL, MemoryStreamBridge, StreamBridge, StreamEvent, make_stream_bridge
__all__ = [
# checkpointer
"checkpointer_context",
"get_checkpointer",
"make_checkpointer",
"reset_checkpointer",
# runs
"ConflictError",
"DisconnectMode",
@@ -7,12 +7,12 @@ Supported backends: memory, sqlite, postgres.
Usage (e.g. FastAPI lifespan)::
from deerflow.agents.checkpointer.async_provider import make_checkpointer
from deerflow.runtime.checkpointer.async_provider import make_checkpointer
async with make_checkpointer() as checkpointer:
app.state.checkpointer = checkpointer # InMemorySaver if not configured
For sync usage see :mod:`deerflow.agents.checkpointer.provider`.
For sync usage see :mod:`deerflow.runtime.checkpointer.provider`.
"""
from __future__ import annotations
@@ -24,7 +24,7 @@ from collections.abc import AsyncIterator
from langgraph.types import Checkpointer
from deerflow.agents.checkpointer.provider import (
from deerflow.runtime.checkpointer.provider import (
POSTGRES_CONN_REQUIRED,
POSTGRES_INSTALL,
SQLITE_INSTALL,
@@ -7,7 +7,7 @@ Supported backends: memory, sqlite, postgres.
Usage::
from deerflow.agents.checkpointer.provider import get_checkpointer, checkpointer_context
from deerflow.runtime.checkpointer.provider import get_checkpointer, checkpointer_context
# Singleton — reused across calls, closed on process exit
cp = get_checkpointer()
@@ -91,7 +91,7 @@ async def make_store() -> AsyncIterator[BaseStore]:
configured checkpointer.
Reads from the same ``checkpointer`` section of *config.yaml* used by
:func:`deerflow.agents.checkpointer.async_provider.make_checkpointer` so
:func:`deerflow.runtime.checkpointer.async_provider.make_checkpointer` so
that both singletons always use the same persistence technology::
async with make_store() as store:
@@ -1,7 +1,7 @@
"""Async stream bridge factory.
Provides an **async context manager** aligned with
:func:`deerflow.agents.checkpointer.async_provider.make_checkpointer`.
:func:`deerflow.runtime.checkpointer.async_provider.make_checkpointer`.
Usage (e.g. FastAPI lifespan)::