refactor(channels): finish turn kernel migration

This commit is contained in:
Peter Steinberger
2026-04-30 01:30:49 +01:00
parent 442e14e359
commit 1ead1b2d18
18 changed files with 802 additions and 531 deletions

View File

@@ -1211,6 +1211,7 @@
"plugins/sdk-subpaths",
"plugins/sdk-entrypoints",
"plugins/sdk-runtime",
"plugins/sdk-channel-turn",
"plugins/sdk-agent-harness",
"plugins/sdk-setup",
"plugins/sdk-testing",

View File

@@ -681,6 +681,9 @@ Write colocated tests in `src/channel.test.ts`:
<Card title="Runtime helpers" icon="settings" href="/plugins/sdk-runtime">
TTS, STT, media, subagent via api.runtime
</Card>
<Card title="Channel turn kernel" icon="bolt" href="/plugins/sdk-channel-turn">
Shared inbound turn lifecycle: ingest, resolve, record, dispatch, finalize
</Card>
</CardGroup>
<Note>

View File

@@ -0,0 +1,393 @@
---
summary: "runtime.channel.turn -- the shared inbound turn kernel that bundled and third-party channel plugins use to record, dispatch, and finalize agent turns"
title: "Channel turn kernel"
sidebarTitle: "Channel turn"
read_when:
- You are building a channel plugin and want the shared inbound turn lifecycle
- You are migrating a channel monitor off hand-rolled record/dispatch glue
- You need to understand admission, ingest, classify, preflight, resolve, record, dispatch, and finalize stages
---
The channel turn kernel is the shared inbound state machine that turns a normalized platform event into an agent turn. Channel plugins provide the platform facts and the delivery callback. Core owns the orchestration: ingest, classify, preflight, resolve, authorize, assemble, record, dispatch, and finalize.
Use this when your plugin is on the inbound message hot path. For non-message events (slash commands, modals, button interactions, lifecycle events, reactions, voice state), keep them plugin-local. The kernel only owns events that may become an agent text turn.
<Info>
The kernel is reached through the injected plugin runtime as `runtime.channel.turn.*`. The plugin runtime type is exported from `openclaw/plugin-sdk/core`, so third-party native plugins can use these entry points the same way bundled channel plugins do.
</Info>
## Why a shared kernel
Channel plugins repeat the same inbound flow: normalize, route, gate, build a context, record session metadata, dispatch the agent turn, finalize delivery state. Without a shared kernel, a change to mention gating, tool-only visible replies, session metadata, pending history, or dispatch finalization has to be applied per channel.
The kernel keeps four concepts deliberately separate:
- `ConversationFacts`: where the message came from
- `RouteFacts`: which agent and session should process it
- `ReplyPlanFacts`: where visible replies should go
- `MessageFacts`: what body and supplemental context the agent should see
Slack DMs, Telegram topics, Matrix threads, and Feishu topic sessions all distinguish these in practice. Treating them as one identifier causes drift over time.
## Stage lifecycle
The kernel runs the same fixed pipeline regardless of channel:
1. `ingest` -- adapter converts a raw platform event into `NormalizedTurnInput`
2. `classify` -- adapter declares whether this event can start an agent turn
3. `preflight` -- adapter does dedupe, self-echo, hydration, debounce, decryption, partial fact prefill
4. `resolve` -- adapter returns a fully assembled turn (route, reply plan, message, delivery)
5. `authorize` -- DM, group, mention, and command policy applied to the assembled facts
6. `assemble` -- `FinalizedMsgContext` built from the facts via `buildContext`
7. `record` -- inbound session metadata and last route persisted
8. `dispatch` -- agent turn executed through the buffered block dispatcher
9. `finalize` -- adapter `onFinalize` runs even on dispatch error
Each stage emits a structured log event when a `log` callback is supplied. See [Observability](#observability).
## Admission kinds
The kernel does not throw when a turn is gated. It returns a `ChannelTurnAdmission`:
| Kind | When |
| ------------- | -------------------------------------------------------------------------------------------------------------------------------------------- |
| `dispatch` | Turn is admitted. Agent turn runs and the visible reply path is exercised. |
| `observeOnly` | Turn runs end-to-end but the delivery adapter sends nothing visible. Used for broadcast observer agents and other passive multi-agent flows. |
| `handled` | A platform event was consumed locally (lifecycle, reaction, button, modal). Kernel skips dispatch. |
| `drop` | Skip path. Optionally `recordHistory: true` keeps the message in pending group history so a future mention has context. |
Admission can come from `classify` (event class said it cannot start a turn), from `preflight` (dedupe, self-echo, missing mention with history record), or from `resolveTurn` itself.
## Entry points
The runtime exposes three preferred entry points so adapters can opt in at the level that matches the channel.
```typescript
runtime.channel.turn.run(...) // adapter-driven full pipeline
runtime.channel.turn.runPrepared(...) // channel owns dispatch; kernel runs record + finalize
runtime.channel.turn.buildContext(...) // pure facts to FinalizedMsgContext mapping
```
Two older runtime helpers remain available for Plugin SDK compatibility:
```typescript
runtime.channel.turn.runResolved(...) // deprecated compatibility alias; prefer run
runtime.channel.turn.dispatchAssembled(...) // deprecated compatibility alias; prefer run or runPrepared
```
### run
Use when your channel can express its inbound flow as a `ChannelTurnAdapter<TRaw>`. The adapter has callbacks for `ingest`, optional `classify`, optional `preflight`, mandatory `resolveTurn`, and optional `onFinalize`.
```typescript
await runtime.channel.turn.run({
channel: "tlon",
accountId,
raw: platformEvent,
adapter: {
ingest(raw) {
return {
id: raw.messageId,
timestamp: raw.timestamp,
rawText: raw.body,
textForAgent: raw.body,
};
},
classify(input) {
return { kind: "message", canStartAgentTurn: input.rawText.length > 0 };
},
async preflight(input, eventClass) {
if (await isDuplicate(input.id)) {
return { admission: { kind: "drop", reason: "dedupe" } };
}
return {};
},
resolveTurn(input) {
return buildAssembledTurn(input);
},
onFinalize(result) {
clearPendingGroupHistory(result);
},
},
});
```
`run` is the right shape when the channel has small adapter logic and benefits from owning the lifecycle through hooks.
### runPrepared
Use when the channel has a complex local dispatcher with previews, retries, edits, or thread bootstrap that must stay channel-owned. The kernel still records the inbound session before dispatch and surfaces a uniform `DispatchedChannelTurnResult`.
```typescript
const { dispatchResult } = await runtime.channel.turn.runPrepared({
channel: "matrix",
accountId,
routeSessionKey,
storePath,
ctxPayload,
recordInboundSession,
record: {
onRecordError,
updateLastRoute,
},
onPreDispatchFailure: async (err) => {
await stopStatusReactions();
},
runDispatch: async () => {
return await runMatrixOwnedDispatcher();
},
});
```
Rich channels (Matrix, Mattermost, Microsoft Teams, Feishu, QQ Bot) use `runPrepared` because their dispatcher orchestrates platform-specific behavior the kernel must not learn about.
### buildContext
A pure function that maps fact bundles into `FinalizedMsgContext`. Use it when your channel hand-rolls part of the pipeline but wants consistent context shape.
```typescript
const ctxPayload = runtime.channel.turn.buildContext({
channel: "googlechat",
accountId,
messageId,
timestamp,
from,
sender,
conversation,
route,
reply,
message,
access,
media,
supplemental,
});
```
`buildContext` is also useful inside `resolveTurn` callbacks when assembling a turn for `run`.
<Note>
Deprecated SDK helpers such as `dispatchInboundReplyWithBase` still bridge through an assembled-turn helper. New plugin code should use `run` or `runPrepared`.
</Note>
## Fact types
The facts the kernel consumes from your adapter are platform-agnostic. Translate platform objects into these shapes before handing them to the kernel.
### NormalizedTurnInput
| Field | Purpose |
| ----------------- | ---------------------------------------------------------------------------- |
| `id` | Stable message id used for dedupe and logs |
| `timestamp` | Optional epoch ms |
| `rawText` | Body as received from the platform |
| `textForAgent` | Optional cleaned body for the agent (mention strip, typing trim) |
| `textForCommands` | Optional body used for `/command` parsing |
| `raw` | Optional pass-through reference for adapter callbacks that need the original |
### ChannelEventClass
| Field | Purpose |
| ---------------------- | ----------------------------------------------------------------------- |
| `kind` | `message`, `command`, `interaction`, `reaction`, `lifecycle`, `unknown` |
| `canStartAgentTurn` | If false the kernel returns `{ kind: "handled" }` |
| `requiresImmediateAck` | Hint for adapters that need to ACK before dispatch |
### SenderFacts
| Field | Purpose |
| -------------- | -------------------------------------------------------------- |
| `id` | Stable platform sender id |
| `name` | Display name |
| `username` | Handle if distinct from `name` |
| `tag` | Discord-style discriminator or platform tag |
| `roles` | Role ids, used for member-role allowlist matching |
| `isBot` | True when the sender is a known bot (kernel uses for dropping) |
| `isSelf` | True when the sender is the configured agent itself |
| `displayLabel` | Pre-rendered label for envelope text |
### ConversationFacts
| Field | Purpose |
| ----------------- | -------------------------------------------------------------------- |
| `kind` | `direct`, `group`, or `channel` |
| `id` | Conversation id used for routing |
| `label` | Human label for the envelope |
| `spaceId` | Optional outer space identifier (Slack workspace, Matrix homeserver) |
| `parentId` | Outer conversation id when this is a thread |
| `threadId` | Thread id when this message is inside a thread |
| `nativeChannelId` | Platform-native channel id when different from the routing id |
| `routePeer` | Peer used for `resolveAgentRoute` lookup |
### RouteFacts
| Field | Purpose |
| ----------------------- | ---------------------------------------------------------- |
| `agentId` | Agent that should handle this turn |
| `accountId` | Optional override (multi-account channels) |
| `routeSessionKey` | Session key used for routing |
| `dispatchSessionKey` | Session key used at dispatch when different from route key |
| `persistedSessionKey` | Session key written to persisted session metadata |
| `parentSessionKey` | Parent for branched/threaded sessions |
| `modelParentSessionKey` | Model-side parent for branched sessions |
| `mainSessionKey` | Main DM owner pin for direct conversations |
| `createIfMissing` | Allow record step to create a missing session row |
### ReplyPlanFacts
| Field | Purpose |
| ------------------------- | ------------------------------------------------------- |
| `to` | Logical reply target written into context `To` |
| `originatingTo` | Originating context target (`OriginatingTo`) |
| `nativeChannelId` | Platform-native channel id for delivery |
| `replyTarget` | Final visible-reply destination if it differs from `to` |
| `deliveryTarget` | Lower-level delivery override |
| `replyToId` | Quoted/anchored message id |
| `replyToIdFull` | Full-form quoted id when the platform has both |
| `messageThreadId` | Thread id at delivery time |
| `threadParentId` | Parent message id of the thread |
| `sourceReplyDeliveryMode` | `thread`, `reply`, `channel`, `direct`, or `none` |
### AccessFacts
`AccessFacts` carries the booleans the authorize stage needs. Identity matching stays in the channel: the kernel only consumes the result.
| Field | Purpose |
| ---------- | ------------------------------------------------------------------------- |
| `dm` | DM allow/pairing/deny decision and `allowFrom` list |
| `group` | Group policy, route allow, sender allow, allowlist, mention requirement |
| `commands` | Command authorization across configured authorizers |
| `mentions` | Whether mention detection is possible and whether the agent was mentioned |
### MessageFacts
| Field | Purpose |
| ---------------- | -------------------------------------------------------------- |
| `body` | Final envelope body (formatted) |
| `rawBody` | Raw inbound body |
| `bodyForAgent` | Body the agent sees |
| `commandBody` | Body used for command parsing |
| `envelopeFrom` | Pre-rendered sender label for the envelope |
| `senderLabel` | Optional override for the rendered sender |
| `preview` | Short redacted preview for logs |
| `inboundHistory` | Recent inbound history entries when the channel keeps a buffer |
### SupplementalContextFacts
Supplemental context covers quote, forwarded, and thread-bootstrap context. The kernel applies the configured `contextVisibility` policy. The channel adapter only provides facts and `senderAllowed` flags so cross-channel policy stays consistent.
### InboundMediaFacts
Media is fact-shaped. Platform download, auth, SSRF policy, CDN rules, and decryption stay channel-local. The kernel maps facts into `MediaPath`, `MediaUrl`, `MediaType`, `MediaPaths`, `MediaUrls`, `MediaTypes`, and `MediaTranscribedIndexes`.
## Adapter contract
For full `run`, the adapter shape is:
```typescript
type ChannelTurnAdapter<TRaw> = {
ingest(raw: TRaw): Promise<NormalizedTurnInput | null> | NormalizedTurnInput | null;
classify?(input: NormalizedTurnInput): Promise<ChannelEventClass> | ChannelEventClass;
preflight?(
input: NormalizedTurnInput,
eventClass: ChannelEventClass,
): Promise<PreflightFacts | ChannelTurnAdmission | null | undefined>;
resolveTurn(
input: NormalizedTurnInput,
eventClass: ChannelEventClass,
preflight: PreflightFacts,
): Promise<ChannelTurnResolved> | ChannelTurnResolved;
onFinalize?(result: ChannelTurnResult): Promise<void> | void;
};
```
`resolveTurn` returns a `ChannelTurnResolved`, which is an `AssembledChannelTurn` with an optional admission kind. Returning `{ admission: { kind: "observeOnly" } }` runs the turn without producing visible output. The adapter still owns the delivery callback; it just becomes a no-op for that turn.
`onFinalize` runs on every result, including dispatch errors. Use it to clear pending group history, remove ack reactions, stop status indicators, and flush local state.
## Delivery adapter
The kernel does not call the platform directly. The channel hands the kernel a `ChannelTurnDeliveryAdapter`:
```typescript
type ChannelTurnDeliveryAdapter = {
deliver(payload: ReplyPayload, info: ChannelDeliveryInfo): Promise<ChannelDeliveryResult | void>;
onError?(err: unknown, info: { kind: string }): void;
};
type ChannelDeliveryResult = {
messageIds?: string[];
threadId?: string;
replyToId?: string;
visibleReplySent?: boolean;
};
```
`deliver` is called once per buffered reply chunk. Return platform message ids when the channel has them so the dispatcher can preserve thread anchors and edit later chunks. For observe-only turns, return `{ visibleReplySent: false }` or use `createNoopChannelTurnDeliveryAdapter()`.
## Record options
The record stage wraps `recordInboundSession`. Most channels can use the defaults. Override via `record`:
```typescript
record: {
groupResolution,
createIfMissing: true,
updateLastRoute,
onRecordError: (err) => log.warn("record failed", err),
trackSessionMetaTask: (task) => pendingTasks.push(task),
}
```
The dispatcher waits for the record stage. If record throws, the kernel runs `onPreDispatchFailure` (when provided to `runPrepared`) and rethrows.
## Observability
Each stage emits a structured event when a `log` callback is supplied:
```typescript
await runtime.channel.turn.run({
channel: "twitch",
accountId,
raw,
adapter,
log: (event) => {
runtime.log?.debug?.(`turn.${event.stage}:${event.event}`, {
channel: event.channel,
accountId: event.accountId,
messageId: event.messageId,
sessionKey: event.sessionKey,
admission: event.admission,
reason: event.reason,
});
},
});
```
Logged stages: `ingest`, `classify`, `preflight`, `resolve`, `authorize`, `assemble`, `record`, `dispatch`, `finalize`. Avoid logging raw bodies; use `MessageFacts.preview` for short redacted previews.
## What stays channel-local
The kernel owns orchestration. The channel still owns:
- Platform transports (gateway, REST, websocket, polling, webhooks)
- Identity resolution and display-name matching
- Native commands, slash commands, autocomplete, modals, buttons, voice state
- Card, modal, and adaptive-card rendering
- Media auth, CDN rules, encrypted media, transcription
- Edit, reaction, redaction, and presence APIs
- Backfill and platform-side history fetch
- Pairing flows that require platform-specific verification
If two channels start needing the same helper for one of these, extract a shared SDK helper instead of pushing it into the kernel.
## Stability
`runtime.channel.turn.*` is part of the public plugin runtime surface. The fact types (`SenderFacts`, `ConversationFacts`, `RouteFacts`, `ReplyPlanFacts`, `AccessFacts`, `MessageFacts`, `SupplementalContextFacts`, `InboundMediaFacts`) and admission shapes (`ChannelTurnAdmission`, `ChannelEventClass`) are reachable through `PluginRuntime` from `openclaw/plugin-sdk/core`.
Backward compatibility rules apply: new fact fields are additive, admission kinds are not renamed, and the entry point names stay stable. New channel needs that require a non-additive change must go through the plugin SDK migration process.
## Related
- [Building channel plugins](/plugins/sdk-channel-plugins) for the broader channel plugin contract
- [Plugin runtime helpers](/plugins/sdk-runtime) for other `runtime.*` surfaces
- [Plugin internals](/plugins/architecture-internals) for load pipeline and registry mechanics

View File

@@ -135,8 +135,6 @@ function createTestRuntime(overrides?: {
},
turn: {
runPrepared: runPrepared as unknown as PluginRuntime["channel"]["turn"]["runPrepared"],
dispatchAssembled:
vi.fn() as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"],
},
pairing: {
readAllowFromStore: vi.fn(overrides?.readAllowFromStore ?? (async () => [])),

View File

@@ -295,60 +295,62 @@ async function processMessageWithPipeline(params: {
accountId: route.accountId,
});
await core.channel.turn.runResolved({
await core.channel.turn.run({
channel: "googlechat",
accountId: route.accountId,
raw: message,
input: {
id: message.name ?? spaceId,
timestamp: event.eventTime ? Date.parse(event.eventTime) : undefined,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: rawBody,
raw: message,
adapter: {
ingest: () => ({
id: message.name ?? spaceId,
timestamp: event.eventTime ? Date.parse(event.eventTime) : undefined,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: rawBody,
raw: message,
}),
resolveTurn: () => ({
cfg: config,
channel: "googlechat",
accountId: route.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverGoogleChatReply({
payload,
account,
spaceId,
runtime,
core,
config,
statusSink,
typingMessageName,
});
// Only use typing message for first delivery
typingMessageName = undefined;
},
onError: (err, info) => {
runtime.error?.(
`[${account.accountId}] Google Chat ${info.kind} reply failed: ${String(err)}`,
);
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`googlechat: failed updating session meta: ${String(err)}`);
},
},
}),
},
resolveTurn: () => ({
cfg: config,
channel: "googlechat",
accountId: route.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverGoogleChatReply({
payload,
account,
spaceId,
runtime,
core,
config,
statusSink,
typingMessageName,
});
// Only use typing message for first delivery
typingMessageName = undefined;
},
onError: (err, info) => {
runtime.error?.(
`[${account.accountId}] Google Chat ${info.kind} reply failed: ${String(err)}`,
);
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`googlechat: failed updating session meta: ${String(err)}`);
},
},
}),
});
}

