mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-05-20 15:11:09 +00:00
Merge branch 'main' into feature/dev-daemon-v2
This commit is contained in:
@@ -4,10 +4,34 @@ import { Client as LangGraphClient } from "@langchain/langgraph-sdk/client";
|
||||
|
||||
import { getLangGraphBaseURL } from "../config";
|
||||
|
||||
let _singleton: LangGraphClient | null = null;
|
||||
export function getAPIClient(isMock?: boolean): LangGraphClient {
|
||||
_singleton ??= new LangGraphClient({
|
||||
import { sanitizeRunStreamOptions } from "./stream-mode";
|
||||
|
||||
function createCompatibleClient(isMock?: boolean): LangGraphClient {
|
||||
const client = new LangGraphClient({
|
||||
apiUrl: getLangGraphBaseURL(isMock),
|
||||
});
|
||||
|
||||
const originalRunStream = client.runs.stream.bind(client.runs);
|
||||
client.runs.stream = ((threadId, assistantId, payload) =>
|
||||
originalRunStream(
|
||||
threadId,
|
||||
assistantId,
|
||||
sanitizeRunStreamOptions(payload),
|
||||
)) as typeof client.runs.stream;
|
||||
|
||||
const originalJoinStream = client.runs.joinStream.bind(client.runs);
|
||||
client.runs.joinStream = ((threadId, runId, options) =>
|
||||
originalJoinStream(
|
||||
threadId,
|
||||
runId,
|
||||
sanitizeRunStreamOptions(options),
|
||||
)) as typeof client.runs.joinStream;
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
let _singleton: LangGraphClient | null = null;
|
||||
export function getAPIClient(isMock?: boolean): LangGraphClient {
|
||||
_singleton ??= createCompatibleClient(isMock);
|
||||
return _singleton;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
import assert from "node:assert/strict";
|
||||
import test from "node:test";
|
||||
|
||||
const { sanitizeRunStreamOptions } = await import(
|
||||
new URL("./stream-mode.ts", import.meta.url).href
|
||||
);
|
||||
|
||||
void test("drops unsupported stream modes from array payloads", () => {
|
||||
const sanitized = sanitizeRunStreamOptions({
|
||||
streamMode: [
|
||||
"values",
|
||||
"messages-tuple",
|
||||
"custom",
|
||||
"updates",
|
||||
"events",
|
||||
"tools",
|
||||
],
|
||||
});
|
||||
|
||||
assert.deepEqual(sanitized.streamMode, [
|
||||
"values",
|
||||
"messages-tuple",
|
||||
"custom",
|
||||
"updates",
|
||||
"events",
|
||||
]);
|
||||
});
|
||||
|
||||
void test("drops unsupported stream modes from scalar payloads", () => {
|
||||
const sanitized = sanitizeRunStreamOptions({
|
||||
streamMode: "tools",
|
||||
});
|
||||
|
||||
assert.equal(sanitized.streamMode, undefined);
|
||||
});
|
||||
|
||||
void test("keeps payloads without streamMode untouched", () => {
|
||||
const options = {
|
||||
streamSubgraphs: true,
|
||||
};
|
||||
|
||||
assert.equal(sanitizeRunStreamOptions(options), options);
|
||||
});
|
||||
@@ -0,0 +1,68 @@
|
||||
const SUPPORTED_RUN_STREAM_MODES = new Set([
|
||||
"values",
|
||||
"messages",
|
||||
"messages-tuple",
|
||||
"updates",
|
||||
"events",
|
||||
"debug",
|
||||
"tasks",
|
||||
"checkpoints",
|
||||
"custom",
|
||||
] as const);
|
||||
|
||||
const warnedUnsupportedStreamModes = new Set<string>();
|
||||
|
||||
export function warnUnsupportedStreamModes(
|
||||
modes: string[],
|
||||
warn: (message: string) => void = console.warn,
|
||||
) {
|
||||
const unseenModes = modes.filter((mode) => {
|
||||
if (warnedUnsupportedStreamModes.has(mode)) {
|
||||
return false;
|
||||
}
|
||||
warnedUnsupportedStreamModes.add(mode);
|
||||
return true;
|
||||
});
|
||||
|
||||
if (unseenModes.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
warn(
|
||||
`[deer-flow] Dropped unsupported LangGraph stream mode(s): ${unseenModes.join(", ")}`,
|
||||
);
|
||||
}
|
||||
|
||||
export function sanitizeRunStreamOptions<T>(options: T): T {
|
||||
if (
|
||||
typeof options !== "object" ||
|
||||
options === null ||
|
||||
!("streamMode" in options)
|
||||
) {
|
||||
return options;
|
||||
}
|
||||
|
||||
const streamMode = options.streamMode;
|
||||
if (streamMode == null) {
|
||||
return options;
|
||||
}
|
||||
|
||||
const requestedModes = Array.isArray(streamMode) ? streamMode : [streamMode];
|
||||
const sanitizedModes = requestedModes.filter((mode) =>
|
||||
SUPPORTED_RUN_STREAM_MODES.has(mode),
|
||||
);
|
||||
|
||||
if (sanitizedModes.length === requestedModes.length) {
|
||||
return options;
|
||||
}
|
||||
|
||||
const droppedModes = requestedModes.filter(
|
||||
(mode) => !SUPPORTED_RUN_STREAM_MODES.has(mode),
|
||||
);
|
||||
warnUnsupportedStreamModes(droppedModes);
|
||||
|
||||
return {
|
||||
...options,
|
||||
streamMode: Array.isArray(streamMode) ? sanitizedModes : sanitizedModes[0],
|
||||
};
|
||||
}
|
||||
@@ -326,7 +326,6 @@ export function useThreadStream({
|
||||
threadId: threadId,
|
||||
streamSubgraphs: true,
|
||||
streamResumable: true,
|
||||
streamMode: ["values", "messages-tuple", "custom"],
|
||||
config: {
|
||||
recursion_limit: 1000,
|
||||
},
|
||||
|
||||
@@ -20,7 +20,34 @@ SERVICE="${3:-Service}"
|
||||
elapsed=0
|
||||
interval=1
|
||||
|
||||
while ! lsof -i :"$PORT" -sTCP:LISTEN -t >/dev/null 2>&1; do
|
||||
is_port_listening() {
|
||||
if command -v lsof >/dev/null 2>&1; then
|
||||
if lsof -nP -iTCP:"$PORT" -sTCP:LISTEN -t >/dev/null 2>&1; then
|
||||
return 0
|
||||
fi
|
||||
fi
|
||||
|
||||
if command -v ss >/dev/null 2>&1; then
|
||||
if ss -ltn "( sport = :$PORT )" 2>/dev/null | tail -n +2 | grep -q .; then
|
||||
return 0
|
||||
fi
|
||||
fi
|
||||
|
||||
if command -v netstat >/dev/null 2>&1; then
|
||||
if netstat -ltn 2>/dev/null | awk '{print $4}' | grep -Eq "(^|[.:])${PORT}$"; then
|
||||
return 0
|
||||
fi
|
||||
fi
|
||||
|
||||
if command -v timeout >/dev/null 2>&1; then
|
||||
timeout 1 bash -c "exec 3<>/dev/tcp/127.0.0.1/$PORT" >/dev/null 2>&1
|
||||
return $?
|
||||
fi
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
while ! is_port_listening; do
|
||||
if [ "$elapsed" -ge "$TIMEOUT" ]; then
|
||||
echo ""
|
||||
echo "✗ $SERVICE failed to start on port $PORT after ${TIMEOUT}s"
|
||||
|
||||
Reference in New Issue
Block a user