mirror of
https://fastgit.cc/github.com/openclaw/openclaw
synced 2026-04-30 14:02:56 +08:00
matrix: stream tool progress in previews
This commit is contained in:
@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Changes
|
||||
|
||||
- Matrix/streaming: stream tool-progress updates into live Matrix preview edits by default when preview streaming is active, with `streaming.preview.toolProgress: false` to keep answer previews while hiding interim tool lines. Thanks @gumadeiras.
|
||||
- Plugins/models: wire manifest `modelCatalog.aliases` and `modelCatalog.suppressions` into model-catalog planning and built-in model suppression, with OpenAI stale Spark suppression now declared in the plugin manifest before runtime fallback. Thanks @shakkernerd.
|
||||
- Channels/Yuanbao: register the Tencent Yuanbao external channel plugin (`openclaw-plugin-yuanbao`) in the official channel catalog, contract suites, and community plugin docs, with a new `docs/channels/yuanbao.md` quick-start guide for WebSocket bot DMs and group chats. (#72756) Thanks @loongfay.
|
||||
- Channels/QQBot: add full group chat support (history tracking, @-mention gating, activation modes, per-group config, FIFO message queue with deliver debounce), C2C `stream_messages` streaming with a `StreamingController` lifecycle manager, unified `sendMedia` with chunked upload for large files, and refactor the engine into pipeline stages, focused outbound submodules, builtin slash-command modules, and explicit DI ports via `createEngineAdapters()`. (#70624) Thanks @cxyhhhhh.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
51586a795b6e1b821b3ccb2ef8e92e69ba4ef65fea254738b5a0b7b380d91bd1 config-baseline.json
|
||||
5e437b340f321aafa12697df2b77a2f13b33069042784fd49de4273ebacf46be config-baseline.json
|
||||
7dcb21e47ddd5de98e2af1ecbc41e11ac0c5742819c359e6d851fbc39c0226e9 config-baseline.core.json
|
||||
07963db49502132f26db396c56b36e018b110e6c55a68b3cb012d3ec96f43901 config-baseline.channel.json
|
||||
13d038300d90d4dd064aa2ac79def867799d1be403cf9d3e81dfad35ef459a21 config-baseline.plugin.json
|
||||
c4f07c228d4f07e7afafa5b600b4a80f5b26aaed7267c7287a64d04a527be8e8 config-baseline.channel.json
|
||||
10400fb5b294fe3f2e97ab69327c75308bb2bf014b399ec0e2d59b8cd9d16ff1 config-baseline.plugin.json
|
||||
|
||||
@@ -189,6 +189,24 @@ Matrix reply streaming is opt-in. `streaming` controls how OpenClaw delivers the
|
||||
}
|
||||
```
|
||||
|
||||
To keep live answer previews but hide interim tool/progress lines, use object
|
||||
form:
|
||||
|
||||
```json5
|
||||
{
|
||||
channels: {
|
||||
matrix: {
|
||||
streaming: {
|
||||
mode: "partial",
|
||||
preview: {
|
||||
toolProgress: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
```
|
||||
|
||||
| `streaming` | Behavior |
|
||||
| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
| `"off"` (default) | Wait for the full reply, send once. `true` ↔ `"partial"`, `false` ↔ `"off"`. |
|
||||
@@ -206,6 +224,7 @@ Notes:
|
||||
|
||||
- If a preview grows past Matrix's per-event size limit, OpenClaw stops preview streaming and falls back to final-only delivery.
|
||||
- Media replies always send attachments normally. If a stale preview can no longer be reused safely, OpenClaw redacts it before sending the final media reply.
|
||||
- Tool-progress preview updates are enabled by default when Matrix preview streaming is active. Set `streaming.preview.toolProgress: false` to keep preview edits for answer text but leave tool progress on the normal delivery path.
|
||||
- Preview edits cost extra Matrix API calls. Leave `streaming: "off"` if you want the most conservative rate-limit profile.
|
||||
|
||||
## Approval metadata
|
||||
@@ -850,7 +869,7 @@ Allowlist-style fields (`groupAllowFrom`, `dm.allowFrom`, `groups.<room>.users`)
|
||||
- `replyToMode`: `"off"`, `"first"`, `"all"`, or `"batched"`.
|
||||
- `threadReplies`: `"off"`, `"inbound"`, or `"always"`.
|
||||
- `threadBindings`: per-channel overrides for thread-bound session routing and lifecycle.
|
||||
- `streaming`: `"off"` (default), `"partial"`, `"quiet"`. `true` ↔ `"partial"`, `false` ↔ `"off"`.
|
||||
- `streaming`: `"off"` (default), `"partial"`, `"quiet"`, or object form `{ mode, preview: { toolProgress } }`. `true` ↔ `"partial"`, `false` ↔ `"off"`.
|
||||
- `blockStreaming`: when `true`, completed assistant blocks are kept as separate progress messages.
|
||||
- `markdown`: optional Markdown rendering config for outbound text.
|
||||
- `responsePrefix`: optional string prepended to outbound replies.
|
||||
|
||||
@@ -188,7 +188,7 @@ Preview streaming can also include **tool-progress** updates — short status li
|
||||
|
||||
Supported surfaces:
|
||||
|
||||
- **Discord**, **Slack**, and **Telegram** stream tool-progress into the live preview edit by default when preview streaming is active.
|
||||
- **Discord**, **Slack**, **Telegram**, and **Matrix** stream tool-progress into the live preview edit by default when preview streaming is active.
|
||||
- Telegram has shipped with tool-progress preview updates enabled since `v2026.4.22`; keeping them enabled preserves that released behavior.
|
||||
- **Mattermost** already folds tool activity into its single draft preview post (see above).
|
||||
- Tool-progress edits follow the active preview streaming mode; they are skipped when preview streaming is `off` or when block streaming has taken over the message.
|
||||
|
||||
@@ -1,6 +1,22 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { matrixApprovalNativeRuntime } from "./approval-handler.runtime.js";
|
||||
|
||||
type MatrixDeliverPendingParams = Parameters<
|
||||
typeof matrixApprovalNativeRuntime.transport.deliverPending
|
||||
>[0];
|
||||
|
||||
function buildMatrixApprovalRoomTarget(
|
||||
roomId: string,
|
||||
): MatrixDeliverPendingParams["plannedTarget"] {
|
||||
return {
|
||||
surface: "approver-dm",
|
||||
target: {
|
||||
to: `room:${roomId}`,
|
||||
},
|
||||
reason: "preferred",
|
||||
};
|
||||
}
|
||||
|
||||
describe("matrixApprovalNativeRuntime", () => {
|
||||
it("sends versioned Matrix approval content with pending exec approvals", async () => {
|
||||
const sendSingleTextMessage = vi.fn().mockResolvedValue({
|
||||
@@ -10,6 +26,35 @@ describe("matrixApprovalNativeRuntime", () => {
|
||||
roomId: "!room:example.org",
|
||||
});
|
||||
const reactMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const view = {
|
||||
approvalKind: "exec",
|
||||
approvalId: "req-1",
|
||||
phase: "pending",
|
||||
title: "Exec Approval Required",
|
||||
description: "A command needs your approval.",
|
||||
metadata: [],
|
||||
ask: "on-request",
|
||||
agentId: "agent-1",
|
||||
commandText: "echo hi",
|
||||
commandPreview: "echo hi",
|
||||
cwd: "/repo",
|
||||
host: "gateway",
|
||||
actions: [
|
||||
{
|
||||
decision: "allow-once",
|
||||
label: "Allow Once",
|
||||
style: "success",
|
||||
command: "/approve req-1 allow-once",
|
||||
},
|
||||
{
|
||||
decision: "deny",
|
||||
label: "Deny",
|
||||
style: "danger",
|
||||
command: "/approve req-1 deny",
|
||||
},
|
||||
],
|
||||
expiresAtMs: 1_000,
|
||||
} satisfies MatrixDeliverPendingParams["view"];
|
||||
const pendingPayload = await matrixApprovalNativeRuntime.presentation.buildPendingPayload({
|
||||
cfg: {} as never,
|
||||
accountId: "default",
|
||||
@@ -27,35 +72,7 @@ describe("matrixApprovalNativeRuntime", () => {
|
||||
},
|
||||
approvalKind: "exec",
|
||||
nowMs: 100,
|
||||
view: {
|
||||
approvalKind: "exec",
|
||||
approvalId: "req-1",
|
||||
phase: "pending",
|
||||
title: "Exec Approval Required",
|
||||
description: "A command needs your approval.",
|
||||
metadata: [],
|
||||
ask: "on-request",
|
||||
agentId: "agent-1",
|
||||
commandText: "echo hi",
|
||||
commandPreview: "echo hi",
|
||||
cwd: "/repo",
|
||||
host: "gateway",
|
||||
actions: [
|
||||
{
|
||||
decision: "allow-once",
|
||||
label: "Allow Once",
|
||||
style: "success",
|
||||
command: "/approve req-1 allow-once",
|
||||
},
|
||||
{
|
||||
decision: "deny",
|
||||
label: "Deny",
|
||||
style: "danger",
|
||||
command: "/approve req-1 deny",
|
||||
},
|
||||
],
|
||||
expiresAtMs: 1_000,
|
||||
} as never,
|
||||
view,
|
||||
});
|
||||
|
||||
await matrixApprovalNativeRuntime.transport.deliverPending({
|
||||
@@ -70,16 +87,12 @@ describe("matrixApprovalNativeRuntime", () => {
|
||||
},
|
||||
request: {} as never,
|
||||
approvalKind: "exec",
|
||||
plannedTarget: {
|
||||
surface: "approver-dm",
|
||||
target: { to: "room:!room:example.org" },
|
||||
reason: "preferred",
|
||||
} as never,
|
||||
plannedTarget: buildMatrixApprovalRoomTarget("!room:example.org"),
|
||||
preparedTarget: {
|
||||
to: "room:!room:example.org",
|
||||
roomId: "!room:example.org",
|
||||
},
|
||||
view: {} as never,
|
||||
view,
|
||||
pendingPayload,
|
||||
});
|
||||
|
||||
@@ -175,6 +188,24 @@ describe("matrixApprovalNativeRuntime", () => {
|
||||
roomId: "!room:example.org",
|
||||
});
|
||||
const reactMessage = vi.fn().mockResolvedValue(undefined);
|
||||
const view = {
|
||||
approvalKind: "exec",
|
||||
approvalId: "req-1",
|
||||
phase: "pending",
|
||||
title: "Exec Approval Required",
|
||||
description: "A command needs your approval.",
|
||||
metadata: [],
|
||||
commandText: "echo hi",
|
||||
actions: [
|
||||
{
|
||||
decision: "allow-once",
|
||||
label: "Allow Once",
|
||||
style: "success",
|
||||
command: "/approve req-1 allow-once",
|
||||
},
|
||||
],
|
||||
expiresAtMs: 1_000,
|
||||
} satisfies MatrixDeliverPendingParams["view"];
|
||||
const pendingPayload = await matrixApprovalNativeRuntime.presentation.buildPendingPayload({
|
||||
cfg: {} as never,
|
||||
accountId: "default",
|
||||
@@ -189,24 +220,7 @@ describe("matrixApprovalNativeRuntime", () => {
|
||||
},
|
||||
approvalKind: "exec",
|
||||
nowMs: 100,
|
||||
view: {
|
||||
approvalKind: "exec",
|
||||
approvalId: "req-1",
|
||||
phase: "pending",
|
||||
title: "Exec Approval Required",
|
||||
description: "A command needs your approval.",
|
||||
metadata: [],
|
||||
commandText: "echo hi",
|
||||
actions: [
|
||||
{
|
||||
decision: "allow-once",
|
||||
label: "Allow Once",
|
||||
style: "success",
|
||||
command: "/approve req-1 allow-once",
|
||||
},
|
||||
],
|
||||
expiresAtMs: 1_000,
|
||||
} as never,
|
||||
view,
|
||||
});
|
||||
|
||||
const entry = await matrixApprovalNativeRuntime.transport.deliverPending({
|
||||
@@ -222,16 +236,12 @@ describe("matrixApprovalNativeRuntime", () => {
|
||||
},
|
||||
request: {} as never,
|
||||
approvalKind: "exec",
|
||||
plannedTarget: {
|
||||
surface: "approver-dm",
|
||||
target: { to: "room:!room:example.org" },
|
||||
reason: "preferred",
|
||||
} as never,
|
||||
plannedTarget: buildMatrixApprovalRoomTarget("!room:example.org"),
|
||||
preparedTarget: {
|
||||
to: "room:!room:example.org",
|
||||
roomId: "!room:example.org",
|
||||
},
|
||||
view: {} as never,
|
||||
view,
|
||||
pendingPayload,
|
||||
});
|
||||
|
||||
|
||||
@@ -87,4 +87,18 @@ describe("MatrixConfigSchema SecretInput", () => {
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
|
||||
it("accepts Matrix streaming preview tool progress config", () => {
|
||||
const result = MatrixConfigSchema.safeParse({
|
||||
homeserver: "https://matrix.example.org",
|
||||
accessToken: "token",
|
||||
streaming: {
|
||||
mode: "partial",
|
||||
preview: {
|
||||
toolProgress: false,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(result.success).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -62,6 +62,18 @@ const matrixNetworkSchema = z
|
||||
.strict()
|
||||
.optional();
|
||||
|
||||
const matrixStreamingSchema = z
|
||||
.object({
|
||||
mode: z.enum(["partial", "quiet", "off"]).optional(),
|
||||
preview: z
|
||||
.object({
|
||||
toolProgress: z.boolean().optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional(),
|
||||
})
|
||||
.strict();
|
||||
|
||||
export const MatrixConfigSchema = z.object({
|
||||
name: z.string().optional(),
|
||||
enabled: z.boolean().optional(),
|
||||
@@ -84,7 +96,9 @@ export const MatrixConfigSchema = z.object({
|
||||
groupPolicy: GroupPolicySchema.optional(),
|
||||
contextVisibility: ContextVisibilityModeSchema.optional(),
|
||||
blockStreaming: z.boolean().optional(),
|
||||
streaming: z.union([z.enum(["partial", "quiet", "off"]), z.boolean()]).optional(),
|
||||
streaming: z
|
||||
.union([z.enum(["partial", "quiet", "off"]), z.boolean(), matrixStreamingSchema])
|
||||
.optional(),
|
||||
replyToMode: z.enum(["off", "first", "all", "batched"]).optional(),
|
||||
threadReplies: z.enum(["off", "inbound", "always"]).optional(),
|
||||
textChunkLimit: z.number().optional(),
|
||||
|
||||
@@ -35,6 +35,7 @@ type MatrixHandlerTestHarnessOptions = {
|
||||
dmThreadReplies?: "off" | "inbound" | "always";
|
||||
dmSessionScope?: "per-user" | "per-room";
|
||||
streaming?: MatrixStreamingMode;
|
||||
previewToolProgressEnabled?: boolean;
|
||||
blockStreamingEnabled?: boolean;
|
||||
dmEnabled?: boolean;
|
||||
dmPolicy?: "pairing" | "allowlist" | "open" | "disabled";
|
||||
@@ -228,6 +229,7 @@ export function createMatrixHandlerTestHarness(
|
||||
dmThreadReplies: options.dmThreadReplies,
|
||||
dmSessionScope: options.dmSessionScope,
|
||||
streaming: options.streaming ?? "off",
|
||||
previewToolProgressEnabled: options.previewToolProgressEnabled ?? false,
|
||||
blockStreamingEnabled: options.blockStreamingEnabled ?? false,
|
||||
dmEnabled: options.dmEnabled ?? true,
|
||||
dmPolicy: options.dmPolicy ?? "open",
|
||||
|
||||
@@ -1488,6 +1488,7 @@ describe("matrix monitor handler pairing account scope", () => {
|
||||
replyToMode: "off",
|
||||
threadReplies: "inbound",
|
||||
streaming: "off",
|
||||
previewToolProgressEnabled: false,
|
||||
blockStreamingEnabled: false,
|
||||
dmEnabled: true,
|
||||
dmPolicy: "open",
|
||||
@@ -2506,6 +2507,31 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
context?: { assistantMessageIndex?: number },
|
||||
) => Promise<void> | void;
|
||||
onAssistantMessageStart?: () => void;
|
||||
suppressDefaultToolProgressMessages?: boolean;
|
||||
onToolStart?: (payload: { name?: string }) => Promise<void>;
|
||||
onItemEvent?: (payload: {
|
||||
progressText?: string;
|
||||
summary?: string;
|
||||
title?: string;
|
||||
name?: string;
|
||||
}) => Promise<void>;
|
||||
onPlanUpdate?: (payload: {
|
||||
phase: string;
|
||||
explanation?: string;
|
||||
steps?: string[];
|
||||
}) => Promise<void>;
|
||||
onApprovalEvent?: (payload: { phase: string; command?: string }) => Promise<void>;
|
||||
onCommandOutput?: (payload: {
|
||||
phase: string;
|
||||
name?: string;
|
||||
exitCode?: number;
|
||||
title?: string;
|
||||
}) => Promise<void>;
|
||||
onPatchSummary?: (payload: {
|
||||
phase: string;
|
||||
summary?: string;
|
||||
title?: string;
|
||||
}) => Promise<void>;
|
||||
disableBlockStreaming?: boolean;
|
||||
};
|
||||
|
||||
@@ -2513,6 +2539,7 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
replyToMode?: "off" | "first" | "all" | "batched";
|
||||
blockStreamingEnabled?: boolean;
|
||||
streaming?: "partial" | "quiet";
|
||||
previewToolProgressEnabled?: boolean;
|
||||
}) {
|
||||
let capturedDeliver: DeliverFn | undefined;
|
||||
let capturedReplyOpts: ReplyOpts | undefined;
|
||||
@@ -2542,6 +2569,7 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
|
||||
const { handler } = createMatrixHandlerTestHarness({
|
||||
streaming: opts?.streaming ?? "quiet",
|
||||
previewToolProgressEnabled: opts?.previewToolProgressEnabled ?? false,
|
||||
blockStreamingEnabled: opts?.blockStreamingEnabled ?? false,
|
||||
replyToMode: opts?.replyToMode ?? "off",
|
||||
client: { redactEvent: redactEventMock },
|
||||
@@ -2624,6 +2652,69 @@ describe("matrix monitor handler draft streaming", () => {
|
||||
await finish();
|
||||
});
|
||||
|
||||
it("streams tool progress into the Matrix draft preview when enabled", async () => {
|
||||
const { dispatch, redactEventMock } = createStreamingHarness({
|
||||
previewToolProgressEnabled: true,
|
||||
});
|
||||
const { deliver, opts, finish } = await dispatch();
|
||||
|
||||
expect(opts.suppressDefaultToolProgressMessages).toBe(true);
|
||||
await opts.onToolStart?.({ name: "read_file" });
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(sendSingleTextMessageMatrixMock.mock.calls[0]?.[1]).toBe(
|
||||
"Working...\n- `tool: read_file`",
|
||||
);
|
||||
|
||||
await deliver({ text: "Done" }, { kind: "final" });
|
||||
|
||||
expect(editMessageMatrixMock).toHaveBeenCalledWith(
|
||||
"!room:example.org",
|
||||
"$draft1",
|
||||
"Done",
|
||||
expect.objectContaining({
|
||||
extraContent: { [MATRIX_OPENCLAW_FINALIZED_PREVIEW_KEY]: true },
|
||||
}),
|
||||
);
|
||||
expect(deliverMatrixRepliesMock).not.toHaveBeenCalled();
|
||||
expect(redactEventMock).not.toHaveBeenCalled();
|
||||
await finish();
|
||||
});
|
||||
|
||||
it("keeps Matrix tool progress mentions inside code formatting", async () => {
|
||||
const { dispatch } = createStreamingHarness({
|
||||
previewToolProgressEnabled: true,
|
||||
streaming: "partial",
|
||||
});
|
||||
const { opts, finish } = await dispatch();
|
||||
|
||||
await opts.onItemEvent?.({
|
||||
progressText: "@room ping @alice:example.org [label](https://example.org)",
|
||||
});
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(sendSingleTextMessageMatrixMock.mock.calls[0]?.[1]).toBe(
|
||||
"Working...\n- `@room ping @alice:example.org [label](https://example.org)`",
|
||||
);
|
||||
await finish();
|
||||
});
|
||||
|
||||
it("leaves Matrix tool progress on the default tool delivery path when disabled", async () => {
|
||||
const { dispatch } = createStreamingHarness({
|
||||
previewToolProgressEnabled: false,
|
||||
});
|
||||
const { opts, finish } = await dispatch();
|
||||
|
||||
expect(opts.suppressDefaultToolProgressMessages).toBeUndefined();
|
||||
expect(opts.onToolStart).toBeUndefined();
|
||||
expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled();
|
||||
await finish();
|
||||
});
|
||||
|
||||
it("keeps partial preview-first finalization on the existing draft when text is unchanged", async () => {
|
||||
const { dispatch, redactEventMock } = createStreamingHarness({
|
||||
blockStreamingEnabled: true,
|
||||
|
||||
@@ -3,6 +3,7 @@ import {
|
||||
evaluateSupplementalContextVisibility,
|
||||
resolveChannelContextVisibilityMode,
|
||||
} from "openclaw/plugin-sdk/context-visibility-runtime";
|
||||
import type { GetReplyOptions } from "openclaw/plugin-sdk/reply-runtime";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveSessionStoreEntry,
|
||||
@@ -76,6 +77,7 @@ import { isMatrixVerificationRoomMessage } from "./verification-utils.js";
|
||||
|
||||
const ALLOW_FROM_STORE_CACHE_TTL_MS = 30_000;
|
||||
const PAIRING_REPLY_COOLDOWN_MS = 5 * 60_000;
|
||||
const MATRIX_TOOL_PROGRESS_MAX_CHARS = 300;
|
||||
let matrixSendModulePromise: Promise<typeof import("../send.js")> | undefined;
|
||||
let acpBindingRuntimePromise:
|
||||
| Promise<typeof import("openclaw/plugin-sdk/acp-binding-runtime")>
|
||||
@@ -171,6 +173,7 @@ export type MatrixMonitorHandlerParams = {
|
||||
/** DM session grouping behavior. */
|
||||
dmSessionScope?: "per-user" | "per-room";
|
||||
streaming: MatrixStreamingMode;
|
||||
previewToolProgressEnabled: boolean;
|
||||
blockStreamingEnabled: boolean;
|
||||
dmEnabled: boolean;
|
||||
dmPolicy: "open" | "pairing" | "allowlist" | "disabled";
|
||||
@@ -354,6 +357,32 @@ function resolveMatrixAllowBotsMode(value?: boolean | "mentions"): MatrixAllowBo
|
||||
return "off";
|
||||
}
|
||||
|
||||
function formatMatrixToolProgressMarkdownCode(text: string): string {
|
||||
const clipped =
|
||||
text.length <= MATRIX_TOOL_PROGRESS_MAX_CHARS
|
||||
? text
|
||||
: `${text.slice(0, MATRIX_TOOL_PROGRESS_MAX_CHARS - 1).trimEnd()}...`;
|
||||
const safe = clipped.replaceAll("`", "'");
|
||||
return `\`${safe}\``;
|
||||
}
|
||||
|
||||
function formatMatrixCommandOutputToolProgress(payload: {
|
||||
exitCode?: number | null;
|
||||
name?: string;
|
||||
title?: string;
|
||||
}) {
|
||||
if (!payload.name) {
|
||||
return payload.title;
|
||||
}
|
||||
if (payload.exitCode === 0) {
|
||||
return `${payload.name} ok`;
|
||||
}
|
||||
if (payload.exitCode != null) {
|
||||
return `${payload.name} (exit ${payload.exitCode})`;
|
||||
}
|
||||
return payload.name;
|
||||
}
|
||||
|
||||
export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParams) {
|
||||
const {
|
||||
client,
|
||||
@@ -374,6 +403,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
dmThreadReplies,
|
||||
dmSessionScope,
|
||||
streaming,
|
||||
previewToolProgressEnabled,
|
||||
blockStreamingEnabled,
|
||||
dmEnabled,
|
||||
dmPolicy,
|
||||
@@ -1466,6 +1496,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
)
|
||||
: undefined;
|
||||
draftStreamRef = draftStream;
|
||||
const shouldStreamPreviewToolProgress = Boolean(draftStream) && previewToolProgressEnabled;
|
||||
type PendingDraftBoundary = {
|
||||
messageGeneration: number;
|
||||
endOffset: number;
|
||||
@@ -1479,9 +1510,91 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
const pendingDraftBoundaries: PendingDraftBoundary[] = [];
|
||||
const latestQueuedDraftBoundaryOffsets = new Map<number, number>();
|
||||
let currentDraftReplyToId = draftReplyToId;
|
||||
let previewToolProgressSuppressed = false;
|
||||
let previewToolProgressLines: string[] = [];
|
||||
// Set after the first final payload consumes or discards the draft event
|
||||
// so subsequent finals go through normal delivery.
|
||||
|
||||
const pushPreviewToolProgress = (line?: string) => {
|
||||
if (!draftStream || !shouldStreamPreviewToolProgress || previewToolProgressSuppressed) {
|
||||
return;
|
||||
}
|
||||
const normalized = line?.replace(/\s+/g, " ").trim();
|
||||
if (!normalized) {
|
||||
return;
|
||||
}
|
||||
const previous = previewToolProgressLines.at(-1);
|
||||
if (previous === normalized) {
|
||||
return;
|
||||
}
|
||||
previewToolProgressLines = [...previewToolProgressLines, normalized].slice(-8);
|
||||
draftStream.update(
|
||||
[
|
||||
"Working...",
|
||||
...previewToolProgressLines.map(
|
||||
(entry) => `- ${formatMatrixToolProgressMarkdownCode(entry)}`,
|
||||
),
|
||||
].join("\n"),
|
||||
);
|
||||
};
|
||||
|
||||
const suppressPreviewToolProgressForAnswerText = (text: string | undefined) => {
|
||||
if (!text?.trim()) {
|
||||
return;
|
||||
}
|
||||
previewToolProgressSuppressed = true;
|
||||
previewToolProgressLines = [];
|
||||
};
|
||||
|
||||
const resetPreviewToolProgress = () => {
|
||||
previewToolProgressSuppressed = false;
|
||||
previewToolProgressLines = [];
|
||||
};
|
||||
|
||||
const buildPreviewToolProgressReplyOptions = (): Partial<GetReplyOptions> => {
|
||||
if (!shouldStreamPreviewToolProgress) {
|
||||
return {};
|
||||
}
|
||||
return {
|
||||
suppressDefaultToolProgressMessages: true,
|
||||
onToolStart: async (payload) => {
|
||||
const toolName = payload.name?.trim();
|
||||
pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running");
|
||||
},
|
||||
onItemEvent: async (payload) => {
|
||||
pushPreviewToolProgress(
|
||||
payload.progressText ?? payload.summary ?? payload.title ?? payload.name,
|
||||
);
|
||||
},
|
||||
onPlanUpdate: async (payload) => {
|
||||
if (payload.phase !== "update") {
|
||||
return;
|
||||
}
|
||||
pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning");
|
||||
},
|
||||
onApprovalEvent: async (payload) => {
|
||||
if (payload.phase !== "requested") {
|
||||
return;
|
||||
}
|
||||
pushPreviewToolProgress(
|
||||
payload.command ? `approval: ${payload.command}` : "approval requested",
|
||||
);
|
||||
},
|
||||
onCommandOutput: async (payload) => {
|
||||
if (payload.phase !== "end") {
|
||||
return;
|
||||
}
|
||||
pushPreviewToolProgress(formatMatrixCommandOutputToolProgress(payload));
|
||||
},
|
||||
onPatchSummary: async (payload) => {
|
||||
if (payload.phase !== "end") {
|
||||
return;
|
||||
}
|
||||
pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied");
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
const getDisplayableDraftText = () => {
|
||||
const nextDraftBoundaryOffset = pendingDraftBoundaries.find(
|
||||
(boundary) => boundary.messageGeneration === currentDraftMessageGeneration,
|
||||
@@ -1771,6 +1884,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
onPartialReply: draftStream
|
||||
? (payload) => {
|
||||
latestDraftFullText = payload.text ?? "";
|
||||
suppressPreviewToolProgressForAnswerText(latestDraftFullText);
|
||||
updateDraftFromLatestFullText();
|
||||
}
|
||||
: undefined,
|
||||
@@ -1788,8 +1902,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
onAssistantMessageStart: draftStream
|
||||
? () => {
|
||||
resetDraftBlockOffsets();
|
||||
resetPreviewToolProgress();
|
||||
}
|
||||
: undefined,
|
||||
...buildPreviewToolProgressReplyOptions(),
|
||||
onModelSelected,
|
||||
},
|
||||
});
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { z } from "openclaw/plugin-sdk/zod";
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { MatrixConfig, MatrixStreamingMode } from "../../types.js";
|
||||
import type { MatrixRoomInfo } from "./room-info.js";
|
||||
|
||||
type DirectRoomTrackerOptions = {
|
||||
@@ -382,11 +383,12 @@ vi.mock("./startup.js", () => ({
|
||||
runMatrixStartupMaintenance: hoisted.runMatrixStartupMaintenance,
|
||||
}));
|
||||
|
||||
let matrixMonitorTesting: typeof import("./index.js").__testing;
|
||||
let monitorMatrixProvider: typeof import("./index.js").monitorMatrixProvider;
|
||||
|
||||
describe("monitorMatrixProvider", () => {
|
||||
beforeAll(async () => {
|
||||
({ monitorMatrixProvider } = await import("./index.js"));
|
||||
({ __testing: matrixMonitorTesting, monitorMatrixProvider } = await import("./index.js"));
|
||||
});
|
||||
|
||||
async function flushUntil(predicate: () => boolean, message: string): Promise<void> {
|
||||
@@ -466,6 +468,30 @@ describe("monitorMatrixProvider", () => {
|
||||
Object.values(hoisted.logger).forEach((mock) => mock.mockReset());
|
||||
});
|
||||
|
||||
it.each([
|
||||
[undefined, "off", false],
|
||||
[false, "off", false],
|
||||
[true, "partial", true],
|
||||
["off", "off", false],
|
||||
["partial", "partial", true],
|
||||
["quiet", "quiet", true],
|
||||
[{}, "off", false],
|
||||
[{ mode: "off" }, "off", false],
|
||||
[{ mode: "partial" }, "partial", true],
|
||||
[{ mode: "quiet" }, "quiet", true],
|
||||
[{ mode: "partial", preview: { toolProgress: false } }, "partial", false],
|
||||
[{ mode: "quiet", preview: { toolProgress: false } }, "quiet", false],
|
||||
[{ mode: "off", preview: { toolProgress: true } }, "off", false],
|
||||
] satisfies Array<[MatrixConfig["streaming"], MatrixStreamingMode, boolean]>)(
|
||||
"resolves streaming=%j to mode=%s and toolProgress=%s",
|
||||
(streaming, expectedMode, expectedPreviewToolProgressEnabled) => {
|
||||
expect(matrixMonitorTesting.resolveMatrixStreamingMode(streaming)).toBe(expectedMode);
|
||||
expect(matrixMonitorTesting.resolveMatrixPreviewToolProgressEnabled(streaming)).toBe(
|
||||
expectedPreviewToolProgressEnabled,
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it("returns immediately when the abort signal is already canceled", async () => {
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
@@ -13,7 +13,13 @@ import {
|
||||
type RuntimeEnv,
|
||||
} from "../../runtime-api.js";
|
||||
import { getMatrixRuntime } from "../../runtime.js";
|
||||
import type { CoreConfig, ReplyToMode } from "../../types.js";
|
||||
import type {
|
||||
CoreConfig,
|
||||
MatrixConfig,
|
||||
MatrixStreamingConfig,
|
||||
MatrixStreamingMode,
|
||||
ReplyToMode,
|
||||
} from "../../types.js";
|
||||
import { resolveMatrixAccountConfig } from "../account-config.js";
|
||||
import { resolveConfiguredMatrixBotUserIds } from "../accounts.js";
|
||||
import { setActiveMatrixClient } from "../active-client.js";
|
||||
@@ -60,6 +66,46 @@ export type MonitorMatrixOpts = {
|
||||
setStatus?: (next: import("openclaw/plugin-sdk/channel-contract").ChannelAccountSnapshot) => void;
|
||||
};
|
||||
|
||||
function isMatrixStreamingConfig(
|
||||
streaming: MatrixConfig["streaming"],
|
||||
): streaming is MatrixStreamingConfig {
|
||||
return Boolean(streaming && typeof streaming === "object" && !Array.isArray(streaming));
|
||||
}
|
||||
|
||||
function resolveMatrixStreamingMode(streaming: MatrixConfig["streaming"]): MatrixStreamingMode {
|
||||
if (streaming === true || streaming === "partial") {
|
||||
return "partial";
|
||||
}
|
||||
if (streaming === "quiet") {
|
||||
return "quiet";
|
||||
}
|
||||
if (isMatrixStreamingConfig(streaming)) {
|
||||
if (streaming.mode === "partial" || streaming.mode === "quiet") {
|
||||
return streaming.mode;
|
||||
}
|
||||
}
|
||||
return "off";
|
||||
}
|
||||
|
||||
function resolveMatrixPreviewToolProgress(streaming: MatrixConfig["streaming"]): boolean {
|
||||
if (!isMatrixStreamingConfig(streaming)) {
|
||||
return true;
|
||||
}
|
||||
return streaming.preview?.toolProgress ?? true;
|
||||
}
|
||||
|
||||
function resolveMatrixPreviewToolProgressEnabled(streaming: MatrixConfig["streaming"]): boolean {
|
||||
return (
|
||||
resolveMatrixStreamingMode(streaming) !== "off" && resolveMatrixPreviewToolProgress(streaming)
|
||||
);
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
resolveMatrixPreviewToolProgress,
|
||||
resolveMatrixPreviewToolProgressEnabled,
|
||||
resolveMatrixStreamingMode,
|
||||
};
|
||||
|
||||
const DEFAULT_MEDIA_MAX_MB = 20;
|
||||
|
||||
export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promise<void> {
|
||||
@@ -244,12 +290,10 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
const historyLimit = Math.max(0, accountConfig.historyLimit ?? globalGroupChatHistoryLimit ?? 0);
|
||||
const mediaMaxMb = opts.mediaMaxMb ?? accountConfig.mediaMaxMb ?? DEFAULT_MEDIA_MAX_MB;
|
||||
const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024;
|
||||
const streaming: "partial" | "quiet" | "off" =
|
||||
accountConfig.streaming === true || accountConfig.streaming === "partial"
|
||||
? "partial"
|
||||
: accountConfig.streaming === "quiet"
|
||||
? "quiet"
|
||||
: "off";
|
||||
const streaming = resolveMatrixStreamingMode(accountConfig.streaming);
|
||||
const previewToolProgressEnabled = resolveMatrixPreviewToolProgressEnabled(
|
||||
accountConfig.streaming,
|
||||
);
|
||||
const blockStreamingEnabled = accountConfig.blockStreaming === true;
|
||||
const startupMs = Date.now();
|
||||
const startupGraceMs = 0;
|
||||
@@ -340,6 +384,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
dmThreadReplies,
|
||||
dmSessionScope,
|
||||
streaming,
|
||||
previewToolProgressEnabled,
|
||||
blockStreamingEnabled,
|
||||
dmEnabled,
|
||||
dmPolicy,
|
||||
|
||||
@@ -695,6 +695,28 @@ describe("sendSingleTextMessageMatrix", () => {
|
||||
).not.toContain("matrix.to");
|
||||
});
|
||||
|
||||
it("does not activate mentions inside Matrix tool-progress code spans", async () => {
|
||||
const { client, sendMessage } = makeClient();
|
||||
|
||||
await sendSingleTextMessageMatrix(
|
||||
"room:!room:example",
|
||||
"Working...\n- `@room ping @alice:example.org !room:example.org`",
|
||||
{
|
||||
client,
|
||||
cfg: {} as never,
|
||||
},
|
||||
);
|
||||
|
||||
expect(sendMessage.mock.calls[0]?.[1]).toMatchObject({
|
||||
body: "Working...\n- `@room ping @alice:example.org !room:example.org`",
|
||||
"m.mentions": {},
|
||||
});
|
||||
const formattedBody = (sendMessage.mock.calls[0]?.[1] as { formatted_body?: string })
|
||||
.formatted_body;
|
||||
expect(formattedBody).toContain("<code>@room ping @alice:example.org !room:example.org</code>");
|
||||
expect(formattedBody).not.toContain("matrix.to");
|
||||
});
|
||||
|
||||
it("merges extra content fields into single-event sends", async () => {
|
||||
const { client, sendMessage } = makeClient();
|
||||
|
||||
|
||||
@@ -85,6 +85,15 @@ export type MatrixExecApprovalConfig = {
|
||||
|
||||
export type MatrixStreamingMode = "partial" | "quiet" | "off";
|
||||
|
||||
export type MatrixStreamingConfig = {
|
||||
/** Preview streaming mode for Matrix replies. Default: "off". */
|
||||
mode?: MatrixStreamingMode;
|
||||
preview?: {
|
||||
/** Show tool/progress activity in the live draft preview. Default: true. */
|
||||
toolProgress?: boolean;
|
||||
};
|
||||
};
|
||||
|
||||
export type MatrixNetworkConfig = {
|
||||
/** Dangerous opt-in for trusted private/internal Matrix homeservers. */
|
||||
dangerouslyAllowPrivateNetwork?: boolean;
|
||||
@@ -200,11 +209,13 @@ export type MatrixConfig = {
|
||||
* stay visible as separate progress messages. When combined with
|
||||
* preview streaming, Matrix keeps a live draft for the current block and
|
||||
* preserves completed blocks as separate messages.
|
||||
* - `streaming.preview.toolProgress: false` keeps answer preview edits but
|
||||
* hides interim tool/progress lines.
|
||||
* - `true` maps to `"partial"`, `false` maps to `"off"` for backward
|
||||
* compatibility.
|
||||
* compatibility. Object form uses `streaming.mode`.
|
||||
* Default: `"off"`.
|
||||
*/
|
||||
streaming?: MatrixStreamingMode | boolean;
|
||||
streaming?: MatrixStreamingMode | MatrixStreamingConfig | boolean;
|
||||
};
|
||||
|
||||
export type CoreConfig = {
|
||||
|
||||
Reference in New Issue
Block a user