diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 3484d5da76..2622f4f7f0 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -38,8 +38,9 @@ const ShareSchema = Schema.Struct({ export type Share = typeof ShareSchema.Type type State = { - queue: Map }> + queue: Map> scope: Scope.Closeable + shared: Map } type Data = @@ -118,17 +119,20 @@ export const layer = Layer.effect( function sync(sessionID: SessionID, data: Data[]): Effect.Effect { return Effect.gen(function* () { if (disabled) return + const share = yield* getCached(sessionID) + if (!share) return + const s = yield* InstanceState.get(state) const existing = s.queue.get(sessionID) if (existing) { for (const item of data) { - existing.data.set(key(item), item) + existing.set(key(item), item) } return } const next = new Map(data.map((item) => [key(item), item])) - s.queue.set(sessionID, { data: next }) + s.queue.set(sessionID, next) yield* flush(sessionID).pipe( Effect.delay(1000), Effect.catchCause((cause) => @@ -143,13 +147,14 @@ export const layer = Layer.effect( const state: InstanceState.InstanceState = yield* InstanceState.make( Effect.fn("ShareNext.state")(function* (_ctx) { - const cache: State = { queue: new Map(), scope: yield* Scope.make() } + const cache: State = { queue: new Map(), scope: yield* Scope.make(), shared: new Map() } yield* Effect.addFinalizer(() => Scope.close(cache.scope, Exit.void).pipe( Effect.andThen( Effect.sync(() => { cache.queue.clear() + cache.shared.clear() }), ), ), @@ -227,6 +232,18 @@ export const layer = Layer.effect( return { id: row.id, secret: row.secret, url: row.url } satisfies Share }) + const getCached = Effect.fnUntraced(function* (sessionID: SessionID) { + const s = yield* InstanceState.get(state) + if (s.shared.has(sessionID)) { + const cached = s.shared.get(sessionID) + return cached === null ? undefined : cached + } + + const share = yield* get(sessionID) + s.shared.set(sessionID, share ?? null) + return share + }) + const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) { if (disabled) return const s = yield* InstanceState.get(state) @@ -235,13 +252,13 @@ export const layer = Layer.effect( s.queue.delete(sessionID) - const share = yield* get(sessionID) + const share = yield* getCached(sessionID) if (!share) return const req = yield* request() const res = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.sync(share.id)}`).pipe( HttpClientRequest.setHeaders(req.headers), - HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.data.values()) }), + HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.values()) }), Effect.flatMap((r) => http.execute(r)), ) @@ -307,6 +324,7 @@ export const layer = Layer.effect( .run(), ) const s = yield* InstanceState.get(state) + s.shared.set(sessionID, result) yield* full(sessionID).pipe( Effect.catchCause((cause) => Effect.sync(() => { @@ -321,8 +339,13 @@ export const layer = Layer.effect( const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) { if (disabled) return log.info("removing share", { sessionID }) - const share = yield* get(sessionID) - if (!share) return + const s = yield* InstanceState.get(state) + const share = yield* getCached(sessionID) + if (!share) { + s.shared.delete(sessionID) + s.queue.delete(sessionID) + return + } const req = yield* request() yield* HttpClientRequest.delete(`${req.baseUrl}${req.api.remove(share.id)}`).pipe( @@ -332,6 +355,8 @@ export const layer = Layer.effect( ) yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run()) + s.shared.delete(sessionID) + s.queue.delete(sessionID) }) return Service.of({ init, url, request, create, remove })