mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-23 08:25:57 +00:00
fix(events): use metadata flag instead of heuristic for dict content detection
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -30,10 +30,10 @@ class DbRunEventStore(RunEventStore):
|
|||||||
d["created_at"] = val.isoformat()
|
d["created_at"] = val.isoformat()
|
||||||
d.pop("id", None)
|
d.pop("id", None)
|
||||||
# Restore dict content that was JSON-serialized on write
|
# Restore dict content that was JSON-serialized on write
|
||||||
content = d.get("content", "")
|
raw = d.get("content", "")
|
||||||
if isinstance(content, str) and content and content[0] in ("{", "["):
|
if isinstance(raw, str) and d.get("metadata", {}).get("content_is_dict"):
|
||||||
try:
|
try:
|
||||||
d["content"] = json.loads(content)
|
d["content"] = json.loads(raw)
|
||||||
except (json.JSONDecodeError, ValueError):
|
except (json.JSONDecodeError, ValueError):
|
||||||
pass
|
pass
|
||||||
return d
|
return d
|
||||||
@@ -48,7 +48,11 @@ class DbRunEventStore(RunEventStore):
|
|||||||
|
|
||||||
async def put(self, *, thread_id, run_id, event_type, category, content="", metadata=None, created_at=None):
|
async def put(self, *, thread_id, run_id, event_type, category, content="", metadata=None, created_at=None):
|
||||||
content, metadata = self._truncate_trace(category, content, metadata)
|
content, metadata = self._truncate_trace(category, content, metadata)
|
||||||
db_content = json.dumps(content, default=str, ensure_ascii=False) if isinstance(content, dict) else content
|
if isinstance(content, dict):
|
||||||
|
db_content = json.dumps(content, default=str, ensure_ascii=False)
|
||||||
|
metadata = {**(metadata or {}), "content_is_dict": True}
|
||||||
|
else:
|
||||||
|
db_content = content
|
||||||
async with self._sf() as session:
|
async with self._sf() as session:
|
||||||
max_seq = await session.scalar(select(func.max(RunEventRow.seq)).where(RunEventRow.thread_id == thread_id))
|
max_seq = await session.scalar(select(func.max(RunEventRow.seq)).where(RunEventRow.thread_id == thread_id))
|
||||||
seq = (max_seq or 0) + 1
|
seq = (max_seq or 0) + 1
|
||||||
@@ -82,7 +86,11 @@ class DbRunEventStore(RunEventStore):
|
|||||||
category = e.get("category", "trace")
|
category = e.get("category", "trace")
|
||||||
metadata = e.get("metadata")
|
metadata = e.get("metadata")
|
||||||
content, metadata = self._truncate_trace(category, content, metadata)
|
content, metadata = self._truncate_trace(category, content, metadata)
|
||||||
db_content = json.dumps(content, default=str, ensure_ascii=False) if isinstance(content, dict) else content
|
if isinstance(content, dict):
|
||||||
|
db_content = json.dumps(content, default=str, ensure_ascii=False)
|
||||||
|
metadata = {**(metadata or {}), "content_is_dict": True}
|
||||||
|
else:
|
||||||
|
db_content = content
|
||||||
row = RunEventRow(
|
row = RunEventRow(
|
||||||
thread_id=e["thread_id"],
|
thread_id=e["thread_id"],
|
||||||
run_id=e["run_id"],
|
run_id=e["run_id"],
|
||||||
|
|||||||
@@ -419,6 +419,58 @@ class TestDbBackedLifecycle:
|
|||||||
await close_engine()
|
await close_engine()
|
||||||
|
|
||||||
|
|
||||||
|
class TestDictContentFlag:
|
||||||
|
"""Verify that content_is_dict metadata flag controls deserialization."""
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_db_store_str_starting_with_brace_not_deserialized(self, tmp_path):
|
||||||
|
"""Plain string content starting with { should NOT be deserialized."""
|
||||||
|
from deerflow.persistence.engine import close_engine, get_session_factory, init_engine
|
||||||
|
from deerflow.runtime.events.store.db import DbRunEventStore
|
||||||
|
|
||||||
|
url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
|
||||||
|
await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path))
|
||||||
|
sf = get_session_factory()
|
||||||
|
store = DbRunEventStore(sf)
|
||||||
|
|
||||||
|
record = await store.put(
|
||||||
|
thread_id="t1",
|
||||||
|
run_id="r1",
|
||||||
|
event_type="tool_end",
|
||||||
|
category="trace",
|
||||||
|
content="{not json, just a string}",
|
||||||
|
)
|
||||||
|
events = await store.list_events("t1", "r1")
|
||||||
|
assert events[0]["content"] == "{not json, just a string}"
|
||||||
|
assert isinstance(events[0]["content"], str)
|
||||||
|
|
||||||
|
await close_engine()
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_db_store_str_starting_with_bracket_not_deserialized(self, tmp_path):
|
||||||
|
"""Plain string content like '[1, 2, 3]' should NOT be deserialized."""
|
||||||
|
from deerflow.persistence.engine import close_engine, get_session_factory, init_engine
|
||||||
|
from deerflow.runtime.events.store.db import DbRunEventStore
|
||||||
|
|
||||||
|
url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
|
||||||
|
await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path))
|
||||||
|
sf = get_session_factory()
|
||||||
|
store = DbRunEventStore(sf)
|
||||||
|
|
||||||
|
record = await store.put(
|
||||||
|
thread_id="t1",
|
||||||
|
run_id="r1",
|
||||||
|
event_type="tool_end",
|
||||||
|
category="trace",
|
||||||
|
content="[1, 2, 3]",
|
||||||
|
)
|
||||||
|
events = await store.list_events("t1", "r1")
|
||||||
|
assert events[0]["content"] == "[1, 2, 3]"
|
||||||
|
assert isinstance(events[0]["content"], str)
|
||||||
|
|
||||||
|
await close_engine()
|
||||||
|
|
||||||
|
|
||||||
class TestDictContent:
|
class TestDictContent:
|
||||||
"""Verify that store backends accept str | dict content."""
|
"""Verify that store backends accept str | dict content."""
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user