feat: unwrap storage namespaces to flat exports + barrel (#22747)

This commit is contained in:
Kit Langton
2026-04-15 23:30:49 -04:00
committed by GitHub
parent 1ca257e356
commit f24207844f
40 changed files with 874 additions and 854 deletions

View File

@@ -1,7 +1,7 @@
import { eq } from "drizzle-orm"
import { Effect, Layer, Option, Schema, Context } from "effect"
import { Database } from "@/storage/db"
import { Database } from "@/storage"
import { AccountStateTable, AccountTable } from "./account.sql"
import { AccessToken, AccountID, AccountRepoError, Info, OrgID, RefreshToken } from "./schema"
import { normalizeServerUrl } from "./url"

View File

@@ -1,11 +1,11 @@
import type { Argv } from "yargs"
import { spawn } from "child_process"
import { Database } from "../../storage/db"
import { Database } from "../../storage"
import { drizzle } from "drizzle-orm/bun-sqlite"
import { Database as BunDatabase } from "bun:sqlite"
import { UI } from "../ui"
import { cmd } from "./cmd"
import { JsonMigration } from "../../storage/json-migration"
import { JsonMigration } from "../../storage"
import { EOL } from "os"
import { errorMessage } from "../../util/error"

View File

@@ -4,7 +4,7 @@ import { Session } from "../../session"
import { MessageV2 } from "../../session/message-v2"
import { cmd } from "./cmd"
import { bootstrap } from "../bootstrap"
import { Database } from "../../storage/db"
import { Database } from "../../storage"
import { SessionTable, MessageTable, PartTable } from "../../session/session.sql"
import { Instance } from "../../project/instance"
import { ShareNext } from "../../share"

View File

@@ -2,7 +2,7 @@ import type { Argv } from "yargs"
import { cmd } from "./cmd"
import { Session } from "../../session"
import { bootstrap } from "../bootstrap"
import { Database } from "../../storage/db"
import { Database } from "../../storage"
import { SessionTable } from "../../session/session.sql"
import { Project } from "../../project"
import { Instance } from "../../project/instance"

View File

@@ -1,7 +1,7 @@
import z from "zod"
import { setTimeout as sleep } from "node:timers/promises"
import { fn } from "@/util/fn"
import { Database, asc, eq, inArray } from "@/storage/db"
import { Database, asc, eq, inArray } from "@/storage"
import { Project } from "@/project"
import { BusEvent } from "@/bus/bus-event"
import { GlobalBus } from "@/bus/global"
@@ -114,7 +114,7 @@ export namespace Workspace {
await adaptor.create(config)
void startSync(info)
startSync(info)
await waitEvent({
timeout: TIMEOUT,
@@ -294,7 +294,7 @@ export namespace Workspace {
)
const spaces = rows.map(fromRow).sort((a, b) => a.id.localeCompare(b.id))
for (const space of spaces) void startSync(space)
for (const space of spaces) startSync(space)
return spaces
}
@@ -307,7 +307,7 @@ export namespace Workspace {
export const get = fn(WorkspaceID.zod, async (id) => {
const space = lookup(id)
if (!space) return
void startSync(space)
startSync(space)
return space
})

View File

@@ -12,7 +12,7 @@ import { Ripgrep } from "@/file/ripgrep"
import { FileTime } from "@/file/time"
import { File } from "@/file"
import { FileWatcher } from "@/file/watcher"
import { Storage } from "@/storage/storage"
import { Storage } from "@/storage"
import { Snapshot } from "@/snapshot"
import { Plugin } from "@/plugin"
import { Provider } from "@/provider"

View File

@@ -31,8 +31,8 @@ import { SessionCommand } from "./cli/cmd/session"
import { DbCommand } from "./cli/cmd/db"
import path from "path"
import { Global } from "./global"
import { JsonMigration } from "./storage/json-migration"
import { Database } from "./storage/db"
import { JsonMigration } from "./storage"
import { Database } from "./storage"
import { errorMessage } from "./util/error"
import { PluginCommand } from "./cli/cmd/plug"
import { Heap } from "./cli/heap"

View File

@@ -2,5 +2,5 @@ export { Config } from "./config"
export { Server } from "./server/server"
export { bootstrap } from "./cli/bootstrap"
export { Log } from "./util"
export { Database } from "./storage/db"
export { JsonMigration } from "./storage/json-migration"
export { Database } from "./storage"
export { JsonMigration } from "./storage"

View File

@@ -5,7 +5,7 @@ import { InstanceState } from "@/effect"
import { ProjectID } from "@/project/schema"
import { MessageID, SessionID } from "@/session/schema"
import { PermissionTable } from "@/session/session.sql"
import { Database, eq } from "@/storage/db"
import { Database, eq } from "@/storage"
import { zod } from "@/util/effect-zod"
import { Log } from "@/util"
import { withStatics } from "@/util/schema"

View File

@@ -1,5 +1,5 @@
import z from "zod"
import { and, Database, eq } from "../storage/db"
import { and, Database, eq } from "../storage"
import { ProjectTable } from "./project.sql"
import { SessionTable } from "../session/session.sql"
import { Log } from "../util"

View File

@@ -1,6 +1,6 @@
import { resolver } from "hono-openapi"
import z from "zod"
import { NotFoundError } from "../storage/db"
import { NotFoundError } from "../storage"
export const ERRORS = {
400: {

View File

@@ -1,5 +1,5 @@
import type { MiddlewareHandler } from "hono"
import { Database, inArray } from "@/storage/db"
import { Database, inArray } from "@/storage"
import { EventSequenceTable } from "@/sync/event.sql"
import { Workspace } from "@/control-plane/workspace"
import type { WorkspaceID } from "@/control-plane/schema"

View File

@@ -6,7 +6,7 @@ import z from "zod"
import { AppRuntime } from "@/effect/app-runtime"
import { Pty } from "@/pty"
import { PtyID } from "@/pty/schema"
import { NotFoundError } from "../../storage/db"
import { NotFoundError } from "../../storage"
import { errors } from "../error"
export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) {

View File

@@ -2,7 +2,7 @@ import z from "zod"
import { Hono } from "hono"
import { describeRoute, validator, resolver } from "hono-openapi"
import { SyncEvent } from "@/sync"
import { Database, asc, and, not, or, lte, eq } from "@/storage/db"
import { Database, asc, and, not, or, lte, eq } from "@/storage"
import { EventTable } from "@/sync/event.sql"
import { lazy } from "@/util/lazy"
import { Log } from "@/util"

View File

@@ -1,6 +1,6 @@
import { Provider } from "../provider"
import { NamedError } from "@opencode-ai/shared/util/error"
import { NotFoundError } from "../storage/db"
import { NotFoundError } from "../storage"
import { Session } from "../session"
import type { ContentfulStatusCode } from "hono/utils/http-status"
import type { ErrorHandler, MiddlewareHandler } from "hono"

View File

@@ -3,7 +3,7 @@ import sessionProjectors from "../session/projectors"
import { SyncEvent } from "@/sync"
import { Session } from "@/session"
import { SessionTable } from "@/session/session.sql"
import { Database, eq } from "@/storage/db"
import { Database, eq } from "@/storage"
export function initProjectors() {
SyncEvent.init({

View File

@@ -11,7 +11,7 @@ import { SessionProcessor } from "./processor"
import { Agent } from "@/agent/agent"
import { Plugin } from "@/plugin"
import { Config } from "@/config"
import { NotFoundError } from "@/storage/db"
import { NotFoundError } from "@/storage"
import { ModelID, ProviderID } from "@/provider/schema"
import { Effect, Layer, Context } from "effect"
import { InstanceState } from "@/effect"

View File

@@ -6,7 +6,7 @@ import { APICallError, convertToModelMessages, LoadAPIKeyError, type ModelMessag
import { LSP } from "../lsp"
import { Snapshot } from "@/snapshot"
import { SyncEvent } from "../sync"
import { Database, NotFoundError, and, desc, eq, inArray, lt, or } from "@/storage/db"
import { Database, NotFoundError, and, desc, eq, inArray, lt, or } from "@/storage"
import { MessageTable, PartTable, SessionTable } from "./session.sql"
import { ProviderError } from "@/provider/error"
import { iife } from "@/util/iife"

View File

@@ -1,4 +1,4 @@
import { NotFoundError, eq, and } from "../storage/db"
import { NotFoundError, eq, and } from "../storage"
import { SyncEvent } from "@/sync"
import { Session } from "."
import { MessageV2 } from "./message-v2"

View File

@@ -2,7 +2,7 @@ import z from "zod"
import { Effect, Layer, Context } from "effect"
import { Bus } from "../bus"
import { Snapshot } from "../snapshot"
import { Storage } from "@/storage/storage"
import { Storage } from "@/storage"
import { SyncEvent } from "../sync"
import { Log } from "../util"
import { Session } from "."

View File

@@ -8,12 +8,12 @@ import { type ProviderMetadata, type LanguageModelUsage } from "ai"
import { Flag } from "../flag/flag"
import { Installation } from "../installation"
import { Database, NotFoundError, eq, and, gte, isNull, desc, like, inArray, lt } from "../storage/db"
import { Database, NotFoundError, eq, and, gte, isNull, desc, like, inArray, lt } from "../storage"
import { SyncEvent } from "../sync"
import type { SQL } from "../storage/db"
import type { SQL } from "../storage"
import { PartTable, SessionTable } from "./session.sql"
import { ProjectTable } from "../project/project.sql"
import { Storage } from "@/storage/storage"
import { Storage } from "@/storage"
import { Log } from "../util"
import { updateSchema } from "../util/update-schema"
import { MessageV2 } from "./message-v2"

View File

@@ -2,7 +2,7 @@ import z from "zod"
import { Effect, Layer, Context } from "effect"
import { Bus } from "@/bus"
import { Snapshot } from "@/snapshot"
import { Storage } from "@/storage/storage"
import { Storage } from "@/storage"
import { Session } from "."
import { MessageV2 } from "./message-v2"
import { SessionID, MessageID } from "./schema"

View File

@@ -3,7 +3,7 @@ import { Bus } from "@/bus"
import { SessionID } from "./schema"
import { Effect, Layer, Context } from "effect"
import z from "zod"
import { Database, eq, asc } from "../storage/db"
import { Database, eq, asc } from "../storage"
import { TodoTable } from "./session.sql"
export namespace Todo {

View File

@@ -9,7 +9,7 @@ import { ModelID, ProviderID } from "@/provider/schema"
import { Session } from "@/session"
import { MessageV2 } from "@/session/message-v2"
import type { SessionID } from "@/session/schema"
import { Database, eq } from "@/storage/db"
import { Database, eq } from "@/storage"
import { Config } from "@/config"
import { Log } from "@/util"
import { SessionShareTable } from "./share.sql"

View File

@@ -27,148 +27,146 @@ export const NotFoundError = NamedError.create(
const log = Log.create({ service: "db" })
export namespace Database {
export function getChannelPath() {
if (["latest", "beta", "prod"].includes(CHANNEL) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
return path.join(Global.Path.data, "opencode.db")
const safe = CHANNEL.replace(/[^a-zA-Z0-9._-]/g, "-")
return path.join(Global.Path.data, `opencode-${safe}.db`)
export function getChannelPath() {
if (["latest", "beta", "prod"].includes(CHANNEL) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
return path.join(Global.Path.data, "opencode.db")
const safe = CHANNEL.replace(/[^a-zA-Z0-9._-]/g, "-")
return path.join(Global.Path.data, `opencode-${safe}.db`)
}
export const Path = iife(() => {
if (Flag.OPENCODE_DB) {
if (Flag.OPENCODE_DB === ":memory:" || path.isAbsolute(Flag.OPENCODE_DB)) return Flag.OPENCODE_DB
return path.join(Global.Path.data, Flag.OPENCODE_DB)
}
return getChannelPath()
})
export const Path = iife(() => {
if (Flag.OPENCODE_DB) {
if (Flag.OPENCODE_DB === ":memory:" || path.isAbsolute(Flag.OPENCODE_DB)) return Flag.OPENCODE_DB
return path.join(Global.Path.data, Flag.OPENCODE_DB)
}
return getChannelPath()
})
export type Transaction = SQLiteTransaction<"sync", void>
export type Transaction = SQLiteTransaction<"sync", void>
type Client = SQLiteBunDatabase
type Client = SQLiteBunDatabase
type Journal = { sql: string; timestamp: number; name: string }[]
type Journal = { sql: string; timestamp: number; name: string }[]
function time(tag: string) {
const match = /^(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})/.exec(tag)
if (!match) return 0
return Date.UTC(
Number(match[1]),
Number(match[2]) - 1,
Number(match[3]),
Number(match[4]),
Number(match[5]),
Number(match[6]),
)
}
function time(tag: string) {
const match = /^(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})/.exec(tag)
if (!match) return 0
return Date.UTC(
Number(match[1]),
Number(match[2]) - 1,
Number(match[3]),
Number(match[4]),
Number(match[5]),
Number(match[6]),
)
}
function migrations(dir: string): Journal {
const dirs = readdirSync(dir, { withFileTypes: true })
.filter((entry) => entry.isDirectory())
.map((entry) => entry.name)
function migrations(dir: string): Journal {
const dirs = readdirSync(dir, { withFileTypes: true })
.filter((entry) => entry.isDirectory())
.map((entry) => entry.name)
const sql = dirs
.map((name) => {
const file = path.join(dir, name, "migration.sql")
if (!existsSync(file)) return
return {
sql: readFileSync(file, "utf-8"),
timestamp: time(name),
name,
}
})
.filter(Boolean) as Journal
return sql.sort((a, b) => a.timestamp - b.timestamp)
}
export const Client = lazy(() => {
log.info("opening database", { path: Path })
const db = init(Path)
db.run("PRAGMA journal_mode = WAL")
db.run("PRAGMA synchronous = NORMAL")
db.run("PRAGMA busy_timeout = 5000")
db.run("PRAGMA cache_size = -64000")
db.run("PRAGMA foreign_keys = ON")
db.run("PRAGMA wal_checkpoint(PASSIVE)")
// Apply schema migrations
const entries =
typeof OPENCODE_MIGRATIONS !== "undefined"
? OPENCODE_MIGRATIONS
: migrations(path.join(import.meta.dirname, "../../migration"))
if (entries.length > 0) {
log.info("applying migrations", {
count: entries.length,
mode: typeof OPENCODE_MIGRATIONS !== "undefined" ? "bundled" : "dev",
})
if (Flag.OPENCODE_SKIP_MIGRATIONS) {
for (const item of entries) {
item.sql = "select 1;"
}
const sql = dirs
.map((name) => {
const file = path.join(dir, name, "migration.sql")
if (!existsSync(file)) return
return {
sql: readFileSync(file, "utf-8"),
timestamp: time(name),
name,
}
migrate(db, entries)
}
})
.filter(Boolean) as Journal
return db
})
return sql.sort((a, b) => a.timestamp - b.timestamp)
}
export function close() {
Client().$client.close()
Client.reset()
}
export const Client = lazy(() => {
log.info("opening database", { path: Path })
export type TxOrDb = Transaction | Client
const db = init(Path)
const ctx = LocalContext.create<{
tx: TxOrDb
effects: (() => void | Promise<void>)[]
}>("database")
db.run("PRAGMA journal_mode = WAL")
db.run("PRAGMA synchronous = NORMAL")
db.run("PRAGMA busy_timeout = 5000")
db.run("PRAGMA cache_size = -64000")
db.run("PRAGMA foreign_keys = ON")
db.run("PRAGMA wal_checkpoint(PASSIVE)")
export function use<T>(callback: (trx: TxOrDb) => T): T {
try {
return callback(ctx.use().tx)
} catch (err) {
if (err instanceof LocalContext.NotFound) {
const effects: (() => void | Promise<void>)[] = []
const result = ctx.provide({ effects, tx: Client() }, () => callback(Client()))
for (const effect of effects) void effect()
return result
// Apply schema migrations
const entries =
typeof OPENCODE_MIGRATIONS !== "undefined"
? OPENCODE_MIGRATIONS
: migrations(path.join(import.meta.dirname, "../../migration"))
if (entries.length > 0) {
log.info("applying migrations", {
count: entries.length,
mode: typeof OPENCODE_MIGRATIONS !== "undefined" ? "bundled" : "dev",
})
if (Flag.OPENCODE_SKIP_MIGRATIONS) {
for (const item of entries) {
item.sql = "select 1;"
}
throw err
}
migrate(db, entries)
}
export function effect(fn: () => any | Promise<any>) {
const bound = InstanceState.bind(fn)
try {
ctx.use().effects.push(bound)
} catch {
void bound()
}
}
return db
})
type NotPromise<T> = T extends Promise<any> ? never : T
export function close() {
Client().$client.close()
Client.reset()
}
export function transaction<T>(
callback: (tx: TxOrDb) => NotPromise<T>,
options?: {
behavior?: "deferred" | "immediate" | "exclusive"
},
): NotPromise<T> {
try {
return callback(ctx.use().tx)
} catch (err) {
if (err instanceof LocalContext.NotFound) {
const effects: (() => void | Promise<void>)[] = []
const txCallback = InstanceState.bind((tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx)))
const result = Client().transaction(txCallback, { behavior: options?.behavior })
for (const effect of effects) void effect()
return result as NotPromise<T>
}
throw err
export type TxOrDb = Transaction | Client
const ctx = LocalContext.create<{
tx: TxOrDb
effects: (() => void | Promise<void>)[]
}>("database")
export function use<T>(callback: (trx: TxOrDb) => T): T {
try {
return callback(ctx.use().tx)
} catch (err) {
if (err instanceof LocalContext.NotFound) {
const effects: (() => void | Promise<void>)[] = []
const result = ctx.provide({ effects, tx: Client() }, () => callback(Client()))
for (const effect of effects) effect()
return result
}
throw err
}
}
export function effect(fn: () => any | Promise<any>) {
const bound = InstanceState.bind(fn)
try {
ctx.use().effects.push(bound)
} catch {
bound()
}
}
type NotPromise<T> = T extends Promise<any> ? never : T
export function transaction<T>(
callback: (tx: TxOrDb) => NotPromise<T>,
options?: {
behavior?: "deferred" | "immediate" | "exclusive"
},
): NotPromise<T> {
try {
return callback(ctx.use().tx)
} catch (err) {
if (err instanceof LocalContext.NotFound) {
const effects: (() => void | Promise<void>)[] = []
const txCallback = InstanceState.bind((tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx)))
const result = Client().transaction(txCallback, { behavior: options?.behavior })
for (const effect of effects) effect()
return result as NotPromise<T>
}
throw err
}
}

View File

@@ -0,0 +1,26 @@
export * as JsonMigration from "./json-migration"
export * as Database from "./db"
export * as Storage from "./storage"
export {
asc,
eq,
and,
or,
inArray,
desc,
not,
sql,
isNull,
isNotNull,
count,
like,
exists,
between,
gt,
gte,
lt,
lte,
ne,
} from "drizzle-orm"
export type { SQL } from "drizzle-orm"
export { NotFoundError } from "./storage"

View File

@@ -10,47 +10,24 @@ import { existsSync } from "fs"
import { Filesystem } from "../util"
import { Glob } from "@opencode-ai/shared/util/glob"
export namespace JsonMigration {
const log = Log.create({ service: "json-migration" })
const log = Log.create({ service: "json-migration" })
export type Progress = {
current: number
total: number
label: string
}
export type Progress = {
current: number
total: number
label: string
}
type Options = {
progress?: (event: Progress) => void
}
type Options = {
progress?: (event: Progress) => void
}
export async function run(db: SQLiteBunDatabase<any, any> | NodeSQLiteDatabase<any, any>, options?: Options) {
const storageDir = path.join(Global.Path.data, "storage")
export async function run(db: SQLiteBunDatabase<any, any> | NodeSQLiteDatabase<any, any>, options?: Options) {
const storageDir = path.join(Global.Path.data, "storage")
if (!existsSync(storageDir)) {
log.info("storage directory does not exist, skipping migration")
return {
projects: 0,
sessions: 0,
messages: 0,
parts: 0,
todos: 0,
permissions: 0,
shares: 0,
errors: [] as string[],
}
}
log.info("starting json to sqlite migration", { storageDir })
const start = performance.now()
// const db = drizzle({ client: sqlite })
// Optimize SQLite for bulk inserts
db.run("PRAGMA journal_mode = WAL")
db.run("PRAGMA synchronous = OFF")
db.run("PRAGMA cache_size = 10000")
db.run("PRAGMA temp_store = MEMORY")
const stats = {
if (!existsSync(storageDir)) {
log.info("storage directory does not exist, skipping migration")
return {
projects: 0,
sessions: 0,
messages: 0,
@@ -60,370 +37,391 @@ export namespace JsonMigration {
shares: 0,
errors: [] as string[],
}
const orphans = {
sessions: 0,
todos: 0,
permissions: 0,
shares: 0,
}
const errs = stats.errors
const batchSize = 1000
const now = Date.now()
async function list(pattern: string) {
return Glob.scan(pattern, { cwd: storageDir, absolute: true })
}
async function read(files: string[], start: number, end: number) {
const count = end - start
// oxlint-disable-next-line unicorn/no-new-array -- pre-allocated for index-based batch fill
const tasks = new Array(count)
for (let i = 0; i < count; i++) {
tasks[i] = Filesystem.readJson(files[start + i])
}
const results = await Promise.allSettled(tasks)
// oxlint-disable-next-line unicorn/no-new-array -- pre-allocated for index-based batch fill
const items = new Array(count)
for (let i = 0; i < results.length; i++) {
const result = results[i]
if (result.status === "fulfilled") {
items[i] = result.value
continue
}
errs.push(`failed to read ${files[start + i]}: ${result.reason}`)
}
return items
}
function insert(values: any[], table: any, label: string) {
if (values.length === 0) return 0
try {
db.insert(table).values(values).onConflictDoNothing().run()
return values.length
} catch (e) {
errs.push(`failed to migrate ${label} batch: ${e}`)
return 0
}
}
// Pre-scan all files upfront to avoid repeated glob operations
log.info("scanning files...")
const [projectFiles, sessionFiles, messageFiles, partFiles, todoFiles, permFiles, shareFiles] = await Promise.all([
list("project/*.json"),
list("session/*/*.json"),
list("message/*/*.json"),
list("part/*/*.json"),
list("todo/*.json"),
list("permission/*.json"),
list("session_share/*.json"),
])
log.info("file scan complete", {
projects: projectFiles.length,
sessions: sessionFiles.length,
messages: messageFiles.length,
parts: partFiles.length,
todos: todoFiles.length,
permissions: permFiles.length,
shares: shareFiles.length,
})
const total = Math.max(
1,
projectFiles.length +
sessionFiles.length +
messageFiles.length +
partFiles.length +
todoFiles.length +
permFiles.length +
shareFiles.length,
)
const progress = options?.progress
let current = 0
const step = (label: string, count: number) => {
current = Math.min(total, current + count)
progress?.({ current, total, label })
}
progress?.({ current, total, label: "starting" })
db.run("BEGIN TRANSACTION")
// Migrate projects first (no FK deps)
// Derive all IDs from file paths, not JSON content
const projectIds = new Set<string>()
const projectValues = [] as any[]
for (let i = 0; i < projectFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, projectFiles.length)
const batch = await read(projectFiles, i, end)
projectValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const id = path.basename(projectFiles[i + j], ".json")
projectIds.add(id)
projectValues.push({
id,
worktree: data.worktree ?? "/",
vcs: data.vcs,
name: data.name ?? undefined,
icon_url: data.icon?.url,
icon_color: data.icon?.color,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
time_initialized: data.time?.initialized,
sandboxes: data.sandboxes ?? [],
commands: data.commands,
})
}
stats.projects += insert(projectValues, ProjectTable, "project")
step("projects", end - i)
}
log.info("migrated projects", { count: stats.projects, duration: Math.round(performance.now() - start) })
// Migrate sessions (depends on projects)
// Derive all IDs from directory/file paths, not JSON content, since earlier
// migrations may have moved sessions to new directories without updating the JSON
const sessionProjects = sessionFiles.map((file) => path.basename(path.dirname(file)))
const sessionIds = new Set<string>()
const sessionValues = [] as any[]
for (let i = 0; i < sessionFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, sessionFiles.length)
const batch = await read(sessionFiles, i, end)
sessionValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const id = path.basename(sessionFiles[i + j], ".json")
const projectID = sessionProjects[i + j]
if (!projectIds.has(projectID)) {
orphans.sessions++
continue
}
sessionIds.add(id)
sessionValues.push({
id,
project_id: projectID,
parent_id: data.parentID ?? null,
slug: data.slug ?? "",
directory: data.directory ?? "",
title: data.title ?? "",
version: data.version ?? "",
share_url: data.share?.url ?? null,
summary_additions: data.summary?.additions ?? null,
summary_deletions: data.summary?.deletions ?? null,
summary_files: data.summary?.files ?? null,
summary_diffs: data.summary?.diffs ?? null,
revert: data.revert ?? null,
permission: data.permission ?? null,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
time_compacting: data.time?.compacting ?? null,
time_archived: data.time?.archived ?? null,
})
}
stats.sessions += insert(sessionValues, SessionTable, "session")
step("sessions", end - i)
}
log.info("migrated sessions", { count: stats.sessions })
if (orphans.sessions > 0) {
log.warn("skipped orphaned sessions", { count: orphans.sessions })
}
// Migrate messages using pre-scanned file map
const allMessageFiles = [] as string[]
const allMessageSessions = [] as string[]
const messageSessions = new Map<string, string>()
for (const file of messageFiles) {
const sessionID = path.basename(path.dirname(file))
if (!sessionIds.has(sessionID)) continue
allMessageFiles.push(file)
allMessageSessions.push(sessionID)
}
for (let i = 0; i < allMessageFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, allMessageFiles.length)
const batch = await read(allMessageFiles, i, end)
// oxlint-disable-next-line unicorn/no-new-array -- pre-allocated for index-based batch fill
const values = new Array(batch.length)
let count = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const file = allMessageFiles[i + j]
const id = path.basename(file, ".json")
const sessionID = allMessageSessions[i + j]
messageSessions.set(id, sessionID)
const rest = data
delete rest.id
delete rest.sessionID
values[count++] = {
id,
session_id: sessionID,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
data: rest,
}
}
values.length = count
stats.messages += insert(values, MessageTable, "message")
step("messages", end - i)
}
log.info("migrated messages", { count: stats.messages })
// Migrate parts using pre-scanned file map
for (let i = 0; i < partFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, partFiles.length)
const batch = await read(partFiles, i, end)
// oxlint-disable-next-line unicorn/no-new-array -- pre-allocated for index-based batch fill
const values = new Array(batch.length)
let count = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const file = partFiles[i + j]
const id = path.basename(file, ".json")
const messageID = path.basename(path.dirname(file))
const sessionID = messageSessions.get(messageID)
if (!sessionID) {
errs.push(`part missing message session: ${file}`)
continue
}
if (!sessionIds.has(sessionID)) continue
const rest = data
delete rest.id
delete rest.messageID
delete rest.sessionID
values[count++] = {
id,
message_id: messageID,
session_id: sessionID,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
data: rest,
}
}
values.length = count
stats.parts += insert(values, PartTable, "part")
step("parts", end - i)
}
log.info("migrated parts", { count: stats.parts })
// Migrate todos
const todoSessions = todoFiles.map((file) => path.basename(file, ".json"))
for (let i = 0; i < todoFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, todoFiles.length)
const batch = await read(todoFiles, i, end)
const values = [] as any[]
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const sessionID = todoSessions[i + j]
if (!sessionIds.has(sessionID)) {
orphans.todos++
continue
}
if (!Array.isArray(data)) {
errs.push(`todo not an array: ${todoFiles[i + j]}`)
continue
}
for (let position = 0; position < data.length; position++) {
const todo = data[position]
if (!todo?.content || !todo?.status || !todo?.priority) continue
values.push({
session_id: sessionID,
content: todo.content,
status: todo.status,
priority: todo.priority,
position,
time_created: now,
time_updated: now,
})
}
}
stats.todos += insert(values, TodoTable, "todo")
step("todos", end - i)
}
log.info("migrated todos", { count: stats.todos })
if (orphans.todos > 0) {
log.warn("skipped orphaned todos", { count: orphans.todos })
}
// Migrate permissions
const permProjects = permFiles.map((file) => path.basename(file, ".json"))
const permValues = [] as any[]
for (let i = 0; i < permFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, permFiles.length)
const batch = await read(permFiles, i, end)
permValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const projectID = permProjects[i + j]
if (!projectIds.has(projectID)) {
orphans.permissions++
continue
}
permValues.push({ project_id: projectID, data })
}
stats.permissions += insert(permValues, PermissionTable, "permission")
step("permissions", end - i)
}
log.info("migrated permissions", { count: stats.permissions })
if (orphans.permissions > 0) {
log.warn("skipped orphaned permissions", { count: orphans.permissions })
}
// Migrate session shares
const shareSessions = shareFiles.map((file) => path.basename(file, ".json"))
const shareValues = [] as any[]
for (let i = 0; i < shareFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, shareFiles.length)
const batch = await read(shareFiles, i, end)
shareValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const sessionID = shareSessions[i + j]
if (!sessionIds.has(sessionID)) {
orphans.shares++
continue
}
if (!data?.id || !data?.secret || !data?.url) {
errs.push(`session_share missing id/secret/url: ${shareFiles[i + j]}`)
continue
}
shareValues.push({ session_id: sessionID, id: data.id, secret: data.secret, url: data.url })
}
stats.shares += insert(shareValues, SessionShareTable, "session_share")
step("shares", end - i)
}
log.info("migrated session shares", { count: stats.shares })
if (orphans.shares > 0) {
log.warn("skipped orphaned session shares", { count: orphans.shares })
}
db.run("COMMIT")
log.info("json migration complete", {
projects: stats.projects,
sessions: stats.sessions,
messages: stats.messages,
parts: stats.parts,
todos: stats.todos,
permissions: stats.permissions,
shares: stats.shares,
errorCount: stats.errors.length,
duration: Math.round(performance.now() - start),
})
if (stats.errors.length > 0) {
log.warn("migration errors", { errors: stats.errors.slice(0, 20) })
}
progress?.({ current: total, total, label: "complete" })
return stats
}
log.info("starting json to sqlite migration", { storageDir })
const start = performance.now()
// const db = drizzle({ client: sqlite })
// Optimize SQLite for bulk inserts
db.run("PRAGMA journal_mode = WAL")
db.run("PRAGMA synchronous = OFF")
db.run("PRAGMA cache_size = 10000")
db.run("PRAGMA temp_store = MEMORY")
const stats = {
projects: 0,
sessions: 0,
messages: 0,
parts: 0,
todos: 0,
permissions: 0,
shares: 0,
errors: [] as string[],
}
const orphans = {
sessions: 0,
todos: 0,
permissions: 0,
shares: 0,
}
const errs = stats.errors
const batchSize = 1000
const now = Date.now()
async function list(pattern: string) {
return Glob.scan(pattern, { cwd: storageDir, absolute: true })
}
async function read(files: string[], start: number, end: number) {
const count = end - start
// oxlint-disable-next-line unicorn/no-new-array -- pre-allocated for index-based batch fill
const tasks = new Array(count)
for (let i = 0; i < count; i++) {
tasks[i] = Filesystem.readJson(files[start + i])
}
const results = await Promise.allSettled(tasks)
// oxlint-disable-next-line unicorn/no-new-array -- pre-allocated for index-based batch fill
const items = new Array(count)
for (let i = 0; i < results.length; i++) {
const result = results[i]
if (result.status === "fulfilled") {
items[i] = result.value
continue
}
errs.push(`failed to read ${files[start + i]}: ${result.reason}`)
}
return items
}
function insert(values: any[], table: any, label: string) {
if (values.length === 0) return 0
try {
db.insert(table).values(values).onConflictDoNothing().run()
return values.length
} catch (e) {
errs.push(`failed to migrate ${label} batch: ${e}`)
return 0
}
}
// Pre-scan all files upfront to avoid repeated glob operations
log.info("scanning files...")
const [projectFiles, sessionFiles, messageFiles, partFiles, todoFiles, permFiles, shareFiles] = await Promise.all([
list("project/*.json"),
list("session/*/*.json"),
list("message/*/*.json"),
list("part/*/*.json"),
list("todo/*.json"),
list("permission/*.json"),
list("session_share/*.json"),
])
log.info("file scan complete", {
projects: projectFiles.length,
sessions: sessionFiles.length,
messages: messageFiles.length,
parts: partFiles.length,
todos: todoFiles.length,
permissions: permFiles.length,
shares: shareFiles.length,
})
const total = Math.max(
1,
projectFiles.length +
sessionFiles.length +
messageFiles.length +
partFiles.length +
todoFiles.length +
permFiles.length +
shareFiles.length,
)
const progress = options?.progress
let current = 0
const step = (label: string, count: number) => {
current = Math.min(total, current + count)
progress?.({ current, total, label })
}
progress?.({ current, total, label: "starting" })
db.run("BEGIN TRANSACTION")
// Migrate projects first (no FK deps)
// Derive all IDs from file paths, not JSON content
const projectIds = new Set<string>()
const projectValues = [] as any[]
for (let i = 0; i < projectFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, projectFiles.length)
const batch = await read(projectFiles, i, end)
projectValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const id = path.basename(projectFiles[i + j], ".json")
projectIds.add(id)
projectValues.push({
id,
worktree: data.worktree ?? "/",
vcs: data.vcs,
name: data.name ?? undefined,
icon_url: data.icon?.url,
icon_color: data.icon?.color,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
time_initialized: data.time?.initialized,
sandboxes: data.sandboxes ?? [],
commands: data.commands,
})
}
stats.projects += insert(projectValues, ProjectTable, "project")
step("projects", end - i)
}
log.info("migrated projects", { count: stats.projects, duration: Math.round(performance.now() - start) })
// Migrate sessions (depends on projects)
// Derive all IDs from directory/file paths, not JSON content, since earlier
// migrations may have moved sessions to new directories without updating the JSON
const sessionProjects = sessionFiles.map((file) => path.basename(path.dirname(file)))
const sessionIds = new Set<string>()
const sessionValues = [] as any[]
for (let i = 0; i < sessionFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, sessionFiles.length)
const batch = await read(sessionFiles, i, end)
sessionValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const id = path.basename(sessionFiles[i + j], ".json")
const projectID = sessionProjects[i + j]
if (!projectIds.has(projectID)) {
orphans.sessions++
continue
}
sessionIds.add(id)
sessionValues.push({
id,
project_id: projectID,
parent_id: data.parentID ?? null,
slug: data.slug ?? "",
directory: data.directory ?? "",
title: data.title ?? "",
version: data.version ?? "",
share_url: data.share?.url ?? null,
summary_additions: data.summary?.additions ?? null,
summary_deletions: data.summary?.deletions ?? null,
summary_files: data.summary?.files ?? null,
summary_diffs: data.summary?.diffs ?? null,
revert: data.revert ?? null,
permission: data.permission ?? null,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
time_compacting: data.time?.compacting ?? null,
time_archived: data.time?.archived ?? null,
})
}
stats.sessions += insert(sessionValues, SessionTable, "session")
step("sessions", end - i)
}
log.info("migrated sessions", { count: stats.sessions })
if (orphans.sessions > 0) {
log.warn("skipped orphaned sessions", { count: orphans.sessions })
}
// Migrate messages using pre-scanned file map
const allMessageFiles = [] as string[]
const allMessageSessions = [] as string[]
const messageSessions = new Map<string, string>()
for (const file of messageFiles) {
const sessionID = path.basename(path.dirname(file))
if (!sessionIds.has(sessionID)) continue
allMessageFiles.push(file)
allMessageSessions.push(sessionID)
}
for (let i = 0; i < allMessageFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, allMessageFiles.length)
const batch = await read(allMessageFiles, i, end)
// oxlint-disable-next-line unicorn/no-new-array -- pre-allocated for index-based batch fill
const values = new Array(batch.length)
let count = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const file = allMessageFiles[i + j]
const id = path.basename(file, ".json")
const sessionID = allMessageSessions[i + j]
messageSessions.set(id, sessionID)
const rest = data
delete rest.id
delete rest.sessionID
values[count++] = {
id,
session_id: sessionID,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
data: rest,
}
}
values.length = count
stats.messages += insert(values, MessageTable, "message")
step("messages", end - i)
}
log.info("migrated messages", { count: stats.messages })
// Migrate parts using pre-scanned file map
for (let i = 0; i < partFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, partFiles.length)
const batch = await read(partFiles, i, end)
// oxlint-disable-next-line unicorn/no-new-array -- pre-allocated for index-based batch fill
const values = new Array(batch.length)
let count = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const file = partFiles[i + j]
const id = path.basename(file, ".json")
const messageID = path.basename(path.dirname(file))
const sessionID = messageSessions.get(messageID)
if (!sessionID) {
errs.push(`part missing message session: ${file}`)
continue
}
if (!sessionIds.has(sessionID)) continue
const rest = data
delete rest.id
delete rest.messageID
delete rest.sessionID
values[count++] = {
id,
message_id: messageID,
session_id: sessionID,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
data: rest,
}
}
values.length = count
stats.parts += insert(values, PartTable, "part")
step("parts", end - i)
}
log.info("migrated parts", { count: stats.parts })
// Migrate todos
const todoSessions = todoFiles.map((file) => path.basename(file, ".json"))
for (let i = 0; i < todoFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, todoFiles.length)
const batch = await read(todoFiles, i, end)
const values = [] as any[]
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const sessionID = todoSessions[i + j]
if (!sessionIds.has(sessionID)) {
orphans.todos++
continue
}
if (!Array.isArray(data)) {
errs.push(`todo not an array: ${todoFiles[i + j]}`)
continue
}
for (let position = 0; position < data.length; position++) {
const todo = data[position]
if (!todo?.content || !todo?.status || !todo?.priority) continue
values.push({
session_id: sessionID,
content: todo.content,
status: todo.status,
priority: todo.priority,
position,
time_created: now,
time_updated: now,
})
}
}
stats.todos += insert(values, TodoTable, "todo")
step("todos", end - i)
}
log.info("migrated todos", { count: stats.todos })
if (orphans.todos > 0) {
log.warn("skipped orphaned todos", { count: orphans.todos })
}
// Migrate permissions
const permProjects = permFiles.map((file) => path.basename(file, ".json"))
const permValues = [] as any[]
for (let i = 0; i < permFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, permFiles.length)
const batch = await read(permFiles, i, end)
permValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const projectID = permProjects[i + j]
if (!projectIds.has(projectID)) {
orphans.permissions++
continue
}
permValues.push({ project_id: projectID, data })
}
stats.permissions += insert(permValues, PermissionTable, "permission")
step("permissions", end - i)
}
log.info("migrated permissions", { count: stats.permissions })
if (orphans.permissions > 0) {
log.warn("skipped orphaned permissions", { count: orphans.permissions })
}
// Migrate session shares
const shareSessions = shareFiles.map((file) => path.basename(file, ".json"))
const shareValues = [] as any[]
for (let i = 0; i < shareFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, shareFiles.length)
const batch = await read(shareFiles, i, end)
shareValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const sessionID = shareSessions[i + j]
if (!sessionIds.has(sessionID)) {
orphans.shares++
continue
}
if (!data?.id || !data?.secret || !data?.url) {
errs.push(`session_share missing id/secret/url: ${shareFiles[i + j]}`)
continue
}
shareValues.push({ session_id: sessionID, id: data.id, secret: data.secret, url: data.url })
}
stats.shares += insert(shareValues, SessionShareTable, "session_share")
step("shares", end - i)
}
log.info("migrated session shares", { count: stats.shares })
if (orphans.shares > 0) {
log.warn("skipped orphaned session shares", { count: orphans.shares })
}
db.run("COMMIT")
log.info("json migration complete", {
projects: stats.projects,
sessions: stats.sessions,
messages: stats.messages,
parts: stats.parts,
todos: stats.todos,
permissions: stats.permissions,
shares: stats.shares,
errorCount: stats.errors.length,
duration: Math.round(performance.now() - start),
})
if (stats.errors.length > 0) {
log.warn("migration errors", { errors: stats.errors.slice(0, 20) })
}
progress?.({ current: total, total, label: "complete" })
return stats
}

