Files
deer-flow/backend/packages/harness/deerflow/agents/middlewares/uploads_middleware.py
T
SHIYAO ZHANG 5ff230eafd feat(uploads): inject document outline into agent context for converted files (#1738)
* feat(uploads): inject document outline into agent context for converted files

Extract headings from converted .md files and inject them into the
<uploaded_files> context block so the agent can navigate large documents
by line number before reading.

- Add `extract_outline()` to `file_conversion.py`: recognises standard
  Markdown headings (#/##/###) and SEC-style bold structural headings
  (**ITEM N. BUSINESS**, **PART II**); caps at 50 entries; excludes
  cover-page boilerplate (WASHINGTON DC, CURRENT REPORT, SIGNATURES)
- Add `_extract_outline_for_file()` helper in `uploads_middleware.py`:
  looks for a sibling `.md` file produced by the conversion pipeline
- Update `UploadsMiddleware._create_files_message()` to render the outline
  under each file entry with `L{line}: {title}` format and a `read_file`
  prompt for range-based reading
- Tests: 10 new tests for `extract_outline()`, 4 new tests for outline
  injection in `UploadsMiddleware`; existing test updated for new `outline`
  field in `uploaded_files` state

Partially addresses #1647 (agent ignores uploaded files).

* fix(uploads): stream outline file reads and strip inline bold from heading titles

- Switch extract_outline() from read_text().splitlines() to open()+line iteration
  so large converted documents are not loaded into memory on every agent turn;
  exits as soon as MAX_OUTLINE_ENTRIES is reached (Copilot suggestion)
- Strip **...** wrapper from standard Markdown heading titles before appending
  to outline so agent context stays clean (e.g. "## **Overview**" → "Overview")
  (Copilot suggestion)
- Remove unused pathlib.Path import and fix import sort order in test_file_conversion.py
  to satisfy ruff CI lint

* fix(uploads): show truncation hint when outline exceeds MAX_OUTLINE_ENTRIES

When extract_outline() hits the cap it now appends a sentinel entry
{"truncated": True} instead of silently dropping the rest of the headings.
UploadsMiddleware reads the sentinel and renders a hint line:

  ... (showing first 50 headings; use `read_file` to explore further)

Without this the agent had no way to know the outline was incomplete and
would treat the first 50 headings as the full document structure.

* fix(uploads): fall back to configurable.thread_id when runtime.context lacks thread_id

runtime.context does not always carry thread_id (depends on LangGraph
invocation path). ThreadDataMiddleware already falls back to
get_config().configurable.thread_id — apply the same pattern so
UploadsMiddleware can resolve the uploads directory and attach outlines
in all invocation paths.

* style: apply ruff format

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-04-03 20:52:47 +08:00

248 lines
9.9 KiB
Python

"""Middleware to inject uploaded files information into agent context."""
import logging
from pathlib import Path
from typing import NotRequired, override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langgraph.runtime import Runtime
from deerflow.config.paths import Paths, get_paths
from deerflow.utils.file_conversion import extract_outline
logger = logging.getLogger(__name__)
def _extract_outline_for_file(file_path: Path) -> list[dict]:
"""Return the document outline for *file_path* if a converted .md exists.
Looks for a sibling ``<stem>.md`` file produced by the upload conversion
pipeline. Returns an empty list when the file is not a converted document
or when no headings are found.
"""
md_path = file_path.with_suffix(".md")
if not md_path.is_file():
return []
outline = extract_outline(md_path)
if outline:
logger.debug("Extracted %d outline entries from %s", len(outline), file_path.name)
return outline
class UploadsMiddlewareState(AgentState):
"""State schema for uploads middleware."""
uploaded_files: NotRequired[list[dict] | None]
class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
"""Middleware to inject uploaded files information into the agent context.
Reads file metadata from the current message's additional_kwargs.files
(set by the frontend after upload) and prepends an <uploaded_files> block
to the last human message so the model knows which files are available.
"""
state_schema = UploadsMiddlewareState
def __init__(self, base_dir: str | None = None):
"""Initialize the middleware.
Args:
base_dir: Base directory for thread data. Defaults to Paths resolution.
"""
super().__init__()
self._paths = Paths(base_dir) if base_dir else get_paths()
def _format_file_entry(self, file: dict, lines: list[str]) -> None:
"""Append a single file entry (name, size, path, optional outline) to lines."""
size_kb = file["size"] / 1024
size_str = f"{size_kb:.1f} KB" if size_kb < 1024 else f"{size_kb / 1024:.1f} MB"
lines.append(f"- {file['filename']} ({size_str})")
lines.append(f" Path: {file['path']}")
outline = file.get("outline") or []
if outline:
truncated = outline[-1].get("truncated", False) if outline else False
visible = [e for e in outline if not e.get("truncated")]
lines.append(" Document outline (use `read_file` with line ranges to read sections):")
for entry in visible:
lines.append(f" L{entry['line']}: {entry['title']}")
if truncated:
lines.append(f" ... (showing first {len(visible)} headings; use `read_file` to explore further)")
lines.append("")
def _create_files_message(self, new_files: list[dict], historical_files: list[dict]) -> str:
"""Create a formatted message listing uploaded files.
Args:
new_files: Files uploaded in the current message.
historical_files: Files uploaded in previous messages.
Each file dict may contain an optional ``outline`` key — a list of
``{title, line}`` dicts extracted from the converted Markdown file.
Returns:
Formatted string inside <uploaded_files> tags.
"""
lines = ["<uploaded_files>"]
lines.append("The following files were uploaded in this message:")
lines.append("")
if new_files:
for file in new_files:
self._format_file_entry(file, lines)
else:
lines.append("(empty)")
lines.append("")
if historical_files:
lines.append("The following files were uploaded in previous messages and are still available:")
lines.append("")
for file in historical_files:
self._format_file_entry(file, lines)
lines.append("You can read these files using the `read_file` tool with the paths shown above.")
lines.append("</uploaded_files>")
return "\n".join(lines)
def _files_from_kwargs(self, message: HumanMessage, uploads_dir: Path | None = None) -> list[dict] | None:
"""Extract file info from message additional_kwargs.files.
The frontend sends uploaded file metadata in additional_kwargs.files
after a successful upload. Each entry has: filename, size (bytes),
path (virtual path), status.
Args:
message: The human message to inspect.
uploads_dir: Physical uploads directory used to verify file existence.
When provided, entries whose files no longer exist are skipped.
Returns:
List of file dicts with virtual paths, or None if the field is absent or empty.
"""
kwargs_files = (message.additional_kwargs or {}).get("files")
if not isinstance(kwargs_files, list) or not kwargs_files:
return None
files = []
for f in kwargs_files:
if not isinstance(f, dict):
continue
filename = f.get("filename") or ""
if not filename or Path(filename).name != filename:
continue
if uploads_dir is not None and not (uploads_dir / filename).is_file():
continue
files.append(
{
"filename": filename,
"size": int(f.get("size") or 0),
"path": f"/mnt/user-data/uploads/{filename}",
"extension": Path(filename).suffix,
}
)
return files if files else None
@override
def before_agent(self, state: UploadsMiddlewareState, runtime: Runtime) -> dict | None:
"""Inject uploaded files information before agent execution.
New files come from the current message's additional_kwargs.files.
Historical files are scanned from the thread's uploads directory,
excluding the new ones.
Prepends <uploaded_files> context to the last human message content.
The original additional_kwargs (including files metadata) is preserved
on the updated message so the frontend can read it from the stream.
Args:
state: Current agent state.
runtime: Runtime context containing thread_id.
Returns:
State updates including uploaded files list.
"""
messages = list(state.get("messages", []))
if not messages:
return None
last_message_index = len(messages) - 1
last_message = messages[last_message_index]
if not isinstance(last_message, HumanMessage):
return None
# Resolve uploads directory for existence checks
thread_id = (runtime.context or {}).get("thread_id")
if thread_id is None:
try:
from langgraph.config import get_config
thread_id = get_config().get("configurable", {}).get("thread_id")
except RuntimeError:
pass # get_config() raises outside a runnable context (e.g. unit tests)
uploads_dir = self._paths.sandbox_uploads_dir(thread_id) if thread_id else None
# Get newly uploaded files from the current message's additional_kwargs.files
new_files = self._files_from_kwargs(last_message, uploads_dir) or []
# Collect historical files from the uploads directory (all except the new ones)
new_filenames = {f["filename"] for f in new_files}
historical_files: list[dict] = []
if uploads_dir and uploads_dir.exists():
for file_path in sorted(uploads_dir.iterdir()):
if file_path.is_file() and file_path.name not in new_filenames:
stat = file_path.stat()
historical_files.append(
{
"filename": file_path.name,
"size": stat.st_size,
"path": f"/mnt/user-data/uploads/{file_path.name}",
"extension": file_path.suffix,
"outline": _extract_outline_for_file(file_path),
}
)
# Attach outlines to new files as well
if uploads_dir:
for file in new_files:
phys_path = uploads_dir / file["filename"]
file["outline"] = _extract_outline_for_file(phys_path)
if not new_files and not historical_files:
return None
logger.debug(f"New files: {[f['filename'] for f in new_files]}, historical: {[f['filename'] for f in historical_files]}")
# Create files message and prepend to the last human message content
files_message = self._create_files_message(new_files, historical_files)
# Extract original content - handle both string and list formats
original_content = ""
if isinstance(last_message.content, str):
original_content = last_message.content
elif isinstance(last_message.content, list):
text_parts = []
for block in last_message.content:
if isinstance(block, dict) and block.get("type") == "text":
text_parts.append(block.get("text", ""))
original_content = "\n".join(text_parts)
# Create new message with combined content.
# Preserve additional_kwargs (including files metadata) so the frontend
# can read structured file info from the streamed message.
updated_message = HumanMessage(
content=f"{files_message}\n\n{original_content}",
id=last_message.id,
additional_kwargs=last_message.additional_kwargs,
)
messages[last_message_index] = updated_message
return {
"uploaded_files": new_files,
"messages": messages,
}