QA: extend Matrix live contract coverage

This commit is contained in:
Gustavo Madeira Santana
2026-04-15 07:28:48 -04:00
parent 3830e687dd
commit 963ad1df06
15 changed files with 1439 additions and 293 deletions

View File

@@ -40,15 +40,19 @@ function createStubTransport(baseUrl = "http://127.0.0.1:43123") {
describe("startQaLiveLaneGateway", () => {
const gatewayStop = vi.fn();
const gatewayCall = vi.fn();
const mockStop = vi.fn();
beforeEach(() => {
gatewayStop.mockReset();
gatewayCall.mockReset();
mockStop.mockReset();
startQaGatewayChild.mockReset();
startQaMockOpenAiServer.mockReset();
startQaGatewayChild.mockResolvedValue({
call: gatewayCall,
cfg: {},
stop: gatewayStop,
});
startQaMockOpenAiServer.mockResolvedValue({

View File

@@ -1,5 +1,6 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { afterEach, describe, expect, it, vi } from "vitest";
import { renderQaMarkdownReport } from "../../report.js";
import { __testing as liveTesting } from "./runtime.js";
afterEach(() => {
@@ -228,6 +229,18 @@ describe("matrix live qa runtime", () => {
scenarios: [],
startedAt: "2026-04-10T10:00:00.000Z",
sutAccountId: "sut",
timings: {
artifactWriteMs: 5,
canaryMs: 40,
harnessBootMs: 100,
initialGatewayBootMs: 200,
provisioningMs: 300,
scenarioGatewayBootMs: 50,
scenarioRestartGatewayMs: 60,
scenarioTransportInterruptMs: 70,
scenarios: [],
totalMs: 825,
},
userIds: {
driver: "@driver:matrix-qa.test",
observer: "@observer:matrix-qa.test",
@@ -300,6 +313,27 @@ describe("matrix live qa runtime", () => {
],
startedAt: "2026-04-10T10:00:00.000Z",
sutAccountId: "sut",
timings: {
artifactWriteMs: 5,
canaryMs: 40,
harnessBootMs: 100,
initialGatewayBootMs: 200,
provisioningMs: 300,
scenarioGatewayBootMs: 50,
scenarioRestartGatewayMs: 60,
scenarioTransportInterruptMs: 70,
scenarios: [
{
durationMs: 80,
gatewayBootMs: 0,
gatewayRestartMs: 0,
id: "matrix-mention-gating",
title: "Matrix room message without mention does not trigger",
transportInterruptMs: 0,
},
],
totalMs: 905,
},
userIds: {
driver: "@driver:matrix-qa.test",
observer: "@observer:matrix-qa.test",
@@ -322,9 +356,128 @@ describe("matrix live qa runtime", () => {
},
},
],
timings: {
totalMs: 905,
},
});
});
it("keeps failing Matrix scenario details and timings complete in summary + report output", () => {
const summary = liveTesting.buildMatrixQaSummary({
artifactPaths: {
observedEvents: "/tmp/observed.json",
report: "/tmp/report.md",
summary: "/tmp/summary.json",
},
checks: [{ name: "Matrix harness ready", status: "pass" }],
config: {
default: liveTesting.buildMatrixQaConfigSnapshot({
driverUserId: "@driver:matrix-qa.test",
observerUserId: "@observer:matrix-qa.test",
sutUserId: "@sut:matrix-qa.test",
topology: {
defaultRoomId: "!room:matrix-qa.test",
defaultRoomKey: "main",
rooms: [],
},
}),
scenarios: [],
},
finishedAt: "2026-04-10T10:05:00.000Z",
harness: {
baseUrl: "http://127.0.0.1:28008/",
composeFile: "/tmp/docker-compose.yml",
dmRoomIds: [],
image: "ghcr.io/matrix-construct/tuwunel:v1.5.1",
roomId: "!room:matrix-qa.test",
roomIds: ["!room:matrix-qa.test"],
serverName: "matrix-qa.test",
},
observedEventCount: 6,
scenarios: [
{
id: "matrix-reaction-not-a-reply",
title: "Matrix reactions do not trigger a fresh bot reply",
status: "fail",
details: [
"unexpected SUT reply after reaction from @driver:matrix-qa.test",
"reaction event: $reaction",
"unexpected reply event: $reply",
].join("\n"),
},
],
startedAt: "2026-04-10T10:00:00.000Z",
sutAccountId: "sut",
timings: {
artifactWriteMs: 5,
canaryMs: 40,
harnessBootMs: 100,
initialGatewayBootMs: 200,
provisioningMs: 300,
scenarioGatewayBootMs: 50,
scenarioRestartGatewayMs: 60,
scenarioTransportInterruptMs: 70,
scenarios: [
{
durationMs: 8_000,
gatewayBootMs: 0,
gatewayRestartMs: 0,
id: "matrix-reaction-not-a-reply",
title: "Matrix reactions do not trigger a fresh bot reply",
transportInterruptMs: 0,
},
],
totalMs: 825,
},
userIds: {
driver: "@driver:matrix-qa.test",
observer: "@observer:matrix-qa.test",
sut: "@sut:matrix-qa.test",
},
});
expect(summary).toMatchObject({
counts: {
total: 2,
passed: 1,
failed: 1,
},
scenarios: [
{
id: "matrix-reaction-not-a-reply",
status: "fail",
details: expect.stringContaining("reaction event: $reaction"),
},
],
timings: {
scenarios: [
{
id: "matrix-reaction-not-a-reply",
durationMs: 8_000,
},
],
},
});
const report = renderQaMarkdownReport({
title: "Matrix QA Report",
startedAt: new Date(summary.startedAt),
finishedAt: new Date(summary.finishedAt),
checks: summary.checks,
scenarios: summary.scenarios.map((scenario) => ({
details: scenario.details,
name: scenario.title,
status: scenario.status,
})),
notes: [`observed events: ${summary.observedEventsPath}`],
});
expect(report).toContain("### Matrix reactions do not trigger a fresh bot reply");
expect(report).toContain("unexpected SUT reply after reaction from @driver:matrix-qa.test");
expect(report).toContain("reaction event: $reaction");
expect(report).toContain("observed events: /tmp/observed.json");
});
it("batches Matrix scenarios by config key while preserving stable in-group order", () => {
const scenarios = liveTesting.findMatrixQaScenarios([
"matrix-top-level-reply-shape",

View File

@@ -24,6 +24,7 @@ import {
import type { MatrixQaObservedEvent } from "../../substrate/events.js";
import { startMatrixQaHarness } from "../../substrate/harness.runtime.js";
import { resolveMatrixQaModels } from "./model-selection.js";
import type { MatrixQaSyncStreams } from "./scenario-runtime-shared.js";
import {
MATRIX_QA_SCENARIOS,
buildMatrixQaTopologyForScenarios,
@@ -101,6 +102,7 @@ type MatrixQaSummary = {
startedAt: string;
summaryPath: string;
sutAccountId: string;
timings: MatrixQaTimings;
userIds: {
driver: string;
observer: string;
@@ -114,6 +116,50 @@ type MatrixQaArtifactPaths = {
summary: string;
};
type MatrixQaScenarioTiming = {
durationMs: number;
gatewayBootMs: number;
gatewayRestartMs: number;
id: string;
title: string;
transportInterruptMs: number;
};
type MatrixQaTimings = {
artifactWriteMs: number;
canaryMs?: number;
harnessBootMs: number;
initialGatewayBootMs: number;
provisioningMs: number;
scenarioGatewayBootMs: number;
scenarioRestartGatewayMs: number;
scenarioTransportInterruptMs: number;
scenarios: MatrixQaScenarioTiming[];
totalMs: number;
};
function shouldWriteMatrixQaProgress() {
const override = process.env.OPENCLAW_QA_MATRIX_PROGRESS;
if (override === "0") {
return false;
}
if (override === "1") {
return true;
}
return process.stderr.isTTY;
}
function formatMatrixQaDurationMs(durationMs: number) {
return durationMs >= 1_000 ? `${(durationMs / 1_000).toFixed(1)}s` : `${durationMs}ms`;
}
function writeMatrixQaProgress(message: string) {
if (!shouldWriteMatrixQaProgress()) {
return;
}
process.stderr.write(`[matrix-qa] ${message}\n`);
}
function countMatrixQaStatuses<T extends { status: "fail" | "pass" | "skip" }>(entries: T[]) {
return {
failed: entries.filter((entry) => entry.status === "fail").length,
@@ -221,6 +267,7 @@ function buildMatrixQaSummary(params: {
scenarios: MatrixQaScenarioResult[];
startedAt: string;
sutAccountId: string;
timings: MatrixQaTimings;
userIds: MatrixQaSummary["userIds"];
}): MatrixQaSummary {
const checkCounts = countMatrixQaStatuses(params.checks);
@@ -244,10 +291,20 @@ function buildMatrixQaSummary(params: {
startedAt: params.startedAt,
summaryPath: params.artifactPaths.summary,
sutAccountId: params.sutAccountId,
timings: params.timings,
userIds: params.userIds,
};
}
async function measureMatrixQaStep<T>(step: () => Promise<T>) {
const startedAtMs = Date.now();
const result = await step();
return {
durationMs: Date.now() - startedAtMs,
result,
};
}
function isMatrixAccountReady(entry?: {
connected?: boolean;
healthState?: string;
@@ -357,27 +414,41 @@ export async function runMatrixQaLive(params: {
const includeObservedEventContent = process.env.OPENCLAW_QA_MATRIX_CAPTURE_CONTENT === "1";
const startedAtDate = new Date();
const startedAt = startedAtDate.toISOString();
const runStartedAtMs = Date.now();
writeMatrixQaProgress(
`suite start scenarios=${scenarios.length} provider=${providerMode} output=${outputDir}`,
);
const harness = await startMatrixQaHarness({
outputDir: path.join(outputDir, "matrix-harness"),
repoRoot,
});
const provisioning: MatrixQaProvisionResult = await (async () => {
const { durationMs: harnessBootMs, result: harness } = await measureMatrixQaStep(() =>
startMatrixQaHarness({
outputDir: path.join(outputDir, "matrix-harness"),
repoRoot,
}),
);
writeMatrixQaProgress(
`harness ready ${formatMatrixQaDurationMs(harnessBootMs)} baseUrl=${harness.baseUrl}`,
);
const { durationMs: provisioningMs, result: provisioning } = await (async () => {
try {
return await provisionMatrixQaRoom({
baseUrl: harness.baseUrl,
driverLocalpart: `qa-driver-${runSuffix}`,
observerLocalpart: `qa-observer-${runSuffix}`,
registrationToken: harness.registrationToken,
roomName: `OpenClaw Matrix QA ${runSuffix}`,
sutLocalpart: `qa-sut-${runSuffix}`,
topology,
});
return await measureMatrixQaStep(() =>
provisionMatrixQaRoom({
baseUrl: harness.baseUrl,
driverLocalpart: `qa-driver-${runSuffix}`,
observerLocalpart: `qa-observer-${runSuffix}`,
registrationToken: harness.registrationToken,
roomName: `OpenClaw Matrix QA ${runSuffix}`,
sutLocalpart: `qa-sut-${runSuffix}`,
topology,
}),
);
} catch (error) {
await harness.stop().catch(() => {});
throw error;
}
})();
writeMatrixQaProgress(
`topology ready ${formatMatrixQaDurationMs(provisioningMs)} rooms=${provisioning.topology.rooms.length}`,
);
const checks: QaReportCheck[] = [
{
@@ -401,6 +472,13 @@ export async function runMatrixQaLive(params: {
let gatewayHarnessKey: string | null = null;
let canaryFailed = false;
const syncState: { driver?: string; observer?: string } = {};
const syncStreams: MatrixQaSyncStreams = {};
let canaryMs: number | undefined;
let initialGatewayBootMs = 0;
let scenarioGatewayBootMs = 0;
let scenarioRestartGatewayMs = 0;
let scenarioTransportInterruptMs = 0;
const scenarioTimings: MatrixQaScenarioTiming[] = [];
const gatewayConfigParams = {
driverUserId: provisioning.driver.userId,
homeserver: harness.baseUrl,
@@ -420,38 +498,53 @@ export async function runMatrixQaLive(params: {
const ensureGatewayHarness = async (overrides?: MatrixQaConfigOverrides) => {
const nextKey = buildMatrixQaGatewayConfigKey(overrides);
if (gatewayHarness && gatewayHarnessKey === nextKey) {
return gatewayHarness;
return {
durationMs: 0,
harness: gatewayHarness,
};
}
if (gatewayHarness) {
await gatewayHarness.stop();
gatewayHarness = null;
gatewayHarnessKey = null;
gatewayHarnessKey = nextKey;
}
const started = await startMatrixQaLiveLaneGateway({
repoRoot,
transport: {
requiredPluginIds: [],
createGatewayConfig: () => ({}),
},
transportBaseUrl: "http://127.0.0.1:43123",
providerMode,
primaryModel,
alternateModel,
fastMode: params.fastMode,
controlUiEnabled: false,
mutateConfig: (cfg) =>
buildMatrixQaConfig(cfg, {
...gatewayConfigParams,
overrides,
}),
writeMatrixQaProgress("gateway boot start");
const { durationMs, result: started } = await measureMatrixQaStep(async () => {
const nextHarness = await startMatrixQaLiveLaneGateway({
repoRoot,
transport: {
requiredPluginIds: [],
createGatewayConfig: () => ({}),
},
transportBaseUrl: "http://127.0.0.1:43123",
providerMode,
primaryModel,
alternateModel,
fastMode: params.fastMode,
controlUiEnabled: false,
mutateConfig: (cfg) =>
buildMatrixQaConfig(cfg, {
...gatewayConfigParams,
overrides,
}),
});
await waitForMatrixChannelReady(nextHarness.gateway, sutAccountId);
return nextHarness;
});
await waitForMatrixChannelReady(started.gateway, sutAccountId);
writeMatrixQaProgress(`gateway boot done ${formatMatrixQaDurationMs(durationMs)}`);
gatewayHarness = started;
gatewayHarnessKey = nextKey;
return started;
return {
durationMs,
harness: started,
};
};
gatewayHarness = await ensureGatewayHarness();
{
const ensured = await ensureGatewayHarness();
gatewayHarness = ensured.harness;
initialGatewayBootMs = ensured.durationMs;
}
checks.push({
name: "Matrix channel ready",
status: "pass",
@@ -459,15 +552,21 @@ export async function runMatrixQaLive(params: {
});
try {
const canary = await runMatrixQaCanary({
baseUrl: harness.baseUrl,
driverAccessToken: provisioning.driver.accessToken,
observedEvents,
roomId: provisioning.roomId,
syncState,
sutUserId: provisioning.sut.userId,
timeoutMs: 45_000,
});
writeMatrixQaProgress("canary start");
const canaryMeasured = await measureMatrixQaStep(() =>
runMatrixQaCanary({
baseUrl: harness.baseUrl,
driverAccessToken: provisioning.driver.accessToken,
observedEvents,
roomId: provisioning.roomId,
syncState,
syncStreams,
sutUserId: provisioning.sut.userId,
timeoutMs: 45_000,
}),
);
canaryMs = canaryMeasured.durationMs;
const canary = canaryMeasured.result;
canaryArtifact = {
driverEventId: canary.driverEventId,
reply: canary.reply,
@@ -478,6 +577,7 @@ export async function runMatrixQaLive(params: {
status: "pass",
details: buildMatrixReplyDetails("reply", canary.reply).join("\n"),
});
writeMatrixQaProgress(`canary pass ${formatMatrixQaDurationMs(canaryMeasured.durationMs)}`);
} catch (error) {
canaryFailed = true;
checks.push({
@@ -485,6 +585,7 @@ export async function runMatrixQaLive(params: {
status: "fail",
details: formatErrorMessage(error),
});
writeMatrixQaProgress(`canary fail ${formatErrorMessage(error)}`);
}
if (!canaryFailed) {
@@ -495,36 +596,70 @@ export async function runMatrixQaLive(params: {
scenario,
});
scenarioConfigSnapshots[originalIndex] = scenarioConfigEntry;
let gatewayBootMs = 0;
let gatewayRestartMs = 0;
let transportInterruptMs = 0;
try {
writeMatrixQaProgress(`scenario start ${scenario.id}`);
const scenarioGateway = await ensureGatewayHarness(scenario.configOverrides);
const result = await runMatrixQaScenario(scenario, {
baseUrl: harness.baseUrl,
canary: canaryArtifact,
driverAccessToken: provisioning.driver.accessToken,
driverUserId: provisioning.driver.userId,
interruptTransport: async () => {
await harness.restartService();
await waitForMatrixChannelReady(scenarioGateway.gateway, sutAccountId, {
timeoutMs: 90_000,
});
},
observedEvents,
observerAccessToken: provisioning.observer.accessToken,
observerUserId: provisioning.observer.userId,
restartGateway: async () => {
if (!gatewayHarness) {
throw new Error("Matrix restart scenario requires a live gateway");
}
await scenarioGateway.gateway.restart();
await waitForMatrixChannelReady(scenarioGateway.gateway, sutAccountId);
},
roomId: provisioning.roomId,
sutAccessToken: provisioning.sut.accessToken,
syncState,
sutUserId: provisioning.sut.userId,
timeoutMs: scenario.timeoutMs,
topology: provisioning.topology,
});
gatewayBootMs = scenarioGateway.durationMs;
scenarioGatewayBootMs += gatewayBootMs;
const measuredScenario = await measureMatrixQaStep(() =>
runMatrixQaScenario(scenario, {
baseUrl: harness.baseUrl,
canary: canaryArtifact,
driverAccessToken: provisioning.driver.accessToken,
driverUserId: provisioning.driver.userId,
interruptTransport: async () => {
writeMatrixQaProgress(`transport interrupt start ${scenario.id}`);
const measuredInterrupt = await measureMatrixQaStep(async () => {
await harness.restartService();
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId, {
timeoutMs: 90_000,
});
});
transportInterruptMs += measuredInterrupt.durationMs;
scenarioTransportInterruptMs += measuredInterrupt.durationMs;
writeMatrixQaProgress(
`transport interrupt done ${scenario.id} ${formatMatrixQaDurationMs(measuredInterrupt.durationMs)}`,
);
},
observedEvents,
observerAccessToken: provisioning.observer.accessToken,
observerUserId: provisioning.observer.userId,
restartGateway: async () => {
if (!gatewayHarness) {
throw new Error("Matrix restart scenario requires a live gateway");
}
writeMatrixQaProgress(`gateway restart start ${scenario.id}`);
const measuredRestart = await measureMatrixQaStep(async () => {
await scenarioGateway.harness.gateway.restart();
await waitForMatrixChannelReady(scenarioGateway.harness.gateway, sutAccountId);
});
gatewayRestartMs += measuredRestart.durationMs;
scenarioRestartGatewayMs += measuredRestart.durationMs;
writeMatrixQaProgress(
`gateway restart done ${scenario.id} ${formatMatrixQaDurationMs(measuredRestart.durationMs)}`,
);
},
roomId: provisioning.roomId,
sutAccessToken: provisioning.sut.accessToken,
syncState,
syncStreams,
sutUserId: provisioning.sut.userId,
timeoutMs: scenario.timeoutMs,
topology: provisioning.topology,
}),
);
const result = measuredScenario.result;
scenarioTimings[originalIndex] = {
durationMs: measuredScenario.durationMs,
gatewayBootMs,
gatewayRestartMs,
id: scenario.id,
title: scenario.title,
transportInterruptMs,
};
scenarioResults[originalIndex] = buildMatrixQaScenarioResult({
artifacts: result.artifacts,
configSummary: scenarioConfigSummary,
@@ -532,13 +667,25 @@ export async function runMatrixQaLive(params: {
scenario,
status: "pass",
});
writeMatrixQaProgress(
`scenario pass ${scenario.id} ${formatMatrixQaDurationMs(measuredScenario.durationMs)}`,
);
} catch (error) {
scenarioTimings[originalIndex] = {
durationMs: 0,
gatewayBootMs,
gatewayRestartMs,
id: scenario.id,
title: scenario.title,
transportInterruptMs,
};
scenarioResults[originalIndex] = buildMatrixQaScenarioResult({
configSummary: scenarioConfigSummary,
details: formatErrorMessage(error),
scenario,
status: "fail",
});
writeMatrixQaProgress(`scenario fail ${scenario.id} ${formatErrorMessage(error)}`);
}
}
}
@@ -596,8 +743,10 @@ export async function runMatrixQaLive(params: {
`sut: ${provisioning.sut.userId}`,
`homeserver: ${harness.baseUrl}`,
`image: ${harness.image}`,
`timings: harness=${harnessBootMs}ms provisioning=${provisioningMs}ms gateway=${initialGatewayBootMs}ms canary=${canaryMs ?? 0}ms`,
],
});
const artifactWriteStartedAtMs = Date.now();
const summary: MatrixQaSummary = buildMatrixQaSummary({
artifactPaths,
canary: canaryArtifact,
@@ -622,6 +771,18 @@ export async function runMatrixQaLive(params: {
scenarios: completedScenarioResults,
startedAt,
sutAccountId,
timings: {
artifactWriteMs: 0,
canaryMs,
harnessBootMs,
initialGatewayBootMs,
provisioningMs,
scenarioGatewayBootMs,
scenarioRestartGatewayMs,
scenarioTransportInterruptMs,
scenarios: scenarioTimings,
totalMs: Date.now() - runStartedAtMs,
},
userIds: {
driver: provisioning.driver.userId,
observer: provisioning.observer.userId,
@@ -630,10 +791,6 @@ export async function runMatrixQaLive(params: {
});
await fs.writeFile(reportPath, `${report}\n`, { encoding: "utf8", mode: 0o600 });
await fs.writeFile(summaryPath, `${JSON.stringify(summary, null, 2)}\n`, {
encoding: "utf8",
mode: 0o600,
});
await fs.writeFile(
observedEventsPath,
`${JSON.stringify(
@@ -646,6 +803,15 @@ export async function runMatrixQaLive(params: {
)}\n`,
{ encoding: "utf8", mode: 0o600 },
);
summary.timings.artifactWriteMs = Date.now() - artifactWriteStartedAtMs;
summary.timings.totalMs = Date.now() - runStartedAtMs;
await fs.writeFile(summaryPath, `${JSON.stringify(summary, null, 2)}\n`, {
encoding: "utf8",
mode: 0o600,
});
writeMatrixQaProgress(
`suite ${summary.counts.failed > 0 ? "fail" : "pass"} ${summary.counts.passed}/${summary.counts.total} total=${formatMatrixQaDurationMs(summary.timings.totalMs)}`,
);
const failedChecks = checks.filter(
(check) => check.status === "fail" && check.name !== "Matrix cleanup",

View File

@@ -14,6 +14,8 @@ import {
export type MatrixQaScenarioId =
| "matrix-thread-follow-up"
| "matrix-thread-root-preservation"
| "matrix-thread-nested-reply-shape"
| "matrix-thread-isolation"
| "matrix-top-level-reply-shape"
| "matrix-room-thread-reply-override"
@@ -27,6 +29,8 @@ export type MatrixQaScenarioId =
| "matrix-secondary-room-reply"
| "matrix-secondary-room-open-trigger"
| "matrix-reaction-notification"
| "matrix-reaction-threaded"
| "matrix-reaction-not-a-reply"
| "matrix-restart-resume"
| "matrix-room-membership-loss"
| "matrix-homeserver-restart-resume"
@@ -138,6 +142,16 @@ export const MATRIX_QA_SCENARIOS: MatrixQaScenarioDefinition[] = [
timeoutMs: 60_000,
title: "Matrix thread follow-up reply",
},
{
id: "matrix-thread-root-preservation",
timeoutMs: 60_000,
title: "Matrix threaded replies keep the original root event",
},
{
id: "matrix-thread-nested-reply-shape",
timeoutMs: 60_000,
title: "Matrix nested threaded replies keep fallback replies on the root event",
},
{
id: "matrix-thread-isolation",
standardId: "thread-isolation",
@@ -257,6 +271,16 @@ export const MATRIX_QA_SCENARIOS: MatrixQaScenarioDefinition[] = [
timeoutMs: 45_000,
title: "Matrix reactions on bot replies are observed",
},
{
id: "matrix-reaction-threaded",
timeoutMs: 45_000,
title: "Matrix reactions preserve threaded reply targets",
},
{
id: "matrix-reaction-not-a-reply",
timeoutMs: 8_000,
title: "Matrix reactions do not trigger a fresh bot reply",
},
{
id: "matrix-restart-resume",
standardId: "restart-resume",

View File

@@ -36,6 +36,7 @@ async function runDmSharedSessionFlow(params: {
observedEvents: params.context.observedEvents,
roomId: firstRoomId,
syncState: params.context.syncState,
syncStreams: params.context.syncStreams,
sutUserId: params.context.sutUserId,
timeoutMs: params.context.timeoutMs,
tokenPrefix: "MATRIX_QA_DM_PRIMARY",
@@ -45,11 +46,19 @@ async function runDmSharedSessionFlow(params: {
const replyClient = createMatrixQaScenarioClient({
accessToken: params.context.driverAccessToken,
actorId: "driver",
baseUrl: params.context.baseUrl,
observedEvents: params.context.observedEvents,
syncState: params.context.syncState,
syncStreams: params.context.syncStreams,
});
const noticeClient = createMatrixQaScenarioClient({
accessToken: params.context.driverAccessToken,
actorId: "driver",
baseUrl: params.context.baseUrl,
observedEvents: params.context.observedEvents,
syncState: params.context.syncState,
syncStreams: params.context.syncStreams,
});
const [replySince, noticeSince] = await Promise.all([
replyClient.primeRoom(),
@@ -139,6 +148,7 @@ export async function runDmThreadReplyOverrideScenario(context: MatrixQaScenario
event.relatesTo?.relType === "m.thread" && event.relatesTo?.eventId === params.driverEventId,
roomId,
syncState: context.syncState,
syncStreams: context.syncStreams,
sutUserId: context.sutUserId,
timeoutMs: context.timeoutMs,
tokenPrefix: "MATRIX_QA_DM_THREAD",

View File

@@ -26,28 +26,133 @@ import {
runTopologyScopedTopLevelScenario,
runTopLevelMentionScenario,
waitForMembershipEvent,
type MatrixQaActorId,
type MatrixQaScenarioContext,
type MatrixQaSyncState,
} from "./scenario-runtime-shared.js";
import type { MatrixQaCanaryArtifact, MatrixQaScenarioExecution } from "./scenario-types.js";
async function runThreadScenario(params: MatrixQaScenarioContext) {
const { client, startSince } = await primeMatrixQaActorCursor({
accessToken: params.driverAccessToken,
type MatrixQaThreadScenarioResult = Awaited<ReturnType<typeof runThreadScenario>>;
function assertMatrixQaInReplyTarget(params: {
actualEventId?: string;
expectedEventId: string;
label: string;
}) {
if (params.actualEventId !== params.expectedEventId) {
throw new Error(
`${params.label} targeted ${params.actualEventId ?? "<none>"} instead of ${params.expectedEventId}`,
);
}
}
function requireMatrixQaNestedThreadEvent(
nestedDriverEventId: string | undefined,
scenarioLabel: string,
) {
if (!nestedDriverEventId) {
throw new Error(`${scenarioLabel} did not create a nested trigger`);
}
return nestedDriverEventId;
}
function buildMatrixQaThreadArtifacts(result: MatrixQaThreadScenarioResult) {
return {
driverEventId: result.driverEventId,
reply: result.reply,
rootEventId: result.rootEventId,
token: result.token,
};
}
function buildMatrixQaThreadDetailLines(params: {
result: MatrixQaThreadScenarioResult;
includeNestedTrigger?: boolean;
extraLines?: string[];
replyLabel?: string;
}) {
return [
`thread root event: ${params.result.rootEventId}`,
...(params.includeNestedTrigger && params.result.nestedDriverEventId
? [`nested trigger event: ${params.result.nestedDriverEventId}`]
: []),
`mention trigger event: ${params.result.driverEventId}`,
...(params.extraLines ?? []),
...buildMatrixReplyDetails(params.replyLabel ?? "reply", params.result.reply),
];
}
async function primeMatrixQaDriverScenarioClient(context: MatrixQaScenarioContext) {
return await primeMatrixQaActorCursor({
accessToken: context.driverAccessToken,
actorId: "driver",
baseUrl: params.baseUrl,
syncState: params.syncState,
baseUrl: context.baseUrl,
observedEvents: context.observedEvents,
syncState: context.syncState,
syncStreams: context.syncStreams,
});
}
function createMatrixQaDriverScenarioClient(context: MatrixQaScenarioContext) {
return createMatrixQaScenarioClient({
accessToken: context.driverAccessToken,
actorId: "driver",
baseUrl: context.baseUrl,
observedEvents: context.observedEvents,
syncState: context.syncState,
syncStreams: context.syncStreams,
});
}
async function runAssertedDriverTopLevelScenario(params: {
context: MatrixQaScenarioContext;
label: string;
roomId?: string;
tokenPrefix: string;
}) {
const result = await runDriverTopLevelMentionScenario({
baseUrl: params.context.baseUrl,
driverAccessToken: params.context.driverAccessToken,
observedEvents: params.context.observedEvents,
roomId: params.roomId ?? params.context.roomId,
syncState: params.context.syncState,
syncStreams: params.context.syncStreams,
sutUserId: params.context.sutUserId,
timeoutMs: params.context.timeoutMs,
tokenPrefix: params.tokenPrefix,
});
assertTopLevelReplyArtifact(params.label, result.reply);
return result;
}
async function runThreadScenario(
params: MatrixQaScenarioContext,
options?: {
createNestedReply?: boolean;
tokenPrefix?: string;
},
) {
const { client, startSince } = await primeMatrixQaDriverScenarioClient(params);
const rootBody = `thread root ${randomUUID().slice(0, 8)}`;
const rootEventId = await client.sendTextMessage({
body: rootBody,
roomId: params.roomId,
});
const token = `MATRIX_QA_THREAD_${randomUUID().slice(0, 8).toUpperCase()}`;
const nestedDriverEventId =
options?.createNestedReply === true
? await client.sendTextMessage({
body: `thread nested ${randomUUID().slice(0, 8)}`,
replyToEventId: rootEventId,
roomId: params.roomId,
threadRootEventId: rootEventId,
})
: undefined;
const triggerEventId = nestedDriverEventId ?? rootEventId;
const token = `${options?.tokenPrefix ?? "MATRIX_QA_THREAD"}_${randomUUID().slice(0, 8).toUpperCase()}`;
const driverEventId = await client.sendTextMessage({
body: buildMentionPrompt(params.sutUserId, token),
mentionUserIds: [params.sutUserId],
replyToEventId: rootEventId,
replyToEventId: triggerEventId,
roomId: params.roomId,
threadRootEventId: rootEventId,
});
@@ -72,6 +177,7 @@ async function runThreadScenario(params: MatrixQaScenarioContext) {
});
return {
driverEventId,
nestedDriverEventId,
reply: buildMatrixReplyArtifact(matched.event, token),
rootEventId,
token,
@@ -84,6 +190,7 @@ export async function runMatrixQaCanary(params: {
observedEvents: MatrixQaObservedEvent[];
roomId: string;
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaScenarioContext["syncStreams"];
sutUserId: string;
timeoutMs: number;
}): Promise<{
@@ -97,6 +204,7 @@ export async function runMatrixQaCanary(params: {
observedEvents: params.observedEvents,
roomId: params.roomId,
syncState: params.syncState,
syncStreams: params.syncStreams,
sutUserId: params.sutUserId,
timeoutMs: params.timeoutMs,
tokenPrefix: "MATRIX_QA_CANARY",
@@ -112,12 +220,7 @@ export async function runThreadFollowUpScenario(context: MatrixQaScenarioContext
label: "thread reply",
});
return {
artifacts: {
driverEventId: result.driverEventId,
reply: result.reply,
rootEventId: result.rootEventId,
token: result.token,
},
artifacts: buildMatrixQaThreadArtifacts(result),
details: [
`root event: ${result.rootEventId}`,
`driver thread event: ${result.driverEventId}`,
@@ -126,23 +229,74 @@ export async function runThreadFollowUpScenario(context: MatrixQaScenarioContext
} satisfies MatrixQaScenarioExecution;
}
export async function runThreadRootPreservationScenario(context: MatrixQaScenarioContext) {
const result = await runThreadScenario(context, {
createNestedReply: true,
tokenPrefix: "MATRIX_QA_THREAD_ROOT",
});
assertThreadReplyArtifact(result.reply, {
expectedRootEventId: result.rootEventId,
label: "thread root preservation reply",
});
requireMatrixQaNestedThreadEvent(
result.nestedDriverEventId,
"Matrix thread root preservation scenario",
);
return {
artifacts: buildMatrixQaThreadArtifacts(result),
details: buildMatrixQaThreadDetailLines({
result,
includeNestedTrigger: true,
extraLines: [
`reply thread root: ${result.reply.relatesTo?.eventId ?? "<none>"}`,
`reply in_reply_to: ${result.reply.relatesTo?.inReplyToId ?? "<none>"}`,
],
}).join("\n"),
} satisfies MatrixQaScenarioExecution;
}
export async function runThreadNestedReplyShapeScenario(context: MatrixQaScenarioContext) {
const result = await runThreadScenario(context, {
createNestedReply: true,
tokenPrefix: "MATRIX_QA_THREAD_NESTED",
});
assertThreadReplyArtifact(result.reply, {
expectedRootEventId: result.rootEventId,
label: "thread nested reply",
});
requireMatrixQaNestedThreadEvent(
result.nestedDriverEventId,
"Matrix thread nested reply scenario",
);
assertMatrixQaInReplyTarget({
actualEventId: result.reply.relatesTo?.inReplyToId,
expectedEventId: result.rootEventId,
label: "thread nested reply in_reply_to",
});
return {
artifacts: buildMatrixQaThreadArtifacts(result),
details: buildMatrixQaThreadDetailLines({
result,
includeNestedTrigger: true,
extraLines: [
`reply in_reply_to: ${result.reply.relatesTo?.inReplyToId ?? "<none>"}`,
`expected fallback root: ${result.rootEventId}`,
],
}).join("\n"),
} satisfies MatrixQaScenarioExecution;
}
export async function runThreadIsolationScenario(context: MatrixQaScenarioContext) {
const threadPhase = await runThreadScenario(context);
assertThreadReplyArtifact(threadPhase.reply, {
expectedRootEventId: threadPhase.rootEventId,
label: "thread isolation reply",
});
const topLevelPhase = await runDriverTopLevelMentionScenario({
baseUrl: context.baseUrl,
driverAccessToken: context.driverAccessToken,
observedEvents: context.observedEvents,
roomId: context.roomId,
syncState: context.syncState,
sutUserId: context.sutUserId,
timeoutMs: context.timeoutMs,
const topLevelPhase = await runAssertedDriverTopLevelScenario({
context,
label: "top-level follow-up reply",
tokenPrefix: "MATRIX_QA_TOPLEVEL",
});
assertTopLevelReplyArtifact("top-level follow-up reply", topLevelPhase.reply);
return {
artifacts: {
threadDriverEventId: threadPhase.driverEventId,
@@ -164,17 +318,11 @@ export async function runThreadIsolationScenario(context: MatrixQaScenarioContex
}
export async function runTopLevelReplyShapeScenario(context: MatrixQaScenarioContext) {
const result = await runDriverTopLevelMentionScenario({
baseUrl: context.baseUrl,
driverAccessToken: context.driverAccessToken,
observedEvents: context.observedEvents,
roomId: context.roomId,
syncState: context.syncState,
sutUserId: context.sutUserId,
timeoutMs: context.timeoutMs,
const result = await runAssertedDriverTopLevelScenario({
context,
label: "top-level reply",
tokenPrefix: "MATRIX_QA_TOPLEVEL",
});
assertTopLevelReplyArtifact("top-level reply", result.reply);
return {
artifacts: {
driverEventId: result.driverEventId,
@@ -198,6 +346,7 @@ export async function runRoomThreadReplyOverrideScenario(context: MatrixQaScenar
event.relatesTo?.relType === "m.thread" && event.relatesTo?.eventId === params.driverEventId,
roomId: context.roomId,
syncState: context.syncState,
syncStreams: context.syncStreams,
sutUserId: context.sutUserId,
timeoutMs: context.timeoutMs,
tokenPrefix: "MATRIX_QA_ROOM_THREAD",
@@ -225,7 +374,9 @@ export async function runObserverAllowlistOverrideScenario(context: MatrixQaScen
accessToken: context.observerAccessToken,
actorId: "observer",
baseUrl: context.baseUrl,
observedEvents: context.observedEvents,
syncState: context.syncState,
syncStreams: context.syncStreams,
});
const token = `MATRIX_QA_OBSERVER_ALLOWLIST_${randomUUID().slice(0, 8).toUpperCase()}`;
const body = buildMentionPrompt(context.sutUserId, token);
@@ -271,12 +422,7 @@ export async function runObserverAllowlistOverrideScenario(context: MatrixQaScen
}
export async function runQuietStreamingPreviewScenario(context: MatrixQaScenarioContext) {
const { client, startSince } = await primeMatrixQaActorCursor({
accessToken: context.driverAccessToken,
actorId: "driver",
baseUrl: context.baseUrl,
syncState: context.syncState,
});
const { client, startSince } = await primeMatrixQaDriverScenarioClient(context);
const finalText = `MATRIX_QA_QUIET_STREAM_${randomUUID().slice(0, 8).toUpperCase()} preview complete`;
const triggerBody = buildMatrixQuietStreamingPrompt(context.sutUserId, finalText);
const driverEventId = await client.sendTextMessage({
@@ -337,12 +483,7 @@ export async function runQuietStreamingPreviewScenario(context: MatrixQaScenario
export async function runBlockStreamingScenario(context: MatrixQaScenarioContext) {
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_BLOCK_ROOM_KEY);
const { client, startSince } = await primeMatrixQaActorCursor({
accessToken: context.driverAccessToken,
actorId: "driver",
baseUrl: context.baseUrl,
syncState: context.syncState,
});
const { client, startSince } = await primeMatrixQaDriverScenarioClient(context);
const firstText = `MATRIX_QA_BLOCK_ONE_${randomUUID().slice(0, 8).toUpperCase()}`;
const secondText = `MATRIX_QA_BLOCK_TWO_${randomUUID().slice(0, 8).toUpperCase()}`;
const triggerBody = buildMatrixBlockStreamingPrompt(context.sutUserId, firstText, secondText);
@@ -406,12 +547,7 @@ export async function runBlockStreamingScenario(context: MatrixQaScenarioContext
}
export async function runRoomAutoJoinInviteScenario(context: MatrixQaScenarioContext) {
const { client, startSince } = await primeMatrixQaActorCursor({
accessToken: context.driverAccessToken,
actorId: "driver",
baseUrl: context.baseUrl,
syncState: context.syncState,
});
const { client, startSince } = await primeMatrixQaDriverScenarioClient(context);
const dynamicRoomId = await client.createPrivateRoom({
inviteUserIds: [context.observerUserId, context.sutUserId],
name: `Matrix QA AutoJoin ${randomUUID().slice(0, 8)}`,
@@ -435,18 +571,12 @@ export async function runRoomAutoJoinInviteScenario(context: MatrixQaScenarioCon
startSince,
});
const result = await runTopLevelMentionScenario({
accessToken: context.driverAccessToken,
actorId: "driver",
baseUrl: context.baseUrl,
observedEvents: context.observedEvents,
const result = await runAssertedDriverTopLevelScenario({
context,
label: "auto-join room reply",
roomId: dynamicRoomId,
syncState: context.syncState,
sutUserId: context.sutUserId,
timeoutMs: context.timeoutMs,
tokenPrefix: "MATRIX_QA_AUTOJOIN",
});
assertTopLevelReplyArtifact("auto-join room reply", result.reply);
return {
artifacts: {
@@ -468,10 +598,7 @@ export async function runRoomAutoJoinInviteScenario(context: MatrixQaScenarioCon
export async function runMembershipLossScenario(context: MatrixQaScenarioContext) {
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_MEMBERSHIP_ROOM_KEY);
const driverClient = createMatrixQaScenarioClient({
accessToken: context.driverAccessToken,
baseUrl: context.baseUrl,
});
const driverClient = createMatrixQaDriverScenarioClient(context);
const sutClient = createMatrixQaScenarioClient({
accessToken: context.sutAccessToken,
baseUrl: context.baseUrl,
@@ -491,6 +618,7 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext
roomId,
stateKey: context.sutUserId,
syncState: context.syncState,
syncStreams: context.syncStreams,
timeoutMs: context.timeoutMs,
});
@@ -505,6 +633,7 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext
observedEvents: context.observedEvents,
roomId,
syncState: context.syncState,
syncStreams: context.syncStreams,
sutUserId: context.sutUserId,
timeoutMs: Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs),
token: noReplyToken,
@@ -523,6 +652,7 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext
roomId,
stateKey: context.sutUserId,
syncState: context.syncState,
syncStreams: context.syncStreams,
timeoutMs: context.timeoutMs,
});
await sutClient.joinRoom(roomId);
@@ -535,6 +665,7 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext
roomId,
stateKey: context.sutUserId,
syncState: context.syncState,
syncStreams: context.syncStreams,
timeoutMs: context.timeoutMs,
});
@@ -566,52 +697,239 @@ export async function runMembershipLossScenario(context: MatrixQaScenarioContext
}
export async function runReactionNotificationScenario(context: MatrixQaScenarioContext) {
const reactionTargetEventId = context.canary?.reply.eventId?.trim();
if (!reactionTargetEventId) {
throw new Error("Matrix reaction scenario requires a canary reply event id");
}
const { client, startSince } = await primeMatrixQaActorCursor({
accessToken: context.driverAccessToken,
const reactionTargetEventId = requireMatrixQaReactionTargetEventId(
context.canary?.reply.eventId,
"Matrix reaction scenario",
);
const result = await observeReactionScenario({
actorId: "driver",
actorUserId: context.driverUserId,
accessToken: context.driverAccessToken,
baseUrl: context.baseUrl,
observedEvents: context.observedEvents,
reactionTargetEventId,
roomId: context.roomId,
syncState: context.syncState,
syncStreams: context.syncStreams,
timeoutMs: context.timeoutMs,
});
const reactionEmoji = "👍";
return {
artifacts: buildMatrixQaReactionArtifacts({ reaction: result }),
details: buildMatrixQaReactionDetailLines({
actorUserId: result.actorUserId,
observedReactionKey: result.event.reaction?.key,
reactionEmoji: result.reactionEmoji,
reactionEventId: result.reactionEventId,
reactionTargetEventId: result.reactionTargetEventId,
}).join("\n"),
} satisfies MatrixQaScenarioExecution;
}
function buildMatrixQaReactionDetailLines(params: {
actorUserId?: string;
observedReactionKey?: string;
reactionEmoji: string;
reactionEventId: string;
reactionTargetEventId: string;
}) {
return [
`reaction event: ${params.reactionEventId}`,
`reaction target: ${params.reactionTargetEventId}`,
`reaction emoji: ${params.reactionEmoji}`,
...(params.actorUserId ? [`reaction sender: ${params.actorUserId}`] : []),
...(params.observedReactionKey ? [`observed reaction key: ${params.observedReactionKey}`] : []),
];
}
function requireMatrixQaReactionTargetEventId(
reactionTargetEventId: string | undefined,
scenarioLabel: string,
) {
const normalizedReactionTargetEventId = reactionTargetEventId?.trim();
if (!normalizedReactionTargetEventId) {
throw new Error(`${scenarioLabel} requires a canary reply event id`);
}
return normalizedReactionTargetEventId;
}
async function observeReactionScenario(params: {
actorId: MatrixQaActorId;
actorUserId: string;
accessToken: string;
baseUrl: string;
observedEvents: MatrixQaObservedEvent[];
reactionEmoji?: string;
reactionTargetEventId: string;
roomId: string;
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaScenarioContext["syncStreams"];
timeoutMs: number;
}) {
const { client, startSince } = await primeMatrixQaActorCursor({
accessToken: params.accessToken,
actorId: params.actorId,
baseUrl: params.baseUrl,
observedEvents: params.observedEvents,
syncState: params.syncState,
syncStreams: params.syncStreams,
});
const reactionEmoji = params.reactionEmoji ?? "👍";
const reactionEventId = await client.sendReaction({
emoji: reactionEmoji,
messageId: reactionTargetEventId,
roomId: context.roomId,
messageId: params.reactionTargetEventId,
roomId: params.roomId,
});
const matched = await client.waitForRoomEvent({
observedEvents: context.observedEvents,
observedEvents: params.observedEvents,
predicate: (event) =>
event.roomId === context.roomId &&
event.sender === context.driverUserId &&
event.roomId === params.roomId &&
event.sender === params.actorUserId &&
event.type === "m.reaction" &&
event.eventId === reactionEventId &&
event.reaction?.eventId === reactionTargetEventId &&
event.reaction?.eventId === params.reactionTargetEventId &&
event.reaction?.key === reactionEmoji,
roomId: context.roomId,
roomId: params.roomId,
since: startSince,
timeoutMs: params.timeoutMs,
});
return {
actorId: params.actorId,
actorUserId: params.actorUserId,
event: matched.event,
reactionEmoji,
reactionEventId,
reactionTargetEventId: params.reactionTargetEventId,
since: matched.since,
startSince,
};
}
function buildMatrixQaReactionArtifacts(params: {
actorUserId?: string;
expectedNoReplyWindowMs?: number;
reaction: Awaited<ReturnType<typeof observeReactionScenario>>;
}) {
return {
...(params.actorUserId ? { actorUserId: params.actorUserId } : {}),
...(params.expectedNoReplyWindowMs === undefined
? {}
: { expectedNoReplyWindowMs: params.expectedNoReplyWindowMs }),
reactionEmoji: params.reaction.reactionEmoji,
reactionEventId: params.reaction.reactionEventId,
reactionTargetEventId: params.reaction.reactionTargetEventId,
};
}
export async function runReactionThreadedScenario(context: MatrixQaScenarioContext) {
const thread = await runThreadScenario(context, {
createNestedReply: true,
tokenPrefix: "MATRIX_QA_REACTION_THREAD",
});
assertThreadReplyArtifact(thread.reply, {
expectedRootEventId: thread.rootEventId,
label: "threaded reaction reply",
});
const reaction = await observeReactionScenario({
actorId: "driver",
actorUserId: context.driverUserId,
accessToken: context.driverAccessToken,
baseUrl: context.baseUrl,
observedEvents: context.observedEvents,
reactionTargetEventId: thread.reply.eventId,
roomId: context.roomId,
syncState: context.syncState,
syncStreams: context.syncStreams,
timeoutMs: context.timeoutMs,
});
advanceMatrixQaActorCursor({
actorId: "driver",
actorId: reaction.actorId,
syncState: context.syncState,
nextSince: matched.since,
startSince,
nextSince: reaction.since,
startSince: reaction.startSince,
});
return {
artifacts: {
reactionEmoji,
reactionEventId,
reactionTargetEventId,
driverEventId: thread.driverEventId,
...buildMatrixQaReactionArtifacts({ reaction }),
reply: thread.reply,
rootEventId: thread.rootEventId,
token: thread.token,
},
details: [
`reaction event: ${reactionEventId}`,
`reaction target: ${reactionTargetEventId}`,
`reaction emoji: ${reactionEmoji}`,
`observed reaction key: ${matched.event.reaction?.key ?? "<none>"}`,
...buildMatrixQaThreadDetailLines({
result: thread,
includeNestedTrigger: true,
extraLines: [`thread reply event: ${thread.reply.eventId}`],
replyLabel: "thread reply",
}),
...buildMatrixQaReactionDetailLines({
reactionEmoji: reaction.reactionEmoji,
reactionEventId: reaction.reactionEventId,
reactionTargetEventId: reaction.reactionTargetEventId,
}),
].join("\n"),
} satisfies MatrixQaScenarioExecution;
}
export async function runReactionNotAReplyScenario(context: MatrixQaScenarioContext) {
const reactionTargetEventId = requireMatrixQaReactionTargetEventId(
context.canary?.reply.eventId,
"Matrix reaction no-reply scenario",
);
const reaction = await observeReactionScenario({
actorId: "driver",
actorUserId: context.driverUserId,
accessToken: context.driverAccessToken,
baseUrl: context.baseUrl,
observedEvents: context.observedEvents,
reactionTargetEventId,
roomId: context.roomId,
syncState: context.syncState,
syncStreams: context.syncStreams,
timeoutMs: context.timeoutMs,
});
const client = createMatrixQaDriverScenarioClient(context);
const noReplyWindowMs = Math.min(NO_REPLY_WINDOW_MS, context.timeoutMs);
const noReplyResult = await client.waitForOptionalRoomEvent({
observedEvents: context.observedEvents,
predicate: (event) =>
event.roomId === context.roomId &&
event.sender === context.sutUserId &&
event.type === "m.room.message",
roomId: context.roomId,
since: reaction.since,
timeoutMs: noReplyWindowMs,
});
if (noReplyResult.matched) {
const unexpectedReply = buildMatrixReplyArtifact(noReplyResult.event);
throw new Error(
[
`unexpected SUT reply after reaction from ${context.driverUserId}`,
`reaction target: ${reaction.reactionTargetEventId}`,
`reaction event: ${reaction.reactionEventId}`,
...buildMatrixReplyDetails("unexpected reply", unexpectedReply),
].join("\n"),
);
}
advanceMatrixQaActorCursor({
actorId: reaction.actorId,
syncState: context.syncState,
nextSince: noReplyResult.since,
startSince: reaction.startSince,
});
return {
artifacts: buildMatrixQaReactionArtifacts({
actorUserId: context.driverUserId,
expectedNoReplyWindowMs: noReplyWindowMs,
reaction,
}),
details: [
...buildMatrixQaReactionDetailLines({
reactionEmoji: reaction.reactionEmoji,
reactionEventId: reaction.reactionEventId,
reactionTargetEventId: reaction.reactionTargetEventId,
}),
`waited ${noReplyWindowMs}ms with no SUT reply`,
].join("\n"),
} satisfies MatrixQaScenarioExecution;
}
@@ -622,17 +940,12 @@ export async function runHomeserverRestartResumeScenario(context: MatrixQaScenar
}
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_HOMESERVER_ROOM_KEY);
await context.interruptTransport();
const resumed = await runDriverTopLevelMentionScenario({
baseUrl: context.baseUrl,
driverAccessToken: context.driverAccessToken,
observedEvents: context.observedEvents,
const resumed = await runAssertedDriverTopLevelScenario({
context,
label: "post-homeserver-restart reply",
roomId,
syncState: context.syncState,
sutUserId: context.sutUserId,
timeoutMs: context.timeoutMs,
tokenPrefix: "MATRIX_QA_HOMESERVER",
});
assertTopLevelReplyArtifact("post-homeserver-restart reply", resumed.reply);
return {
artifacts: {
driverEventId: resumed.driverEventId,
@@ -656,17 +969,12 @@ export async function runRestartResumeScenario(context: MatrixQaScenarioContext)
}
const roomId = resolveMatrixQaScenarioRoomId(context, MATRIX_QA_RESTART_ROOM_KEY);
await context.restartGateway();
const result = await runDriverTopLevelMentionScenario({
baseUrl: context.baseUrl,
driverAccessToken: context.driverAccessToken,
observedEvents: context.observedEvents,
const result = await runAssertedDriverTopLevelScenario({
context,
label: "post-restart reply",
roomId,
syncState: context.syncState,
sutUserId: context.sutUserId,
timeoutMs: context.timeoutMs,
tokenPrefix: "MATRIX_QA_RESTART",
});
assertTopLevelReplyArtifact("post-restart reply", result.reply);
return {
artifacts: {
driverEventId: result.driverEventId,

View File

@@ -1,6 +1,7 @@
import { randomUUID } from "node:crypto";
import { createMatrixQaClient } from "../../substrate/client.js";
import { createMatrixQaClient, type MatrixQaRoomObserver } from "../../substrate/client.js";
import type { MatrixQaObservedEvent } from "../../substrate/events.js";
import { createMatrixQaRoomObserver } from "../../substrate/sync.js";
import { type MatrixQaProvisionedTopology } from "../../substrate/topology.js";
import { resolveMatrixQaScenarioRoomId } from "./scenario-catalog.js";
import type {
@@ -12,6 +13,7 @@ import type {
export type MatrixQaActorId = "driver" | "observer";
export type MatrixQaSyncState = Partial<Record<MatrixQaActorId, string>>;
export type MatrixQaSyncStreams = Partial<Record<MatrixQaActorId, MatrixQaRoomObserver>>;
export type MatrixQaScenarioContext = {
baseUrl: string;
@@ -26,6 +28,7 @@ export type MatrixQaScenarioContext = {
interruptTransport?: () => Promise<void>;
sutAccessToken: string;
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaSyncStreams;
sutUserId: string;
timeoutMs: number;
topology: MatrixQaProvisionedTopology;
@@ -147,15 +150,71 @@ export function writeMatrixQaSyncCursor(
}
}
function getOrCreateMatrixQaActorSyncStream(params: {
accessToken: string;
actorId: MatrixQaActorId;
baseUrl: string;
observedEvents: MatrixQaObservedEvent[];
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaSyncStreams;
}) {
const existingStream = params.syncStreams?.[params.actorId];
if (existingStream) {
return existingStream;
}
const stream = createMatrixQaRoomObserver({
accessToken: params.accessToken,
baseUrl: params.baseUrl,
observedEvents: params.observedEvents,
since: readMatrixQaSyncCursor(params.syncState, params.actorId),
});
if (params.syncStreams) {
params.syncStreams[params.actorId] = stream;
}
return stream;
}
export function createMatrixQaScenarioClient(params: {
accessToken: string;
actorId?: MatrixQaActorId;
baseUrl: string;
observedEvents?: MatrixQaObservedEvent[];
syncState?: MatrixQaSyncState;
syncStreams?: MatrixQaSyncStreams;
}) {
const syncObserver =
params.actorId && params.observedEvents && params.syncState && params.syncStreams
? getOrCreateMatrixQaActorSyncStream({
accessToken: params.accessToken,
actorId: params.actorId,
baseUrl: params.baseUrl,
observedEvents: params.observedEvents,
syncState: params.syncState,
syncStreams: params.syncStreams,
})
: undefined;
return createMatrixQaClient({
accessToken: params.accessToken,
baseUrl: params.baseUrl,
...(syncObserver ? { syncObserver } : {}),
});
}
export async function primeMatrixQaActorCursor(params: {
accessToken: string;
actorId: MatrixQaActorId;
baseUrl: string;
observedEvents: MatrixQaObservedEvent[];
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaSyncStreams;
}) {
const client = createMatrixQaClient({
const client = createMatrixQaScenarioClient({
accessToken: params.accessToken,
actorId: params.actorId,
baseUrl: params.baseUrl,
observedEvents: params.observedEvents,
syncState: params.syncState,
syncStreams: params.syncStreams,
});
const existingSince = readMatrixQaSyncCursor(params.syncState, params.actorId);
if (existingSince) {
@@ -177,13 +236,6 @@ export function advanceMatrixQaActorCursor(params: {
writeMatrixQaSyncCursor(params.syncState, params.actorId, params.nextSince ?? params.startSince);
}
export function createMatrixQaScenarioClient(params: { accessToken: string; baseUrl: string }) {
return createMatrixQaClient({
accessToken: params.accessToken,
baseUrl: params.baseUrl,
});
}
export async function runConfigurableTopLevelScenario(params: {
accessToken: string;
actorId: MatrixQaActorId;
@@ -195,6 +247,7 @@ export async function runConfigurableTopLevelScenario(params: {
) => boolean;
roomId: string;
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaSyncStreams;
sutUserId: string;
timeoutMs: number;
tokenPrefix: string;
@@ -204,7 +257,9 @@ export async function runConfigurableTopLevelScenario(params: {
accessToken: params.accessToken,
actorId: params.actorId,
baseUrl: params.baseUrl,
observedEvents: params.observedEvents,
syncState: params.syncState,
syncStreams: params.syncStreams,
});
const token = `${params.tokenPrefix}_${randomUUID().slice(0, 8).toUpperCase()}`;
const body =
@@ -249,6 +304,7 @@ export async function runTopLevelMentionScenario(params: {
observedEvents: MatrixQaObservedEvent[];
roomId: string;
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaSyncStreams;
sutUserId: string;
timeoutMs: number;
tokenPrefix: string;
@@ -263,6 +319,7 @@ export async function runDriverTopLevelMentionScenario(params: {
observedEvents: MatrixQaObservedEvent[];
roomId: string;
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaSyncStreams;
sutUserId: string;
timeoutMs: number;
tokenPrefix: string;
@@ -274,6 +331,7 @@ export async function runDriverTopLevelMentionScenario(params: {
observedEvents: params.observedEvents,
roomId: params.roomId,
syncState: params.syncState,
syncStreams: params.syncStreams,
sutUserId: params.sutUserId,
timeoutMs: params.timeoutMs,
tokenPrefix: params.tokenPrefix,
@@ -289,13 +347,16 @@ export async function waitForMembershipEvent(params: {
roomId: string;
stateKey: string;
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaSyncStreams;
timeoutMs: number;
}) {
const { client, startSince } = await primeMatrixQaActorCursor({
accessToken: params.accessToken,
actorId: params.actorId,
baseUrl: params.baseUrl,
observedEvents: params.observedEvents,
syncState: params.syncState,
syncStreams: params.syncStreams,
});
const matched = await client.waitForRoomEvent({
observedEvents: params.observedEvents,
@@ -334,6 +395,7 @@ export async function runTopologyScopedTopLevelScenario(params: {
observedEvents: params.context.observedEvents,
roomId,
syncState: params.context.syncState,
syncStreams: params.context.syncStreams,
sutUserId: params.context.sutUserId,
timeoutMs: params.context.timeoutMs,
tokenPrefix: params.tokenPrefix,
@@ -369,6 +431,7 @@ export async function runNoReplyExpectedScenario(params: {
observedEvents: MatrixQaObservedEvent[];
roomId: string;
syncState: MatrixQaSyncState;
syncStreams?: MatrixQaSyncStreams;
sutUserId: string;
timeoutMs: number;
token: string;
@@ -377,7 +440,9 @@ export async function runNoReplyExpectedScenario(params: {
accessToken: params.accessToken,
actorId: params.actorId,
baseUrl: params.baseUrl,
observedEvents: params.observedEvents,
syncState: params.syncState,
syncStreams: params.syncStreams,
});
const driverEventId = await client.sendTextMessage({
body: params.body,

View File

@@ -16,12 +16,16 @@ import {
runMembershipLossScenario,
runObserverAllowlistOverrideScenario,
runQuietStreamingPreviewScenario,
runReactionNotAReplyScenario,
runReactionNotificationScenario,
runReactionThreadedScenario,
runRestartResumeScenario,
runRoomAutoJoinInviteScenario,
runRoomThreadReplyOverrideScenario,
runThreadFollowUpScenario,
runThreadIsolationScenario,
runThreadNestedReplyShapeScenario,
runThreadRootPreservationScenario,
runTopLevelReplyShapeScenario,
} from "./scenario-runtime-room.js";
import {
@@ -48,6 +52,52 @@ export {
};
export type { MatrixQaScenarioContext, MatrixQaSyncState };
async function runDriverTopologyScopedScenario(params: {
context: MatrixQaScenarioContext;
roomKey: string;
tokenPrefix: string;
withMention?: boolean;
}) {
return await runTopologyScopedTopLevelScenario({
accessToken: params.context.driverAccessToken,
actorId: "driver",
actorUserId: params.context.driverUserId,
context: params.context,
roomKey: params.roomKey,
tokenPrefix: params.tokenPrefix,
...(params.withMention === undefined ? {} : { withMention: params.withMention }),
});
}
function buildMatrixQaToken(prefix: string) {
return `${prefix}_${randomUUID().slice(0, 8).toUpperCase()}`;
}
async function runNoReplyScenario(params: {
accessToken: string;
actorId: "driver" | "observer";
actorUserId: string;
body: string;
context: MatrixQaScenarioContext;
mentionUserIds?: string[];
token: string;
}) {
return await runNoReplyExpectedScenario({
accessToken: params.accessToken,
actorId: params.actorId,
actorUserId: params.actorUserId,
baseUrl: params.context.baseUrl,
body: params.body,
...(params.mentionUserIds ? { mentionUserIds: params.mentionUserIds } : {}),
observedEvents: params.context.observedEvents,
roomId: params.context.roomId,
syncState: params.context.syncState,
sutUserId: params.context.sutUserId,
timeoutMs: params.context.timeoutMs,
token: params.token,
});
}
export async function runMatrixQaScenario(
scenario: MatrixQaScenarioDefinition,
context: MatrixQaScenarioContext,
@@ -55,6 +105,10 @@ export async function runMatrixQaScenario(
switch (scenario.id) {
case "matrix-thread-follow-up":
return await runThreadFollowUpScenario(context);
case "matrix-thread-root-preservation":
return await runThreadRootPreservationScenario(context);
case "matrix-thread-nested-reply-shape":
return await runThreadNestedReplyShapeScenario(context);
case "matrix-thread-isolation":
return await runThreadIsolationScenario(context);
case "matrix-top-level-reply-shape":
@@ -66,10 +120,7 @@ export async function runMatrixQaScenario(
case "matrix-room-block-streaming":
return await runBlockStreamingScenario(context);
case "matrix-dm-reply-shape":
return await runTopologyScopedTopLevelScenario({
accessToken: context.driverAccessToken,
actorId: "driver",
actorUserId: context.driverUserId,
return await runDriverTopologyScopedScenario({
context,
roomKey: MATRIX_QA_DRIVER_DM_ROOM_KEY,
tokenPrefix: "MATRIX_QA_DM",
@@ -84,19 +135,13 @@ export async function runMatrixQaScenario(
case "matrix-room-autojoin-invite":
return await runRoomAutoJoinInviteScenario(context);
case "matrix-secondary-room-reply":
return await runTopologyScopedTopLevelScenario({
accessToken: context.driverAccessToken,
actorId: "driver",
actorUserId: context.driverUserId,
return await runDriverTopologyScopedScenario({
context,
roomKey: MATRIX_QA_SECONDARY_ROOM_KEY,
tokenPrefix: "MATRIX_QA_SECONDARY",
});
case "matrix-secondary-room-open-trigger":
return await runTopologyScopedTopLevelScenario({
accessToken: context.driverAccessToken,
actorId: "driver",
actorUserId: context.driverUserId,
return await runDriverTopologyScopedScenario({
context,
roomKey: MATRIX_QA_SECONDARY_ROOM_KEY,
tokenPrefix: "MATRIX_QA_SECONDARY_OPEN",
@@ -104,6 +149,10 @@ export async function runMatrixQaScenario(
});
case "matrix-reaction-notification":
return await runReactionNotificationScenario(context);
case "matrix-reaction-threaded":
return await runReactionThreadedScenario(context);
case "matrix-reaction-not-a-reply":
return await runReactionNotAReplyScenario(context);
case "matrix-restart-resume":
return await runRestartResumeScenario(context);
case "matrix-room-membership-loss":
@@ -111,37 +160,27 @@ export async function runMatrixQaScenario(
case "matrix-homeserver-restart-resume":
return await runHomeserverRestartResumeScenario(context);
case "matrix-mention-gating": {
const token = `MATRIX_QA_NOMENTION_${randomUUID().slice(0, 8).toUpperCase()}`;
return await runNoReplyExpectedScenario({
const token = buildMatrixQaToken("MATRIX_QA_NOMENTION");
return await runNoReplyScenario({
accessToken: context.driverAccessToken,
actorId: "driver",
actorUserId: context.driverUserId,
baseUrl: context.baseUrl,
body: buildExactMarkerPrompt(token),
observedEvents: context.observedEvents,
roomId: context.roomId,
syncState: context.syncState,
sutUserId: context.sutUserId,
timeoutMs: context.timeoutMs,
context,
token,
});
}
case "matrix-observer-allowlist-override":
return await runObserverAllowlistOverrideScenario(context);
case "matrix-allowlist-block": {
const token = `MATRIX_QA_ALLOWLIST_${randomUUID().slice(0, 8).toUpperCase()}`;
return await runNoReplyExpectedScenario({
const token = buildMatrixQaToken("MATRIX_QA_ALLOWLIST");
return await runNoReplyScenario({
accessToken: context.observerAccessToken,
actorId: "observer",
actorUserId: context.observerUserId,
baseUrl: context.baseUrl,
body: buildMentionPrompt(context.sutUserId, token),
mentionUserIds: [context.sutUserId],
observedEvents: context.observedEvents,
roomId: context.roomId,
syncState: context.syncState,
sutUserId: context.sutUserId,
timeoutMs: context.timeoutMs,
context,
token,
});
}

View File

@@ -25,6 +25,8 @@ describe("matrix live qa scenarios", () => {
it("ships the Matrix live QA scenario set by default", () => {
expect(scenarioTesting.findMatrixQaScenarios().map((scenario) => scenario.id)).toEqual([
"matrix-thread-follow-up",
"matrix-thread-root-preservation",
"matrix-thread-nested-reply-shape",
"matrix-thread-isolation",
"matrix-top-level-reply-shape",
"matrix-room-thread-reply-override",
@@ -38,6 +40,8 @@ describe("matrix live qa scenarios", () => {
"matrix-secondary-room-reply",
"matrix-secondary-room-open-trigger",
"matrix-reaction-notification",
"matrix-reaction-threaded",
"matrix-reaction-not-a-reply",
"matrix-restart-resume",
"matrix-room-membership-loss",
"matrix-homeserver-restart-resume",

View File

@@ -89,4 +89,59 @@ describe("matrix observed event artifacts", () => {
},
]);
});
it("keeps redaction metadata while still stripping Matrix event content", () => {
expect(
buildMatrixQaObservedEventsArtifact({
includeContent: false,
observedEvents: [
{
kind: "redaction",
roomId: "!room:matrix-qa.test",
eventId: "$redaction",
sender: "@driver:matrix-qa.test",
type: "m.room.redaction",
originServerTs: 1_700_000_000_123,
},
{
kind: "message",
roomId: "!room:matrix-qa.test",
eventId: "$message",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
body: "private body",
formattedBody: "<p>private body</p>",
msgtype: "m.text",
},
],
}),
).toEqual([
{
kind: "redaction",
roomId: "!room:matrix-qa.test",
eventId: "$redaction",
sender: "@driver:matrix-qa.test",
type: "m.room.redaction",
originServerTs: 1_700_000_000_123,
msgtype: undefined,
membership: undefined,
relatesTo: undefined,
mentions: undefined,
reaction: undefined,
},
{
kind: "message",
roomId: "!room:matrix-qa.test",
eventId: "$message",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
originServerTs: undefined,
msgtype: "m.text",
membership: undefined,
relatesTo: undefined,
mentions: undefined,
reaction: undefined,
},
]);
});
});

View File

@@ -4,9 +4,11 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import type { MatrixQaObservedEvent } from "./events.js";
import { requestMatrixJson, type MatrixQaFetchLike } from "./request.js";
import {
createMatrixQaRoomObserver,
primeMatrixQaRoom,
waitForMatrixQaRoomEvent,
waitForOptionalMatrixQaRoomEvent,
type MatrixQaRoomObserver,
type MatrixQaRoomEventWaitResult,
} from "./sync.js";
import {
@@ -18,7 +20,7 @@ import {
} from "./topology.js";
export type { MatrixQaObservedEvent } from "./events.js";
export type { MatrixQaRoomEventWaitResult } from "./sync.js";
export type { MatrixQaRoomEventWaitResult, MatrixQaRoomObserver } from "./sync.js";
type MatrixQaAuthStage = "m.login.dummy" | "m.login.registration_token";
@@ -261,8 +263,10 @@ export function createMatrixQaClient(params: {
accessToken?: string;
baseUrl: string;
fetchImpl?: MatrixQaFetchLike;
syncObserver?: MatrixQaRoomObserver;
}) {
const fetchImpl = params.fetchImpl ?? fetch;
const syncObserver = params.syncObserver;
return {
async createPrivateRoom(opts: { inviteUserIds: string[]; isDirect?: boolean; name: string }) {
@@ -294,6 +298,9 @@ export function createMatrixQaClient(params: {
return roomId;
},
async primeRoom() {
if (syncObserver) {
return await syncObserver.prime();
}
return await primeMatrixQaRoom({
accessToken: params.accessToken,
baseUrl: params.baseUrl,
@@ -433,6 +440,13 @@ export function createMatrixQaClient(params: {
since?: string;
timeoutMs: number;
}) {
if (syncObserver) {
return syncObserver.waitForOptionalRoomEvent({
predicate: opts.predicate,
roomId: opts.roomId,
timeoutMs: opts.timeoutMs,
});
}
return waitForOptionalMatrixQaRoomEvent({
accessToken: params.accessToken,
baseUrl: params.baseUrl,
@@ -447,6 +461,13 @@ export function createMatrixQaClient(params: {
since?: string;
timeoutMs: number;
}) {
if (syncObserver) {
return await syncObserver.waitForRoomEvent({
predicate: opts.predicate,
roomId: opts.roomId,
timeoutMs: opts.timeoutMs,
});
}
return await waitForMatrixQaRoomEvent({
accessToken: params.accessToken,
baseUrl: params.baseUrl,
@@ -505,42 +526,42 @@ async function provisionMatrixQaTopology(params: {
fetchImpl?: MatrixQaFetchLike;
spec: MatrixQaTopologySpec;
}): Promise<MatrixQaProvisionedTopology> {
const rooms = await Promise.all(
params.spec.rooms.map(async (room) => {
const members = resolveTopologyMemberAccounts(params.accounts, room.members);
const creator = members[0];
const invitees = members.slice(1);
const creatorClient = createMatrixQaClient({
accessToken: creator.account.accessToken,
baseUrl: params.baseUrl,
fetchImpl: params.fetchImpl,
});
const roomId = await creatorClient.createPrivateRoom({
inviteUserIds: invitees.map((entry) => entry.account.userId),
isDirect: room.kind === "dm",
name: room.name,
});
await Promise.all(
invitees.map((invitee) =>
joinRoomWithRetry({
accessToken: invitee.account.accessToken,
baseUrl: params.baseUrl,
fetchImpl: params.fetchImpl,
roomId,
}),
),
);
return {
key: room.key,
kind: room.kind,
memberRoles: members.map((entry) => entry.role),
memberUserIds: members.map((entry) => entry.account.userId),
name: room.name,
requireMention: resolveProvisionedRoomRequireMention(room),
roomId,
};
}),
);
const rooms = [];
for (const room of params.spec.rooms) {
const members = resolveTopologyMemberAccounts(params.accounts, room.members);
const creator = members[0];
const invitees = members.slice(1);
const creatorClient = createMatrixQaClient({
accessToken: creator.account.accessToken,
baseUrl: params.baseUrl,
fetchImpl: params.fetchImpl,
});
const roomId = await creatorClient.createPrivateRoom({
inviteUserIds: invitees.map((entry) => entry.account.userId),
isDirect: room.kind === "dm",
name: room.name,
});
await Promise.all(
invitees.map((invitee) =>
joinRoomWithRetry({
accessToken: invitee.account.accessToken,
baseUrl: params.baseUrl,
fetchImpl: params.fetchImpl,
roomId,
}),
),
);
rooms.push({
key: room.key,
kind: room.kind,
memberRoles: members.map((entry) => entry.role),
memberUserIds: members.map((entry) => entry.account.userId),
name: room.name,
requireMention: resolveProvisionedRoomRequireMention(room),
roomId,
});
}
const defaultRoom = findMatrixQaProvisionedRoom(
{
@@ -628,5 +649,6 @@ export const __testing = {
buildMatrixQaMessageContent,
buildMatrixReactionRelation,
buildMatrixThreadRelation,
createMatrixQaRoomObserver,
resolveNextRegistrationAuth,
};

View File

@@ -152,6 +152,39 @@ describe("matrix qa config", () => {
});
});
it("rewrites the owned Matrix QA account instead of retaining stale override fields", () => {
const overridden = buildMatrixQaConfig({} as OpenClawConfig, {
driverUserId: "@driver:matrix-qa.test",
homeserver: "http://127.0.0.1:28008/",
observerUserId: "@observer:matrix-qa.test",
overrides: {
autoJoin: "allowlist",
autoJoinAllowlist: ["!ops:matrix-qa.test"],
blockStreaming: true,
streaming: "quiet",
},
sutAccessToken: "sut-token",
sutAccountId: "sut",
sutUserId: "@sut:matrix-qa.test",
topology,
});
const reset = buildMatrixQaConfig(overridden, {
driverUserId: "@driver:matrix-qa.test",
homeserver: "http://127.0.0.1:28008/",
observerUserId: "@observer:matrix-qa.test",
sutAccessToken: "sut-token",
sutAccountId: "sut",
sutUserId: "@sut:matrix-qa.test",
topology,
});
expect(reset.channels?.matrix?.accounts?.sut?.autoJoin).toBeUndefined();
expect(reset.channels?.matrix?.accounts?.sut?.autoJoinAllowlist).toBeUndefined();
expect(reset.channels?.matrix?.accounts?.sut?.blockStreaming).toBeUndefined();
expect(reset.channels?.matrix?.accounts?.sut?.streaming).toBeUndefined();
});
it("builds an effective Matrix QA config snapshot for reporting", () => {
const snapshot = buildMatrixQaConfigSnapshot({
driverUserId: "@driver:matrix-qa.test",

View File

@@ -250,7 +250,6 @@ function buildMatrixQaAccountDmConfig(params: {
}
function buildMatrixQaChannelAccountConfig(params: {
existingAccount?: MatrixQaChannelAccountConfig;
groups: Record<string, { enabled: boolean; requireMention: boolean }>;
homeserver: string;
overrides?: MatrixQaConfigOverrides;
@@ -274,7 +273,6 @@ function buildMatrixQaChannelAccountConfig(params: {
params.overrides?.streaming !== undefined ? { streaming: params.overrides.streaming } : {};
return {
...params.existingAccount,
accessToken: params.sutAccessToken,
...(params.sutDeviceId ? { deviceId: params.sutDeviceId } : {}),
dm: buildMatrixQaAccountDmConfig({
@@ -394,7 +392,6 @@ export function buildMatrixQaConfig(
accounts: {
...baseCfg.channels?.matrix?.accounts,
[params.sutAccountId]: buildMatrixQaChannelAccountConfig({
existingAccount: baseCfg.channels?.matrix?.accounts?.[params.sutAccountId],
groups,
homeserver: params.homeserver,
overrides: params.overrides,

View File

@@ -1,6 +1,10 @@
import { describe, expect, it } from "vitest";
import type { MatrixQaObservedEvent } from "./events.js";
import { primeMatrixQaRoom, waitForOptionalMatrixQaRoomEvent } from "./sync.js";
import {
createMatrixQaRoomObserver,
primeMatrixQaRoom,
waitForOptionalMatrixQaRoomEvent,
} from "./sync.js";
describe("matrix sync helpers", () => {
it("primes the Matrix sync cursor without recording observed events", async () => {
@@ -141,4 +145,135 @@ describe("matrix sync helpers", () => {
]),
);
});
it("lets a second wait reuse later same-batch events without another /sync", async () => {
let calls = 0;
const fetchImpl: typeof fetch = async () => {
calls += 1;
return new Response(
JSON.stringify({
next_batch: "next-batch-2",
rooms: {
join: {
"!room:matrix-qa.test": {
timeline: {
events: [
{
event_id: "$preview",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
content: { body: "preview", msgtype: "m.notice" },
},
{
event_id: "$final",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
content: {
body: "final",
msgtype: "m.text",
"m.relates_to": {
rel_type: "m.replace",
event_id: "$preview",
"m.new_content": { body: "final", msgtype: "m.text" },
},
},
},
],
},
},
},
},
}),
{ status: 200, headers: { "content-type": "application/json" } },
);
};
const observedEvents: MatrixQaObservedEvent[] = [];
const observer = createMatrixQaRoomObserver({
accessToken: "token",
baseUrl: "http://127.0.0.1:28008/",
fetchImpl,
observedEvents,
since: "start-batch",
});
const preview = await observer.waitForRoomEvent({
predicate: (event) => event.eventId === "$preview",
roomId: "!room:matrix-qa.test",
timeoutMs: 1_000,
});
const finalized = await observer.waitForRoomEvent({
predicate: (event) => event.eventId === "$final",
roomId: "!room:matrix-qa.test",
timeoutMs: 1_000,
});
expect(preview.event.eventId).toBe("$preview");
expect(finalized.event.eventId).toBe("$final");
expect(calls).toBe(1);
});
it("shares one in-flight /sync poll across concurrent waits", async () => {
let calls = 0;
const fetchImpl: typeof fetch = async () => {
calls += 1;
await new Promise((resolve) => setTimeout(resolve, 10));
return new Response(
JSON.stringify({
next_batch: "next-batch-2",
rooms: {
join: {
"!room:matrix-qa.test": {
timeline: {
events: [
{
event_id: "$reply",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
content: { body: "reply", msgtype: "m.text" },
},
{
event_id: "$notice",
sender: "@sut:matrix-qa.test",
type: "m.room.message",
content: { body: "notice", msgtype: "m.notice" },
},
],
},
},
},
},
}),
{ status: 200, headers: { "content-type": "application/json" } },
);
};
const observer = createMatrixQaRoomObserver({
accessToken: "token",
baseUrl: "http://127.0.0.1:28008/",
fetchImpl,
observedEvents: [],
since: "start-batch",
});
const [reply, notice] = await Promise.all([
observer.waitForRoomEvent({
predicate: (event) => event.eventId === "$reply",
roomId: "!room:matrix-qa.test",
timeoutMs: 1_000,
}),
observer.waitForOptionalRoomEvent({
predicate: (event) => event.eventId === "$notice",
roomId: "!room:matrix-qa.test",
timeoutMs: 1_000,
}),
]);
expect(reply.event.eventId).toBe("$reply");
expect(notice).toMatchObject({
event: expect.objectContaining({
eventId: "$notice",
}),
matched: true,
});
expect(calls).toBe(1);
});
});

View File

@@ -30,21 +30,181 @@ export type MatrixQaRoomEventWaitResult =
type MatrixQaSyncParams = {
accessToken?: string;
baseUrl: string;
fetchImpl: MatrixQaFetchLike;
fetchImpl?: MatrixQaFetchLike;
};
export type MatrixQaRoomObserver = {
prime(): Promise<string | undefined>;
waitForOptionalRoomEvent(params: {
predicate: (event: MatrixQaObservedEvent) => boolean;
roomId: string;
timeoutMs: number;
}): Promise<MatrixQaRoomEventWaitResult>;
waitForRoomEvent(params: {
predicate: (event: MatrixQaObservedEvent) => boolean;
roomId: string;
timeoutMs: number;
}): Promise<{
event: MatrixQaObservedEvent;
since?: string;
}>;
};
type MatrixQaRoomObserverState = {
cursorIndex: number;
events: MatrixQaObservedEvent[];
pollPromise?: Promise<void>;
since?: string;
};
export async function primeMatrixQaRoom(params: MatrixQaSyncParams) {
const fetchImpl = params.fetchImpl ?? fetch;
const response = await requestMatrixJson<MatrixQaSyncResponse>({
accessToken: params.accessToken,
baseUrl: params.baseUrl,
endpoint: "/_matrix/client/v3/sync",
fetchImpl: params.fetchImpl,
fetchImpl,
method: "GET",
query: { timeout: 0 },
});
return response.body.next_batch?.trim() || undefined;
}
async function pollMatrixQaRoomObserver(
params: MatrixQaSyncParams & {
observedEvents: MatrixQaObservedEvent[];
roomObserver: MatrixQaRoomObserverState;
timeoutMs: number;
},
) {
const fetchImpl = params.fetchImpl ?? fetch;
if (params.roomObserver.pollPromise) {
await params.roomObserver.pollPromise;
return;
}
params.roomObserver.pollPromise = (async () => {
const response = await requestMatrixJson<MatrixQaSyncResponse>({
accessToken: params.accessToken,
baseUrl: params.baseUrl,
endpoint: "/_matrix/client/v3/sync",
fetchImpl,
method: "GET",
query: {
...(params.roomObserver.since ? { since: params.roomObserver.since } : {}),
timeout: Math.min(10_000, params.timeoutMs),
},
timeoutMs: Math.min(15_000, params.timeoutMs + 5_000),
});
params.roomObserver.since = response.body.next_batch?.trim() || params.roomObserver.since;
for (const [roomId, joinedRoom] of Object.entries(response.body.rooms?.join ?? {})) {
for (const event of joinedRoom.timeline?.events ?? []) {
const normalized = normalizeMatrixQaObservedEvent(roomId, event);
if (!normalized) {
continue;
}
params.observedEvents.push(normalized);
params.roomObserver.events.push(normalized);
}
}
})();
try {
await params.roomObserver.pollPromise;
} finally {
params.roomObserver.pollPromise = undefined;
}
}
function findObservedEventMatch(params: {
cursorIndex: number;
events: MatrixQaObservedEvent[];
predicate: (event: MatrixQaObservedEvent) => boolean;
roomId: string;
}) {
for (let index = params.cursorIndex; index < params.events.length; index += 1) {
const event = params.events[index];
if (event?.roomId !== params.roomId) {
continue;
}
if (params.predicate(event)) {
return {
event,
nextCursorIndex: index + 1,
};
}
}
return undefined;
}
export function createMatrixQaRoomObserver(
params: MatrixQaSyncParams & {
observedEvents: MatrixQaObservedEvent[];
since?: string;
},
): MatrixQaRoomObserver {
const roomObserver: MatrixQaRoomObserverState = {
cursorIndex: 0,
events: [],
since: params.since,
};
return {
async prime() {
if (roomObserver.since) {
return roomObserver.since;
}
roomObserver.since = await primeMatrixQaRoom(params);
return roomObserver.since;
},
async waitForOptionalRoomEvent(waitParams) {
const startSince = await this.prime();
const startedAt = Date.now();
let cursorIndex = roomObserver.cursorIndex;
while (Date.now() - startedAt < waitParams.timeoutMs) {
const matched = findObservedEventMatch({
cursorIndex,
events: roomObserver.events,
predicate: waitParams.predicate,
roomId: waitParams.roomId,
});
if (matched) {
roomObserver.cursorIndex = Math.max(roomObserver.cursorIndex, matched.nextCursorIndex);
return {
event: matched.event,
matched: true,
since: roomObserver.since ?? startSince,
};
}
cursorIndex = roomObserver.events.length;
const remainingMs = Math.max(1_000, waitParams.timeoutMs - (Date.now() - startedAt));
await pollMatrixQaRoomObserver({
...params,
observedEvents: params.observedEvents,
roomObserver,
timeoutMs: remainingMs,
});
}
roomObserver.cursorIndex = Math.max(roomObserver.cursorIndex, cursorIndex);
return {
matched: false,
since: roomObserver.since ?? startSince,
};
},
async waitForRoomEvent(waitParams) {
const result = await this.waitForOptionalRoomEvent(waitParams);
if (result.matched) {
return {
event: result.event,
since: result.since,
};
}
throw new Error(`timed out after ${waitParams.timeoutMs}ms waiting for Matrix room event`);
},
};
}
export async function waitForOptionalMatrixQaRoomEvent(
params: MatrixQaSyncParams & {
observedEvents: MatrixQaObservedEvent[];
@@ -54,40 +214,11 @@ export async function waitForOptionalMatrixQaRoomEvent(
timeoutMs: number;
},
): Promise<MatrixQaRoomEventWaitResult> {
const startedAt = Date.now();
let since = params.since;
while (Date.now() - startedAt < params.timeoutMs) {
const remainingMs = Math.max(1_000, params.timeoutMs - (Date.now() - startedAt));
const response = await requestMatrixJson<MatrixQaSyncResponse>({
accessToken: params.accessToken,
baseUrl: params.baseUrl,
endpoint: "/_matrix/client/v3/sync",
fetchImpl: params.fetchImpl,
method: "GET",
query: {
...(since ? { since } : {}),
timeout: Math.min(10_000, remainingMs),
},
timeoutMs: Math.min(15_000, remainingMs + 5_000),
});
since = response.body.next_batch?.trim() || since;
const roomEvents = response.body.rooms?.join?.[params.roomId]?.timeline?.events ?? [];
let matchedEvent: MatrixQaObservedEvent | null = null;
for (const event of roomEvents) {
const normalized = normalizeMatrixQaObservedEvent(params.roomId, event);
if (!normalized) {
continue;
}
params.observedEvents.push(normalized);
if (matchedEvent === null && params.predicate(normalized)) {
matchedEvent = normalized;
}
}
if (matchedEvent) {
return { event: matchedEvent, matched: true, since };
}
}
return { matched: false, since };
return await createMatrixQaRoomObserver(params).waitForOptionalRoomEvent({
predicate: params.predicate,
roomId: params.roomId,
timeoutMs: params.timeoutMs,
});
}
export async function waitForMatrixQaRoomEvent(