feat(persistence): add SQLAlchemy 2.0 async ORM scaffold

Introduce a unified database configuration (DatabaseConfig) that
controls both the LangGraph checkpointer and the DeerFlow application
persistence layer from a single `database:` config section.

New modules:
- deerflow.config.database_config — Pydantic config with memory/sqlite/postgres backends
- deerflow.persistence — async engine lifecycle, DeclarativeBase with to_dict mixin, Alembic skeleton
- deerflow.runtime.runs.store — RunStore ABC + MemoryRunStore implementation

Gateway integration initializes/tears down the persistence engine in
the existing langgraph_runtime() context manager. Legacy checkpointer
config is preserved for backward compatibility.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rayhpeng
2026-04-01 15:50:06 +08:00
parent 3e461d9d08
commit 1ff6b5f7ab
18 changed files with 1099 additions and 17 deletions
@@ -0,0 +1,13 @@
"""DeerFlow application persistence layer (SQLAlchemy 2.0 async ORM).
This module manages DeerFlow's own application data -- runs metadata,
thread ownership, cron jobs, users. It is completely separate from
LangGraph's checkpointer, which manages graph execution state.
Usage:
from deerflow.persistence import init_engine, close_engine, get_session_factory
"""
from deerflow.persistence.engine import close_engine, get_engine, get_session_factory, init_engine
__all__ = ["close_engine", "get_engine", "get_session_factory", "init_engine"]
@@ -0,0 +1,40 @@
"""SQLAlchemy declarative base with automatic to_dict support.
All DeerFlow ORM models inherit from this Base. It provides a generic
to_dict() method via SQLAlchemy's inspect() so individual models don't
need to write their own serialization logic.
LangGraph's checkpointer tables are NOT managed by this Base.
"""
from __future__ import annotations
from sqlalchemy import inspect as sa_inspect
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
"""Base class for all DeerFlow ORM models.
Provides:
- Automatic to_dict() via SQLAlchemy column inspection.
- Standard __repr__() showing all column values.
"""
def to_dict(self, *, exclude: set[str] | None = None) -> dict:
"""Convert ORM instance to plain dict.
Uses SQLAlchemy's inspect() to iterate mapped column attributes.
Args:
exclude: Optional set of column keys to omit.
Returns:
Dict of {column_key: value} for all mapped columns.
"""
exclude = exclude or set()
return {c.key: getattr(self, c.key) for c in sa_inspect(type(self)).mapper.column_attrs if c.key not in exclude}
def __repr__(self) -> str:
cols = ", ".join(f"{c.key}={getattr(self, c.key)!r}" for c in sa_inspect(type(self)).mapper.column_attrs)
return f"{type(self).__name__}({cols})"
@@ -0,0 +1,116 @@
"""Async SQLAlchemy engine lifecycle management.
Initializes at Gateway startup, provides session factory for
repositories, disposes at shutdown.
When database.backend="memory", init_engine is a no-op and
get_session_factory() returns None. Repositories must check for
None and fall back to in-memory implementations.
"""
from __future__ import annotations
import logging
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
logger = logging.getLogger(__name__)
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
async def init_engine(
backend: str,
*,
url: str = "",
echo: bool = False,
pool_size: int = 5,
sqlite_dir: str = "",
) -> None:
"""Create the async engine and session factory, then auto-create tables.
Args:
backend: "memory", "sqlite", or "postgres".
url: SQLAlchemy async URL (for sqlite/postgres).
echo: Echo SQL to log.
pool_size: Postgres connection pool size.
sqlite_dir: Directory to create for SQLite (ensured to exist).
"""
global _engine, _session_factory
if backend == "memory":
logger.info("Persistence backend=memory -- ORM engine not initialized")
return
if backend == "postgres":
try:
import asyncpg # noqa: F401
except ImportError:
raise ImportError("database.backend is set to 'postgres' but asyncpg is not installed.\nInstall it with:\n uv sync --extra postgres\nOr switch to backend: sqlite in config.yaml for single-node deployment.") from None
if backend == "sqlite":
import os
os.makedirs(sqlite_dir or ".", exist_ok=True)
_engine = create_async_engine(url, echo=echo)
elif backend == "postgres":
_engine = create_async_engine(
url,
echo=echo,
pool_size=pool_size,
pool_pre_ping=True,
)
else:
raise ValueError(f"Unknown persistence backend: {backend!r}")
_session_factory = async_sessionmaker(_engine, expire_on_commit=False)
# Auto-create tables (dev convenience). Production should use Alembic.
from deerflow.persistence.base import Base
# Import all models so Base.metadata discovers them.
# When no models exist yet (scaffolding phase), this is a no-op.
try:
import deerflow.persistence.models # noqa: F401
except ImportError:
pass
async with _engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
logger.info("Persistence engine initialized: backend=%s", backend)
async def init_engine_from_config(config) -> None:
"""Convenience: init engine from a DatabaseConfig object."""
if config.backend == "memory":
await init_engine("memory")
return
await init_engine(
backend=config.backend,
url=config.app_sqlalchemy_url,
echo=config.echo_sql,
pool_size=config.pool_size,
sqlite_dir=config.sqlite_dir if config.backend == "sqlite" else "",
)
def get_session_factory() -> async_sessionmaker[AsyncSession] | None:
"""Return the async session factory, or None if backend=memory."""
return _session_factory
def get_engine() -> AsyncEngine | None:
"""Return the async engine, or None if not initialized."""
return _engine
async def close_engine() -> None:
"""Dispose the engine, release all connections."""
global _engine, _session_factory
if _engine is not None:
await _engine.dispose()
logger.info("Persistence engine closed")
_engine = None
_session_factory = None
@@ -0,0 +1,38 @@
[alembic]
script_location = %(here)s
# Default URL for offline mode / autogenerate.
# Runtime uses engine from DeerFlow config.
sqlalchemy.url = sqlite+aiosqlite:///./data/app.db
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
@@ -0,0 +1,63 @@
"""Alembic environment for DeerFlow application tables.
ONLY manages DeerFlow's tables (runs, threads_meta, cron_jobs, users).
LangGraph's checkpointer tables are managed by LangGraph itself -- they
have their own schema lifecycle and must not be touched by Alembic.
"""
from __future__ import annotations
import asyncio
from logging.config import fileConfig
from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine
from deerflow.persistence.base import Base
# Import all models so metadata is populated.
try:
import deerflow.persistence.models # noqa: F401
except ImportError:
pass
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
render_as_batch=True,
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(
connection=connection,
target_metadata=target_metadata,
render_as_batch=True, # Required for SQLite ALTER TABLE support
)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online() -> None:
connectable = create_async_engine(config.get_main_option("sqlalchemy.url"))
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
if context.is_offline_mode():
run_migrations_offline()
else:
asyncio.run(run_migrations_online())