fix(ci): trim slow task and gateway paths

This commit is contained in:
Vincent Koc
2026-04-16 13:34:14 -07:00
parent 56a9fd4b34
commit f835da1667
9 changed files with 191 additions and 120 deletions

View File

@@ -11,6 +11,8 @@ import {
resolveMattermostAccount,
type ResolvedMattermostAccount,
} from "./mattermost/accounts.js";
import type { MattermostSlashCommandConfig } from "./mattermost/slash-commands.js";
import type { MattermostConfig } from "./types.js";
export const mattermostMeta = {
id: "mattermost",
@@ -25,6 +27,8 @@ export const mattermostMeta = {
quickstartAllowFrom: true,
} as const;
const DEFAULT_SLASH_CALLBACK_PATH = "/api/channels/mattermost/command";
export function normalizeMattermostAllowEntry(entry: string): string {
return normalizeLowercaseStringOrEmpty(
entry
@@ -46,6 +50,63 @@ export function formatMattermostAllowEntry(entry: string): string {
return normalizeLowercaseStringOrEmpty(trimmed.replace(/^(mattermost|user):/i, ""));
}
export function collectMattermostSlashCallbackPaths(
raw?: Partial<MattermostSlashCommandConfig>,
): string[] {
const callbackPath = (() => {
const trimmed = raw?.callbackPath?.trim();
if (!trimmed) {
return DEFAULT_SLASH_CALLBACK_PATH;
}
return trimmed.startsWith("/") ? trimmed : `/${trimmed}`;
})();
const callbackUrl = raw?.callbackUrl?.trim();
const paths = new Set<string>([callbackPath]);
if (callbackUrl) {
try {
const pathname = new URL(callbackUrl).pathname;
if (pathname) {
paths.add(pathname);
}
} catch {
// Keep the normalized callback path when the configured URL is invalid.
}
}
return [...paths];
}
export function resolveMattermostGatewayAuthBypassPaths(cfg: {
channels?: Record<string, unknown>;
}): string[] {
const base = cfg.channels?.mattermost as MattermostConfig | undefined;
const callbackPaths = new Set(
collectMattermostSlashCallbackPaths(
base?.commands as Partial<MattermostSlashCommandConfig> | undefined,
).filter(
(path) =>
path === "/api/channels/mattermost/command" || path.startsWith("/api/channels/mattermost/"),
),
);
const accounts = base?.accounts ?? {};
for (const account of Object.values(accounts)) {
const accountConfig =
account && typeof account === "object" && !Array.isArray(account)
? (account as {
commands?: Parameters<typeof collectMattermostSlashCallbackPaths>[0];
})
: undefined;
for (const path of collectMattermostSlashCallbackPaths(accountConfig?.commands)) {
if (
path === "/api/channels/mattermost/command" ||
path.startsWith("/api/channels/mattermost/")
) {
callbackPaths.add(path);
}
}
}
return [...callbackPaths];
}
export const mattermostConfigAdapter = createScopedChannelConfigAdapter<ResolvedMattermostAccount>({
sectionKey: "mattermost",
listAccountIds: listMattermostAccountIds,

View File

@@ -4,6 +4,7 @@ import {
isMattermostConfigured,
mattermostConfigAdapter,
mattermostMeta,
resolveMattermostGatewayAuthBypassPaths,
} from "./channel-config-shared.js";
import { MattermostChannelConfigSchema } from "./config-surface.js";
import { type ResolvedMattermostAccount } from "./mattermost/accounts.js";
@@ -29,6 +30,9 @@ export const mattermostSetupPlugin: ChannelPlugin<ResolvedMattermostAccount> = {
isConfigured: isMattermostConfigured,
describeAccount: describeMattermostAccount,
},
gateway: {
resolveGatewayAuthBypassPaths: ({ cfg }) => resolveMattermostGatewayAuthBypassPaths(cfg),
},
setup: mattermostSetupAdapter,
setupWizard: mattermostSetupWizard,
};

View File

@@ -30,6 +30,7 @@ import {
mattermostConfigAdapter,
mattermostMeta as meta,
normalizeMattermostAllowEntry as normalizeAllowEntry,
resolveMattermostGatewayAuthBypassPaths,
} from "./channel-config-shared.js";
import { MattermostChannelConfigSchema } from "./config-surface.js";
import { mattermostDoctor } from "./doctor.js";
@@ -41,7 +42,6 @@ import {
resolveMattermostReplyToMode,
type ResolvedMattermostAccount,
} from "./mattermost/accounts.js";
import type { MattermostSlashCommandConfig } from "./mattermost/slash-commands.js";
import { looksLikeMattermostTargetId, normalizeMattermostMessagingTarget } from "./normalize.js";
import { collectRuntimeConfigAssignments, secretTargetRegistryEntries } from "./secret-contract.js";
import { resolveMattermostOutboundSessionRoute } from "./session-route.js";
@@ -51,33 +51,6 @@ import type { MattermostConfig } from "./types.js";
const loadMattermostChannelRuntime = createLazyRuntimeModule(() => import("./channel.runtime.js"));
const DEFAULT_SLASH_CALLBACK_PATH = "/api/channels/mattermost/command";
function collectMattermostSlashCallbackPaths(
raw?: Partial<MattermostSlashCommandConfig>,
): string[] {
const callbackPath = (() => {
const trimmed = raw?.callbackPath?.trim();
if (!trimmed) {
return DEFAULT_SLASH_CALLBACK_PATH;
}
return trimmed.startsWith("/") ? trimmed : `/${trimmed}`;
})();
const callbackUrl = raw?.callbackUrl?.trim();
const paths = new Set<string>([callbackPath]);
if (callbackUrl) {
try {
const pathname = new URL(callbackUrl).pathname;
if (pathname) {
paths.add(pathname);
}
} catch {
// Keep the normalized callback path when the configured URL is invalid.
}
}
return [...paths];
}
const mattermostSecurityAdapter = createRestrictSendersChannelSecurity<ResolvedMattermostAccount>({
channelKey: "mattermost",
resolveDmPolicy: (account) => account.config.dmPolicy,
@@ -375,36 +348,7 @@ export const mattermostPlugin: ChannelPlugin<ResolvedMattermostAccount> = create
}),
}),
gateway: {
resolveGatewayAuthBypassPaths: ({ cfg }) => {
const base = cfg.channels?.mattermost;
const callbackPaths = new Set(
collectMattermostSlashCallbackPaths(
base?.commands as Partial<MattermostSlashCommandConfig> | undefined,
).filter(
(path) =>
path === "/api/channels/mattermost/command" ||
path.startsWith("/api/channels/mattermost/"),
),
);
const accounts = base?.accounts ?? {};
for (const account of Object.values(accounts)) {
const accountConfig =
account && typeof account === "object" && !Array.isArray(account)
? (account as {
commands?: Parameters<typeof collectMattermostSlashCallbackPaths>[0];
})
: undefined;
for (const path of collectMattermostSlashCallbackPaths(accountConfig?.commands)) {
if (
path === "/api/channels/mattermost/command" ||
path.startsWith("/api/channels/mattermost/")
) {
callbackPaths.add(path);
}
}
}
return [...callbackPaths];
},
resolveGatewayAuthBypassPaths: ({ cfg }) => resolveMattermostGatewayAuthBypassPaths(cfg),
startAccount: async (ctx) => {
const account = ctx.account;
const statusSink = createAccountStatusSink({

View File

@@ -189,12 +189,29 @@ const GATEWAY_PROBE_STATUS_BY_PATH = new Map<string, "live" | "ready">([
["/ready", "ready"],
["/readyz", "ready"],
]);
const pluginGatewayAuthBypassPathsCache = new WeakMap<
OpenClawConfig,
Promise<ReadonlySet<string>>
>();
async function resolvePluginGatewayAuthBypassPaths(
configSnapshot: OpenClawConfig,
): Promise<Set<string>> {
const paths = new Set<string>();
const { listBundledChannelPlugins } = await getBundledChannelsModule();
for (const plugin of listBundledChannelPlugins()) {
const configuredChannels = configSnapshot.channels;
if (!configuredChannels || Object.keys(configuredChannels).length === 0) {
return paths;
}
const { getBundledChannelPlugin, getBundledChannelSetupPlugin } =
await getBundledChannelsModule();
for (const channelId of Object.keys(configuredChannels)) {
const setupPlugin = getBundledChannelSetupPlugin(channelId);
const plugin = setupPlugin?.gateway?.resolveGatewayAuthBypassPaths
? setupPlugin
: getBundledChannelPlugin(channelId);
if (!plugin) {
continue;
}
for (const path of plugin.gateway?.resolveGatewayAuthBypassPaths?.({ cfg: configSnapshot }) ??
[]) {
if (typeof path === "string" && path.trim()) {
@@ -205,6 +222,21 @@ async function resolvePluginGatewayAuthBypassPaths(
return paths;
}
function getCachedPluginGatewayAuthBypassPaths(
configSnapshot: OpenClawConfig,
): Promise<ReadonlySet<string>> {
const cached = pluginGatewayAuthBypassPathsCache.get(configSnapshot);
if (cached) {
return cached;
}
const resolved = resolvePluginGatewayAuthBypassPaths(configSnapshot).catch((error) => {
pluginGatewayAuthBypassPathsCache.delete(configSnapshot);
throw error;
});
pluginGatewayAuthBypassPathsCache.set(configSnapshot, resolved);
return resolved;
}
function isOpenAiModelsPath(pathname: string): boolean {
return pathname === "/v1/models" || pathname.startsWith("/v1/models/");
}
@@ -1032,7 +1064,7 @@ export function createGatewayHttpServer(opts: {
req,
res,
requestPath,
getGatewayAuthBypassPaths: () => resolvePluginGatewayAuthBypassPaths(configSnapshot),
getGatewayAuthBypassPaths: () => getCachedPluginGatewayAuthBypassPaths(configSnapshot),
pluginPathContext,
handlePluginRequest,
shouldEnforcePluginGatewayAuth,

View File

@@ -51,7 +51,6 @@ const { MediaFetchErrorMock } = vi.hoisted(() => {
// ---------------------------------------------------------------------------
let applyMediaUnderstanding: typeof import("./apply.js").applyMediaUnderstanding;
let clearMediaUnderstandingBinaryCacheForTests: () => void;
const TEMP_MEDIA_PREFIX = "openclaw-echo-transcript-test-";
let suiteTempMediaRootDir = "";
@@ -211,8 +210,6 @@ describe("applyMediaUnderstanding echo transcript", () => {
suiteTempMediaRootDir = await fs.mkdtemp(path.join(baseDir, TEMP_MEDIA_PREFIX));
const mod = await import("./apply.js");
applyMediaUnderstanding = mod.applyMediaUnderstanding;
const runner = await import("./runner.js");
clearMediaUnderstandingBinaryCacheForTests = runner.clearMediaUnderstandingBinaryCacheForTests;
});
beforeEach(() => {
@@ -224,7 +221,6 @@ describe("applyMediaUnderstanding echo transcript", () => {
runCommandWithTimeoutMock.mockReset();
mockDeliverOutboundPayloads.mockClear();
mockDeliverOutboundPayloads.mockResolvedValue([{ channel: "whatsapp", messageId: "echo-1" }]);
clearMediaUnderstandingBinaryCacheForTests?.();
});
afterAll(async () => {

View File

@@ -317,7 +317,6 @@ describe("applyMediaUnderstanding", () => {
contentType: "audio/ogg",
fileName: "note.ogg",
});
clearMediaUnderstandingBinaryCacheForTests();
});
afterAll(async () => {
@@ -653,6 +652,7 @@ describe("applyMediaUnderstanding", () => {
});
it("auto-detects sherpa for audio when binary and model files are available", async () => {
clearMediaUnderstandingBinaryCacheForTests();
const binDir = await createTempMediaDir();
const modelDir = await createTempMediaDir();
await createMockExecutable(binDir, "sherpa-onnx-offline");
@@ -683,6 +683,7 @@ describe("applyMediaUnderstanding", () => {
});
it("auto-detects whisper-cli when sherpa is unavailable", async () => {
clearMediaUnderstandingBinaryCacheForTests();
const binDir = await createTempMediaDir();
const modelDir = await createTempMediaDir();
await createMockExecutable(binDir, "whisper-cli");
@@ -711,6 +712,7 @@ describe("applyMediaUnderstanding", () => {
});
it("transcodes non-wav audio before auto-detected whisper-cli runs", async () => {
clearMediaUnderstandingBinaryCacheForTests();
const binDir = await createTempMediaDir();
const modelDir = await createTempMediaDir();
await createMockExecutable(binDir, "whisper-cli");
@@ -770,6 +772,7 @@ describe("applyMediaUnderstanding", () => {
});
it("skips audio auto-detect when no supported binaries or provider keys are available", async () => {
clearMediaUnderstandingBinaryCacheForTests();
const emptyBinDir = await createTempMediaDir();
const isolatedAgentDir = await createTempMediaDir();
const ctx = await createAudioCtx({

View File

@@ -34,21 +34,26 @@ export function createChannelReplyPipeline(params: {
const channelId = params.channel
? (normalizeChannelId(params.channel) ?? params.channel)
: undefined;
const plugin = params.transformReplyPayload
? undefined
let plugin: ReturnType<typeof getChannelPlugin> | undefined;
let pluginTransformResolved = false;
const resolvePluginTransform = () => {
if (pluginTransformResolved) {
return plugin?.messaging?.transformReplyPayload;
}
pluginTransformResolved = true;
plugin = channelId ? getChannelPlugin(channelId) : undefined;
return plugin?.messaging?.transformReplyPayload;
};
const transformReplyPayload = params.transformReplyPayload
? params.transformReplyPayload
: channelId
? getChannelPlugin(channelId)
: undefined;
const transformReplyPayload =
params.transformReplyPayload ??
(plugin?.messaging?.transformReplyPayload
? (payload: ReplyPayload) =>
plugin.messaging?.transformReplyPayload?.({
resolvePluginTransform()?.({
payload,
cfg: params.cfg,
accountId: params.accountId,
}) ?? payload
: undefined);
: undefined;
return {
...createReplyPrefixOptions({
cfg: params.cfg,

View File

@@ -10,6 +10,11 @@ export {
} from "./session-chat-type-shared.js";
export function deriveSessionChatType(sessionKey: string | undefined | null): SessionKeyChatType {
const builtInType = deriveSessionChatTypeFromKey(sessionKey);
if (builtInType !== "unknown") {
return builtInType;
}
return deriveSessionChatTypeFromKey(
sessionKey,
Array.from(iterateBootstrapChannelPlugins())

View File

@@ -1,4 +1,13 @@
import { describe, expect, it, vi } from "vitest";
import { afterEach, describe, expect, it } from "vitest";
import type { AcpSessionStoreEntry } from "../acp/runtime/session-meta.js";
import type { SessionEntry } from "../config/sessions.js";
import type { ParsedAgentSessionKey } from "../routing/session-key.js";
import {
resetTaskRegistryMaintenanceRuntimeForTests,
runTaskRegistryMaintenance,
setTaskRegistryMaintenanceRuntimeForTests,
stopTaskRegistryMaintenanceForTests,
} from "./task-registry.maintenance.js";
import type { TaskRecord } from "./task-registry.types.js";
const GRACE_EXPIRED_MS = 10 * 60_000;
@@ -22,54 +31,66 @@ function makeStaleTask(overrides: Partial<TaskRecord>): TaskRecord {
};
}
async function loadMaintenanceModule(params: {
type TaskRegistryMaintenanceRuntime = Parameters<
typeof setTaskRegistryMaintenanceRuntimeForTests
>[0];
afterEach(() => {
stopTaskRegistryMaintenanceForTests();
resetTaskRegistryMaintenanceRuntimeForTests();
});
function createTaskRegistryMaintenanceHarness(params: {
tasks: TaskRecord[];
sessionStore?: Record<string, unknown>;
acpEntry?: unknown;
sessionStore?: Record<string, SessionEntry>;
acpEntry?: AcpSessionStoreEntry["entry"];
activeCronJobIds?: string[];
activeRunIds?: string[];
}) {
vi.resetModules();
const sessionStore = params.sessionStore ?? {};
const acpEntry = params.acpEntry;
const activeCronJobIds = new Set(params.activeCronJobIds ?? []);
const activeRunIds = new Set(params.activeRunIds ?? []);
const currentTasks = new Map(params.tasks.map((task) => [task.taskId, { ...task }]));
vi.doMock("../acp/runtime/session-meta.js", () => ({
const runtime: TaskRegistryMaintenanceRuntime = {
readAcpSessionEntry: () =>
acpEntry !== undefined
? { entry: acpEntry, storeReadFailed: false }
: { entry: undefined, storeReadFailed: false },
}));
vi.doMock("../config/sessions.js", () => ({
? ({
cfg: {} as never,
storePath: "",
sessionKey: "",
storeSessionKey: "",
entry: acpEntry,
storeReadFailed: false,
} satisfies AcpSessionStoreEntry)
: ({
cfg: {} as never,
storePath: "",
sessionKey: "",
storeSessionKey: "",
entry: undefined,
storeReadFailed: false,
} satisfies AcpSessionStoreEntry),
loadSessionStore: () => sessionStore,
resolveStorePath: () => "",
}));
vi.doMock("../cron/active-jobs.js", () => ({
isCronJobActive: (jobId: string) => activeCronJobIds.has(jobId),
}));
vi.doMock("../infra/agent-events.js", () => ({
getAgentRunContext: (runId: string) =>
activeRunIds.has(runId) ? { sessionKey: "main" } : undefined,
}));
vi.doMock("./runtime-internal.js", () => ({
parseAgentSessionKey: (sessionKey: string | null | undefined): ParsedAgentSessionKey | null => {
if (!sessionKey) {
return null;
}
const [kind, agentId, ...rest] = sessionKey.split(":");
return kind === "agent" && agentId && rest.length > 0
? { agentId, rest: rest.join(":") }
: null;
},
deleteTaskRecordById: (taskId: string) => currentTasks.delete(taskId),
ensureTaskRegistryReady: () => {},
getTaskById: (taskId: string) => currentTasks.get(taskId),
listTaskRecords: () => params.tasks,
markTaskLostById: (patch: {
taskId: string;
endedAt: number;
lastEventAt?: number;
error?: string;
cleanupAfter?: number;
}) => {
listTaskRecords: () => Array.from(currentTasks.values()),
markTaskLostById: (patch) => {
const current = currentTasks.get(patch.taskId);
if (!current) {
return null;
@@ -85,9 +106,9 @@ async function loadMaintenanceModule(params: {
currentTasks.set(patch.taskId, next);
return next;
},
maybeDeliverTaskTerminalUpdate: () => false,
maybeDeliverTaskTerminalUpdate: async () => null,
resolveTaskForLookupToken: () => undefined,
setTaskCleanupAfterById: (patch: { taskId: string; cleanupAfter: number }) => {
setTaskCleanupAfterById: (patch) => {
const current = currentTasks.get(patch.taskId);
if (!current) {
return null;
@@ -96,10 +117,10 @@ async function loadMaintenanceModule(params: {
currentTasks.set(patch.taskId, next);
return next;
},
}));
};
const mod = await import("./task-registry.maintenance.js");
return { mod, currentTasks };
setTaskRegistryMaintenanceRuntimeForTests(runtime);
return { currentTasks };
}
describe("task-registry maintenance issue #60299", () => {
@@ -111,12 +132,12 @@ describe("task-registry maintenance issue #60299", () => {
childSessionKey,
});
const { mod, currentTasks } = await loadMaintenanceModule({
const { currentTasks } = createTaskRegistryMaintenanceHarness({
tasks: [task],
sessionStore: { [childSessionKey]: { updatedAt: Date.now() } },
sessionStore: { [childSessionKey]: { sessionId: childSessionKey, updatedAt: Date.now() } },
});
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 });
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" });
});
@@ -127,12 +148,12 @@ describe("task-registry maintenance issue #60299", () => {
childSessionKey: undefined,
});
const { mod, currentTasks } = await loadMaintenanceModule({
const { currentTasks } = createTaskRegistryMaintenanceHarness({
tasks: [task],
activeCronJobIds: ["cron-job-2"],
});
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
});
@@ -147,12 +168,12 @@ describe("task-registry maintenance issue #60299", () => {
childSessionKey: channelKey,
});
const { mod, currentTasks } = await loadMaintenanceModule({
const { currentTasks } = createTaskRegistryMaintenanceHarness({
tasks: [task],
sessionStore: { [channelKey]: { updatedAt: Date.now() } },
sessionStore: { [channelKey]: { sessionId: channelKey, updatedAt: Date.now() } },
});
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 });
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" });
});
@@ -167,13 +188,13 @@ describe("task-registry maintenance issue #60299", () => {
childSessionKey: channelKey,
});
const { mod, currentTasks } = await loadMaintenanceModule({
const { currentTasks } = createTaskRegistryMaintenanceHarness({
tasks: [task],
sessionStore: { [channelKey]: { updatedAt: Date.now() } },
sessionStore: { [channelKey]: { sessionId: channelKey, updatedAt: Date.now() } },
activeRunIds: ["run-chat-cli-live"],
});
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
expect(await runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
});
});