View File

@@ -230,7 +230,6 @@ export function createMatrixHandlerTestHarness(
},
turn: {
runPrepared,
dispatchAssembled: vi.fn(),
},
reactions: {
shouldAckReaction: options.shouldAckReaction ?? (() => false),

View File

@@ -253,7 +253,6 @@ function createRuntimeCore(cfg: OpenClawConfig) {
},
turn: {
runPrepared,
dispatchAssembled: vi.fn(),
},
text: {
chunkMarkdownTextWithMode: (text: string) => [text],

View File

@@ -91,8 +91,6 @@ export function installMSTeamsTestRuntime(options: MSTeamsTestRuntimeOptions = {
},
turn: {
runPrepared: runPrepared as unknown as PluginRuntime["channel"]["turn"]["runPrepared"],
dispatchAssembled:
vi.fn() as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"],
},
},
} as unknown as PluginRuntime);

View File

@@ -169,45 +169,7 @@ vi.mock("./runtime.js", () => ({
routeSessionKey: resolved.routeSessionKey,
};
}),
runResolved: vi.fn(async (params) => {
const input =
typeof params.input === "function" ? await params.input(params.raw) : params.input;
if (!input) {
return { admission: { kind: "drop", reason: "ingest-null" }, dispatched: false };
}
const resolved = await params.resolveTurn(input, {
kind: "message",
canStartAgentTurn: true,
});
const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({
ctx: resolved.ctxPayload,
cfg: mockRuntimeConfig,
dispatcherOptions: {
...resolved.dispatcherOptions,
deliver: resolved.delivery.deliver,
onError: resolved.delivery.onError,
},
});
return {
admission: { kind: "dispatch" },
dispatched: true,
dispatchResult,
ctxPayload: resolved.ctxPayload,
routeSessionKey: resolved.routeSessionKey,
};
}),
buildContext: buildChannelTurnContextMock,
dispatchAssembled: vi.fn(async (params) => ({
dispatchResult: await params.dispatchReplyWithBufferedBlockDispatcher({
ctx: params.ctxPayload,
cfg: mockRuntimeConfig,
dispatcherOptions: {
...params.dispatcherOptions,
deliver: params.delivery.deliver,
onError: params.delivery.onError,
},
}),
})),
},
},
})),

