Compare commits

...

4 Commits

Author SHA1 Message Date
Kit Langton
1c2b2e682b Remove covered workspace websocket todo 2026-04-30 16:46:43 -04:00
Kit Langton
4f5af93e44 Simplify SyncEvent service access 2026-04-30 16:39:15 -04:00
Kit Langton
bdefdc2306 Make SyncEvent layer canonical 2026-04-30 16:28:30 -04:00
Kit Langton
ec98b656fd Add SyncEvent service 2026-04-30 16:12:01 -04:00
12 changed files with 431 additions and 353 deletions

View File

@@ -169,6 +169,7 @@ export const layer = Layer.effect(
const auth = yield* Auth.Service
const session = yield* Session.Service
const http = yield* HttpClient.HttpClient
const sync = yield* SyncEvent.Service
const connections = new Map<WorkspaceID, ConnectionStatus>()
const syncFibers = yield* FiberMap.make<WorkspaceID, void, SyncLoopError>()
@@ -307,25 +308,30 @@ export const layer = Layer.effect(
events: events.length,
})
yield* Effect.sync(() =>
WorkspaceContext.provide({
yield* Effect.promise(async () => {
await WorkspaceContext.provide({
workspaceID: space.id,
fn: () => {
for (const event of events) {
SyncEvent.replay(
{
id: event.id,
aggregateID: event.aggregate_id,
seq: event.seq,
type: event.type,
data: event.data,
},
{ publish: true },
)
}
async fn() {
await Effect.runPromise(
Effect.forEach(
events,
(event) =>
sync.replay(
{
id: event.id,
aggregateID: event.aggregate_id,
seq: event.seq,
type: event.type,
data: event.data,
},
{ publish: true },
),
{ discard: true },
),
)
},
}),
)
})
})
})
const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) {
@@ -361,16 +367,28 @@ export const layer = Layer.effect(
setStatus(space.id, "connected")
yield* parseSSE(stream, (evt) =>
Effect.sync(() => {
Effect.gen(function* () {
if (!evt || typeof evt !== "object" || !("payload" in evt)) return
const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
if (payload.type === "server.heartbeat") return
if (payload.type === "sync" && payload.syncEvent) {
const failed = yield* sync.replay(payload.syncEvent).pipe(
Effect.as(false),
Effect.catchCause((error) =>
Effect.sync(() => {
log.info("failed to replay global event", {
workspaceID: space.id,
error,
})
return true
}),
),
)
if (failed) return
}
try {
if (!evt || typeof evt !== "object" || !("payload" in evt)) return
const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
if (payload.type === "server.heartbeat") return
if (payload.type === "sync" && payload.syncEvent) {
SyncEvent.replay(payload.syncEvent)
}
const event = evt as { directory?: string; project?: string; payload: unknown }
GlobalBus.emit("event", {
directory: event.directory,
@@ -378,10 +396,10 @@ export const layer = Layer.effect(
workspace: space.id,
payload: event.payload,
})
} catch (err) {
} catch (error) {
log.info("failed to replay global event", {
workspaceID: space.id,
error: err,
error,
})
}
}),
@@ -516,14 +534,12 @@ export const layer = Layer.effect(
const adaptor = getAdaptor(space.projectID, space.type)
const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space)))
yield* Effect.sync(() =>
SyncEvent.run(Session.Event.Updated, {
sessionID: input.sessionID,
info: {
workspaceID: input.workspaceID,
},
}),
)
yield* sync.run(Session.Event.Updated, {
sessionID: input.sessionID,
info: {
workspaceID: input.workspaceID,
},
})
const rows = yield* db((db) =>
db
@@ -593,7 +609,7 @@ export const layer = Layer.effect(
})
if (target.type === "local") {
SyncEvent.replayAll(events)
yield* sync.replayAll(events)
log.info("session restore batch replayed locally", {
workspaceID: input.workspaceID,
sessionID: input.sessionID,
@@ -812,6 +828,7 @@ export const layer = Layer.effect(
export const defaultLayer = layer.pipe(
Layer.provide(Auth.defaultLayer),
Layer.provide(Session.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
Layer.provide(FetchHttpClient.layer),
)

View File

@@ -47,6 +47,7 @@ import { Pty } from "@/pty"
import { Installation } from "@/installation"
import { ShareNext } from "@/share/share-next"
import { SessionShare } from "@/share/session"
import { SyncEvent } from "@/sync"
import { Npm } from "@opencode-ai/core/npm"
import { memoMap } from "@opencode-ai/core/effect/memo-map"
@@ -97,6 +98,7 @@ export const AppLayer = Layer.mergeAll(
Installation.defaultLayer,
ShareNext.defaultLayer,
SessionShare.defaultLayer,
SyncEvent.defaultLayer,
).pipe(Layer.provideMerge(Observability.layer))
const rt = ManagedRuntime.make(AppLayer, { memoMap })

View File

@@ -21,6 +21,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
Effect.gen(function* () {
const workspace = yield* Workspace.Service
const scope = yield* Scope.Scope
const sync = yield* SyncEvent.Service
const start = Effect.fn("SyncHttpApi.start")(function* () {
yield* workspace
@@ -45,7 +46,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
last: events.at(-1)?.seq,
directory: ctx.payload.directory,
})
SyncEvent.replayAll(events)
yield* sync.replayAll(events)
log.info("sync replay complete", {
sessionID: source,
events: events.length,

View File

@@ -31,6 +31,7 @@ import { SessionSummary } from "@/session/summary"
import { Todo } from "@/session/todo"
import { SessionShare } from "@/share/session"
import { Skill } from "@/skill"
import { SyncEvent } from "@/sync"
import { ToolRegistry } from "@/tool/registry"
import { lazy } from "@/util/lazy"
import { Vcs } from "@/project/vcs"
@@ -147,6 +148,7 @@ export const routes = Layer.mergeAll(rootApiRoutes, instanceRoutes).pipe(
SessionRunState.defaultLayer,
SessionStatus.defaultLayer,
SessionSummary.defaultLayer,
SyncEvent.defaultLayer,
Skill.defaultLayer,
Todo.defaultLayer,
ToolRegistry.defaultLayer,

View File

@@ -94,7 +94,7 @@ export const SyncRoutes = lazy(() =>
last: events.at(-1)?.seq,
directory: body.directory,
})
SyncEvent.replayAll(events)
await AppRuntime.runPromise(SyncEvent.use.replayAll(events))
log.info("sync replay complete", {
sessionID: source,

View File

@@ -38,6 +38,7 @@ export const layer = Layer.effect(
const bus = yield* Bus.Service
const summary = yield* SessionSummary.Service
const state = yield* SessionRunState.Service
const sync = yield* SyncEvent.Service
const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
yield* state.assertNotBusy(input.sessionID)
@@ -121,7 +122,7 @@ export const layer = Layer.effect(
remove.push(msg)
}
for (const msg of remove) {
SyncEvent.run(MessageV2.Event.Removed, {
yield* sync.run(MessageV2.Event.Removed, {
sessionID,
messageID: msg.info.id,
})
@@ -133,7 +134,7 @@ export const layer = Layer.effect(
const removeParts = target.parts.slice(idx)
target.parts = target.parts.slice(0, idx)
for (const part of removeParts) {
SyncEvent.run(MessageV2.Event.PartRemoved, {
yield* sync.run(MessageV2.Event.PartRemoved, {
sessionID,
messageID: target.info.id,
partID: part.id,
@@ -156,6 +157,7 @@ export const defaultLayer = Layer.suspend(() =>
Layer.provide(Storage.defaultLayer),
Layer.provide(Bus.layer),
Layer.provide(SessionSummary.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
),
)

View File

@@ -443,11 +443,12 @@ export type Patch = Types.DeepMutable<SyncEvent.Event<typeof Event.Updated>["dat
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
Effect.sync(() => Database.use(fn))
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> = Layer.effect(
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const bus = yield* Bus.Service
const storage = yield* Storage.Service
const sync = yield* SyncEvent.Service
const createNext = Effect.fn("Session.createNext")(function* (input: {
id?: SessionID
@@ -477,7 +478,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
}
log.info("created", result)
yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
yield* sync.run(Event.Created, { sessionID: result.id, info: result })
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
// This only exist for backwards compatibility. We should not be
@@ -525,10 +526,8 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
Effect.catchCause(() => Effect.succeed(false)),
)
yield* Effect.sync(() => {
SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
SyncEvent.remove(sessionID)
})
yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
yield* sync.remove(sessionID)
} catch (e) {
log.error(e)
}
@@ -536,19 +535,17 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
return msg
}).pipe(Effect.withSpan("Session.updateMessage"))
const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
}),
)
yield* sync.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
})
return part
}).pipe(Effect.withSpan("Session.updatePart"))
@@ -635,8 +632,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
return session
})
const patch = (sessionID: SessionID, info: Patch) =>
Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info }))
const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info })
const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
yield* patch(sessionID, { time: { updated: Date.now() } })
@@ -693,12 +689,10 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
sessionID: SessionID
messageID: MessageID
}) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
}),
)
yield* sync.run(MessageV2.Event.Removed, {
sessionID: input.sessionID,
messageID: input.messageID,
})
return input.messageID
})
@@ -707,13 +701,11 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
messageID: MessageID
partID: PartID
}) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
}),
)
yield* sync.run(MessageV2.Event.PartRemoved, {
sessionID: input.sessionID,
messageID: input.messageID,
partID: input.partID,
})
return input.partID
})
@@ -764,7 +756,11 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
}),
)
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer))
export const defaultLayer = layer.pipe(
Layer.provide(Bus.layer),
Layer.provide(Storage.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
)
export function* list(input?: {
directory?: string

View File

@@ -21,20 +21,19 @@ export const layer = Layer.effect(
const session = yield* Session.Service
const shareNext = yield* ShareNext.Service
const scope = yield* Scope.Scope
const sync = yield* SyncEvent.Service
const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
const conf = yield* cfg.get()
if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
const result = yield* shareNext.create(sessionID)
yield* Effect.sync(() =>
SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
)
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } })
return result
})
const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
yield* shareNext.remove(sessionID)
yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })
})
const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) {
@@ -54,6 +53,7 @@ export const defaultLayer = layer.pipe(
Layer.provide(ShareNext.defaultLayer),
Layer.provide(Session.defaultLayer),
Layer.provide(Config.defaultLayer),
Layer.provide(SyncEvent.defaultLayer),
)
export * as SessionShare from "./session"

View File

@@ -9,9 +9,11 @@ import { EventSequenceTable, EventTable } from "./event.sql"
import { WorkspaceContext } from "@/control-plane/workspace-context"
import { EventID } from "./schema"
import { Flag } from "@opencode-ai/core/flag/flag"
import { Schema as EffectSchema } from "effect"
import { Context, Effect, Layer, Schema as EffectSchema } from "effect"
import { zodObject } from "@/util/effect-zod"
import type { DeepMutable } from "@/util/schema"
import { makeRuntime } from "@/effect/run-service"
import { serviceUse } from "@/effect/service-use"
// Keep `Event["data"]` mutable because projectors mutate the persisted shape
// when writing to the database. Bus payloads (`Properties`) stay readonly —
@@ -46,6 +48,125 @@ export type SerializedEvent<Def extends Definition = Definition> = Event<Def> &
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise<unknown>
export interface Interface {
readonly run: <Def extends Definition>(
def: Def,
data: Event<Def>["data"],
options?: { publish?: boolean },
) => Effect.Effect<void>
readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect<void>
readonly replayAll: (events: SerializedEvent[], options?: { publish: boolean }) => Effect.Effect<string | undefined>
readonly remove: (aggregateID: string) => Effect.Effect<void>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
export const layer = Layer.effect(Service)(
Effect.gen(function* () {
const replay: Interface["replay"] = Effect.fn("SyncEvent.replay")(function* (event, options) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) return
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(
`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
)
}
process(def, event, { publish: !!options?.publish })
})
const replayAll: Interface["replayAll"] = Effect.fn("SyncEvent.replayAll")(function* (events, options) {
const source = events[0]?.aggregateID
if (!source) return undefined
if (events.some((item) => item.aggregateID !== source)) {
throw new Error("Replay events must belong to the same session")
}
const start = events[0].seq
for (const [i, item] of events.entries()) {
const seq = start + i
if (item.seq !== seq) {
throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
}
}
for (const item of events) {
yield* replay(item, options)
}
return source
})
const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* (def, data, options) {
const agg = (data as Record<string, string>)[def.aggregate]
// This should never happen: we've enforced it via typescript in
// the definition
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
const { publish = true } = options || {}
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish })
},
{
behavior: "immediate",
},
)
})
const remove: Interface["remove"] = Effect.fn("SyncEvent.remove")(function* (aggregateID) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
})
return Service.of({
run,
replay,
replayAll,
remove,
})
}),
)
export const defaultLayer = layer
export const use = serviceUse(Service)
const runtime = makeRuntime(Service, defaultLayer)
export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()
@@ -186,92 +307,19 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
}
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) {
return
}
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
}
process(def, event, { publish: !!options?.publish })
return runtime.runSync((sync) => sync.replay(event, options))
}
export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) {
const source = events[0]?.aggregateID
if (!source) return
if (events.some((item) => item.aggregateID !== source)) {
throw new Error("Replay events must belong to the same session")
}
const start = events[0].seq
for (const [i, item] of events.entries()) {
const seq = start + i
if (item.seq !== seq) {
throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
}
}
for (const item of events) {
replay(item, options)
}
return source
return runtime.runSync((sync) => sync.replayAll(events, options))
}
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
const agg = (data as Record<string, string>)[def.aggregate]
// This should never happen: we've enforced it via typescript in
// the definition
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
const { publish = true } = options || {}
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish })
},
{
behavior: "immediate",
},
)
return runtime.runSync((sync) => sync.run(def, data, options))
}
export function remove(aggregateID: string) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
return runtime.runSync((sync) => sync.remove(aggregateID))
}
export function payloads() {

View File

@@ -1,4 +1,4 @@
import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"
import fs from "node:fs/promises"
import Http from "node:http"
import path from "node:path"
@@ -1426,48 +1426,41 @@ describe("workspace-old sessionRestore", () => {
})
})
test("local restore replays batches without fetch and emits progress", async () => {
await withInstance(async (dir) => {
const captured = captureGlobalEvents()
let fetchCallCount = 0
const replayAll = spyOn(SyncEvent, "replayAll")
try {
using server = Bun.serve({
port: 0,
fetch() {
fetchCallCount++
return Response.json({ ok: true })
},
})
const type = unique("restore-local")
const info = workspaceInfo(Instance.project.id, type, { directory: dir })
insertWorkspace(info)
registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
const session = await AppRuntime.runPromise(
SessionNs.Service.use((svc) => svc.create({ title: "restore local" })),
)
replaceSessionEvents(session.id, 20)
it.live("local restore replays batches and emits progress", () =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const workspace = yield* WorkspaceOld.Service
const sessionSvc = yield* SessionNs.Service
const captured = captureGlobalEvents()
try {
const type = unique("restore-local")
const info = workspaceInfo(Instance.project.id, type, { directory: dir })
insertWorkspace(info)
registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
const session = yield* sessionSvc.create({ title: "restore local" })
replaceSessionEvents(session.id, 20)
expect(await restoreWorkspaceSession({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 })
expect(fetchCallCount).toBe(0)
expect(replayAll).toHaveBeenCalledTimes(3)
expect(replayAll.mock.calls.map((call) => call[0].length)).toEqual([10, 10, 1])
expect((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(session.id)))).workspaceID).toBe(
info.id,
)
expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i))
expect(
captured.events
.filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type)
.map((event) => event.payload.properties.step),
).toEqual([0, 1, 2, 3])
await removeWorkspace(info.id)
} finally {
captured.dispose()
}
})
})
expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({
total: 3,
})
expect((yield* sessionSvc.get(session.id)).workspaceID).toBe(info.id)
expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i))
expect(
captured.events
.filter(
(event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type,
)
.map((event) => event.payload.properties.step),
).toEqual([0, 1, 2, 3])
yield* workspace.remove(info.id)
} finally {
captured.dispose()
}
}),
{ git: true },
),
)
it.live("session restore includes real message and part events in sequence order", () => {
const replay: FetchCall[] = []

View File

@@ -1,4 +1,4 @@
import { afterEach, describe, expect, mock, test } from "bun:test"
import { afterEach, describe, expect, mock } from "bun:test"
import { NodeServices } from "@effect/platform-node"
import { mkdir } from "node:fs/promises"
import path from "node:path"
@@ -133,8 +133,6 @@ afterEach(async () => {
})
describe("workspace HttpApi", () => {
test.todo("proxies remote workspace websocket through real Effect listener", () => {})
it.live("serves read endpoints", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped({ git: true })

View File

@@ -1,16 +1,18 @@
import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
import { tmpdir } from "../fixture/fixture"
import { Schema } from "effect"
import { describe, expect, beforeEach, afterEach, afterAll } from "bun:test"
import { provideTmpdirInstance } from "../fixture/fixture"
import { Effect, Layer, Schema } from "effect"
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
import { Bus } from "../../src/bus"
import { Instance } from "../../src/project/instance"
import { SyncEvent } from "../../src/sync"
import { Database } from "@/storage/db"
import { EventTable } from "../../src/sync/event.sql"
import { Identifier } from "../../src/id/id"
import { MessageID } from "../../src/session/schema"
import { Flag } from "@opencode-ai/core/flag/flag"
import { initProjectors } from "../../src/server/projectors"
import { testEffect } from "../lib/effect"
const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
const it = testEffect(Layer.mergeAll(SyncEvent.defaultLayer, CrossSpawnSpawner.defaultLayer))
beforeEach(() => {
Database.close()
@@ -22,19 +24,6 @@ afterEach(() => {
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original
})
function withInstance(fn: () => void | Promise<void>) {
return async () => {
await using tmp = await tmpdir()
await Instance.provide({
directory: tmp.path,
fn: async () => {
await fn()
},
})
}
}
describe("SyncEvent", () => {
function setup() {
SyncEvent.reset()
@@ -59,179 +48,209 @@ describe("SyncEvent", () => {
return { Created, Sent }
}
function expectDefect<A, E, R>(effect: Effect.Effect<A, E, R>, pattern: RegExp) {
return Effect.gen(function* () {
const exit = yield* Effect.exit(effect)
if (exit._tag === "Success") throw new Error("Expected effect to fail")
expect(String(exit.cause)).toMatch(pattern)
})
}
afterAll(() => {
SyncEvent.reset()
initProjectors()
})
describe("run", () => {
test(
it.live(
"inserts event row",
withInstance(() => {
const { Created } = setup()
SyncEvent.run(Created, { id: "evt_1", name: "first" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].type).toBe("item.created.1")
expect(rows[0].aggregate_id).toBe("evt_1")
}),
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Created } = setup()
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].type).toBe("item.created.1")
expect(rows[0].aggregate_id).toBe("evt_1")
}),
),
)
test(
it.live(
"increments seq per aggregate",
withInstance(() => {
const { Created } = setup()
SyncEvent.run(Created, { id: "evt_1", name: "first" })
SyncEvent.run(Created, { id: "evt_1", name: "second" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(2)
expect(rows[1].seq).toBe(rows[0].seq + 1)
}),
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Created } = setup()
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" })
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "second" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(2)
expect(rows[1].seq).toBe(rows[0].seq + 1)
}),
),
)
test(
it.live(
"uses custom aggregate field from agg()",
withInstance(() => {
const { Sent } = setup()
SyncEvent.run(Sent, { item_id: "evt_1", to: "james" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe("evt_1")
}),
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Sent } = setup()
yield* SyncEvent.use.run(Sent, { item_id: "evt_1", to: "james" })
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe("evt_1")
}),
),
)
test(
it.live(
"emits events",
withInstance(async () => {
const { Created } = setup()
const events: Array<{
type: string
properties: { id: string; name: string }
}> = []
const received = new Promise<void>((resolve) => {
Bus.subscribeAll((event) => {
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Created } = setup()
const events: Array<{
type: string
properties: { id: string; name: string }
}> = []
let resolve = () => {}
const received = new Promise<void>((done) => {
resolve = done
})
const dispose = Bus.subscribeAll((event) => {
events.push(event)
resolve()
})
})
SyncEvent.run(Created, { id: "evt_1", name: "test" })
await received
expect(events).toHaveLength(1)
expect(events[0]).toEqual({
type: "item.created",
properties: {
id: "evt_1",
name: "test",
},
})
}),
try {
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "test" })
yield* Effect.promise(() => received)
expect(events).toHaveLength(1)
expect(events[0]).toEqual({
type: "item.created",
properties: {
id: "evt_1",
name: "test",
},
})
} finally {
dispose()
}
}),
),
)
})
describe("replay", () => {
test(
it.live(
"inserts event from external payload",
withInstance(() => {
const id = Identifier.descending("message")
SyncEvent.replay({
id: "evt_1",
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "replayed" },
})
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe(id)
}),
)
test(
"throws on sequence mismatch",
withInstance(() => {
const id = Identifier.descending("message")
SyncEvent.replay({
id: "evt_1",
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "first" },
})
expect(() =>
SyncEvent.replay({
provideTmpdirInstance(() =>
Effect.gen(function* () {
const id = MessageID.ascending()
yield* SyncEvent.use.replay({
id: "evt_1",
type: "item.created.1",
seq: 5,
aggregateID: id,
data: { id, name: "bad" },
}),
).toThrow(/Sequence mismatch/)
}),
)
test(
"throws on unknown event type",
withInstance(() => {
expect(() =>
SyncEvent.replay({
id: "evt_1",
type: "unknown.event.1",
seq: 0,
aggregateID: "x",
data: {},
}),
).toThrow(/Unknown event type/)
}),
aggregateID: id,
data: { id, name: "replayed" },
})
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows).toHaveLength(1)
expect(rows[0].aggregate_id).toBe(id)
}),
),
)
test(
"replayAll accepts later chunks after the first batch",
withInstance(() => {
const { Created } = setup()
const id = Identifier.descending("message")
const one = SyncEvent.replayAll([
{
it.live(
"throws on sequence mismatch",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const id = MessageID.ascending()
yield* SyncEvent.use.replay({
id: "evt_1",
type: SyncEvent.versionedType(Created.type, Created.version),
type: "item.created.1",
seq: 0,
aggregateID: id,
data: { id, name: "first" },
},
{
id: "evt_2",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 1,
aggregateID: id,
data: { id, name: "second" },
},
])
})
yield* expectDefect(
SyncEvent.use.replay({
id: "evt_1",
type: "item.created.1",
seq: 5,
aggregateID: id,
data: { id, name: "bad" },
}),
/Sequence mismatch/,
)
}),
),
)
const two = SyncEvent.replayAll([
{
id: "evt_3",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 2,
aggregateID: id,
data: { id, name: "third" },
},
{
id: "evt_4",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 3,
aggregateID: id,
data: { id, name: "fourth" },
},
])
it.live(
"throws on unknown event type",
provideTmpdirInstance(() =>
Effect.gen(function* () {
yield* expectDefect(
SyncEvent.use.replay({
id: "evt_1",
type: "unknown.event.1",
seq: 0,
aggregateID: "x",
data: {},
}),
/Unknown event type/,
)
}),
),
)
expect(one).toBe(id)
expect(two).toBe(id)
it.live(
"replayAll accepts later chunks after the first batch",
provideTmpdirInstance(() =>
Effect.gen(function* () {
const { Created } = setup()
const id = MessageID.ascending()
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
}),
const one = yield* SyncEvent.use.replayAll([
{
id: "evt_1",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 0,
aggregateID: id,
data: { id, name: "first" },
},
{
id: "evt_2",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 1,
aggregateID: id,
data: { id, name: "second" },
},
])
const two = yield* SyncEvent.use.replayAll([
{
id: "evt_3",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 2,
aggregateID: id,
data: { id, name: "third" },
},
{
id: "evt_4",
type: SyncEvent.versionedType(Created.type, Created.version),
seq: 3,
aggregateID: id,
data: { id, name: "fourth" },
},
])
expect(one).toBe(id)
expect(two).toBe(id)
const rows = Database.use((db) => db.select().from(EventTable).all())
expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
}),
),
)
})
})