diff --git a/CHANGELOG.md b/CHANGELOG.md index 9496633..402349b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,27 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/). +## [1.6.6] - 2026-03-25 + +### Added + +- **Streaming messages (C2C)**: New `StreamingController` delivers AI responses as real-time typing-effect chunks in private chat. Includes throttle control (default 500ms, min 300ms), automatic media-tag pause/resume, long-gap batch window, state-machine lifecycle (`idle → streaming → completed/aborted`), and graceful fallback to static mode when the streaming API is unavailable. +- **Stream message API `sendC2CStreamMessage`**: Low-level wrapper for QQ Open Platform `/v2/users/{openid}/stream_messages` endpoint, with `replace` input mode, incremental `msg_seq`/`index`, and `GENERATING`/`DONE` state signaling. +- **`ApiError` structured error class**: API request errors now carry `status` (HTTP code) and `path`, enabling callers (e.g. streaming controller) to branch on status for retry vs. fallback decisions. +- **Media send queue module `media-send.ts`**: Extracted media-tag parsing, path-encoding fix, and send-queue execution into a shared utility used by both `outbound.ts` (static mode) and `streaming.ts` (streaming mode), eliminating ~100 lines of duplication. +- **Streaming configuration**: New `streaming` (boolean, default `true`) and `streamingConfig.throttleMs` options in account config for per-account streaming control. +- **Unit tests**: Added `strip-incomplete-media-tag.test.ts` and `streaming-controller.test.ts`. + +### Changed + +- **Outbound media handling refactored**: `sendText` in `outbound.ts` now delegates media-tag parsing and queue execution to the shared `media-send.ts` module instead of inline regex + switch logic. +- **Audio convert log level**: Downgraded `console.log` → `console.debug` for SILK detection, ffmpeg conversion, and WASM fallback logs in `audio-convert.ts`, reducing noise in production. +- **Gateway streaming integration**: `gateway.ts` creates a `StreamingController` per inbound message when streaming is enabled; registers `onPartialReply` callback to feed incremental text into the controller; finalizes or aborts the stream after dispatch completes. + +### Removed + +- **`user-messages.ts`**: Deleted the already-emptied module (design: plugin layer does not generate user-facing error text). + ## [1.6.5] - 2026-03-24 ### OpenClaw 3.23 Compatibility diff --git a/CHANGELOG.zh.md b/CHANGELOG.zh.md index 1803e38..4be133b 100644 --- a/CHANGELOG.zh.md +++ b/CHANGELOG.zh.md @@ -4,6 +4,27 @@ 格式参考 [Keep a Changelog](https://keepachangelog.com/)。 +## [1.6.6] - 2026-03-25 + +### 新增 + +- **流式消息(C2C 私聊)**:新增 `StreamingController` 流式控制器,AI 回复以打字机效果实时逐步推送到 QQ 私聊。支持节流控制(默认 500ms,最小 300ms)、媒体标签自动暂停/恢复流式会话、长间隔批处理窗口、状态机生命周期管理(`idle → streaming → completed/aborted`),流式 API 不可用时自动降级为静态消息模式。 +- **流式消息 API `sendC2CStreamMessage`**:封装 QQ 开放平台 `/v2/users/{openid}/stream_messages` 接口,支持 `replace` 输入模式、递增 `msg_seq`/`index` 序号、`GENERATING`/`DONE` 状态信令。 +- **`ApiError` 结构化错误类**:API 请求错误现在携带 `status`(HTTP 状态码)和 `path`,使调用方(如流式控制器)可根据状态码决定重试或降级策略。 +- **媒体发送队列模块 `media-send.ts`**:将媒体标签解析、路径编码修复、发送队列执行器抽取为公共工具模块,供 `outbound.ts`(静态模式)和 `streaming.ts`(流式模式)共用,消除约 100 行重复代码。 +- **流式消息配置项**:账户配置新增 `streaming`(布尔值,默认 `true`)和 `streamingConfig.throttleMs` 选项,支持按账户控制流式消息开关和节流间隔。 +- **单元测试**:新增 `strip-incomplete-media-tag.test.ts` 和 `streaming-controller.test.ts`。 + +### 变更 + +- **出站媒体处理重构**:`outbound.ts` 中 `sendText` 的媒体标签解析和发送队列逻辑重构为调用公共 `media-send.ts` 模块,替代原有的内联正则 + switch 分支。 +- **音频转换日志降级**:`audio-convert.ts` 中 SILK 检测、ffmpeg 转换、WASM 降级等日志从 `console.log` 降为 `console.debug`,减少生产环境日志噪音。 +- **Gateway 流式集成**:`gateway.ts` 在流式启用时为每条入站消息创建 `StreamingController`;注册 `onPartialReply` 回调将增量文本馈入控制器;dispatch 完成后终结或中止流式会话。 + +### 移除 + +- **`user-messages.ts`**:删除已清空的模块(设计原则:插件层不生成面向用户的错误提示)。 + ## [1.6.5] - 2026-03-24 ### OpenClaw 3.23 兼容适配 diff --git a/package.json b/package.json index aa575ca..5f24dd5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@tencent-connect/openclaw-qqbot", - "version": "1.6.5", + "version": "1.6.6", "type": "module", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/scripts/upgrade-via-npm.sh b/scripts/upgrade-via-npm.sh index 9ffb65a..2ff3383 100755 --- a/scripts/upgrade-via-npm.sh +++ b/scripts/upgrade-via-npm.sh @@ -24,6 +24,7 @@ INSTALL_SRC="" TARGET_VERSION="" APPID="" SECRET="" +STREAMING="" NO_RESTART=false SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" PROJECT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" @@ -50,6 +51,7 @@ print_usage() { echo "" echo " --appid QQ机器人 appid(首次安装时必填)" echo " --secret QQ机器人 secret(首次安装时必填)" + echo " --streaming 是否启用流式消息(默认: no,仅 C2C 私聊)" echo "" echo "也可以通过环境变量设置:" echo " QQBOT_APPID QQ机器人 appid" @@ -87,6 +89,11 @@ while [[ $# -gt 0 ]]; do SECRET="$2" shift 2 ;; + --streaming) + [ -z "$2" ] && echo "❌ --streaming 需要参数" && exit 1 + STREAMING="$2" + shift 2 + ;; --no-restart) NO_RESTART=true shift 1 @@ -449,6 +456,34 @@ elif [ -n "$APPID" ] || [ -n "$SECRET" ]; then echo "⚠️ --appid 和 --secret 必须同时提供" fi +# [配置] streaming(流式消息) +if [ -n "$STREAMING" ]; then + echo "" + echo "[配置] 写入 streaming(流式消息)配置..." + STREAMING_VALUE="" + if [ "$STREAMING" = "yes" ] || [ "$STREAMING" = "y" ] || [ "$STREAMING" = "true" ]; then + STREAMING_VALUE="true" + else + STREAMING_VALUE="false" + fi + + CONFIG_FILE="$HOME/.$CMD/$CMD.json" + if [ -f "$CONFIG_FILE" ] && node -e " + const fs = require('fs'); + const cfg = JSON.parse(fs.readFileSync('$CONFIG_FILE', 'utf8')); + if (!cfg.channels) cfg.channels = {}; + if (!cfg.channels.qqbot) cfg.channels.qqbot = {}; + const target = $STREAMING_VALUE; + if (cfg.channels.qqbot.streaming === target) process.exit(0); + cfg.channels.qqbot.streaming = target; + fs.writeFileSync('$CONFIG_FILE', JSON.stringify(cfg, null, 4) + '\n'); + " 2>&1; then + echo " ✅ streaming 配置写入成功 (streaming=$STREAMING_VALUE)" + else + echo " ⚠️ streaming 配置写入失败,不影响后续运行" + fi +fi + # [4/4] 重启 gateway 使新版本生效 echo "" diff --git a/scripts/upgrade-via-source.sh b/scripts/upgrade-via-source.sh index 2ee473e..88f841f 100755 --- a/scripts/upgrade-via-source.sh +++ b/scripts/upgrade-via-source.sh @@ -25,6 +25,7 @@ cd "$PROJ_DIR" APPID="" SECRET="" MARKDOWN="" +STREAMING="" while [[ $# -gt 0 ]]; do case $1 in @@ -40,6 +41,10 @@ while [[ $# -gt 0 ]]; do MARKDOWN="$2" shift 2 ;; + --streaming) + STREAMING="$2" + shift 2 + ;; -h|--help) echo "用法: $0 [选项]" echo "" @@ -47,6 +52,7 @@ while [[ $# -gt 0 ]]; do echo " --appid QQ机器人 appid" echo " --secret QQ机器人 secret" echo " --markdown 是否启用 markdown 消息格式(默认: no)" + echo " --streaming 是否启用流式消息(默认: no,仅 C2C 私聊)" echo " -h, --help 显示帮助信息" echo "" echo "也可以通过环境变量设置:" @@ -54,6 +60,7 @@ while [[ $# -gt 0 ]]; do echo " QQBOT_SECRET QQ机器人 secret" echo " QQBOT_TOKEN QQ机器人 token (appid:secret)" echo " QQBOT_MARKDOWN 是否启用 markdown(yes/no)" + echo " QQBOT_STREAMING 是否启用流式消息(yes/no)" echo "" echo "不带参数时,将使用已有配置直接启动。" echo "" @@ -72,6 +79,7 @@ done APPID="${APPID:-$QQBOT_APPID}" SECRET="${SECRET:-$QQBOT_SECRET}" MARKDOWN="${MARKDOWN:-$QQBOT_MARKDOWN}" +STREAMING="${STREAMING:-$QQBOT_STREAMING}" echo "=========================================" echo " qqbot 一键更新启动脚本" @@ -131,6 +139,29 @@ if [ -z "$SAVED_QQBOT_TOKEN" ] && [ -d "$HOME/.openclaw" ]; then fi fi +# 备份 streaming 配置(升级后恢复) +SAVED_STREAMING="" +for _app in openclaw clawdbot moltbot; do + _cfg="$HOME/.$_app/$_app.json" + if [ -f "$_cfg" ]; then + SAVED_STREAMING=$(node -e " + const cfg = JSON.parse(require('fs').readFileSync('$_cfg', 'utf8')); + const keys = ['qqbot', 'openclaw-qqbot', 'openclaw-qq']; + for (const key of keys) { + const ch = cfg.channels && cfg.channels[key]; + if (ch && typeof ch.streaming === 'boolean') { + process.stdout.write(String(ch.streaming)); + process.exit(0); + } + } + " 2>/dev/null || true) + [ -n "$SAVED_STREAMING" ] && break + fi +done +if [ -n "$SAVED_STREAMING" ]; then + echo "已备份 streaming 配置: $SAVED_STREAMING" +fi + # 2. 移除老版本 echo "" echo "[2/6] 移除老版本..." @@ -635,6 +666,69 @@ else echo "未指定 markdown 选项,使用已有配置" fi +# 5.5. 配置 streaming 选项 +echo "" +echo "[5.5/6] 配置 streaming(流式消息)选项..." + +# 确定目标 streaming 值:命令行参数 > 备份值 +STREAMING_VALUE="" +if [ -n "$STREAMING" ]; then + if [ "$STREAMING" = "yes" ] || [ "$STREAMING" = "y" ] || [ "$STREAMING" = "true" ]; then + STREAMING_VALUE="true" + echo "启用流式消息..." + else + STREAMING_VALUE="false" + echo "禁用流式消息..." + fi +elif [ -n "$SAVED_STREAMING" ]; then + STREAMING_VALUE="$SAVED_STREAMING" + echo "从备份恢复 streaming 配置: $SAVED_STREAMING" +fi + +if [ -n "$STREAMING_VALUE" ]; then + CURRENT_STREAMING_VALUE=$(node -e " + const fs = require('fs'); + const path = require('path'); + const home = process.env.HOME; + for (const app of ['openclaw', 'clawdbot', 'moltbot']) { + const f = path.join(home, '.' + app, app + '.json'); + if (!fs.existsSync(f)) continue; + try { + const cfg = JSON.parse(fs.readFileSync(f, 'utf8')); + const keys = ['qqbot', 'openclaw-qqbot', 'openclaw-qq']; + for (const key of keys) { + const ch = cfg.channels && cfg.channels[key]; + if (!ch) continue; + if (typeof ch.streaming === 'boolean') { process.stdout.write(String(ch.streaming)); process.exit(0); } + } + } catch {} + } + " 2>/dev/null || true) + + if [ "$CURRENT_STREAMING_VALUE" = "$STREAMING_VALUE" ]; then + echo "✅ streaming 配置已是目标值,跳过写入" + else + OPENCLAW_CONFIG="$HOME/.openclaw/openclaw.json" + if [ -f "$OPENCLAW_CONFIG" ] && node -e " + const fs = require('fs'); + const cfg = JSON.parse(fs.readFileSync('$OPENCLAW_CONFIG', 'utf-8')); + if (!cfg.channels) cfg.channels = {}; + if (!cfg.channels.qqbot) cfg.channels.qqbot = {}; + const target = $STREAMING_VALUE; + if (cfg.channels.qqbot.streaming === target) process.exit(0); + cfg.channels.qqbot.streaming = target; + fs.writeFileSync('$OPENCLAW_CONFIG', JSON.stringify(cfg, null, 4) + '\n'); + " 2>&1; then + echo "✅ streaming 配置成功" + _config_changed=1 + else + echo "⚠️ streaming 配置设置失败,不影响后续运行" + fi + fi +else + echo "未指定 streaming 选项且无备份值,使用默认配置" +fi + # 6. 启动 openclaw echo "" echo "[6/6] 启动 openclaw..." diff --git a/src/api.ts b/src/api.ts index aeac1f7..5a679d7 100644 --- a/src/api.ts +++ b/src/api.ts @@ -7,6 +7,20 @@ import os from "node:os"; import { computeFileHash, getCachedFileInfo, setCachedFileInfo } from "./utils/upload-cache.js"; import { sanitizeFileName } from "./utils/platform.js"; +// ============ 自定义错误 ============ + +/** API 请求错误,携带 HTTP status code */ +export class ApiError extends Error { + constructor( + message: string, + public readonly status: number, + public readonly path: string, + ) { + super(message); + this.name = "ApiError"; + } +} + const API_BASE = "https://api.sgroup.qq.com"; const TOKEN_URL = "https://bots.qq.com/app/getAppAccessToken"; @@ -302,15 +316,15 @@ export async function apiRequest( : res.status === 429 ? "请求过于频繁,已被限流" : `开放平台返回 HTTP ${res.status}`; - throw new Error(`${statusHint}(${path}),请稍后重试`); + throw new ApiError(`${statusHint}(${path}),请稍后重试`, res.status, path); } // JSON 错误响应 try { const error = JSON.parse(rawBody) as { message?: string; code?: number }; - throw new Error(`API Error [${path}]: ${error.message ?? rawBody}`); + throw new ApiError(`API Error [${path}]: ${error.message ?? rawBody}`, res.status, path); } catch (parseErr) { - if (parseErr instanceof Error && parseErr.message.startsWith("API Error")) throw parseErr; - throw new Error(`API Error [${path}] HTTP ${res.status}: ${rawBody.slice(0, 200)}`); + if (parseErr instanceof ApiError) throw parseErr; + throw new ApiError(`API Error [${path}] HTTP ${res.status}: ${rawBody.slice(0, 200)}`, res.status, path); } } @@ -1036,3 +1050,42 @@ async function sleep(ms: number, signal?: AbortSignal): Promise { } }); } + +// ============ 流式消息 API ============ + +import type { StreamMessageRequest, StreamMessageResponse } from "./types.js"; + +/** + * 发送流式消息(C2C 私聊) + * + * 流式协议: + * - 首次调用时不传 stream_msg_id,由平台返回 + * - 后续分片携带 stream_msg_id 和递增 msg_seq + * - input_state="1" 表示生成中,"10" 表示生成结束(终结状态) + * + * @param accessToken - access_token + * @param openid - 用户 openid + * @param req - 流式消息请求体 + * @returns 流式消息响应 + */ +export async function sendC2CStreamMessage( + accessToken: string, + openid: string, + req: StreamMessageRequest, +): Promise { + const path = `/v2/users/${openid}/stream_messages`; + const body: Record = { + input_mode: req.input_mode, + input_state: req.input_state, + content_type: req.content_type, + content_raw: req.content_raw, + event_id: req.event_id, + msg_id: req.msg_id, + msg_seq: req.msg_seq, + index: req.index, + }; + if (req.stream_msg_id) { + body.stream_msg_id = req.stream_msg_id; + } + return apiRequest(accessToken, "POST", path, body); +} diff --git a/src/config.ts b/src/config.ts index 5a62313..a32ff5e 100644 --- a/src/config.ts +++ b/src/config.ts @@ -70,17 +70,10 @@ export function resolveQQBotAccount( let secretSource: "config" | "file" | "env" | "none" = "none"; if (resolvedAccountId === DEFAULT_ACCOUNT_ID) { - // 默认账户从顶层读取 + // 默认账户从顶层读取(展开所有字段,避免遗漏新增配置项) + const { accounts: _accounts, ...topLevelConfig } = qqbot ?? {} as QQBotChannelConfig; accountConfig = { - enabled: qqbot?.enabled, - name: qqbot?.name, - appId: qqbot?.appId, - clientSecret: qqbot?.clientSecret, - clientSecretFile: qqbot?.clientSecretFile, - dmPolicy: qqbot?.dmPolicy, - allowFrom: qqbot?.allowFrom, - systemPrompt: qqbot?.systemPrompt, - imageServerBaseUrl: qqbot?.imageServerBaseUrl, + ...topLevelConfig, markdownSupport: qqbot?.markdownSupport ?? true, }; appId = normalizeAppId(qqbot?.appId); diff --git a/src/gateway.ts b/src/gateway.ts index 5b5cb20..d0deeb6 100644 --- a/src/gateway.ts +++ b/src/gateway.ts @@ -22,6 +22,7 @@ import { TypingKeepAlive, TYPING_INPUT_SECOND } from "./typing-keepalive.js"; import { parseAndSendMediaTags, sendPlainReply, type DeliverEventContext, type DeliverAccountContext } from "./outbound-deliver.js"; import { createDeliverDebouncer, type DeliverDebouncer } from "./deliver-debounce.js"; import { runWithRequestContext } from "./request-context.js"; +import { StreamingController, shouldUseStreaming } from "./streaming.js"; // QQ Bot intents - 按权限级别分组 const INTENTS = { @@ -50,6 +51,7 @@ const IMAGE_SERVER_PORT = parseInt(process.env.QQBOT_IMAGE_SERVER_PORT || "18765 // 使用绝对路径,确保文件保存和读取使用同一目录 const IMAGE_SERVER_DIR = process.env.QQBOT_IMAGE_SERVER_DIR || getQQBotDataDir("images"); + export interface GatewayContext { account: ResolvedQQBotAccount; abortSignal: AbortSignal; @@ -849,6 +851,53 @@ export async function startGateway(ctx: GatewayContext): Promise { }, responseTimeout); }); + + // ============ 流式消息控制器 ============ + const targetType = event.type === "c2c" ? "c2c" as const + : event.type === "group" ? "group" as const + : "channel" as const; + const useStreaming = shouldUseStreaming(account, targetType); + log?.info(`[qqbot:${account.accountId}] Streaming ${useStreaming ? "enabled" : "disabled"} for ${targetType} message from ${event.senderId}`); + let streamingController: StreamingController | null = null; + + /** 创建一个新的 StreamingController 实例(用于初始创建和回复边界时重建) */ + const createStreamingController = (): StreamingController => { + const ctrl = new StreamingController({ + account, + userId: event.senderId, + replyToMsgId: event.messageId, + eventId: event.messageId, + logPrefix: `[qqbot:${account.accountId}:streaming]`, + log, + mediaContext: { + account, + event: { + type: event.type as "c2c" | "group" | "channel", + senderId: event.senderId, + messageId: event.messageId, + groupOpenid: event.groupOpenid, + channelId: event.channelId, + }, + log, + }, + // 回复边界回调:终结旧 controller 后创建新的,用新回复文本继续流式 + onReplyBoundary: async (newReplyText: string) => { + log?.info(`[qqbot:${account.accountId}] Reply boundary: creating new StreamingController for new reply`); + const newCtrl = createStreamingController(); + streamingController = newCtrl; + // 将新回复的初始文本交给新 controller 处理 + await newCtrl.onPartialReply({ text: newReplyText }); + }, + }); + return ctrl; + }; + + if (useStreaming) { + log?.info(`[qqbot:${account.accountId}] Streaming mode enabled for ${targetType} target`); + streamingController = createStreamingController(); + } + + const dispatchPromise = pluginRuntime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg, @@ -956,6 +1005,37 @@ export async function startGateway(ctx: GatewayContext): Promise { log?.info(`[qqbot:${account.accountId}] Block deliver after ${toolDeliverCount} tool deliver(s)`); } + // ============ 流式模式处理 ============ + // 流式模式下,所有 block deliver 内容(含媒体标签)统一交由 StreamingController 处理。 + // StreamingController 内部有重试机制;如果一个分片都没发出去则降级到普通消息。 + if (streamingController && !streamingController.isTerminalPhase) { + const deliverTextLen = (payload.text ?? "").length; + const deliverPreview = (payload.text ?? "").slice(0, 40).replace(/\n/g, "\\n"); + log?.debug?.(`[qqbot:${account.accountId}] Streaming deliver entry, textLen=${deliverTextLen}, phase=${streamingController.currentPhase}, sentChunks=${streamingController.sentChunkCount_debug}, preview="${deliverPreview}"`); + try { + await streamingController.onDeliver(payload); + log?.debug?.(`[qqbot:${account.accountId}] Streaming deliver done, phase=${streamingController.currentPhase}`); + } catch (err) { + // StreamingController 内部已有重试,这里只打日志 + log?.error(`[qqbot:${account.accountId}] Streaming deliver error: ${err}`); + } + + // 检查是否因流式 API 不可用而需要降级(ensureStreamingStarted 全部失败) + // 如果需要降级,不 return,让本次 deliver 的 payload.text(全量文本)继续走普通发送逻辑 + if (streamingController.shouldFallbackToStatic) { + log?.info(`[qqbot:${account.accountId}] Streaming API unavailable, falling back to static for this deliver`); + // 不 return,继续走普通发送逻辑(payload.text 是完整文本) + } else { + // 流式正常处理,不走普通发送逻辑 + pluginRuntime.channel.activity.record({ + channel: "qqbot", + accountId: account.accountId, + direction: "outbound", + }); + return; + } + } + // ============ 实际发送逻辑(可被 debouncer 包裹) ============ const executeDeliver = async (deliverPayload: { text?: string; mediaUrls?: string[]; mediaUrl?: string }, _deliverInfo: { kind: string }) => { // ============ 引用回复 ============ @@ -1044,6 +1124,23 @@ export async function startGateway(ctx: GatewayContext): Promise { clearTimeout(timeoutId); timeoutId = null; } + + // 流式模式:委托给 streaming controller 处理错误 + if (streamingController && !streamingController.isTerminalPhase) { + try { + await streamingController.onError(err); + } catch (streamErr) { + log?.error(`[qqbot:${account.accountId}] Streaming onError failed: ${streamErr}`); + } + + // 如果 onError 中因无分片发出而降级,不 return,走普通错误处理 + if (streamingController.shouldFallbackToStatic) { + log?.info(`[qqbot:${account.accountId}] Streaming onError: no chunk sent, falling back to static error handling`); + // 不 return,继续走普通错误处理 + } else { + return; + } + } const errMsg = String(err); @@ -1062,7 +1159,23 @@ export async function startGateway(ctx: GatewayContext): Promise { }, }, replyOptions: { - disableBlockStreaming: true, + // 流式模式时禁用 block streaming + disableBlockStreaming: !useStreaming, + // 流式模式下注册 onPartialReply 回调,接收流式文本增量 + ...(streamingController ? { + onPartialReply: async (payload: { text?: string }) => { + const textLen = payload.text?.length ?? 0; + const preview = (payload.text ?? "").slice(0, 40).replace(/\n/g, "\\n"); + log?.debug?.(`[qqbot:${account.accountId}] onPartialReply called, textLen=${textLen}, phase=${streamingController!.currentPhase}, isTerminal=${streamingController!.isTerminalPhase}, preview="${preview}"`); + try { + await streamingController!.onPartialReply(payload); + log?.debug?.(`[qqbot:${account.accountId}] onPartialReply done, phase=${streamingController!.currentPhase}`); + } catch (err) { + // StreamingController 内部已有重试,这里只打日志 + log?.error(`[qqbot:${account.accountId}] Streaming onPartialReply error: ${err}`); + } + }, + } : {}), }, }); @@ -1093,6 +1206,28 @@ export async function startGateway(ctx: GatewayContext): Promise { await debouncer.dispose(); debouncer = null; } + + // ============ 流式消息收尾 ============ + // dispatch 完成后,标记流式控制器已完成并触发 onIdle(发送终结分片) + if (streamingController && !streamingController.isTerminalPhase) { + try { + streamingController.markFullyComplete(); + await streamingController.onIdle(); + log?.debug?.(`[qqbot:${account.accountId}] Streaming controller finalized`); + } catch (err) { + log?.error(`[qqbot:${account.accountId}] Streaming finalization error: ${err}`); + // 尝试中止 + try { await streamingController.abortStreaming(); } catch { /* ignore */ } + } + } + + // ============ 流式降级到非流式 ============ + // 无需额外处理:如果流式 API 不可用(shouldFallbackToStatic), + // deliver 回调中已自动跳过流式拦截,走普通消息发送逻辑。 + // (每次 deliver 收到的都是全量文本,不需要在 controller 内部保存累积文本) + if (streamingController?.shouldFallbackToStatic) { + log?.debug?.(`[qqbot:${account.accountId}] Streaming was degraded to static mode (no chunk sent successfully)`); + } } } catch (err) { const errStr = String(err); diff --git a/src/outbound-deliver.ts b/src/outbound-deliver.ts index 0c0a672..d44f1c6 100644 --- a/src/outbound-deliver.ts +++ b/src/outbound-deliver.ts @@ -8,12 +8,12 @@ import type { ResolvedQQBotAccount } from "./types.js"; import { sendC2CMessage, sendGroupMessage, sendChannelMessage, sendC2CImageMessage, sendGroupImageMessage } from "./api.js"; -import { sendPhoto, sendVoice, sendVideoMsg, sendDocument, sendMedia as sendMediaAuto, type MediaTargetContext } from "./outbound.js"; +import { sendPhoto, sendMedia as sendMediaAuto, type MediaTargetContext } from "./outbound.js"; import { chunkText, TEXT_CHUNK_LIMIT } from "./channel.js"; import { getQQBotRuntime } from "./runtime.js"; import { getImageSize, formatQQBotMarkdownImage, hasQQBotImageSize } from "./utils/image-size.js"; -import { normalizeMediaTags } from "./utils/media-tags.js"; -import { normalizePath, isLocalPath as isLocalFilePath } from "./utils/platform.js"; +import { parseMediaTagsToSendQueue, executeSendQueue, type MediaSendContext } from "./utils/media-send.js"; +import { isLocalPath as isLocalFilePath } from "./utils/platform.js"; import { filterInternalMarkers } from "./utils/text-parsing.js"; // ============ 类型定义 ============ @@ -60,56 +60,16 @@ export async function parseAndSendMediaTags( const { account, log } = actx; const prefix = `[qqbot:${account.accountId}]`; - // 预处理:纠正小模型常见的标签拼写错误和格式问题 - const text = normalizeMediaTags(replyText); + // 使用 media-send.ts 的统一解析器(内含 normalizeMediaTags + 路径编码修复) + const { hasMediaTags: hasMedia, sendQueue } = parseMediaTagsToSendQueue(replyText, log); - const mediaTagRegex = /<(qqimg|qqvoice|qqvideo|qqfile|qqmedia)>([^<>]+)<\/(?:qqimg|qqvoice|qqvideo|qqfile|qqmedia|img)>/gi; - const mediaTagMatches = [...text.matchAll(mediaTagRegex)]; - - if (mediaTagMatches.length === 0) { - return { handled: false, normalizedText: text }; - } - - const tagCounts = mediaTagMatches.reduce((acc, m) => { const t = m[1]!.toLowerCase(); acc[t] = (acc[t] ?? 0) + 1; return acc; }, {} as Record); - log?.info(`${prefix} Detected media tags: ${Object.entries(tagCounts).map(([k, v]) => `${v} <${k}>`).join(", ")}`); - - // 构建发送队列 - type QueueItem = { type: "text" | "image" | "voice" | "video" | "file" | "media"; content: string }; - const sendQueue: QueueItem[] = []; - - let lastIndex = 0; - const regex2 = /<(qqimg|qqvoice|qqvideo|qqfile|qqmedia)>([^<>]+)<\/(?:qqimg|qqvoice|qqvideo|qqfile|qqmedia|img)>/gi; - let match; - - while ((match = regex2.exec(text)) !== null) { - const textBefore = text.slice(lastIndex, match.index).replace(/\n{3,}/g, "\n\n").trim(); - if (textBefore) { - sendQueue.push({ type: "text", content: filterInternalMarkers(textBefore) }); - } - - const tagName = match[1]!.toLowerCase(); - let mediaPath = decodeMediaPath(match[2]?.trim() ?? "", log, prefix); - - if (mediaPath) { - const typeMap: Record = { - qqmedia: "media", qqvoice: "voice", qqvideo: "video", qqfile: "file", - }; - const itemType = typeMap[tagName] ?? "image"; - sendQueue.push({ type: itemType, content: mediaPath }); - log?.info(`${prefix} Found ${itemType} in <${tagName}>: ${mediaPath}`); - } - - lastIndex = match.index + match[0].length; - } - - const textAfter = text.slice(lastIndex).replace(/\n{3,}/g, "\n\n").trim(); - if (textAfter) { - sendQueue.push({ type: "text", content: filterInternalMarkers(textAfter) }); + if (!hasMedia || sendQueue.length === 0) { + return { handled: false, normalizedText: replyText }; } log?.info(`${prefix} Send queue: ${sendQueue.map(item => item.type).join(" -> ")}`); - // 按顺序发送 + // 构建统一的媒体发送上下文 const mediaTarget: MediaTargetContext = { targetType: event.type === "c2c" ? "c2c" : event.type === "group" ? "group" : "channel", targetId: event.type === "c2c" ? event.senderId : event.type === "group" ? event.groupOpenid! : event.channelId!, @@ -118,46 +78,22 @@ export async function parseAndSendMediaTags( logPrefix: prefix, }; - for (const item of sendQueue) { - if (item.type === "text") { - await sendTextChunks(item.content, event, actx, sendWithRetry, consumeQuoteRef); - } else if (item.type === "image") { - const result = await sendPhoto(mediaTarget, item.content); - if (result.error) { - log?.error(`${prefix} sendPhoto error: ${result.error}`); - await sendTextChunks(`发送图片失败:${result.error}`, event, actx, sendWithRetry, consumeQuoteRef); - } - } else if (item.type === "voice") { - await sendVoiceWithTimeout(mediaTarget, item.content, account, log, prefix); - } else if (item.type === "video") { - const result = await sendVideoMsg(mediaTarget, item.content); - if (result.error) { - log?.error(`${prefix} sendVideoMsg error: ${result.error}`); - await sendTextChunks(`发送视频失败:${result.error}`, event, actx, sendWithRetry, consumeQuoteRef); - } - } else if (item.type === "file") { - const result = await sendDocument(mediaTarget, item.content); - if (result.error) { - log?.error(`${prefix} sendDocument error: ${result.error}`); - await sendTextChunks(result.error, event, actx, sendWithRetry, consumeQuoteRef); - } - } else if (item.type === "media") { - const result = await sendMediaAuto({ - to: actx.qualifiedTarget, - text: "", - mediaUrl: item.content, - accountId: account.accountId, - replyToId: event.messageId, - account, - }); - if (result.error) { - log?.error(`${prefix} sendMedia(auto) error: ${result.error}`); - await sendTextChunks(result.error, event, actx, sendWithRetry, consumeQuoteRef); - } - } - } + const mediaSendCtx: MediaSendContext = { + mediaTarget, + qualifiedTarget: actx.qualifiedTarget, + account, + replyToId: event.messageId, + log, + }; - return { handled: true, normalizedText: text }; + // 使用 media-send.ts 的统一执行器 + await executeSendQueue(sendQueue, mediaSendCtx, { + onSendText: async (textContent) => { + await sendTextChunks(filterInternalMarkers(textContent), event, actx, sendWithRetry, consumeQuoteRef); + }, + }); + + return { handled: true, normalizedText: replyText }; } // ============ 2. 非结构化消息发送(普通文本 + 图片) ============ @@ -324,48 +260,6 @@ export async function sendPlainReply( // ============ 内部辅助函数 ============ -/** 解码媒体路径:剥离 MEDIA: 前缀、展开 ~、修复转义 */ -function decodeMediaPath(raw: string, log: DeliverAccountContext["log"], prefix: string): string { - let mediaPath = raw; - if (mediaPath.startsWith("MEDIA:")) { - mediaPath = mediaPath.slice("MEDIA:".length); - } - mediaPath = normalizePath(mediaPath); - mediaPath = mediaPath.replace(/\\\\/g, "\\"); - - try { - const hasOctal = /\\[0-7]{1,3}/.test(mediaPath); - const hasNonASCII = /[\u0080-\u00FF]/.test(mediaPath); - - if (hasOctal || hasNonASCII) { - log?.debug?.(`${prefix} Decoding path with mixed encoding: ${mediaPath}`); - let decoded = mediaPath.replace(/\\([0-7]{1,3})/g, (_: string, octal: string) => { - return String.fromCharCode(parseInt(octal, 8)); - }); - const bytes: number[] = []; - for (let i = 0; i < decoded.length; i++) { - const code = decoded.charCodeAt(i); - if (code <= 0xFF) { - bytes.push(code); - } else { - const charBytes = Buffer.from(decoded[i], "utf8"); - bytes.push(...charBytes); - } - } - const buffer = Buffer.from(bytes); - const utf8Decoded = buffer.toString("utf8"); - if (!utf8Decoded.includes("\uFFFD") || utf8Decoded.length < decoded.length) { - mediaPath = utf8Decoded; - log?.debug?.(`${prefix} Successfully decoded path: ${mediaPath}`); - } - } - } catch (decodeErr) { - log?.error(`${prefix} Path decode error: ${decodeErr}`); - } - - return mediaPath; -} - /** 发送文本分块(共用逻辑) */ async function sendTextChunks( text: string, @@ -396,30 +290,6 @@ async function sendTextChunks( } } -/** 语音发送(带 45s 超时保护) */ -async function sendVoiceWithTimeout( - target: MediaTargetContext, - voicePath: string, - account: ResolvedQQBotAccount, - log: DeliverAccountContext["log"], - prefix: string, -): Promise { - const uploadFormats = account.config?.audioFormatPolicy?.uploadDirectFormats ?? account.config?.voiceDirectUploadFormats; - const transcodeEnabled = account.config?.audioFormatPolicy?.transcodeEnabled !== false; - const voiceTimeout = 45000; - try { - const result = await Promise.race([ - sendVoice(target, voicePath, uploadFormats, transcodeEnabled), - new Promise<{ channel: string; error: string }>((resolve) => - setTimeout(() => resolve({ channel: "qqbot", error: "语音发送超时,已跳过" }), voiceTimeout), - ), - ]); - if (result.error) log?.error(`${prefix} sendVoice error: ${result.error}`); - } catch (err) { - log?.error(`${prefix} sendVoice unexpected error: ${err}`); - } -} - /** Markdown 模式发送 */ async function sendMarkdownReply( textWithoutImages: string, diff --git a/src/outbound.ts b/src/outbound.ts index 79dd197..9f03da7 100644 --- a/src/outbound.ts +++ b/src/outbound.ts @@ -19,12 +19,11 @@ import { MediaFileType, } from "./api.js"; import { isAudioFile, audioFileToSilkFile, waitForFile, shouldTranscodeVoice } from "./utils/audio-convert.js"; -import { normalizeMediaTags } from "./utils/media-tags.js"; import { fileExistsAsync, formatFileSize, getMaxUploadSize, getFileTypeName, getFileSizeAsync } from "./utils/file-utils.js"; import { chunkedUploadC2C, chunkedUploadGroup } from "./utils/chunked-upload.js"; -import { isLocalPath as isLocalFilePath, normalizePath } from "./utils/platform.js"; +import { isLocalPath as isLocalFilePath, normalizePath, getQQBotMediaDir } from "./utils/platform.js"; import { downloadFile } from "./image-server.js"; -import { getQQBotMediaDir } from "./utils/platform.js"; +import { parseMediaTagsToSendQueue, executeSendQueue, type MediaSendContext } from "./utils/media-send.js"; // ============ 消息回复限流器 ============ // 同一 message_id 1小时内最多回复 4 次,超过 1 小时无法被动回复(需改为主动消息) @@ -706,7 +705,7 @@ export async function sendText(ctx: OutboundContext): Promise { // 不应该发生,但作为保底 console.error(`[qqbot] sendText: 消息回复被限流但未设置降级 - ${limitCheck.message}`); return { - channel: "qqbot", + channel: "qqbot", error: limitCheck.message }; } @@ -722,170 +721,66 @@ export async function sendText(ctx: OutboundContext): Promise { // 路径或URL — 视频 // 路径 — 文件 // 路径或URL — 自动识别(根据扩展名路由) + // 使用 deliver-common.ts 的公共解析器,消除与 gateway.ts 的重复 - // 预处理:纠正小模型常见的标签拼写错误和格式问题 - text = normalizeMediaTags(text); + const { hasMediaTags: hasMedia, sendQueue } = parseMediaTagsToSendQueue(text); - const mediaTagRegex = /<(qqimg|qqvoice|qqvideo|qqfile|qqmedia)>([^<>]+)<\/(?:qqimg|qqvoice|qqvideo|qqfile|qqmedia|img)>/gi; - const mediaTagMatches = text.match(mediaTagRegex); - - if (mediaTagMatches && mediaTagMatches.length > 0) { - console.log(`[qqbot] sendText: Detected ${mediaTagMatches.length} media tag(s), processing...`); - - // 构建发送队列:根据内容在原文中的实际位置顺序发送 - const sendQueue: Array<{ type: "text" | "image" | "voice" | "video" | "file" | "media"; content: string }> = []; - - let lastIndex = 0; - const mediaTagRegexWithIndex = /<(qqimg|qqvoice|qqvideo|qqfile|qqmedia)>([^<>]+)<\/(?:qqimg|qqvoice|qqvideo|qqfile|qqmedia|img)>/gi; - let match; - - while ((match = mediaTagRegexWithIndex.exec(text)) !== null) { - // 添加标签前的文本 - const textBefore = text.slice(lastIndex, match.index).replace(/\n{3,}/g, "\n\n").trim(); - if (textBefore) { - sendQueue.push({ type: "text", content: textBefore }); - } - - const tagName = match[1]!.toLowerCase(); // "qqimg" or "qqvoice" or "qqfile" - - // 剥离 MEDIA: 前缀(框架可能注入),展开 ~ 路径 - let mediaPath = match[2]?.trim() ?? ""; - if (mediaPath.startsWith("MEDIA:")) { - mediaPath = mediaPath.slice("MEDIA:".length); - } - mediaPath = normalizePath(mediaPath); - - // 处理可能被模型转义的路径 - // 1. 双反斜杠 -> 单反斜杠(Markdown 转义) - mediaPath = mediaPath.replace(/\\\\/g, "\\"); - - // 2. 八进制转义序列 + UTF-8 双重编码修复 - try { - const hasOctal = /\\[0-7]{1,3}/.test(mediaPath); - const hasNonASCII = /[\u0080-\u00FF]/.test(mediaPath); - - if (hasOctal || hasNonASCII) { - console.log(`[qqbot] sendText: Decoding path with mixed encoding: ${mediaPath}`); - - // Step 1: 将八进制转义转换为字节 - let decoded = mediaPath.replace(/\\([0-7]{1,3})/g, (_: string, octal: string) => { - return String.fromCharCode(parseInt(octal, 8)); - }); - - // Step 2: 提取所有字节(包括 Latin-1 字符) - const bytes: number[] = []; - for (let i = 0; i < decoded.length; i++) { - const code = decoded.charCodeAt(i); - if (code <= 0xFF) { - bytes.push(code); - } else { - const charBytes = Buffer.from(decoded[i], 'utf8'); - bytes.push(...charBytes); - } - } - - // Step 3: 尝试按 UTF-8 解码 - const buffer = Buffer.from(bytes); - const utf8Decoded = buffer.toString('utf8'); - - if (!utf8Decoded.includes('\uFFFD') || utf8Decoded.length < decoded.length) { - mediaPath = utf8Decoded; - console.log(`[qqbot] sendText: Successfully decoded path: ${mediaPath}`); - } - } - } catch (decodeErr) { - console.error(`[qqbot] sendText: Path decode error: ${decodeErr}`); - } - - if (mediaPath) { - if (tagName === "qqmedia") { - sendQueue.push({ type: "media", content: mediaPath }); - console.log(`[qqbot] sendText: Found auto-detect media in : ${mediaPath}`); - } else if (tagName === "qqvoice") { - sendQueue.push({ type: "voice", content: mediaPath }); - console.log(`[qqbot] sendText: Found voice path in : ${mediaPath}`); - } else if (tagName === "qqvideo") { - sendQueue.push({ type: "video", content: mediaPath }); - console.log(`[qqbot] sendText: Found video URL in : ${mediaPath}`); - } else if (tagName === "qqfile") { - sendQueue.push({ type: "file", content: mediaPath }); - console.log(`[qqbot] sendText: Found file path in : ${mediaPath}`); - } else { - sendQueue.push({ type: "image", content: mediaPath }); - console.log(`[qqbot] sendText: Found image path in : ${mediaPath}`); - } - } - - lastIndex = match.index + match[0].length; - } - - // 添加最后一个标签后的文本 - const textAfter = text.slice(lastIndex).replace(/\n{3,}/g, "\n\n").trim(); - if (textAfter) { - sendQueue.push({ type: "text", content: textAfter }); - } - + if (hasMedia && sendQueue.length > 0) { console.log(`[qqbot] sendText: Send queue: ${sendQueue.map(item => item.type).join(" -> ")}`); - // 按顺序发送(使用 Telegram 风格的统一媒体发送函数) + // 构建统一的媒体发送上下文 const mediaTarget = buildMediaTarget({ to, account, replyToId }, "[qqbot:sendText]"); + const mediaSendCtx: MediaSendContext = { + mediaTarget, + qualifiedTarget: to, + account, + replyToId: replyToId ?? undefined, + log: { + info: (msg: string) => console.log(msg), + error: (msg: string) => console.error(msg), + debug: (msg: string) => console.log(msg), + }, + }; + let lastResult: OutboundResult = { channel: "qqbot" }; - for (const item of sendQueue) { - try { - if (item.type === "text") { - // 发送文本 - if (replyToId) { - const accessToken = await getToken(account); - const target = parseTarget(to); - if (target.type === "c2c") { - const result = await sendC2CMessage(accessToken, target.id, item.content, replyToId); - recordMessageReply(replyToId); - lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: result.ext_info?.ref_idx }; - } else if (target.type === "group") { - const result = await sendGroupMessage(accessToken, target.id, item.content, replyToId); - recordMessageReply(replyToId); - lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: result.ext_info?.ref_idx }; - } else { - const result = await sendChannelMessage(accessToken, target.id, item.content, replyToId); - recordMessageReply(replyToId); - lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: (result as any).ext_info?.ref_idx }; - } + // 使用统一的发送队列执行器 + await executeSendQueue(sendQueue, mediaSendCtx, { + onSendText: async (textContent) => { + // sendText 场景的文本发送:需要区分主动/被动消息 + if (replyToId) { + const accessToken = await getToken(account); + const target = parseTarget(to); + if (target.type === "c2c") { + const result = await sendC2CMessage(accessToken, target.id, textContent, replyToId); + recordMessageReply(replyToId); + lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: result.ext_info?.ref_idx }; + } else if (target.type === "group") { + const result = await sendGroupMessage(accessToken, target.id, textContent, replyToId); + recordMessageReply(replyToId); + lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: result.ext_info?.ref_idx }; } else { - const accessToken = await getToken(account); - const target = parseTarget(to); - if (target.type === "c2c") { - const result = await sendProactiveC2CMessage(accessToken, target.id, item.content); - lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: (result as any).ext_info?.ref_idx }; - } else if (target.type === "group") { - const result = await sendProactiveGroupMessage(accessToken, target.id, item.content); - lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: (result as any).ext_info?.ref_idx }; - } else { - const result = await sendChannelMessage(accessToken, target.id, item.content); - lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: (result as any).ext_info?.ref_idx }; - } + const result = await sendChannelMessage(accessToken, target.id, textContent, replyToId); + recordMessageReply(replyToId); + lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: (result as any).ext_info?.ref_idx }; + } + } else { + const accessToken = await getToken(account); + const target = parseTarget(to); + if (target.type === "c2c") { + const result = await sendProactiveC2CMessage(accessToken, target.id, textContent); + lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: (result as any).ext_info?.ref_idx }; + } else if (target.type === "group") { + const result = await sendProactiveGroupMessage(accessToken, target.id, textContent); + lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: (result as any).ext_info?.ref_idx }; + } else { + const result = await sendChannelMessage(accessToken, target.id, textContent); + lastResult = { channel: "qqbot", messageId: result.id, timestamp: result.timestamp, refIdx: (result as any).ext_info?.ref_idx }; } - console.log(`[qqbot] sendText: Sent text part: ${item.content.slice(0, 30)}...`); - } else if (item.type === "image") { - lastResult = await sendPhoto(mediaTarget, item.content); - } else if (item.type === "voice") { - lastResult = await sendVoice(mediaTarget, item.content, undefined, account.config?.audioFormatPolicy?.transcodeEnabled !== false); - } else if (item.type === "video") { - lastResult = await sendVideoMsg(mediaTarget, item.content); - } else if (item.type === "file") { - lastResult = await sendDocument(mediaTarget, item.content); - } else if (item.type === "media") { - // qqmedia: 自动根据扩展名路由 - lastResult = await sendMedia({ - to, text: "", mediaUrl: item.content, - accountId: account.accountId, replyToId, account, - }); } - } catch (err) { - const errMsg = err instanceof Error ? err.message : String(err); - console.error(`[qqbot] sendText: Failed to send ${item.type}: ${errMsg}`); - } - } + console.log(`[qqbot] sendText: Sent text part: ${textContent.slice(0, 30)}...`); + }, + }); return lastResult; } diff --git a/src/streaming.ts b/src/streaming.ts new file mode 100644 index 0000000..47755f8 --- /dev/null +++ b/src/streaming.ts @@ -0,0 +1,1088 @@ +/** + * QQ Bot 流式消息控制器(简化版) + * + * 核心原则: + * 1. 绝对不修改原始内容(不 trim、不 strip),避免 PREFIX MISMATCH + * 2. 媒体标签同步等待发送完成 + * 3. 纯空白分片处理: + * - 首分片空白 → 暂停发送(不开启流式),但内容保留 + * - 被媒体标签打断或结束时,如果还都是空白 → 不发送 + * - 结束时已有活跃流式会话(之前有非空白分片)→ 可以发送当前空白分片 + */ + +import type { ResolvedQQBotAccount, StreamMessageResponse } from "./types.js"; +import { StreamInputMode, StreamInputState, StreamContentType } from "./types.js"; +import { getAccessToken, sendC2CStreamMessage, getNextMsgSeq } from "./api.js"; +import { normalizeMediaTags } from "./utils/media-tags.js"; +import { + stripIncompleteMediaTag, + findFirstClosedMediaTag, + executeSendQueue, + type SendQueueItem, + type MediaSendContext, +} from "./utils/media-send.js"; +import type { MediaTargetContext } from "./outbound.js"; + +// ============ 常量 ============ + +/** 流式消息节流常量(毫秒) */ +const THROTTLE_CONSTANTS = { + /** 默认节流间隔 */ + DEFAULT_MS: 500, + /** 最小节流间隔 */ + MIN_MS: 300, + /** 长间隔阈值:超过此时间后的首次 flush 延迟处理 */ + LONG_GAP_THRESHOLD_MS: 2000, + /** 长间隔后的批处理窗口 */ + BATCH_AFTER_GAP_MS: 300, +} as const; + +/** 流式状态机阶段 */ +type StreamingPhase = "idle" | "streaming" | "completed" | "aborted"; + +/** 终态集合 */ +const TERMINAL_PHASES = new Set(["completed", "aborted"]); + +/** 允许的状态转换 */ +const PHASE_TRANSITIONS: Record> = { + idle: new Set(["streaming", "aborted"]), + streaming: new Set(["completed", "aborted"]), + completed: new Set(), + aborted: new Set(), +}; + +// ============ FlushController ============ + +/** + * 节流刷新控制器(纯调度原语,不含业务逻辑) + */ +class FlushController { + private doFlush: () => Promise; + private flushInProgress = false; + private flushResolvers: Array<() => void> = []; + private needsReflush = false; + private pendingFlushTimer: ReturnType | null = null; + private lastUpdateTime = 0; + private isCompleted = false; + private _ready = false; + + constructor(doFlush: () => Promise) { + this.doFlush = doFlush; + } + + /** 标记为已完成 —— 当前 flush 之后不再调度新 flush */ + complete(): void { + this.isCompleted = true; + } + + /** 取消待执行的延迟 flush */ + cancelPendingFlush(): void { + if (this.pendingFlushTimer) { + clearTimeout(this.pendingFlushTimer); + this.pendingFlushTimer = null; + } + } + + /** 等待当前进行中的 flush 完成 */ + waitForFlush(): Promise { + if (!this.flushInProgress) return Promise.resolve(); + return new Promise((resolve) => this.flushResolvers.push(resolve)); + } + + /** 取消所有 pending timer + 等待正在执行的 flush 完成,确保 flush 活动彻底停止 */ + async cancelPendingAndWait(): Promise { + this.cancelPendingFlush(); + this.needsReflush = false; + await this.waitForFlush(); + // flush 完成后可能又触发了 reflush timer,再次清理 + this.cancelPendingFlush(); + this.needsReflush = false; + } + + /** 标记流式会话就绪(首次 API 调用成功后) */ + setReady(ready: boolean): void { + this._ready = ready; + if (ready) { + this.lastUpdateTime = Date.now(); + } + } + + get ready(): boolean { + return this._ready; + } + + /** 重置为初始状态(用于流式会话恢复) */ + reset(doFlush: () => Promise): void { + this.cancelPendingFlush(); + this.doFlush = doFlush; + this.flushInProgress = false; + this.flushResolvers = []; + this.needsReflush = false; + this.lastUpdateTime = 0; + this.isCompleted = false; + this._ready = false; + } + + /** 执行一次 flush(互斥锁 + 冲突时 reflush) */ + async flush(): Promise { + if (!this._ready || this.flushInProgress || this.isCompleted) { + if (this.flushInProgress && !this.isCompleted) { + this.needsReflush = true; + } + return; + } + + this.flushInProgress = true; + this.needsReflush = false; + this.lastUpdateTime = Date.now(); + + try { + await this.doFlush(); + this.lastUpdateTime = Date.now(); + } finally { + this.flushInProgress = false; + const resolvers = this.flushResolvers; + this.flushResolvers = []; + for (const resolve of resolvers) resolve(); + + // flush 期间有新事件到达 → 立即跟进 + if (this.needsReflush && !this.isCompleted && !this.pendingFlushTimer) { + this.needsReflush = false; + this.pendingFlushTimer = setTimeout(() => { + this.pendingFlushTimer = null; + void this.flush(); + }, 0); + } + } + } + + /** 节流入口:根据 throttleMs 控制 flush 频率 */ + async throttledUpdate(throttleMs: number): Promise { + if (!this._ready) return; + + const now = Date.now(); + const elapsed = now - this.lastUpdateTime; + + if (elapsed >= throttleMs) { + this.cancelPendingFlush(); + if (elapsed > THROTTLE_CONSTANTS.LONG_GAP_THRESHOLD_MS) { + // 长间隔后首次 flush 延迟,等待更多文本积累 + this.lastUpdateTime = now; + this.pendingFlushTimer = setTimeout(() => { + this.pendingFlushTimer = null; + void this.flush(); + }, THROTTLE_CONSTANTS.BATCH_AFTER_GAP_MS); + } else { + await this.flush(); + } + } else if (!this.pendingFlushTimer) { + // 在节流窗口内 → 延迟 flush + const delay = throttleMs - elapsed; + this.pendingFlushTimer = setTimeout(() => { + this.pendingFlushTimer = null; + void this.flush(); + }, delay); + } + } +} + +// ============ StreamingController ============ + +/** StreamingController 的依赖注入 */ +export interface StreamingControllerDeps { + /** QQ Bot 账户配置 */ + account: ResolvedQQBotAccount; + /** 目标用户 openid(流式 API 仅支持 C2C) */ + userId: string; + /** 被动回复的消息 ID */ + replyToMsgId: string; + /** 事件 ID */ + eventId: string; + /** 日志前缀 */ + logPrefix?: string; + /** 日志对象(直接传 gateway 的 log) */ + log?: { + info(msg: string): void; + error(msg: string): void; + warn?(msg: string): void; + debug?(msg: string): void; + }; + /** + * 媒体发送上下文(用于在流式模式下发送富媒体) + * 如果不提供,遇到媒体标签时会抛出错误导致 fallback + */ + mediaContext?: StreamingMediaContext; + /** + * 回复边界回调:检测到 text 长度缩短(新回复开始)时触发。 + * + * 触发时当前 controller 已经 finalize(终结当前流式会话,处理完之前的内容), + * 调用方应创建新的 StreamingController 并用 newReplyText 调其 onPartialReply。 + * + * @param newReplyText 新回复的初始文本(已 strip reasoning tags) + */ + onReplyBoundary?: (newReplyText: string) => void | Promise; +} + +/** + * QQ Bot 流式消息控制器 + * + * 管理 C2C 流式消息的完整生命周期: + * 1. idle: 初始状态,等待首次文本 + * 2. streaming: 流式发送中,通过 API 逐步更新消息内容 + * 3. completed: 正常完成,已发送 input_state="10" + * 4. aborted: 中止(进程退出/错误) + * + * 富媒体标签处理流程: + * 当检测到富媒体标签时: + * 1. 将标签前的文本通过流式发完 → 结束当前流式会话 (input_state="10") + * 2. 同步等待媒体发送完成 + * 3. 创建新的流式会话 → 继续发送标签后的剩余文本 + */ +export class StreamingController { + // ---- 状态机 ---- + private phase: StreamingPhase = "idle"; + + // ---- 核心文本状态(仅两个) ---- + /** + * 最后一次收到的完整 normalized 全量文本。 + * - onPartialReply 每次更新(回复边界时会拼接前缀) + * - performFlush 从 sentIndex 开始切片来获取当前会话的显示内容 + * - onIdle 校验时用于前缀匹配 + */ + private lastNormalizedFull = ""; + /** + * 在 lastNormalizedFull 中已经"消费"到的位置。 + * "消费"包括:已通过流式发送并终结的文本段、已处理的媒体标签。 + * - 每次流式会话终结(endCurrentStreamIfNeeded)后推进到终结点 + * - 每次媒体标签处理后推进到标签结束位置 + * - resetStreamSession 后,新的流式会话从 sentIndex 开始 + */ + private sentIndex = 0; + + /** 上一次 onPartialReply 的 text 长度(用于检测回复边界) */ + private lastPartialLen = 0; + + // ---- 流式会话 ---- + private streamMsgId: string | null = null; + /** 当前流式会话的 msg_seq,同一会话内所有 chunk 共享;null 表示需要重新生成 */ + private msgSeq: number | null = null; + private streamIndex = 0; + private dispatchFullyComplete = false; + + // ---- 串行队列:确保 onPartialReply / onIdle 严格按序执行 ---- + /** Promise 链,回调的实际逻辑都挂到链尾,保证串行 */ + private _callbackChain: Promise = Promise.resolve(); + + // ---- 互斥:首个到达的回调锁定控制权 ---- + /** + * 记录首先到达的回调来源,后续其他来源的回调将被忽略。 + * - null: 尚未确定 + * - 非 null: 已锁定,只有相同来源的回调才允许继续执行 + */ + private firstCallbackSource: string | null = null; + + /** + * 尝试获取回调互斥锁。 + * - 尚未锁定 → 锁定为 source,返回 true + * - 已锁定且来源相同 → 返回 true + * - 已锁定且来源不同 → 返回 false(调用方应跳过) + */ + private acquireCallbackLock(source: string): boolean { + if (this.firstCallbackSource === null) { + this.firstCallbackSource = source; + this.logInfo(`acquireCallbackLock: locked to "${source}"`); + return true; + } + if (this.firstCallbackSource === source) { + return true; + } + this.logDebug(`acquireCallbackLock: rejected "${source}" (locked by "${this.firstCallbackSource}")`); + return false; + } + + // ---- 降级 ---- + /** 成功发送的流式分片数或媒体数(用于 onDeliver 互斥判断 + 降级判断) */ + private sentStreamChunkCount = 0; + /** 是否成功发送过至少一个媒体文件 */ + private sentMediaCount = 0; + + // ---- 启动锁 ---- + private startingPromise: Promise | null = null; + + // ---- 子控制器 ---- + private flush: FlushController; + + // ---- 配置 ---- + private throttleMs: number; + + // ---- 注入依赖 ---- + private deps: StreamingControllerDeps; + + constructor(deps: StreamingControllerDeps) { + this.deps = deps; + this.flush = new FlushController(() => this.performFlush()); + this.throttleMs = THROTTLE_CONSTANTS.DEFAULT_MS; + if (this.throttleMs < THROTTLE_CONSTANTS.MIN_MS) { + this.throttleMs = THROTTLE_CONSTANTS.MIN_MS; + } + } + + // ------------------------------------------------------------------ + // 公共访问器 + // ------------------------------------------------------------------ + + get isTerminalPhase(): boolean { + return TERMINAL_PHASES.has(this.phase); + } + + get currentPhase(): StreamingPhase { + return this.phase; + } + + /** + * 是否应降级到非流式(普通消息)发送 + * + * 条件:流式会话进入终态,且从未成功发出过任何一个流式分片或媒体 + */ + get shouldFallbackToStatic(): boolean { + return this.isTerminalPhase && this.sentStreamChunkCount === 0; + } + + /** debug 用:暴露发送计数给 gateway 日志 */ + get sentChunkCount_debug(): number { + return this.sentStreamChunkCount; + } + + // ------------------------------------------------------------------ + // 状态机 + // ------------------------------------------------------------------ + + private transition(to: StreamingPhase, source: string, reason?: string): boolean { + const from = this.phase; + if (from === to) return false; + if (!PHASE_TRANSITIONS[from].has(to)) { + this.logWarn(`phase transition rejected: ${from} → ${to} (source: ${source})`); + return false; + } + this.phase = to; + this.logInfo(`phase: ${from} → ${to} (source: ${source}${reason ? `, reason: ${reason}` : ""})`); + if (TERMINAL_PHASES.has(to)) { + this.onEnterTerminalPhase(); + } + return true; + } + + private onEnterTerminalPhase(): void { + this.flush.cancelPendingFlush(); + this.flush.complete(); + } + + private get prefix(): string { + return this.deps.logPrefix ?? "[qqbot:streaming]"; + } + + private logInfo(msg: string): void { + const m = `${this.prefix} ${msg}`; + this.deps.log ? this.deps.log.info?.(m) : console.log(m); + } + private logError(msg: string): void { + const m = `${this.prefix} ${msg}`; + this.deps.log ? this.deps.log.error?.(m) : console.error(m); + } + private logWarn(msg: string): void { + const m = `${this.prefix} ${msg}`; + this.deps.log ? (this.deps.log.warn ?? this.deps.log.info)(m) : console.warn(m); + } + private logDebug(msg: string): void { + const m = `${this.prefix} ${msg}`; + this.deps.log ? this.deps.log.debug?.(m) : console.debug(m); + } + + // ------------------------------------------------------------------ + // SDK 回调绑定 + // ------------------------------------------------------------------ + + /** + * 处理 onPartialReply 回调(流式文本全量更新) + * + * ★ 通过 Promise 链严格串行化:前一次处理完成后才执行下一次, + * 避免并发交叉导致的状态不一致。 + * + * payload.text 是从头到尾的完整当前文本(每次回调都是全量)。 + * 核心逻辑:normalize → 更新 lastNormalizedFull → 从 sentIndex 开始 processMediaTags + */ + async onPartialReply(payload: { text?: string }): Promise { + if (this.isTerminalPhase) return; + if (!payload.text) return; + + // ★ 互斥锁在入口检查:如果已被 deliver 锁定,直接跳过,无需排队 + if (!this.acquireCallbackLock("partial")) return; + + // 将实际逻辑挂到 Promise 链尾部,保证串行执行 + this._callbackChain = this._callbackChain.then( + () => this._doPartialReply(payload), + (err) => { + // 上一次如果异常,不阻塞后续调用 + this.logError(`onPartialReply chain error: ${err}`); + return this._doPartialReply(payload); + } + ); + return this._callbackChain; + } + + /** onPartialReply 的实际逻辑(由 _callbackChain 保证串行调用) */ + private async _doPartialReply(payload: { text?: string }): Promise { + this.logDebug(`onPartialReply: rawLen=${payload.text?.length ?? 0}, phase=${this.phase}, streamMsgId=${this.streamMsgId}, sentIndex=${this.sentIndex}, firstCB=${this.firstCallbackSource}`); + if (this.isTerminalPhase) { + this.logDebug(`onPartialReply: skipped (terminal phase)`); + return; + } + + const text = payload.text ?? ""; + if (!text) { + this.logDebug(`onPartialReply: skipped (empty text)`); + return; + } + + // ★ 回复边界检测:text 长度缩短 → 新回复开始 + // 终结当前 controller,通知调用方创建新的 controller 处理新回复 + if (this.lastPartialLen > 0 && text.length < this.lastPartialLen) { + this.logInfo(`onPartialReply: reply boundary detected (${text.length} < ${this.lastPartialLen}), finalizing current controller`); + + // 终结当前流式会话,处理完当前内容(包括可能的未闭合媒体标签) + this.dispatchFullyComplete = true; + await this.finalizeOnIdle(); + + // 通知调用方:新回复开始,请创建新 controller + if (this.deps.onReplyBoundary) { + this.logInfo(`onPartialReply: invoking onReplyBoundary callback with newText len=${text.length}`); + await this.deps.onReplyBoundary(text); + } else { + this.logWarn(`onPartialReply: reply boundary detected but no onReplyBoundary callback registered, new reply text will be lost`); + } + return; + } + + // 正常增长:直接 normalize 替换 + this.lastNormalizedFull = normalizeMediaTags(text); + this.lastPartialLen = text.length; + + // ★ 核心:从 sentIndex 开始,处理增量文本(串行队列保证不会并发进入) + await this.processMediaTags(this.lastNormalizedFull); + } + + /** + * 处理 deliver 回调 + * + * ★ 与 onPartialReply 互斥:首先到达的回调锁定控制权,后到的被忽略。 + */ + async onDeliver(payload: { text?: string }): Promise { + const rawLen = payload.text?.length ?? 0; + const preview = (payload.text ?? "").slice(0, 60).replace(/\n/g, "\\n"); + this.logDebug(`onDeliver: rawLen=${rawLen}, phase=${this.phase}, streamMsgId=${this.streamMsgId}, sentIndex=${this.sentIndex}, sentChunks=${this.sentStreamChunkCount}, firstCB=${this.firstCallbackSource}, preview="${preview}"`); + if (this.isTerminalPhase) { + this.logDebug(`onDeliver: skipped (terminal phase)`); + return; + } + + const text = payload.text ?? ""; + if (!text.trim()) { + this.logDebug(`onDeliver: skipped (empty text)`); + return; + } + + // ★ 互斥锁 + if (!this.acquireCallbackLock("deliver")) return; + + this.logInfo(`onDeliver: deliver in control, falling back to static`); + this.transition("aborted", "onDeliver", "deliver_arrived_first_fallback_to_static"); + } + + /** + * 处理 onIdle 回调(分发完成时调用) + * + * ★ 挂到 _callbackChain 上,保证在所有 onPartialReply 执行完之后才执行。 + * + * onIdle 会传入最终的全量文本。如果该文本**包含**之前存储的 lastNormalizedFull, + * 说明一致,继续处理剩余内容;否则忽略(防止 onIdle 修改文本导致的不一致)。 + */ + async onIdle(payload?: { text?: string }): Promise { + if (!this.dispatchFullyComplete) { + this.logDebug(`onIdle: skipped (dispatch not fully complete)`); + return; + } + if (this.isTerminalPhase) return; + + // 挂到串行队列尾部,等所有 onPartialReply 执行完再处理 + this._callbackChain = this._callbackChain.then( + () => this._doIdle(payload), + (err) => { + this.logError(`onIdle chain error: ${err}`); + return this._doIdle(payload); + } + ); + return this._callbackChain; + } + + /** onIdle 的实际逻辑(由 _callbackChain 保证在 onPartialReply 之后执行) */ + private async _doIdle(payload?: { text?: string }): Promise { + this.logDebug(`onIdle: dispatchFullyComplete=${this.dispatchFullyComplete}, phase=${this.phase}, streamChunks=${this.sentStreamChunkCount}, mediaCount=${this.sentMediaCount}, sentIndex=${this.sentIndex}`); + if (this.isTerminalPhase) { + this.logDebug(`onIdle: skipped (terminal phase)`); + return; + } + + // ★ onIdle 文本校验:如果传了文本,检查是否包含之前的全量文本 + if (payload?.text) { + const idleNormalized = normalizeMediaTags(payload.text); + if (idleNormalized.includes(this.lastNormalizedFull)) { + // onIdle 文本包含之前的全量 → 一致,使用 onIdle 的文本作为最终全量 + this.logDebug(`onIdle: text contains lastNormalizedFull, updating (${this.lastNormalizedFull.length} → ${idleNormalized.length})`); + this.lastNormalizedFull = idleNormalized; + } else if (this.lastNormalizedFull.includes(idleNormalized)) { + // 之前的全量包含 onIdle 文本 → onIdle 文本是子集,保留之前的 + this.logDebug(`onIdle: lastNormalizedFull contains idle text, keeping current`); + } else { + // 不一致 → 忽略 onIdle + this.logWarn(`onIdle: text mismatch with lastNormalizedFull, ignoring onIdle (idle len=${idleNormalized.length}, last len=${this.lastNormalizedFull.length})`); + // 虽然忽略文本处理,但仍需要终结当前流式会话 + await this.finalizeOnIdle(); + return; + } + } + + // ★ 处理 sentIndex 之后的剩余内容 + const remaining = this.lastNormalizedFull.slice(this.sentIndex); + if (remaining) { + const hasClosedTag = findFirstClosedMediaTag(remaining); + if (hasClosedTag) { + this.logDebug(`onIdle: unprocessed media tags in remaining text, processing now`); + await this.processMediaTags(this.lastNormalizedFull); + if (this.isTerminalPhase) return; + } + } + + await this.finalizeOnIdle(); + } + + /** + * onIdle 的终结逻辑:终结流式会话或标记完成/降级 + */ + private async finalizeOnIdle(): Promise { + // 等待正在进行的流式启动请求完成 + if (this.startingPromise) { + this.logDebug(`finalizeOnIdle: waiting for pending stream start`); + await this.startingPromise; + } + if (this.isTerminalPhase) return; + + // 等待所有 pending flush 完成 + await this.flush.waitForFlush(); + + // ---- 判断如何终结 ---- + if (this.streamMsgId) { + // 有活跃流式会话 → 发终结分片 + this.transition("completed", "onIdle", "normal"); + try { + // 当前会话的显示内容 = sentIndex 之后的纯文本(去掉未闭合标签) + const sessionText = this.lastNormalizedFull.slice(this.sentIndex); + const [safeText] = stripIncompleteMediaTag(sessionText); + this.logDebug(`finalizeOnIdle: sending DONE chunk, len=${safeText.length}`); + await this.sendStreamChunk(safeText, StreamInputState.DONE, "onIdle"); + this.logInfo(`streaming completed, final text length: ${safeText.length}`); + } catch (err) { + this.logError(`failed to send final stream chunk: ${err}`); + } + } else if (this.sentStreamChunkCount > 0) { + // 没有活跃流式会话,但之前发过流式分片或媒体 → 正常完成 + this.logInfo(`finalizeOnIdle: no active stream session, but sent ${this.sentStreamChunkCount} chunks (including ${this.sentMediaCount} media), marking completed`); + this.transition("completed", "onIdle", "no_active_session_but_sent"); + } else { + // 什么都没发过 → 降级 + this.logInfo(`no chunk or media sent, marking fallback to static`); + this.transition("aborted", "onIdle", "fallback_to_static_nothing_sent"); + } + } + + /** + * 处理错误 + */ + async onError(err: unknown): Promise { + this.logError(`reply error: ${err}`); + + if (this.isTerminalPhase) return; + + // 等待正在进行的流式启动请求完成 + if (this.startingPromise) { + this.logDebug(`onError: waiting for pending stream start`); + await this.startingPromise; + } + + if (this.isTerminalPhase) return; + + // 如果从未发出任何内容 → 降级 + if (this.sentStreamChunkCount === 0) { + this.logInfo(`no chunk or media sent, marking fallback to static for error handling`); + this.transition("aborted", "onError", "fallback_to_static_error"); + return; + } + + // 如果有活跃流式会话,发送错误终结分片 + if (this.streamMsgId) { + try { + const sessionText = this.lastNormalizedFull.slice(this.sentIndex); + const [safeText] = stripIncompleteMediaTag(sessionText); + const errorText = safeText + ? `${safeText}\n\n---\n**Error**: 生成响应时发生错误。` + : "**Error**: 生成响应时发生错误。"; + await this.sendStreamChunk(errorText, StreamInputState.DONE, "onError"); + } catch (sendErr) { + this.logError(`failed to send error stream chunk: ${sendErr}`); + } + } + + this.transition("completed", "onError", "error"); + await this.flush.waitForFlush(); + } + + // ------------------------------------------------------------------ + // 外部控制 + // ------------------------------------------------------------------ + + /** 标记分发已全部完成 */ + markFullyComplete(): void { + this.dispatchFullyComplete = true; + } + + /** 中止流式消息 */ + async abortStreaming(): Promise { + if (!this.transition("aborted", "abortStreaming", "abort")) return; + + await this.flush.waitForFlush(); + + if (this.streamMsgId) { + try { + const sessionText = this.lastNormalizedFull.slice(this.sentIndex); + const [safeText] = stripIncompleteMediaTag(sessionText); + const abortText = safeText || "(已中止)"; + await this.sendStreamChunk(abortText, StreamInputState.DONE, "abortStreaming"); + this.logInfo(`streaming aborted, sent final chunk`); + } catch (err) { + this.logError(`abort send failed: ${err}`); + } + } + } + + // ------------------------------------------------------------------ + // 内部:富媒体标签中断/恢复 + // ------------------------------------------------------------------ + + /** + * 处理富媒体标签(循环消费模型) + * + * 从 sentIndex 开始,对增量文本: + * 1. 优先找闭合标签 → 终结当前流式 → 同步发媒体 → 推进 sentIndex → reset → 继续 + * 2. 没有闭合标签但有未闭合前缀 → 标签前的安全文本仍需通过流式发送 → 推进 sentIndex → 等待标签闭合 + * 3. 纯文本 → 触发流式发送(performFlush 会动态计算要发的内容) + */ + private async processMediaTags(normalizedFull: string): Promise { + try { + // ---- 1. 循环消费所有已闭合的媒体标签 ---- + while (true) { + if (this.isTerminalPhase) return; + + const incremental = normalizedFull.slice(this.sentIndex); + const found = findFirstClosedMediaTag(incremental); + + if (!found) break; + + this.logInfo(`processMediaTags: found <${found.tagName}> at offset ${this.sentIndex}, textBefore="${found.textBefore.slice(0, 40)}"`); + + // ---- 1.1 终结当前流式会话(如果有的话) ---- + // endCurrentStreamIfNeeded 会用 sentIndex 到标签前文本结束的位置来发送终结分片 + // 先临时推进 sentIndex 到标签前文本结束的位置(用于终结分片的内容计算) + // 不,我们不需要推进——endCurrentStreamIfNeeded 发的是从 sentIndex 开始到当前文本前部分 + // 实际上需要把 textBefore 的内容加入到当前会话的显示范围 + // 终结时 performFlush/sendStreamChunk 用 lastNormalizedFull.slice(sentIndex) 中 textBefore 之前的部分 + // 但 endCurrentStreamIfNeeded 需要知道要发到哪里…… + + // 简化:计算标签前文本在全量中的结束位置 + const textBeforeEndInFull = this.sentIndex + found.textBefore.length; + + await this.endCurrentStreamIfNeeded("processMediaTags:closedTag", textBeforeEndInFull); + if (this.isTerminalPhase) return; + + // ---- 1.2 同步发送媒体文件 ---- + if (found.mediaPath && this.deps.mediaContext) { + const item: SendQueueItem = { type: found.itemType, content: found.mediaPath }; + this.logDebug(`processMediaTags: sending ${found.itemType}: ${found.mediaPath.slice(0, 80)}`); + await sendMediaQueue([item], this.deps.mediaContext); + this.sentMediaCount++; + this.sentStreamChunkCount++; + this.logDebug(`processMediaTags: media sent, sentMediaCount=${this.sentMediaCount}, sentStreamChunkCount=${this.sentStreamChunkCount}`); + } else if (found.mediaPath && !this.deps.mediaContext) { + this.logWarn(`processMediaTags: no mediaContext provided, cannot send ${found.itemType}`); + } + + // ---- 1.3 推进 sentIndex,重置流式状态 ---- + this.sentIndex += found.tagEndIndex; + this.logDebug(`processMediaTags: sentIndex updated to ${this.sentIndex}`); + this.resetStreamSession(); + } + + // ---- 循环结束:没有更多闭合标签 ---- + const remaining = normalizedFull.slice(this.sentIndex); + + if (!remaining) { + this.logDebug(`processMediaTags: no remaining text after media tags`); + return; + } + + // ---- 2. 检查是否有未闭合的标签前缀 ---- + const [safeText, hasIncomplete] = stripIncompleteMediaTag(remaining); + + if (hasIncomplete) { + this.logDebug(`processMediaTags: incomplete tag detected, safe text len=${safeText.length}, remaining len=${remaining.length}`); + + if (safeText) { + // 标签前的安全文本需要通过流式发送 + await this.ensureStreamingStarted(normalizedFull.length); + if (this.isTerminalPhase) return; + await this.flush.throttledUpdate(this.throttleMs); + } + // 未闭合标签部分留待下次 onPartialReply 带来更多文本后再处理 + return; + } + + // ---- 3. 纯文本 → 触发流式发送 ---- + // performFlush 会动态计算 lastNormalizedFull.slice(sentIndex) 的安全部分来发送 + this.logDebug(`processMediaTags: pure text, remaining len=${remaining.length}`); + + if (!remaining.trim()) { + // 纯空白文本 → 不启动流式 + this.logDebug(`processMediaTags: pure whitespace, skipping stream start`); + return; + } + + await this.ensureStreamingStarted(normalizedFull.length); + if (this.isTerminalPhase) return; + await this.flush.throttledUpdate(this.throttleMs); + + } catch (err) { + this.logError(`processMediaTags failed: ${err}`); + } + } + + /** + * 终结当前流式会话(如果有的话) + * + * @param caller 调用者标识(日志用) + * @param textEndInFull 本次终结需要发送到的全量文本位置(不含)。 + * 终结分片的内容 = lastNormalizedFull.slice(sentIndex, textEndInFull) + * + * 逻辑: + * - 有活跃 streamMsgId → 等待 flush 完成 → 发 DONE 分片终结 + * - 没有 streamMsgId 但有非空白文本 → 启动流式 → 立即终结 + * - 纯空白且无活跃流式 → 不发送 + */ + private async endCurrentStreamIfNeeded(caller: string, textEndInFull: number): Promise { + // 先等待启动完成 + if (this.startingPromise) { + this.logDebug(`${caller}: waiting for pending stream start`); + await this.startingPromise; + } + + // 停止所有 flush 活动 + await this.flush.cancelPendingAndWait(); + + // 计算当前会话要发的文本 + const sessionText = this.lastNormalizedFull.slice(this.sentIndex, textEndInFull); + const [safeText] = stripIncompleteMediaTag(sessionText); + + if (this.streamMsgId) { + // 有活跃流式会话 → 终结它 + try { + await this.sendStreamChunk(safeText, StreamInputState.DONE, caller); + this.logDebug(`${caller}: current stream session ended`); + } catch (err) { + this.logError(`${caller}: failed to end stream: ${err}`); + } + } else if (safeText && safeText.trim()) { + // 没有活跃流式会话,但有非空白文本未发送 → 启动流式 → 立即终结 + // 先临时存储到 _pendingSessionText 以便 doStartStreaming 使用 + this._pendingSessionText = safeText; + await this.ensureStreamingStarted(textEndInFull); + this._pendingSessionText = null; + if (this.isTerminalPhase) return; + if (this.startingPromise) await this.startingPromise; + if (this.streamMsgId) { + try { + await this.sendStreamChunk(safeText, StreamInputState.DONE, caller); + this.logDebug(`${caller}: started and ended stream for pre-tag text`); + } catch (err) { + this.logError(`${caller}: failed to send pre-tag text: ${err}`); + } + } + } + // 如果纯空白且没有活跃流式 → 不发送 + } + + /** 临时存储 endCurrentStreamIfNeeded 需要立即发送的文本(用于 doStartStreaming) */ + private _pendingSessionText: string | null = null; + + /** + * 重置流式会话状态(用于媒体中断后恢复) + * + * 只重置会话相关状态,不重置 sentIndex 和 dispatch 标记。 + * 新流式会话从当前 sentIndex 开始(performFlush 动态计算内容)。 + */ + private resetStreamSession(): void { + const prevPhase = this.phase; + this.phase = "idle"; + this.logDebug(`phase: ${prevPhase} → idle (source: resetStreamSession, forced reset for media resume)`); + this.streamMsgId = null; + this.streamIndex = 0; + this.msgSeq = null; + this.startingPromise = null; + this.flush.reset(() => this.performFlush()); + // 注意:不重置 sentIndex、lastNormalizedFull、dispatchFullyComplete、sentStreamChunkCount、sentMediaCount + } + + // ------------------------------------------------------------------ + // 内部:流式会话管理 + // ------------------------------------------------------------------ + + /** 确保流式会话已开始(首次调用创建;并发调用者会等待首次完成) */ + private async ensureStreamingStarted(textEndInFull: number): Promise { + if (this.streamMsgId || this.isTerminalPhase) return; + + if (this.startingPromise) { + this.logDebug(`ensureStreamingStarted: waiting for pending start request`); + await this.startingPromise; + return; + } + + if (!this.transition("streaming", "ensureStreamingStarted")) return; + + this.startingPromise = this.doStartStreaming(textEndInFull); + try { + await this.startingPromise; + } finally { + this.startingPromise = null; + } + } + + /** 实际执行流式启动逻辑 */ + private async doStartStreaming(textEndInFull: number): Promise { + try { + // 计算当前会话要发送的文本 + // 优先使用 _pendingSessionText(endCurrentStreamIfNeeded 需要立即发送的文本) + // 否则使用调用处预先确定的 sentIndex → textEndInFull 范围 + const sessionText = this._pendingSessionText + ?? this.lastNormalizedFull.slice(this.sentIndex, textEndInFull); + const [safeText] = stripIncompleteMediaTag(sessionText); + + // 全空白文本 → 不开启流式,退回 idle + if (!safeText?.trim()) { + this.logDebug(`doStartStreaming: skipped (session text is empty or whitespace-only)`); + this.transition("idle", "doStartStreaming", "whitespace_only_text"); + return; + } + const firstText = safeText; + const resp = await this.sendStreamChunk(firstText, StreamInputState.GENERATING, "doStartStreaming"); + + if (resp.code && resp.code > 0) { + throw new Error(`Stream API error: code=${resp.code}, message=${resp.message}`); + } + + if (!resp.id) { + throw new Error(`Stream API returned no id: ${JSON.stringify(resp)}`); + } + + this.streamMsgId = resp.id; + this.flush.setReady(true); + this.logInfo(`stream started, stream_msg_id=${resp.id}`); + } catch (err) { + this.logError(`failed to start streaming: ${err}`); + this.transition("idle", "doStartStreaming", "start_failed_will_retry"); + } + } + + /** 发送一个流式分片(不做任何文本修改) */ + private async sendStreamChunk( + content: string, + inputState: StreamInputState, + caller: string, + ): Promise { + this.logDebug(`sendStreamChunk: caller=${caller}, inputState=${inputState}, contentLen=${content.length}, streamMsgId=${this.streamMsgId}, index=${this.streamIndex}`); + + // 同一流式会话内所有 chunk 共享同一个 msgSeq;新会话首次发送时生成 + if (this.msgSeq === null) { + this.msgSeq = getNextMsgSeq(this.deps.replyToMsgId); + } + const currentIndex = this.streamIndex++; + + const token = await getAccessToken( + this.deps.account.appId, + this.deps.account.clientSecret, + ); + + const resp = await sendC2CStreamMessage(token, this.deps.userId, { + input_mode: StreamInputMode.REPLACE, + input_state: inputState, + content_type: StreamContentType.MARKDOWN, + content_raw: content, + event_id: this.deps.eventId, + msg_id: this.deps.replyToMsgId, + stream_msg_id: this.streamMsgId ?? undefined, + msg_seq: this.msgSeq, + index: currentIndex, + }); + + // 只有 code 存在且 > 0 才是失败 + if (resp.code && resp.code > 0) { + throw new Error(`Stream API error: code=${resp.code}, message=${resp.message}`); + } + + // 分片发送成功 + this.sentStreamChunkCount++; + + return resp; + } + + // ------------------------------------------------------------------ + // 内部:flush 实现 + // ------------------------------------------------------------------ + + /** 执行一次实际的流式内容更新 */ + private async performFlush(): Promise { + this.logDebug(`performFlush: phase=${this.phase}, streamMsgId=${this.streamMsgId}, sentIndex=${this.sentIndex}`); + if (!this.streamMsgId || this.isTerminalPhase) { + this.logDebug(`performFlush: skipped (streamMsgId=${this.streamMsgId}, terminal=${this.isTerminalPhase})`); + return; + } + + // 动态计算当前会话要发送的文本 = 从 sentIndex 开始的增量 + const sessionText = this.lastNormalizedFull.slice(this.sentIndex); + if (!sessionText) { + this.logDebug(`performFlush: skipped (empty session text)`); + return; + } + + // 安全检查:确保不会把未闭合的媒体标签前缀发给用户 + const [safeText, hasIncomplete] = stripIncompleteMediaTag(sessionText); + if (hasIncomplete) { + this.logDebug(`flush: detected incomplete media tag, sending safe text (${safeText.length}/${sessionText.length} chars)`); + } + if (!safeText) { + this.logDebug(`performFlush: skipped (safeText empty after stripIncompleteMediaTag)`); + return; + } + + this.logDebug(`performFlush: sending chunk, safeText len=${safeText.length}`); + try { + await this.sendStreamChunk(safeText, StreamInputState.GENERATING, "performFlush"); + this.logDebug(`performFlush: chunk sent OK, sentStreamChunks=${this.sentStreamChunkCount}`); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + this.logError(`stream flush failed, will retry on next scheduled flush: ${msg}`); + } + } +} + +// ============ 辅助函数 ============ + + +// ============ 流式媒体发送 ============ + +/** 流式媒体发送上下文(由 gateway 注入到 StreamingController) */ +export interface StreamingMediaContext { + /** 账户信息 */ + account: ResolvedQQBotAccount; + /** 事件信息 */ + event: { + type: "c2c" | "group" | "channel"; + senderId: string; + messageId: string; + groupOpenid?: string; + channelId?: string; + }; + /** 日志 */ + log?: { + info: (msg: string) => void; + error: (msg: string) => void; + debug?: (msg: string) => void; + }; +} + +/** + * 将 StreamingMediaContext 转换为公共的 MediaSendContext + */ +function toMediaSendContext(ctx: StreamingMediaContext): MediaSendContext { + const { account, event, log } = ctx; + + const mediaTarget: MediaTargetContext = { + targetType: event.type, + targetId: + event.type === "c2c" + ? event.senderId + : event.type === "group" + ? event.groupOpenid! + : event.channelId!, + account, + replyToId: event.messageId, + logPrefix: `[qqbot:${account.accountId}]`, + }; + + const qualifiedTarget = + event.type === "group" + ? `qqbot:group:${event.groupOpenid}` + : `qqbot:c2c:${event.senderId}`; + + return { + mediaTarget, + qualifiedTarget, + account, + replyToId: event.messageId, + log, + }; +} + +/** + * 按顺序发送媒体队列中的所有项(流式场景专用) + */ +async function sendMediaQueue( + queue: SendQueueItem[], + ctx: StreamingMediaContext, +): Promise { + const sendCtx = toMediaSendContext(ctx); + + await executeSendQueue(queue, sendCtx, { + // 流式场景下跳过 inter-tag 文本(由新流式会话处理) + skipInterTagText: true, + }); +} + +// ============ 流式模式判断 ============ + +/** + * 判断是否应该对当前消息使用流式模式 + * + * 条件: + * 1. 账户配置 streaming 未显式设为 false(默认启用) + * 2. 目标类型为 c2c(私聊)—— 流式 API 仅支持 C2C + */ +export function shouldUseStreaming( + account: ResolvedQQBotAccount, + targetType: "c2c" | "group" | "channel", +): boolean { + // 开关默认关闭,设置 streaming: true 可开启 + if (account.config?.streaming !== true) { + return false; + } + // 目前流式 API 仅支持 C2C 私聊 + if (targetType !== "c2c") { + return false; + } + return true; +} diff --git a/src/types.ts b/src/types.ts index fb3f427..8d44228 100644 --- a/src/types.ts +++ b/src/types.ts @@ -75,6 +75,15 @@ export interface QQBotAccountConfig { * 当短时间内收到多次 deliver 时,将文本合并为一条消息发送,避免消息轰炸 */ deliverDebounce?: DeliverDebounceConfig; + /** + * 是否启用流式消息(默认 false) + * 启用后,AI 的回复会以流式形式逐步显示在 QQ 聊天中, + * 用户可以看到文字逐字出现的打字机效果。 + * 设置为 true 可开启流式消息。 + * + * 注意:仅 C2C(私聊)支持流式消息 API。 + */ + streaming?: boolean; } /** @@ -212,3 +221,74 @@ export interface WSPayload { s?: number; t?: string; } + + + +// ---- 流式消息常量 ---- + +/** 流式消息输入模式 */ +export const StreamInputMode = { + /** 每次发送的 content_raw 替换整条消息内容 */ + REPLACE: "replace", +} as const; +export type StreamInputMode = (typeof StreamInputMode)[keyof typeof StreamInputMode]; + +/** 流式消息输入状态 */ +export const StreamInputState = { + /** 正文生成中 */ + GENERATING: 1, + /** 正文生成结束(终结状态) */ + DONE: 10, +} as const; +export type StreamInputState = (typeof StreamInputState)[keyof typeof StreamInputState]; + +/** 流式消息内容类型 */ +export const StreamContentType = { + MARKDOWN: "markdown", +} as const; +export type StreamContentType = (typeof StreamContentType)[keyof typeof StreamContentType]; + +/** + * 流式消息请求体 + * 对应 StreamReq proto + */ +export interface StreamMessageRequest { + /** 输入模式 */ + input_mode: StreamInputMode; + /** 输入状态 */ + input_state: StreamInputState; + /** 内容类型 */ + content_type: StreamContentType; + /** markdown 内容 */ + content_raw: string; + /** 事件 ID */ + event_id: string; + /** 原始消息 ID */ + msg_id: string; + /** 流式消息 ID,首次发送后返回,后续分片需携带 */ + stream_msg_id?: string; + /** 递增序号 */ + msg_seq: number; + /** 同一条流式会话内的发送索引,从 0 开始,每次发送前递增;新流式会话重新从 0 开始 */ + index: number; +} + +/** + * 流式消息响应体 + * 对应 StreamRsp proto + * + * 成功时返回:{ id, timestamp, extInfo }(无 code/message) + * 失败时返回:{ code, message }(code > 0) + */ +export interface StreamMessageResponse { + /** 错误码,仅失败时存在(> 0 表示失败);成功时不存在 */ + code?: number; + /** 错误信息,仅失败时存在 */ + message?: string; + /** 流式消息 ID */ + id?: string; + /** 时间戳 */ + timestamp?: string; + /** 扩展信息 */ + extInfo?: Record; +} diff --git a/src/user-messages.ts b/src/user-messages.ts deleted file mode 100644 index bff5285..0000000 --- a/src/user-messages.ts +++ /dev/null @@ -1,7 +0,0 @@ -/** - * 用户面向的提示文案 — 已清空 - * - * 设计原则(对齐飞书插件): - * QQBot 插件层不生成额外的用户提示信息。 - * 所有运行时错误仅写日志,不面向用户展示。 - */ diff --git a/src/utils/media-send.ts b/src/utils/media-send.ts new file mode 100644 index 0000000..6c02f43 --- /dev/null +++ b/src/utils/media-send.ts @@ -0,0 +1,563 @@ +/** + * 富媒体标签解析与发送队列 + * + * 提供媒体标签(qqimg / qqvoice / qqvideo / qqfile / qqmedia)的检测、 + * 拆分、路径编码修复,以及统一的发送队列执行器。 + */ + +/* eslint-disable no-undef -- Buffer is a Node.js global */ +import { normalizeMediaTags } from "./media-tags.js"; +import { normalizePath } from "./platform.js"; +import { + sendPhoto, + sendVoice, + sendVideoMsg, + sendDocument, + sendMedia as sendMediaAuto, + type MediaTargetContext, +} from "../outbound.js"; +import type { ResolvedQQBotAccount } from "../types.js"; + +// ============ 类型定义 ============ + +/** 发送队列项 */ +export interface SendQueueItem { + type: "text" | "image" | "voice" | "video" | "file" | "media"; + content: string; +} + +/** 统一的媒体标签正则 — 匹配标准化后的 6 种标签 */ +export const MEDIA_TAG_REGEX = + /<(qqimg|qqvoice|qqvideo|qqfile|qqmedia|img)>([^<>]+)<\/(?:qqimg|qqvoice|qqvideo|qqfile|qqmedia|img)>/gi; + +/** 创建一个新的全局标签正则实例(每次调用 reset lastIndex) */ +export function createMediaTagRegex(): RegExp { + return new RegExp(MEDIA_TAG_REGEX.source, MEDIA_TAG_REGEX.flags); +} + +/** 媒体发送上下文(统一的,供流式和普通模式共用) */ +export interface MediaSendContext { + /** 媒体目标上下文(用于 sendPhoto/sendVoice 等) */ + mediaTarget: MediaTargetContext; + /** qualifiedTarget(格式 "qqbot:c2c:xxx" 或 "qqbot:group:xxx",用于 sendMediaAuto) */ + qualifiedTarget: string; + /** 账户配置 */ + account: ResolvedQQBotAccount; + /** 事件消息 ID(用于被动回复) */ + replyToId?: string; + /** 日志 */ + log?: { + info: (msg: string) => void; + error: (msg: string) => void; + debug?: (msg: string) => void; + }; +} + +// ============ 路径编码修复 ============ + +/** + * 修复路径编码问题(双反斜杠、八进制转义、UTF-8 双重编码) + * + * 这是由于 LLM 输出路径时可能引入的编码问题: + * - Markdown 转义导致双反斜杠 + * - 八进制转义序列(来自某些 shell 工具的输出) + * - UTF-8 双重编码(中文路径经过多层处理后的乱码) + * + * 此方法在 gateway.ts deliver 回调、outbound.ts sendText、 + * streaming.ts sendMediaQueue 中共用。 + */ +export function fixPathEncoding(mediaPath: string, log?: { debug?: (msg: string) => void; error?: (msg: string) => void }): string { + // 1. 双反斜杠 -> 单反斜杠(Markdown 转义) + let result = mediaPath.replace(/\\\\/g, "\\"); + + // 2. 八进制转义序列 + UTF-8 双重编码修复 + try { + const hasOctal = /\\[0-7]{1,3}/.test(result); + const hasNonASCII = /[\u0080-\u00FF]/.test(result); + + if (hasOctal || hasNonASCII) { + log?.debug?.(`Decoding path with mixed encoding: ${result}`); + + // Step 1: 将八进制转义转换为字节 + let decoded = result.replace( + /\\([0-7]{1,3})/g, + (_: string, octal: string) => String.fromCharCode(parseInt(octal, 8)), + ); + + // Step 2: 提取所有字节(包括 Latin-1 字符) + const bytes: number[] = []; + for (let i = 0; i < decoded.length; i++) { + const code = decoded.charCodeAt(i); + if (code <= 0xff) { + bytes.push(code); + } else { + const charBytes = Buffer.from(decoded[i]!, "utf8"); + bytes.push(...charBytes); + } + } + + // Step 3: 尝试按 UTF-8 解码 + const buffer = Buffer.from(bytes); + const utf8Decoded = buffer.toString("utf8"); + + if ( + !utf8Decoded.includes("\uFFFD") || + utf8Decoded.length < decoded.length + ) { + result = utf8Decoded; + log?.debug?.(`Successfully decoded path: ${result}`); + } + } + } catch (decodeErr) { + log?.error?.(`Path decode error: ${decodeErr}`); + } + + return result; +} + +// ============ 媒体标签解析 ============ + +/** + * 检测文本是否包含富媒体标签 + */ +export function hasMediaTags(text: string): boolean { + const normalized = normalizeMediaTags(text); + const regex = createMediaTagRegex(); + return regex.test(normalized); +} + +/** findFirstClosedMediaTag 的返回值 */ +export interface FirstClosedMediaTag { + /** 标签前的纯文本(已 trim) */ + textBefore: string; + /** 标签类型(小写,如 "qqvoice") */ + tagName: string; + /** 标签内的媒体路径(已 trim、去 MEDIA: 前缀、修复编码) */ + mediaPath: string; + /** 标签在输入文本中的结束索引(紧接标签后的第一个字符位置) */ + tagEndIndex: number; + /** 映射后的发送队列项类型 */ + itemType: SendQueueItem["type"]; +} + +/** + * 在文本中查找**第一个**完整闭合的媒体标签 + * + * 与 splitByMediaTags 不同,此函数只匹配一个标签就停止, + * 用于流式场景的"循环消费"模式:每次处理一个标签,更新偏移,再找下一个。 + * + * @param text 待检查的文本(应已 normalize 过) + * @returns 第一个闭合标签的信息,没有则返回 null + */ +export function findFirstClosedMediaTag( + text: string, + log?: { info?: (msg: string) => void; debug?: (msg: string) => void; error?: (msg: string) => void }, +): FirstClosedMediaTag | null { + const regex = createMediaTagRegex(); + const match = regex.exec(text); + if (!match) return null; + + const textBefore = text.slice(0, match.index).replace(/\n{3,}/g, "\n\n").trim(); + const tagName = match[1]!.toLowerCase(); + let mediaPath = match[2]?.trim() ?? ""; + + // 剥离 MEDIA: 前缀 + if (mediaPath.startsWith("MEDIA:")) { + mediaPath = mediaPath.slice("MEDIA:".length); + } + mediaPath = normalizePath(mediaPath); + mediaPath = fixPathEncoding(mediaPath, log); + + const typeMap: Record = { + qqimg: "image", + qqvoice: "voice", + qqvideo: "video", + qqfile: "file", + qqmedia: "media", + }; + + return { + textBefore, + tagName, + mediaPath, + tagEndIndex: match.index! + match[0].length, + itemType: typeMap[tagName] ?? "image", + }; +} + +/** + * 媒体标签拆分结果 + */ +export interface MediaSplitResult { + /** 是否包含媒体标签 */ + hasMediaTags: boolean; + /** 媒体标签前的纯文本 */ + textBeforeFirstTag: string; + /** 媒体标签后的剩余文本 */ + textAfterLastTag: string; + /** 完整的发送队列(标签间的文本 + 媒体项) */ + mediaQueue: SendQueueItem[]; +} + +/** + * 将文本按富媒体标签拆分为三部分 + * + * 用于两个场景: + * 1. 流式模式:中断-恢复流程(标签前文本 → 结束流式 → 发送媒体 → 新流式 → 标签后文本) + * 2. 普通模式:构建按顺序发送的队列 + */ +export function splitByMediaTags( + text: string, + log?: { info?: (msg: string) => void; debug?: (msg: string) => void; error?: (msg: string) => void }, +): MediaSplitResult { + const normalized = normalizeMediaTags(text); + const regex = createMediaTagRegex(); + const matches = [...normalized.matchAll(regex)]; + + if (matches.length === 0) { + return { + hasMediaTags: false, + textBeforeFirstTag: normalized, + textAfterLastTag: "", + mediaQueue: [], + }; + } + + // 第一个标签前的纯文本 + const firstMatch = matches[0]!; + const textBeforeFirstTag = normalized + .slice(0, firstMatch.index) + .replace(/\n{3,}/g, "\n\n") + .trim(); + + // 最后一个标签后的纯文本 + const lastMatch = matches[matches.length - 1]!; + const lastMatchEnd = lastMatch.index! + lastMatch[0].length; + const textAfterLastTag = normalized + .slice(lastMatchEnd) + .replace(/\n{3,}/g, "\n\n") + .trim(); + + // 构建媒体发送队列 + const mediaQueue: SendQueueItem[] = []; + let lastIndex = firstMatch.index!; + + for (const match of matches) { + // 标签前的文本(标签之间的间隔文本) + const textBetween = normalized + .slice(lastIndex, match.index) + .replace(/\n{3,}/g, "\n\n") + .trim(); + if (textBetween && lastIndex !== firstMatch.index) { + // 只添加非首段的间隔文本(首段由 textBeforeFirstTag 覆盖) + mediaQueue.push({ type: "text", content: textBetween }); + } + + // 解析标签内容 + const tagName = match[1]!.toLowerCase(); + let mediaPath = match[2]?.trim() ?? ""; + + // 剥离 MEDIA: 前缀 + if (mediaPath.startsWith("MEDIA:")) { + mediaPath = mediaPath.slice("MEDIA:".length); + } + mediaPath = normalizePath(mediaPath); + + // 修复路径编码问题 + mediaPath = fixPathEncoding(mediaPath, log); + + // 根据标签类型加入队列 + const typeMap: Record = { + qqimg: "image", + qqvoice: "voice", + qqvideo: "video", + qqfile: "file", + qqmedia: "media", + }; + const itemType = typeMap[tagName] ?? "image"; + if (mediaPath) { + mediaQueue.push({ type: itemType, content: mediaPath }); + log?.info?.(`Found ${itemType} in <${tagName}>: ${mediaPath.slice(0, 80)}`); + } + + lastIndex = match.index! + match[0].length; + } + + return { + hasMediaTags: true, + textBeforeFirstTag, + textAfterLastTag, + mediaQueue, + }; +} + +/** + * 从文本中解析出完整的发送队列(含标签前后的纯文本) + * + * 与 splitByMediaTags 的区别: + * - splitByMediaTags 分为 before / queue / after 三段(供流式模式的中断-恢复) + * - parseMediaTagsToSendQueue 返回一个扁平的完整队列(供普通模式按顺序发送) + * + * 适用于 gateway.ts deliver 回调和 outbound.ts sendText。 + */ +export function parseMediaTagsToSendQueue( + text: string, + log?: { info?: (msg: string) => void; debug?: (msg: string) => void; error?: (msg: string) => void }, +): { hasMediaTags: boolean; sendQueue: SendQueueItem[] } { + const split = splitByMediaTags(text, log); + + if (!split.hasMediaTags) { + return { hasMediaTags: false, sendQueue: [] }; + } + + const sendQueue: SendQueueItem[] = []; + + // 标签前的文本 + if (split.textBeforeFirstTag) { + sendQueue.push({ type: "text", content: split.textBeforeFirstTag }); + } + + // 媒体队列(含标签间文本) + sendQueue.push(...split.mediaQueue); + + // 标签后的文本 + if (split.textAfterLastTag) { + sendQueue.push({ type: "text", content: split.textAfterLastTag }); + } + + return { hasMediaTags: true, sendQueue }; +} + +// ============ 发送队列执行 ============ + +/** + * 统一执行发送队列 + * + * 遍历 sendQueue,按类型调用对应的发送函数。 + * 文本项通过 onSendText 回调处理(不同场景的文本发送方式不同)。 + */ +export async function executeSendQueue( + queue: SendQueueItem[], + ctx: MediaSendContext, + options: { + /** 文本发送回调(每种场景的文本发送方式不同) */ + onSendText?: (text: string) => Promise; + /** 是否跳过 inter-tag 文本(流式模式下通常跳过,由新流式会话处理) */ + skipInterTagText?: boolean; + } = {}, +): Promise { + const { mediaTarget, qualifiedTarget, account, replyToId, log } = ctx; + const prefix = mediaTarget.logPrefix ?? `[qqbot:${account.accountId}]`; + + for (const item of queue) { + try { + if (item.type === "text") { + if (options.skipInterTagText) { + log?.info(`${prefix} executeSendQueue: skipping inter-tag text (${item.content.length} chars)`); + continue; + } + if (options.onSendText) { + await options.onSendText(item.content); + } else { + log?.info(`${prefix} executeSendQueue: no onSendText handler, skipping text`); + } + continue; + } + + log?.info(`${prefix} executeSendQueue: sending ${item.type}: ${item.content.slice(0, 80)}...`); + + if (item.type === "image") { + const result = await sendPhoto(mediaTarget, item.content); + if (result.error) { + log?.error(`${prefix} sendPhoto error: ${result.error}`); + } + } else if (item.type === "voice") { + const uploadFormats = + account.config?.audioFormatPolicy?.uploadDirectFormats ?? + account.config?.voiceDirectUploadFormats; + const transcodeEnabled = + account.config?.audioFormatPolicy?.transcodeEnabled !== false; + const voiceTimeout = 45000; // 45s + try { + const result = await Promise.race([ + sendVoice(mediaTarget, item.content, uploadFormats, transcodeEnabled), + new Promise<{ channel: string; error: string }>((resolve) => + setTimeout( + () => resolve({ channel: "qqbot", error: "语音发送超时,已跳过" }), + voiceTimeout, + ), + ), + ]); + if (result.error) { + log?.error(`${prefix} sendVoice error: ${result.error}`); + } + } catch (err) { + log?.error(`${prefix} sendVoice unexpected error: ${err}`); + } + } else if (item.type === "video") { + const result = await sendVideoMsg(mediaTarget, item.content); + if (result.error) { + log?.error(`${prefix} sendVideoMsg error: ${result.error}`); + } + } else if (item.type === "file") { + const result = await sendDocument(mediaTarget, item.content); + if (result.error) { + log?.error(`${prefix} sendDocument error: ${result.error}`); + } + } else if (item.type === "media") { + const result = await sendMediaAuto({ + to: qualifiedTarget, + text: "", + mediaUrl: item.content, + accountId: account.accountId, + replyToId, + account, + }); + if (result.error) { + log?.error(`${prefix} sendMedia(auto) error: ${result.error}`); + } + } + } catch (err) { + log?.error(`${prefix} executeSendQueue: failed to send ${item.type}: ${err}`); + } + } +} + +/** + * 从文本中剥离所有媒体标签(用于最终显示) + */ +export function stripMediaTags(text: string): string { + const regex = createMediaTagRegex(); + return text.replace(regex, "").replace(/\n{3,}/g, "\n\n").trim(); +} + +/** + * 检测文本中是否有未闭合的媒体标签,如果有则截断到安全位置。 + * + * 流式输出中 LLM 逐 token 吐出媒体标签,中间态不应直接发给用户。 + * 只检查最后一行,从右到左扫描 `<`,找到第一个有意义的媒体标签片段并判断是否完整。 + * + * 核心原则:截断只能截到**开标签**前面;闭合标签前缀若找不到对应开标签则原样返回。 + */ +export function stripIncompleteMediaTag(text: string): [safeText: string, hasIncomplete: boolean] { + if (!text) return [text, false]; + + const lastNL = text.lastIndexOf("\n"); + const lastLine = lastNL === -1 ? text : text.slice(lastNL + 1); + if (!lastLine) return [text, false]; // 以换行结尾,安全 + + const lineStart = lastNL === -1 ? 0 : lastNL + 1; + + // ---- 媒体标签名判断 ---- + const MEDIA_NAMES = [ + "qq", "img", "image", "pic", "photo", "voice", "audio", "video", + "file", "doc", "media", "attach", "send", "document", "picture", + "qqvoice", "qqaudio", "qqvideo", "qqimg", "qqimage", "qqfile", + "qqpic", "qqphoto", "qqmedia", "qqattach", "qqsend", "qqdocument", "qqpicture", + ]; + const isMedia = (n: string) => MEDIA_NAMES.includes(n.toLowerCase()); + const couldBeMedia = (n: string) => { const l = n.toLowerCase(); return MEDIA_NAMES.some(m => m.startsWith(l)); }; + + /** 截断到 lastLine 中位置 pos 之前,返回 [safe, true] */ + const cutAt = (pos: number): [string, true] => [ + text.slice(0, lineStart + pos).trimEnd(), + true, + ]; + + /** 检查 lastLine 中位置 pos 处的媒体开标签后面是否有完整闭合标签 */ + const hasClosingAfter = (pos: number, name: string): boolean => { + const rest = lastLine.slice(pos + 1); // < 之后 + const gt = rest.search(/[>>]/); + if (gt < 0) return false; + const after = rest.slice(gt + 1); + return new RegExp(`[<\uFF1C]/${name}\\s*[>\uFF1E]`, "i").test(after); + }; + + // ---- 回溯状态 ---- + // 遇到不完整的闭合标签/孤立 < 时,记录并继续往左找对应的开标签 + let searchTag: string | null = null; // 要找的开标签名,"*" = 来自孤立 < + let searchIsClosing = false; // 触发回溯的是闭合类(= 0; i--) { + const ch = lastLine[i]; + if (ch !== "<" && ch !== "\uFF1C") continue; + + const after = lastLine.slice(i + 1); + const isClosing = after.startsWith("/"); + const nameStr = isClosing ? after.slice(1) : after; + const nameMatch = nameStr.match(/^(\w+)/); + + // ======== 回溯模式:正在找对应的开标签 ======== + if (searchTag) { + if (!nameMatch || isClosing) continue; + const cand = nameMatch[1].toLowerCase(); + if (!isMedia(cand)) continue; + // 跳过已有完整闭合对的开标签 + if (hasClosingAfter(i, cand)) continue; + + if (searchTag === "*") { + return cutAt(i); // 通配:任何未闭合的媒体开标签都匹配 + } + // 精确/前缀匹配(闭合标签名可能不完整,如 >]/.test(restAfterName); + + // --- 不是媒体标签(也不是前缀) --- + if (!isMedia(tag) && !(couldBeMedia(tag) && !hasGT)) continue; + + // --- 标签未闭合(无 >),还在输入中 --- + if (!hasGT) { + if (isClosing) { + // 不完整闭合标签(如 ,是完整的 --- + if (isClosing) return [text, false]; // 完整闭合标签 → 安全 + + // 完整开标签 ,检查后面有无对应 + if (hasClosingAfter(i, tag)) return [text, false]; + return cutAt(i); // 无闭合 → 截断 + } + + // ---- 循环结束,处理回溯未命中 ---- + if (searchTag) { + if (!searchIsClosing) { + // 来自孤立 <,前面没有媒体开标签 → 截断到那个 < 前面 + return cutAt(fallbackPos); + } + // 来自闭合类( 0 模拟 API 慢响应。 */ +let apiDelayMs = 0; + +/** 控制媒体上传 API 的延迟(毫秒)。 */ +let mediaApiDelayMs = 0; + +function sleep(ms: number): Promise { + return new Promise((r) => setTimeout(r, ms)); +} + +function resetMocks() { + streamCalls = []; + mediaUploadCalls = []; + streamMsgIdCounter = 0; + apiDelayMs = 0; + mediaApiDelayMs = 0; +} + +// 保存原始 fetch +const originalFetch = globalThis.fetch; + +// 覆写 global.fetch +globalThis.fetch = async (input: RequestInfo | URL, init?: RequestInit): Promise => { + const url = typeof input === "string" ? input : input instanceof URL ? input.toString() : input.url; + const body = init?.body ? JSON.parse(init.body as string) : {}; + + // ---- Token 请求 ---- + if (url.includes("/getAppAccessToken")) { + return new Response(JSON.stringify({ + access_token: "mock-token-12345", + expires_in: "7200", + }), { status: 200, headers: { "Content-Type": "application/json" } }); + } + + // ---- 流式消息 API ---- + if (url.includes("/stream_messages")) { + if (apiDelayMs > 0) await sleep(apiDelayMs); + + const call: StreamCall = { + content: body.content_raw ?? "", + inputState: body.input_state ?? 0, + streamMsgId: body.stream_msg_id, + index: body.index ?? 0, + url, + }; + streamCalls.push(call); + + // 首次调用(无 stream_msg_id)→ 返回新的 stream_msg_id + let respBody: any; + if (!body.stream_msg_id) { + streamMsgIdCounter++; + respBody = { id: `stream-${streamMsgIdCounter}`, timestamp: Date.now().toString() }; + } else { + respBody = { id: body.stream_msg_id, timestamp: Date.now().toString() }; + } + + return new Response(JSON.stringify(respBody), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + // ---- 富媒体上传 API (v2/users/.../files) ---- + if (url.includes("/files")) { + if (mediaApiDelayMs > 0) await sleep(mediaApiDelayMs); + + mediaUploadCalls.push({ url, body }); + + return new Response(JSON.stringify({ + file_uuid: `uuid-${mediaUploadCalls.length}`, + file_info: "mock", + ttl: 3600, + }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + // ---- 普通消息 API (v2/users/.../messages) ---- + if (url.includes("/messages")) { + if (mediaApiDelayMs > 0) await sleep(mediaApiDelayMs); + + return new Response(JSON.stringify({ + id: `msg-resp-${Date.now()}`, + timestamp: Date.now().toString(), + }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }); + } + + // 未匹配的请求,回退到原始 fetch + console.warn(`[mock-fetch] 未匹配的请求: ${url}`); + return originalFetch(input, init); +}; + +// ---- 现在 import StreamingController(它会使用被 mock 的 global.fetch) ---- +const { StreamingController } = await import("../src/streaming.js"); +type StreamingControllerType = InstanceType; + +// ============ 辅助函数 ============ + +/** 等待异步任务完成 */ +async function flush(ms = 100): Promise { + await sleep(ms); +} + +/** 收集日志 */ +const logs: string[] = []; + +function createController(opts?: { mediaContext?: boolean; onReplyBoundary?: (newText: string) => void | Promise }): StreamingControllerType { + logs.length = 0; + const deps: any = { + account: { + accountId: "test", + enabled: true, + appId: "test-app", + clientSecret: "test-secret", + secretSource: "config" as const, + markdownSupport: true, + config: { + streaming: true, + streamingConfig: { throttleMs: 50 }, // 短节流方便测试 + }, + }, + userId: "user-1", + replyToMsgId: "msg-1", + eventId: "event-1", + logPrefix: "[test]", + log: { + info: (m: string) => logs.push(`[INFO] ${m}`), + error: (m: string) => logs.push(`[ERROR] ${m}`), + warn: (m: string) => logs.push(`[WARN] ${m}`), + debug: (m: string) => logs.push(`[DEBUG] ${m}`), + }, + }; + if (opts?.mediaContext) { + deps.mediaContext = { + account: deps.account, + event: { type: "c2c", senderId: "user-1", messageId: "msg-1" }, + log: deps.log, + }; + } + if (opts?.onReplyBoundary) { + deps.onReplyBoundary = opts.onReplyBoundary; + } + return new StreamingController(deps); +} + +// ============ 测试框架 ============ + +let passed = 0; +let failed = 0; +const failedTests: string[] = []; + +async function test(name: string, fn: () => Promise) { + resetMocks(); + try { + await fn(); + console.log(` ✅ ${name}`); + passed++; + } catch (e: any) { + console.log(` ❌ ${name}`); + console.log(` ${e.message}`); + if (e.stack) { + const lines = (e.stack as string).split("\n").slice(1, 4); + for (const l of lines) console.log(` ${l.trim()}`); + } + // 打印关键日志(去掉 DEBUG 减少噪音) + const relevantLogs = logs.filter((l) => !l.includes("[DEBUG]")).slice(-10); + if (relevantLogs.length > 0) { + console.log(` --- 日志 ---`); + for (const l of relevantLogs) console.log(` ${l}`); + } + failed++; + failedTests.push(name); + } +} + +// ============ 测试用例 ============ + +console.log("\n=== 1. 纯文本流式 ==="); + +await test("纯文本: 基本流式 → 完成", async () => { + const ctrl = createController(); + + await ctrl.onPartialReply({ text: "你好" }); + await flush(); + await ctrl.onPartialReply({ text: "你好世界" }); + await flush(); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + + // 应该有流式分片发送 + assert.ok(streamCalls.length >= 2, `应至少有 2 次流式调用,实际 ${streamCalls.length}`); + // 最后一次应该是 DONE (inputState=10) + const last = streamCalls[streamCalls.length - 1]; + assert.strictEqual(last.inputState, 10, "最后一次应为 DONE"); + assert.ok(last.content.includes("你好世界"), `最终文本应包含完整内容,实际: "${last.content}"`); + // 不应有媒体上传 + assert.strictEqual(mediaUploadCalls.length, 0, "不应有媒体上传"); +}); + +await test("纯文本: 空文本被忽略", async () => { + const ctrl = createController(); + + await ctrl.onPartialReply({ text: "" }); + await ctrl.onPartialReply({ text: undefined }); + await flush(); + + assert.strictEqual(streamCalls.length, 0, "不应有流式调用"); +}); + +await test("纯文本: 全空白不启动流式,后续非空白一起发送", async () => { + const ctrl = createController(); + + // 先来一段全空白内容 — 不应启动流式 + await ctrl.onPartialReply({ text: "\n\n " }); + await flush(); + assert.strictEqual(streamCalls.length, 0, "全空白阶段不应有流式调用"); + + // 后续有非空白内容到达 — 应启动流式,且包含之前的空白 + await ctrl.onPartialReply({ text: "\n\n hello world" }); + await flush(200); + + assert.ok(streamCalls.length >= 1, `应有至少 1 次流式调用,实际 ${streamCalls.length}`); + // 首次发送的内容应包含之前保留的空白 + 新内容 + const firstContent = streamCalls[0].content; + assert.ok(firstContent.includes("hello world"), `首次发送应包含 "hello world",实际: "${firstContent}"`); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(100); +}); + +await test("纯文本: 全空白 + 媒体标签,空白不发送,媒体正常处理", async () => { + const ctrl = createController({ mediaContext: true }); + + apiDelayMs = 5; + mediaApiDelayMs = 5; + + // 全空白前缀 + 媒体标签一起到达 + await ctrl.onPartialReply({ text: "\n\n/tmp/pic.jpg描述文字" }); + await flush(400); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(200); + + // 验证:日志中检测到了媒体标签 + const foundLogs = logs.filter((l) => l.includes("processMediaTags: found")); + assert.ok(foundLogs.length >= 1, `应检测到 qqimg 标签,实际 ${foundLogs.length} 条`); + + // 验证:不应有 PREFIX MISMATCH 错误 + const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH")); + assert.strictEqual(prefixMismatchLogs.length, 0, `不应有 PREFIX MISMATCH,实际: ${prefixMismatchLogs.join("; ")}`); + + // 验证:如果有流式分片发送,内容不应是纯空白 + const generatingCalls = streamCalls.filter((c) => c.inputState === 1); + for (const call of generatingCalls) { + assert.ok(call.content.trim().length > 0, `流式分片不应为纯空白: "${call.content}"`); + } +}); + +await test("纯文本: 空白→媒体→空白→媒体→空白,只有媒体发出,空白全忽略", async () => { + const ctrl = createController({ mediaContext: true }); + + apiDelayMs = 5; + mediaApiDelayMs = 5; + + // 逐步送入:空白 → 媒体标签1 → 空白 → 媒体标签2 → 空白 + const fullText = + "\n \n" + + "/tmp/pic1.jpg" + + "\n\n \n" + + "/tmp/pic2.jpg" + + " \n\n"; + + // 模拟流式分段到达 + // 阶段 1:纯空白 + await ctrl.onPartialReply({ text: "\n \n" }); + await flush(200); + assert.strictEqual(streamCalls.length, 0, "纯空白阶段不应有流式调用"); + + // 阶段 2:空白 + 第一个媒体标签 + await ctrl.onPartialReply({ text: "\n \n/tmp/pic1.jpg" }); + await flush(400); + + // 阶段 3:继续加空白 + await ctrl.onPartialReply({ + text: "\n \n/tmp/pic1.jpg\n\n \n", + }); + await flush(200); + + // 阶段 4:第二个媒体标签 + await ctrl.onPartialReply({ + text: "\n \n/tmp/pic1.jpg\n\n \n/tmp/pic2.jpg", + }); + await flush(400); + + // 阶段 5:末尾空白,完成 + await ctrl.onPartialReply({ text: fullText }); + await flush(200); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(300); + + // 验证 1:应检测到 2 个媒体标签 + const foundLogs = logs.filter((l) => l.includes("processMediaTags: found")); + assert.ok(foundLogs.length >= 2, `应检测到至少 2 个 qqimg 标签,实际 ${foundLogs.length} 条`); + + // 验证 2:应有 2 次媒体发送尝试(sendPhoto 可能因文件不存在失败,但 sending 日志应存在) + const sendingLogs = logs.filter((l) => l.includes("sending image")); + assert.ok(sendingLogs.length >= 2, `应有至少 2 次发送图片尝试,实际 ${sendingLogs.length} 次`); + + // 验证 3:不应有 PREFIX MISMATCH 错误 + const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH")); + assert.strictEqual( + prefixMismatchLogs.length, + 0, + `不应有 PREFIX MISMATCH,实际: ${prefixMismatchLogs.join("; ")}`, + ); + + // 验证 4:流式分片(GENERATING 状态)中不应出现纯空白内容 + const generatingCallsInner = streamCalls.filter((c) => c.inputState === 1); + for (const call of generatingCallsInner) { + assert.ok( + call.content.trim().length > 0, + `流式分片不应为纯空白: "${call.content.replace(/\n/g, "\\n").replace(/ /g, "·")}"`, + ); + } + + // 验证 5:如果有流式启动(首次调用,无 stream_msg_id),首次内容也不应纯空白 + const startCalls = streamCalls.filter((c) => !c.streamMsgId); + for (const call of startCalls) { + assert.ok( + call.content.trim().length > 0, + `流式启动内容不应为纯空白: "${call.content.replace(/\n/g, "\\n").replace(/ /g, "·")}"`, + ); + } +}); + +console.log("\n=== 2. 单个媒体标签 ==="); + +await test("媒体标签: 多媒体后跟文本,onIdle 终结不 PREFIX MISMATCH", async () => { + const ctrl = createController({ mediaContext: true }); + + apiDelayMs = 5; + mediaApiDelayMs = 5; + + // 模拟实际场景:两个语音标签 + 后续文本描述,逐步到达 + // 阶段1:第一个语音标签 + await ctrl.onPartialReply({ + text: "/tmp/voice1.mp3", + }); + await flush(400); + + // 阶段2:两个语音标签 + await ctrl.onPartialReply({ + text: "/tmp/voice1.mp3\n/tmp/voice2.mp3", + }); + await flush(400); + + // 阶段3:两个语音标签 + 后续文本 + await ctrl.onPartialReply({ + text: "/tmp/voice1.mp3\n/tmp/voice2.mp3\n两条语音都发给你啦!", + }); + await flush(400); + + // 完成 + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(300); + + // 验证:应检测到 2 个媒体标签 + const foundLogs = logs.filter((l) => l.includes("processMediaTags: found")); + assert.ok(foundLogs.length >= 2, `应检测到至少 2 个标签,实际 ${foundLogs.length} 条`); + + // 核心验证:不应有 PREFIX MISMATCH + const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH")); + assert.strictEqual( + prefixMismatchLogs.length, + 0, + `不应有 PREFIX MISMATCH,实际: ${prefixMismatchLogs.join("; ")}`, + ); + + // 验证:最终流式文本(如果有)应包含后续描述文本 + const doneCalls = streamCalls.filter((c) => c.inputState === 10); + if (doneCalls.length > 0) { + const lastDone = doneCalls[doneCalls.length - 1]; + assert.ok( + lastDone.content.includes("两条语音都发给你啦"), + `终结分片应包含后续文本,实际: "${lastDone.content.slice(0, 80)}"`, + ); + // 终结文本不应包含原始媒体标签 + assert.ok( + !lastDone.content.includes(""), + `终结文本不应包含 标签,实际: "${lastDone.content.slice(0, 80)}"`, + ); + } +}); + +await test("媒体标签: 文本 + 图片 + 后续文本", async () => { + const ctrl = createController({ mediaContext: true }); + + // 一次性送入完整的含图片标签文本 + await ctrl.onPartialReply({ text: "看图:/tmp/cat.jpg" }); + await flush(300); + + // 后续文本到达 + await ctrl.onPartialReply({ text: "看图:/tmp/cat.jpg\n\n好看吧?" }); + await flush(300); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + + // 验证:应有媒体上传调用(图片需要先上传再发送) + // 或者至少流式中出现过图片相关处理 + // 关键验证:最终流式文本不应包含原始 标签 + const lastStream = streamCalls[streamCalls.length - 1]; + assert.ok(!lastStream.content.includes(""), "最终文本不应包含 标签"); +}); + +await test("媒体标签: 纯媒体开头(无前置文本)", async () => { + const ctrl = createController({ mediaContext: true }); + + await ctrl.onPartialReply({ text: "/tmp/hello.mp3" }); + await flush(300); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(200); + + // 验证:应该至少有流式会话创建或媒体处理的记录 + const infoLogs = logs.filter((l) => l.includes("processMediaTags") && l.includes("found")); + assert.ok(infoLogs.length >= 1, `应检测到 qqvoice 标签。相关日志: ${infoLogs.join("; ") || "无"}`); +}); + +console.log("\n=== 3. 多个媒体标签(循环消费) ==="); + +await test("多媒体: 两个图片标签被逐个处理", async () => { + const ctrl = createController({ mediaContext: true }); + + const text = "图1:/tmp/a.jpg\n图2:/tmp/b.jpg\n完毕"; + + await ctrl.onPartialReply({ text }); + await flush(600); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(200); + + // 验证:日志中应显示找到了两个标签 + const foundLogs = logs.filter((l) => l.includes("processMediaTags: found")); + assert.ok(foundLogs.length >= 2, `应找到至少 2 个标签,实际 ${foundLogs.length} 条 found 日志`); +}); + +console.log("\n=== 4. 未闭合标签等待 ==="); + +await test("未闭合标签: 逐步到达后完整处理", async () => { + const ctrl = createController({ mediaContext: true }); + + // 不完整的标签 + await ctrl.onPartialReply({ text: "开始/tmp/pic" }); + await flush(200); + + // 验证:此时流式文本应该只包含 "开始",不含标签部分 + const midCalls = [...streamCalls]; + for (const call of midCalls) { + assert.ok(!call.content.includes(""), `中间态不应包含未闭合标签,内容: "${call.content}"`); + } + + // 标签完整了 + await ctrl.onPartialReply({ text: "开始/tmp/pic.jpg\n看看" }); + await flush(300); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(200); + + // 验证:应检测到完整标签 + const foundLogs = logs.filter((l) => l.includes("processMediaTags: found")); + assert.ok(foundLogs.length >= 1, `标签完整后应被检测到,found 日志: ${foundLogs.length}`); +}); + +console.log("\n=== 5. ★ pendingNormalizedFull 补救机制 ==="); + +await test("补救: 媒体处理期间最后一次 onPartialReply 不丢失", async () => { + const ctrl = createController({ mediaContext: true }); + + // 设置 API 有延迟,模拟 processMediaTags 执行耗时 + apiDelayMs = 50; + mediaApiDelayMs = 80; + + // 第1次: 含媒体标签 → 进入 processMediaTags,mediaInterruptInProgress=true + const p1 = ctrl.onPartialReply({ text: "hi/tmp/x.jpg" }); + + // 等一小段确保 processMediaTags 已经开始 + await sleep(20); + + // 第2次: 这是"最后一次" onPartialReply —— 带有新的后续文本 + // 因为 mediaInterruptInProgress=true,会被保存到 pendingNormalizedFull + const p2 = ctrl.onPartialReply({ text: "hi/tmp/x.jpg\n\n再见朋友" }); + + // 等待所有处理完成(包括 deferred re-run) + await p1; + await p2; + await flush(800); + + // 标记完成 + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(300); + + // ★ 核心验证:最终发送的文本应包含 "再见朋友" + const allStreamContent = streamCalls.map((c) => c.content).join(" || "); + assert.ok( + allStreamContent.includes("再见朋友"), + `"再见朋友" 应出现在流式发送中(pendingNormalizedFull 补救)。\n实际流式内容: [\n${streamCalls.map((c, i) => ` ${i}: "${c.content.slice(0, 80)}" (state=${c.inputState})`).join("\n")}\n]` + ); + + // 验证:deferred 日志应出现 + const deferredLogs = logs.filter((l) => l.includes("deferred")); + assert.ok(deferredLogs.length >= 1, `应有 deferred 相关日志,实际: ${deferredLogs.length}`); +}); + +await test("补救: 多次被跳过只保留最新,最终处理最新文本", async () => { + const ctrl = createController({ mediaContext: true }); + + apiDelayMs = 30; + mediaApiDelayMs = 150; // 媒体处理很慢 + + // 第1次: 含媒体 → 进入长时间处理 + const p1 = ctrl.onPartialReply({ text: "/tmp/song.mp3" }); + await sleep(20); + + // 第2次: 被跳过 → pendingNormalizedFull = "..v1.." + await ctrl.onPartialReply({ text: "/tmp/song.mp3\n后续文字v1" }); + await sleep(10); + + // 第3次: 被跳过 → pendingNormalizedFull 被覆盖为 "..v2.." + await ctrl.onPartialReply({ text: "/tmp/song.mp3\n后续文字v2最终版" }); + + await p1; + await flush(800); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(300); + + // ★ 核心验证:最终应包含 v2 的文本 + const allStreamContent = streamCalls.map((c) => c.content).join(" || "); + assert.ok( + allStreamContent.includes("后续文字v2最终版"), + `应包含最新的 "后续文字v2最终版"。\n实际: [\n${streamCalls.map((c, i) => ` ${i}: "${c.content.slice(0, 80)}" (state=${c.inputState})`).join("\n")}\n]` + ); +}); + +await test("补救: 无 pending 时不触发多余的 re-run", async () => { + const ctrl = createController({ mediaContext: true }); + + apiDelayMs = 5; + mediaApiDelayMs = 5; + + // 只有一次 onPartialReply(媒体标签后接 \n 开头的文本) + await ctrl.onPartialReply({ text: "hello/tmp/a.jpg\nbye" }); + await flush(400); + + ctrl.markFullyComplete(); + await ctrl.onIdle(); + await flush(200); + + // 验证:不应有 re-running 日志(因为没有被跳过的调用) + const reRunLogs = logs.filter((l) => l.includes("re-running")); + assert.strictEqual(reRunLogs.length, 0, `不应有 re-running 日志,实际: ${reRunLogs.length}`); + + // ★ 验证:不应有 PREFIX MISMATCH 错误(之前 stripMediaTags 的 .trim() 会导致此问题) + const prefixMismatchLogs = logs.filter((l) => l.includes("PREFIX MISMATCH")); + assert.strictEqual(prefixMismatchLogs.length, 0, `不应有 PREFIX MISMATCH,实际: ${prefixMismatchLogs.join("; ")}`); +}); + +console.log("\n=== 6. onIdle 边界 ==="); + +await test("onIdle: 等待媒体处理完成后再终结", async () => { + const ctrl = createController({ mediaContext: true }); + + apiDelayMs = 20; + mediaApiDelayMs = 200; // 媒体发送很慢 + + // 发送含媒体的文本 + const p = ctrl.onPartialReply({ text: "/tmp/slow.jpg\n完成" }); + await sleep(30); + + // 在媒体还在处理时就标记完成并触发 onIdle + ctrl.markFullyComplete(); + const idlePromise = ctrl.onIdle(); + + await p; + await idlePromise; + await flush(500); + + // 验证:应该正常完成,不降级 + assert.ok(!ctrl.shouldFallbackToStatic, "不应降级到静态发送"); +}); + +await test("onDeliver: deliver 先到达 → 禁用流式走降级", async () => { + const ctrl = createController({ mediaContext: true }); + + // deliver 先到达(此时 sentStreamChunkCount === 0)→ 直接 transition 到 aborted + await ctrl.onDeliver({ text: "成果:/tmp/result.png" }); + await flush(400); + + // 验证:应该已经进入终态(aborted),走降级路径 + assert.ok(ctrl.isTerminalPhase, "deliver 先到达后应进入终态"); + assert.ok(ctrl.shouldFallbackToStatic, "deliver 先到达时应降级到静态发送"); + + // 后续 onPartialReply 应被跳过(因为已是终态) + await ctrl.onPartialReply({ text: "这段应该被忽略" }); + assert.ok(ctrl.shouldFallbackToStatic, "onPartialReply 应被跳过,仍然是降级状态"); +}); + +await test("互斥: onPartialReply 先到 → onDeliver 被忽略(即使在媒体中断期间)", async () => { + const ctrl = createController({ mediaContext: true }); + + // onPartialReply 先到 → 锁定为 partial 模式 + await ctrl.onPartialReply({ text: "/tmp/a.mp3" }); + await flush(400); + + // 此时可能还在 mediaInterruptInProgress,sentStreamChunkCount 可能为 0 + // 但 onDeliver 应被忽略(因为 partial 先到) + await ctrl.onDeliver({ text: "/tmp/a.mp3" }); + await flush(400); + + // 验证:不应降级(deliver 没有生效) + assert.ok(!ctrl.shouldFallbackToStatic, "partial 先到时 deliver 不应导致降级"); + assert.ok(!ctrl.isTerminalPhase || ctrl.currentPhase !== "aborted", + "不应因为 deliver 进入 aborted(onPartialReply 在处理中)"); + + // 日志中应有 deliver 被拒绝的字样 + const skipLogs = logs.filter((l) => l.includes('rejected "deliver"')); + assert.ok(skipLogs.length >= 1, `应有 deliver 被拒绝的日志,实际: ${skipLogs.join("; ") || "无"}`); +}); + +console.log("\n=== 7. 降级与异常 ==="); + +await test("降级: 从未发送分片 → fallback", async () => { + const ctrl = createController(); + + // 不发送任何文本就直接结束 + ctrl.markFullyComplete(); + await ctrl.onIdle(); + + assert.ok(ctrl.isTerminalPhase, "应进入终态"); +}); + +await test("异常: onError 后正常终态", async () => { + const ctrl = createController(); + + await ctrl.onPartialReply({ text: "部分文本" }); + await flush(200); + + await ctrl.onError(new Error("test error")); + + assert.ok(ctrl.isTerminalPhase, "onError 后应进入终态"); +}); + +console.log("\n=== 8. 回复边界检测 ==="); + +await test("回复边界: 文本缩短 → 旧controller终结,新controller处理新回复", async () => { + let newCtrl: StreamingControllerType | null = null; + + const ctrl = createController({ + onReplyBoundary: async (newText: string) => { + // 回调中创建新 controller 并处理新回复 + newCtrl = createController(); + await newCtrl.onPartialReply({ text: newText }); + }, + }); + + // 第一段回复 + await ctrl.onPartialReply({ text: "第一段回复内容比较长" }); + await flush(); + + // 记录第一段相关的 streamCalls 数量 + const firstSegCalls = streamCalls.length; + assert.ok(firstSegCalls > 0, "第一段应已产生流式调用"); + + // 文本缩短 → 触发回复边界 + await ctrl.onPartialReply({ text: "短" }); + await flush(); + + // 旧 controller 应已进入终态 + assert.ok(ctrl.isTerminalPhase, "旧 controller 应已进入终态"); + // 新 controller 应已创建 + assert.ok(newCtrl !== null, "应通过回调创建了新 controller"); + + // 第一段应有 DONE 分片(终结) + const doneCalls = streamCalls.filter((c) => c.inputState === 10); + assert.ok(doneCalls.length >= 1, "旧 controller 应发送了 DONE 分片终结第一段"); + + // 验证第一段的 DONE 分片包含第一段内容 + const firstDone = doneCalls[0]; + assert.ok(firstDone.content.includes("第一段回复内容比较长"), `第一段 DONE 分片应包含 "第一段回复内容比较长", 实际: "${firstDone.content}"`); + + // 继续第二段回复增长 + await newCtrl!.onPartialReply({ text: "短回复完整" }); + await flush(); + + newCtrl!.markFullyComplete(); + await newCtrl!.onIdle(); + await flush(); + + // 验证新 controller 的流式调用包含第二段内容 + // 新 controller 的调用在 firstSegCalls 之后(因为 streamCalls 是全局的,但 DONE 会增加一些) + const allContent = streamCalls.map((c) => c.content).join(" || "); + assert.ok(allContent.includes("短回复完整"), `应包含第二段 "短回复完整",实际: ${allContent}`); + + // 两段内容是独立的流式消息,不应混在一起 + const lastCall = streamCalls[streamCalls.length - 1]; + assert.ok(!lastCall.content.includes("第一段回复内容比较长"), "第二段最终分片不应包含第一段内容(各自独立)"); +}); + +// ============ 结果 ============ + +console.log(`\n========================================`); +console.log(` 总计: ${passed + failed} | ✅ 通过: ${passed} | ❌ 失败: ${failed}`); +if (failedTests.length > 0) { + console.log(` 失败用例:`); + for (const t of failedTests) console.log(` - ${t}`); +} +console.log(`========================================\n`); + +// 恢复原始 fetch +globalThis.fetch = originalFetch; + +process.exit(failed > 0 ? 1 : 0); diff --git a/tests/strip-incomplete-media-tag.test.ts b/tests/strip-incomplete-media-tag.test.ts new file mode 100644 index 0000000..cafe8d2 --- /dev/null +++ b/tests/strip-incomplete-media-tag.test.ts @@ -0,0 +1,843 @@ +/** + * stripIncompleteMediaTag 单元测试 + * + * 运行方式: npx tsx tests/strip-incomplete-media-tag.test.ts + */ + +import { stripIncompleteMediaTag } from "../src/utils/media-send.js"; +import assert from "node:assert"; + +let passed = 0; +let failed = 0; +const failedTests: string[] = []; + +function test(name: string, input: string, expectedSafe: string, expectedIncomplete: boolean) { + const [safe, incomplete] = stripIncompleteMediaTag(input); + try { + assert.strictEqual(safe, expectedSafe, `safeText mismatch`); + assert.strictEqual(incomplete, expectedIncomplete, `hasIncomplete mismatch`); + console.log(` ✅ ${name}`); + passed++; + } catch (e: any) { + console.log(` ❌ ${name}`); + console.log(` 输入: ${JSON.stringify(input)}`); + console.log(` 期望: [${JSON.stringify(expectedSafe)}, ${expectedIncomplete}]`); + console.log(` 实际: [${JSON.stringify(safe)}, ${incomplete}]`); + failed++; + failedTests.push(name); + } +} + +// ========================================== +// 1. 空值 / 无标签文本 → 不截断 +// ========================================== +console.log("\n=== 1. 空值 / 无标签文本 ==="); + +test("空字符串", "", "", false); +test("纯文本", "这是一段普通文字", "这是一段普通文字", false); +test("含普通HTML标签", "这是加粗文字", "这是加粗文字", false); +test("含换行的纯文本", "第一行\n第二行\n第三行", "第一行\n第二行\n第三行", false); +test("只有换行", "\n\n\n", "\n\n\n", false); +test("以换行结尾(最后一行为空)", "text\n", "text\n", false); +test("多行以换行结尾", "abc\ndef\n", "abc\ndef\n", false); +test("纯空白", " ", " ", false); +test("< 后接空格(非标签)", "正文< ", "正文< ", false); +test("< 后接数字(如 < 3)", "条件 < 3 成立", "条件 < 3 成立", false); +test("数学公式 3 < 5 > 2", "计算结果: 3 < 5 > 2,完毕", "计算结果: 3 < 5 > 2,完毕", false); +test("文本末尾恰好是 >", "3 < 5 > 2 结果为 true>", "3 < 5 > 2 结果为 true>", false); + +// ========================================== +// 2. 完全闭合的媒体标签 → 不截断 +// ========================================== +console.log("\n=== 2. 完整闭合标签(不应截断)==="); + +test( + "完整 qqvoice 标签", + "前文/tmp/joke.mp3后文", + "前文/tmp/joke.mp3后文", + false, +); +test( + "完整 qqimg 标签", + "看图/path/to/img.jpg", + "看图/path/to/img.jpg", + false, +); +test( + "完整标签后有文本", + "/tmp/joke.mp3\n\n谐音梗笑话 😄", + "/tmp/joke.mp3\n\n谐音梗笑话 😄", + false, +); +test( + "纯标签完整闭合", + "/tmp/joke.mp3", + "/tmp/joke.mp3", + false, +); +test( + "完整标签在末尾", + "这是正文/tmp/joke.mp3", + "这是正文/tmp/joke.mp3", + false, +); +test( + "完整标签后有普通 < 字符", + "/a.mp3 结果是 3 < 5", + "/a.mp3 结果是 3 < 5", + false, +); +test( + "完整标签后跟 (无 >)— 非媒体不影响", + "正文/a.mp3后文/a.mp3后文(有 >)— 非媒体不影响", + "正文/a.mp3后文", + "正文/a.mp3后文", + false, +); +test( + "标签内容含 > 且完整闭合", + "前文/tmp/笑话>_< .mp3", + "前文/tmp/笑话>_< .mp3", + false, +); +test( + ">_ 完整闭合(内容含 >_<)", + "前文/tmp/笑话>_", + "前文/tmp/笑话>_", + false, +); + +// ========================================== +// 3. 不完整的「开标签」→ 截断到该 < 前面 +// ========================================== +console.log("\n=== 3. 不完整的开标签 ==="); + +test("孤立 < 在行尾", "这是正文<", "这是正文", true); +test("(有 > 但无闭合标签)", "这是正文", "这是正文", true); +test("/path", "这是正文/path/to", "这是正文", true); +test("/path/full(开标签有 > 但无闭合)", "这是正文/path/to/img.jpg", "这是正文", true); +test("纯开标签前缀(无前文)", "/tmp/joke.mp3", "", true); +test(" 但无闭合标签", "前文/path/to/file.mp3", "前文", true); + +// ========================================== +// 4. 不完整的「闭合标签」→ 回溯到开标签前面截断 +// ★ 这是核心原则的关键场景 +// ========================================== +console.log("\n=== 4. 不完整闭合标签(回溯到开标签)==="); + +test( + "< — 闭合标签刚开始", + "这是正文/tmp/joke.mp3<", + "这是正文", + true, +); +test( + "/tmp/joke.mp3/tmp/joke.mp3", + "这是正文/tmp/joke.mp3/path/to/img.jpg/tmp/a.mp3/tmp/joke.mp3 字符 + 闭合未完成", + "前文/tmp/笑话>_< .mp3__<)", + "前文/tmp/笑话>_/b.jpg", + "第一行/b.jpg", + false, +); +test( + "前面行安全,最后一行未闭合", + "第一行完整\n第二行/b.jpg", + "第一行完整\n第二行", + true, +); +test( + "多行 + 最后一行是闭合标签前缀", + "第一行\n前文/a.mp3stuff\n", + "第一行\nstuff\n", + false, +); +test( + "多行 + 最后一行有闭合标签回溯到开标签", + "行一\n行二\n看看/path/to/img.jpg/b.jpg", + "第一行/b.jpg", + "第一行/a.mp3", + "第一行内容<\n最后一行", + true, +); +test( + "前面行有孤立 /path/a.mp3", + "第一行/path/a.mp3然后/b.mp3", + "/a.mp3然后\n最后/a.mp3", + "第1行\n最后", + true, +); +test( + "前面行有媒体前缀标签 /a.mp3", + "第一行内容\n加粗\n最后一行/b.jpg", + "第一行
内容
\n加粗\n最后一行", + true, +); +test( + "前面行有完整媒体标签 + 最后一行安全 → 全部保留", + "/a.mp3\n最后一行安全文字", + "/a.mp3\n最后一行安全文字", + false, +); + +// ========================================== +// 7. 多个标签场景(同一行) +// ========================================== +console.log("\n=== 7. 多标签场景(同一行)==="); + +test( + "完整标签后跟不完整开标签", + "前文/a.mp3中间/b.jpg", + "前文/a.mp3中间", + true, +); +test( + "完整标签后跟 <", + "前文/a.mp3后续<", + "前文/a.mp3后续", + true, +); +test( + "完整标签 + 另一个标签的闭合前缀(截到第二个的开标签前)", + "前文/a.mp3中间/b.jpg/a.mp3
中间", + true, +); +test( + "完整标签 + 第二个闭合缺>", + "前文/a.mp3中间/b.mp3/a.mp3中间", + true, +); +test( + "完整标签后跟 /a.mp3
中间/b.jpg/a.mp3中间", + true, +); +test( + "完整标签 + 普通HTML + 未闭合媒体标签", + "/a.mp3普通文本加粗/b.jpg/a.mp3普通文本加粗", + true, +); +test( + "两个完整标签紧挨", + "/a.mp3/b.jpg", + "/a.mp3/b.jpg", + false, +); +test( + "两个同名完整标签中间有文字", + "/a.mp3中间文字/b.mp3", + "/a.mp3中间文字/b.mp3", + false, +); +test( + "两个同名完整标签 + 末尾未闭合", + "/a.mp3中间文字/b.mp3后面/c.jpg", + "/a.mp3中间文字/b.mp3后面", + true, +); +test( + "第一个未闭合 + 第二个完整(从右到左,先看到完整的闭合标签→安全?不!)", + // 从右到左:先找到 完整闭合 → 对应哪个开标签? + // 实际上从右到左找到的第一个媒体 < 是 → 完整闭合 → 返回安全 + // 但第一个 其实没有闭合! + // 这是一个设计取舍:由于我们只从右到左找第一个媒体标签,不做全行配对 + // 如果最右边的标签对完整,就认为安全 + "/a.mp3中间文字/b.mp3", + // 从右到左: 完整 → safe + // 注意:这里第一个 没有 > 所以不算完整开标签 + // 从右到左找:先 完整闭合 → 安全返回 + // 但实际上 /a.mp3 是个没有闭合的开标签 + // 这个场景可能返回 safe 也可能返回 unsafe,取决于实现 + // 实际代码从右到左找,第一个遇到的媒体 < 是 (行尾),完整 → safe=false + // 但这是有 bug 的... 让我先测试看实际行为 + "/a.mp3中间文字/b.mp3", + false, +); + +// ========================================== +// 8. 全角中文尖括号 <> +// ========================================== +console.log("\n=== 8. 中文全角尖括号 ==="); + +test("中文尖括号开标签", "这是正文<qqvoice", "这是正文", true); +test("孤立中文尖括号", "这是正文<", "这是正文", true); +test( + "中文尖括号完整标签", + "这是正文<qqvoice>/tmp/joke.mp3", + // <qqvoice> 是开标签(用 < 开头,> 结尾),检查后面有没有 + "这是正文<qqvoice>/tmp/joke.mp3", + false, +); +test( + "中文尖括号闭合标签前缀", + "这是正文<qqvoice>/tmp/joke.mp3</qqvoice", + // </qqvoice 是不完整闭合标签 → needFindOpenTag=qqvoice → 往左找 <qqvoice → 截断 + "这是正文", + true, +); + +// ========================================== +// 9. 各种媒体标签名测试 +// ========================================== +console.log("\n=== 9. 各种媒体标签名 ==="); + +test("img 标签未闭合", "正文content", "正文", true); +test("image 标签未闭合", "正文content", "正文", true); +test("video 标签未闭合", "正文