Files
qqbot/tests/streaming-controller.test.ts
Mingkuan 2ea3dbf452 Release/1.7.0 (#243)
* feat: support raw msg ref

* fix: mention msg content

* fix: qqmedia tag

* feat: parse msgrefid

* feat: 流式消息能力增强、斜杠指令扩展与媒体标签处理优化

## 流式消息 (streaming)
- 支持流式消息引用:回复时携带引用消息 ID,通过 message_reference 字段传递
- 重构回复边界(reply boundary)机制:检测到模型返回新回复(文本前缀不匹配)时,
  不再终结当前会话并创建新 controller,改为在已有内容后拼接 "\n\n" 分隔符,
  在同一流式会话中继续发送,移除 onReplyBoundary 回调及 gateway 中的重建逻辑
- 移除遇到未闭合媒体标签时提前终结流式会话的逻辑,改为继续正常流式发送安全文本
  部分(performFlush 内置的 stripIncompleteMediaTag 已能保证安全),等待下次
  onPartialReply 带来更多文本后标签自然闭合
- 移除 sendStreamChunk / doStartStreaming 中冗余的 resp.code 错误检查
- streaming 阶段状态转换新增 idle 回退(首分片发送失败时可回退到 idle)
- sendC2CStreamMessage 返回类型从 StreamMessageResponse 统一为 MessageResponse;
  仅终结分片(DONE)触发 sendAndNotify 引用回调,中间分片直接调用 apiRequest
- 移除 types.ts 中不再使用的 StreamMessage / PendingReply 等冗余类型定义
- 补充 streaming-controller 测试用例:覆盖回复边界检测、多段拼接连续发送、
  sentIndex 连续性等场景

## 斜杠指令 (slash-commands)
- 新增 /bot-streaming on|off 指令,支持用户在私聊中查看和切换流式消息开关,
  修改后即时生效并持久化到 openclaw.json 配置文件
- 指令仅在 c2c(私聊)场景下可用,群聊场景下返回提示信息

## API 模块 (api)
- 引入 ApiLogger 接口与 setApiLogger 注入机制,将 api 模块内散落的
  console.log/error 调用统一替换为可注入的 log 实例,便于日志分级和集中管理
- gateway 启动时自动调用 setApiLogger 注入框架 log 实例
- startBackgroundTokenRefresh 中将参数名 log 改为 refreshLog,
  避免与模块级 log 变量冲突

## 网关 (gateway)
- 简化 StreamingController 创建流程:移除 createStreamingController 工厂函数
  及 onReplyBoundary 回调注册,直接内联创建 controller
- dispatch 失败时统一输出错误日志,包含是否收到过响应的信息

## 媒体标签处理 (media-send)
- 修复 splitMessageByMediaTags 中正则贪婪匹配导致的字符异常问题,
  改为非贪婪匹配避免跨标签吞并内容
- 新增代码块感知逻辑(isInsideCodeBlock):在匹配媒体标签前先识别并排除
  围栏代码块(```)区域,避免代码块中的标签被误当作媒体资源处理
- hasMediaTags / findFirstClosedMediaTag / splitByMediaTags 均集成代码块过滤
- findFirstClosedMediaTag 不再对 textBefore 做 trim 和多余换行合并处理
- 新增 code-block-media-tag 测试文件,覆盖代码块内标签忽略、混合内容拆分、
  嵌套代码块、未闭合代码块等边界场景

* ci: 添加插件测试环境流水线流程

* fix: 文件超2G错误信息优化

* fix: 优化文件上传兜底文案

* fix(api): 流式消息不存储引用,移除 DONE 分片的 sendAndNotify 回调

* feat: update ref struct

* fix(ref); 移除引用中暂时无用的赋值。

* feat: 支持自然语言更新版本

* feat(qqbot-upgrade): 声音qqbot-upgrade skill

* feat: 增加/bot-streaming不支持时的默认回复

* refactor: 重构升级脚本降级架构 v4 + 安全扫描绕过 + 内置插件禁用

## 降级架构重构
- 两级降级:Level 1 原生命令 → Level 2 npm pack + openclaw install 本地目录
- Level 1 去掉无效的多源重试(ClawHub 限流时换 npm 源无意义)
- Level 2 多源重试(npm pack 直接走 npm registry,真正绕过 ClawHub)

## 安全扫描绕过(openclaw ≥2026.3.30)
- ≥3.30 跳过 update 路径(update 不支持 --dangerously-force-unsafe-install)
- Level 2 传解压后的目录路径而非 tarball(绕过 installPluginFromArchive 漏传 flag 的 bug)
- 版本检测:仅 ≥3.30 时加 --dangerously-force-unsafe-install

## 内置插件冲突处理
- 默认禁用内置冲突插件(openclaw ≥2026.3.31 内置了 qqbot)
- 安装前清理历史遗留 ID(qqbot/openclaw-qq)的 entries/installs/allow
- 安装后验证内置插件确实已禁用

## 其它优化
- update 超时从 1000s 缩短至 180s(失败更快降级)
- install 前清除配置中的插件记录(避免 already exists)
- 精简告警信息:超时提示静默化,重试中间失败去掉 ⚠️
- 步骤标签对齐 [1/4]~[4/4]

* feat: inject OpenClaw version into User-Agent via runtime

- Add PluginRuntime.version field to openclaw-plugin-sdk.d.ts
- Replace static PLUGIN_USER_AGENT const with getPluginUserAgent() function
  so the OpenClaw version can be updated after runtime injection
- Add setOpenClawVersion() to api.ts for runtime to call
- Call setOpenClawVersion(runtime.version) in setQQBotRuntime()
- Update all PLUGIN_USER_AGENT usages to getPluginUserAgent()
- Fall back to OPENCLAW_VERSION / OPENCLAW_SERVICE_VERSION env vars
  before runtime is available

* fix: remove env-based OpenClaw version fallback

Reading version from process.env is a security risk as env vars
can be tampered. Use runtime.version exclusively; fall back to
"unknown" until runtime is injected.

* fix(upgrade): 切换 gateway.reload.mode=hot 防止安装期间配置写入触发 cgroup kill

问题根因:
- openclaw gateway restart 使用 systemctl --user restart
- systemd 默认 KillMode=control-group,重启时清理整个 cgroup
- 安装流程多次写 openclaw.json,hybrid 模式下触发 restart,脚本被杀

修复:
- snapshot_config 提前到所有写操作前(含 disable_builtin_plugins)
- 安装窗口开始时切换 gateway.reload.mode=hot(热更新不重启)
- 新增 restore_reload_mode():有原值 config set 恢复,无原值 config unset 删除
- cleanup_on_exit 和 [4/4] 前均调用恢复,正常/异常退出均兜底

* fix: 修复windows下路径编码异常导致文件发送错误的问题

* feat: add preferOver to disable built-in qqbot plugin

* feat: 插件升级skill优化

* feat: 更新skill的仓库名

* feat: update 1.7.0 changelog

* Merge branch 'feat/1.7.0-revertskill' into 'main' (merge request !44)

feat: revert qqbot-upgrade skill

---------

Co-authored-by: cxyhhhhh <chenxuyang.win@qq.com>
2026-04-02 16:51:19 +08:00

841 lines
30 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* StreamingController 集成测试
*
* 通过 mock global.fetch 验证流式消息控制器的核心行为,
* 重点覆盖:循环消费模型 (processMediaTags) + pendingNormalizedFull 补救机制。
*
* 运行方式: npx tsx tests/streaming-controller.test.ts
*/
import assert from "node:assert";
// ============ Mock global.fetch ============
/** 记录所有流式 API 调用 */
interface StreamCall {
content: string;
inputState: number; // 1 = GENERATING, 10 = DONE
streamMsgId?: string;
index: number;
url: string;
}
/** 记录所有媒体上传 API 调用 */
interface MediaUploadCall {
url: string;
body: any;
}
let streamCalls: StreamCall[] = [];
let mediaUploadCalls: MediaUploadCall[] = [];
let streamMsgIdCounter = 0;
/** 控制流式 API 的延迟(毫秒)。设为 > 0 模拟 API 慢响应。 */
let apiDelayMs = 0;
/** 控制媒体上传 API 的延迟(毫秒)。 */
let mediaApiDelayMs = 0;
function sleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms));
}
function resetMocks() {
streamCalls = [];
mediaUploadCalls = [];
streamMsgIdCounter = 0;
apiDelayMs = 0;
mediaApiDelayMs = 0;
}
// 保存原始 fetch
const originalFetch = globalThis.fetch;
// 覆写 global.fetch
globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise<Response> => {
const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url;
const body = init?.body ? JSON.parse(init.body as string) : {};
// ---- Token 请求 ----
if (url.includes("/getAppAccessToken")) {
return new Response(JSON.stringify({
access_token: "mock-token-12345",
expires_in: "7200",
}), { status: 200, headers: { "Content-Type": "application/json" } });
}
// ---- 流式消息 API ----
if (url.includes("/stream_messages")) {
if (apiDelayMs > 0) await sleep(apiDelayMs);
const call: StreamCall = {
content: body.content_raw ?? "",
inputState: body.input_state ?? 0,
streamMsgId: body.stream_msg_id,
index: body.index ?? 0,
url,
};
streamCalls.push(call);
// 首次调用(无 stream_msg_id→ 返回新的 stream_msg_id
let respBody: any;
if (!body.stream_msg_id) {
streamMsgIdCounter++;
respBody = { id: `stream-${streamMsgIdCounter}`, timestamp: Date.now().toString() };
} else {
respBody = { id: body.stream_msg_id, timestamp: Date.now().toString() };
}
return new Response(JSON.stringify(respBody), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
// ---- 富媒体上传 API (v2/users/.../files) ----
if (url.includes("/files")) {
if (mediaApiDelayMs > 0) await sleep(mediaApiDelayMs);
mediaUploadCalls.push({ url, body });
return new Response(JSON.stringify({
file_uuid: `uuid-${mediaUploadCalls.length}`,
file_info: "mock",
ttl: 3600,
}), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
// ---- 普通消息 API (v2/users/.../messages) ----
if (url.includes("/messages")) {
if (mediaApiDelayMs > 0) await sleep(mediaApiDelayMs);
return new Response(JSON.stringify({
id: `msg-resp-${Date.now()}`,
timestamp: Date.now().toString(),
}), {
status: 200,
headers: { "Content-Type": "application/json" },
});
}
// 未匹配的请求,回退到原始 fetch
console.warn(`[mock-fetch] 未匹配的请求: ${url}`);
return originalFetch(input, init);
};
// ---- 现在 import StreamingController它会使用被 mock 的 global.fetch ----
const { StreamingController } = await import("../src/streaming.js");
type StreamingControllerType = InstanceType<typeof StreamingController>;
// ============ 辅助函数 ============
/** 等待异步任务完成 */
async function flush(ms = 100): Promise<void> {
await sleep(ms);
}
/** 收集日志 */
const logs: string[] = [];
function createController(opts?: { mediaContext?: boolean }): StreamingControllerType {
logs.length = 0;
const deps: any = {
account: {
accountId: "test",
enabled: true,
appId: "test-app",
clientSecret: "test-secret",
secretSource: "config" as const,
markdownSupport: true,
config: {
streaming: true,
streamingConfig: { throttleMs: 50 }, // 短节流方便测试
},
},
userId: "user-1",
replyToMsgId: "msg-1",
eventId: "event-1",
logPrefix: "[test]",
log: {
info: (m: string) => logs.push(`[INFO] ${m}`),
error: (m: string) => logs.push(`[ERROR] ${m}`),
warn: (m: string) => logs.push(`[WARN] ${m}`),
debug: (m: string) => logs.push(`[DEBUG] ${m}`),
},
};
if (opts?.mediaContext) {
deps.mediaContext = {
account: deps.account,
event: { type: "c2c", senderId: "user-1", messageId: "msg-1" },
log: deps.log,
};
}
return new StreamingController(deps);
}
// ============ 测试框架 ============
let passed = 0;
let failed = 0;
const failedTests: string[] = [];
async function test(name: string, fn: () => Promise<void>) {
resetMocks();
try {
await fn();
console.log(`${name}`);
passed++;
} catch (e: any) {
console.log(`${name}`);
console.log(` ${e.message}`);
if (e.stack) {
const lines = (e.stack as string).split("\n").slice(1, 4);
for (const l of lines) console.log(` ${l.trim()}`);
}
// 打印关键日志(去掉 DEBUG 减少噪音)
const relevantLogs = logs.filter((l) => !l.includes("[DEBUG]")).slice(-10);
if (relevantLogs.length > 0) {
console.log(` --- 日志 ---`);
for (const l of relevantLogs) console.log(` ${l}`);
}
failed++;
failedTests.push(name);
}
}
// ============ 测试用例 ============
console.log("\n=== 1. 纯文本流式 ===");
await test("纯文本: 基本流式 → 完成", async () => {
const ctrl = createController();
await ctrl.onPartialReply({ text: "你好" });
await flush();
await ctrl.onPartialReply({ text: "你好世界" });
await flush();
ctrl.markFullyComplete();
await ctrl.onIdle();
// 应该有流式分片发送
assert.ok(streamCalls.length >= 2, `应至少有 2 次流式调用,实际 ${streamCalls.length}`);
// 最后一次应该是 DONE (inputState=10)
const last = streamCalls[streamCalls.length - 1];
assert.strictEqual(last.inputState, 10, "最后一次应为 DONE");
assert.ok(last.content.includes("你好世界"), `最终文本应包含完整内容,实际: "${last.content}"`);
// 不应有媒体上传
assert.strictEqual(mediaUploadCalls.length, 0, "不应有媒体上传");
});
await test("纯文本: 空文本被忽略", async () => {
const ctrl = createController();
await ctrl.onPartialReply({ text: "" });
await ctrl.onPartialReply({ text: undefined });
await flush();
assert.strictEqual(streamCalls.length, 0, "不应有流式调用");
});
await test("纯文本: 全空白不启动流式,后续非空白一起发送", async () => {
const ctrl = createController();
// 先来一段全空白内容 — 不应启动流式
await ctrl.onPartialReply({ text: "\n\n " });
await flush();
assert.strictEqual(streamCalls.length, 0, "全空白阶段不应有流式调用");
// 后续有非空白内容到达 — 应启动流式,且包含之前的空白
await ctrl.onPartialReply({ text: "\n\n hello world" });
await flush(200);
assert.ok(streamCalls.length >= 1, `应有至少 1 次流式调用,实际 ${streamCalls.length}`);
// 首次发送的内容应包含之前保留的空白 + 新内容
const firstContent = streamCalls[0].content;
assert.ok(firstContent.includes("hello world"), `首次发送应包含 "hello world",实际: "${firstContent}"`);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(100);
});
await test("纯文本: 全空白 + 媒体标签,空白不发送,媒体正常处理", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 5;
mediaApiDelayMs = 5;
// 全空白前缀 + 媒体标签一起到达
await ctrl.onPartialReply({ text: "\n\n<qqimg>/tmp/pic.jpg</qqimg>描述文字" });
await flush(400);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:日志中检测到了媒体标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 1, `应检测到 qqimg 标签,实际 ${foundLogs.length}`);
// 验证:不应有 PREFIX MISMATCH 错误
const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH"));
assert.strictEqual(prefixMismatchLogs.length, 0, `不应有 PREFIX MISMATCH实际: ${prefixMismatchLogs.join("; ")}`);
// 验证:如果有流式分片发送,内容不应是纯空白
const generatingCalls = streamCalls.filter((c) => c.inputState === 1);
for (const call of generatingCalls) {
assert.ok(call.content.trim().length > 0, `流式分片不应为纯空白: "${call.content}"`);
}
});
await test("纯文本: 空白→媒体→空白→媒体→空白,只有媒体发出,空白全忽略", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 5;
mediaApiDelayMs = 5;
// 逐步送入:空白 → 媒体标签1 → 空白 → 媒体标签2 → 空白
const fullText =
"\n \n" +
"<qqimg>/tmp/pic1.jpg</qqimg>" +
"\n\n \n" +
"<qqimg>/tmp/pic2.jpg</qqimg>" +
" \n\n";
// 模拟流式分段到达
// 阶段 1纯空白
await ctrl.onPartialReply({ text: "\n \n" });
await flush(200);
assert.strictEqual(streamCalls.length, 0, "纯空白阶段不应有流式调用");
// 阶段 2空白 + 第一个媒体标签
await ctrl.onPartialReply({ text: "\n \n<qqimg>/tmp/pic1.jpg</qqimg>" });
await flush(400);
// 阶段 3继续加空白
await ctrl.onPartialReply({
text: "\n \n<qqimg>/tmp/pic1.jpg</qqimg>\n\n \n",
});
await flush(200);
// 阶段 4第二个媒体标签
await ctrl.onPartialReply({
text: "\n \n<qqimg>/tmp/pic1.jpg</qqimg>\n\n \n<qqimg>/tmp/pic2.jpg</qqimg>",
});
await flush(400);
// 阶段 5末尾空白完成
await ctrl.onPartialReply({ text: fullText });
await flush(200);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(300);
// 验证 1应检测到 2 个媒体标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 2, `应检测到至少 2 个 qqimg 标签,实际 ${foundLogs.length}`);
// 验证 2应有 2 次媒体发送尝试sendPhoto 可能因文件不存在失败,但 sending 日志应存在)
const sendingLogs = logs.filter((l) => l.includes("sending image"));
assert.ok(sendingLogs.length >= 2, `应有至少 2 次发送图片尝试,实际 ${sendingLogs.length}`);
// 验证 3不应有 PREFIX MISMATCH 错误
const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH"));
assert.strictEqual(
prefixMismatchLogs.length,
0,
`不应有 PREFIX MISMATCH实际: ${prefixMismatchLogs.join("; ")}`,
);
// 验证 4流式分片GENERATING 状态)中不应出现纯空白内容
const generatingCallsInner = streamCalls.filter((c) => c.inputState === 1);
for (const call of generatingCallsInner) {
assert.ok(
call.content.trim().length > 0,
`流式分片不应为纯空白: "${call.content.replace(/\n/g, "\\n").replace(/ /g, "·")}"`,
);
}
// 验证 5如果有流式启动首次调用无 stream_msg_id首次内容也不应纯空白
const startCalls = streamCalls.filter((c) => !c.streamMsgId);
for (const call of startCalls) {
assert.ok(
call.content.trim().length > 0,
`流式启动内容不应为纯空白: "${call.content.replace(/\n/g, "\\n").replace(/ /g, "·")}"`,
);
}
});
console.log("\n=== 2. 单个媒体标签 ===");
await test("媒体标签: 多媒体后跟文本onIdle 终结不 PREFIX MISMATCH", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 5;
mediaApiDelayMs = 5;
// 模拟实际场景:两个语音标签 + 后续文本描述,逐步到达
// 阶段1第一个语音标签
await ctrl.onPartialReply({
text: "<qqvoice>/tmp/voice1.mp3</qqvoice>",
});
await flush(400);
// 阶段2两个语音标签
await ctrl.onPartialReply({
text: "<qqvoice>/tmp/voice1.mp3</qqvoice>\n<qqvoice>/tmp/voice2.mp3</qqvoice>",
});
await flush(400);
// 阶段3两个语音标签 + 后续文本
await ctrl.onPartialReply({
text: "<qqvoice>/tmp/voice1.mp3</qqvoice>\n<qqvoice>/tmp/voice2.mp3</qqvoice>\n两条语音都发给你啦",
});
await flush(400);
// 完成
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(300);
// 验证:应检测到 2 个媒体标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 2, `应检测到至少 2 个标签,实际 ${foundLogs.length}`);
// 核心验证:不应有 PREFIX MISMATCH
const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH"));
assert.strictEqual(
prefixMismatchLogs.length,
0,
`不应有 PREFIX MISMATCH实际: ${prefixMismatchLogs.join("; ")}`,
);
// 验证:最终流式文本(如果有)应包含后续描述文本
const doneCalls = streamCalls.filter((c) => c.inputState === 10);
if (doneCalls.length > 0) {
const lastDone = doneCalls[doneCalls.length - 1];
assert.ok(
lastDone.content.includes("两条语音都发给你啦"),
`终结分片应包含后续文本,实际: "${lastDone.content.slice(0, 80)}"`,
);
// 终结文本不应包含原始媒体标签
assert.ok(
!lastDone.content.includes("<qqvoice>"),
`终结文本不应包含 <qqvoice> 标签,实际: "${lastDone.content.slice(0, 80)}"`,
);
}
});
await test("媒体标签: 文本 + 图片 + 后续文本", async () => {
const ctrl = createController({ mediaContext: true });
// 一次性送入完整的含图片标签文本
await ctrl.onPartialReply({ text: "看图:<qqimg>/tmp/cat.jpg</qqimg>" });
await flush(300);
// 后续文本到达
await ctrl.onPartialReply({ text: "看图:<qqimg>/tmp/cat.jpg</qqimg>\n\n好看吧" });
await flush(300);
ctrl.markFullyComplete();
await ctrl.onIdle();
// 验证:应有媒体上传调用(图片需要先上传再发送)
// 或者至少流式中出现过图片相关处理
// 关键验证:最终流式文本不应包含原始 <qqimg> 标签
const lastStream = streamCalls[streamCalls.length - 1];
assert.ok(!lastStream.content.includes("<qqimg>"), "最终文本不应包含 <qqimg> 标签");
});
await test("媒体标签: 纯媒体开头(无前置文本)", async () => {
const ctrl = createController({ mediaContext: true });
await ctrl.onPartialReply({ text: "<qqvoice>/tmp/hello.mp3</qqvoice>" });
await flush(300);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:应该至少有流式会话创建或媒体处理的记录
const infoLogs = logs.filter((l) => l.includes("processMediaTags") && l.includes("found"));
assert.ok(infoLogs.length >= 1, `应检测到 qqvoice 标签。相关日志: ${infoLogs.join("; ") || "无"}`);
});
console.log("\n=== 3. 多个媒体标签(循环消费) ===");
await test("多媒体: 两个图片标签被逐个处理", async () => {
const ctrl = createController({ mediaContext: true });
const text = "图1<qqimg>/tmp/a.jpg</qqimg>\n图2<qqimg>/tmp/b.jpg</qqimg>\n完毕";
await ctrl.onPartialReply({ text });
await flush(600);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:日志中应显示找到了两个标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 2, `应找到至少 2 个标签,实际 ${foundLogs.length} 条 found 日志`);
});
console.log("\n=== 4. 未闭合标签等待 ===");
await test("未闭合标签: 逐步到达后完整处理", async () => {
const ctrl = createController({ mediaContext: true });
// 不完整的标签
await ctrl.onPartialReply({ text: "开始<qqimg>/tmp/pic" });
await flush(200);
// 验证:此时流式文本应该只包含 "开始",不含标签部分
const midCalls = [...streamCalls];
for (const call of midCalls) {
assert.ok(!call.content.includes("<qqimg>"), `中间态不应包含未闭合标签,内容: "${call.content}"`);
}
// 标签完整了
await ctrl.onPartialReply({ text: "开始<qqimg>/tmp/pic.jpg</qqimg>\n看看" });
await flush(300);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:应检测到完整标签
const foundLogs = logs.filter((l) => l.includes("processMediaTags: found"));
assert.ok(foundLogs.length >= 1, `标签完整后应被检测到found 日志: ${foundLogs.length}`);
});
console.log("\n=== 5. ★ pendingNormalizedFull 补救机制 ===");
await test("补救: 媒体处理期间最后一次 onPartialReply 不丢失", async () => {
const ctrl = createController({ mediaContext: true });
// 设置 API 有延迟,模拟 processMediaTags 执行耗时
apiDelayMs = 50;
mediaApiDelayMs = 80;
// 第1次: 含媒体标签 → 进入 processMediaTagsmediaInterruptInProgress=true
const p1 = ctrl.onPartialReply({ text: "hi<qqimg>/tmp/x.jpg</qqimg>" });
// 等一小段确保 processMediaTags 已经开始
await sleep(20);
// 第2次: 这是"最后一次" onPartialReply —— 带有新的后续文本
// 因为 mediaInterruptInProgress=true会被保存到 pendingNormalizedFull
const p2 = ctrl.onPartialReply({ text: "hi<qqimg>/tmp/x.jpg</qqimg>\n\n再见朋友" });
// 等待所有处理完成
await p1;
await p2;
await flush(800);
// 标记完成
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(300);
// ★ 核心验证:最终发送的文本应包含 "再见朋友"
const allStreamContent = streamCalls.map((c) => c.content).join(" || ");
assert.ok(
allStreamContent.includes("再见朋友"),
`"再见朋友" 应出现在流式发送中pendingNormalizedFull 补救)。\n实际流式内容: [\n${streamCalls.map((c, i) => ` ${i}: "${c.content.slice(0, 80)}" (state=${c.inputState})`).join("\n")}\n]`
);
});
await test("补救: 多次被跳过只保留最新,最终处理最新文本", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 30;
mediaApiDelayMs = 150; // 媒体处理很慢
// 第1次: 含媒体 → 进入长时间处理
const p1 = ctrl.onPartialReply({ text: "<qqvoice>/tmp/song.mp3</qqvoice>" });
await sleep(20);
// 第2次: 被跳过 → pendingNormalizedFull = "..v1.."
await ctrl.onPartialReply({ text: "<qqvoice>/tmp/song.mp3</qqvoice>\n后续文字v1" });
await sleep(10);
// 第3次: 被跳过 → pendingNormalizedFull 被覆盖为 "..v2.."
await ctrl.onPartialReply({ text: "<qqvoice>/tmp/song.mp3</qqvoice>\n后续文字v2最终版" });
await p1;
await flush(800);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(300);
// ★ 核心验证:最终应包含 v2 的文本
const allStreamContent = streamCalls.map((c) => c.content).join(" || ");
assert.ok(
allStreamContent.includes("后续文字v2最终版"),
`应包含最新的 "后续文字v2最终版"。\n实际: [\n${streamCalls.map((c, i) => ` ${i}: "${c.content.slice(0, 80)}" (state=${c.inputState})`).join("\n")}\n]`
);
});
await test("补救: 无 pending 时不触发多余的 re-run", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 5;
mediaApiDelayMs = 5;
// 只有一次 onPartialReply媒体标签后接 \n 开头的文本)
await ctrl.onPartialReply({ text: "hello<qqimg>/tmp/a.jpg</qqimg>\nbye" });
await flush(400);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush(200);
// 验证:不应有 re-running 日志(因为没有被跳过的调用)
const reRunLogs = logs.filter((l) => l.includes("re-running"));
assert.strictEqual(reRunLogs.length, 0, `不应有 re-running 日志,实际: ${reRunLogs.length}`);
// ★ 验证:不应有 PREFIX MISMATCH 错误(之前 stripMediaTags 的 .trim() 会导致此问题)
const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH"));
assert.strictEqual(prefixMismatchLogs.length, 0, `不应有 PREFIX MISMATCH实际: ${prefixMismatchLogs.join("; ")}`);
});
console.log("\n=== 6. onIdle 边界 ===");
await test("onIdle: 等待媒体处理完成后再终结", async () => {
const ctrl = createController({ mediaContext: true });
apiDelayMs = 20;
mediaApiDelayMs = 200; // 媒体发送很慢
// 发送含媒体的文本
const p = ctrl.onPartialReply({ text: "<qqimg>/tmp/slow.jpg</qqimg>\n完成" });
await sleep(30);
// 在媒体还在处理时就标记完成并触发 onIdle
ctrl.markFullyComplete();
const idlePromise = ctrl.onIdle();
await p;
await idlePromise;
await flush(500);
// 验证:应该正常完成,不降级
assert.ok(!ctrl.shouldFallbackToStatic, "不应降级到静态发送");
});
await test("onDeliver: deliver 先到达 → 禁用流式走降级", async () => {
const ctrl = createController({ mediaContext: true });
// deliver 先到达(此时 sentStreamChunkCount === 0→ 直接 transition 到 aborted
await ctrl.onDeliver({ text: "成果:<qqimg>/tmp/result.png</qqimg>" });
await flush(400);
// 验证应该已经进入终态aborted走降级路径
assert.ok(ctrl.isTerminalPhase, "deliver 先到达后应进入终态");
assert.ok(ctrl.shouldFallbackToStatic, "deliver 先到达时应降级到静态发送");
// 后续 onPartialReply 应被跳过(因为已是终态)
await ctrl.onPartialReply({ text: "这段应该被忽略" });
assert.ok(ctrl.shouldFallbackToStatic, "onPartialReply 应被跳过,仍然是降级状态");
});
await test("互斥: onPartialReply 先到 → onDeliver 被忽略(即使在媒体中断期间)", async () => {
const ctrl = createController({ mediaContext: true });
// onPartialReply 先到 → 锁定为 partial 模式
await ctrl.onPartialReply({ text: "<qqvoice>/tmp/a.mp3</qqvoice>" });
await flush(400);
// 此时可能还在 mediaInterruptInProgresssentStreamChunkCount 可能为 0
// 但 onDeliver 应被忽略(因为 partial 先到)
await ctrl.onDeliver({ text: "<qqvoice>/tmp/a.mp3</qqvoice>" });
await flush(400);
// 验证不应降级deliver 没有生效)
assert.ok(!ctrl.shouldFallbackToStatic, "partial 先到时 deliver 不应导致降级");
assert.ok(!ctrl.isTerminalPhase || ctrl.currentPhase !== "aborted",
"不应因为 deliver 进入 abortedonPartialReply 在处理中)");
// 日志中应有 deliver 被拒绝的字样
const skipLogs = logs.filter((l) => l.includes('rejected "deliver"'));
assert.ok(skipLogs.length >= 1, `应有 deliver 被拒绝的日志,实际: ${skipLogs.join("; ") || "无"}`);
});
console.log("\n=== 7. 降级与异常 ===");
await test("降级: 从未发送分片 → fallback", async () => {
const ctrl = createController();
// 不发送任何文本就直接结束
ctrl.markFullyComplete();
await ctrl.onIdle();
assert.ok(ctrl.isTerminalPhase, "应进入终态");
});
await test("异常: onError 后正常终态", async () => {
const ctrl = createController();
await ctrl.onPartialReply({ text: "部分文本" });
await flush(200);
await ctrl.onError(new Error("test error"));
assert.ok(ctrl.isTerminalPhase, "onError 后应进入终态");
});
console.log("\n=== 8. 回复边界检测(拼接模式) ===");
await test("回复边界: 新回复拼接到旧内容后面,用两个换行分隔", async () => {
const ctrl = createController();
// 第一段回复
await ctrl.onPartialReply({ text: "第一段回复" });
await flush();
// 验证初始状态
const c = ctrl as any;
assert.strictEqual(c.lastRawFull, "第一段回复", `lastRawFull 应为 "第一段回复",实际: "${c.lastRawFull}"`);
assert.strictEqual(c._boundaryPrefix, null, `初始 _boundaryPrefix 应为 null`);
// 文本缩短(新回复来了)→ 触发边界检测 → 拼接
await ctrl.onPartialReply({ text: "新" });
await flush();
// 验证边界拼接后的状态
assert.strictEqual(c._boundaryPrefix, "第一段回复\n\n", `_boundaryPrefix 应为 "第一段回复\\n\\n",实际: "${c._boundaryPrefix}"`);
assert.strictEqual(c.lastRawFull, "第一段回复\n\n新", `lastRawFull 应为 "第一段回复\\n\\n新",实际: "${c.lastRawFull}"`);
assert.ok(c.lastNormalizedFull.includes("第一段回复"), `lastNormalizedFull 应包含第一段内容`);
assert.ok(c.lastNormalizedFull.includes("新"), `lastNormalizedFull 应包含新回复内容`);
// 新回复继续增长 → 不应再次触发边界
await ctrl.onPartialReply({ text: "新回复增长了" });
await flush();
assert.strictEqual(c._boundaryPrefix, "第一段回复\n\n", `增长后 _boundaryPrefix 不应变化,实际: "${c._boundaryPrefix}"`);
assert.strictEqual(c.lastRawFull, "第一段回复\n\n新回复增长了", `lastRawFull 应为 "第一段回复\\n\\n新回复增长了",实际: "${c.lastRawFull}"`);
// 继续增长
await ctrl.onPartialReply({ text: "新回复增长了更多内容" });
await flush();
assert.strictEqual(c.lastRawFull, "第一段回复\n\n新回复增长了更多内容", `lastRawFull 应为拼接后的完整文本,实际: "${c.lastRawFull}"`);
// 验证日志中只触发了一次边界检测
const boundaryLogs = logs.filter((l) => l.includes("reply boundary detected"));
assert.strictEqual(boundaryLogs.length, 1, `应只触发 1 次边界检测,实际: ${boundaryLogs.length}`);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush();
// 验证最终发送的内容同时包含两段
const allContent = streamCalls.map((c) => c.content).join(" || ");
assert.ok(allContent.includes("第一段回复"), `最终发送应包含 "第一段回复",实际: ${allContent}`);
assert.ok(allContent.includes("新回复增长了更多内容"), `最终发送应包含 "新回复增长了更多内容",实际: ${allContent}`);
// 旧 controller 不应进入 aborted应正常 completed
assert.ok(ctrl.isTerminalPhase, "应进入终态");
assert.ok(!ctrl.shouldFallbackToStatic, "不应降级");
});
await test("回复边界: 多次边界拼接(三段回复)", async () => {
const ctrl = createController();
// 第一段
await ctrl.onPartialReply({ text: "AAA" });
await flush();
const c = ctrl as any;
assert.strictEqual(c.lastRawFull, "AAA");
assert.strictEqual(c._boundaryPrefix, null);
// 第二段(第一次边界)
await ctrl.onPartialReply({ text: "BBB" });
await flush();
assert.strictEqual(c._boundaryPrefix, "AAA\n\n", `第一次边界后 _boundaryPrefix 应为 "AAA\\n\\n",实际: "${c._boundaryPrefix}"`);
assert.strictEqual(c.lastRawFull, "AAA\n\nBBB", `第一次边界后 lastRawFull 应为 "AAA\\n\\nBBB",实际: "${c.lastRawFull}"`);
// 第二段继续增长
await ctrl.onPartialReply({ text: "BBB完整" });
await flush();
assert.strictEqual(c.lastRawFull, "AAA\n\nBBB完整", `增长后 lastRawFull 应为 "AAA\\n\\nBBB完整",实际: "${c.lastRawFull}"`);
// 第三段(第二次边界)
await ctrl.onPartialReply({ text: "CCC" });
await flush();
assert.strictEqual(c._boundaryPrefix, "AAA\n\nBBB完整\n\n", `第二次边界后 _boundaryPrefix 应为 "AAA\\n\\nBBB完整\\n\\n",实际: "${c._boundaryPrefix}"`);
assert.strictEqual(c.lastRawFull, "AAA\n\nBBB完整\n\nCCC", `第二次边界后 lastRawFull 应为 "AAA\\n\\nBBB完整\\n\\nCCC",实际: "${c.lastRawFull}"`);
// 第三段继续增长
await ctrl.onPartialReply({ text: "CCC结束" });
await flush();
assert.strictEqual(c.lastRawFull, "AAA\n\nBBB完整\n\nCCC结束");
// 验证总共触发了 2 次边界检测
const boundaryLogs = logs.filter((l) => l.includes("reply boundary detected"));
assert.strictEqual(boundaryLogs.length, 2, `应触发 2 次边界检测,实际: ${boundaryLogs.length}`);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush();
// 验证最终发送的内容包含三段
const allContent = streamCalls.map((c) => c.content).join(" || ");
assert.ok(allContent.includes("AAA"), `应包含 "AAA"`);
assert.ok(allContent.includes("BBB完整"), `应包含 "BBB完整"`);
assert.ok(allContent.includes("CCC结束"), `应包含 "CCC结束"`);
});
await test("回复边界: 边界后 sentIndex 不受影响,内容连续发送", async () => {
const ctrl = createController();
// 第一段回复足够长,确保流式已启动
await ctrl.onPartialReply({ text: "第一段比较长的回复内容" });
await flush(200);
const c = ctrl as any;
const sentIndexBefore = c.sentIndex;
// 触发边界
await ctrl.onPartialReply({ text: "第二段" });
await flush(200);
// sentIndex 不应被重置(因为是在同一个流式会话中继续)
assert.ok(c.sentIndex >= sentIndexBefore, `边界后 sentIndex (${c.sentIndex}) 应 >= 边界前 (${sentIndexBefore})`);
// 验证流式会话没有中断streamMsgId 应不变)
// 边界不会导致流式会话终结和重建
assert.ok(c.streamMsgId !== null, `边界后 streamMsgId 应仍存在,实际: ${c.streamMsgId}`);
ctrl.markFullyComplete();
await ctrl.onIdle();
await flush();
});
// ============ 结果 ============
console.log(`\n========================================`);
console.log(` 总计: ${passed + failed} | ✅ 通过: ${passed} | ❌ 失败: ${failed}`);
if (failedTests.length > 0) {
console.log(` 失败用例:`);
for (const t of failedTests) console.log(` - ${t}`);
}
console.log(`========================================\n`);
// 恢复原始 fetch
globalThis.fetch = originalFetch;
process.exit(failed > 0 ? 1 : 0);