From 305460b25fc673f707a238f180d93e58d80f1ee9 Mon Sep 17 00:00:00 2001 From: James Long Date: Thu, 16 Apr 2026 12:15:44 -0400 Subject: [PATCH] fix: add a few more tests for sync and session restore (#22837) --- packages/opencode/src/server/instance/sync.ts | 1 + packages/opencode/test/sync/index.test.ts | 48 +++ .../test/workspace/workspace-restore.test.ts | 280 ++++++++++++++++++ 3 files changed, 329 insertions(+) create mode 100644 packages/opencode/test/workspace/workspace-restore.test.ts diff --git a/packages/opencode/src/server/instance/sync.ts b/packages/opencode/src/server/instance/sync.ts index 633e77f10e..ac43b638eb 100644 --- a/packages/opencode/src/server/instance/sync.ts +++ b/packages/opencode/src/server/instance/sync.ts @@ -53,6 +53,7 @@ export const SyncRoutes = lazy(() => const body = c.req.valid("json") const events = body.events const source = events[0].aggregateID + log.info("sync replay requested", { sessionID: source, events: events.length, diff --git a/packages/opencode/test/sync/index.test.ts b/packages/opencode/test/sync/index.test.ts index 2ba716cac0..36429c3d84 100644 --- a/packages/opencode/test/sync/index.test.ts +++ b/packages/opencode/test/sync/index.test.ts @@ -187,5 +187,53 @@ describe("SyncEvent", () => { ).toThrow(/Unknown event type/) }), ) + + test( + "replayAll accepts later chunks after the first batch", + withInstance(() => { + const { Created } = setup() + const id = Identifier.descending("message") + + const one = SyncEvent.replayAll([ + { + id: "evt_1", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 0, + aggregateID: id, + data: { id, name: "first" }, + }, + { + id: "evt_2", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 1, + aggregateID: id, + data: { id, name: "second" }, + }, + ]) + + const two = SyncEvent.replayAll([ + { + id: "evt_3", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 2, + aggregateID: id, + data: { id, name: "third" }, + }, + { + id: "evt_4", + type: SyncEvent.versionedType(Created.type, Created.version), + seq: 3, + aggregateID: id, + data: { id, name: "fourth" }, + }, + ]) + + expect(one).toBe(id) + expect(two).toBe(id) + + const rows = Database.use((db) => db.select().from(EventTable).all()) + expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3]) + }), + ) }) }) diff --git a/packages/opencode/test/workspace/workspace-restore.test.ts b/packages/opencode/test/workspace/workspace-restore.test.ts new file mode 100644 index 0000000000..ee9ad059f8 --- /dev/null +++ b/packages/opencode/test/workspace/workspace-restore.test.ts @@ -0,0 +1,280 @@ +import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test" +import fs from "node:fs/promises" +import path from "node:path" +import { GlobalBus } from "../../src/bus/global" +import { registerAdaptor } from "../../src/control-plane/adaptors" +import type { WorkspaceAdaptor } from "../../src/control-plane/types" +import { Workspace } from "../../src/control-plane/workspace" +import { AppRuntime } from "../../src/effect/app-runtime" +import { Flag } from "../../src/flag/flag" +import { ModelID, ProviderID } from "../../src/provider/schema" +import { Instance } from "../../src/project/instance" +import { Session as SessionNs } from "../../src/session" +import { MessageV2 } from "../../src/session/message-v2" +import { MessageID, PartID, type SessionID } from "../../src/session/schema" +import { Database, asc, eq } from "../../src/storage" +import { SyncEvent } from "../../src/sync" +import { EventTable } from "../../src/sync/event.sql" +import { Log } from "../../src/util" +import { resetDatabase } from "../fixture/db" +import { tmpdir } from "../fixture/fixture" + +void Log.init({ print: false }) + +const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES + +beforeEach(() => { + Database.close() + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true +}) + +afterEach(async () => { + mock.restore() + await Instance.disposeAll() + // @ts-expect-error test override + Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original + await resetDatabase() +}) + +function create(input?: SessionNs.CreateInput) { + return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.create(input))) +} + +function get(id: SessionID) { + return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(id))) +} + +function updateMessage(msg: T) { + return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updateMessage(msg))) +} + +function updatePart(part: T) { + return AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.updatePart(part))) +} + +async function user(sessionID: SessionID, text: string) { + const msg = await updateMessage({ + id: MessageID.ascending(), + role: "user", + sessionID, + agent: "build", + model: { providerID: ProviderID.make("test"), modelID: ModelID.make("test") }, + time: { created: Date.now() }, + }) + await updatePart({ + id: PartID.ascending(), + sessionID, + messageID: msg.id, + type: "text", + text, + }) +} + +function remote(dir: string, url: string): WorkspaceAdaptor { + return { + name: "remote", + description: "remote", + configure(info) { + return { + ...info, + directory: dir, + } + }, + async create() { + await fs.mkdir(dir, { recursive: true }) + }, + async remove() {}, + target() { + return { + type: "remote" as const, + url, + } + }, + } +} + +function local(dir: string): WorkspaceAdaptor { + return { + name: "local", + description: "local", + configure(info) { + return { + ...info, + directory: dir, + } + }, + async create() { + await fs.mkdir(dir, { recursive: true }) + }, + async remove() {}, + target() { + return { + type: "local" as const, + directory: dir, + } + }, + } +} + +function eventStreamResponse() { + return new Response(new ReadableStream({ start() {} }), { + status: 200, + headers: { + "content-type": "text/event-stream", + }, + }) +} + +describe("Workspace.sessionRestore", () => { + test("replays session events in batches of 10 and emits progress", async () => { + await using tmp = await tmpdir({ git: true }) + const dir = path.join(tmp.path, ".restore") + const seen: any[] = [] + const posts: Array<{ + path: string + body: { directory: string; events: Array<{ seq: number; aggregateID: string }> } + }> = [] + const on = (evt: any) => seen.push(evt) + GlobalBus.on("event", on) + + const raw = globalThis.fetch + spyOn(globalThis, "fetch").mockImplementation( + Object.assign( + async (input: URL | RequestInfo, init?: BunFetchRequestInit | RequestInit) => { + const url = new URL(typeof input === "string" || input instanceof URL ? input : input.url) + if (url.pathname !== "/base/sync/replay") { + return eventStreamResponse() + } + const body = JSON.parse(String(init?.body)) + posts.push({ + path: url.pathname, + body, + }) + return Response.json({ sessionID: body.events[0].aggregateID }) + }, + { + preconnect: raw.preconnect?.bind(raw), + }, + ) as typeof globalThis.fetch, + ) + + try { + const setup = await Instance.provide({ + directory: tmp.path, + fn: async () => { + registerAdaptor(Instance.project.id, "worktree", remote(dir, "https://workspace.test/base")) + const space = await Workspace.create({ + type: "worktree", + branch: null, + extra: null, + projectID: Instance.project.id, + }) + const session = await create({}) + for (let i = 0; i < 6; i++) { + await user(session.id, `msg ${i}`) + } + const rows = Database.use((db) => + db + .select({ seq: EventTable.seq }) + .from(EventTable) + .where(eq(EventTable.aggregate_id, session.id)) + .orderBy(asc(EventTable.seq)) + .all(), + ) + const result = await Workspace.sessionRestore({ + workspaceID: space.id, + sessionID: session.id, + }) + return { space, session, rows, result } + }, + }) + + expect(setup.rows).toHaveLength(13) + expect(setup.result).toEqual({ total: 2 }) + expect(posts).toHaveLength(2) + expect(posts[0]?.path).toBe("/base/sync/replay") + expect(posts[1]?.path).toBe("/base/sync/replay") + expect(posts[0]?.body.directory).toBe(dir) + expect(posts[1]?.body.directory).toBe(dir) + expect(posts[0]?.body.events).toHaveLength(10) + expect(posts[1]?.body.events).toHaveLength(4) + expect(posts.flatMap((item) => item.body.events.map((event) => event.seq))).toEqual([ + ...setup.rows.map((row) => row.seq), + setup.rows.at(-1)!.seq + 1, + ]) + expect(posts[1]?.body.events.at(-1)).toMatchObject({ + aggregateID: setup.session.id, + seq: setup.rows.at(-1)!.seq + 1, + type: SyncEvent.versionedType(SessionNs.Event.Updated.type, SessionNs.Event.Updated.version), + data: { + sessionID: setup.session.id, + info: { + workspaceID: setup.space.id, + }, + }, + }) + + const restore = seen.filter( + (evt) => evt.workspace === setup.space.id && evt.payload.type === Workspace.Event.Restore.type, + ) + expect(restore.map((evt) => evt.payload.properties.step)).toEqual([0, 1, 2]) + expect(restore.map((evt) => evt.payload.properties.total)).toEqual([2, 2, 2]) + expect(restore.map((evt) => evt.payload.properties.sessionID)).toEqual([ + setup.session.id, + setup.session.id, + setup.session.id, + ]) + } finally { + GlobalBus.off("event", on) + } + }) + + test("replays locally without posting to a server", async () => { + await using tmp = await tmpdir({ git: true }) + const dir = path.join(tmp.path, ".restore-local") + const seen: any[] = [] + const on = (evt: any) => seen.push(evt) + GlobalBus.on("event", on) + + const fetch = spyOn(globalThis, "fetch") + const replayAll = spyOn(SyncEvent, "replayAll") + + try { + const setup = await Instance.provide({ + directory: tmp.path, + fn: async () => { + registerAdaptor(Instance.project.id, "local-restore", local(dir)) + const space = await Workspace.create({ + type: "local-restore", + branch: null, + extra: null, + projectID: Instance.project.id, + }) + const session = await create({}) + for (let i = 0; i < 6; i++) { + await user(session.id, `msg ${i}`) + } + const result = await Workspace.sessionRestore({ + workspaceID: space.id, + sessionID: session.id, + }) + const updated = await get(session.id) + return { space, session, result, updated } + }, + }) + + expect(setup.result).toEqual({ total: 2 }) + expect(fetch).not.toHaveBeenCalled() + expect(replayAll).toHaveBeenCalledTimes(2) + expect(setup.updated.workspaceID).toBe(setup.space.id) + + const restore = seen.filter( + (evt) => evt.workspace === setup.space.id && evt.payload.type === Workspace.Event.Restore.type, + ) + expect(restore.map((evt) => evt.payload.properties.step)).toEqual([0, 1, 2]) + } finally { + GlobalBus.off("event", on) + } + }) +})