View File

@@ -7,327 +7,325 @@ import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import { Effect, Exit, Layer, Option, RcMap, Schema, Context, TxReentrantLock } from "effect"
import { Git } from "@/git"
export namespace Storage {
const log = Log.create({ service: "storage" })
const log = Log.create({ service: "storage" })
type Migration = (
dir: string,
fs: AppFileSystem.Interface,
git: Git.Interface,
) => Effect.Effect<void, AppFileSystem.Error>
type Migration = (
dir: string,
fs: AppFileSystem.Interface,
git: Git.Interface,
) => Effect.Effect<void, AppFileSystem.Error>
export const NotFoundError = NamedError.create(
"NotFoundError",
z.object({
message: z.string(),
export const NotFoundError = NamedError.create(
"NotFoundError",
z.object({
message: z.string(),
}),
)
export type Error = AppFileSystem.Error | InstanceType<typeof NotFoundError>
const RootFile = Schema.Struct({
path: Schema.optional(
Schema.Struct({
root: Schema.optional(Schema.String),
}),
)
),
})
export type Error = AppFileSystem.Error | InstanceType<typeof NotFoundError>
const SessionFile = Schema.Struct({
id: Schema.String,
})
const RootFile = Schema.Struct({
path: Schema.optional(
Schema.Struct({
root: Schema.optional(Schema.String),
}),
),
})
const MessageFile = Schema.Struct({
id: Schema.String,
})
const SessionFile = Schema.Struct({
id: Schema.String,
})
const DiffFile = Schema.Struct({
additions: Schema.Number,
deletions: Schema.Number,
})
const MessageFile = Schema.Struct({
id: Schema.String,
})
const SummaryFile = Schema.Struct({
id: Schema.String,
projectID: Schema.String,
summary: Schema.Struct({ diffs: Schema.Array(DiffFile) }),
})
const DiffFile = Schema.Struct({
additions: Schema.Number,
deletions: Schema.Number,
})
const decodeRoot = Schema.decodeUnknownOption(RootFile)
const decodeSession = Schema.decodeUnknownOption(SessionFile)
const decodeMessage = Schema.decodeUnknownOption(MessageFile)
const decodeSummary = Schema.decodeUnknownOption(SummaryFile)
const SummaryFile = Schema.Struct({
id: Schema.String,
projectID: Schema.String,
summary: Schema.Struct({ diffs: Schema.Array(DiffFile) }),
})
export interface Interface {
readonly remove: (key: string[]) => Effect.Effect<void, AppFileSystem.Error>
readonly read: <T>(key: string[]) => Effect.Effect<T, Error>
readonly update: <T>(key: string[], fn: (draft: T) => void) => Effect.Effect<T, Error>
readonly write: <T>(key: string[], content: T) => Effect.Effect<void, AppFileSystem.Error>
readonly list: (prefix: string[]) => Effect.Effect<string[][], AppFileSystem.Error>
}
const decodeRoot = Schema.decodeUnknownOption(RootFile)
const decodeSession = Schema.decodeUnknownOption(SessionFile)
const decodeMessage = Schema.decodeUnknownOption(MessageFile)
const decodeSummary = Schema.decodeUnknownOption(SummaryFile)
export class Service extends Context.Service<Service, Interface>()("@opencode/Storage") {}
export interface Interface {
readonly remove: (key: string[]) => Effect.Effect<void, AppFileSystem.Error>
readonly read: <T>(key: string[]) => Effect.Effect<T, Error>
readonly update: <T>(key: string[], fn: (draft: T) => void) => Effect.Effect<T, Error>
readonly write: <T>(key: string[], content: T) => Effect.Effect<void, AppFileSystem.Error>
readonly list: (prefix: string[]) => Effect.Effect<string[][], AppFileSystem.Error>
function file(dir: string, key: string[]) {
return path.join(dir, ...key) + ".json"
}
function missing(err: unknown) {
if (!err || typeof err !== "object") return false
if ("code" in err && err.code === "ENOENT") return true
if ("reason" in err && err.reason && typeof err.reason === "object" && "_tag" in err.reason) {
return err.reason._tag === "NotFound"
}
return false
}
export class Service extends Context.Service<Service, Interface>()("@opencode/Storage") {}
function parseMigration(text: string) {
const value = Number.parseInt(text, 10)
return Number.isNaN(value) ? 0 : value
}
function file(dir: string, key: string[]) {
return path.join(dir, ...key) + ".json"
}
const MIGRATIONS: Migration[] = [
Effect.fn("Storage.migration.1")(function* (dir: string, fs: AppFileSystem.Interface, git: Git.Interface) {
const project = path.resolve(dir, "../project")
if (!(yield* fs.isDir(project))) return
const projectDirs = yield* fs.glob("*", {
cwd: project,
include: "all",
})
for (const projectDir of projectDirs) {
const full = path.join(project, projectDir)
if (!(yield* fs.isDir(full))) continue
log.info(`migrating project ${projectDir}`)
let projectID = projectDir
let worktree = "/"
function missing(err: unknown) {
if (!err || typeof err !== "object") return false
if ("code" in err && err.code === "ENOENT") return true
if ("reason" in err && err.reason && typeof err.reason === "object" && "_tag" in err.reason) {
return err.reason._tag === "NotFound"
}
return false
}
function parseMigration(text: string) {
const value = Number.parseInt(text, 10)
return Number.isNaN(value) ? 0 : value
}
const MIGRATIONS: Migration[] = [
Effect.fn("Storage.migration.1")(function* (dir: string, fs: AppFileSystem.Interface, git: Git.Interface) {
const project = path.resolve(dir, "../project")
if (!(yield* fs.isDir(project))) return
const projectDirs = yield* fs.glob("*", {
cwd: project,
include: "all",
})
for (const projectDir of projectDirs) {
const full = path.join(project, projectDir)
if (!(yield* fs.isDir(full))) continue
log.info(`migrating project ${projectDir}`)
let projectID = projectDir
let worktree = "/"
if (projectID !== "global") {
for (const msgFile of yield* fs.glob("storage/session/message/*/*.json", {
cwd: full,
absolute: true,
})) {
const json = decodeRoot(yield* fs.readJson(msgFile), { onExcessProperty: "preserve" })
const root = Option.isSome(json) ? json.value.path?.root : undefined
if (!root) continue
worktree = root
break
}
if (!worktree) continue
if (!(yield* fs.isDir(worktree))) continue
const result = yield* git.run(["rev-list", "--max-parents=0", "--all"], {
cwd: worktree,
})
const [id] = result
.text()
.split("\n")
.filter(Boolean)
.map((x) => x.trim())
.toSorted()
if (!id) continue
projectID = id
yield* fs.writeWithDirs(
path.join(dir, "project", projectID + ".json"),
JSON.stringify(
{
id,
vcs: "git",
worktree,
time: {
created: Date.now(),
initialized: Date.now(),
},
},
null,
2,
),
)
log.info(`migrating sessions for project ${projectID}`)
for (const sessionFile of yield* fs.glob("storage/session/info/*.json", {
cwd: full,
absolute: true,
})) {
const dest = path.join(dir, "session", projectID, path.basename(sessionFile))
log.info("copying", { sessionFile, dest })
const session = yield* fs.readJson(sessionFile)
const info = decodeSession(session, { onExcessProperty: "preserve" })
yield* fs.writeWithDirs(dest, JSON.stringify(session, null, 2))
if (Option.isNone(info)) continue
log.info(`migrating messages for session ${info.value.id}`)
for (const msgFile of yield* fs.glob(`storage/session/message/${info.value.id}/*.json`, {
cwd: full,
absolute: true,
})) {
const next = path.join(dir, "message", info.value.id, path.basename(msgFile))
log.info("copying", {
msgFile,
dest: next,
})
const message = yield* fs.readJson(msgFile)
const item = decodeMessage(message, { onExcessProperty: "preserve" })
yield* fs.writeWithDirs(next, JSON.stringify(message, null, 2))
if (Option.isNone(item)) continue
log.info(`migrating parts for message ${item.value.id}`)
for (const partFile of yield* fs.glob(`storage/session/part/${info.value.id}/${item.value.id}/*.json`, {
cwd: full,
absolute: true,
})) {
const out = path.join(dir, "part", item.value.id, path.basename(partFile))
const part = yield* fs.readJson(partFile)
log.info("copying", {
partFile,
dest: out,
})
yield* fs.writeWithDirs(out, JSON.stringify(part, null, 2))
}
}
}
if (projectID !== "global") {
for (const msgFile of yield* fs.glob("storage/session/message/*/*.json", {
cwd: full,
absolute: true,
})) {
const json = decodeRoot(yield* fs.readJson(msgFile), { onExcessProperty: "preserve" })
const root = Option.isSome(json) ? json.value.path?.root : undefined
if (!root) continue
worktree = root
break
}
}
}),
Effect.fn("Storage.migration.2")(function* (dir: string, fs: AppFileSystem.Interface) {
for (const item of yield* fs.glob("session/*/*.json", {
cwd: dir,
absolute: true,
})) {
const raw = yield* fs.readJson(item)
const session = decodeSummary(raw, { onExcessProperty: "preserve" })
if (Option.isNone(session)) continue
const diffs = session.value.summary.diffs
if (!worktree) continue
if (!(yield* fs.isDir(worktree))) continue
const result = yield* git.run(["rev-list", "--max-parents=0", "--all"], {
cwd: worktree,
})
const [id] = result
.text()
.split("\n")
.filter(Boolean)
.map((x) => x.trim())
.toSorted()
if (!id) continue
projectID = id
yield* fs.writeWithDirs(
path.join(dir, "session_diff", session.value.id + ".json"),
JSON.stringify(diffs, null, 2),
)
yield* fs.writeWithDirs(
path.join(dir, "session", session.value.projectID, session.value.id + ".json"),
path.join(dir, "project", projectID + ".json"),
JSON.stringify(
{
...(raw as Record<string, unknown>),
summary: {
additions: diffs.reduce((sum, x) => sum + x.additions, 0),
deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
id,
vcs: "git",
worktree,
time: {
created: Date.now(),
initialized: Date.now(),
},
},
null,
2,
),
)
}
}),
]
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const git = yield* Git.Service
const locks = yield* RcMap.make({
lookup: () => TxReentrantLock.make(),
idleTimeToLive: 0,
})
const state = yield* Effect.cached(
Effect.gen(function* () {
const dir = path.join(Global.Path.data, "storage")
const marker = path.join(dir, "migration")
const migration = yield* fs.readFileString(marker).pipe(
Effect.map(parseMigration),
Effect.catchIf(missing, () => Effect.succeed(0)),
Effect.orElseSucceed(() => 0),
)
for (let i = migration; i < MIGRATIONS.length; i++) {
log.info("running migration", { index: i })
const step = MIGRATIONS[i]!
const exit = yield* Effect.exit(step(dir, fs, git))
if (Exit.isFailure(exit)) {
log.error("failed to run migration", { index: i, cause: exit.cause })
break
log.info(`migrating sessions for project ${projectID}`)
for (const sessionFile of yield* fs.glob("storage/session/info/*.json", {
cwd: full,
absolute: true,
})) {
const dest = path.join(dir, "session", projectID, path.basename(sessionFile))
log.info("copying", { sessionFile, dest })
const session = yield* fs.readJson(sessionFile)
const info = decodeSession(session, { onExcessProperty: "preserve" })
yield* fs.writeWithDirs(dest, JSON.stringify(session, null, 2))
if (Option.isNone(info)) continue
log.info(`migrating messages for session ${info.value.id}`)
for (const msgFile of yield* fs.glob(`storage/session/message/${info.value.id}/*.json`, {
cwd: full,
absolute: true,
})) {
const next = path.join(dir, "message", info.value.id, path.basename(msgFile))
log.info("copying", {
msgFile,
dest: next,
})
const message = yield* fs.readJson(msgFile)
const item = decodeMessage(message, { onExcessProperty: "preserve" })
yield* fs.writeWithDirs(next, JSON.stringify(message, null, 2))
if (Option.isNone(item)) continue
log.info(`migrating parts for message ${item.value.id}`)
for (const partFile of yield* fs.glob(`storage/session/part/${info.value.id}/${item.value.id}/*.json`, {
cwd: full,
absolute: true,
})) {
const out = path.join(dir, "part", item.value.id, path.basename(partFile))
const part = yield* fs.readJson(partFile)
log.info("copying", {
partFile,
dest: out,
})
yield* fs.writeWithDirs(out, JSON.stringify(part, null, 2))
}
yield* fs.writeWithDirs(marker, String(i + 1))
}
return { dir }
}
}
}
}),
Effect.fn("Storage.migration.2")(function* (dir: string, fs: AppFileSystem.Interface) {
for (const item of yield* fs.glob("session/*/*.json", {
cwd: dir,
absolute: true,
})) {
const raw = yield* fs.readJson(item)
const session = decodeSummary(raw, { onExcessProperty: "preserve" })
if (Option.isNone(session)) continue
const diffs = session.value.summary.diffs
yield* fs.writeWithDirs(
path.join(dir, "session_diff", session.value.id + ".json"),
JSON.stringify(diffs, null, 2),
)
yield* fs.writeWithDirs(
path.join(dir, "session", session.value.projectID, session.value.id + ".json"),
JSON.stringify(
{
...(raw as Record<string, unknown>),
summary: {
additions: diffs.reduce((sum, x) => sum + x.additions, 0),
deletions: diffs.reduce((sum, x) => sum + x.deletions, 0),
},
},
null,
2,
),
)
}
}),
]
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const git = yield* Git.Service
const locks = yield* RcMap.make({
lookup: () => TxReentrantLock.make(),
idleTimeToLive: 0,
})
const state = yield* Effect.cached(
Effect.gen(function* () {
const dir = path.join(Global.Path.data, "storage")
const marker = path.join(dir, "migration")
const migration = yield* fs.readFileString(marker).pipe(
Effect.map(parseMigration),
Effect.catchIf(missing, () => Effect.succeed(0)),
Effect.orElseSucceed(() => 0),
)
for (let i = migration; i < MIGRATIONS.length; i++) {
log.info("running migration", { index: i })
const step = MIGRATIONS[i]!
const exit = yield* Effect.exit(step(dir, fs, git))
if (Exit.isFailure(exit)) {
log.error("failed to run migration", { index: i, cause: exit.cause })
break
}
yield* fs.writeWithDirs(marker, String(i + 1))
}
return { dir }
}),
)
const fail = (target: string): Effect.Effect<never, InstanceType<typeof NotFoundError>> =>
Effect.fail(new NotFoundError({ message: `Resource not found: ${target}` }))
const wrap = <A>(target: string, body: Effect.Effect<A, AppFileSystem.Error>) =>
body.pipe(Effect.catchIf(missing, () => fail(target)))
const writeJson = Effect.fnUntraced(function* (target: string, content: unknown) {
yield* fs.writeWithDirs(target, JSON.stringify(content, null, 2))
})
const withResolved = <A, E>(
key: string[],
fn: (target: string, rw: TxReentrantLock.TxReentrantLock) => Effect.Effect<A, E>,
): Effect.Effect<A, E | AppFileSystem.Error> =>
Effect.scoped(
Effect.gen(function* () {
const target = file((yield* state).dir, key)
return yield* fn(target, yield* RcMap.get(locks, target))
}),
)
const fail = (target: string): Effect.Effect<never, InstanceType<typeof NotFoundError>> =>
Effect.fail(new NotFoundError({ message: `Resource not found: ${target}` }))
const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) {
yield* withResolved(key, (target, rw) =>
TxReentrantLock.withWriteLock(rw, fs.remove(target).pipe(Effect.catchIf(missing, () => Effect.void))),
)
})
const wrap = <A>(target: string, body: Effect.Effect<A, AppFileSystem.Error>) =>
body.pipe(Effect.catchIf(missing, () => fail(target)))
const writeJson = Effect.fnUntraced(function* (target: string, content: unknown) {
yield* fs.writeWithDirs(target, JSON.stringify(content, null, 2))
})
const withResolved = <A, E>(
key: string[],
fn: (target: string, rw: TxReentrantLock.TxReentrantLock) => Effect.Effect<A, E>,
): Effect.Effect<A, E | AppFileSystem.Error> =>
Effect.scoped(
Effect.gen(function* () {
const target = file((yield* state).dir, key)
return yield* fn(target, yield* RcMap.get(locks, target))
}),
const read: Interface["read"] = <T>(key: string[]) =>
Effect.gen(function* () {
const value = yield* withResolved(key, (target, rw) =>
TxReentrantLock.withReadLock(rw, wrap(target, fs.readJson(target))),
)
return value as T
})
const remove: Interface["remove"] = Effect.fn("Storage.remove")(function* (key: string[]) {
yield* withResolved(key, (target, rw) =>
TxReentrantLock.withWriteLock(rw, fs.remove(target).pipe(Effect.catchIf(missing, () => Effect.void))),
const update: Interface["update"] = <T>(key: string[], fn: (draft: T) => void) =>
Effect.gen(function* () {
const value = yield* withResolved(key, (target, rw) =>
TxReentrantLock.withWriteLock(
rw,
Effect.gen(function* () {
const content = yield* wrap(target, fs.readJson(target))
fn(content as T)
yield* writeJson(target, content)
return content
}),
),
)
return value as T
})
const read: Interface["read"] = <T>(key: string[]) =>
Effect.gen(function* () {
const value = yield* withResolved(key, (target, rw) =>
TxReentrantLock.withReadLock(rw, wrap(target, fs.readJson(target))),
)
return value as T
})
const update: Interface["update"] = <T>(key: string[], fn: (draft: T) => void) =>
Effect.gen(function* () {
const value = yield* withResolved(key, (target, rw) =>
TxReentrantLock.withWriteLock(
rw,
Effect.gen(function* () {
const content = yield* wrap(target, fs.readJson(target))
fn(content as T)
yield* writeJson(target, content)
return content
}),
),
)
return value as T
})
const write: Interface["write"] = (key: string[], content: unknown) =>
Effect.gen(function* () {
yield* withResolved(key, (target, rw) => TxReentrantLock.withWriteLock(rw, writeJson(target, content)))
})
const list: Interface["list"] = Effect.fn("Storage.list")(function* (prefix: string[]) {
const dir = (yield* state).dir
const cwd = path.join(dir, ...prefix)
const result = yield* fs
.glob("**/*", {
cwd,
include: "file",
})
.pipe(Effect.catch(() => Effect.succeed<string[]>([])))
return result
.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])
.toSorted((a, b) => a.join("/").localeCompare(b.join("/")))
const write: Interface["write"] = (key: string[], content: unknown) =>
Effect.gen(function* () {
yield* withResolved(key, (target, rw) => TxReentrantLock.withWriteLock(rw, writeJson(target, content)))
})
return Service.of({
remove,
read,
update,
write,
list,
})
}),
)
const list: Interface["list"] = Effect.fn("Storage.list")(function* (prefix: string[]) {
const dir = (yield* state).dir
const cwd = path.join(dir, ...prefix)
const result = yield* fs
.glob("**/*", {
cwd,
include: "file",
})
.pipe(Effect.catch(() => Effect.succeed<string[]>([])))
return result
.map((x) => [...prefix, ...x.slice(0, -5).split(path.sep)])
.toSorted((a, b) => a.join("/").localeCompare(b.join("/")))
})
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer), Layer.provide(Git.defaultLayer))
}
return Service.of({
remove,
read,
update,
write,
list,
})
}),
)
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer), Layer.provide(Git.defaultLayer))

View File

@@ -1,6 +1,6 @@
import z from "zod"
import type { ZodObject } from "zod"
import { Database, eq } from "@/storage/db"
import { Database, eq } from "@/storage"
import { GlobalBus } from "@/bus/global"
import { Bus as ProjectBus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"

View File

@@ -4,7 +4,7 @@ import { Global } from "../global"
import { Instance } from "../project/instance"
import { InstanceBootstrap } from "../project/bootstrap"
import { Project } from "../project"
import { Database, eq } from "../storage/db"
import { Database, eq } from "../storage"
import { ProjectTable } from "../project/project.sql"
import type { ProjectID } from "../project/schema"
import { Log } from "../util"

View File

@@ -3,7 +3,7 @@ import { Effect, Layer, Option } from "effect"
import { AccountRepo } from "../../src/account/repo"
import { AccessToken, AccountID, OrgID, RefreshToken } from "../../src/account/schema"
import { Database } from "../../src/storage/db"
import { Database } from "../../src/storage"
import { testEffect } from "../lib/effect"
const truncate = Layer.effectDiscard(

View File

@@ -15,7 +15,7 @@ import {
RefreshToken,
UserCode,
} from "../../src/account/schema"
import { Database } from "../../src/storage/db"
import { Database } from "../../src/storage"
import { testEffect } from "../lib/effect"
const truncate = Layer.effectDiscard(

View File

@@ -1,6 +1,6 @@
import { rm } from "fs/promises"
import { Instance } from "../../src/project/instance"
import { Database } from "../../src/storage/db"
import { Database } from "../../src/storage"
export async function resetDatabase() {
await Instance.disposeAll().catch(() => undefined)

View File

@@ -10,7 +10,7 @@ import { afterAll } from "bun:test"
const dir = path.join(os.tmpdir(), "opencode-test-data-" + process.pid)
await fs.mkdir(dir, { recursive: true })
afterAll(async () => {
const { Database } = await import("../src/storage/db")
const { Database } = await import("../src/storage")
Database.close()
const busy = (error: unknown) =>
typeof error === "object" && error !== null && "code" in error && error.code === "EBUSY"

View File

@@ -1,6 +1,6 @@
import { describe, expect, test } from "bun:test"
import { Project } from "../../src/project"
import { Database, eq } from "../../src/storage/db"
import { Database, eq } from "../../src/storage"
import { SessionTable } from "../../src/session/session.sql"
import { ProjectTable } from "../../src/project/project.sql"
import { ProjectID } from "../../src/project/schema"
@@ -10,7 +10,7 @@ import { $ } from "bun"
import { tmpdir } from "../fixture/fixture"
import { Effect } from "effect"
void Log.init({ print: false })
Log.init({ print: false })
function run<A>(fn: (svc: Project.Interface) => Effect.Effect<A>) {
return Effect.runPromise(

View File

@@ -14,7 +14,7 @@ import { Session } from "../../src/session"
import type { SessionID } from "../../src/session/schema"
import { ShareNext } from "../../src/share"
import { SessionShareTable } from "../../src/share/share.sql"
import { Database, eq } from "../../src/storage/db"
import { Database, eq } from "../../src/storage"
import { provideTmpdirInstance } from "../fixture/fixture"
import { resetDatabase } from "../fixture/db"
import { testEffect } from "../lib/effect"

View File

@@ -2,7 +2,7 @@ import { describe, expect, test } from "bun:test"
import path from "path"
import { Global } from "../../src/global"
import { Installation } from "../../src/installation"
import { Database } from "../../src/storage/db"
import { Database } from "../../src/storage"
describe("Database.Path", () => {
test("returns database path for the current channel", () => {

View File

@@ -5,7 +5,7 @@ import { migrate } from "drizzle-orm/bun-sqlite/migrator"
import path from "path"
import fs from "fs/promises"
import { readFileSync, readdirSync } from "fs"
import { JsonMigration } from "../../src/storage/json-migration"
import { JsonMigration } from "../../src/storage"
import { Global } from "../../src/global"
import { ProjectTable } from "../../src/project/project.sql"
import { ProjectID } from "../../src/project/schema"

View File

@@ -5,7 +5,7 @@ import { AppFileSystem } from "@opencode-ai/shared/filesystem"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
import { Git } from "../../src/git"
import { Global } from "../../src/global"
import { Storage } from "../../src/storage/storage"
import { Storage } from "../../src/storage"
import { tmpdirScoped } from "../fixture/fixture"
import { testEffect } from "../lib/effect"

View File

@@ -4,7 +4,7 @@ import z from "zod"
import { Bus } from "../../src/bus"
import { Instance } from "../../src/project/instance"
import { SyncEvent } from "../../src/sync"
import { Database } from "../../src/storage/db"
import { Database } from "../../src/storage"
import { EventTable } from "../../src/sync/event.sql"
import { Identifier } from "../../src/id/id"
import { Flag } from "../../src/flag/flag"