Mirror of https://fastgit.cc/https://github.com/anomalyco/opencode, synced 2026-05-03 15:21:31 +08:00

Compare commits: kit/cli-fl...effect-syn (4 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 1c2b2e682b |  |
|  | 4f5af93e44 |  |
|  | bdefdc2306 |  |
|  | ec98b656fd |  |
@@ -169,6 +169,7 @@ export const layer = Layer.effect(
     const auth = yield* Auth.Service
     const session = yield* Session.Service
     const http = yield* HttpClient.HttpClient
+    const sync = yield* SyncEvent.Service
     const connections = new Map<WorkspaceID, ConnectionStatus>()
     const syncFibers = yield* FiberMap.make<WorkspaceID, void, SyncLoopError>()

@@ -307,25 +308,30 @@ export const layer = Layer.effect(
          events: events.length,
        })

-        yield* Effect.sync(() =>
-          WorkspaceContext.provide({
+        yield* Effect.promise(async () => {
+          await WorkspaceContext.provide({
            workspaceID: space.id,
-            fn: () => {
-              for (const event of events) {
-                SyncEvent.replay(
-                  {
-                    id: event.id,
-                    aggregateID: event.aggregate_id,
-                    seq: event.seq,
-                    type: event.type,
-                    data: event.data,
-                  },
-                  { publish: true },
-                )
-              }
+            async fn() {
+              await Effect.runPromise(
+                Effect.forEach(
+                  events,
+                  (event) =>
+                    sync.replay(
+                      {
+                        id: event.id,
+                        aggregateID: event.aggregate_id,
+                        seq: event.seq,
+                        type: event.type,
+                        data: event.data,
+                      },
+                      { publish: true },
+                    ),
+                  { discard: true },
+                ),
+              )
            },
-          }),
-        )
+          })
+        })
      })

    const syncWorkspaceLoop = Effect.fn("Workspace.syncWorkspaceLoop")(function* (space: Info) {

@@ -361,16 +367,28 @@ export const layer = Layer.effect(
      setStatus(space.id, "connected")

      yield* parseSSE(stream, (evt) =>
-        Effect.sync(() => {
+        Effect.gen(function* () {
+          if (!evt || typeof evt !== "object" || !("payload" in evt)) return
+          const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
+          if (payload.type === "server.heartbeat") return
+
+          if (payload.type === "sync" && payload.syncEvent) {
+            const failed = yield* sync.replay(payload.syncEvent).pipe(
+              Effect.as(false),
+              Effect.catchCause((error) =>
+                Effect.sync(() => {
+                  log.info("failed to replay global event", {
+                    workspaceID: space.id,
+                    error,
+                  })
+                  return true
+                }),
+              ),
+            )
+            if (failed) return
+          }
+
          try {
-            if (!evt || typeof evt !== "object" || !("payload" in evt)) return
-            const payload = evt.payload as { type?: string; syncEvent?: SyncEvent.SerializedEvent }
-            if (payload.type === "server.heartbeat") return
-
-            if (payload.type === "sync" && payload.syncEvent) {
-              SyncEvent.replay(payload.syncEvent)
-            }
-
            const event = evt as { directory?: string; project?: string; payload: unknown }
            GlobalBus.emit("event", {
              directory: event.directory,
@@ -378,10 +396,10 @@ export const layer = Layer.effect(
              workspace: space.id,
              payload: event.payload,
            })
-          } catch (err) {
+          } catch (error) {
            log.info("failed to replay global event", {
              workspaceID: space.id,
-              error: err,
+              error,
            })
          }
        }),

@@ -516,14 +534,12 @@ export const layer = Layer.effect(
      const adaptor = getAdaptor(space.projectID, space.type)
      const target = yield* Effect.promise(() => Promise.resolve(adaptor.target(space)))

-      yield* Effect.sync(() =>
-        SyncEvent.run(Session.Event.Updated, {
-          sessionID: input.sessionID,
-          info: {
-            workspaceID: input.workspaceID,
-          },
-        }),
-      )
+      yield* sync.run(Session.Event.Updated, {
+        sessionID: input.sessionID,
+        info: {
+          workspaceID: input.workspaceID,
+        },
+      })

      const rows = yield* db((db) =>
        db
@@ -593,7 +609,7 @@ export const layer = Layer.effect(
      })

      if (target.type === "local") {
-        SyncEvent.replayAll(events)
+        yield* sync.replayAll(events)
        log.info("session restore batch replayed locally", {
          workspaceID: input.workspaceID,
          sessionID: input.sessionID,
@@ -812,6 +828,7 @@ export const layer = Layer.effect(
 export const defaultLayer = layer.pipe(
   Layer.provide(Auth.defaultLayer),
   Layer.provide(Session.defaultLayer),
+  Layer.provide(SyncEvent.defaultLayer),
   Layer.provide(FetchHttpClient.layer),
 )

@@ -47,6 +47,7 @@ import { Pty } from "@/pty"
|
||||
import { Installation } from "@/installation"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
import { SessionShare } from "@/share/session"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { Npm } from "@opencode-ai/core/npm"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
|
||||
@@ -97,6 +98,7 @@ export const AppLayer = Layer.mergeAll(
   Installation.defaultLayer,
   ShareNext.defaultLayer,
   SessionShare.defaultLayer,
+  SyncEvent.defaultLayer,
 ).pipe(Layer.provideMerge(Observability.layer))

 const rt = ManagedRuntime.make(AppLayer, { memoMap })

@@ -21,6 +21,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
|
||||
Effect.gen(function* () {
|
||||
const workspace = yield* Workspace.Service
|
||||
const scope = yield* Scope.Scope
|
||||
const sync = yield* SyncEvent.Service
|
||||
|
||||
const start = Effect.fn("SyncHttpApi.start")(function* () {
|
||||
yield* workspace
|
||||
@@ -45,7 +46,7 @@ export const syncHandlers = HttpApiBuilder.group(InstanceHttpApi, "sync", (handl
|
||||
last: events.at(-1)?.seq,
|
||||
directory: ctx.payload.directory,
|
||||
})
|
||||
SyncEvent.replayAll(events)
|
||||
yield* sync.replayAll(events)
|
||||
log.info("sync replay complete", {
|
||||
sessionID: source,
|
||||
events: events.length,
|
||||
|
||||
@@ -31,6 +31,7 @@ import { SessionSummary } from "@/session/summary"
|
||||
import { Todo } from "@/session/todo"
|
||||
import { SessionShare } from "@/share/session"
|
||||
import { Skill } from "@/skill"
|
||||
import { SyncEvent } from "@/sync"
|
||||
import { ToolRegistry } from "@/tool/registry"
|
||||
import { lazy } from "@/util/lazy"
|
||||
import { Vcs } from "@/project/vcs"
|
||||
@@ -147,6 +148,7 @@ export const routes = Layer.mergeAll(rootApiRoutes, instanceRoutes).pipe(
   SessionRunState.defaultLayer,
   SessionStatus.defaultLayer,
   SessionSummary.defaultLayer,
+  SyncEvent.defaultLayer,
   Skill.defaultLayer,
   Todo.defaultLayer,
   ToolRegistry.defaultLayer,

@@ -94,7 +94,7 @@ export const SyncRoutes = lazy(() =>
        last: events.at(-1)?.seq,
        directory: body.directory,
      })
-      SyncEvent.replayAll(events)
+      await AppRuntime.runPromise(SyncEvent.use.replayAll(events))

      log.info("sync replay complete", {
        sessionID: source,
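The handler above is still a plain `async` function, so instead of the deleted static `SyncEvent.replayAll` it crosses into Effect through the application runtime. A minimal sketch of that bridge, assuming `AppRuntime` wraps an Effect `ManagedRuntime` as in the app-layer hunk earlier (the `Sync` tag and layer below are illustrative stand-ins, not the repo's API):

```ts
import { Context, Effect, Layer, ManagedRuntime } from "effect"

// Illustrative stand-in for the SyncEvent service in this diff.
interface SyncApi {
  readonly replayAll: (events: ReadonlyArray<unknown>) => Effect.Effect<void>
}
const Sync = Context.GenericTag<SyncApi>("Sync")
const SyncLive = Layer.succeed(Sync, { replayAll: () => Effect.void })

// One runtime for the whole app; the layer is built once and reused.
const AppRuntime = ManagedRuntime.make(SyncLive)

// A promise-based route can run a service effect without itself being an Effect.
async function route(events: ReadonlyArray<unknown>) {
  await AppRuntime.runPromise(Effect.flatMap(Sync, (sync) => sync.replayAll(events)))
}
```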
||||
@@ -38,6 +38,7 @@ export const layer = Layer.effect(
    const bus = yield* Bus.Service
    const summary = yield* SessionSummary.Service
    const state = yield* SessionRunState.Service
+    const sync = yield* SyncEvent.Service

    const revert = Effect.fn("SessionRevert.revert")(function* (input: RevertInput) {
      yield* state.assertNotBusy(input.sessionID)
@@ -121,7 +122,7 @@ export const layer = Layer.effect(
        remove.push(msg)
      }
      for (const msg of remove) {
-        SyncEvent.run(MessageV2.Event.Removed, {
+        yield* sync.run(MessageV2.Event.Removed, {
          sessionID,
          messageID: msg.info.id,
        })
@@ -133,7 +134,7 @@ export const layer = Layer.effect(
      const removeParts = target.parts.slice(idx)
      target.parts = target.parts.slice(0, idx)
      for (const part of removeParts) {
-        SyncEvent.run(MessageV2.Event.PartRemoved, {
+        yield* sync.run(MessageV2.Event.PartRemoved, {
          sessionID,
          messageID: target.info.id,
          partID: part.id,
@@ -156,6 +157,7 @@ export const defaultLayer = Layer.suspend(() =>
    Layer.provide(Storage.defaultLayer),
    Layer.provide(Bus.layer),
    Layer.provide(SessionSummary.defaultLayer),
+    Layer.provide(SyncEvent.defaultLayer),
  ),
 )

@@ -443,11 +443,12 @@ export type Patch = Types.DeepMutable<SyncEvent.Event<typeof Event.Updated>["dat
|
||||
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> = Layer.effect(
|
||||
export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service | SyncEvent.Service> = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const storage = yield* Storage.Service
|
||||
const sync = yield* SyncEvent.Service
|
||||
|
||||
const createNext = Effect.fn("Session.createNext")(function* (input: {
|
||||
id?: SessionID
|
||||
@@ -477,7 +478,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
|
||||
}
|
||||
log.info("created", result)
|
||||
|
||||
yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
|
||||
yield* sync.run(Event.Created, { sessionID: result.id, info: result })
|
||||
|
||||
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
|
||||
// This only exist for backwards compatibility. We should not be
|
||||
@@ -525,10 +526,8 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
|
||||
Effect.catchCause(() => Effect.succeed(false)),
|
||||
)
|
||||
|
||||
yield* Effect.sync(() => {
|
||||
SyncEvent.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
|
||||
SyncEvent.remove(sessionID)
|
||||
})
|
||||
yield* sync.run(Event.Deleted, { sessionID, info: session }, { publish: hasInstance })
|
||||
yield* sync.remove(sessionID)
|
||||
} catch (e) {
|
||||
log.error(e)
|
||||
}
|
||||
@@ -536,19 +535,17 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
|
||||
|
||||
const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
|
||||
Effect.gen(function* () {
|
||||
yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
|
||||
yield* sync.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg })
|
||||
return msg
|
||||
}).pipe(Effect.withSpan("Session.updateMessage"))
|
||||
|
||||
const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
|
||||
Effect.gen(function* () {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID: part.sessionID,
|
||||
part: structuredClone(part),
|
||||
time: Date.now(),
|
||||
}),
|
||||
)
|
||||
yield* sync.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID: part.sessionID,
|
||||
part: structuredClone(part),
|
||||
time: Date.now(),
|
||||
})
|
||||
return part
|
||||
}).pipe(Effect.withSpan("Session.updatePart"))
|
||||
|
||||
@@ -635,8 +632,7 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
|
||||
return session
|
||||
})
|
||||
|
||||
const patch = (sessionID: SessionID, info: Patch) =>
|
||||
Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info }))
|
||||
const patch = (sessionID: SessionID, info: Patch) => sync.run(Event.Updated, { sessionID, info })
|
||||
|
||||
const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
|
||||
yield* patch(sessionID, { time: { updated: Date.now() } })
|
||||
@@ -693,12 +689,10 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
|
||||
sessionID: SessionID
|
||||
messageID: MessageID
|
||||
}) {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
}),
|
||||
)
|
||||
yield* sync.run(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
})
|
||||
return input.messageID
|
||||
})
|
||||
|
||||
@@ -707,13 +701,11 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
|
||||
messageID: MessageID
|
||||
partID: PartID
|
||||
}) {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
}),
|
||||
)
|
||||
yield* sync.run(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
})
|
||||
return input.partID
|
||||
})
|
||||
|
||||
@@ -764,7 +756,11 @@ export const layer: Layer.Layer<Service, never, Bus.Service | Storage.Service> =
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Storage.defaultLayer))
|
||||
export const defaultLayer = layer.pipe(
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Storage.defaultLayer),
|
||||
Layer.provide(SyncEvent.defaultLayer),
|
||||
)
|
||||
|
||||
export function* list(input?: {
|
||||
directory?: string
|
||||
|
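The reshaped `defaultLayer` above is forced by the type change at the top of this file's diff: the session layer's requirements widened to include `SyncEvent.Service`, and each `Layer.provide` discharges one requirement. A small self-contained sketch of the same mechanic, with illustrative tags:

```ts
import { Context, Effect, Layer } from "effect"

// Illustrative tags; the real ones are Session.Service and SyncEvent.Service.
class SyncTag extends Context.Tag("SyncTag")<SyncTag, { run: () => Effect.Effect<void> }>() {}
class SessionTag extends Context.Tag("SessionTag")<SessionTag, { touch: () => Effect.Effect<void> }>() {}

// The session layer now requires SyncTag: Layer<SessionTag, never, SyncTag>.
const sessionLayer: Layer.Layer<SessionTag, never, SyncTag> = Layer.effect(
  SessionTag,
  Effect.map(SyncTag, (sync) => ({ touch: () => sync.run() })),
)

const syncLayer = Layer.succeed(SyncTag, { run: () => Effect.void })

// Without this provide, every consumer of defaultLayer would inherit the SyncTag requirement.
const defaultLayer: Layer.Layer<SessionTag> = sessionLayer.pipe(Layer.provide(syncLayer))
```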
||||
@@ -21,20 +21,19 @@ export const layer = Layer.effect(
    const session = yield* Session.Service
    const shareNext = yield* ShareNext.Service
    const scope = yield* Scope.Scope
+    const sync = yield* SyncEvent.Service

    const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) {
      const conf = yield* cfg.get()
      if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration")
      const result = yield* shareNext.create(sessionID)
-      yield* Effect.sync(() =>
-        SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }),
-      )
+      yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } })
      return result
    })

    const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) {
      yield* shareNext.remove(sessionID)
-      yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } }))
+      yield* sync.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })
    })

    const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) {
@@ -54,6 +53,7 @@ export const defaultLayer = layer.pipe(
   Layer.provide(ShareNext.defaultLayer),
   Layer.provide(Session.defaultLayer),
   Layer.provide(Config.defaultLayer),
+  Layer.provide(SyncEvent.defaultLayer),
 )

 export * as SessionShare from "./session"

@@ -9,9 +9,11 @@ import { EventSequenceTable, EventTable } from "./event.sql"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
import { EventID } from "./schema"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import { Schema as EffectSchema } from "effect"
|
||||
import { Context, Effect, Layer, Schema as EffectSchema } from "effect"
|
||||
import { zodObject } from "@/util/effect-zod"
|
||||
import type { DeepMutable } from "@/util/schema"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { serviceUse } from "@/effect/service-use"
|
||||
|
||||
// Keep `Event["data"]` mutable because projectors mutate the persisted shape
|
||||
// when writing to the database. Bus payloads (`Properties`) stay readonly —
|
||||
@@ -46,6 +48,125 @@ export type SerializedEvent<Def extends Definition = Definition> = Event<Def> &
|
||||
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
|
||||
type ConvertEvent = (type: string, data: Event["data"]) => unknown | Promise<unknown>
|
||||
|
||||
export interface Interface {
|
||||
readonly run: <Def extends Definition>(
|
||||
def: Def,
|
||||
data: Event<Def>["data"],
|
||||
options?: { publish?: boolean },
|
||||
) => Effect.Effect<void>
|
||||
readonly replay: (event: SerializedEvent, options?: { publish: boolean }) => Effect.Effect<void>
|
||||
readonly replayAll: (events: SerializedEvent[], options?: { publish: boolean }) => Effect.Effect<string | undefined>
|
||||
readonly remove: (aggregateID: string) => Effect.Effect<void>
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/SyncEvent") {}
|
||||
|
||||
export const layer = Layer.effect(Service)(
|
||||
Effect.gen(function* () {
|
||||
const replay: Interface["replay"] = Effect.fn("SyncEvent.replay")(function* (event, options) {
|
||||
const def = registry.get(event.type)
|
||||
if (!def) {
|
||||
throw new Error(`Unknown event type: ${event.type}`)
|
||||
}
|
||||
|
||||
const row = Database.use((db) =>
|
||||
db
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
|
||||
.get(),
|
||||
)
|
||||
|
||||
const latest = row?.seq ?? -1
|
||||
if (event.seq <= latest) return
|
||||
|
||||
const expected = latest + 1
|
||||
if (event.seq !== expected) {
|
||||
throw new Error(
|
||||
`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
|
||||
)
|
||||
}
|
||||
|
||||
process(def, event, { publish: !!options?.publish })
|
||||
})
|
||||
|
||||
const replayAll: Interface["replayAll"] = Effect.fn("SyncEvent.replayAll")(function* (events, options) {
|
||||
const source = events[0]?.aggregateID
|
||||
if (!source) return undefined
|
||||
if (events.some((item) => item.aggregateID !== source)) {
|
||||
throw new Error("Replay events must belong to the same session")
|
||||
}
|
||||
const start = events[0].seq
|
||||
for (const [i, item] of events.entries()) {
|
||||
const seq = start + i
|
||||
if (item.seq !== seq) {
|
||||
throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
|
||||
}
|
||||
}
|
||||
for (const item of events) {
|
||||
yield* replay(item, options)
|
||||
}
|
||||
return source
|
||||
})
|
||||
|
||||
const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* (def, data, options) {
|
||||
const agg = (data as Record<string, string>)[def.aggregate]
|
||||
// This should never happen: we've enforced it via typescript in
|
||||
// the definition
|
||||
if (agg == null) {
|
||||
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
|
||||
}
|
||||
|
||||
if (def.version !== versions.get(def.type)) {
|
||||
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
|
||||
}
|
||||
|
||||
const { publish = true } = options || {}
|
||||
|
||||
// Note that this is an "immediate" transaction which is critical.
|
||||
// We need to make sure we can safely read and write with nothing
|
||||
// else changing the data from under us
|
||||
Database.transaction(
|
||||
(tx) => {
|
||||
const id = EventID.ascending()
|
||||
const row = tx
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, agg))
|
||||
.get()
|
||||
const seq = row?.seq != null ? row.seq + 1 : 0
|
||||
|
||||
const event = { id, seq, aggregateID: agg, data }
|
||||
process(def, event, { publish })
|
||||
},
|
||||
{
|
||||
behavior: "immediate",
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
const remove: Interface["remove"] = Effect.fn("SyncEvent.remove")(function* (aggregateID) {
|
||||
Database.transaction((tx) => {
|
||||
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
|
||||
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
|
||||
})
|
||||
})
|
||||
|
||||
return Service.of({
|
||||
run,
|
||||
replay,
|
||||
replayAll,
|
||||
remove,
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer
|
||||
|
||||
export const use = serviceUse(Service)
|
||||
|
||||
const runtime = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export const registry = new Map<string, Definition>()
|
||||
let projectors: Map<Definition, ProjectorFunc> | undefined
|
||||
const versions = new Map<string, number>()
|
||||
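With `Interface`, `Service`, and `layer` defined, consumers resolve the service once inside `Effect.gen` and call `run`/`replay`/`replayAll`/`remove` as effects, exactly as the workspace and session hunks above do. A minimal consumer sketch, assuming this module's exports (the `ItemCreated` definition is illustrative):

```ts
import { Effect } from "effect"
import { SyncEvent } from "@/sync"

// Illustrative event definition; real ones are registered in SyncEvent's registry.
declare const ItemCreated: SyncEvent.Definition

const program = Effect.gen(function* () {
  const sync = yield* SyncEvent.Service
  // Appends the event, bumps the per-aggregate seq, projects it, and publishes on the bus.
  yield* sync.run(ItemCreated, { id: "evt_1", name: "first" })
  // Drops the aggregate's event log and sequence row.
  yield* sync.remove("evt_1")
}).pipe(Effect.provide(SyncEvent.defaultLayer))
```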
@@ -186,92 +307,19 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
|
||||
}
|
||||
|
||||
export function replay(event: SerializedEvent, options?: { publish: boolean }) {
|
||||
const def = registry.get(event.type)
|
||||
if (!def) {
|
||||
throw new Error(`Unknown event type: ${event.type}`)
|
||||
}
|
||||
|
||||
const row = Database.use((db) =>
|
||||
db
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
|
||||
.get(),
|
||||
)
|
||||
|
||||
const latest = row?.seq ?? -1
|
||||
if (event.seq <= latest) {
|
||||
return
|
||||
}
|
||||
|
||||
const expected = latest + 1
|
||||
if (event.seq !== expected) {
|
||||
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
|
||||
}
|
||||
|
||||
process(def, event, { publish: !!options?.publish })
|
||||
return runtime.runSync((sync) => sync.replay(event, options))
|
||||
}
|
||||
|
||||
export function replayAll(events: SerializedEvent[], options?: { publish: boolean }) {
|
||||
const source = events[0]?.aggregateID
|
||||
if (!source) return
|
||||
if (events.some((item) => item.aggregateID !== source)) {
|
||||
throw new Error("Replay events must belong to the same session")
|
||||
}
|
||||
const start = events[0].seq
|
||||
for (const [i, item] of events.entries()) {
|
||||
const seq = start + i
|
||||
if (item.seq !== seq) {
|
||||
throw new Error(`Replay sequence mismatch at index ${i}: expected ${seq}, got ${item.seq}`)
|
||||
}
|
||||
}
|
||||
for (const item of events) {
|
||||
replay(item, options)
|
||||
}
|
||||
return source
|
||||
return runtime.runSync((sync) => sync.replayAll(events, options))
|
||||
}
|
||||
|
||||
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"], options?: { publish?: boolean }) {
|
||||
const agg = (data as Record<string, string>)[def.aggregate]
|
||||
// This should never happen: we've enforced it via typescript in
|
||||
// the definition
|
||||
if (agg == null) {
|
||||
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
|
||||
}
|
||||
|
||||
if (def.version !== versions.get(def.type)) {
|
||||
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
|
||||
}
|
||||
|
||||
const { publish = true } = options || {}
|
||||
|
||||
// Note that this is an "immediate" transaction which is critical.
|
||||
// We need to make sure we can safely read and write with nothing
|
||||
// else changing the data from under us
|
||||
Database.transaction(
|
||||
(tx) => {
|
||||
const id = EventID.ascending()
|
||||
const row = tx
|
||||
.select({ seq: EventSequenceTable.seq })
|
||||
.from(EventSequenceTable)
|
||||
.where(eq(EventSequenceTable.aggregate_id, agg))
|
||||
.get()
|
||||
const seq = row?.seq != null ? row.seq + 1 : 0
|
||||
|
||||
const event = { id, seq, aggregateID: agg, data }
|
||||
process(def, event, { publish })
|
||||
},
|
||||
{
|
||||
behavior: "immediate",
|
||||
},
|
||||
)
|
||||
return runtime.runSync((sync) => sync.run(def, data, options))
|
||||
}
|
||||
|
||||
export function remove(aggregateID: string) {
|
||||
Database.transaction((tx) => {
|
||||
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
|
||||
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
|
||||
})
|
||||
return runtime.runSync((sync) => sync.remove(aggregateID))
|
||||
}
|
||||
|
||||
export function payloads() {
|
||||
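Each legacy function above keeps its signature but delegates through `runtime.runSync`, so synchronous call sites keep compiling while the Effect service owns the single implementation. A generic sketch of this facade pattern; the repo's `makeRuntime` evidently hands the resolved service to a callback, whereas the plain `ManagedRuntime` used below runs an effect:

```ts
import { Context, Effect, Layer, ManagedRuntime } from "effect"

interface CounterApi {
  readonly bump: () => Effect.Effect<number>
}
const Counter = Context.GenericTag<CounterApi>("Counter")

const CounterLive = Layer.sync(Counter, () => {
  let n = 0
  return { bump: () => Effect.sync(() => ++n) }
})

// Private runtime backing the facade, analogous to makeRuntime(Service, defaultLayer).
const runtime = ManagedRuntime.make(CounterLive)

// Legacy synchronous entry point kept as a thin shim, like SyncEvent.run/replay/remove above.
export function bump(): number {
  return runtime.runSync(Effect.flatMap(Counter, (c) => c.bump()))
}
```
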
@@ -1,4 +1,4 @@
-import { afterEach, beforeEach, describe, expect, mock, spyOn, test } from "bun:test"
+import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"
 import fs from "node:fs/promises"
 import Http from "node:http"
 import path from "node:path"
@@ -1426,48 +1426,41 @@ describe("workspace-old sessionRestore", () => {
|
||||
})
|
||||
})
|
||||
|
||||
test("local restore replays batches without fetch and emits progress", async () => {
|
||||
await withInstance(async (dir) => {
|
||||
const captured = captureGlobalEvents()
|
||||
let fetchCallCount = 0
|
||||
const replayAll = spyOn(SyncEvent, "replayAll")
|
||||
try {
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
fetch() {
|
||||
fetchCallCount++
|
||||
return Response.json({ ok: true })
|
||||
},
|
||||
})
|
||||
const type = unique("restore-local")
|
||||
const info = workspaceInfo(Instance.project.id, type, { directory: dir })
|
||||
insertWorkspace(info)
|
||||
registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
|
||||
const session = await AppRuntime.runPromise(
|
||||
SessionNs.Service.use((svc) => svc.create({ title: "restore local" })),
|
||||
)
|
||||
replaceSessionEvents(session.id, 20)
|
||||
it.live("local restore replays batches and emits progress", () =>
|
||||
provideTmpdirInstance(
|
||||
(dir) =>
|
||||
Effect.gen(function* () {
|
||||
const workspace = yield* WorkspaceOld.Service
|
||||
const sessionSvc = yield* SessionNs.Service
|
||||
const captured = captureGlobalEvents()
|
||||
try {
|
||||
const type = unique("restore-local")
|
||||
const info = workspaceInfo(Instance.project.id, type, { directory: dir })
|
||||
insertWorkspace(info)
|
||||
registerAdaptor(Instance.project.id, type, localAdaptor(dir).adaptor)
|
||||
const session = yield* sessionSvc.create({ title: "restore local" })
|
||||
replaceSessionEvents(session.id, 20)
|
||||
|
||||
expect(await restoreWorkspaceSession({ workspaceID: info.id, sessionID: session.id })).toEqual({ total: 3 })
|
||||
|
||||
expect(fetchCallCount).toBe(0)
|
||||
expect(replayAll).toHaveBeenCalledTimes(3)
|
||||
expect(replayAll.mock.calls.map((call) => call[0].length)).toEqual([10, 10, 1])
|
||||
expect((await AppRuntime.runPromise(SessionNs.Service.use((svc) => svc.get(session.id)))).workspaceID).toBe(
|
||||
info.id,
|
||||
)
|
||||
expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i))
|
||||
expect(
|
||||
captured.events
|
||||
.filter((event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type)
|
||||
.map((event) => event.payload.properties.step),
|
||||
).toEqual([0, 1, 2, 3])
|
||||
await removeWorkspace(info.id)
|
||||
} finally {
|
||||
captured.dispose()
|
||||
}
|
||||
})
|
||||
})
|
||||
expect(yield* workspace.sessionRestore({ workspaceID: info.id, sessionID: session.id })).toEqual({
|
||||
total: 3,
|
||||
})
|
||||
expect((yield* sessionSvc.get(session.id)).workspaceID).toBe(info.id)
|
||||
expect(eventRows(session.id).map((row) => row.seq)).toEqual(Array.from({ length: 21 }, (_, i) => i))
|
||||
expect(
|
||||
captured.events
|
||||
.filter(
|
||||
(event) => event.workspace === info.id && event.payload.type === WorkspaceOld.Event.Restore.type,
|
||||
)
|
||||
.map((event) => event.payload.properties.step),
|
||||
).toEqual([0, 1, 2, 3])
|
||||
yield* workspace.remove(info.id)
|
||||
} finally {
|
||||
captured.dispose()
|
||||
}
|
||||
}),
|
||||
{ git: true },
|
||||
),
|
||||
)
|
||||
|
||||
it.live("session restore includes real message and part events in sequence order", () => {
|
||||
const replay: FetchCall[] = []
|
||||
|
||||
@@ -1,4 +1,4 @@
-import { afterEach, describe, expect, mock, test } from "bun:test"
+import { afterEach, describe, expect, mock } from "bun:test"
 import { NodeServices } from "@effect/platform-node"
 import { mkdir } from "node:fs/promises"
 import path from "node:path"
@@ -133,8 +133,6 @@ afterEach(async () => {
|
||||
})
|
||||
|
||||
describe("workspace HttpApi", () => {
|
||||
test.todo("proxies remote workspace websocket through real Effect listener", () => {})
|
||||
|
||||
it.live("serves read endpoints", () =>
|
||||
Effect.gen(function* () {
|
||||
const dir = yield* tmpdirScoped({ git: true })
|
||||
|
||||
@@ -1,16 +1,18 @@
-import { describe, test, expect, beforeEach, afterEach, afterAll } from "bun:test"
-import { tmpdir } from "../fixture/fixture"
-import { Schema } from "effect"
+import { describe, expect, beforeEach, afterEach, afterAll } from "bun:test"
+import { provideTmpdirInstance } from "../fixture/fixture"
+import { Effect, Layer, Schema } from "effect"
+import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
 import { Bus } from "../../src/bus"
 import { Instance } from "../../src/project/instance"
 import { SyncEvent } from "../../src/sync"
 import { Database } from "@/storage/db"
 import { EventTable } from "../../src/sync/event.sql"
 import { Identifier } from "../../src/id/id"
+import { MessageID } from "../../src/session/schema"
 import { Flag } from "@opencode-ai/core/flag/flag"
 import { initProjectors } from "../../src/server/projectors"
+import { testEffect } from "../lib/effect"

 const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
+const it = testEffect(Layer.mergeAll(SyncEvent.defaultLayer, CrossSpawnSpawner.defaultLayer))

 beforeEach(() => {
   Database.close()
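`testEffect` plus `it.live` replaces the hand-rolled `withInstance` wrapper removed below: the merged layer is constructed once and every test body runs as an `Effect` against it. A hypothetical reconstruction of such a helper (the real `../lib/effect` implementation may differ):

```ts
import { test } from "bun:test"
import { Effect, Layer, ManagedRuntime } from "effect"

// Hypothetical sketch of testEffect; assumes bun:test and an Effect ManagedRuntime.
export function testEffect<R, E>(layer: Layer.Layer<R, E>) {
  const runtime = ManagedRuntime.make(layer)
  const run = (body: Effect.Effect<unknown, unknown, R> | (() => Effect.Effect<unknown, unknown, R>)) =>
    runtime.runPromise(Effect.isEffect(body) ? body : body())
  return {
    // it.live(name, effectOrThunk) registers a bun test that runs against the layer.
    live: (name: string, body: Effect.Effect<unknown, unknown, R> | (() => Effect.Effect<unknown, unknown, R>)) =>
      test(name, () => run(body)),
  }
}
```
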
@@ -22,19 +24,6 @@ afterEach(() => {
|
||||
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = original
|
||||
})
|
||||
|
||||
function withInstance(fn: () => void | Promise<void>) {
|
||||
return async () => {
|
||||
await using tmp = await tmpdir()
|
||||
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
await fn()
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
describe("SyncEvent", () => {
|
||||
function setup() {
|
||||
SyncEvent.reset()
|
||||
@@ -59,179 +48,209 @@ describe("SyncEvent", () => {
|
||||
return { Created, Sent }
|
||||
}
|
||||
|
||||
function expectDefect<A, E, R>(effect: Effect.Effect<A, E, R>, pattern: RegExp) {
|
||||
return Effect.gen(function* () {
|
||||
const exit = yield* Effect.exit(effect)
|
||||
if (exit._tag === "Success") throw new Error("Expected effect to fail")
|
||||
expect(String(exit.cause)).toMatch(pattern)
|
||||
})
|
||||
}
|
||||
|
||||
afterAll(() => {
|
||||
SyncEvent.reset()
|
||||
initProjectors()
|
||||
})
|
||||
|
||||
describe("run", () => {
|
||||
test(
|
||||
it.live(
|
||||
"inserts event row",
|
||||
withInstance(() => {
|
||||
const { Created } = setup()
|
||||
SyncEvent.run(Created, { id: "evt_1", name: "first" })
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(1)
|
||||
expect(rows[0].type).toBe("item.created.1")
|
||||
expect(rows[0].aggregate_id).toBe("evt_1")
|
||||
}),
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const { Created } = setup()
|
||||
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" })
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(1)
|
||||
expect(rows[0].type).toBe("item.created.1")
|
||||
expect(rows[0].aggregate_id).toBe("evt_1")
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
test(
|
||||
it.live(
|
||||
"increments seq per aggregate",
|
||||
withInstance(() => {
|
||||
const { Created } = setup()
|
||||
SyncEvent.run(Created, { id: "evt_1", name: "first" })
|
||||
SyncEvent.run(Created, { id: "evt_1", name: "second" })
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(2)
|
||||
expect(rows[1].seq).toBe(rows[0].seq + 1)
|
||||
}),
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const { Created } = setup()
|
||||
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "first" })
|
||||
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "second" })
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(2)
|
||||
expect(rows[1].seq).toBe(rows[0].seq + 1)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
test(
|
||||
it.live(
|
||||
"uses custom aggregate field from agg()",
|
||||
withInstance(() => {
|
||||
const { Sent } = setup()
|
||||
SyncEvent.run(Sent, { item_id: "evt_1", to: "james" })
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(1)
|
||||
expect(rows[0].aggregate_id).toBe("evt_1")
|
||||
}),
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const { Sent } = setup()
|
||||
yield* SyncEvent.use.run(Sent, { item_id: "evt_1", to: "james" })
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(1)
|
||||
expect(rows[0].aggregate_id).toBe("evt_1")
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
test(
|
||||
it.live(
|
||||
"emits events",
|
||||
withInstance(async () => {
|
||||
const { Created } = setup()
|
||||
const events: Array<{
|
||||
type: string
|
||||
properties: { id: string; name: string }
|
||||
}> = []
|
||||
const received = new Promise<void>((resolve) => {
|
||||
Bus.subscribeAll((event) => {
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const { Created } = setup()
|
||||
const events: Array<{
|
||||
type: string
|
||||
properties: { id: string; name: string }
|
||||
}> = []
|
||||
let resolve = () => {}
|
||||
const received = new Promise<void>((done) => {
|
||||
resolve = done
|
||||
})
|
||||
const dispose = Bus.subscribeAll((event) => {
|
||||
events.push(event)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
|
||||
SyncEvent.run(Created, { id: "evt_1", name: "test" })
|
||||
|
||||
await received
|
||||
expect(events).toHaveLength(1)
|
||||
expect(events[0]).toEqual({
|
||||
type: "item.created",
|
||||
properties: {
|
||||
id: "evt_1",
|
||||
name: "test",
|
||||
},
|
||||
})
|
||||
}),
|
||||
try {
|
||||
yield* SyncEvent.use.run(Created, { id: "evt_1", name: "test" })
|
||||
yield* Effect.promise(() => received)
|
||||
expect(events).toHaveLength(1)
|
||||
expect(events[0]).toEqual({
|
||||
type: "item.created",
|
||||
properties: {
|
||||
id: "evt_1",
|
||||
name: "test",
|
||||
},
|
||||
})
|
||||
} finally {
|
||||
dispose()
|
||||
}
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
|
||||
describe("replay", () => {
|
||||
test(
|
||||
it.live(
|
||||
"inserts event from external payload",
|
||||
withInstance(() => {
|
||||
const id = Identifier.descending("message")
|
||||
SyncEvent.replay({
|
||||
id: "evt_1",
|
||||
type: "item.created.1",
|
||||
seq: 0,
|
||||
aggregateID: id,
|
||||
data: { id, name: "replayed" },
|
||||
})
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(1)
|
||||
expect(rows[0].aggregate_id).toBe(id)
|
||||
}),
|
||||
)
|
||||
|
||||
test(
|
||||
"throws on sequence mismatch",
|
||||
withInstance(() => {
|
||||
const id = Identifier.descending("message")
|
||||
SyncEvent.replay({
|
||||
id: "evt_1",
|
||||
type: "item.created.1",
|
||||
seq: 0,
|
||||
aggregateID: id,
|
||||
data: { id, name: "first" },
|
||||
})
|
||||
expect(() =>
|
||||
SyncEvent.replay({
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const id = MessageID.ascending()
|
||||
yield* SyncEvent.use.replay({
|
||||
id: "evt_1",
|
||||
type: "item.created.1",
|
||||
seq: 5,
|
||||
aggregateID: id,
|
||||
data: { id, name: "bad" },
|
||||
}),
|
||||
).toThrow(/Sequence mismatch/)
|
||||
}),
|
||||
)
|
||||
|
||||
test(
|
||||
"throws on unknown event type",
|
||||
withInstance(() => {
|
||||
expect(() =>
|
||||
SyncEvent.replay({
|
||||
id: "evt_1",
|
||||
type: "unknown.event.1",
|
||||
seq: 0,
|
||||
aggregateID: "x",
|
||||
data: {},
|
||||
}),
|
||||
).toThrow(/Unknown event type/)
|
||||
}),
|
||||
aggregateID: id,
|
||||
data: { id, name: "replayed" },
|
||||
})
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows).toHaveLength(1)
|
||||
expect(rows[0].aggregate_id).toBe(id)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
test(
|
||||
"replayAll accepts later chunks after the first batch",
|
||||
withInstance(() => {
|
||||
const { Created } = setup()
|
||||
const id = Identifier.descending("message")
|
||||
|
||||
const one = SyncEvent.replayAll([
|
||||
{
|
||||
it.live(
|
||||
"throws on sequence mismatch",
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const id = MessageID.ascending()
|
||||
yield* SyncEvent.use.replay({
|
||||
id: "evt_1",
|
||||
type: SyncEvent.versionedType(Created.type, Created.version),
|
||||
type: "item.created.1",
|
||||
seq: 0,
|
||||
aggregateID: id,
|
||||
data: { id, name: "first" },
|
||||
},
|
||||
{
|
||||
id: "evt_2",
|
||||
type: SyncEvent.versionedType(Created.type, Created.version),
|
||||
seq: 1,
|
||||
aggregateID: id,
|
||||
data: { id, name: "second" },
|
||||
},
|
||||
])
|
||||
})
|
||||
yield* expectDefect(
|
||||
SyncEvent.use.replay({
|
||||
id: "evt_1",
|
||||
type: "item.created.1",
|
||||
seq: 5,
|
||||
aggregateID: id,
|
||||
data: { id, name: "bad" },
|
||||
}),
|
||||
/Sequence mismatch/,
|
||||
)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
const two = SyncEvent.replayAll([
|
||||
{
|
||||
id: "evt_3",
|
||||
type: SyncEvent.versionedType(Created.type, Created.version),
|
||||
seq: 2,
|
||||
aggregateID: id,
|
||||
data: { id, name: "third" },
|
||||
},
|
||||
{
|
||||
id: "evt_4",
|
||||
type: SyncEvent.versionedType(Created.type, Created.version),
|
||||
seq: 3,
|
||||
aggregateID: id,
|
||||
data: { id, name: "fourth" },
|
||||
},
|
||||
])
|
||||
it.live(
|
||||
"throws on unknown event type",
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
yield* expectDefect(
|
||||
SyncEvent.use.replay({
|
||||
id: "evt_1",
|
||||
type: "unknown.event.1",
|
||||
seq: 0,
|
||||
aggregateID: "x",
|
||||
data: {},
|
||||
}),
|
||||
/Unknown event type/,
|
||||
)
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
expect(one).toBe(id)
|
||||
expect(two).toBe(id)
|
||||
it.live(
|
||||
"replayAll accepts later chunks after the first batch",
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const { Created } = setup()
|
||||
const id = MessageID.ascending()
|
||||
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
|
||||
}),
|
||||
const one = yield* SyncEvent.use.replayAll([
|
||||
{
|
||||
id: "evt_1",
|
||||
type: SyncEvent.versionedType(Created.type, Created.version),
|
||||
seq: 0,
|
||||
aggregateID: id,
|
||||
data: { id, name: "first" },
|
||||
},
|
||||
{
|
||||
id: "evt_2",
|
||||
type: SyncEvent.versionedType(Created.type, Created.version),
|
||||
seq: 1,
|
||||
aggregateID: id,
|
||||
data: { id, name: "second" },
|
||||
},
|
||||
])
|
||||
|
||||
const two = yield* SyncEvent.use.replayAll([
|
||||
{
|
||||
id: "evt_3",
|
||||
type: SyncEvent.versionedType(Created.type, Created.version),
|
||||
seq: 2,
|
||||
aggregateID: id,
|
||||
data: { id, name: "third" },
|
||||
},
|
||||
{
|
||||
id: "evt_4",
|
||||
type: SyncEvent.versionedType(Created.type, Created.version),
|
||||
seq: 3,
|
||||
aggregateID: id,
|
||||
data: { id, name: "fourth" },
|
||||
},
|
||||
])
|
||||
|
||||
expect(one).toBe(id)
|
||||
expect(two).toBe(id)
|
||||
|
||||
const rows = Database.use((db) => db.select().from(EventTable).all())
|
||||
expect(rows.map((row) => row.seq)).toEqual([0, 1, 2, 3])
|
||||
}),
|
||||
),
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||