Files
qqbot/tests/streaming-controller.test.ts
leoqlin 1a88c7baa7 feat: v1.6.6 — 流式消息(C2C)+ 媒体发送队列重构 + ApiError 结构化错误
- 新增 StreamingController 流式消息控制器,C2C 私聊 AI 回复以打字机效果实时推送
- 新增 sendC2CStreamMessage 流式 API 封装(/v2/users/{openid}/stream_messages)
- 新增 ApiError 结构化错误类,携带 HTTP status 和 path,支持按状态码重试/降级
- 新增 media-send.ts 公共媒体标签解析与发送队列,outbound.ts 和 streaming.ts 共用
- 新增 streaming/streamingConfig 账户配置项,支持按账户控制流式开关和节流间隔
- 新增 strip-incomplete-media-tag 和 streaming-controller 单元测试
- 重构 outbound.ts 媒体发送逻辑为调用公共模块,消除约 100 行重复代码
- audio-convert.ts 日志从 console.log 降级为 console.debug,减少生产日志噪音
- gateway.ts 集成流式控制器:per-message 创建、onPartialReply 回调、dispatch 后收尾
- 删除已清空的 user-messages.ts
- bump version to 1.6.6
2026-03-25 18:36:53 +08:00

771 lines
26 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* StreamingController 集成测试
*
* 通过 mock global.fetch 验证流式消息控制器的核心行为,
* 重点覆盖:循环消费模型 (processMediaTags) + pendingNormalizedFull 补救机制。
*
* 运行方式: npx tsx tests/streaming-controller.test.ts
*/
import assert from "node:assert";
// ============ Mock global.fetch ============
/** 记录所有流式 API 调用 */
interface StreamCall {
content: string;
inputState: number; // 1 = GENERATING, 10 = DONE
streamMsgId?: string;
index: number;
url: string;
}
/** 记录所有媒体上传 API 调用 */
interface MediaUploadCall {
url: string;
body: any;
}
let streamCalls: StreamCall[] = [];
let mediaUploadCalls: MediaUploadCall[] = [];
let streamMsgIdCounter = 0;
/** 控制流式 API 的延迟(毫秒)。设为 > 0 模拟 API 慢响应。 */
let apiDelayMs = 0;
/** 控制媒体上传 API 的延迟(毫秒)。 */
let mediaApiDelayMs = 0;
function sleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms));
}
function resetMocks() {
streamCalls = [];
mediaUploadCalls = [];
streamMsgIdCounter = 0;
apiDelayMs = 0;
mediaApiDelayMs = 0;
}
// 保存原始 fetch
const originalFetch = globalThis.fetch;
// 覆写 global.fetch
globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
const body = init?.body ? JSON.parse(init.body as string) : {};
// ---- Token 请求 ----
if (url.includes("/getAppAccessToken")) {
return new Response(JSON.stringify({
access_token: "mock-token-12345",
expires_in: "7200",
}), { status: 200, headers: { "Content-Type": "application/json" } });
}
// ---- 流式消息 API ----
if (url.includes("/stream_messages")) {
if (apiDelayMs > 0) await sleep(apiDelayMs);
const call: StreamCall = {
content: body.content_raw ?? "",
inputState: body.input_state ?? 0,
streamMsgId: body.stream_msg_id,
index: body.index ?? 0,
url,
};
streamCalls.push(call);
// 首次调用(无 stream_msg_id→ 返回新的 stream_msg_id
let respBody: any;
if (!body.stream_msg_id) {
streamMsgIdCounter++;
respBody = { id: `stream-${streamMsgIdCounter}`, timestamp: Date.now().toString() };
} else {
respBody = { id: body.stream_msg_id, timestamp: Date.now().toString() };
}
return new Response(JSON.stringify(respBody), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
// ---- 富媒体上传 API (v2/users/.../files) ----
if (url.includes("/files")) {
if (mediaApiDelayMs > 0) await sleep(mediaApiDelayMs);
mediaUploadCalls.push({ url, body });
return new Response(JSON.stringify({
file_uuid: `uuid-${mediaUploadCalls.length}`,
file_info: "mock",
ttl: 3600,
}), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
// ---- 普通消息 API (v2/users/.../messages) ----
if (url.includes("/messages")) {
if (mediaApiDelayMs > 0) await sleep(mediaApiDelayMs);
return new Response(JSON.stringify({
id: `msg-resp-${Date.now()}`,
timestamp: Date.now().toString(),
}), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
// 未匹配的请求,回退到原始 fetch
console.warn(`[mock-fetch] 未匹配的请求: ${url}`);
return originalFetch(input, init);
};
// ---- 现在 import StreamingController它会使用被 mock 的 global.fetch ----
const { StreamingController } = await import("../src/streaming.js");
type StreamingControllerType = InstanceType<typeof StreamingController>;
// ============ 辅助函数 ============
/** 等待异步任务完成 */
async function flush(ms = 100): Promise<void> {
await sleep(ms);
}
/** 收集日志 */
const logs: string[] = [];
function createController(opts?: { mediaContext?: boolean; onReplyBoundary?: (newText: string) => void | Promise<void> }): StreamingControllerType {
logs.length = 0;
const deps: any = {
account: {
accountId: "test",
enabled: true,
appId: "test-app",
clientSecret: "test-secret",
secretSource: "config" as const,
markdownSupport: true,
config: {
streaming: true,
streamingConfig: { throttleMs: 50 }, // 短节流方便测试
},
},
userId: "user-1",
replyToMsgId: "msg-1",
eventId: "event-1",
logPrefix: "[test]",
log: {
info: (m: string) => logs.push(`[INFO] ${m}`),
error: (m: string) => logs.push(`[ERROR] ${m}`),
warn: (m: string) => logs.push(`[WARN] ${m}`),
debug: (m: string) => logs.push(`[DEBUG] ${m}`),
},
};
if (opts?.mediaContext) {
deps.mediaContext = {
account: deps.account,
event: { type: "c2c", senderId: "user-1", messageId: "msg-1" },
log: deps.log,
};
}
if (opts?.onReplyBoundary) {
deps.onReplyBoundary = opts.onReplyBoundary;
}
return new StreamingController(deps);
}
// ============ 测试框架 ============
let passed = 0;
let failed = 0;
const failedTests: string[] = [];
async function test(name: string, fn: () => Promise<void>) {
resetMocks();
try {
await fn();
console.log(`${name}`);
passed++;
} catch (e: any) {
console.log(`${name}`);
console.log(` ${e.message}`);
if (e.stack) {
const lines = (e.stack as string).split("\n").slice(1, 4);
for (const l of lines) console.log(` ${l.trim()}`);
}
// 打印关键日志(去掉 DEBUG 减少噪音)
const relevantLogs = logs.filter((l) => !l.includes("[DEBUG]")).slice(-10);
if (relevantLogs.length > 0) {
console.log(` --- 日志 ---`);
for (const l of relevantLogs) console.log(` ${l}`);
}
failed++;
failedTests.push(name);
}
}
// ============ 测试用例 ============
console.log("\n=== 1. 纯文本流式 ===");
await test("纯文本: 基本流式 → 完成", async () => {
const ctrl = createController();
await ctrl.onPartialReply({ text: "你好" });
await flush();
await ctrl.onPartialReply({ text: "你好世界" });
await flush();
ctrl.markFullyComplete();
await ctrl.onIdle();
// 应该有流式分片发送
assert.ok(streamCalls.length >= 2, `应至少有 2 次流式调用,实际 ${streamCalls.length}`);
// 最后一次应该是 DONE (inputState=10)
const last = streamCalls[streamCalls.length - 1];
assert.strictEqual(last.inputState, 10, "最后一次应为 DONE");
assert.ok(last.content.includes("你好世界"), `最终文本应包含完整内容,实际: "${last.content}"`);
// 不应有媒体上传
assert.strictEqual(mediaUploadCalls.length, 0, "不应有媒体上传");
});
await test("纯文本: 空文本被忽略", async () => {
const ctrl = createController();
await ctrl.onPartialReply({ text: "" });
await ctrl.onPartialReply({ text: undefined });
await flush();
assert.strictEqual(streamCalls.length, 0, "不应有流式调用");
});
await test("纯文本: 全空白不启动流式,后续非空白一起发送", async () => {
const ctrl = createController();
// 先来一段全空白内容 — 不应启动流式
await ctrl.onPartialReply({ text: "\n\n " });
await flush();
assert.strictEqual(streamCalls.length, 0, "全空白阶段不应有流式调用");
// 后续有非空白内容到达 — 应启动流式,且包含之前的空白
await ctrl.onPartialReply({ text: "\n\n hello world" });
await flush(200);
assert.ok(streamCalls.length >= 1, `应有至少 1 次流式调用,实际 ${streamCalls.length}`);
// 首次发送的内容应包含之前保留的空白 + 新内容
const firstContent = streamCalls[0].content;
assert.ok(firstContent.includes("hello world"), `首次发送应包含 "hello world",实际: "${firstContent}"`);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(100);
});
await test("纯文本: 全空白 + 媒体标签,空白不发送,媒体正常处理", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 5;
mediaApiDelayMs = 5;
// 全空白前缀 + 媒体标签一起到达
await ctrl.onPartialReply({ text: "\n\n<qqimg>/tmp/pic.jpg</qqimg>描述文字" });
await flush(400);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:日志中检测到了媒体标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 1, `应检测到 qqimg 标签,实际 ${foundLogs.length}`);
// 验证:不应有 PREFIX MISMATCH 错误
const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH"));
assert.strictEqual(prefixMismatchLogs.length, 0, `不应有 PREFIX MISMATCH实际: ${prefixMismatchLogs.join("; ")}`);
// 验证:如果有流式分片发送,内容不应是纯空白
const generatingCalls = streamCalls.filter((c) => c.inputState === 1);
for (const call of generatingCalls) {
assert.ok(call.content.trim().length > 0, `流式分片不应为纯空白: "${call.content}"`);
}
});
await test("纯文本: 空白→媒体→空白→媒体→空白,只有媒体发出,空白全忽略", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 5;
mediaApiDelayMs = 5;
// 逐步送入:空白 → 媒体标签1 → 空白 → 媒体标签2 → 空白
const fullText =
"\n \n" +
"<qqimg>/tmp/pic1.jpg</qqimg>" +
"\n\n \n" +
"<qqimg>/tmp/pic2.jpg</qqimg>" +
" \n\n";
// 模拟流式分段到达
// 阶段 1纯空白
await ctrl.onPartialReply({ text: "\n \n" });
await flush(200);
assert.strictEqual(streamCalls.length, 0, "纯空白阶段不应有流式调用");
// 阶段 2空白 + 第一个媒体标签
await ctrl.onPartialReply({ text: "\n \n<qqimg>/tmp/pic1.jpg</qqimg>" });
await flush(400);
// 阶段 3继续加空白
await ctrl.onPartialReply({
text: "\n \n<qqimg>/tmp/pic1.jpg</qqimg>\n\n \n",
});
await flush(200);
// 阶段 4第二个媒体标签
await ctrl.onPartialReply({
text: "\n \n<qqimg>/tmp/pic1.jpg</qqimg>\n\n \n<qqimg>/tmp/pic2.jpg</qqimg>",
});
await flush(400);
// 阶段 5末尾空白完成
await ctrl.onPartialReply({ text: fullText });
await flush(200);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(300);
// 验证 1应检测到 2 个媒体标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 2, `应检测到至少 2 个 qqimg 标签,实际 ${foundLogs.length}`);
// 验证 2应有 2 次媒体发送尝试sendPhoto 可能因文件不存在失败,但 sending 日志应存在)
const sendingLogs = logs.filter((l) => l.includes("sending image"));
assert.ok(sendingLogs.length >= 2, `应有至少 2 次发送图片尝试,实际 ${sendingLogs.length}`);
// 验证 3不应有 PREFIX MISMATCH 错误
const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH"));
assert.strictEqual(
prefixMismatchLogs.length,
0,
`不应有 PREFIX MISMATCH实际: ${prefixMismatchLogs.join("; ")}`,
);
// 验证 4流式分片GENERATING 状态)中不应出现纯空白内容
const generatingCallsInner = streamCalls.filter((c) => c.inputState === 1);
for (const call of generatingCallsInner) {
assert.ok(
call.content.trim().length > 0,
`流式分片不应为纯空白: "${call.content.replace(/\n/g, "\\n").replace(/ /g, "·")}"`,
);
}
// 验证 5如果有流式启动首次调用无 stream_msg_id首次内容也不应纯空白
const startCalls = streamCalls.filter((c) => !c.streamMsgId);
for (const call of startCalls) {
assert.ok(
call.content.trim().length > 0,
`流式启动内容不应为纯空白: "${call.content.replace(/\n/g, "\\n").replace(/ /g, "·")}"`,
);
}
});
console.log("\n=== 2. 单个媒体标签 ===");
await test("媒体标签: 多媒体后跟文本onIdle 终结不 PREFIX MISMATCH", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 5;
mediaApiDelayMs = 5;
// 模拟实际场景:两个语音标签 + 后续文本描述,逐步到达
// 阶段1第一个语音标签
await ctrl.onPartialReply({
text: "<qqvoice>/tmp/voice1.mp3</qqvoice>",
});
await flush(400);
// 阶段2两个语音标签
await ctrl.onPartialReply({
text: "<qqvoice>/tmp/voice1.mp3</qqvoice>\n<qqvoice>/tmp/voice2.mp3</qqvoice>",
});
await flush(400);
// 阶段3两个语音标签 + 后续文本
await ctrl.onPartialReply({
text: "<qqvoice>/tmp/voice1.mp3</qqvoice>\n<qqvoice>/tmp/voice2.mp3</qqvoice>\n两条语音都发给你啦",
});
await flush(400);
// 完成
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(300);
// 验证:应检测到 2 个媒体标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 2, `应检测到至少 2 个标签,实际 ${foundLogs.length}`);
// 核心验证:不应有 PREFIX MISMATCH
const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH"));
assert.strictEqual(
prefixMismatchLogs.length,
0,
`不应有 PREFIX MISMATCH实际: ${prefixMismatchLogs.join("; ")}`,
);
// 验证:最终流式文本(如果有)应包含后续描述文本
const doneCalls = streamCalls.filter((c) => c.inputState === 10);
if (doneCalls.length > 0) {
const lastDone = doneCalls[doneCalls.length - 1];
assert.ok(
lastDone.content.includes("两条语音都发给你啦"),
`终结分片应包含后续文本,实际: "${lastDone.content.slice(0, 80)}"`,
);
// 终结文本不应包含原始媒体标签
assert.ok(
!lastDone.content.includes("<qqvoice>"),
`终结文本不应包含 <qqvoice> 标签,实际: "${lastDone.content.slice(0, 80)}"`,
);
}
});
await test("媒体标签: 文本 + 图片 + 后续文本", async () => {
const ctrl = createController({ mediaContext: true });
// 一次性送入完整的含图片标签文本
await ctrl.onPartialReply({ text: "看图:<qqimg>/tmp/cat.jpg</qqimg>" });
await flush(300);
// 后续文本到达
await ctrl.onPartialReply({ text: "看图:<qqimg>/tmp/cat.jpg</qqimg>\n\n好看吧" });
await flush(300);
ctrl.markFullyComplete();
await ctrl.onIdle();
// 验证:应有媒体上传调用(图片需要先上传再发送)
// 或者至少流式中出现过图片相关处理
// 关键验证:最终流式文本不应包含原始 <qqimg> 标签
const lastStream = streamCalls[streamCalls.length - 1];
assert.ok(!lastStream.content.includes("<qqimg>"), "最终文本不应包含 <qqimg> 标签");
});
await test("媒体标签: 纯媒体开头(无前置文本)", async () => {
const ctrl = createController({ mediaContext: true });
await ctrl.onPartialReply({ text: "<qqvoice>/tmp/hello.mp3</qqvoice>" });
await flush(300);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:应该至少有流式会话创建或媒体处理的记录
const infoLogs = logs.filter((l) => l.includes("processMediaTags") && l.includes("found"));
assert.ok(infoLogs.length >= 1, `应检测到 qqvoice 标签。相关日志: ${infoLogs.join("; ") || "无"}`);
});
console.log("\n=== 3. 多个媒体标签(循环消费) ===");
await test("多媒体: 两个图片标签被逐个处理", async () => {
const ctrl = createController({ mediaContext: true });
const text = "图1<qqimg>/tmp/a.jpg</qqimg>\n图2<qqimg>/tmp/b.jpg</qqimg>\n完毕";
await ctrl.onPartialReply({ text });
await flush(600);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:日志中应显示找到了两个标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 2, `应找到至少 2 个标签,实际 ${foundLogs.length} 条 found 日志`);
});
console.log("\n=== 4. 未闭合标签等待 ===");
await test("未闭合标签: 逐步到达后完整处理", async () => {
const ctrl = createController({ mediaContext: true });
// 不完整的标签
await ctrl.onPartialReply({ text: "开始<qqimg>/tmp/pic" });
await flush(200);
// 验证:此时流式文本应该只包含 "开始",不含标签部分
const midCalls = [...streamCalls];
for (const call of midCalls) {
assert.ok(!call.content.includes("<qqimg>"), `中间态不应包含未闭合标签,内容: "${call.content}"`);
}
// 标签完整了
await ctrl.onPartialReply({ text: "开始<qqimg>/tmp/pic.jpg</qqimg>\n看看" });
await flush(300);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:应检测到完整标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 1, `标签完整后应被检测到found 日志: ${foundLogs.length}`);
});
console.log("\n=== 5. ★ pendingNormalizedFull 补救机制 ===");
await test("补救: 媒体处理期间最后一次 onPartialReply 不丢失", async () => {
const ctrl = createController({ mediaContext: true });
// 设置 API 有延迟,模拟 processMediaTags 执行耗时
apiDelayMs = 50;
mediaApiDelayMs = 80;
// 第1次: 含媒体标签 → 进入 processMediaTagsmediaInterruptInProgress=true
const p1 = ctrl.onPartialReply({ text: "hi<qqimg>/tmp/x.jpg</qqimg>" });
// 等一小段确保 processMediaTags 已经开始
await sleep(20);
// 第2次: 这是"最后一次" onPartialReply —— 带有新的后续文本
// 因为 mediaInterruptInProgress=true会被保存到 pendingNormalizedFull
const p2 = ctrl.onPartialReply({ text: "hi<qqimg>/tmp/x.jpg</qqimg>\n\n再见朋友" });
// 等待所有处理完成(包括 deferred re-run
await p1;
await p2;
await flush(800);
// 标记完成
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(300);
// ★ 核心验证:最终发送的文本应包含 "再见朋友"
const allStreamContent = streamCalls.map((c) => c.content).join(" || ");
assert.ok(
allStreamContent.includes("再见朋友"),
`"再见朋友" 应出现在流式发送中pendingNormalizedFull 补救)。\n实际流式内容: [\n${streamCalls.map((c, i) => ` ${i}: "${c.content.slice(0, 80)}" (state=${c.inputState})`).join("\n")}\n]`
);
// 验证deferred 日志应出现
const deferredLogs = logs.filter((l) => l.includes("deferred"));
assert.ok(deferredLogs.length >= 1, `应有 deferred 相关日志,实际: ${deferredLogs.length}`);
});
await test("补救: 多次被跳过只保留最新,最终处理最新文本", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 30;
mediaApiDelayMs = 150; // 媒体处理很慢
// 第1次: 含媒体 → 进入长时间处理
const p1 = ctrl.onPartialReply({ text: "<qqvoice>/tmp/song.mp3</qqvoice>" });
await sleep(20);
// 第2次: 被跳过 → pendingNormalizedFull = "..v1.."
await ctrl.onPartialReply({ text: "<qqvoice>/tmp/song.mp3</qqvoice>\n后续文字v1" });
await sleep(10);
// 第3次: 被跳过 → pendingNormalizedFull 被覆盖为 "..v2.."
await ctrl.onPartialReply({ text: "<qqvoice>/tmp/song.mp3</qqvoice>\n后续文字v2最终版" });
await p1;
await flush(800);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(300);
// ★ 核心验证:最终应包含 v2 的文本
const allStreamContent = streamCalls.map((c) => c.content).join(" || ");
assert.ok(
allStreamContent.includes("后续文字v2最终版"),
`应包含最新的 "后续文字v2最终版"。\n实际: [\n${streamCalls.map((c, i) => ` ${i}: "${c.content.slice(0, 80)}" (state=${c.inputState})`).join("\n")}\n]`
);
});
await test("补救: 无 pending 时不触发多余的 re-run", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 5;
mediaApiDelayMs = 5;
// 只有一次 onPartialReply媒体标签后接 \n 开头的文本)
await ctrl.onPartialReply({ text: "hello<qqimg>/tmp/a.jpg</qqimg>\nbye" });
await flush(400);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:不应有 re-running 日志(因为没有被跳过的调用)
const reRunLogs = logs.filter((l) => l.includes("re-running"));
assert.strictEqual(reRunLogs.length, 0, `不应有 re-running 日志,实际: ${reRunLogs.length}`);
// ★ 验证:不应有 PREFIX MISMATCH 错误(之前 stripMediaTags 的 .trim() 会导致此问题)
const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH"));
assert.strictEqual(prefixMismatchLogs.length, 0, `不应有 PREFIX MISMATCH实际: ${prefixMismatchLogs.join("; ")}`);
});
console.log("\n=== 6. onIdle 边界 ===");
await test("onIdle: 等待媒体处理完成后再终结", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 20;
mediaApiDelayMs = 200; // 媒体发送很慢
// 发送含媒体的文本
const p = ctrl.onPartialReply({ text: "<qqimg>/tmp/slow.jpg</qqimg>\n完成" });
await sleep(30);
// 在媒体还在处理时就标记完成并触发 onIdle
ctrl.markFullyComplete();
const idlePromise = ctrl.onIdle();
await p;
await idlePromise;
await flush(500);
// 验证:应该正常完成,不降级
assert.ok(!ctrl.shouldFallbackToStatic, "不应降级到静态发送");
});
await test("onDeliver: deliver 先到达 → 禁用流式走降级", async () => {
const ctrl = createController({ mediaContext: true });
// deliver 先到达(此时 sentStreamChunkCount === 0→ 直接 transition 到 aborted
await ctrl.onDeliver({ text: "成果:<qqimg>/tmp/result.png</qqimg>" });
await flush(400);
// 验证应该已经进入终态aborted走降级路径
assert.ok(ctrl.isTerminalPhase, "deliver 先到达后应进入终态");
assert.ok(ctrl.shouldFallbackToStatic, "deliver 先到达时应降级到静态发送");
// 后续 onPartialReply 应被跳过(因为已是终态)
await ctrl.onPartialReply({ text: "这段应该被忽略" });
assert.ok(ctrl.shouldFallbackToStatic, "onPartialReply 应被跳过,仍然是降级状态");
});
await test("互斥: onPartialReply 先到 → onDeliver 被忽略(即使在媒体中断期间)", async () => {
const ctrl = createController({ mediaContext: true });
// onPartialReply 先到 → 锁定为 partial 模式
await ctrl.onPartialReply({ text: "<qqvoice>/tmp/a.mp3</qqvoice>" });
await flush(400);
// 此时可能还在 mediaInterruptInProgresssentStreamChunkCount 可能为 0
// 但 onDeliver 应被忽略(因为 partial 先到)
await ctrl.onDeliver({ text: "<qqvoice>/tmp/a.mp3</qqvoice>" });
await flush(400);
// 验证不应降级deliver 没有生效)
assert.ok(!ctrl.shouldFallbackToStatic, "partial 先到时 deliver 不应导致降级");
assert.ok(!ctrl.isTerminalPhase || ctrl.currentPhase !== "aborted",
"不应因为 deliver 进入 abortedonPartialReply 在处理中)");
// 日志中应有 deliver 被拒绝的字样
const skipLogs = logs.filter((l) => l.includes('rejected "deliver"'));
assert.ok(skipLogs.length >= 1, `应有 deliver 被拒绝的日志,实际: ${skipLogs.join("; ") || "无"}`);
});
console.log("\n=== 7. 降级与异常 ===");
await test("降级: 从未发送分片 → fallback", async () => {
const ctrl = createController();
// 不发送任何文本就直接结束
ctrl.markFullyComplete();
await ctrl.onIdle();
assert.ok(ctrl.isTerminalPhase, "应进入终态");
});
await test("异常: onError 后正常终态", async () => {
const ctrl = createController();
await ctrl.onPartialReply({ text: "部分文本" });
await flush(200);
await ctrl.onError(new Error("test error"));
assert.ok(ctrl.isTerminalPhase, "onError 后应进入终态");
});
console.log("\n=== 8. 回复边界检测 ===");
await test("回复边界: 文本缩短 → 旧controller终结新controller处理新回复", async () => {
let newCtrl: StreamingControllerType | null = null;
const ctrl = createController({
onReplyBoundary: async (newText: string) => {
// 回调中创建新 controller 并处理新回复
newCtrl = createController();
await newCtrl.onPartialReply({ text: newText });
},
});
// 第一段回复
await ctrl.onPartialReply({ text: "第一段回复内容比较长" });
await flush();
// 记录第一段相关的 streamCalls 数量
const firstSegCalls = streamCalls.length;
assert.ok(firstSegCalls > 0, "第一段应已产生流式调用");
// 文本缩短 → 触发回复边界
await ctrl.onPartialReply({ text: "短" });
await flush();
// 旧 controller 应已进入终态
assert.ok(ctrl.isTerminalPhase, "旧 controller 应已进入终态");
// 新 controller 应已创建
assert.ok(newCtrl !== null, "应通过回调创建了新 controller");
// 第一段应有 DONE 分片(终结)
const doneCalls = streamCalls.filter((c) => c.inputState === 10);
assert.ok(doneCalls.length >= 1, "旧 controller 应发送了 DONE 分片终结第一段");
// 验证第一段的 DONE 分片包含第一段内容
const firstDone = doneCalls[0];
assert.ok(firstDone.content.includes("第一段回复内容比较长"), `第一段 DONE 分片应包含 "第一段回复内容比较长", 实际: "${firstDone.content}"`);
// 继续第二段回复增长
await newCtrl!.onPartialReply({ text: "短回复完整" });
await flush();
newCtrl!.markFullyComplete();
await newCtrl!.onIdle();
await flush();
// 验证新 controller 的流式调用包含第二段内容
// 新 controller 的调用在 firstSegCalls 之后(因为 streamCalls 是全局的,但 DONE 会增加一些)
const allContent = streamCalls.map((c) => c.content).join(" || ");
assert.ok(allContent.includes("短回复完整"), `应包含第二段 "短回复完整",实际: ${allContent}`);
// 两段内容是独立的流式消息,不应混在一起
const lastCall = streamCalls[streamCalls.length - 1];
assert.ok(!lastCall.content.includes("第一段回复内容比较长"), "第二段最终分片不应包含第一段内容(各自独立)");
});
// ============ 结果 ============
console.log(`\n========================================`);
console.log(` 总计: ${passed + failed} | ✅ 通过: ${passed} | ❌ 失败: ${failed}`);
if (failedTests.length > 0) {
console.log(` 失败用例:`);
for (const t of failedTests) console.log(` - ${t}`);
}
console.log(`========================================\n`);
// 恢复原始 fetch
globalThis.fetch = originalFetch;
process.exit(failed > 0 ? 1 : 0);