View File

@@ -72,96 +72,98 @@ export async function dispatchSynologyChatInboundTurn(params: {
userId: params.msg.from,
});
await resolved.rt.channel.turn.runResolved({
await resolved.rt.channel.turn.run({
channel: CHANNEL_ID,
accountId: params.account.accountId,
raw: params.msg,
input: (msg) => ({
id: `${params.account.accountId}:${msg.from}`,
timestamp: Date.now(),
rawText: msg.body,
textForAgent: msg.body,
textForCommands: msg.body,
raw: msg,
}),
resolveTurn: (input) => {
const chatKind =
params.msg.chatType === "group" || params.msg.chatType === "channel"
? params.msg.chatType
: "direct";
const msgCtx = resolved.rt.channel.turn.buildContext({
channel: CHANNEL_ID,
accountId: params.account.accountId,
timestamp: input.timestamp,
from: `synology-chat:${params.msg.from}`,
sender: {
id: params.msg.from,
name: params.msg.senderName,
},
conversation: {
kind: chatKind,
id: params.msg.from,
label: params.msg.senderName || params.msg.from,
routePeer: {
kind: "direct",
id: params.msg.from,
},
},
route: {
agentId: resolved.route.agentId,
adapter: {
ingest: (msg) => ({
id: `${params.account.accountId}:${msg.from}`,
timestamp: Date.now(),
rawText: msg.body,
textForAgent: msg.body,
textForCommands: msg.body,
raw: msg,
}),
resolveTurn: (input) => {
const chatKind =
params.msg.chatType === "group" || params.msg.chatType === "channel"
? params.msg.chatType
: "direct";
const msgCtx = resolved.rt.channel.turn.buildContext({
channel: CHANNEL_ID,
accountId: params.account.accountId,
routeSessionKey: resolved.sessionKey,
dispatchSessionKey: resolved.sessionKey,
},
reply: {
to: `synology-chat:${params.msg.from}`,
originatingTo: `synology-chat:${params.msg.from}`,
},
message: {
rawBody: input.rawText,
commandBody: input.textForCommands,
bodyForAgent: input.textForAgent,
envelopeFrom: params.msg.senderName,
},
extra: {
ChatType: params.msg.chatType,
CommandAuthorized: params.msg.commandAuthorized,
},
});
const storePath = resolved.rt.channel.session.resolveStorePath(currentCfg.session?.store, {
agentId: resolved.route.agentId,
});
return {
cfg: currentCfg,
channel: CHANNEL_ID,
accountId: params.account.accountId,
agentId: resolved.route.agentId,
routeSessionKey: resolved.route.sessionKey,
storePath,
ctxPayload: msgCtx,
recordInboundSession: resolved.rt.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
resolved.rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverSynologyChatReply({
account: params.account,
sendUserId,
payload,
});
timestamp: input.timestamp,
from: `synology-chat:${params.msg.from}`,
sender: {
id: params.msg.from,
name: params.msg.senderName,
},
},
dispatcherOptions: {
onReplyStart: () => {
params.log?.info?.(`Agent reply started for ${params.msg.from}`);
conversation: {
kind: chatKind,
id: params.msg.from,
label: params.msg.senderName || params.msg.from,
routePeer: {
kind: "direct",
id: params.msg.from,
},
},
},
record: {
onRecordError: (err) => {
params.log?.info?.(`Session metadata update failed for ${params.msg.from}`, err);
route: {
agentId: resolved.route.agentId,
accountId: params.account.accountId,
routeSessionKey: resolved.sessionKey,
dispatchSessionKey: resolved.sessionKey,
},
},
};
reply: {
to: `synology-chat:${params.msg.from}`,
originatingTo: `synology-chat:${params.msg.from}`,
},
message: {
rawBody: input.rawText,
commandBody: input.textForCommands,
bodyForAgent: input.textForAgent,
envelopeFrom: params.msg.senderName,
},
extra: {
ChatType: params.msg.chatType,
CommandAuthorized: params.msg.commandAuthorized,
},
});
const storePath = resolved.rt.channel.session.resolveStorePath(currentCfg.session?.store, {
agentId: resolved.route.agentId,
});
return {
cfg: currentCfg,
channel: CHANNEL_ID,
accountId: params.account.accountId,
agentId: resolved.route.agentId,
routeSessionKey: resolved.route.sessionKey,
storePath,
ctxPayload: msgCtx,
recordInboundSession: resolved.rt.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
resolved.rt.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverSynologyChatReply({
account: params.account,
sendUserId,
payload,
});
},
},
dispatcherOptions: {
onReplyStart: () => {
params.log?.info?.(`Agent reply started for ${params.msg.from}`);
},
},
record: {
onRecordError: (err) => {
params.log?.info?.(`Session metadata update failed for ${params.msg.from}`, err);
},
},
};
},
},
});

View File

@@ -51,126 +51,128 @@ async function processTwitchMessage(params: {
const { message, account, accountId, config, runtime, core, statusSink } = params;
const cfg = config as OpenClawConfig;
await core.channel.turn.runResolved({
await core.channel.turn.run({
channel: "twitch",
accountId,
raw: message,
input: (incoming) => ({
id: incoming.id ?? `${incoming.channel}:${incoming.timestamp?.getTime() ?? Date.now()}`,
timestamp: incoming.timestamp?.getTime(),
rawText: incoming.message,
textForAgent: incoming.message,
textForCommands: incoming.message,
raw: incoming,
}),
resolveTurn: (input) => {
const route = core.channel.routing.resolveAgentRoute({
cfg,
channel: "twitch",
accountId,
peer: {
kind: "group",
id: message.channel,
},
});
const senderId = message.userId ?? message.username;
const fromLabel = message.displayName ?? message.username;
const body = core.channel.reply.formatAgentEnvelope({
channel: "Twitch",
from: fromLabel,
timestamp: input.timestamp,
envelope: core.channel.reply.resolveEnvelopeFormatOptions(cfg),
body: input.rawText,
});
const ctxPayload = core.channel.turn.buildContext({
channel: "twitch",
accountId,
messageId: input.id,
timestamp: input.timestamp,
from: `twitch:user:${senderId}`,
sender: {
id: senderId,
name: fromLabel,
username: message.username,
},
conversation: {
kind: "group",
id: message.channel,
label: message.channel,
routePeer: {
adapter: {
ingest: (incoming) => ({
id: incoming.id ?? `${incoming.channel}:${incoming.timestamp?.getTime() ?? Date.now()}`,
timestamp: incoming.timestamp?.getTime(),
rawText: incoming.message,
textForAgent: incoming.message,
textForCommands: incoming.message,
raw: incoming,
}),
resolveTurn: (input) => {
const route = core.channel.routing.resolveAgentRoute({
cfg,
channel: "twitch",
accountId,
peer: {
kind: "group",
id: message.channel,
},
},
route: {
});
const senderId = message.userId ?? message.username;
const fromLabel = message.displayName ?? message.username;
const body = core.channel.reply.formatAgentEnvelope({
channel: "Twitch",
from: fromLabel,
timestamp: input.timestamp,
envelope: core.channel.reply.resolveEnvelopeFormatOptions(cfg),
body: input.rawText,
});
const ctxPayload = core.channel.turn.buildContext({
channel: "twitch",
accountId,
messageId: input.id,
timestamp: input.timestamp,
from: `twitch:user:${senderId}`,
sender: {
id: senderId,
name: fromLabel,
username: message.username,
},
conversation: {
kind: "group",
id: message.channel,
label: message.channel,
routePeer: {
kind: "group",
id: message.channel,
},
},
route: {
agentId: route.agentId,
accountId: route.accountId,
routeSessionKey: route.sessionKey,
},
reply: {
to: `twitch:channel:${message.channel}`,
originatingTo: `twitch:channel:${message.channel}`,
},
message: {
body,
rawBody: input.rawText,
bodyForAgent: input.textForAgent,
commandBody: input.textForCommands,
envelopeFrom: fromLabel,
},
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg,
channel: "twitch",
accountId,
});
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId: route.agentId,
channel: "twitch",
accountId,
});
return {
cfg,
channel: "twitch",
accountId,
agentId: route.agentId,
accountId: route.accountId,
routeSessionKey: route.sessionKey,
},
reply: {
to: `twitch:channel:${message.channel}`,
originatingTo: `twitch:channel:${message.channel}`,
},
message: {
body,
rawBody: input.rawText,
bodyForAgent: input.textForAgent,
commandBody: input.textForCommands,
envelopeFrom: fromLabel,
},
});
const storePath = core.channel.session.resolveStorePath(cfg.session?.store, {
agentId: route.agentId,
});
const tableMode = core.channel.text.resolveMarkdownTableMode({
cfg,
channel: "twitch",
accountId,
});
const { onModelSelected, ...replyPipeline } = createChannelReplyPipeline({
cfg,
agentId: route.agentId,
channel: "twitch",
accountId,
});
return {
cfg,
channel: "twitch",
accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverTwitchReply({
payload,
channel: message.channel,
account,
accountId,
config,
tableMode,
runtime,
statusSink,
});
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverTwitchReply({
payload,
channel: message.channel,
account,
accountId,
config,
tableMode,
runtime,
statusSink,
});
},
onError: (err, info) => {
runtime.error?.(`Twitch ${info.kind} reply failed: ${String(err)}`);
},
},
onError: (err, info) => {
runtime.error?.(`Twitch ${info.kind} reply failed: ${String(err)}`);
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`Failed updating session meta: ${String(err)}`);
record: {
onRecordError: (err) => {
runtime.error?.(`Failed updating session meta: ${String(err)}`);
},
},
},
};
};
},
},
});
}

View File

@@ -667,63 +667,67 @@ async function processMessageWithPipeline(params: ZaloMessagePipelineParams): Pr
},
});
await core.channel.turn.runResolved({
await core.channel.turn.run({
channel: "zalo",
accountId: account.accountId,
raw: message,
input: {
id: message_id,
timestamp: date ? date * 1000 : undefined,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: rawBody,
raw: message,
adapter: {
ingest: () => ({
id: message_id,
timestamp: date ? date * 1000 : undefined,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: rawBody,
raw: message,
}),
resolveTurn: () => ({
cfg: config,
channel: "zalo",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverZaloReply({
payload,
token,
chatId,
runtime,
core,
config,
webhookUrl: params.webhookUrl,
webhookPath: params.webhookPath,
proxyUrl: account.config.proxy,
mediaMaxBytes: params.mediaMaxMb * 1024 * 1024,
canHostMedia: params.canHostMedia,
accountId: account.accountId,
statusSink,
fetcher,
tableMode,
});
},
onError: (err, info) => {
runtime.error?.(
`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`,
);
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`zalo: failed updating session meta: ${String(err)}`);
},
},
}),
},
resolveTurn: () => ({
cfg: config,
channel: "zalo",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverZaloReply({
payload,
token,
chatId,
runtime,
core,
config,
webhookUrl: params.webhookUrl,
webhookPath: params.webhookPath,
proxyUrl: account.config.proxy,
mediaMaxBytes: params.mediaMaxMb * 1024 * 1024,
canHostMedia: params.canHostMedia,
accountId: account.accountId,
statusSink,
fetcher,
tableMode,
});
},
onError: (err, info) => {
runtime.error?.(`[${account.accountId}] Zalo ${info.kind} reply failed: ${String(err)}`);
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`zalo: failed updating session meta: ${String(err)}`);
},
},
}),
});
}

