mirror of
https://fastgit.cc/github.com/openclaw/openclaw
synced 2026-04-30 22:12:32 +08:00
Add cron changed plugin hook (#72773)
* feat: add cron changed plugin hook * fix: improve cron_changed hook correctness and code quality - Fix PluginHookGatewayCronDeliveryStatus: replace 'error' with 'unknown' to match internal CronDeliveryStatus enum - Add job snapshot to CronEvent so removed events carry the deleted job - Extract pickDefined helper, replace 14-field verbose spread mapping - Add toPluginCronJob mapper for explicit internal→public type boundary - Fix schedule union: use literal-only kind discriminants for TS narrowing - Use loadConfig() (runtime) instead of params.cfg (startup) in hook ctx - Use formatErrorMessage instead of String(err) for stack preservation - Fix pre-existing getCron TS2322 with explicit cast (matches gateway_start) - Re-export supporting types from hooks.ts for plugin consumers - Add tests: removed events with job, finished with full fields, runtime cfg
This commit is contained in:
@@ -299,6 +299,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
- Control UI/Talk: add a generic browser realtime transport contract, Google Live browser Talk sessions with constrained ephemeral tokens, and a Gateway relay for backend-only realtime voice plugins. Thanks @VACInc.
|
||||
- CLI/models: route provider-filtered model listing through an explicit source plan so user config, installed manifest rows, Provider Index previews, and scoped runtime fallbacks keep a stable authority order without adding another catalog cache. Thanks @shakkernerd.
|
||||
- Plugins/cron: add a typed `cron_changed` hook for observing gateway-owned cron lifecycle updates without depending on internal cron events. Thanks @amknight.
|
||||
- Providers: add Cerebras as a bundled plugin with onboarding, static model catalog, docs, and manifest-owned endpoint metadata.
|
||||
- Memory/OpenAI-compatible: add optional `memorySearch.inputType`, `queryInputType`, and `documentInputType` config for asymmetric embedding endpoints, including direct query embeddings and provider batch indexing. Carries forward #63313 and #60727. Thanks @HOYALIM and @prospect1314521.
|
||||
- Ollama/memory: add model-specific retrieval query prefixes for `nomic-embed-text`, `qwen3-embedding`, and `mxbai-embed-large` memory-search queries while leaving document batches unchanged. Carries forward #45013. Thanks @laolin5564.
|
||||
|
||||
@@ -300,6 +300,7 @@ semantics.
|
||||
- `message_received`: use the typed `threadId` field when you need inbound thread/topic routing. Keep `metadata` for channel-specific extras.
|
||||
- `message_sending`: use typed `replyToId` / `threadId` routing fields before falling back to channel-specific `metadata`.
|
||||
- `gateway_start`: use `ctx.config`, `ctx.workspaceDir`, and `ctx.getCron?.()` for gateway-owned startup state instead of relying on internal `gateway:startup` hooks.
|
||||
- `cron_changed`: observe gateway-owned cron lifecycle changes. Use `event.job?.state?.nextRunAtMs` and `ctx.getCron?.()` when syncing external wake schedulers, and keep OpenClaw as the source of truth for due checks and execution.
|
||||
|
||||
### API object fields
|
||||
|
||||
|
||||
@@ -183,6 +183,7 @@ export async function start(state: CronServiceState) {
|
||||
emit(state, {
|
||||
jobId: interrupted.jobId,
|
||||
action: "finished",
|
||||
job,
|
||||
status: "error",
|
||||
error: STARTUP_INTERRUPTED_ERROR,
|
||||
delivered: false,
|
||||
@@ -338,6 +339,7 @@ export async function add(state: CronServiceState, input: CronJobCreate) {
|
||||
emit(state, {
|
||||
jobId: job.id,
|
||||
action: "added",
|
||||
job,
|
||||
nextRunAtMs: job.state.nextRunAtMs,
|
||||
});
|
||||
return job;
|
||||
@@ -387,6 +389,7 @@ export async function update(state: CronServiceState, id: string, patch: CronJob
|
||||
emit(state, {
|
||||
jobId: id,
|
||||
action: "updated",
|
||||
job,
|
||||
nextRunAtMs: job.state.nextRunAtMs,
|
||||
});
|
||||
return job;
|
||||
@@ -401,12 +404,13 @@ export async function remove(state: CronServiceState, id: string) {
|
||||
if (!state.store) {
|
||||
return { ok: false, removed: false } as const;
|
||||
}
|
||||
const removedJob = state.store.jobs.find((j) => j.id === id);
|
||||
state.store.jobs = state.store.jobs.filter((j) => j.id !== id);
|
||||
const removed = (state.store.jobs.length ?? 0) !== before;
|
||||
await persist(state);
|
||||
armTimer(state);
|
||||
if (removed) {
|
||||
emit(state, { jobId: id, action: "removed" });
|
||||
emit(state, { jobId: id, action: "removed", job: removedJob });
|
||||
}
|
||||
return { ok: true, removed } as const;
|
||||
});
|
||||
@@ -637,7 +641,7 @@ async function prepareManualRun(
|
||||
// Persist the running marker before releasing lock so timer ticks that
|
||||
// force-reload from disk cannot start the same job concurrently.
|
||||
await persist(state);
|
||||
emit(state, { jobId: job.id, action: "started", runAtMs: preflight.now });
|
||||
emit(state, { jobId: job.id, action: "started", job, runAtMs: preflight.now });
|
||||
const taskRunId = tryCreateManualTaskRun({
|
||||
state,
|
||||
job,
|
||||
@@ -701,6 +705,7 @@ async function finishPreparedManualRun(
|
||||
emit(state, {
|
||||
jobId: job.id,
|
||||
action: "finished",
|
||||
job,
|
||||
status: coreResult.status,
|
||||
error: coreResult.error,
|
||||
summary: coreResult.summary,
|
||||
@@ -720,7 +725,7 @@ async function finishPreparedManualRun(
|
||||
|
||||
if (shouldDelete && state.store) {
|
||||
state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id);
|
||||
emit(state, { jobId: job.id, action: "removed" });
|
||||
emit(state, { jobId: job.id, action: "removed", job });
|
||||
}
|
||||
|
||||
// Manual runs should not advance other due jobs without executing them.
|
||||
|
||||
@@ -16,6 +16,8 @@ import type {
|
||||
export type CronEvent = {
|
||||
jobId: string;
|
||||
action: "added" | "updated" | "removed" | "started" | "finished";
|
||||
/** Snapshot of the job at the time of the event. Present for all actions where the job is accessible. */
|
||||
job?: CronJob;
|
||||
runAtMs?: number;
|
||||
durationMs?: number;
|
||||
status?: CronRunStatus;
|
||||
|
||||
@@ -670,7 +670,7 @@ function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOu
|
||||
|
||||
if (shouldDelete) {
|
||||
store.jobs = jobs.filter((entry) => entry.id !== job.id);
|
||||
emit(state, { jobId: job.id, action: "removed" });
|
||||
emit(state, { jobId: job.id, action: "removed", job });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -803,7 +803,7 @@ export async function onTimer(state: CronServiceState) {
|
||||
const startedAt = state.deps.nowMs();
|
||||
job.state.runningAtMs = startedAt;
|
||||
markCronJobActive(job.id);
|
||||
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
|
||||
emit(state, { jobId: job.id, action: "started", job, runAtMs: startedAt });
|
||||
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
|
||||
const taskRunId = tryCreateCronTaskRun({ state, job, startedAt });
|
||||
|
||||
@@ -1109,7 +1109,12 @@ async function runStartupCatchupCandidate(
|
||||
job: candidate.job,
|
||||
startedAt,
|
||||
});
|
||||
emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt });
|
||||
emit(state, {
|
||||
jobId: candidate.job.id,
|
||||
action: "started",
|
||||
job: candidate.job,
|
||||
runAtMs: startedAt,
|
||||
});
|
||||
try {
|
||||
const result = await executeJobCoreWithTimeout(state, candidate.job);
|
||||
return {
|
||||
@@ -1408,7 +1413,7 @@ export async function executeJob(
|
||||
job.state.runningAtMs = startedAt;
|
||||
job.state.lastError = undefined;
|
||||
markCronJobActive(job.id);
|
||||
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
|
||||
emit(state, { jobId: job.id, action: "started", job, runAtMs: startedAt });
|
||||
|
||||
let coreResult: {
|
||||
status: CronRunStatus;
|
||||
@@ -1435,7 +1440,7 @@ export async function executeJob(
|
||||
|
||||
if (shouldDelete && state.store) {
|
||||
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
|
||||
emit(state, { jobId: job.id, action: "removed" });
|
||||
emit(state, { jobId: job.id, action: "removed", job });
|
||||
}
|
||||
clearCronJobActive(job.id);
|
||||
}
|
||||
@@ -1454,6 +1459,7 @@ function emitJobFinished(
|
||||
emit(state, {
|
||||
jobId: job.id,
|
||||
action: "finished",
|
||||
job,
|
||||
status: result.status,
|
||||
error: result.error,
|
||||
summary: result.summary,
|
||||
|
||||
@@ -14,6 +14,8 @@ const {
|
||||
fetchWithSsrFGuardMock,
|
||||
runCronIsolatedAgentTurnMock,
|
||||
cleanupBrowserSessionsForLifecycleEndMock,
|
||||
getGlobalHookRunnerMock,
|
||||
runCronChangedMock,
|
||||
} = vi.hoisted(() => ({
|
||||
enqueueSystemEventMock: vi.fn(),
|
||||
requestHeartbeatNowMock: vi.fn(),
|
||||
@@ -24,6 +26,11 @@ const {
|
||||
fetchWithSsrFGuardMock: vi.fn(),
|
||||
runCronIsolatedAgentTurnMock: vi.fn(async () => ({ status: "ok" as const, summary: "ok" })),
|
||||
cleanupBrowserSessionsForLifecycleEndMock: vi.fn(async () => {}),
|
||||
runCronChangedMock: vi.fn(async () => {}),
|
||||
getGlobalHookRunnerMock: vi.fn(() => ({
|
||||
hasHooks: (hookName: string) => hookName === "cron_changed",
|
||||
runCronChanged: runCronChangedMock,
|
||||
})),
|
||||
}));
|
||||
|
||||
function enqueueSystemEvent(...args: unknown[]) {
|
||||
@@ -65,6 +72,14 @@ vi.mock("../config/config.js", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../config/io.js", async () => {
|
||||
const actual = await vi.importActual<typeof import("../config/io.js")>("../config/io.js");
|
||||
return {
|
||||
...actual,
|
||||
getRuntimeConfig: () => loadConfigMock(),
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../infra/net/fetch-guard.js", () => ({
|
||||
fetchWithSsrFGuard: fetchWithSsrFGuardMock,
|
||||
}));
|
||||
@@ -77,6 +92,10 @@ vi.mock("../browser-lifecycle-cleanup.js", () => ({
|
||||
cleanupBrowserSessionsForLifecycleEnd: cleanupBrowserSessionsForLifecycleEndMock,
|
||||
}));
|
||||
|
||||
vi.mock("../plugins/hook-runner-global.js", () => ({
|
||||
getGlobalHookRunner: getGlobalHookRunnerMock,
|
||||
}));
|
||||
|
||||
import { buildGatewayCronService } from "./server-cron.js";
|
||||
|
||||
function createCronConfig(name: string): OpenClawConfig {
|
||||
@@ -100,6 +119,121 @@ describe("buildGatewayCronService", () => {
|
||||
fetchWithSsrFGuardMock.mockClear();
|
||||
runCronIsolatedAgentTurnMock.mockClear();
|
||||
cleanupBrowserSessionsForLifecycleEndMock.mockClear();
|
||||
runCronChangedMock.mockClear();
|
||||
getGlobalHookRunnerMock.mockClear();
|
||||
getGlobalHookRunnerMock.mockReturnValue({
|
||||
hasHooks: (hookName: string) => hookName === "cron_changed",
|
||||
runCronChanged: runCronChangedMock,
|
||||
});
|
||||
});
|
||||
|
||||
it("emits cron_changed hooks with computed next run state", async () => {
|
||||
const cfg = createCronConfig("server-cron-hook");
|
||||
loadConfigMock.mockReturnValue(cfg);
|
||||
|
||||
const state = buildGatewayCronService({
|
||||
cfg,
|
||||
deps: {} as CliDeps,
|
||||
broadcast: () => {},
|
||||
});
|
||||
try {
|
||||
const job = await state.cron.add({
|
||||
name: "scheduler-hook",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: 1_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "sync external wake" },
|
||||
});
|
||||
|
||||
expect(runCronChangedMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
action: "added",
|
||||
jobId: job.id,
|
||||
job: expect.objectContaining({
|
||||
id: job.id,
|
||||
state: expect.objectContaining({ nextRunAtMs: job.state.nextRunAtMs }),
|
||||
}),
|
||||
}),
|
||||
expect.objectContaining({
|
||||
config: cfg,
|
||||
getCron: expect.any(Function),
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
state.cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("cron_changed removed events include the deleted job snapshot", async () => {
|
||||
const cfg = createCronConfig("server-cron-hook-removed");
|
||||
loadConfigMock.mockReturnValue(cfg);
|
||||
|
||||
const state = buildGatewayCronService({
|
||||
cfg,
|
||||
deps: {} as CliDeps,
|
||||
broadcast: () => {},
|
||||
});
|
||||
try {
|
||||
const job = await state.cron.add({
|
||||
name: "to-be-removed",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: 1_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "will be removed" },
|
||||
});
|
||||
|
||||
runCronChangedMock.mockClear();
|
||||
await state.cron.remove(job.id);
|
||||
|
||||
expect(runCronChangedMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
action: "removed",
|
||||
jobId: job.id,
|
||||
job: expect.objectContaining({
|
||||
id: job.id,
|
||||
name: "to-be-removed",
|
||||
}),
|
||||
}),
|
||||
expect.objectContaining({
|
||||
getCron: expect.any(Function),
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
state.cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("cron_changed hook context uses runtime config from getRuntimeConfig()", async () => {
|
||||
const startupCfg = createCronConfig("server-cron-hook-runtime-cfg");
|
||||
const runtimeCfg = { ...startupCfg, _marker: "runtime" };
|
||||
loadConfigMock.mockReturnValue(runtimeCfg);
|
||||
|
||||
const state = buildGatewayCronService({
|
||||
cfg: startupCfg,
|
||||
deps: {} as CliDeps,
|
||||
broadcast: () => {},
|
||||
});
|
||||
try {
|
||||
await state.cron.add({
|
||||
name: "runtime-cfg-check",
|
||||
enabled: true,
|
||||
schedule: { kind: "every", everyMs: 60_000, anchorMs: 1_000 },
|
||||
sessionTarget: "main",
|
||||
wakeMode: "next-heartbeat",
|
||||
payload: { kind: "systemEvent", text: "cfg check" },
|
||||
});
|
||||
|
||||
// The hook context should use getRuntimeConfig() (runtimeCfg), not startupCfg
|
||||
expect(runCronChangedMock).toHaveBeenCalledTimes(1);
|
||||
const calls = runCronChangedMock.mock.calls as unknown[][];
|
||||
const hookCtx = calls[0]?.[1] as { config?: unknown } | undefined;
|
||||
expect(hookCtx?.config).toBe(runtimeCfg);
|
||||
expect(hookCtx?.config).not.toBe(startupCfg);
|
||||
} finally {
|
||||
state.cron.stop();
|
||||
}
|
||||
});
|
||||
|
||||
it("routes main-target jobs to the scoped session for enqueue + wake", async () => {
|
||||
|
||||
@@ -18,10 +18,19 @@ import {
|
||||
import { CronService } from "../cron/service.js";
|
||||
import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js";
|
||||
import { resolveCronStorePath } from "../cron/store.js";
|
||||
import type { CronJob } from "../cron/types.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { runHeartbeatOnce } from "../infra/heartbeat-runner.js";
|
||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import { getChildLogger } from "../logging.js";
|
||||
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
|
||||
import type {
|
||||
PluginHookCronChangedEvent,
|
||||
PluginHookGatewayCronJob,
|
||||
PluginHookGatewayCronService,
|
||||
PluginHookGatewayContext,
|
||||
} from "../plugins/hook-types.js";
|
||||
import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js";
|
||||
import { defaultRuntime } from "../runtime.js";
|
||||
import {
|
||||
@@ -35,6 +44,44 @@ export type GatewayCronState = {
|
||||
cronEnabled: boolean;
|
||||
};
|
||||
|
||||
/** Pick only the keys whose values are not `undefined` from an object. */
|
||||
function pickDefined<T extends Record<string, unknown>>(
|
||||
obj: T,
|
||||
keys: (keyof T)[],
|
||||
): Partial<Pick<T, (typeof keys)[number]>> {
|
||||
const result: Partial<Pick<T, (typeof keys)[number]>> = {};
|
||||
for (const k of keys) {
|
||||
if (obj[k] !== undefined) {
|
||||
(result as Record<string, unknown>)[k as string] = obj[k];
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/** Map internal CronJob to the public plugin SDK shape. */
|
||||
function toPluginCronJob(job: CronJob): PluginHookGatewayCronJob {
|
||||
return {
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
description: job.description,
|
||||
enabled: job.enabled,
|
||||
schedule: job.schedule ? structuredClone(job.schedule) : undefined,
|
||||
sessionTarget: job.sessionTarget,
|
||||
wakeMode: job.wakeMode,
|
||||
payload: job.payload ? structuredClone(job.payload) : undefined,
|
||||
state: {
|
||||
nextRunAtMs: job.state.nextRunAtMs,
|
||||
runningAtMs: job.state.runningAtMs,
|
||||
lastRunAtMs: job.state.lastRunAtMs,
|
||||
lastRunStatus: job.state.lastRunStatus,
|
||||
lastError: job.state.lastError,
|
||||
lastDurationMs: job.state.lastDurationMs,
|
||||
},
|
||||
createdAtMs: job.createdAtMs,
|
||||
updatedAtMs: job.updatedAtMs,
|
||||
};
|
||||
}
|
||||
|
||||
export function buildGatewayCronService(params: {
|
||||
cfg: OpenClawConfig;
|
||||
deps: CliDeps;
|
||||
@@ -162,6 +209,23 @@ export function buildGatewayCronService(params: {
|
||||
const sessionStorePath = resolveSessionStorePath(defaultAgentId);
|
||||
const warnedLegacyWebhookJobs = new Set<string>();
|
||||
|
||||
const runCronChangedHook = (evt: PluginHookCronChangedEvent) => {
|
||||
const hookRunner = getGlobalHookRunner();
|
||||
if (!hookRunner?.hasHooks("cron_changed")) {
|
||||
return;
|
||||
}
|
||||
const hookCtx: PluginHookGatewayContext = {
|
||||
config: getRuntimeConfig(),
|
||||
getCron: () => cron as PluginHookGatewayCronService,
|
||||
};
|
||||
void hookRunner.runCronChanged(evt, hookCtx).catch((err) => {
|
||||
cronLogger.warn(
|
||||
{ err: formatErrorMessage(err), jobId: evt.jobId },
|
||||
"cron_changed hook failed",
|
||||
);
|
||||
});
|
||||
};
|
||||
|
||||
const cron = new CronService({
|
||||
storePath,
|
||||
cronEnabled,
|
||||
@@ -259,8 +323,34 @@ export function buildGatewayCronService(params: {
|
||||
log: getChildLogger({ module: "cron", storePath }),
|
||||
onEvent: (evt) => {
|
||||
params.broadcast("cron", evt, { dropIfSlow: true });
|
||||
// Build hook event from CronEvent. The job snapshot is carried on the
|
||||
// internal event so it's available even for "removed" actions where
|
||||
// getJob() would return undefined. `delivery` and `usage` are
|
||||
// intentionally omitted — they contain internal channel/token detail
|
||||
// that is not part of the public plugin SDK surface.
|
||||
const hookEvt: PluginHookCronChangedEvent = {
|
||||
action: evt.action,
|
||||
jobId: evt.jobId,
|
||||
...(evt.job ? { job: toPluginCronJob(evt.job) } : {}),
|
||||
...pickDefined(evt, [
|
||||
"runAtMs",
|
||||
"durationMs",
|
||||
"status",
|
||||
"error",
|
||||
"summary",
|
||||
"delivered",
|
||||
"deliveryStatus",
|
||||
"deliveryError",
|
||||
"sessionId",
|
||||
"sessionKey",
|
||||
"nextRunAtMs",
|
||||
"model",
|
||||
"provider",
|
||||
]),
|
||||
};
|
||||
runCronChangedHook(hookEvt);
|
||||
if (evt.action === "finished") {
|
||||
const job = cron.getJob(evt.jobId);
|
||||
const job = evt.job ?? cron.getJob(evt.jobId);
|
||||
dispatchGatewayCronFinishedNotifications({
|
||||
evt,
|
||||
job,
|
||||
|
||||
@@ -99,6 +99,7 @@ export type PluginHookName =
|
||||
| "gateway_start"
|
||||
| "gateway_stop"
|
||||
| "heartbeat_prompt_contribution"
|
||||
| "cron_changed"
|
||||
| "before_dispatch"
|
||||
| "reply_dispatch"
|
||||
| "before_install";
|
||||
@@ -135,6 +136,7 @@ export const PLUGIN_HOOK_NAMES = [
|
||||
"gateway_start",
|
||||
"gateway_stop",
|
||||
"heartbeat_prompt_contribution",
|
||||
"cron_changed",
|
||||
"before_dispatch",
|
||||
"reply_dispatch",
|
||||
"before_install",
|
||||
@@ -597,23 +599,72 @@ export type PluginHookGatewayStopEvent = {
|
||||
reason?: string;
|
||||
};
|
||||
|
||||
export type PluginHookGatewayCronRunStatus = "ok" | "error" | "skipped";
|
||||
|
||||
export type PluginHookGatewayCronDeliveryStatus =
|
||||
| "not-requested"
|
||||
| "delivered"
|
||||
| "not-delivered"
|
||||
| "unknown";
|
||||
|
||||
export type PluginHookGatewayCronJobState = {
|
||||
nextRunAtMs?: number;
|
||||
runningAtMs?: number;
|
||||
lastRunAtMs?: number;
|
||||
lastRunStatus?: PluginHookGatewayCronRunStatus;
|
||||
lastError?: string;
|
||||
lastDurationMs?: number;
|
||||
};
|
||||
|
||||
export type PluginHookGatewayCronJob = {
|
||||
id: string;
|
||||
name?: string;
|
||||
description?: string;
|
||||
enabled?: boolean;
|
||||
schedule?: {
|
||||
kind?: string;
|
||||
expr?: string;
|
||||
tz?: string;
|
||||
};
|
||||
schedule?:
|
||||
| {
|
||||
kind: "cron";
|
||||
expr?: string;
|
||||
tz?: string;
|
||||
staggerMs?: number;
|
||||
}
|
||||
| {
|
||||
kind: "at";
|
||||
at?: string;
|
||||
}
|
||||
| {
|
||||
kind: "every";
|
||||
everyMs?: number;
|
||||
anchorMs?: number;
|
||||
};
|
||||
sessionTarget?: string;
|
||||
wakeMode?: string;
|
||||
payload?: {
|
||||
kind?: string;
|
||||
text?: string;
|
||||
};
|
||||
state?: PluginHookGatewayCronJobState;
|
||||
createdAtMs?: number;
|
||||
updatedAtMs?: number;
|
||||
};
|
||||
|
||||
export type PluginHookCronChangedEvent = {
|
||||
action: "added" | "updated" | "removed" | "started" | "finished";
|
||||
jobId: string;
|
||||
job?: PluginHookGatewayCronJob;
|
||||
runAtMs?: number;
|
||||
durationMs?: number;
|
||||
status?: PluginHookGatewayCronRunStatus;
|
||||
error?: string;
|
||||
summary?: string;
|
||||
delivered?: boolean;
|
||||
deliveryStatus?: PluginHookGatewayCronDeliveryStatus;
|
||||
deliveryError?: string;
|
||||
sessionId?: string;
|
||||
sessionKey?: string;
|
||||
nextRunAtMs?: number;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
};
|
||||
|
||||
export type PluginHookGatewayCronCreateInput = {
|
||||
@@ -872,6 +923,10 @@ export type PluginHookHandlerMap = {
|
||||
| Promise<PluginHeartbeatPromptContributionResult | void>
|
||||
| PluginHeartbeatPromptContributionResult
|
||||
| void;
|
||||
cron_changed: (
|
||||
event: PluginHookCronChangedEvent,
|
||||
ctx: PluginHookGatewayContext,
|
||||
) => Promise<void> | void;
|
||||
before_install: (
|
||||
event: PluginHookBeforeInstallEvent,
|
||||
ctx: PluginHookBeforeInstallContext,
|
||||
|
||||
@@ -45,6 +45,10 @@ import type {
|
||||
PluginAgentTurnPrepareResult,
|
||||
PluginHeartbeatPromptContributionEvent,
|
||||
PluginHeartbeatPromptContributionResult,
|
||||
PluginHookCronChangedEvent,
|
||||
PluginHookGatewayCronDeliveryStatus,
|
||||
PluginHookGatewayCronJobState,
|
||||
PluginHookGatewayCronRunStatus,
|
||||
PluginHookGatewayContext,
|
||||
PluginHookGatewayStartEvent,
|
||||
PluginHookGatewayStopEvent,
|
||||
@@ -130,6 +134,10 @@ export type {
|
||||
PluginHookSubagentSpawningResult,
|
||||
PluginHookSubagentSpawnedEvent,
|
||||
PluginHookSubagentEndedEvent,
|
||||
PluginHookCronChangedEvent,
|
||||
PluginHookGatewayCronDeliveryStatus,
|
||||
PluginHookGatewayCronJobState,
|
||||
PluginHookGatewayCronRunStatus,
|
||||
PluginHookGatewayContext,
|
||||
PluginHookGatewayStartEvent,
|
||||
PluginHookGatewayStopEvent,
|
||||
@@ -1263,6 +1271,16 @@ export function createHookRunner(
|
||||
>("heartbeat_prompt_contribution", event, ctx, { mergeResults: mergeAgentTurnPrepare });
|
||||
}
|
||||
|
||||
/**
|
||||
* Run cron_changed hook for gateway-owned cron lifecycle changes.
|
||||
*/
|
||||
async function runCronChanged(
|
||||
event: PluginHookCronChangedEvent,
|
||||
ctx: PluginHookGatewayContext,
|
||||
): Promise<void> {
|
||||
return runVoidHook("cron_changed", event, ctx);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Skill Install Hooks
|
||||
// =========================================================================
|
||||
@@ -1358,6 +1376,7 @@ export function createHookRunner(
|
||||
runGatewayStart,
|
||||
runGatewayStop,
|
||||
runHeartbeatPromptContribution,
|
||||
runCronChanged,
|
||||
// Install hooks
|
||||
runBeforeInstall,
|
||||
// Utility
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createHookRunnerWithRegistry } from "./hooks.test-helpers.js";
|
||||
import type {
|
||||
PluginHookCronChangedEvent,
|
||||
PluginHookGatewayContext,
|
||||
PluginHookGatewayStartEvent,
|
||||
PluginHookGatewayStopEvent,
|
||||
@@ -53,12 +54,75 @@ describe("gateway hook runner methods", () => {
|
||||
await expectGatewayHookCall({ hookName, event, gatewayCtx });
|
||||
});
|
||||
|
||||
it("runCronChanged invokes registered cron_changed hooks", async () => {
|
||||
const handler = vi.fn();
|
||||
const { runner } = createHookRunnerWithRegistry([{ hookName: "cron_changed", handler }]);
|
||||
const event: PluginHookCronChangedEvent = {
|
||||
action: "updated",
|
||||
jobId: "job-1",
|
||||
nextRunAtMs: 123,
|
||||
job: {
|
||||
id: "job-1",
|
||||
state: { nextRunAtMs: 123 },
|
||||
},
|
||||
};
|
||||
|
||||
await runner.runCronChanged(event, gatewayCtx);
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(event, gatewayCtx);
|
||||
});
|
||||
|
||||
it("runCronChanged passes finished events with delivery and error fields", async () => {
|
||||
const handler = vi.fn();
|
||||
const { runner } = createHookRunnerWithRegistry([{ hookName: "cron_changed", handler }]);
|
||||
const event: PluginHookCronChangedEvent = {
|
||||
action: "finished",
|
||||
jobId: "job-2",
|
||||
status: "error",
|
||||
error: "timeout",
|
||||
summary: "Job timed out",
|
||||
delivered: false,
|
||||
deliveryStatus: "not-delivered",
|
||||
deliveryError: "channel unavailable",
|
||||
durationMs: 5000,
|
||||
runAtMs: 100,
|
||||
nextRunAtMs: 200,
|
||||
model: "gpt-5.4",
|
||||
provider: "openai",
|
||||
job: {
|
||||
id: "job-2",
|
||||
state: { lastRunStatus: "error", lastError: "timeout" },
|
||||
},
|
||||
};
|
||||
|
||||
await runner.runCronChanged(event, gatewayCtx);
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(event, gatewayCtx);
|
||||
});
|
||||
|
||||
it("runCronChanged handles removed events without job", async () => {
|
||||
const handler = vi.fn();
|
||||
const { runner } = createHookRunnerWithRegistry([{ hookName: "cron_changed", handler }]);
|
||||
const event: PluginHookCronChangedEvent = {
|
||||
action: "removed",
|
||||
jobId: "job-3",
|
||||
job: { id: "job-3", name: "deleted-job" },
|
||||
};
|
||||
|
||||
await runner.runCronChanged(event, gatewayCtx);
|
||||
|
||||
expect(handler).toHaveBeenCalledWith(event, gatewayCtx);
|
||||
expect(handler.mock.calls[0][0].job).toEqual({ id: "job-3", name: "deleted-job" });
|
||||
});
|
||||
|
||||
it("hasHooks returns true for registered gateway hooks", () => {
|
||||
const { runner } = createHookRunnerWithRegistry([
|
||||
{ hookName: "gateway_start", handler: vi.fn() },
|
||||
{ hookName: "cron_changed", handler: vi.fn() },
|
||||
]);
|
||||
|
||||
expect(runner.hasHooks("gateway_start")).toBe(true);
|
||||
expect(runner.hasHooks("cron_changed")).toBe(true);
|
||||
expect(runner.hasHooks("gateway_stop")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user