diff --git a/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.test.ts b/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.test.ts index 99cabf09c2c..98a618db33b 100644 --- a/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.test.ts +++ b/extensions/qa-lab/src/live-transports/shared/live-gateway.runtime.test.ts @@ -40,15 +40,19 @@ function createStubTransport(baseUrl = "http://127.0.0.1:43123") { describe("startQaLiveLaneGateway", () => { const gatewayStop = vi.fn(); + const gatewayCall = vi.fn(); const mockStop = vi.fn(); beforeEach(() => { gatewayStop.mockReset(); + gatewayCall.mockReset(); mockStop.mockReset(); startQaGatewayChild.mockReset(); startQaMockOpenAiServer.mockReset(); startQaGatewayChild.mockResolvedValue({ + call: gatewayCall, + cfg: {}, stop: gatewayStop, }); startQaMockOpenAiServer.mockResolvedValue({ diff --git a/extensions/qa-matrix/src/runners/contract/runtime.test.ts b/extensions/qa-matrix/src/runners/contract/runtime.test.ts index f47cd28be1b..16fd6827e65 100644 --- a/extensions/qa-matrix/src/runners/contract/runtime.test.ts +++ b/extensions/qa-matrix/src/runners/contract/runtime.test.ts @@ -1,5 +1,6 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { renderQaMarkdownReport } from "../../report.js"; import { __testing as liveTesting } from "./runtime.js"; afterEach(() => { @@ -228,6 +229,18 @@ describe("matrix live qa runtime", () => { scenarios: [], startedAt: "2026-04-10T10:00:00.000Z", sutAccountId: "sut", + timings: { + artifactWriteMs: 5, + canaryMs: 40, + harnessBootMs: 100, + initialGatewayBootMs: 200, + provisioningMs: 300, + scenarioGatewayBootMs: 50, + scenarioRestartGatewayMs: 60, + scenarioTransportInterruptMs: 70, + scenarios: [], + totalMs: 825, + }, userIds: { driver: "@driver:matrix-qa.test", observer: "@observer:matrix-qa.test", @@ -300,6 +313,27 @@ describe("matrix live qa runtime", () => { ], startedAt: "2026-04-10T10:00:00.000Z", sutAccountId: "sut", + timings: { + artifactWriteMs: 5, + canaryMs: 40, + harnessBootMs: 100, + initialGatewayBootMs: 200, + provisioningMs: 300, + scenarioGatewayBootMs: 50, + scenarioRestartGatewayMs: 60, + scenarioTransportInterruptMs: 70, + scenarios: [ + { + durationMs: 80, + gatewayBootMs: 0, + gatewayRestartMs: 0, + id: "matrix-mention-gating", + title: "Matrix room message without mention does not trigger", + transportInterruptMs: 0, + }, + ], + totalMs: 905, + }, userIds: { driver: "@driver:matrix-qa.test", observer: "@observer:matrix-qa.test", @@ -322,9 +356,128 @@ describe("matrix live qa runtime", () => { }, }, ], + timings: { + totalMs: 905, + }, }); }); + it("keeps failing Matrix scenario details and timings complete in summary + report output", () => { + const summary = liveTesting.buildMatrixQaSummary({ + artifactPaths: { + observedEvents: "/tmp/observed.json", + report: "/tmp/report.md", + summary: "/tmp/summary.json", + }, + checks: [{ name: "Matrix harness ready", status: "pass" }], + config: { + default: liveTesting.buildMatrixQaConfigSnapshot({ + driverUserId: "@driver:matrix-qa.test", + observerUserId: "@observer:matrix-qa.test", + sutUserId: "@sut:matrix-qa.test", + topology: { + defaultRoomId: "!room:matrix-qa.test", + defaultRoomKey: "main", + rooms: [], + }, + }), + scenarios: [], + }, + finishedAt: "2026-04-10T10:05:00.000Z", + harness: { + baseUrl: "http://127.0.0.1:28008/", + composeFile: "/tmp/docker-compose.yml", + dmRoomIds: [], + image: "ghcr.io/matrix-construct/tuwunel:v1.5.1", + roomId: "!room:matrix-qa.test", + roomIds: ["!room:matrix-qa.test"], + serverName: "matrix-qa.test", + }, + observedEventCount: 6, + scenarios: [ + { + id: "matrix-reaction-not-a-reply", + title: "Matrix reactions do not trigger a fresh bot reply", + status: "fail", + details: [ + "unexpected SUT reply after reaction from @driver:matrix-qa.test", + "reaction event: $reaction", + "unexpected reply event: $reply", + ].join("\n"), + }, + ], + startedAt: "2026-04-10T10:00:00.000Z", + sutAccountId: "sut", + timings: { + artifactWriteMs: 5, + canaryMs: 40, + harnessBootMs: 100, + initialGatewayBootMs: 200, + provisioningMs: 300, + scenarioGatewayBootMs: 50, + scenarioRestartGatewayMs: 60, + scenarioTransportInterruptMs: 70, + scenarios: [ + { + durationMs: 8_000, + gatewayBootMs: 0, + gatewayRestartMs: 0, + id: "matrix-reaction-not-a-reply", + title: "Matrix reactions do not trigger a fresh bot reply", + transportInterruptMs: 0, + }, + ], + totalMs: 825, + }, + userIds: { + driver: "@driver:matrix-qa.test", + observer: "@observer:matrix-qa.test", + sut: "@sut:matrix-qa.test", + }, + }); + + expect(summary).toMatchObject({ + counts: { + total: 2, + passed: 1, + failed: 1, + }, + scenarios: [ + { + id: "matrix-reaction-not-a-reply", + status: "fail", + details: expect.stringContaining("reaction event: $reaction"), + }, + ], + timings: { + scenarios: [ + { + id: "matrix-reaction-not-a-reply", + durationMs: 8_000, + }, + ], + }, + }); + + const report = renderQaMarkdownReport({ + title: "Matrix QA Report", + startedAt: new Date(summary.startedAt), + finishedAt: new Date(summary.finishedAt), + checks: summary.checks, + scenarios: summary.scenarios.map((scenario) => ({ + details: scenario.details, + name: scenario.title, + status: scenario.status, + })), + notes: [`observed events: ${summary.observedEventsPath}`], + }); + + expect(report).toContain("### Matrix reactions do not trigger a fresh bot reply"); + expect(report).toContain("unexpected SUT reply after reaction from @driver:matrix-qa.test"); + expect(report).toContain("reaction event: $reaction"); + expect(report).toContain("observed events: /tmp/observed.json"); + }); + it("batches Matrix scenarios by config key while preserving stable in-group order", () => { const scenarios = liveTesting.findMatrixQaScenarios([ "matrix-top-level-reply-shape", diff --git a/extensions/qa-matrix/src/runners/contract/runtime.ts b/extensions/qa-matrix/src/runners/contract/runtime.ts index 19e8d8cad85..554a9b193c4 100644 --- a/extensions/qa-matrix/src/runners/contract/runtime.ts +++ b/extensions/qa-matrix/src/runners/contract/runtime.ts @@ -24,6 +24,7 @@ import { import type { MatrixQaObservedEvent } from "../../substrate/events.js"; import { startMatrixQaHarness } from "../../substrate/harness.runtime.js"; import { resolveMatrixQaModels } from "./model-selection.js"; +import type { MatrixQaSyncStreams } from "./scenario-runtime-shared.js"; import { MATRIX_QA_SCENARIOS, buildMatrixQaTopologyForScenarios, @@ -101,6 +102,7 @@ type MatrixQaSummary = { startedAt: string; summaryPath: string; sutAccountId: string; + timings: MatrixQaTimings; userIds: { driver: string; observer: string; @@ -114,6 +116,50 @@ type MatrixQaArtifactPaths = { summary: string; }; +type MatrixQaScenarioTiming = { + durationMs: number; + gatewayBootMs: number; + gatewayRestartMs: number; + id: string; + title: string; + transportInterruptMs: number; +}; + +type MatrixQaTimings = { + artifactWriteMs: number; + canaryMs?: number; + harnessBootMs: number; + initialGatewayBootMs: number; + provisioningMs: number; + scenarioGatewayBootMs: number; + scenarioRestartGatewayMs: number; + scenarioTransportInterruptMs: number; + scenarios: MatrixQaScenarioTiming[]; + totalMs: number; +}; + +function shouldWriteMatrixQaProgress() { + const override = process.env.OPENCLAW_QA_MATRIX_PROGRESS; + if (override === "0") { + return false; + } + if (override === "1") { + return true; + } + return process.stderr.isTTY; +} + +function formatMatrixQaDurationMs(durationMs: number) { + return durationMs >= 1_000 ? `${(durationMs / 1_000).toFixed(1)}s` : `${durationMs}ms`; +} + +function writeMatrixQaProgress(message: string) { + if (!shouldWriteMatrixQaProgress()) { + return; + } + process.stderr.write(`[matrix-qa] ${message}\n`); +} + function countMatrixQaStatuses(entries: T[]) { return { failed: entries.filter((entry) => entry.status === "fail").length, @@ -221,6 +267,7 @@ function buildMatrixQaSummary(params: { scenarios: MatrixQaScenarioResult[]; startedAt: string; sutAccountId: string; + timings: MatrixQaTimings; userIds: MatrixQaSummary["userIds"]; }): MatrixQaSummary { const checkCounts = countMatrixQaStatuses(params.checks); @@ -244,10 +291,20 @@ function buildMatrixQaSummary(params: { startedAt: params.startedAt, summaryPath: params.artifactPaths.summary, sutAccountId: params.sutAccountId, + timings: params.timings, userIds: params.userIds, }; } +async function measureMatrixQaStep(step: () => Promise) { + const startedAtMs = Date.now(); + const result = await step(); + return { + durationMs: Date.now() - startedAtMs, + result, + }; +} + function isMatrixAccountReady(entry?: { connected?: boolean; healthState?: string; @@ -357,27 +414,41 @@ export async function runMatrixQaLive(params: { const includeObservedEventContent = process.env.OPENCLAW_QA_MATRIX_CAPTURE_CONTENT === "1"; const startedAtDate = new Date(); const startedAt = startedAtDate.toISOString(); + const runStartedAtMs = Date.now(); + writeMatrixQaProgress( + `suite start scenarios=${scenarios.length} provider=${providerMode} output=${outputDir}`, + ); - const harness = await startMatrixQaHarness({ - outputDir: path.join(outputDir, "matrix-harness"), - repoRoot, - }); - const provisioning: MatrixQaProvisionResult = await (async () => { + const { durationMs: harnessBootMs, result: harness } = await measureMatrixQaStep(() => + startMatrixQaHarness({ + outputDir: path.join(outputDir, "matrix-harness"), + repoRoot, + }), + ); + writeMatrixQaProgress( + `harness ready ${formatMatrixQaDurationMs(harnessBootMs)} baseUrl=${harness.baseUrl}`, + ); + const { durationMs: provisioningMs, result: provisioning } = await (async () => { try { - return await provisionMatrixQaRoom({ - baseUrl: harness.baseUrl, - driverLocalpart: `qa-driver-${runSuffix}`, - observerLocalpart: `qa-observer-${runSuffix}`, - registrationToken: harness.registrationToken, - roomName: `OpenClaw Matrix QA ${runSuffix}`, - sutLocalpart: `qa-sut-${runSuffix}`, - topology, - }); + return await measureMatrixQaStep(() => + provisionMatrixQaRoom({ + baseUrl: harness.baseUrl, + driverLocalpart: `qa-driver-${runSuffix}`, + observerLocalpart: `qa-observer-${runSuffix}`, + registrationToken: harness.registrationToken, + roomName: `OpenClaw Matrix QA ${runSuffix}`, + sutLocalpart: `qa-sut-${runSuffix}`, + topology, + }), + ); } catch (error) { await harness.stop().catch(() => {}); throw error; } })(); + writeMatrixQaProgress( + `topology ready ${formatMatrixQaDurationMs(provisioningMs)} rooms=${provisioning.topology.rooms.length}`, + ); const checks: QaReportCheck[] = [ { @@ -401,6 +472,13 @@ export async function runMatrixQaLive(params: { let gatewayHarnessKey: string | null = null; let canaryFailed = false; const syncState: { driver?: string; observer?: string } = {}; + const syncStreams: MatrixQaSyncStreams = {}; + let canaryMs: number | undefined; + let initialGatewayBootMs = 0; + let scenarioGatewayBootMs = 0; + let scenarioRestartGatewayMs = 0; + let scenarioTransportInterruptMs = 0; + const scenarioTimings: MatrixQaScenarioTiming[] = []; const gatewayConfigParams = { driverUserId: provisioning.driver.userId, homeserver: harness.baseUrl, @@ -420,38 +498,53 @@ export async function runMatrixQaLive(params: { const ensureGatewayHarness = async (overrides?: MatrixQaConfigOverrides) => { const nextKey = buildMatrixQaGatewayConfigKey(overrides); if (gatewayHarness && gatewayHarnessKey === nextKey) { - return gatewayHarness; + return { + durationMs: 0, + harness: gatewayHarness, + }; } if (gatewayHarness) { await gatewayHarness.stop(); gatewayHarness = null; - gatewayHarnessKey = null; + gatewayHarnessKey = nextKey; } - const started = await startMatrixQaLiveLaneGateway({ - repoRoot, - transport: { - requiredPluginIds: [], - createGatewayConfig: () => ({}), - }, - transportBaseUrl: "http://127.0.0.1:43123", - providerMode, - primaryModel, - alternateModel, - fastMode: params.fastMode, - controlUiEnabled: false, - mutateConfig: (cfg) => - buildMatrixQaConfig(cfg, { - ...gatewayConfigParams, - overrides, - }), + writeMatrixQaProgress("gateway boot start"); + const { durationMs, result: started } = await measureMatrixQaStep(async () => { + const nextHarness = await startMatrixQaLiveLaneGateway({ + repoRoot, + transport: { + requiredPluginIds: [], + createGatewayConfig: () => ({}), + }, + transportBaseUrl: "http://127.0.0.1:43123", + providerMode, + primaryModel, + alternateModel, + fastMode: params.fastMode, + controlUiEnabled: false, + mutateConfig: (cfg) => + buildMatrixQaConfig(cfg, { + ...gatewayConfigParams, + overrides, + }), + }); + await waitForMatrixChannelReady(nextHarness.gateway, sutAccountId); + return nextHarness; }); - await waitForMatrixChannelReady(started.gateway, sutAccountId); + writeMatrixQaProgress(`gateway boot done ${formatMatrixQaDurationMs(durationMs)}`); gatewayHarness = started; gatewayHarnessKey = nextKey; - return started; + return { + durationMs, + harness: started, + }; }; - gatewayHarness = await ensureGatewayHarness(); + { + const ensured = await ensureGatewayHarness(); + gatewayHarness = ensured.harness; + initialGatewayBootMs = ensured.durationMs; + } checks.push({ name: "Matrix channel ready", status: "pass", @@ -459,15 +552,21 @@ export async function runMatrixQaLive(params: { }); try { - const canary = await runMatrixQaCanary({ - baseUrl: harness.baseUrl, - driverAccessToken: provisioning.driver.accessToken, - observedEvents, - roomId: provisioning.roomId, - syncState, - sutUserId: provisioning.sut.userId, - timeoutMs: 45_000, - }); + writeMatrixQaProgress("canary start"); + const canaryMeasured = await measureMatrixQaStep(() => + runMatrixQaCanary({ + baseUrl: harness.baseUrl, + driverAccessToken: provisioning.driver.accessToken, + observedEvents, + roomId: provisioning.roomId, + syncState, + syncStreams, + sutUserId: provisioning.sut.userId, + timeoutMs: 45_000, + }), + ); + canaryMs = canaryMeasured.durationMs; + const canary = canaryMeasured.result; canaryArtifact = { driverEventId: canary.driverEventId, reply: canary.reply, @@ -478,6 +577,7 @@ export async function runMatrixQaLive(params: { status: "pass", details: buildMatrixReplyDetails("reply", canary.reply).join("\n"), }); + writeMatrixQaProgress(`canary pass ${formatMatrixQaDurationMs(canaryMeasured.durationMs)}`); } catch (error) { canaryFailed = true; checks.push({ @@ -485,6 +585,7 @@ export async function runMatrixQaLive(params: { status: "fail", details: formatErrorMessage(error), }); + writeMatrixQaProgress(`canary fail ${formatErrorMessage(error)}`); } if (!canaryFailed) { @@ -495,36 +596,70 @@ export async function runMatrixQaLive(params: { scenario, }); scenarioConfigSnapshots[originalIndex] = scenarioConfigEntry; + let gatewayBootMs = 0; + let gatewayRestartMs = 0; + let transportInterruptMs = 0; try { + writeMatrixQaProgress(`scenario start ${scenario.id}`); const scenarioGateway = await ensureGatewayHarness(scenario.configOverrides); - const result = await runMatrixQaScenario(scenario, { - baseUrl: harness.baseUrl, - canary: canaryArtifact, - driverAccessToken: provisioning.driver.accessToken, - driverUserId: provisioning.driver.userId, - interruptTransport: async () => { - await harness.restartService(); - await waitForMatrixChannelReady(scenarioGateway.gateway, sutAccountId, { - timeoutMs: 90_000, - }); - }, - observedEvents, - observerAccessToken: provisioning.observer.accessToken, - observerUserId: provisioning.observer.userId, - restartGateway: async () => { - if (!gatewayHarness) { - throw new Error("Matrix restart scenario requires a live gateway"); - } - await scenarioGateway.gateway.restart(); - await waitForMatrixChannelReady(scenarioGateway.gateway, sutAccountId); - }, - roomId: provisioning.roomId, - sutAccessToken: provisioning.sut.accessToken, - syncState, - sutUserId: provisioning.sut.userId, - timeoutMs: scenario.timeoutMs, - topology: provisioning.topology, - }); + gatewayBootMs = scenarioGateway.durationMs; + scenarioGatewayBootMs += gatewayBootMs; + const measuredScenario = await measureMatrixQaStep(() => + runMatrixQaScenario(scenario, { + baseUrl: harness.baseUrl, + canary: canaryArtifact, + driverAccessToken: provisioning.driver.accessToken, + driverUserId: provisioning.driver.userId, + interruptTransport: async () => { + writeMatrixQaProgress(`transport interrupt start ${scenario.id}`); + const measuredInterrupt = await measureMatrixQaStep(async () => { + await harness.restartService(); + await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId, { + timeoutMs: 90_000, + }); + }); + transportInterruptMs += measuredInterrupt.durationMs; + scenarioTransportInterruptMs += measuredInterrupt.durationMs; + writeMatrixQaProgress( + `transport interrupt done ${scenario.id} ${formatMatrixQaDurationMs(measuredInterrupt.durationMs)}`, + ); + }, + observedEvents, + observerAccessToken: provisioning.observer.accessToken, + observerUserId: provisioning.observer.userId, + restartGateway: async () => { + if (!gatewayHarness) { + throw new Error("Matrix restart scenario requires a live gateway"); + } + writeMatrixQaProgress(`gateway restart start ${scenario.id}`); + const measuredRestart = await measureMatrixQaStep(async () => { + await scenarioGateway.harness.gateway.restart(); + await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId); + }); + gatewayRestartMs += measuredRestart.durationMs; + scenarioRestartGatewayMs += measuredRestart.durationMs; + writeMatrixQaProgress( + `gateway restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`, + ); + }, + roomId: provisioning.roomId, + sutAccessToken: provisioning.sut.accessToken, + syncState, + syncStreams, + sutUserId: provisioning.sut.userId, + timeoutMs: scenario.timeoutMs, + topology: provisioning.topology, + }), + ); + const result = measuredScenario.result; + scenarioTimings[originalIndex] = { + durationMs: measuredScenario.durationMs, + gatewayBootMs, + gatewayRestartMs, + id: scenario.id, + title: scenario.title, + transportInterruptMs, + }; scenarioResults[originalIndex] = buildMatrixQaScenarioResult({ artifacts: result.artifacts, configSummary: scenarioConfigSummary, @@ -532,13 +667,25 @@ export async function runMatrixQaLive(params: { scenario, status: "pass", }); + writeMatrixQaProgress( + `scenario pass ${scenario.id} ${formatMatrixQaDurationMs(measuredScenario.durationMs)}`, + ); } catch (error) { + scenarioTimings[originalIndex] = { + durationMs: 0, + gatewayBootMs, + gatewayRestartMs, + id: scenario.id, + title: scenario.title, + transportInterruptMs, + }; scenarioResults[originalIndex] = buildMatrixQaScenarioResult({ configSummary: scenarioConfigSummary, details: formatErrorMessage(error), scenario, status: "fail", }); + writeMatrixQaProgress(`scenario fail ${scenario.id} ${formatErrorMessage(error)}`); } } } @@ -596,8 +743,10 @@ export async function runMatrixQaLive(params: { `sut: ${provisioning.sut.userId}`, `homeserver: ${harness.baseUrl}`, `image: ${harness.image}`, + `timings: harness=${harnessBootMs}ms provisioning=${provisioningMs}ms gateway=${initialGatewayBootMs}ms canary=${canaryMs ?? 0}ms`, ], }); + const artifactWriteStartedAtMs = Date.now(); const summary: MatrixQaSummary = buildMatrixQaSummary({ artifactPaths, canary: canaryArtifact, @@ -622,6 +771,18 @@ export async function runMatrixQaLive(params: { scenarios: completedScenarioResults, startedAt, sutAccountId, + timings: { + artifactWriteMs: 0, + canaryMs, + harnessBootMs, + initialGatewayBootMs, + provisioningMs, + scenarioGatewayBootMs, + scenarioRestartGatewayMs, + scenarioTransportInterruptMs, + scenarios: scenarioTimings, + totalMs: Date.now() - runStartedAtMs, + }, userIds: { driver: provisioning.driver.userId, observer: provisioning.observer.userId, @@ -630,10 +791,6 @@ export async function runMatrixQaLive(params: { }); await fs.writeFile(reportPath, `${report}\n`, { encoding: "utf8", mode: 0o600 }); - await fs.writeFile(summaryPath, `${JSON.stringify(summary, null, 2)}\n`, { - encoding: "utf8", - mode: 0o600, - }); await fs.writeFile( observedEventsPath, `${JSON.stringify( @@ -646,6 +803,15 @@ export async function runMatrixQaLive(params: { )}\n`, { encoding: "utf8", mode: 0o600 }, ); + summary.timings.artifactWriteMs = Date.now() - artifactWriteStartedAtMs; + summary.timings.totalMs = Date.now() - runStartedAtMs; + await fs.writeFile(summaryPath, `${JSON.stringify(summary, null, 2)}\n`, { + encoding: "utf8", + mode: 0o600, + }); + writeMatrixQaProgress( + `suite ${summary.counts.failed > 0 ? "fail" : "pass"} ${summary.counts.passed}/${summary.counts.total} total=${formatMatrixQaDurationMs(summary.timings.totalMs)}`, + ); const failedChecks = checks.filter( (check) => check.status === "fail" && check.name !== "Matrix cleanup", diff --git a/extensions/qa-matrix/src/runners/contract/scenario-catalog.ts b/extensions/qa-matrix/src/runners/contract/scenario-catalog.ts index b3af1667fef..97b26e4ac9c 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-catalog.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-catalog.ts @@ -14,6 +14,8 @@ import { export type MatrixQaScenarioId = | "matrix-thread-follow-up" + | "matrix-thread-root-preservation" + | "matrix-thread-nested-reply-shape" | "matrix-thread-isolation" | "matrix-top-level-reply-shape" | "matrix-room-thread-reply-override" @@ -27,6 +29,8 @@ export type MatrixQaScenarioId = | "matrix-secondary-room-reply" | "matrix-secondary-room-open-trigger" | "matrix-reaction-notification" + | "matrix-reaction-threaded" + | "matrix-reaction-not-a-reply" | "matrix-restart-resume" | "matrix-room-membership-loss" | "matrix-homeserver-restart-resume" @@ -138,6 +142,16 @@ export const MATRIX_QA_SCENARIOS: MatrixQaScenarioDefinition[] = [ timeoutMs: 60_000, title: "Matrix thread follow-up reply", }, + { + id: "matrix-thread-root-preservation", + timeoutMs: 60_000, + title: "Matrix threaded replies keep the original root event", + }, + { + id: "matrix-thread-nested-reply-shape", + timeoutMs: 60_000, + title: "Matrix nested threaded replies keep fallback replies on the root event", + }, { id: "matrix-thread-isolation", standardId: "thread-isolation", @@ -257,6 +271,16 @@ export const MATRIX_QA_SCENARIOS: MatrixQaScenarioDefinition[] = [ timeoutMs: 45_000, title: "Matrix reactions on bot replies are observed", }, + { + id: "matrix-reaction-threaded", + timeoutMs: 45_000, + title: "Matrix reactions preserve threaded reply targets", + }, + { + id: "matrix-reaction-not-a-reply", + timeoutMs: 8_000, + title: "Matrix reactions do not trigger a fresh bot reply", + }, { id: "matrix-restart-resume", standardId: "restart-resume", diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-dm.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-dm.ts index 42bcd2e32eb..16bfbb9c07e 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime-dm.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-dm.ts @@ -36,6 +36,7 @@ async function runDmSharedSessionFlow(params: { observedEvents: params.context.observedEvents, roomId: firstRoomId, syncState: params.context.syncState, + syncStreams: params.context.syncStreams, sutUserId: params.context.sutUserId, timeoutMs: params.context.timeoutMs, tokenPrefix: "MATRIX_QA_DM_PRIMARY", @@ -45,11 +46,19 @@ async function runDmSharedSessionFlow(params: { const replyClient = createMatrixQaScenarioClient({ accessToken: params.context.driverAccessToken, + actorId: "driver", baseUrl: params.context.baseUrl, + observedEvents: params.context.observedEvents, + syncState: params.context.syncState, + syncStreams: params.context.syncStreams, }); const noticeClient = createMatrixQaScenarioClient({ accessToken: params.context.driverAccessToken, + actorId: "driver", baseUrl: params.context.baseUrl, + observedEvents: params.context.observedEvents, + syncState: params.context.syncState, + syncStreams: params.context.syncStreams, }); const [replySince, noticeSince] = await Promise.all([ replyClient.primeRoom(), @@ -139,6 +148,7 @@ export async function runDmThreadReplyOverrideScenario(context: MatrixQaScenario event.relatesTo?.relType === "m.thread" && event.relatesTo?.eventId === params.driverEventId, roomId, syncState: context.syncState, + syncStreams: context.syncStreams, sutUserId: context.sutUserId, timeoutMs: context.timeoutMs, tokenPrefix: "MATRIX_QA_DM_THREAD", diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-room.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-room.ts index 61892a04ccc..418b29db436 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime-room.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-room.ts @@ -26,28 +26,133 @@ import { runTopologyScopedTopLevelScenario, runTopLevelMentionScenario, waitForMembershipEvent, + type MatrixQaActorId, type MatrixQaScenarioContext, type MatrixQaSyncState, } from "./scenario-runtime-shared.js"; import type { MatrixQaCanaryArtifact, MatrixQaScenarioExecution } from "./scenario-types.js"; -async function runThreadScenario(params: MatrixQaScenarioContext) { - const { client, startSince } = await primeMatrixQaActorCursor({ - accessToken: params.driverAccessToken, +type MatrixQaThreadScenarioResult = Awaited>; + +function assertMatrixQaInReplyTarget(params: { + actualEventId?: string; + expectedEventId: string; + label: string; +}) { + if (params.actualEventId !== params.expectedEventId) { + throw new Error( + `${params.label} targeted ${params.actualEventId ?? ""} instead of ${params.expectedEventId}`, + ); + } +} + +function requireMatrixQaNestedThreadEvent( + nestedDriverEventId: string | undefined, + scenarioLabel: string, +) { + if (!nestedDriverEventId) { + throw new Error(`${scenarioLabel} did not create a nested trigger`); + } + return nestedDriverEventId; +} + +function buildMatrixQaThreadArtifacts(result: MatrixQaThreadScenarioResult) { + return { + driverEventId: result.driverEventId, + reply: result.reply, + rootEventId: result.rootEventId, + token: result.token, + }; +} + +function buildMatrixQaThreadDetailLines(params: { + result: MatrixQaThreadScenarioResult; + includeNestedTrigger?: boolean; + extraLines?: string[]; + replyLabel?: string; +}) { + return [ + `thread root event: ${params.result.rootEventId}`, + ...(params.includeNestedTrigger && params.result.nestedDriverEventId + ? [`nested trigger event: ${params.result.nestedDriverEventId}`] + : []), + `mention trigger event: ${params.result.driverEventId}`, + ...(params.extraLines ?? []), + ...buildMatrixReplyDetails(params.replyLabel ?? "reply", params.result.reply), + ]; +} + +async function primeMatrixQaDriverScenarioClient(context: MatrixQaScenarioContext) { + return await primeMatrixQaActorCursor({ + accessToken: context.driverAccessToken, actorId: "driver", - baseUrl: params.baseUrl, - syncState: params.syncState, + baseUrl: context.baseUrl, + observedEvents: context.observedEvents, + syncState: context.syncState, + syncStreams: context.syncStreams, }); +} + +function createMatrixQaDriverScenarioClient(context: MatrixQaScenarioContext) { + return createMatrixQaScenarioClient({ + accessToken: context.driverAccessToken, + actorId: "driver", + baseUrl: context.baseUrl, + observedEvents: context.observedEvents, + syncState: context.syncState, + syncStreams: context.syncStreams, + }); +} + +async function runAssertedDriverTopLevelScenario(params: { + context: MatrixQaScenarioContext; + label: string; + roomId?: string; + tokenPrefix: string; +}) { + const result = await runDriverTopLevelMentionScenario({ + baseUrl: params.context.baseUrl, + driverAccessToken: params.context.driverAccessToken, + observedEvents: params.context.observedEvents, + roomId: params.roomId ?? params.context.roomId, + syncState: params.context.syncState, + syncStreams: params.context.syncStreams, + sutUserId: params.context.sutUserId, + timeoutMs: params.context.timeoutMs, + tokenPrefix: params.tokenPrefix, + }); + assertTopLevelReplyArtifact(params.label, result.reply); + return result; +} + +async function runThreadScenario( + params: MatrixQaScenarioContext, + options?: { + createNestedReply?: boolean; + tokenPrefix?: string; + }, +) { + const { client, startSince } = await primeMatrixQaDriverScenarioClient(params); const rootBody = `thread root ${randomUUID().slice(0, 8)}`; const rootEventId = await client.sendTextMessage({ body: rootBody, roomId: params.roomId, }); - const token = `MATRIX_QA_THREAD_${randomUUID().slice(0, 8).toUpperCase()}`; + const nestedDriverEventId = + options?.createNestedReply === true + ? await client.sendTextMessage({ + body: `thread nested ${randomUUID().slice(0, 8)}`, + replyToEventId: rootEventId, + roomId: params.roomId, + threadRootEventId: rootEventId, + }) + : undefined; + const triggerEventId = nestedDriverEventId ?? rootEventId; + const token = `${options?.tokenPrefix ?? "MATRIX_QA_THREAD"}_${randomUUID().slice(0, 8).toUpperCase()}`; const driverEventId = await client.sendTextMessage({ body: buildMentionPrompt(params.sutUserId, token), mentionUserIds: [params.sutUserId], - replyToEventId: rootEventId, + replyToEventId: triggerEventId, roomId: params.roomId, threadRootEventId: rootEventId, }); @@ -72,6 +177,7 @@ async function runThreadScenario(params: MatrixQaScenarioContext) { }); return { driverEventId, + nestedDriverEventId, reply: buildMatrixReplyArtifact(matched.event, token), rootEventId, token, @@ -84,6 +190,7 @@ export async function runMatrixQaCanary(params: { observedEvents: MatrixQaObservedEvent[]; roomId: string; syncState: MatrixQaSyncState; + syncStreams?: MatrixQaScenarioContext["syncStreams"]; sutUserId: string; timeoutMs: number; }): Promise<{ @@ -97,6 +204,7 @@ export async function runMatrixQaCanary(params: { observedEvents: params.observedEvents, roomId: params.roomId, syncState: params.syncState, + syncStreams: params.syncStreams, sutUserId: params.sutUserId, timeoutMs: params.timeoutMs, tokenPrefix: "MATRIX_QA_CANARY", @@ -112,12 +220,7 @@ export async function runThreadFollowUpScenario(context: MatrixQaScenarioContext label: "thread reply", }); return { - artifacts: { - driverEventId: result.driverEventId, - reply: result.reply, - rootEventId: result.rootEventId, - token: result.token, - }, + artifacts: buildMatrixQaThreadArtifacts(result), details: [ `root event: ${result.rootEventId}`, `driver thread event: ${result.driverEventId}`, @@ -126,23 +229,74 @@ export async function runThreadFollowUpScenario(context: MatrixQaScenarioContext } satisfies MatrixQaScenarioExecution; } +export async function runThreadRootPreservationScenario(context: MatrixQaScenarioContext) { + const result = await runThreadScenario(context, { + createNestedReply: true, + tokenPrefix: "MATRIX_QA_THREAD_ROOT", + }); + assertThreadReplyArtifact(result.reply, { + expectedRootEventId: result.rootEventId, + label: "thread root preservation reply", + }); + requireMatrixQaNestedThreadEvent( + result.nestedDriverEventId, + "Matrix thread root preservation scenario", + ); + return { + artifacts: buildMatrixQaThreadArtifacts(result), + details: buildMatrixQaThreadDetailLines({ + result, + includeNestedTrigger: true, + extraLines: [ + `reply thread root: ${result.reply.relatesTo?.eventId ?? ""}`, + `reply in_reply_to: ${result.reply.relatesTo?.inReplyToId ?? ""}`, + ], + }).join("\n"), + } satisfies MatrixQaScenarioExecution; +} + +export async function runThreadNestedReplyShapeScenario(context: MatrixQaScenarioContext) { + const result = await runThreadScenario(context, { + createNestedReply: true, + tokenPrefix: "MATRIX_QA_THREAD_NESTED", + }); + assertThreadReplyArtifact(result.reply, { + expectedRootEventId: result.rootEventId, + label: "thread nested reply", + }); + requireMatrixQaNestedThreadEvent( + result.nestedDriverEventId, + "Matrix thread nested reply scenario", + ); + assertMatrixQaInReplyTarget({ + actualEventId: result.reply.relatesTo?.inReplyToId, + expectedEventId: result.rootEventId, + label: "thread nested reply in_reply_to", + }); + return { + artifacts: buildMatrixQaThreadArtifacts(result), + details: buildMatrixQaThreadDetailLines({ + result, + includeNestedTrigger: true, + extraLines: [ + `reply in_reply_to: ${result.reply.relatesTo?.inReplyToId ?? ""}`, + `expected fallback root: ${result.rootEventId}`, + ], + }).join("\n"), + } satisfies MatrixQaScenarioExecution; +} + export async function runThreadIsolationScenario(context: MatrixQaScenarioContext) { const threadPhase = await runThreadScenario(context); assertThreadReplyArtifact(threadPhase.reply, { expectedRootEventId: threadPhase.rootEventId, label: "thread isolation reply", }); - const topLevelPhase = await runDriverTopLevelMentionScenario({ - baseUrl: context.baseUrl, - driverAccessToken: context.driverAccessToken, - observedEvents: context.observedEvents, - roomId: context.roomId, - syncState: context.syncState, - sutUserId: context.sutUserId, - timeoutMs: context.timeoutMs, + const topLevelPhase = await runAssertedDriverTopLevelScenario({ + context, + label: "top-level follow-up reply", tokenPrefix: "MATRIX_QA_TOPLEVEL", }); - assertTopLevelReplyArtifact("top-level follow-up reply", topLevelPhase.reply); return { artifacts: { threadDriverEventId: threadPhase.driverEventId, @@ -164,17 +318,11 @@ export async function runThreadIsolationScenario(context: MatrixQaScenarioContex } export async function runTopLevelReplyShapeScenario(context: MatrixQaScenarioContext) { - const result = await runDriverTopLevelMentionScenario({ - baseUrl: context.baseUrl, - driverAccessToken: context.driverAccessToken, - observedEvents: context.observedEvents, - roomId: context.roomId, - syncState: context.syncState, - sutUserId: context.sutUserId, - timeoutMs: context.timeoutMs, + const result = await runAssertedDriverTopLevelScenario({ + context, + label: "top-level reply", tokenPrefix: "MATRIX_QA_TOPLEVEL", }); - assertTopLevelReplyArtifact("top-level reply", result.reply); return { artifacts: { driverEventId: result.driverEventId, @@ -198,6 +346,7 @@ export async function runRoomThreadReplyOverrideScenario(context: MatrixQaScenar event.relatesTo?.relType === "m.thread" && event.relatesTo?.eventId === params.driverEventId, roomId: context.roomId, syncState: context.syncState, + syncStreams: context.syncStreams, sutUserId: context.sutUserId, timeoutMs: context.timeoutMs, tokenPrefix: "MATRIX_QA_ROOM_THREAD", @@ -225,7 +374,9 @@ export async function runObserverAllowlistOverrideScenario(context: MatrixQaScen accessToken: context.observerAccessToken, actorId: "observer", baseUrl: context.baseUrl, + observedEvents: context.observedEvents, syncState: context.syncState, + syncStreams: context.syncStreams, }); const token = `MATRIX_QA_OBSERVER_ALLOWLIST_${randomUUID().slice(0, 8).toUpperCase()}`; const body = buildMentionPrompt(context.sutUserId, token); @@ -271,12 +422,7 @@ export async function runObserverAllowlistOverrideScenario(context: MatrixQaScen } export async function runQuietStreamingPreviewScenario(context: MatrixQaScenarioContext) { - const { client, startSince } = await primeMatrixQaActorCursor({ - accessToken: context.driverAccessToken, - actorId: "driver", - baseUrl: context.baseUrl, - syncState: context.syncState, - }); + const { client, startSince } = await primeMatrixQaDriverScenarioClient(context); const finalText = `MATRIX_QA_QUIET_STREAM_${randomUUID().slice(0, 8).toUpperCase()} preview complete`; const triggerBody = buildMatrixQuietStreamingPrompt(context.sutUserId, finalText); const driverEventId = await client.sendTextMessage({ @@ -337,12 +483,7 @@ export async function runQuietStreamingPreviewScenario(context: MatrixQaScenario export async function runBlockStreamingScenario(context: MatrixQaScenarioContext) { const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_BLOCK_ROOM_KEY); - const { client, startSince } = await primeMatrixQaActorCursor({ - accessToken: context.driverAccessToken, - actorId: "driver", - baseUrl: context.baseUrl, - syncState: context.syncState, - }); + const { client, startSince } = await primeMatrixQaDriverScenarioClient(context); const firstText = `MATRIX_QA_BLOCK_ONE_${randomUUID().slice(0, 8).toUpperCase()}`; const secondText = `MATRIX_QA_BLOCK_TWO_${randomUUID().slice(0, 8).toUpperCase()}`; const triggerBody = buildMatrixBlockStreamingPrompt(context.sutUserId, firstText, secondText); @@ -406,12 +547,7 @@ export async function runBlockStreamingScenario(context: MatrixQaScenarioContext } export async function runRoomAutoJoinInviteScenario(context: MatrixQaScenarioContext) { - const { client, startSince } = await primeMatrixQaActorCursor({ - accessToken: context.driverAccessToken, - actorId: "driver", - baseUrl: context.baseUrl, - syncState: context.syncState, - }); + const { client, startSince } = await primeMatrixQaDriverScenarioClient(context); const dynamicRoomId = await client.createPrivateRoom({ inviteUserIds: [context.observerUserId, context.sutUserId], name: `Matrix QA AutoJoin ${randomUUID().slice(0, 8)}`, @@ -435,18 +571,12 @@ export async function runRoomAutoJoinInviteScenario(context: MatrixQaScenarioCon startSince, }); - const result = await runTopLevelMentionScenario({ - accessToken: context.driverAccessToken, - actorId: "driver", - baseUrl: context.baseUrl, - observedEvents: context.observedEvents, + const result = await runAssertedDriverTopLevelScenario({ + context, + label: "auto-join room reply", roomId: dynamicRoomId, - syncState: context.syncState, - sutUserId: context.sutUserId, - timeoutMs: context.timeoutMs, tokenPrefix: "MATRIX_QA_AUTOJOIN", }); - assertTopLevelReplyArtifact("auto-join room reply", result.reply); return { artifacts: { @@ -468,10 +598,7 @@ export async function runRoomAutoJoinInviteScenario(context: MatrixQaScenarioCon export async function runMembershipLossScenario(context: MatrixQaScenarioContext) { const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_MEMBERSHIP_ROOM_KEY); - const driverClient = createMatrixQaScenarioClient({ - accessToken: context.driverAccessToken, - baseUrl: context.baseUrl, - }); + const driverClient = createMatrixQaDriverScenarioClient(context); const sutClient = createMatrixQaScenarioClient({ accessToken: context.sutAccessToken, baseUrl: context.baseUrl, @@ -491,6 +618,7 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext roomId, stateKey: context.sutUserId, syncState: context.syncState, + syncStreams: context.syncStreams, timeoutMs: context.timeoutMs, }); @@ -505,6 +633,7 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext observedEvents: context.observedEvents, roomId, syncState: context.syncState, + syncStreams: context.syncStreams, sutUserId: context.sutUserId, timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs), token: noReplyToken, @@ -523,6 +652,7 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext roomId, stateKey: context.sutUserId, syncState: context.syncState, + syncStreams: context.syncStreams, timeoutMs: context.timeoutMs, }); await sutClient.joinRoom(roomId); @@ -535,6 +665,7 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext roomId, stateKey: context.sutUserId, syncState: context.syncState, + syncStreams: context.syncStreams, timeoutMs: context.timeoutMs, }); @@ -566,52 +697,239 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext } export async function runReactionNotificationScenario(context: MatrixQaScenarioContext) { - const reactionTargetEventId = context.canary?.reply.eventId?.trim(); - if (!reactionTargetEventId) { - throw new Error("Matrix reaction scenario requires a canary reply event id"); - } - const { client, startSince } = await primeMatrixQaActorCursor({ - accessToken: context.driverAccessToken, + const reactionTargetEventId = requireMatrixQaReactionTargetEventId( + context.canary?.reply.eventId, + "Matrix reaction scenario", + ); + const result = await observeReactionScenario({ actorId: "driver", + actorUserId: context.driverUserId, + accessToken: context.driverAccessToken, baseUrl: context.baseUrl, + observedEvents: context.observedEvents, + reactionTargetEventId, + roomId: context.roomId, syncState: context.syncState, + syncStreams: context.syncStreams, + timeoutMs: context.timeoutMs, }); - const reactionEmoji = "👍"; + return { + artifacts: buildMatrixQaReactionArtifacts({ reaction: result }), + details: buildMatrixQaReactionDetailLines({ + actorUserId: result.actorUserId, + observedReactionKey: result.event.reaction?.key, + reactionEmoji: result.reactionEmoji, + reactionEventId: result.reactionEventId, + reactionTargetEventId: result.reactionTargetEventId, + }).join("\n"), + } satisfies MatrixQaScenarioExecution; +} + +function buildMatrixQaReactionDetailLines(params: { + actorUserId?: string; + observedReactionKey?: string; + reactionEmoji: string; + reactionEventId: string; + reactionTargetEventId: string; +}) { + return [ + `reaction event: ${params.reactionEventId}`, + `reaction target: ${params.reactionTargetEventId}`, + `reaction emoji: ${params.reactionEmoji}`, + ...(params.actorUserId ? [`reaction sender: ${params.actorUserId}`] : []), + ...(params.observedReactionKey ? [`observed reaction key: ${params.observedReactionKey}`] : []), + ]; +} + +function requireMatrixQaReactionTargetEventId( + reactionTargetEventId: string | undefined, + scenarioLabel: string, +) { + const normalizedReactionTargetEventId = reactionTargetEventId?.trim(); + if (!normalizedReactionTargetEventId) { + throw new Error(`${scenarioLabel} requires a canary reply event id`); + } + return normalizedReactionTargetEventId; +} + +async function observeReactionScenario(params: { + actorId: MatrixQaActorId; + actorUserId: string; + accessToken: string; + baseUrl: string; + observedEvents: MatrixQaObservedEvent[]; + reactionEmoji?: string; + reactionTargetEventId: string; + roomId: string; + syncState: MatrixQaSyncState; + syncStreams?: MatrixQaScenarioContext["syncStreams"]; + timeoutMs: number; +}) { + const { client, startSince } = await primeMatrixQaActorCursor({ + accessToken: params.accessToken, + actorId: params.actorId, + baseUrl: params.baseUrl, + observedEvents: params.observedEvents, + syncState: params.syncState, + syncStreams: params.syncStreams, + }); + const reactionEmoji = params.reactionEmoji ?? "👍"; const reactionEventId = await client.sendReaction({ emoji: reactionEmoji, - messageId: reactionTargetEventId, - roomId: context.roomId, + messageId: params.reactionTargetEventId, + roomId: params.roomId, }); const matched = await client.waitForRoomEvent({ - observedEvents: context.observedEvents, + observedEvents: params.observedEvents, predicate: (event) => - event.roomId === context.roomId && - event.sender === context.driverUserId && + event.roomId === params.roomId && + event.sender === params.actorUserId && event.type === "m.reaction" && event.eventId === reactionEventId && - event.reaction?.eventId === reactionTargetEventId && + event.reaction?.eventId === params.reactionTargetEventId && event.reaction?.key === reactionEmoji, - roomId: context.roomId, + roomId: params.roomId, since: startSince, + timeoutMs: params.timeoutMs, + }); + return { + actorId: params.actorId, + actorUserId: params.actorUserId, + event: matched.event, + reactionEmoji, + reactionEventId, + reactionTargetEventId: params.reactionTargetEventId, + since: matched.since, + startSince, + }; +} + +function buildMatrixQaReactionArtifacts(params: { + actorUserId?: string; + expectedNoReplyWindowMs?: number; + reaction: Awaited>; +}) { + return { + ...(params.actorUserId ? { actorUserId: params.actorUserId } : {}), + ...(params.expectedNoReplyWindowMs === undefined + ? {} + : { expectedNoReplyWindowMs: params.expectedNoReplyWindowMs }), + reactionEmoji: params.reaction.reactionEmoji, + reactionEventId: params.reaction.reactionEventId, + reactionTargetEventId: params.reaction.reactionTargetEventId, + }; +} + +export async function runReactionThreadedScenario(context: MatrixQaScenarioContext) { + const thread = await runThreadScenario(context, { + createNestedReply: true, + tokenPrefix: "MATRIX_QA_REACTION_THREAD", + }); + assertThreadReplyArtifact(thread.reply, { + expectedRootEventId: thread.rootEventId, + label: "threaded reaction reply", + }); + const reaction = await observeReactionScenario({ + actorId: "driver", + actorUserId: context.driverUserId, + accessToken: context.driverAccessToken, + baseUrl: context.baseUrl, + observedEvents: context.observedEvents, + reactionTargetEventId: thread.reply.eventId, + roomId: context.roomId, + syncState: context.syncState, + syncStreams: context.syncStreams, timeoutMs: context.timeoutMs, }); advanceMatrixQaActorCursor({ - actorId: "driver", + actorId: reaction.actorId, syncState: context.syncState, - nextSince: matched.since, - startSince, + nextSince: reaction.since, + startSince: reaction.startSince, }); return { artifacts: { - reactionEmoji, - reactionEventId, - reactionTargetEventId, + driverEventId: thread.driverEventId, + ...buildMatrixQaReactionArtifacts({ reaction }), + reply: thread.reply, + rootEventId: thread.rootEventId, + token: thread.token, }, details: [ - `reaction event: ${reactionEventId}`, - `reaction target: ${reactionTargetEventId}`, - `reaction emoji: ${reactionEmoji}`, - `observed reaction key: ${matched.event.reaction?.key ?? ""}`, + ...buildMatrixQaThreadDetailLines({ + result: thread, + includeNestedTrigger: true, + extraLines: [`thread reply event: ${thread.reply.eventId}`], + replyLabel: "thread reply", + }), + ...buildMatrixQaReactionDetailLines({ + reactionEmoji: reaction.reactionEmoji, + reactionEventId: reaction.reactionEventId, + reactionTargetEventId: reaction.reactionTargetEventId, + }), + ].join("\n"), + } satisfies MatrixQaScenarioExecution; +} + +export async function runReactionNotAReplyScenario(context: MatrixQaScenarioContext) { + const reactionTargetEventId = requireMatrixQaReactionTargetEventId( + context.canary?.reply.eventId, + "Matrix reaction no-reply scenario", + ); + const reaction = await observeReactionScenario({ + actorId: "driver", + actorUserId: context.driverUserId, + accessToken: context.driverAccessToken, + baseUrl: context.baseUrl, + observedEvents: context.observedEvents, + reactionTargetEventId, + roomId: context.roomId, + syncState: context.syncState, + syncStreams: context.syncStreams, + timeoutMs: context.timeoutMs, + }); + const client = createMatrixQaDriverScenarioClient(context); + const noReplyWindowMs = Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs); + const noReplyResult = await client.waitForOptionalRoomEvent({ + observedEvents: context.observedEvents, + predicate: (event) => + event.roomId === context.roomId && + event.sender === context.sutUserId && + event.type === "m.room.message", + roomId: context.roomId, + since: reaction.since, + timeoutMs: noReplyWindowMs, + }); + if (noReplyResult.matched) { + const unexpectedReply = buildMatrixReplyArtifact(noReplyResult.event); + throw new Error( + [ + `unexpected SUT reply after reaction from ${context.driverUserId}`, + `reaction target: ${reaction.reactionTargetEventId}`, + `reaction event: ${reaction.reactionEventId}`, + ...buildMatrixReplyDetails("unexpected reply", unexpectedReply), + ].join("\n"), + ); + } + advanceMatrixQaActorCursor({ + actorId: reaction.actorId, + syncState: context.syncState, + nextSince: noReplyResult.since, + startSince: reaction.startSince, + }); + return { + artifacts: buildMatrixQaReactionArtifacts({ + actorUserId: context.driverUserId, + expectedNoReplyWindowMs: noReplyWindowMs, + reaction, + }), + details: [ + ...buildMatrixQaReactionDetailLines({ + reactionEmoji: reaction.reactionEmoji, + reactionEventId: reaction.reactionEventId, + reactionTargetEventId: reaction.reactionTargetEventId, + }), + `waited ${noReplyWindowMs}ms with no SUT reply`, ].join("\n"), } satisfies MatrixQaScenarioExecution; } @@ -622,17 +940,12 @@ export async function runHomeserverRestartResumeScenario(context: MatrixQaScenar } const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_HOMESERVER_ROOM_KEY); await context.interruptTransport(); - const resumed = await runDriverTopLevelMentionScenario({ - baseUrl: context.baseUrl, - driverAccessToken: context.driverAccessToken, - observedEvents: context.observedEvents, + const resumed = await runAssertedDriverTopLevelScenario({ + context, + label: "post-homeserver-restart reply", roomId, - syncState: context.syncState, - sutUserId: context.sutUserId, - timeoutMs: context.timeoutMs, tokenPrefix: "MATRIX_QA_HOMESERVER", }); - assertTopLevelReplyArtifact("post-homeserver-restart reply", resumed.reply); return { artifacts: { driverEventId: resumed.driverEventId, @@ -656,17 +969,12 @@ export async function runRestartResumeScenario(context: MatrixQaScenarioContext) } const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY); await context.restartGateway(); - const result = await runDriverTopLevelMentionScenario({ - baseUrl: context.baseUrl, - driverAccessToken: context.driverAccessToken, - observedEvents: context.observedEvents, + const result = await runAssertedDriverTopLevelScenario({ + context, + label: "post-restart reply", roomId, - syncState: context.syncState, - sutUserId: context.sutUserId, - timeoutMs: context.timeoutMs, tokenPrefix: "MATRIX_QA_RESTART", }); - assertTopLevelReplyArtifact("post-restart reply", result.reply); return { artifacts: { driverEventId: result.driverEventId, diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-shared.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-shared.ts index ea24e5f857f..5ebae807006 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime-shared.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-shared.ts @@ -1,6 +1,7 @@ import { randomUUID } from "node:crypto"; -import { createMatrixQaClient } from "../../substrate/client.js"; +import { createMatrixQaClient, type MatrixQaRoomObserver } from "../../substrate/client.js"; import type { MatrixQaObservedEvent } from "../../substrate/events.js"; +import { createMatrixQaRoomObserver } from "../../substrate/sync.js"; import { type MatrixQaProvisionedTopology } from "../../substrate/topology.js"; import { resolveMatrixQaScenarioRoomId } from "./scenario-catalog.js"; import type { @@ -12,6 +13,7 @@ import type { export type MatrixQaActorId = "driver" | "observer"; export type MatrixQaSyncState = Partial>; +export type MatrixQaSyncStreams = Partial>; export type MatrixQaScenarioContext = { baseUrl: string; @@ -26,6 +28,7 @@ export type MatrixQaScenarioContext = { interruptTransport?: () => Promise; sutAccessToken: string; syncState: MatrixQaSyncState; + syncStreams?: MatrixQaSyncStreams; sutUserId: string; timeoutMs: number; topology: MatrixQaProvisionedTopology; @@ -147,15 +150,71 @@ export function writeMatrixQaSyncCursor( } } +function getOrCreateMatrixQaActorSyncStream(params: { + accessToken: string; + actorId: MatrixQaActorId; + baseUrl: string; + observedEvents: MatrixQaObservedEvent[]; + syncState: MatrixQaSyncState; + syncStreams?: MatrixQaSyncStreams; +}) { + const existingStream = params.syncStreams?.[params.actorId]; + if (existingStream) { + return existingStream; + } + const stream = createMatrixQaRoomObserver({ + accessToken: params.accessToken, + baseUrl: params.baseUrl, + observedEvents: params.observedEvents, + since: readMatrixQaSyncCursor(params.syncState, params.actorId), + }); + if (params.syncStreams) { + params.syncStreams[params.actorId] = stream; + } + return stream; +} + +export function createMatrixQaScenarioClient(params: { + accessToken: string; + actorId?: MatrixQaActorId; + baseUrl: string; + observedEvents?: MatrixQaObservedEvent[]; + syncState?: MatrixQaSyncState; + syncStreams?: MatrixQaSyncStreams; +}) { + const syncObserver = + params.actorId && params.observedEvents && params.syncState && params.syncStreams + ? getOrCreateMatrixQaActorSyncStream({ + accessToken: params.accessToken, + actorId: params.actorId, + baseUrl: params.baseUrl, + observedEvents: params.observedEvents, + syncState: params.syncState, + syncStreams: params.syncStreams, + }) + : undefined; + return createMatrixQaClient({ + accessToken: params.accessToken, + baseUrl: params.baseUrl, + ...(syncObserver ? { syncObserver } : {}), + }); +} + export async function primeMatrixQaActorCursor(params: { accessToken: string; actorId: MatrixQaActorId; baseUrl: string; + observedEvents: MatrixQaObservedEvent[]; syncState: MatrixQaSyncState; + syncStreams?: MatrixQaSyncStreams; }) { - const client = createMatrixQaClient({ + const client = createMatrixQaScenarioClient({ accessToken: params.accessToken, + actorId: params.actorId, baseUrl: params.baseUrl, + observedEvents: params.observedEvents, + syncState: params.syncState, + syncStreams: params.syncStreams, }); const existingSince = readMatrixQaSyncCursor(params.syncState, params.actorId); if (existingSince) { @@ -177,13 +236,6 @@ export function advanceMatrixQaActorCursor(params: { writeMatrixQaSyncCursor(params.syncState, params.actorId, params.nextSince ?? params.startSince); } -export function createMatrixQaScenarioClient(params: { accessToken: string; baseUrl: string }) { - return createMatrixQaClient({ - accessToken: params.accessToken, - baseUrl: params.baseUrl, - }); -} - export async function runConfigurableTopLevelScenario(params: { accessToken: string; actorId: MatrixQaActorId; @@ -195,6 +247,7 @@ export async function runConfigurableTopLevelScenario(params: { ) => boolean; roomId: string; syncState: MatrixQaSyncState; + syncStreams?: MatrixQaSyncStreams; sutUserId: string; timeoutMs: number; tokenPrefix: string; @@ -204,7 +257,9 @@ export async function runConfigurableTopLevelScenario(params: { accessToken: params.accessToken, actorId: params.actorId, baseUrl: params.baseUrl, + observedEvents: params.observedEvents, syncState: params.syncState, + syncStreams: params.syncStreams, }); const token = `${params.tokenPrefix}_${randomUUID().slice(0, 8).toUpperCase()}`; const body = @@ -249,6 +304,7 @@ export async function runTopLevelMentionScenario(params: { observedEvents: MatrixQaObservedEvent[]; roomId: string; syncState: MatrixQaSyncState; + syncStreams?: MatrixQaSyncStreams; sutUserId: string; timeoutMs: number; tokenPrefix: string; @@ -263,6 +319,7 @@ export async function runDriverTopLevelMentionScenario(params: { observedEvents: MatrixQaObservedEvent[]; roomId: string; syncState: MatrixQaSyncState; + syncStreams?: MatrixQaSyncStreams; sutUserId: string; timeoutMs: number; tokenPrefix: string; @@ -274,6 +331,7 @@ export async function runDriverTopLevelMentionScenario(params: { observedEvents: params.observedEvents, roomId: params.roomId, syncState: params.syncState, + syncStreams: params.syncStreams, sutUserId: params.sutUserId, timeoutMs: params.timeoutMs, tokenPrefix: params.tokenPrefix, @@ -289,13 +347,16 @@ export async function waitForMembershipEvent(params: { roomId: string; stateKey: string; syncState: MatrixQaSyncState; + syncStreams?: MatrixQaSyncStreams; timeoutMs: number; }) { const { client, startSince } = await primeMatrixQaActorCursor({ accessToken: params.accessToken, actorId: params.actorId, baseUrl: params.baseUrl, + observedEvents: params.observedEvents, syncState: params.syncState, + syncStreams: params.syncStreams, }); const matched = await client.waitForRoomEvent({ observedEvents: params.observedEvents, @@ -334,6 +395,7 @@ export async function runTopologyScopedTopLevelScenario(params: { observedEvents: params.context.observedEvents, roomId, syncState: params.context.syncState, + syncStreams: params.context.syncStreams, sutUserId: params.context.sutUserId, timeoutMs: params.context.timeoutMs, tokenPrefix: params.tokenPrefix, @@ -369,6 +431,7 @@ export async function runNoReplyExpectedScenario(params: { observedEvents: MatrixQaObservedEvent[]; roomId: string; syncState: MatrixQaSyncState; + syncStreams?: MatrixQaSyncStreams; sutUserId: string; timeoutMs: number; token: string; @@ -377,7 +440,9 @@ export async function runNoReplyExpectedScenario(params: { accessToken: params.accessToken, actorId: params.actorId, baseUrl: params.baseUrl, + observedEvents: params.observedEvents, syncState: params.syncState, + syncStreams: params.syncStreams, }); const driverEventId = await client.sendTextMessage({ body: params.body, diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime.ts index 6fb35037454..559460a4229 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime.ts @@ -16,12 +16,16 @@ import { runMembershipLossScenario, runObserverAllowlistOverrideScenario, runQuietStreamingPreviewScenario, + runReactionNotAReplyScenario, runReactionNotificationScenario, + runReactionThreadedScenario, runRestartResumeScenario, runRoomAutoJoinInviteScenario, runRoomThreadReplyOverrideScenario, runThreadFollowUpScenario, runThreadIsolationScenario, + runThreadNestedReplyShapeScenario, + runThreadRootPreservationScenario, runTopLevelReplyShapeScenario, } from "./scenario-runtime-room.js"; import { @@ -48,6 +52,52 @@ export { }; export type { MatrixQaScenarioContext, MatrixQaSyncState }; +async function runDriverTopologyScopedScenario(params: { + context: MatrixQaScenarioContext; + roomKey: string; + tokenPrefix: string; + withMention?: boolean; +}) { + return await runTopologyScopedTopLevelScenario({ + accessToken: params.context.driverAccessToken, + actorId: "driver", + actorUserId: params.context.driverUserId, + context: params.context, + roomKey: params.roomKey, + tokenPrefix: params.tokenPrefix, + ...(params.withMention === undefined ? {} : { withMention: params.withMention }), + }); +} + +function buildMatrixQaToken(prefix: string) { + return `${prefix}_${randomUUID().slice(0, 8).toUpperCase()}`; +} + +async function runNoReplyScenario(params: { + accessToken: string; + actorId: "driver" | "observer"; + actorUserId: string; + body: string; + context: MatrixQaScenarioContext; + mentionUserIds?: string[]; + token: string; +}) { + return await runNoReplyExpectedScenario({ + accessToken: params.accessToken, + actorId: params.actorId, + actorUserId: params.actorUserId, + baseUrl: params.context.baseUrl, + body: params.body, + ...(params.mentionUserIds ? { mentionUserIds: params.mentionUserIds } : {}), + observedEvents: params.context.observedEvents, + roomId: params.context.roomId, + syncState: params.context.syncState, + sutUserId: params.context.sutUserId, + timeoutMs: params.context.timeoutMs, + token: params.token, + }); +} + export async function runMatrixQaScenario( scenario: MatrixQaScenarioDefinition, context: MatrixQaScenarioContext, @@ -55,6 +105,10 @@ export async function runMatrixQaScenario( switch (scenario.id) { case "matrix-thread-follow-up": return await runThreadFollowUpScenario(context); + case "matrix-thread-root-preservation": + return await runThreadRootPreservationScenario(context); + case "matrix-thread-nested-reply-shape": + return await runThreadNestedReplyShapeScenario(context); case "matrix-thread-isolation": return await runThreadIsolationScenario(context); case "matrix-top-level-reply-shape": @@ -66,10 +120,7 @@ export async function runMatrixQaScenario( case "matrix-room-block-streaming": return await runBlockStreamingScenario(context); case "matrix-dm-reply-shape": - return await runTopologyScopedTopLevelScenario({ - accessToken: context.driverAccessToken, - actorId: "driver", - actorUserId: context.driverUserId, + return await runDriverTopologyScopedScenario({ context, roomKey: MATRIX_QA_DRIVER_DM_ROOM_KEY, tokenPrefix: "MATRIX_QA_DM", @@ -84,19 +135,13 @@ export async function runMatrixQaScenario( case "matrix-room-autojoin-invite": return await runRoomAutoJoinInviteScenario(context); case "matrix-secondary-room-reply": - return await runTopologyScopedTopLevelScenario({ - accessToken: context.driverAccessToken, - actorId: "driver", - actorUserId: context.driverUserId, + return await runDriverTopologyScopedScenario({ context, roomKey: MATRIX_QA_SECONDARY_ROOM_KEY, tokenPrefix: "MATRIX_QA_SECONDARY", }); case "matrix-secondary-room-open-trigger": - return await runTopologyScopedTopLevelScenario({ - accessToken: context.driverAccessToken, - actorId: "driver", - actorUserId: context.driverUserId, + return await runDriverTopologyScopedScenario({ context, roomKey: MATRIX_QA_SECONDARY_ROOM_KEY, tokenPrefix: "MATRIX_QA_SECONDARY_OPEN", @@ -104,6 +149,10 @@ export async function runMatrixQaScenario( }); case "matrix-reaction-notification": return await runReactionNotificationScenario(context); + case "matrix-reaction-threaded": + return await runReactionThreadedScenario(context); + case "matrix-reaction-not-a-reply": + return await runReactionNotAReplyScenario(context); case "matrix-restart-resume": return await runRestartResumeScenario(context); case "matrix-room-membership-loss": @@ -111,37 +160,27 @@ export async function runMatrixQaScenario( case "matrix-homeserver-restart-resume": return await runHomeserverRestartResumeScenario(context); case "matrix-mention-gating": { - const token = `MATRIX_QA_NOMENTION_${randomUUID().slice(0, 8).toUpperCase()}`; - return await runNoReplyExpectedScenario({ + const token = buildMatrixQaToken("MATRIX_QA_NOMENTION"); + return await runNoReplyScenario({ accessToken: context.driverAccessToken, actorId: "driver", actorUserId: context.driverUserId, - baseUrl: context.baseUrl, body: buildExactMarkerPrompt(token), - observedEvents: context.observedEvents, - roomId: context.roomId, - syncState: context.syncState, - sutUserId: context.sutUserId, - timeoutMs: context.timeoutMs, + context, token, }); } case "matrix-observer-allowlist-override": return await runObserverAllowlistOverrideScenario(context); case "matrix-allowlist-block": { - const token = `MATRIX_QA_ALLOWLIST_${randomUUID().slice(0, 8).toUpperCase()}`; - return await runNoReplyExpectedScenario({ + const token = buildMatrixQaToken("MATRIX_QA_ALLOWLIST"); + return await runNoReplyScenario({ accessToken: context.observerAccessToken, actorId: "observer", actorUserId: context.observerUserId, - baseUrl: context.baseUrl, body: buildMentionPrompt(context.sutUserId, token), mentionUserIds: [context.sutUserId], - observedEvents: context.observedEvents, - roomId: context.roomId, - syncState: context.syncState, - sutUserId: context.sutUserId, - timeoutMs: context.timeoutMs, + context, token, }); } diff --git a/extensions/qa-matrix/src/runners/contract/scenarios.test.ts b/extensions/qa-matrix/src/runners/contract/scenarios.test.ts index 7293addd626..f711b5ad61e 100644 --- a/extensions/qa-matrix/src/runners/contract/scenarios.test.ts +++ b/extensions/qa-matrix/src/runners/contract/scenarios.test.ts @@ -25,6 +25,8 @@ describe("matrix live qa scenarios", () => { it("ships the Matrix live QA scenario set by default", () => { expect(scenarioTesting.findMatrixQaScenarios().map((scenario) => scenario.id)).toEqual([ "matrix-thread-follow-up", + "matrix-thread-root-preservation", + "matrix-thread-nested-reply-shape", "matrix-thread-isolation", "matrix-top-level-reply-shape", "matrix-room-thread-reply-override", @@ -38,6 +40,8 @@ describe("matrix live qa scenarios", () => { "matrix-secondary-room-reply", "matrix-secondary-room-open-trigger", "matrix-reaction-notification", + "matrix-reaction-threaded", + "matrix-reaction-not-a-reply", "matrix-restart-resume", "matrix-room-membership-loss", "matrix-homeserver-restart-resume", diff --git a/extensions/qa-matrix/src/substrate/artifacts.test.ts b/extensions/qa-matrix/src/substrate/artifacts.test.ts index c8a5a5f3e55..f95f825a3c6 100644 --- a/extensions/qa-matrix/src/substrate/artifacts.test.ts +++ b/extensions/qa-matrix/src/substrate/artifacts.test.ts @@ -89,4 +89,59 @@ describe("matrix observed event artifacts", () => { }, ]); }); + + it("keeps redaction metadata while still stripping Matrix event content", () => { + expect( + buildMatrixQaObservedEventsArtifact({ + includeContent: false, + observedEvents: [ + { + kind: "redaction", + roomId: "!room:matrix-qa.test", + eventId: "$redaction", + sender: "@driver:matrix-qa.test", + type: "m.room.redaction", + originServerTs: 1_700_000_000_123, + }, + { + kind: "message", + roomId: "!room:matrix-qa.test", + eventId: "$message", + sender: "@sut:matrix-qa.test", + type: "m.room.message", + body: "private body", + formattedBody: "

private body

", + msgtype: "m.text", + }, + ], + }), + ).toEqual([ + { + kind: "redaction", + roomId: "!room:matrix-qa.test", + eventId: "$redaction", + sender: "@driver:matrix-qa.test", + type: "m.room.redaction", + originServerTs: 1_700_000_000_123, + msgtype: undefined, + membership: undefined, + relatesTo: undefined, + mentions: undefined, + reaction: undefined, + }, + { + kind: "message", + roomId: "!room:matrix-qa.test", + eventId: "$message", + sender: "@sut:matrix-qa.test", + type: "m.room.message", + originServerTs: undefined, + msgtype: "m.text", + membership: undefined, + relatesTo: undefined, + mentions: undefined, + reaction: undefined, + }, + ]); + }); }); diff --git a/extensions/qa-matrix/src/substrate/client.ts b/extensions/qa-matrix/src/substrate/client.ts index 8cc5a2db628..3db48d28e32 100644 --- a/extensions/qa-matrix/src/substrate/client.ts +++ b/extensions/qa-matrix/src/substrate/client.ts @@ -4,9 +4,11 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import type { MatrixQaObservedEvent } from "./events.js"; import { requestMatrixJson, type MatrixQaFetchLike } from "./request.js"; import { + createMatrixQaRoomObserver, primeMatrixQaRoom, waitForMatrixQaRoomEvent, waitForOptionalMatrixQaRoomEvent, + type MatrixQaRoomObserver, type MatrixQaRoomEventWaitResult, } from "./sync.js"; import { @@ -18,7 +20,7 @@ import { } from "./topology.js"; export type { MatrixQaObservedEvent } from "./events.js"; -export type { MatrixQaRoomEventWaitResult } from "./sync.js"; +export type { MatrixQaRoomEventWaitResult, MatrixQaRoomObserver } from "./sync.js"; type MatrixQaAuthStage = "m.login.dummy" | "m.login.registration_token"; @@ -261,8 +263,10 @@ export function createMatrixQaClient(params: { accessToken?: string; baseUrl: string; fetchImpl?: MatrixQaFetchLike; + syncObserver?: MatrixQaRoomObserver; }) { const fetchImpl = params.fetchImpl ?? fetch; + const syncObserver = params.syncObserver; return { async createPrivateRoom(opts: { inviteUserIds: string[]; isDirect?: boolean; name: string }) { @@ -294,6 +298,9 @@ export function createMatrixQaClient(params: { return roomId; }, async primeRoom() { + if (syncObserver) { + return await syncObserver.prime(); + } return await primeMatrixQaRoom({ accessToken: params.accessToken, baseUrl: params.baseUrl, @@ -433,6 +440,13 @@ export function createMatrixQaClient(params: { since?: string; timeoutMs: number; }) { + if (syncObserver) { + return syncObserver.waitForOptionalRoomEvent({ + predicate: opts.predicate, + roomId: opts.roomId, + timeoutMs: opts.timeoutMs, + }); + } return waitForOptionalMatrixQaRoomEvent({ accessToken: params.accessToken, baseUrl: params.baseUrl, @@ -447,6 +461,13 @@ export function createMatrixQaClient(params: { since?: string; timeoutMs: number; }) { + if (syncObserver) { + return await syncObserver.waitForRoomEvent({ + predicate: opts.predicate, + roomId: opts.roomId, + timeoutMs: opts.timeoutMs, + }); + } return await waitForMatrixQaRoomEvent({ accessToken: params.accessToken, baseUrl: params.baseUrl, @@ -505,42 +526,42 @@ async function provisionMatrixQaTopology(params: { fetchImpl?: MatrixQaFetchLike; spec: MatrixQaTopologySpec; }): Promise { - const rooms = await Promise.all( - params.spec.rooms.map(async (room) => { - const members = resolveTopologyMemberAccounts(params.accounts, room.members); - const creator = members[0]; - const invitees = members.slice(1); - const creatorClient = createMatrixQaClient({ - accessToken: creator.account.accessToken, - baseUrl: params.baseUrl, - fetchImpl: params.fetchImpl, - }); - const roomId = await creatorClient.createPrivateRoom({ - inviteUserIds: invitees.map((entry) => entry.account.userId), - isDirect: room.kind === "dm", - name: room.name, - }); - await Promise.all( - invitees.map((invitee) => - joinRoomWithRetry({ - accessToken: invitee.account.accessToken, - baseUrl: params.baseUrl, - fetchImpl: params.fetchImpl, - roomId, - }), - ), - ); - return { - key: room.key, - kind: room.kind, - memberRoles: members.map((entry) => entry.role), - memberUserIds: members.map((entry) => entry.account.userId), - name: room.name, - requireMention: resolveProvisionedRoomRequireMention(room), - roomId, - }; - }), - ); + const rooms = []; + + for (const room of params.spec.rooms) { + const members = resolveTopologyMemberAccounts(params.accounts, room.members); + const creator = members[0]; + const invitees = members.slice(1); + const creatorClient = createMatrixQaClient({ + accessToken: creator.account.accessToken, + baseUrl: params.baseUrl, + fetchImpl: params.fetchImpl, + }); + const roomId = await creatorClient.createPrivateRoom({ + inviteUserIds: invitees.map((entry) => entry.account.userId), + isDirect: room.kind === "dm", + name: room.name, + }); + await Promise.all( + invitees.map((invitee) => + joinRoomWithRetry({ + accessToken: invitee.account.accessToken, + baseUrl: params.baseUrl, + fetchImpl: params.fetchImpl, + roomId, + }), + ), + ); + rooms.push({ + key: room.key, + kind: room.kind, + memberRoles: members.map((entry) => entry.role), + memberUserIds: members.map((entry) => entry.account.userId), + name: room.name, + requireMention: resolveProvisionedRoomRequireMention(room), + roomId, + }); + } const defaultRoom = findMatrixQaProvisionedRoom( { @@ -628,5 +649,6 @@ export const __testing = { buildMatrixQaMessageContent, buildMatrixReactionRelation, buildMatrixThreadRelation, + createMatrixQaRoomObserver, resolveNextRegistrationAuth, }; diff --git a/extensions/qa-matrix/src/substrate/config.test.ts b/extensions/qa-matrix/src/substrate/config.test.ts index dae92ecabde..5187165efdf 100644 --- a/extensions/qa-matrix/src/substrate/config.test.ts +++ b/extensions/qa-matrix/src/substrate/config.test.ts @@ -152,6 +152,39 @@ describe("matrix qa config", () => { }); }); + it("rewrites the owned Matrix QA account instead of retaining stale override fields", () => { + const overridden = buildMatrixQaConfig({} as OpenClawConfig, { + driverUserId: "@driver:matrix-qa.test", + homeserver: "http://127.0.0.1:28008/", + observerUserId: "@observer:matrix-qa.test", + overrides: { + autoJoin: "allowlist", + autoJoinAllowlist: ["!ops:matrix-qa.test"], + blockStreaming: true, + streaming: "quiet", + }, + sutAccessToken: "sut-token", + sutAccountId: "sut", + sutUserId: "@sut:matrix-qa.test", + topology, + }); + + const reset = buildMatrixQaConfig(overridden, { + driverUserId: "@driver:matrix-qa.test", + homeserver: "http://127.0.0.1:28008/", + observerUserId: "@observer:matrix-qa.test", + sutAccessToken: "sut-token", + sutAccountId: "sut", + sutUserId: "@sut:matrix-qa.test", + topology, + }); + + expect(reset.channels?.matrix?.accounts?.sut?.autoJoin).toBeUndefined(); + expect(reset.channels?.matrix?.accounts?.sut?.autoJoinAllowlist).toBeUndefined(); + expect(reset.channels?.matrix?.accounts?.sut?.blockStreaming).toBeUndefined(); + expect(reset.channels?.matrix?.accounts?.sut?.streaming).toBeUndefined(); + }); + it("builds an effective Matrix QA config snapshot for reporting", () => { const snapshot = buildMatrixQaConfigSnapshot({ driverUserId: "@driver:matrix-qa.test", diff --git a/extensions/qa-matrix/src/substrate/config.ts b/extensions/qa-matrix/src/substrate/config.ts index 1dc7be5125f..24838ff2185 100644 --- a/extensions/qa-matrix/src/substrate/config.ts +++ b/extensions/qa-matrix/src/substrate/config.ts @@ -250,7 +250,6 @@ function buildMatrixQaAccountDmConfig(params: { } function buildMatrixQaChannelAccountConfig(params: { - existingAccount?: MatrixQaChannelAccountConfig; groups: Record; homeserver: string; overrides?: MatrixQaConfigOverrides; @@ -274,7 +273,6 @@ function buildMatrixQaChannelAccountConfig(params: { params.overrides?.streaming !== undefined ? { streaming: params.overrides.streaming } : {}; return { - ...params.existingAccount, accessToken: params.sutAccessToken, ...(params.sutDeviceId ? { deviceId: params.sutDeviceId } : {}), dm: buildMatrixQaAccountDmConfig({ @@ -394,7 +392,6 @@ export function buildMatrixQaConfig( accounts: { ...baseCfg.channels?.matrix?.accounts, [params.sutAccountId]: buildMatrixQaChannelAccountConfig({ - existingAccount: baseCfg.channels?.matrix?.accounts?.[params.sutAccountId], groups, homeserver: params.homeserver, overrides: params.overrides, diff --git a/extensions/qa-matrix/src/substrate/sync.test.ts b/extensions/qa-matrix/src/substrate/sync.test.ts index e5042ec841f..c5eb3a31e28 100644 --- a/extensions/qa-matrix/src/substrate/sync.test.ts +++ b/extensions/qa-matrix/src/substrate/sync.test.ts @@ -1,6 +1,10 @@ import { describe, expect, it } from "vitest"; import type { MatrixQaObservedEvent } from "./events.js"; -import { primeMatrixQaRoom, waitForOptionalMatrixQaRoomEvent } from "./sync.js"; +import { + createMatrixQaRoomObserver, + primeMatrixQaRoom, + waitForOptionalMatrixQaRoomEvent, +} from "./sync.js"; describe("matrix sync helpers", () => { it("primes the Matrix sync cursor without recording observed events", async () => { @@ -141,4 +145,135 @@ describe("matrix sync helpers", () => { ]), ); }); + + it("lets a second wait reuse later same-batch events without another /sync", async () => { + let calls = 0; + const fetchImpl: typeof fetch = async () => { + calls += 1; + return new Response( + JSON.stringify({ + next_batch: "next-batch-2", + rooms: { + join: { + "!room:matrix-qa.test": { + timeline: { + events: [ + { + event_id: "$preview", + sender: "@sut:matrix-qa.test", + type: "m.room.message", + content: { body: "preview", msgtype: "m.notice" }, + }, + { + event_id: "$final", + sender: "@sut:matrix-qa.test", + type: "m.room.message", + content: { + body: "final", + msgtype: "m.text", + "m.relates_to": { + rel_type: "m.replace", + event_id: "$preview", + "m.new_content": { body: "final", msgtype: "m.text" }, + }, + }, + }, + ], + }, + }, + }, + }, + }), + { status: 200, headers: { "content-type": "application/json" } }, + ); + }; + const observedEvents: MatrixQaObservedEvent[] = []; + const observer = createMatrixQaRoomObserver({ + accessToken: "token", + baseUrl: "http://127.0.0.1:28008/", + fetchImpl, + observedEvents, + since: "start-batch", + }); + + const preview = await observer.waitForRoomEvent({ + predicate: (event) => event.eventId === "$preview", + roomId: "!room:matrix-qa.test", + timeoutMs: 1_000, + }); + const finalized = await observer.waitForRoomEvent({ + predicate: (event) => event.eventId === "$final", + roomId: "!room:matrix-qa.test", + timeoutMs: 1_000, + }); + + expect(preview.event.eventId).toBe("$preview"); + expect(finalized.event.eventId).toBe("$final"); + expect(calls).toBe(1); + }); + + it("shares one in-flight /sync poll across concurrent waits", async () => { + let calls = 0; + const fetchImpl: typeof fetch = async () => { + calls += 1; + await new Promise((resolve) => setTimeout(resolve, 10)); + return new Response( + JSON.stringify({ + next_batch: "next-batch-2", + rooms: { + join: { + "!room:matrix-qa.test": { + timeline: { + events: [ + { + event_id: "$reply", + sender: "@sut:matrix-qa.test", + type: "m.room.message", + content: { body: "reply", msgtype: "m.text" }, + }, + { + event_id: "$notice", + sender: "@sut:matrix-qa.test", + type: "m.room.message", + content: { body: "notice", msgtype: "m.notice" }, + }, + ], + }, + }, + }, + }, + }), + { status: 200, headers: { "content-type": "application/json" } }, + ); + }; + const observer = createMatrixQaRoomObserver({ + accessToken: "token", + baseUrl: "http://127.0.0.1:28008/", + fetchImpl, + observedEvents: [], + since: "start-batch", + }); + + const [reply, notice] = await Promise.all([ + observer.waitForRoomEvent({ + predicate: (event) => event.eventId === "$reply", + roomId: "!room:matrix-qa.test", + timeoutMs: 1_000, + }), + observer.waitForOptionalRoomEvent({ + predicate: (event) => event.eventId === "$notice", + roomId: "!room:matrix-qa.test", + timeoutMs: 1_000, + }), + ]); + + expect(reply.event.eventId).toBe("$reply"); + expect(notice).toMatchObject({ + event: expect.objectContaining({ + eventId: "$notice", + }), + matched: true, + }); + expect(calls).toBe(1); + }); }); diff --git a/extensions/qa-matrix/src/substrate/sync.ts b/extensions/qa-matrix/src/substrate/sync.ts index ee3f3678807..9e72a20840c 100644 --- a/extensions/qa-matrix/src/substrate/sync.ts +++ b/extensions/qa-matrix/src/substrate/sync.ts @@ -30,21 +30,181 @@ export type MatrixQaRoomEventWaitResult = type MatrixQaSyncParams = { accessToken?: string; baseUrl: string; - fetchImpl: MatrixQaFetchLike; + fetchImpl?: MatrixQaFetchLike; +}; + +export type MatrixQaRoomObserver = { + prime(): Promise; + waitForOptionalRoomEvent(params: { + predicate: (event: MatrixQaObservedEvent) => boolean; + roomId: string; + timeoutMs: number; + }): Promise; + waitForRoomEvent(params: { + predicate: (event: MatrixQaObservedEvent) => boolean; + roomId: string; + timeoutMs: number; + }): Promise<{ + event: MatrixQaObservedEvent; + since?: string; + }>; +}; + +type MatrixQaRoomObserverState = { + cursorIndex: number; + events: MatrixQaObservedEvent[]; + pollPromise?: Promise; + since?: string; }; export async function primeMatrixQaRoom(params: MatrixQaSyncParams) { + const fetchImpl = params.fetchImpl ?? fetch; const response = await requestMatrixJson({ accessToken: params.accessToken, baseUrl: params.baseUrl, endpoint: "/_matrix/client/v3/sync", - fetchImpl: params.fetchImpl, + fetchImpl, method: "GET", query: { timeout: 0 }, }); return response.body.next_batch?.trim() || undefined; } +async function pollMatrixQaRoomObserver( + params: MatrixQaSyncParams & { + observedEvents: MatrixQaObservedEvent[]; + roomObserver: MatrixQaRoomObserverState; + timeoutMs: number; + }, +) { + const fetchImpl = params.fetchImpl ?? fetch; + if (params.roomObserver.pollPromise) { + await params.roomObserver.pollPromise; + return; + } + + params.roomObserver.pollPromise = (async () => { + const response = await requestMatrixJson({ + accessToken: params.accessToken, + baseUrl: params.baseUrl, + endpoint: "/_matrix/client/v3/sync", + fetchImpl, + method: "GET", + query: { + ...(params.roomObserver.since ? { since: params.roomObserver.since } : {}), + timeout: Math.min(10_000, params.timeoutMs), + }, + timeoutMs: Math.min(15_000, params.timeoutMs + 5_000), + }); + params.roomObserver.since = response.body.next_batch?.trim() || params.roomObserver.since; + for (const [roomId, joinedRoom] of Object.entries(response.body.rooms?.join ?? {})) { + for (const event of joinedRoom.timeline?.events ?? []) { + const normalized = normalizeMatrixQaObservedEvent(roomId, event); + if (!normalized) { + continue; + } + params.observedEvents.push(normalized); + params.roomObserver.events.push(normalized); + } + } + })(); + + try { + await params.roomObserver.pollPromise; + } finally { + params.roomObserver.pollPromise = undefined; + } +} + +function findObservedEventMatch(params: { + cursorIndex: number; + events: MatrixQaObservedEvent[]; + predicate: (event: MatrixQaObservedEvent) => boolean; + roomId: string; +}) { + for (let index = params.cursorIndex; index < params.events.length; index += 1) { + const event = params.events[index]; + if (event?.roomId !== params.roomId) { + continue; + } + if (params.predicate(event)) { + return { + event, + nextCursorIndex: index + 1, + }; + } + } + return undefined; +} + +export function createMatrixQaRoomObserver( + params: MatrixQaSyncParams & { + observedEvents: MatrixQaObservedEvent[]; + since?: string; + }, +): MatrixQaRoomObserver { + const roomObserver: MatrixQaRoomObserverState = { + cursorIndex: 0, + events: [], + since: params.since, + }; + + return { + async prime() { + if (roomObserver.since) { + return roomObserver.since; + } + roomObserver.since = await primeMatrixQaRoom(params); + return roomObserver.since; + }, + async waitForOptionalRoomEvent(waitParams) { + const startSince = await this.prime(); + const startedAt = Date.now(); + let cursorIndex = roomObserver.cursorIndex; + while (Date.now() - startedAt < waitParams.timeoutMs) { + const matched = findObservedEventMatch({ + cursorIndex, + events: roomObserver.events, + predicate: waitParams.predicate, + roomId: waitParams.roomId, + }); + if (matched) { + roomObserver.cursorIndex = Math.max(roomObserver.cursorIndex, matched.nextCursorIndex); + return { + event: matched.event, + matched: true, + since: roomObserver.since ?? startSince, + }; + } + + cursorIndex = roomObserver.events.length; + const remainingMs = Math.max(1_000, waitParams.timeoutMs - (Date.now() - startedAt)); + await pollMatrixQaRoomObserver({ + ...params, + observedEvents: params.observedEvents, + roomObserver, + timeoutMs: remainingMs, + }); + } + roomObserver.cursorIndex = Math.max(roomObserver.cursorIndex, cursorIndex); + return { + matched: false, + since: roomObserver.since ?? startSince, + }; + }, + async waitForRoomEvent(waitParams) { + const result = await this.waitForOptionalRoomEvent(waitParams); + if (result.matched) { + return { + event: result.event, + since: result.since, + }; + } + throw new Error(`timed out after ${waitParams.timeoutMs}ms waiting for Matrix room event`); + }, + }; +} + export async function waitForOptionalMatrixQaRoomEvent( params: MatrixQaSyncParams & { observedEvents: MatrixQaObservedEvent[]; @@ -54,40 +214,11 @@ export async function waitForOptionalMatrixQaRoomEvent( timeoutMs: number; }, ): Promise { - const startedAt = Date.now(); - let since = params.since; - while (Date.now() - startedAt < params.timeoutMs) { - const remainingMs = Math.max(1_000, params.timeoutMs - (Date.now() - startedAt)); - const response = await requestMatrixJson({ - accessToken: params.accessToken, - baseUrl: params.baseUrl, - endpoint: "/_matrix/client/v3/sync", - fetchImpl: params.fetchImpl, - method: "GET", - query: { - ...(since ? { since } : {}), - timeout: Math.min(10_000, remainingMs), - }, - timeoutMs: Math.min(15_000, remainingMs + 5_000), - }); - since = response.body.next_batch?.trim() || since; - const roomEvents = response.body.rooms?.join?.[params.roomId]?.timeline?.events ?? []; - let matchedEvent: MatrixQaObservedEvent | null = null; - for (const event of roomEvents) { - const normalized = normalizeMatrixQaObservedEvent(params.roomId, event); - if (!normalized) { - continue; - } - params.observedEvents.push(normalized); - if (matchedEvent === null && params.predicate(normalized)) { - matchedEvent = normalized; - } - } - if (matchedEvent) { - return { event: matchedEvent, matched: true, since }; - } - } - return { matched: false, since }; + return await createMatrixQaRoomObserver(params).waitForOptionalRoomEvent({ + predicate: params.predicate, + roomId: params.roomId, + timeoutMs: params.timeoutMs, + }); } export async function waitForMatrixQaRoomEvent(