diff --git a/packages/opencode/src/cli/cmd/github.ts b/packages/opencode/src/cli/cmd/github.ts index 46d091642f..9f46200b08 100644 --- a/packages/opencode/src/cli/cmd/github.ts +++ b/packages/opencode/src/cli/cmd/github.ts @@ -21,7 +21,7 @@ import { cmd } from "./cmd" import { ModelsDev } from "../../provider/models" import { Instance } from "@/project/instance" import { bootstrap } from "../bootstrap" -import { SessionShare } from "@/share/session" +import { SessionShare } from "@/share" import { Session } from "../../session" import type { SessionID } from "../../session/schema" import { MessageID, PartID } from "../../session/schema" diff --git a/packages/opencode/src/cli/cmd/import.ts b/packages/opencode/src/cli/cmd/import.ts index 1232f07422..41bbc3a4b0 100644 --- a/packages/opencode/src/cli/cmd/import.ts +++ b/packages/opencode/src/cli/cmd/import.ts @@ -7,7 +7,7 @@ import { bootstrap } from "../bootstrap" import { Database } from "../../storage/db" import { SessionTable, MessageTable, PartTable } from "../../session/session.sql" import { Instance } from "../../project/instance" -import { ShareNext } from "../../share/share-next" +import { ShareNext } from "../../share" import { EOL } from "os" import { Filesystem } from "../../util/filesystem" import { AppRuntime } from "@/effect/app-runtime" diff --git a/packages/opencode/src/effect/app-runtime.ts b/packages/opencode/src/effect/app-runtime.ts index f9f811e711..6b3096202c 100644 --- a/packages/opencode/src/effect/app-runtime.ts +++ b/packages/opencode/src/effect/app-runtime.ts @@ -45,8 +45,8 @@ import { Vcs } from "@/project/vcs" import { Worktree } from "@/worktree" import { Pty } from "@/pty" import { Installation } from "@/installation" -import { ShareNext } from "@/share/share-next" -import { SessionShare } from "@/share/session" +import { ShareNext } from "@/share" +import { SessionShare } from "@/share" export const AppLayer = Layer.mergeAll( AppFileSystem.defaultLayer, diff --git a/packages/opencode/src/effect/bootstrap-runtime.ts b/packages/opencode/src/effect/bootstrap-runtime.ts index d8400c52ae..b89a7ea8f1 100644 --- a/packages/opencode/src/effect/bootstrap-runtime.ts +++ b/packages/opencode/src/effect/bootstrap-runtime.ts @@ -5,7 +5,7 @@ import { Plugin } from "@/plugin" import { LSP } from "@/lsp" import { FileWatcher } from "@/file/watcher" import { Format } from "@/format" -import { ShareNext } from "@/share/share-next" +import { ShareNext } from "@/share" import { File } from "@/file" import { Vcs } from "@/project/vcs" import { Snapshot } from "@/snapshot" diff --git a/packages/opencode/src/project/bootstrap.ts b/packages/opencode/src/project/bootstrap.ts index a1f2a8cb02..8ab1be9d21 100644 --- a/packages/opencode/src/project/bootstrap.ts +++ b/packages/opencode/src/project/bootstrap.ts @@ -10,7 +10,7 @@ import { Command } from "../command" import { Instance } from "./instance" import { Log } from "@/util/log" import { FileWatcher } from "@/file/watcher" -import { ShareNext } from "@/share/share-next" +import { ShareNext } from "@/share" import * as Effect from "effect/Effect" export const InstanceBootstrap = Effect.gen(function* () { diff --git a/packages/opencode/src/server/instance/session.ts b/packages/opencode/src/server/instance/session.ts index c606af8544..b198c15c1f 100644 --- a/packages/opencode/src/server/instance/session.ts +++ b/packages/opencode/src/server/instance/session.ts @@ -9,7 +9,7 @@ import { SessionPrompt } from "../../session/prompt" import { SessionRunState } from "@/session/run-state" import { SessionCompaction } from "../../session/compaction" import { SessionRevert } from "../../session/revert" -import { SessionShare } from "@/share/session" +import { SessionShare } from "@/share" import { SessionStatus } from "@/session/status" import { SessionSummary } from "@/session/summary" import { Todo } from "../../session/todo" diff --git a/packages/opencode/src/share/index.ts b/packages/opencode/src/share/index.ts new file mode 100644 index 0000000000..534375a0ac --- /dev/null +++ b/packages/opencode/src/share/index.ts @@ -0,0 +1,2 @@ +export * as ShareNext from "./share-next" +export * as SessionShare from "./session" diff --git a/packages/opencode/src/share/session.ts b/packages/opencode/src/share/session.ts index 0a673f81c6..71fa17c889 100644 --- a/packages/opencode/src/share/session.ts +++ b/packages/opencode/src/share/session.ts @@ -4,56 +4,54 @@ import { SyncEvent } from "@/sync" import { Effect, Layer, Scope, Context } from "effect" import { Config } from "../config" import { Flag } from "../flag/flag" -import { ShareNext } from "./share-next" +import { ShareNext } from "." -export namespace SessionShare { - export interface Interface { - readonly create: (input?: Session.CreateInput) => Effect.Effect - readonly share: (sessionID: SessionID) => Effect.Effect<{ url: string }, unknown> - readonly unshare: (sessionID: SessionID) => Effect.Effect - } - - export class Service extends Context.Service()("@opencode/SessionShare") {} - - export const layer = Layer.effect( - Service, - Effect.gen(function* () { - const cfg = yield* Config.Service - const session = yield* Session.Service - const shareNext = yield* ShareNext.Service - const scope = yield* Scope.Scope - - const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) { - const conf = yield* cfg.get() - if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration") - const result = yield* shareNext.create(sessionID) - yield* Effect.sync(() => - SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }), - ) - return result - }) - - const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) { - yield* shareNext.remove(sessionID) - yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })) - }) - - const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) { - const result = yield* session.create(input) - if (result.parentID) return result - const conf = yield* cfg.get() - if (!(Flag.OPENCODE_AUTO_SHARE || conf.share === "auto")) return result - yield* share(result.id).pipe(Effect.ignore, Effect.forkIn(scope)) - return result - }) - - return Service.of({ create, share, unshare }) - }), - ) - - export const defaultLayer = layer.pipe( - Layer.provide(ShareNext.defaultLayer), - Layer.provide(Session.defaultLayer), - Layer.provide(Config.defaultLayer), - ) +export interface Interface { + readonly create: (input?: Session.CreateInput) => Effect.Effect + readonly share: (sessionID: SessionID) => Effect.Effect<{ url: string }, unknown> + readonly unshare: (sessionID: SessionID) => Effect.Effect } + +export class Service extends Context.Service()("@opencode/SessionShare") {} + +export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const cfg = yield* Config.Service + const session = yield* Session.Service + const shareNext = yield* ShareNext.Service + const scope = yield* Scope.Scope + + const share = Effect.fn("SessionShare.share")(function* (sessionID: SessionID) { + const conf = yield* cfg.get() + if (conf.share === "disabled") throw new Error("Sharing is disabled in configuration") + const result = yield* shareNext.create(sessionID) + yield* Effect.sync(() => + SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: result.url } } }), + ) + return result + }) + + const unshare = Effect.fn("SessionShare.unshare")(function* (sessionID: SessionID) { + yield* shareNext.remove(sessionID) + yield* Effect.sync(() => SyncEvent.run(Session.Event.Updated, { sessionID, info: { share: { url: null } } })) + }) + + const create = Effect.fn("SessionShare.create")(function* (input?: Session.CreateInput) { + const result = yield* session.create(input) + if (result.parentID) return result + const conf = yield* cfg.get() + if (!(Flag.OPENCODE_AUTO_SHARE || conf.share === "auto")) return result + yield* share(result.id).pipe(Effect.ignore, Effect.forkIn(scope)) + return result + }) + + return Service.of({ create, share, unshare }) + }), +) + +export const defaultLayer = layer.pipe( + Layer.provide(ShareNext.defaultLayer), + Layer.provide(Session.defaultLayer), + Layer.provide(Config.defaultLayer), +) diff --git a/packages/opencode/src/share/share-next.ts b/packages/opencode/src/share/share-next.ts index 9b345ac8ef..007455cd44 100644 --- a/packages/opencode/src/share/share-next.ts +++ b/packages/opencode/src/share/share-next.ts @@ -14,337 +14,335 @@ import { Config } from "@/config" import { Log } from "@/util/log" import { SessionShareTable } from "./share.sql" -export namespace ShareNext { - const log = Log.create({ service: "share-next" }) - const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1" +const log = Log.create({ service: "share-next" }) +const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1" - export type Api = { - create: string - sync: (shareID: string) => string - remove: (shareID: string) => string - data: (shareID: string) => string - } +export type Api = { + create: string + sync: (shareID: string) => string + remove: (shareID: string) => string + data: (shareID: string) => string +} - export type Req = { - headers: Record - api: Api - baseUrl: string - } +export type Req = { + headers: Record + api: Api + baseUrl: string +} - const ShareSchema = Schema.Struct({ - id: Schema.String, - url: Schema.String, - secret: Schema.String, - }) - export type Share = typeof ShareSchema.Type +const ShareSchema = Schema.Struct({ + id: Schema.String, + url: Schema.String, + secret: Schema.String, +}) +export type Share = typeof ShareSchema.Type - type State = { - queue: Map }> - scope: Scope.Closeable - } +type State = { + queue: Map }> + scope: Scope.Closeable +} - type Data = - | { - type: "session" - data: SDK.Session - } - | { - type: "message" - data: SDK.Message - } - | { - type: "part" - data: SDK.Part - } - | { - type: "session_diff" - data: SDK.SnapshotFileDiff[] - } - | { - type: "model" - data: SDK.Model[] - } - - export interface Interface { - readonly init: () => Effect.Effect - readonly url: () => Effect.Effect - readonly request: () => Effect.Effect - readonly create: (sessionID: SessionID) => Effect.Effect - readonly remove: (sessionID: SessionID) => Effect.Effect - } - - export class Service extends Context.Service()("@opencode/ShareNext") {} - - const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => - Effect.sync(() => Database.use(fn)) - - function api(resource: string): Api { - return { - create: `/api/${resource}`, - sync: (shareID) => `/api/${resource}/${shareID}/sync`, - remove: (shareID) => `/api/${resource}/${shareID}`, - data: (shareID) => `/api/${resource}/${shareID}/data`, +type Data = + | { + type: "session" + data: SDK.Session } - } - - const legacyApi = api("share") - const consoleApi = api("shares") - - function key(item: Data) { - switch (item.type) { - case "session": - return "session" - case "message": - return `message/${item.data.id}` - case "part": - return `part/${item.data.messageID}/${item.data.id}` - case "session_diff": - return "session_diff" - case "model": - return "model" + | { + type: "message" + data: SDK.Message } + | { + type: "part" + data: SDK.Part + } + | { + type: "session_diff" + data: SDK.SnapshotFileDiff[] + } + | { + type: "model" + data: SDK.Model[] + } + +export interface Interface { + readonly init: () => Effect.Effect + readonly url: () => Effect.Effect + readonly request: () => Effect.Effect + readonly create: (sessionID: SessionID) => Effect.Effect + readonly remove: (sessionID: SessionID) => Effect.Effect +} + +export class Service extends Context.Service()("@opencode/ShareNext") {} + +const db = (fn: (d: Parameters[0] extends (trx: infer D) => any ? D : never) => T) => + Effect.sync(() => Database.use(fn)) + +function api(resource: string): Api { + return { + create: `/api/${resource}`, + sync: (shareID) => `/api/${resource}/${shareID}/sync`, + remove: (shareID) => `/api/${resource}/${shareID}`, + data: (shareID) => `/api/${resource}/${shareID}/data`, } +} - export const layer = Layer.effect( - Service, - Effect.gen(function* () { - const account = yield* Account.Service - const bus = yield* Bus.Service - const cfg = yield* Config.Service - const http = yield* HttpClient.HttpClient - const httpOk = HttpClient.filterStatusOk(http) - const provider = yield* Provider.Service - const session = yield* Session.Service +const legacyApi = api("share") +const consoleApi = api("shares") - function sync(sessionID: SessionID, data: Data[]): Effect.Effect { - return Effect.gen(function* () { - if (disabled) return - const s = yield* InstanceState.get(state) - const existing = s.queue.get(sessionID) - if (existing) { - for (const item of data) { - existing.data.set(key(item), item) - } - return +function key(item: Data) { + switch (item.type) { + case "session": + return "session" + case "message": + return `message/${item.data.id}` + case "part": + return `part/${item.data.messageID}/${item.data.id}` + case "session_diff": + return "session_diff" + case "model": + return "model" + } +} + +export const layer = Layer.effect( + Service, + Effect.gen(function* () { + const account = yield* Account.Service + const bus = yield* Bus.Service + const cfg = yield* Config.Service + const http = yield* HttpClient.HttpClient + const httpOk = HttpClient.filterStatusOk(http) + const provider = yield* Provider.Service + const session = yield* Session.Service + + function sync(sessionID: SessionID, data: Data[]): Effect.Effect { + return Effect.gen(function* () { + if (disabled) return + const s = yield* InstanceState.get(state) + const existing = s.queue.get(sessionID) + if (existing) { + for (const item of data) { + existing.data.set(key(item), item) } - - const next = new Map(data.map((item) => [key(item), item])) - s.queue.set(sessionID, { data: next }) - yield* flush(sessionID).pipe( - Effect.delay(1000), - Effect.catchCause((cause) => - Effect.sync(() => { - log.error("share flush failed", { sessionID, cause }) - }), - ), - Effect.forkIn(s.scope), - ) - }) - } - - const state: InstanceState.InstanceState = yield* InstanceState.make( - Effect.fn("ShareNext.state")(function* (_ctx) { - const cache: State = { queue: new Map(), scope: yield* Scope.make() } - - yield* Effect.addFinalizer(() => - Scope.close(cache.scope, Exit.void).pipe( - Effect.andThen( - Effect.sync(() => { - cache.queue.clear() - }), - ), - ), - ) - - if (disabled) return cache - - const watch = ( - def: D, - fn: (evt: { properties: any }) => Effect.Effect, - ) => - bus.subscribe(def as never).pipe( - Stream.runForEach((evt) => - fn(evt).pipe( - Effect.catchCause((cause) => - Effect.sync(() => { - log.error("share subscriber failed", { type: def.type, cause }) - }), - ), - ), - ), - Effect.forkScoped, - ) - - yield* watch(Session.Event.Updated, (evt) => - Effect.gen(function* () { - const info = yield* session.get(evt.properties.sessionID) - yield* sync(info.id, [{ type: "session", data: info }]) - }), - ) - yield* watch(MessageV2.Event.Updated, (evt) => - Effect.gen(function* () { - const info = evt.properties.info - yield* sync(info.sessionID, [{ type: "message", data: info }]) - if (info.role !== "user") return - const model = yield* provider.getModel(info.model.providerID, info.model.modelID) - yield* sync(info.sessionID, [{ type: "model", data: [model] }]) - }), - ) - yield* watch(MessageV2.Event.PartUpdated, (evt) => - sync(evt.properties.part.sessionID, [{ type: "part", data: evt.properties.part }]), - ) - yield* watch(Session.Event.Diff, (evt) => - sync(evt.properties.sessionID, [{ type: "session_diff", data: evt.properties.diff }]), - ) - yield* watch(Session.Event.Deleted, (evt) => remove(evt.properties.sessionID)) - - return cache - }), - ) - - const request = Effect.fn("ShareNext.request")(function* () { - const headers: Record = {} - const active = yield* account.active() - if (Option.isNone(active) || !active.value.active_org_id) { - const baseUrl = (yield* cfg.get()).enterprise?.url ?? "https://opncd.ai" - return { headers, api: legacyApi, baseUrl } satisfies Req + return } - const token = yield* account.token(active.value.id) - if (Option.isNone(token)) { - throw new Error("No active account token available for sharing") - } - - headers.authorization = `Bearer ${token.value}` - headers["x-org-id"] = active.value.active_org_id - return { headers, api: consoleApi, baseUrl: active.value.url } satisfies Req - }) - - const get = Effect.fnUntraced(function* (sessionID: SessionID) { - const row = yield* db((db) => - db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(), - ) - if (!row) return - return { id: row.id, secret: row.secret, url: row.url } satisfies Share - }) - - const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) { - if (disabled) return - const s = yield* InstanceState.get(state) - const queued = s.queue.get(sessionID) - if (!queued) return - - s.queue.delete(sessionID) - - const share = yield* get(sessionID) - if (!share) return - - const req = yield* request() - const res = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.sync(share.id)}`).pipe( - HttpClientRequest.setHeaders(req.headers), - HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.data.values()) }), - Effect.flatMap((r) => http.execute(r)), - ) - - if (res.status >= 400) { - log.warn("failed to sync share", { sessionID, shareID: share.id, status: res.status }) - } - }) - - const full = Effect.fn("ShareNext.full")(function* (sessionID: SessionID) { - log.info("full sync", { sessionID }) - const info = yield* session.get(sessionID) - const diffs = yield* session.diff(sessionID) - const messages = yield* Effect.sync(() => Array.from(MessageV2.stream(sessionID))) - const models = yield* Effect.forEach( - Array.from( - new Map( - messages - .filter((msg) => msg.info.role === "user") - .map((msg) => (msg.info as SDK.UserMessage).model) - .map((item) => [`${item.providerID}/${item.modelID}`, item] as const), - ).values(), - ), - (item) => provider.getModel(ProviderID.make(item.providerID), ModelID.make(item.modelID)), - { concurrency: 8 }, - ) - - yield* sync(sessionID, [ - { type: "session", data: info }, - ...messages.map((item) => ({ type: "message" as const, data: item.info })), - ...messages.flatMap((item) => item.parts.map((part) => ({ type: "part" as const, data: part }))), - { type: "session_diff", data: diffs }, - { type: "model", data: models }, - ]) - }) - - const init = Effect.fn("ShareNext.init")(function* () { - if (disabled) return - yield* InstanceState.get(state) - }) - - const url = Effect.fn("ShareNext.url")(function* () { - return (yield* request()).baseUrl - }) - - const create = Effect.fn("ShareNext.create")(function* (sessionID: SessionID) { - if (disabled) return { id: "", url: "", secret: "" } - log.info("creating share", { sessionID }) - const req = yield* request() - const result = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.create}`).pipe( - HttpClientRequest.setHeaders(req.headers), - HttpClientRequest.bodyJson({ sessionID }), - Effect.flatMap((r) => httpOk.execute(r)), - Effect.flatMap(HttpClientResponse.schemaBodyJson(ShareSchema)), - ) - yield* db((db) => - db - .insert(SessionShareTable) - .values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url }) - .onConflictDoUpdate({ - target: SessionShareTable.session_id, - set: { id: result.id, secret: result.secret, url: result.url }, - }) - .run(), - ) - const s = yield* InstanceState.get(state) - yield* full(sessionID).pipe( + const next = new Map(data.map((item) => [key(item), item])) + s.queue.set(sessionID, { data: next }) + yield* flush(sessionID).pipe( + Effect.delay(1000), Effect.catchCause((cause) => Effect.sync(() => { - log.error("share full sync failed", { sessionID, cause }) + log.error("share flush failed", { sessionID, cause }) }), ), Effect.forkIn(s.scope), ) - return result }) + } - const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) { - if (disabled) return - log.info("removing share", { sessionID }) - const share = yield* get(sessionID) - if (!share) return + const state: InstanceState.InstanceState = yield* InstanceState.make( + Effect.fn("ShareNext.state")(function* (_ctx) { + const cache: State = { queue: new Map(), scope: yield* Scope.make() } - const req = yield* request() - yield* HttpClientRequest.delete(`${req.baseUrl}${req.api.remove(share.id)}`).pipe( - HttpClientRequest.setHeaders(req.headers), - HttpClientRequest.bodyJson({ secret: share.secret }), - Effect.flatMap((r) => httpOk.execute(r)), + yield* Effect.addFinalizer(() => + Scope.close(cache.scope, Exit.void).pipe( + Effect.andThen( + Effect.sync(() => { + cache.queue.clear() + }), + ), + ), ) - yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run()) - }) + if (disabled) return cache - return Service.of({ init, url, request, create, remove }) - }), - ) + const watch = ( + def: D, + fn: (evt: { properties: any }) => Effect.Effect, + ) => + bus.subscribe(def as never).pipe( + Stream.runForEach((evt) => + fn(evt).pipe( + Effect.catchCause((cause) => + Effect.sync(() => { + log.error("share subscriber failed", { type: def.type, cause }) + }), + ), + ), + ), + Effect.forkScoped, + ) - export const defaultLayer = layer.pipe( - Layer.provide(Bus.layer), - Layer.provide(Account.defaultLayer), - Layer.provide(Config.defaultLayer), - Layer.provide(FetchHttpClient.layer), - Layer.provide(Provider.defaultLayer), - Layer.provide(Session.defaultLayer), - ) -} + yield* watch(Session.Event.Updated, (evt) => + Effect.gen(function* () { + const info = yield* session.get(evt.properties.sessionID) + yield* sync(info.id, [{ type: "session", data: info }]) + }), + ) + yield* watch(MessageV2.Event.Updated, (evt) => + Effect.gen(function* () { + const info = evt.properties.info + yield* sync(info.sessionID, [{ type: "message", data: info }]) + if (info.role !== "user") return + const model = yield* provider.getModel(info.model.providerID, info.model.modelID) + yield* sync(info.sessionID, [{ type: "model", data: [model] }]) + }), + ) + yield* watch(MessageV2.Event.PartUpdated, (evt) => + sync(evt.properties.part.sessionID, [{ type: "part", data: evt.properties.part }]), + ) + yield* watch(Session.Event.Diff, (evt) => + sync(evt.properties.sessionID, [{ type: "session_diff", data: evt.properties.diff }]), + ) + yield* watch(Session.Event.Deleted, (evt) => remove(evt.properties.sessionID)) + + return cache + }), + ) + + const request = Effect.fn("ShareNext.request")(function* () { + const headers: Record = {} + const active = yield* account.active() + if (Option.isNone(active) || !active.value.active_org_id) { + const baseUrl = (yield* cfg.get()).enterprise?.url ?? "https://opncd.ai" + return { headers, api: legacyApi, baseUrl } satisfies Req + } + + const token = yield* account.token(active.value.id) + if (Option.isNone(token)) { + throw new Error("No active account token available for sharing") + } + + headers.authorization = `Bearer ${token.value}` + headers["x-org-id"] = active.value.active_org_id + return { headers, api: consoleApi, baseUrl: active.value.url } satisfies Req + }) + + const get = Effect.fnUntraced(function* (sessionID: SessionID) { + const row = yield* db((db) => + db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(), + ) + if (!row) return + return { id: row.id, secret: row.secret, url: row.url } satisfies Share + }) + + const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) { + if (disabled) return + const s = yield* InstanceState.get(state) + const queued = s.queue.get(sessionID) + if (!queued) return + + s.queue.delete(sessionID) + + const share = yield* get(sessionID) + if (!share) return + + const req = yield* request() + const res = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.sync(share.id)}`).pipe( + HttpClientRequest.setHeaders(req.headers), + HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.data.values()) }), + Effect.flatMap((r) => http.execute(r)), + ) + + if (res.status >= 400) { + log.warn("failed to sync share", { sessionID, shareID: share.id, status: res.status }) + } + }) + + const full = Effect.fn("ShareNext.full")(function* (sessionID: SessionID) { + log.info("full sync", { sessionID }) + const info = yield* session.get(sessionID) + const diffs = yield* session.diff(sessionID) + const messages = yield* Effect.sync(() => Array.from(MessageV2.stream(sessionID))) + const models = yield* Effect.forEach( + Array.from( + new Map( + messages + .filter((msg) => msg.info.role === "user") + .map((msg) => (msg.info as SDK.UserMessage).model) + .map((item) => [`${item.providerID}/${item.modelID}`, item] as const), + ).values(), + ), + (item) => provider.getModel(ProviderID.make(item.providerID), ModelID.make(item.modelID)), + { concurrency: 8 }, + ) + + yield* sync(sessionID, [ + { type: "session", data: info }, + ...messages.map((item) => ({ type: "message" as const, data: item.info })), + ...messages.flatMap((item) => item.parts.map((part) => ({ type: "part" as const, data: part }))), + { type: "session_diff", data: diffs }, + { type: "model", data: models }, + ]) + }) + + const init = Effect.fn("ShareNext.init")(function* () { + if (disabled) return + yield* InstanceState.get(state) + }) + + const url = Effect.fn("ShareNext.url")(function* () { + return (yield* request()).baseUrl + }) + + const create = Effect.fn("ShareNext.create")(function* (sessionID: SessionID) { + if (disabled) return { id: "", url: "", secret: "" } + log.info("creating share", { sessionID }) + const req = yield* request() + const result = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.create}`).pipe( + HttpClientRequest.setHeaders(req.headers), + HttpClientRequest.bodyJson({ sessionID }), + Effect.flatMap((r) => httpOk.execute(r)), + Effect.flatMap(HttpClientResponse.schemaBodyJson(ShareSchema)), + ) + yield* db((db) => + db + .insert(SessionShareTable) + .values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url }) + .onConflictDoUpdate({ + target: SessionShareTable.session_id, + set: { id: result.id, secret: result.secret, url: result.url }, + }) + .run(), + ) + const s = yield* InstanceState.get(state) + yield* full(sessionID).pipe( + Effect.catchCause((cause) => + Effect.sync(() => { + log.error("share full sync failed", { sessionID, cause }) + }), + ), + Effect.forkIn(s.scope), + ) + return result + }) + + const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) { + if (disabled) return + log.info("removing share", { sessionID }) + const share = yield* get(sessionID) + if (!share) return + + const req = yield* request() + yield* HttpClientRequest.delete(`${req.baseUrl}${req.api.remove(share.id)}`).pipe( + HttpClientRequest.setHeaders(req.headers), + HttpClientRequest.bodyJson({ secret: share.secret }), + Effect.flatMap((r) => httpOk.execute(r)), + ) + + yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run()) + }) + + return Service.of({ init, url, request, create, remove }) + }), +) + +export const defaultLayer = layer.pipe( + Layer.provide(Bus.layer), + Layer.provide(Account.defaultLayer), + Layer.provide(Config.defaultLayer), + Layer.provide(FetchHttpClient.layer), + Layer.provide(Provider.defaultLayer), + Layer.provide(Session.defaultLayer), +) diff --git a/packages/opencode/test/share/share-next.test.ts b/packages/opencode/test/share/share-next.test.ts index 8150e03623..ac3f7b79e0 100644 --- a/packages/opencode/test/share/share-next.test.ts +++ b/packages/opencode/test/share/share-next.test.ts @@ -12,7 +12,7 @@ import { Config } from "../../src/config" import { Provider } from "../../src/provider" import { Session } from "../../src/session" import type { SessionID } from "../../src/session/schema" -import { ShareNext } from "../../src/share/share-next" +import { ShareNext } from "../../src/share" import { SessionShareTable } from "../../src/share/share.sql" import { Database, eq } from "../../src/storage/db" import { provideTmpdirInstance } from "../fixture/fixture"