View File

@@ -278,90 +278,8 @@ export function createImageLifecycleCore() {
dispatchResult,
};
}) as unknown as PluginRuntime["channel"]["turn"]["run"],
runResolved: vi.fn(
async (params: Parameters<PluginRuntime["channel"]["turn"]["runResolved"]>[0]) => {
const input =
typeof params.input === "function" ? await params.input(params.raw) : params.input;
if (!input) {
return {
admission: { kind: "drop" as const, reason: "ingest-null" },
dispatched: false,
};
}
const resolved = await params.resolveTurn(
input,
{
kind: "message",
canStartAgentTurn: true,
},
{},
);
await resolved.recordInboundSession({
storePath: resolved.storePath,
sessionKey: resolved.ctxPayload.SessionKey ?? resolved.routeSessionKey,
ctx: resolved.ctxPayload,
groupResolution: resolved.record?.groupResolution,
createIfMissing: resolved.record?.createIfMissing,
updateLastRoute: resolved.record?.updateLastRoute,
onRecordError: resolved.record?.onRecordError ?? (() => undefined),
});
const dispatchResult = await resolved.dispatchReplyWithBufferedBlockDispatcher({
ctx: resolved.ctxPayload,
cfg: resolved.cfg,
dispatcherOptions: {
...resolved.dispatcherOptions,
deliver: async (payload, info) => {
await resolved.delivery.deliver(payload, info);
},
onError: resolved.delivery.onError,
},
replyOptions: resolved.replyOptions,
replyResolver: resolved.replyResolver,
});
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: resolved.ctxPayload,
routeSessionKey: resolved.routeSessionKey,
dispatchResult,
};
},
) as unknown as PluginRuntime["channel"]["turn"]["runResolved"],
buildContext:
buildChannelTurnContextMock as unknown as PluginRuntime["channel"]["turn"]["buildContext"],
dispatchAssembled: vi.fn(
async (turn: Parameters<PluginRuntime["channel"]["turn"]["dispatchAssembled"]>[0]) => {
await turn.recordInboundSession({
storePath: turn.storePath,
sessionKey: turn.ctxPayload.SessionKey ?? turn.routeSessionKey,
ctx: turn.ctxPayload,
groupResolution: turn.record?.groupResolution,
createIfMissing: turn.record?.createIfMissing,
updateLastRoute: turn.record?.updateLastRoute,
onRecordError: turn.record?.onRecordError ?? (() => undefined),
});
const dispatchResult = await turn.dispatchReplyWithBufferedBlockDispatcher({
ctx: turn.ctxPayload,
cfg: turn.cfg,
dispatcherOptions: {
...turn.dispatcherOptions,
deliver: async (payload, info) => {
await turn.delivery.deliver(payload, info);
},
onError: turn.delivery.onError,
},
replyOptions: turn.replyOptions,
replyResolver: turn.replyResolver,
});
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: turn.ctxPayload,
routeSessionKey: turn.routeSessionKey,
dispatchResult,
};
},
) as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"],
},
commands: {
shouldComputeCommandAuthorized: vi.fn(

View File

@@ -89,39 +89,40 @@ function installRuntime(params: {
const readSessionUpdatedAt = vi.fn(
(_params?: { storePath: string; sessionKey: string }): number | undefined => undefined,
);
const dispatchAssembled = vi.fn(
async (turn: Parameters<PluginRuntime["channel"]["turn"]["dispatchAssembled"]>[0]) => {
await turn.recordInboundSession({
storePath: turn.storePath,
sessionKey: turn.ctxPayload.SessionKey ?? turn.routeSessionKey,
ctx: turn.ctxPayload,
groupResolution: turn.record?.groupResolution,
createIfMissing: turn.record?.createIfMissing,
updateLastRoute: turn.record?.updateLastRoute,
onRecordError: turn.record?.onRecordError ?? (() => undefined),
});
const dispatchResult = await turn.dispatchReplyWithBufferedBlockDispatcher({
ctx: turn.ctxPayload,
cfg: turn.cfg,
dispatcherOptions: {
...turn.dispatcherOptions,
deliver: async (payload, info) => {
await turn.delivery.deliver(payload, info);
},
onError: turn.delivery.onError,
type ResolvedTurn = Awaited<
ReturnType<Parameters<PluginRuntime["channel"]["turn"]["run"]>[0]["adapter"]["resolveTurn"]>
>;
const dispatchAssembled = vi.fn(async (turn: ResolvedTurn) => {
await turn.recordInboundSession({
storePath: turn.storePath,
sessionKey: turn.ctxPayload.SessionKey ?? turn.routeSessionKey,
ctx: turn.ctxPayload,
groupResolution: turn.record?.groupResolution,
createIfMissing: turn.record?.createIfMissing,
updateLastRoute: turn.record?.updateLastRoute,
onRecordError: turn.record?.onRecordError ?? (() => undefined),
});
const dispatchResult = await turn.dispatchReplyWithBufferedBlockDispatcher({
ctx: turn.ctxPayload,
cfg: turn.cfg,
dispatcherOptions: {
...turn.dispatcherOptions,
deliver: async (payload, info) => {
await turn.delivery.deliver(payload, info);
},
replyOptions: turn.replyOptions,
replyResolver: turn.replyResolver,
});
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: turn.ctxPayload,
routeSessionKey: turn.routeSessionKey,
dispatchResult,
};
},
);
onError: turn.delivery.onError,
},
replyOptions: turn.replyOptions,
replyResolver: turn.replyResolver,
});
return {
admission: { kind: "dispatch" as const },
dispatched: true,
ctxPayload: turn.ctxPayload,
routeSessionKey: turn.routeSessionKey,
dispatchResult,
};
});
const runTurn = vi.fn(async (params: Parameters<PluginRuntime["channel"]["turn"]["run"]>[0]) => {
const input = await params.adapter.ingest(params.raw);
if (!input) {
@@ -137,27 +138,6 @@ function installRuntime(params: {
);
return await dispatchAssembled(resolved);
});
const runResolvedTurn = vi.fn(
async (params: Parameters<PluginRuntime["channel"]["turn"]["runResolved"]>[0]) => {
const input =
typeof params.input === "function" ? await params.input(params.raw) : params.input;
if (!input) {
return {
admission: { kind: "drop" as const, reason: "ingest-null" },
dispatched: false,
};
}
const resolved = await params.resolveTurn(
input,
{
kind: "message",
canStartAgentTurn: true,
},
{},
);
return await dispatchAssembled(resolved);
},
);
const buildContext = vi.fn(
(params: Parameters<PluginRuntime["channel"]["turn"]["buildContext"]>[0]) =>
({
@@ -264,10 +244,7 @@ function installRuntime(params: {
},
turn: {
run: runTurn as unknown as PluginRuntime["channel"]["turn"]["run"],
runResolved: runResolvedTurn as unknown as PluginRuntime["channel"]["turn"]["runResolved"],
buildContext: buildContext as unknown as PluginRuntime["channel"]["turn"]["buildContext"],
dispatchAssembled:
dispatchAssembled as unknown as PluginRuntime["channel"]["turn"]["dispatchAssembled"],
},
text: {
resolveMarkdownTableMode: vi.fn(() => "code"),

View File

@@ -671,64 +671,66 @@ async function processMessage(
},
});
await core.channel.turn.runResolved({
await core.channel.turn.run({
channel: "zalouser",
accountId: account.accountId,
raw: message,
input: {
id: messageSid ?? `${message.timestampMs}`,
timestamp: message.timestampMs,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: commandBody,
raw: message,
},
resolveTurn: () => ({
cfg: config,
channel: "zalouser",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverZalouserReply({
payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string },
profile: account.profile,
chatId,
isGroup,
runtime,
core,
config,
accountId: account.accountId,
statusSink,
tableMode: core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "zalouser",
adapter: {
ingest: () => ({
id: messageSid ?? `${message.timestampMs}`,
timestamp: message.timestampMs,
rawText: rawBody,
textForAgent: rawBody,
textForCommands: commandBody,
raw: message,
}),
resolveTurn: () => ({
cfg: config,
channel: "zalouser",
accountId: account.accountId,
agentId: route.agentId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
recordInboundSession: core.channel.session.recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher:
core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
delivery: {
deliver: async (payload) => {
await deliverZalouserReply({
payload: payload as { text?: string; mediaUrls?: string[]; mediaUrl?: string },
profile: account.profile,
chatId,
isGroup,
runtime,
core,
config,
accountId: account.accountId,
}),
});
statusSink,
tableMode: core.channel.text.resolveMarkdownTableMode({
cfg: config,
channel: "zalouser",
accountId: account.accountId,
}),
});
},
onError: (err, info) => {
runtime.error(
`[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`,
);
},
},
onError: (err, info) => {
runtime.error(
`[${account.accountId}] Zalouser ${info.kind} reply failed: ${String(err)}`,
);
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
},
dispatcherOptions: replyPipeline,
replyOptions: {
onModelSelected,
},
record: {
onRecordError: (err) => {
runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`);
record: {
onRecordError: (err) => {
runtime.error?.(`zalouser: failed updating session meta: ${String(err)}`);
},
},
},
}),
}),
},
});
if (isGroup && historyKey) {
clearHistoryEntriesIfEnabled({

View File

@@ -195,8 +195,9 @@ describe("channel turn kernel", () => {
expect(resolveTurn).not.toHaveBeenCalled();
});
it("runs observe-only preflights through resolve, record, dispatch, and finalize", async () => {
it("runs observe-only preflights through resolve, record, dispatch, and finalize without visible delivery", async () => {
const events: string[] = [];
const deliver = vi.fn();
const onFinalize = vi.fn();
const result = await runChannelTurn({
channel: "test",
@@ -213,7 +214,7 @@ describe("channel turn kernel", () => {
ctxPayload: createCtx({ SessionKey: "agent:observer:test:peer" }),
recordInboundSession: createRecordInboundSession(events),
dispatchReplyWithBufferedBlockDispatcher: createDispatch(events),
delivery: createNoopChannelTurnDeliveryAdapter(),
delivery: { deliver },
record: {
onRecordError: vi.fn(),
},
@@ -228,6 +229,7 @@ describe("channel turn kernel", () => {
});
expect(result.dispatched).toBe(true);
expect(events).toEqual(["record", "dispatch"]);
expect(deliver).not.toHaveBeenCalled();
expect(onFinalize).toHaveBeenCalledWith(
expect.objectContaining({
admission: { kind: "observeOnly", reason: "broadcast-observer" },

View File

@@ -234,7 +234,14 @@ export async function runChannelTurn<TRaw>(
const admission = resolved.admission ?? preflightAdmission ?? ({ kind: "dispatch" } as const);
let result: ChannelTurnResult;
try {
const dispatchResult = await dispatchAssembledChannelTurn(resolved);
const dispatchResult = await dispatchAssembledChannelTurn(
admission.kind === "observeOnly"
? {
...resolved,
delivery: createNoopChannelTurnDeliveryAdapter(),
}
: resolved,
);
result = {
...dispatchResult,
admission,

View File

@@ -153,9 +153,11 @@ export type PluginRuntimeChannel = {
};
turn: {
run: typeof import("../../channels/turn/kernel.js").runChannelTurn;
/** @deprecated Prefer `run(...)`. */
runResolved: typeof import("../../channels/turn/kernel.js").runResolvedChannelTurn;
buildContext: typeof import("../../channels/turn/kernel.js").buildChannelTurnContext;
runPrepared: typeof import("../../channels/turn/kernel.js").runPreparedChannelTurn;
/** @deprecated Prefer `run(...)` or `runPrepared(...)`. */
dispatchAssembled: typeof import("../../channels/turn/kernel.js").dispatchAssembledChannelTurn;
};
threadBindings: {