feat(mattermost): keep draft previews on one visible sink per turn (#47838)

Merged via squash.

Prepared head SHA: e4e3205176
Co-authored-by: ninjaa <1315093+ninjaa@users.noreply.github.com>
Co-authored-by: mukhtharcm <56378562+mukhtharcm@users.noreply.github.com>
Reviewed-by: @mukhtharcm
This commit is contained in:
Aditya Advani
2026-04-20 05:27:12 -07:00
committed by GitHub
parent 91d31197be
commit b38988ca96
6 changed files with 825 additions and 35 deletions

View File

@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
- Plugins/tasks: add a detached runtime registration contract so plugin executors can own detached task lifecycle and cancellation without reaching into core task internals. (#68915) Thanks @mbelinky.
- Terminal/logging: optimize `sanitizeForLog()` by replacing the iterative control-character stripping loop with a single regex pass while preserving the existing ANSI-first sanitization behavior. (#67205) Thanks @bulutmuf.
- QA/CI: make `openclaw qa suite` and `openclaw qa telegram` fail by default when scenarios fail, add `--allow-failures` for artifact-only runs, and tighten live-lane defaults for CI automation. (#69122) Thanks @joshavant.
- Mattermost: stream thinking, tool activity, and partial reply text into a single draft preview post that finalizes in place when safe. (#47838) thanks @ninjaa.
### Fixes

View File

@@ -545,6 +545,15 @@ export async function updateMattermostPost(
});
}
export async function deleteMattermostPost(
client: MattermostClient,
postId: string,
): Promise<void> {
await client.request<void>(`/posts/${postId}`, {
method: "DELETE",
});
}
export async function uploadMattermostFile(
client: MattermostClient,
params: {

View File

@@ -0,0 +1,214 @@
import { describe, expect, it, vi } from "vitest";
import type { MattermostClient } from "./client.js";
import { buildMattermostToolStatusText, createMattermostDraftStream } from "./draft-stream.js";
type RequestRecord = {
path: string;
init?: RequestInit;
};
function createMockClient(): {
client: MattermostClient;
calls: RequestRecord[];
requestMock: ReturnType<typeof vi.fn>;
} {
const calls: RequestRecord[] = [];
let nextId = 1;
const requestImpl: MattermostClient["request"] = async <T>(
path: string,
init?: RequestInit,
): Promise<T> => {
calls.push({ path, init });
if (path === "/posts") {
return { id: `post-${nextId++}` } as T;
}
if (path.startsWith("/posts/")) {
return { id: "patched" } as T;
}
return {} as T;
};
const requestMock = vi.fn(requestImpl);
const client: MattermostClient = {
baseUrl: "https://chat.example.com",
apiBaseUrl: "https://chat.example.com/api/v4",
token: "token",
request: requestMock as MattermostClient["request"],
fetchImpl: vi.fn() as MattermostClient["fetchImpl"],
};
return { client, calls, requestMock };
}
describe("createMattermostDraftStream", () => {
it("creates a preview post and updates it on later changes", async () => {
const { client, calls } = createMockClient();
const stream = createMattermostDraftStream({
client,
channelId: "channel-1",
rootId: "root-1",
throttleMs: 0,
});
stream.update("Running `read`…");
await stream.flush();
stream.update("Running `read`…");
await stream.flush();
expect(calls).toHaveLength(1);
expect(calls[0]?.path).toBe("/posts");
const createBody = JSON.parse((calls[0]?.init?.body as string | undefined) ?? "{}");
expect(createBody).toMatchObject({
channel_id: "channel-1",
root_id: "root-1",
message: "Running `read`…",
});
expect(stream.postId()).toBe("post-1");
});
it("does not resend identical updates", async () => {
const { client, calls } = createMockClient();
const stream = createMattermostDraftStream({
client,
channelId: "channel-1",
throttleMs: 0,
});
stream.update("Working...");
await stream.flush();
stream.update("Working...");
await stream.flush();
expect(calls).toHaveLength(1);
});
it("clears the preview post when no final reply is delivered", async () => {
const { client, calls } = createMockClient();
const stream = createMattermostDraftStream({
client,
channelId: "channel-1",
rootId: "root-1",
throttleMs: 0,
});
stream.update("Working...");
await stream.flush();
await stream.clear();
expect(calls).toHaveLength(2);
expect(calls[1]?.path).toBe("/posts/post-1");
expect(calls[1]?.init?.method).toBe("DELETE");
expect(stream.postId()).toBeUndefined();
});
it("stop flushes the last pending update and ignores later ones", async () => {
const { client, calls } = createMockClient();
const stream = createMattermostDraftStream({
client,
channelId: "channel-1",
rootId: "root-1",
throttleMs: 1000,
});
stream.update("Working...");
await stream.flush();
stream.update("Stale partial");
await stream.stop();
stream.update("Late partial");
await stream.flush();
expect(calls).toHaveLength(2);
expect(calls[0]?.path).toBe("/posts");
expect(calls[1]?.path).toBe("/posts/post-1");
expect(JSON.parse((calls[1]?.init?.body as string | undefined) ?? "{}")).toMatchObject({
message: "Stale partial",
});
});
it("warns and stops when preview creation fails", async () => {
const warn = vi.fn();
const requestImpl: MattermostClient["request"] = async () => {
throw new Error("boom");
};
const requestMock = vi.fn(requestImpl);
const client: MattermostClient = {
baseUrl: "https://chat.example.com",
apiBaseUrl: "https://chat.example.com/api/v4",
token: "token",
request: requestMock as MattermostClient["request"],
fetchImpl: vi.fn() as MattermostClient["fetchImpl"],
};
const stream = createMattermostDraftStream({
client,
channelId: "channel-1",
throttleMs: 0,
warn,
});
stream.update("Working...");
await stream.flush();
stream.update("Still working...");
await stream.flush();
expect(warn).toHaveBeenCalled();
expect(requestMock).toHaveBeenCalledTimes(1);
expect(stream.postId()).toBeUndefined();
});
it("does not resend after an update failure followed by stop", async () => {
const warn = vi.fn();
const calls: RequestRecord[] = [];
let failNextPatch = true;
const requestImpl: MattermostClient["request"] = async <T>(
path: string,
init?: RequestInit,
): Promise<T> => {
calls.push({ path, init });
if (path === "/posts") {
return { id: "post-1" } as T;
}
if (path === "/posts/post-1") {
if (failNextPatch) {
failNextPatch = false;
throw new Error("patch failed");
}
return { id: "patched" } as T;
}
return {} as T;
};
const requestMock = vi.fn(requestImpl);
const client: MattermostClient = {
baseUrl: "https://chat.example.com",
apiBaseUrl: "https://chat.example.com/api/v4",
token: "token",
request: requestMock as MattermostClient["request"],
fetchImpl: vi.fn() as MattermostClient["fetchImpl"],
};
const stream = createMattermostDraftStream({
client,
channelId: "channel-1",
throttleMs: 1000,
warn,
});
stream.update("Working...");
await stream.flush();
stream.update("Will fail");
await stream.flush();
await stream.stop();
expect(warn).toHaveBeenCalledWith("mattermost stream preview failed: patch failed");
expect(calls).toHaveLength(2);
expect(calls[0]?.path).toBe("/posts");
expect(calls[1]?.path).toBe("/posts/post-1");
});
});
describe("buildMattermostToolStatusText", () => {
it("renders a status with the tool name", () => {
expect(buildMattermostToolStatusText({ name: "read" })).toBe("Running `read`…");
});
it("falls back to a generic running tool status", () => {
expect(buildMattermostToolStatusText({ name: "exec" })).toBe("Running `exec`…");
});
});

View File

@@ -0,0 +1,131 @@
import { createFinalizableDraftLifecycle } from "openclaw/plugin-sdk/channel-lifecycle";
import {
createMattermostPost,
deleteMattermostPost,
updateMattermostPost,
type MattermostClient,
} from "./client.js";
const MATTERMOST_STREAM_MAX_CHARS = 4000;
const DEFAULT_THROTTLE_MS = 1000;
export type MattermostDraftStream = {
update: (text: string) => void;
flush: () => Promise<void>;
postId: () => string | undefined;
clear: () => Promise<void>;
stop: () => Promise<void>;
forceNewMessage: () => void;
};
export function normalizeMattermostDraftText(text: string, maxChars: number): string {
const trimmed = text.trim();
if (!trimmed) {
return "";
}
if (trimmed.length <= maxChars) {
return trimmed;
}
return `${trimmed.slice(0, Math.max(0, maxChars - 3)).trimEnd()}...`;
}
export function buildMattermostToolStatusText(params: { name?: string; phase?: string }): string {
const tool = params.name?.trim() ? ` \`${params.name.trim()}\`` : " tool";
return `Running${tool}`;
}
export function createMattermostDraftStream(params: {
client: MattermostClient;
channelId: string;
rootId?: string;
maxChars?: number;
throttleMs?: number;
renderText?: (text: string) => string;
log?: (message: string) => void;
warn?: (message: string) => void;
}): MattermostDraftStream {
const maxChars = Math.min(
params.maxChars ?? MATTERMOST_STREAM_MAX_CHARS,
MATTERMOST_STREAM_MAX_CHARS,
);
const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS);
const streamState = { stopped: false, final: false };
let streamPostId: string | undefined;
let lastSentText = "";
const sendOrEditStreamMessage = async (text: string): Promise<boolean> => {
if (streamState.stopped && !streamState.final) {
return false;
}
const rendered = params.renderText?.(text) ?? text;
const normalized = normalizeMattermostDraftText(rendered, maxChars);
if (!normalized) {
return false;
}
if (normalized === lastSentText) {
return true;
}
try {
if (streamPostId) {
await updateMattermostPost(params.client, streamPostId, {
message: normalized,
});
} else {
const sent = await createMattermostPost(params.client, {
channelId: params.channelId,
message: normalized,
rootId: params.rootId,
});
const postId = sent.id?.trim();
if (!postId) {
streamState.stopped = true;
params.warn?.("mattermost stream preview stopped (missing post id from create)");
return false;
}
streamPostId = postId;
}
lastSentText = normalized;
return true;
} catch (err) {
streamState.stopped = true;
params.warn?.(
`mattermost stream preview failed: ${err instanceof Error ? err.message : String(err)}`,
);
return false;
}
};
const { loop, update, stop, clear } = createFinalizableDraftLifecycle({
throttleMs,
state: streamState,
sendOrEditStreamMessage,
readMessageId: () => streamPostId,
clearMessageId: () => {
streamPostId = undefined;
},
isValidMessageId: (value): value is string => typeof value === "string" && value.length > 0,
deleteMessage: async (postId) => {
await deleteMattermostPost(params.client, postId);
},
warn: params.warn,
warnPrefix: "mattermost stream preview cleanup failed",
});
const forceNewMessage = () => {
streamPostId = undefined;
lastSentText = "";
loop.resetPending();
loop.resetThrottleWindow();
};
params.log?.(`mattermost stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
return {
update,
flush: loop.flush,
postId: () => streamPostId,
clear,
stop,
forceNewMessage,
};
}

View File

@@ -1,9 +1,13 @@
import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
import { describe, expect, it, vi } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../runtime-api.js";
import { resolveMattermostAccount } from "./accounts.js";
import * as clientModule from "./client.js";
import type { MattermostClient } from "./client.js";
import {
buildMattermostModelPickerSelectMessageSid,
canFinalizeMattermostPreviewInPlace,
deliverMattermostReplyWithDraftPreview,
evaluateMattermostMentionGate,
MattermostRetryableInboundError,
processMattermostReplayGuardedPost,
@@ -11,6 +15,8 @@ import {
resolveMattermostEffectiveReplyToId,
resolveMattermostReplyRootId,
resolveMattermostThreadSessionContext,
shouldFinalizeMattermostPreviewAfterDispatch,
shouldClearMattermostDraftPreview,
type MattermostMentionGateInput,
type MattermostRequireMentionResolverInput,
} from "./monitor.js";
@@ -41,6 +47,34 @@ function resolveRequireMentionForTest(params: MattermostRequireMentionResolverIn
return true;
}
const updateMattermostPostSpy = vi.spyOn(clientModule, "updateMattermostPost");
function createMattermostClientMock(): MattermostClient {
return {
baseUrl: "https://chat.example.com",
apiBaseUrl: "https://chat.example.com/api/v4",
token: "token",
request: vi.fn(async () => ({})) as MattermostClient["request"],
fetchImpl: vi.fn(
async () => new Response(null, { status: 200 }),
) as MattermostClient["fetchImpl"],
};
}
function createDraftStreamMock(postId: string | undefined = "preview-post-1") {
return {
flush: vi.fn(async () => {}),
postId: vi.fn(() => postId),
clear: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
};
}
beforeEach(() => {
vi.clearAllMocks();
updateMattermostPostSpy.mockResolvedValue({ id: "patched" } as never);
});
function evaluateMentionGateForMessage(params: { cfg: OpenClawConfig; threadRootId?: string }) {
const account = resolveMattermostAccount({ cfg: params.cfg, accountId: "default" });
const resolver = vi.fn(resolveRequireMentionForTest);
@@ -167,6 +201,186 @@ describe("resolveMattermostReplyRootId", () => {
});
});
describe("canFinalizeMattermostPreviewInPlace", () => {
it("allows in-place finalization when the final reply target matches the preview thread", () => {
expect(
canFinalizeMattermostPreviewInPlace({
previewRootId: "thread-root-456",
threadRootId: "thread-root-456",
replyToId: "child-post-789",
}),
).toBe(true);
});
it("prevents in-place finalization when a top-level preview would become a threaded reply", () => {
expect(
canFinalizeMattermostPreviewInPlace({
replyToId: "child-post-789",
}),
).toBe(false);
});
});
describe("shouldClearMattermostDraftPreview", () => {
it("deletes the preview after successful normal final delivery", () => {
expect(
shouldClearMattermostDraftPreview({
finalizedViaPreviewPost: false,
finalReplyDelivered: true,
}),
).toBe(true);
});
it("keeps the preview when final delivery failed", () => {
expect(
shouldClearMattermostDraftPreview({
finalizedViaPreviewPost: false,
finalReplyDelivered: false,
}),
).toBe(false);
});
it("keeps the preview when it already became the final reply", () => {
expect(
shouldClearMattermostDraftPreview({
finalizedViaPreviewPost: true,
finalReplyDelivered: true,
}),
).toBe(false);
});
});
describe("deliverMattermostReplyWithDraftPreview", () => {
it("deletes the preview after a successful normal final send", async () => {
const draftStream = createDraftStreamMock();
const deliverFinal = vi.fn(async () => {});
await deliverMattermostReplyWithDraftPreview({
payload: { text: "All good", replyToId: "reply-1" } as never,
info: { kind: "final" },
client: createMattermostClientMock(),
draftStream,
resolvePreviewFinalText: (text) => text?.trim(),
previewState: { finalizedViaPreviewPost: false },
logVerboseMessage: vi.fn(),
deliverFinal,
});
expect(deliverFinal).toHaveBeenCalledTimes(1);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
expect(updateMattermostPostSpy).not.toHaveBeenCalled();
});
it("deletes the preview after a successful non-finalizable media final", async () => {
const draftStream = createDraftStreamMock();
const deliverFinal = vi.fn(async () => {});
await deliverMattermostReplyWithDraftPreview({
payload: {
text: "Photo",
replyToId: "reply-1",
mediaUrl: "https://example.com/a.png",
} as never,
info: { kind: "final" },
client: createMattermostClientMock(),
draftStream,
effectiveReplyToId: "thread-root-1",
resolvePreviewFinalText: (text) => text?.trim(),
previewState: { finalizedViaPreviewPost: false },
logVerboseMessage: vi.fn(),
deliverFinal,
});
expect(deliverFinal).toHaveBeenCalledTimes(1);
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("finalizes the preview in place when the final targets the same thread", async () => {
const draftStream = createDraftStreamMock();
const deliverFinal = vi.fn(async () => {});
await deliverMattermostReplyWithDraftPreview({
payload: { text: "Final answer", replyToId: "child-post-789" } as never,
info: { kind: "final" },
client: createMattermostClientMock(),
draftStream,
effectiveReplyToId: "thread-root-456",
resolvePreviewFinalText: (text) => text?.trim(),
previewState: { finalizedViaPreviewPost: false },
logVerboseMessage: vi.fn(),
deliverFinal,
});
expect(updateMattermostPostSpy).toHaveBeenCalledWith(
expect.anything(),
"preview-post-1",
expect.objectContaining({ message: "Final answer" }),
);
expect(draftStream.stop).toHaveBeenCalledTimes(1);
expect(draftStream.stop.mock.invocationCallOrder[0]).toBeLessThan(
updateMattermostPostSpy.mock.invocationCallOrder[0] ?? Number.POSITIVE_INFINITY,
);
expect(deliverFinal).not.toHaveBeenCalled();
expect(draftStream.clear).not.toHaveBeenCalled();
});
it("keeps the existing preview unchanged when final delivery fails", async () => {
const draftStream = createDraftStreamMock();
const deliverFinal = vi.fn(async () => {
throw new Error("send failed");
});
await expect(
deliverMattermostReplyWithDraftPreview({
payload: { text: "Broken", replyToId: "reply-1" } as never,
info: { kind: "final" },
client: createMattermostClientMock(),
draftStream,
resolvePreviewFinalText: (text) => text?.trim(),
previewState: { finalizedViaPreviewPost: false },
logVerboseMessage: vi.fn(),
deliverFinal,
}),
).rejects.toThrow("send failed");
expect(draftStream.clear).not.toHaveBeenCalled();
expect(updateMattermostPostSpy).not.toHaveBeenCalledWith(
expect.anything(),
"preview-post-1",
expect.objectContaining({ message: "↓ See below." }),
);
});
});
describe("shouldFinalizeMattermostPreviewAfterDispatch", () => {
it("reuses the preview only for a single eligible final payload", () => {
expect(
shouldFinalizeMattermostPreviewAfterDispatch({
finalCount: 1,
canFinalizeInPlace: true,
}),
).toBe(true);
});
it("falls back to normal sends for multi-payload finals", () => {
expect(
shouldFinalizeMattermostPreviewAfterDispatch({
finalCount: 2,
canFinalizeInPlace: true,
}),
).toBe(false);
});
it("falls back to normal sends when the final cannot be edited into the preview", () => {
expect(
shouldFinalizeMattermostPreviewAfterDispatch({
finalCount: 1,
canFinalizeInPlace: false,
}),
).toBe(false);
});
});
describe("resolveMattermostEffectiveReplyToId", () => {
it("keeps an existing thread root", () => {
expect(

View File

@@ -10,9 +10,12 @@ import {
createMattermostClient,
fetchMattermostMe,
normalizeMattermostBaseUrl,
updateMattermostPost,
type MattermostClient,
type MattermostPost,
type MattermostUser,
} from "./client.js";
import { buildMattermostToolStatusText, createMattermostDraftStream } from "./draft-stream.js";
import {
computeInteractionCallbackUrl,
createMattermostInteractionHandler,
@@ -236,6 +239,120 @@ export function resolveMattermostReplyRootId(params: {
return normalizeOptionalString(params.replyToId);
}
export function canFinalizeMattermostPreviewInPlace(params: {
previewRootId?: string;
threadRootId?: string;
replyToId?: string;
}): boolean {
return (
resolveMattermostReplyRootId({
threadRootId: params.threadRootId,
replyToId: params.replyToId,
}) === params.previewRootId?.trim()
);
}
export function shouldClearMattermostDraftPreview(params: {
finalizedViaPreviewPost: boolean;
finalReplyDelivered: boolean;
}): boolean {
return params.finalReplyDelivered && !params.finalizedViaPreviewPost;
}
export function shouldFinalizeMattermostPreviewAfterDispatch(params: {
finalCount: number;
canFinalizeInPlace: boolean;
}): boolean {
return params.finalCount === 1 && params.canFinalizeInPlace;
}
type MattermostDraftPreviewState = {
finalizedViaPreviewPost: boolean;
};
type MattermostDraftPreviewDeliverParams = {
payload: ReplyPayload;
info: { kind: "tool" | "block" | "final" };
client: MattermostClient;
draftStream: Pick<
ReturnType<typeof createMattermostDraftStream>,
"flush" | "postId" | "clear" | "stop"
>;
effectiveReplyToId?: string;
resolvePreviewFinalText: (text?: string) => string | undefined;
previewState: MattermostDraftPreviewState;
logVerboseMessage: (message: string) => void;
deliverFinal: () => Promise<void>;
};
export async function deliverMattermostReplyWithDraftPreview(
params: MattermostDraftPreviewDeliverParams,
): Promise<void> {
if (params.payload.isReasoning) {
return;
}
const isFinal = params.info.kind === "final";
let previewPostId: string | undefined;
if (isFinal) {
await params.draftStream.flush();
const hasMedia =
Boolean(params.payload.mediaUrl) || (params.payload.mediaUrls?.length ?? 0) > 0;
const previewFinalText = params.resolvePreviewFinalText(params.payload.text);
previewPostId = params.draftStream.postId();
if (
typeof previewPostId === "string" &&
!hasMedia &&
typeof previewFinalText === "string" &&
!params.payload.isError &&
canFinalizeMattermostPreviewInPlace({
previewRootId: params.effectiveReplyToId,
threadRootId: params.effectiveReplyToId,
replyToId: params.payload.replyToId,
})
) {
try {
// Seal the preview before the final edit so late draft events cannot
// patch over the finalized visible message.
await params.draftStream.stop();
await updateMattermostPost(params.client, previewPostId, {
message: previewFinalText,
});
params.previewState.finalizedViaPreviewPost = true;
return;
} catch (err) {
params.logVerboseMessage(
`mattermost preview final edit failed; falling back to normal send (${String(err)})`,
);
}
}
}
let finalReplyDelivered = false;
try {
await params.deliverFinal();
finalReplyDelivered = true;
} finally {
if (
isFinal &&
typeof previewPostId === "string" &&
shouldClearMattermostDraftPreview({
finalizedViaPreviewPost: params.previewState.finalizedViaPreviewPost,
finalReplyDelivered,
})
) {
try {
await params.draftStream.clear();
} catch (err) {
params.logVerboseMessage(
`mattermost draft preview clear failed after successful final delivery (${String(err)})`,
);
}
}
}
}
export function resolveMattermostEffectiveReplyToId(params: {
kind: ChatType;
postId?: string | null;
@@ -1516,52 +1633,156 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
},
},
});
const { dispatcher, replyOptions, markDispatchIdle } =
const draftStream = createMattermostDraftStream({
client,
channelId,
rootId: effectiveReplyToId,
throttleMs: 1200,
log: logVerboseMessage,
warn: logVerboseMessage,
});
let lastPartialText = "";
const previewState: MattermostDraftPreviewState = {
finalizedViaPreviewPost: false,
};
const resolvePreviewFinalText = (text?: string) => {
if (typeof text !== "string") {
return undefined;
}
const formatted = core.channel.text.convertMarkdownTables(text, tableMode);
const chunkMode = core.channel.text.resolveChunkMode(
cfg,
"mattermost",
account.accountId,
);
const chunks = core.channel.text.chunkMarkdownTextWithMode(
formatted,
textLimit,
chunkMode,
);
if (!chunks.length && formatted) {
chunks.push(formatted);
}
if (chunks.length != 1) {
return undefined;
}
const trimmed = chunks[0]?.trim();
if (!trimmed) {
return undefined;
}
if (
lastPartialText &&
lastPartialText.startsWith(trimmed) &&
trimmed.length < lastPartialText.length
) {
return undefined;
}
return trimmed;
};
const updateDraftFromPartial = (text?: string) => {
const cleaned = text?.trim();
if (!cleaned) {
return;
}
if (cleaned === lastPartialText) {
return;
}
if (
lastPartialText &&
lastPartialText.startsWith(cleaned) &&
cleaned.length < lastPartialText.length
) {
return;
}
lastPartialText = cleaned;
draftStream.update(cleaned);
};
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
core.channel.reply.createReplyDispatcherWithTyping({
...replyPipeline,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
typingCallbacks,
deliver: async (payload: ReplyPayload) => {
await deliverMattermostReplyPayload({
core,
cfg,
deliver: async (payload: ReplyPayload, info) => {
await deliverMattermostReplyWithDraftPreview({
payload,
to,
accountId: account.accountId,
agentId: route.agentId,
replyToId: resolveMattermostReplyRootId({
threadRootId: effectiveReplyToId,
replyToId: payload.replyToId,
}),
textLimit,
tableMode,
sendMessage: sendMessageMattermost,
info,
client,
draftStream,
effectiveReplyToId,
resolvePreviewFinalText,
previewState,
logVerboseMessage,
deliverFinal: async () => {
await deliverMattermostReplyPayload({
core,
cfg,
payload,
to,
accountId: account.accountId,
agentId: route.agentId,
replyToId: resolveMattermostReplyRootId({
threadRootId: effectiveReplyToId,
replyToId: payload.replyToId,
}),
textLimit,
tableMode,
sendMessage: sendMessageMattermost,
});
runtime.log?.(`delivered reply to ${to}`);
},
});
runtime.log?.(`delivered reply to ${to}`);
},
onError: (err, info) => {
runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`);
},
});
await core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
},
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined,
onModelSelected,
},
}),
});
try {
await core.channel.reply.withReplyDispatcher({
dispatcher,
onSettled: () => {
markDispatchIdle();
},
run: () =>
core.channel.reply.dispatchReplyFromConfig({
ctx: ctxPayload,
cfg,
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming: true,
onModelSelected,
onPartialReply: (payload) => {
updateDraftFromPartial(payload.text);
},
onAssistantMessageStart: () => {
lastPartialText = "";
},
onReasoningEnd: () => {
lastPartialText = "";
},
onReasoningStream: async () => {
if (!lastPartialText) {
draftStream.update("Thinking…");
}
},
onToolStart: async (payload) => {
draftStream.update(buildMattermostToolStatusText(payload));
},
},
}),
});
} finally {
try {
await draftStream.stop();
} catch (err) {
logVerboseMessage(`mattermost draft preview cleanup failed: ${String(err)}`);
}
markRunComplete();
}
if (historyKey) {
clearHistoryEntriesIfEnabled({
historyMap: channelHistories,