feat: flush memory before summarization (#2176)

* feat: flush memory before summarization

* fix: keep agent-scoped memory on summarization flush

* fix: harden summarization hook plumbing

* fix: address summarization review feedback

* style: format memory middleware
This commit is contained in:
DanielWalnut
2026-04-14 15:01:06 +08:00
committed by GitHub
parent e4f896e90d
commit 4ba3167f48
10 changed files with 667 additions and 189 deletions
@@ -61,48 +61,88 @@ class MemoryUpdateQueue:
return
with self._lock:
existing_context = next(
(context for context in self._queue if context.thread_id == thread_id),
None,
)
merged_correction_detected = correction_detected or (existing_context.correction_detected if existing_context is not None else False)
merged_reinforcement_detected = reinforcement_detected or (existing_context.reinforcement_detected if existing_context is not None else False)
context = ConversationContext(
self._enqueue_locked(
thread_id=thread_id,
messages=messages,
agent_name=agent_name,
correction_detected=merged_correction_detected,
reinforcement_detected=merged_reinforcement_detected,
correction_detected=correction_detected,
reinforcement_detected=reinforcement_detected,
)
# Check if this thread already has a pending update
# If so, replace it with the newer one
self._queue = [c for c in self._queue if c.thread_id != thread_id]
self._queue.append(context)
# Reset or start the debounce timer
self._reset_timer()
logger.info("Memory update queued for thread %s, queue size: %d", thread_id, len(self._queue))
def add_nowait(
self,
thread_id: str,
messages: list[Any],
agent_name: str | None = None,
correction_detected: bool = False,
reinforcement_detected: bool = False,
) -> None:
"""Add a conversation and start processing immediately in the background."""
config = get_memory_config()
if not config.enabled:
return
with self._lock:
self._enqueue_locked(
thread_id=thread_id,
messages=messages,
agent_name=agent_name,
correction_detected=correction_detected,
reinforcement_detected=reinforcement_detected,
)
self._schedule_timer(0)
logger.info("Memory update queued for immediate processing on thread %s, queue size: %d", thread_id, len(self._queue))
def _enqueue_locked(
self,
*,
thread_id: str,
messages: list[Any],
agent_name: str | None,
correction_detected: bool,
reinforcement_detected: bool,
) -> None:
existing_context = next(
(context for context in self._queue if context.thread_id == thread_id),
None,
)
merged_correction_detected = correction_detected or (existing_context.correction_detected if existing_context is not None else False)
merged_reinforcement_detected = reinforcement_detected or (existing_context.reinforcement_detected if existing_context is not None else False)
context = ConversationContext(
thread_id=thread_id,
messages=messages,
agent_name=agent_name,
correction_detected=merged_correction_detected,
reinforcement_detected=merged_reinforcement_detected,
)
self._queue = [c for c in self._queue if c.thread_id != thread_id]
self._queue.append(context)
def _reset_timer(self) -> None:
"""Reset the debounce timer."""
config = get_memory_config()
self._schedule_timer(config.debounce_seconds)
logger.debug("Memory update timer set for %ss", config.debounce_seconds)
def _schedule_timer(self, delay_seconds: float) -> None:
"""Schedule queue processing after the provided delay."""
# Cancel existing timer if any
if self._timer is not None:
self._timer.cancel()
# Start new timer
self._timer = threading.Timer(
config.debounce_seconds,
delay_seconds,
self._process_queue,
)
self._timer.daemon = True
self._timer.start()
logger.debug("Memory update timer set for %ss", config.debounce_seconds)
def _process_queue(self) -> None:
"""Process all queued conversation contexts."""
# Import here to avoid circular dependency
@@ -110,8 +150,8 @@ class MemoryUpdateQueue:
with self._lock:
if self._processing:
# Already processing, reschedule
self._reset_timer()
# Preserve immediate flush semantics even if another worker is active.
self._schedule_timer(0)
return
if not self._queue:
@@ -164,6 +204,13 @@ class MemoryUpdateQueue:
self._process_queue()
def flush_nowait(self) -> None:
"""Start queue processing immediately in a background thread."""
with self._lock:
# Daemon thread: queued messages may be lost if the process exits
# before _process_queue completes. Acceptable for best-effort memory updates.
self._schedule_timer(0)
def clear(self) -> None:
"""Clear the queue without processing.