mirror of
https://fastgit.cc/https://github.com/anomalyco/opencode
synced 2026-05-02 06:54:35 +08:00
Compare commits
14 Commits
dev
...
effect-dri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fb767f8902 | ||
|
|
49a933f875 | ||
|
|
069414b149 | ||
|
|
9bf3985a66 | ||
|
|
8651dc4f3d | ||
|
|
ce804811c0 | ||
|
|
23a5c4d799 | ||
|
|
29c5cd05a2 | ||
|
|
a03a08abe0 | ||
|
|
48a27db002 | ||
|
|
2b22cc2993 | ||
|
|
2abed73283 | ||
|
|
3e0533632e | ||
|
|
b6a83e196c |
16
bun.lock
16
bun.lock
@@ -301,6 +301,19 @@
|
||||
"@lydell/node-pty-win32-x64": "1.2.0-beta.10",
|
||||
},
|
||||
},
|
||||
"packages/effect-drizzle-sqlite": {
|
||||
"name": "@opencode-ai/effect-drizzle-sqlite",
|
||||
"version": "0.0.0",
|
||||
"dependencies": {
|
||||
"drizzle-orm": "catalog:",
|
||||
"effect": "catalog:",
|
||||
},
|
||||
"devDependencies": {
|
||||
"@tsconfig/bun": "catalog:",
|
||||
"@types/bun": "catalog:",
|
||||
"@typescript/native-preview": "catalog:",
|
||||
},
|
||||
},
|
||||
"packages/enterprise": {
|
||||
"name": "@opencode-ai/enterprise",
|
||||
"version": "1.14.28",
|
||||
@@ -390,6 +403,7 @@
|
||||
"@octokit/graphql": "9.0.2",
|
||||
"@octokit/rest": "catalog:",
|
||||
"@openauthjs/openauth": "catalog:",
|
||||
"@opencode-ai/effect-drizzle-sqlite": "workspace:*",
|
||||
"@opencode-ai/plugin": "workspace:*",
|
||||
"@opencode-ai/script": "workspace:*",
|
||||
"@opencode-ai/sdk": "workspace:*",
|
||||
@@ -1567,6 +1581,8 @@
|
||||
|
||||
"@opencode-ai/desktop-electron": ["@opencode-ai/desktop-electron@workspace:packages/desktop-electron"],
|
||||
|
||||
"@opencode-ai/effect-drizzle-sqlite": ["@opencode-ai/effect-drizzle-sqlite@workspace:packages/effect-drizzle-sqlite"],
|
||||
|
||||
"@opencode-ai/enterprise": ["@opencode-ai/enterprise@workspace:packages/enterprise"],
|
||||
|
||||
"@opencode-ai/function": ["@opencode-ai/function@workspace:packages/function"],
|
||||
|
||||
24
packages/effect-drizzle-sqlite/package.json
Normal file
24
packages/effect-drizzle-sqlite/package.json
Normal file
@@ -0,0 +1,24 @@
|
||||
{
|
||||
"$schema": "https://json.schemastore.org/package.json",
|
||||
"name": "@opencode-ai/effect-drizzle-sqlite",
|
||||
"version": "0.0.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"license": "MIT",
|
||||
"scripts": {
|
||||
"test": "bun test",
|
||||
"typecheck": "tsgo --noEmit"
|
||||
},
|
||||
"exports": {
|
||||
".": "./src/index.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
"drizzle-orm": "catalog:",
|
||||
"effect": "catalog:"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@tsconfig/bun": "catalog:",
|
||||
"@types/bun": "catalog:",
|
||||
"@typescript/native-preview": "catalog:"
|
||||
}
|
||||
}
|
||||
266
packages/effect-drizzle-sqlite/src/index.ts
Normal file
266
packages/effect-drizzle-sqlite/src/index.ts
Normal file
@@ -0,0 +1,266 @@
|
||||
import { Database } from "bun:sqlite"
|
||||
import { drizzle as drizzleBun, type SQLiteBunDatabase } from "drizzle-orm/bun-sqlite"
|
||||
import type { AnyRelations, EmptyRelations } from "drizzle-orm/relations"
|
||||
import { SQLiteCountBuilder } from "drizzle-orm/sqlite-core/query-builders/count"
|
||||
import { SQLiteDeleteBase } from "drizzle-orm/sqlite-core/query-builders/delete"
|
||||
import { SQLiteInsertBase } from "drizzle-orm/sqlite-core/query-builders/insert"
|
||||
import { SQLiteRelationalQuery, SQLiteSyncRelationalQuery } from "drizzle-orm/sqlite-core/query-builders/_query"
|
||||
import { SQLiteSelectBase } from "drizzle-orm/sqlite-core/query-builders/select"
|
||||
import { SQLiteUpdateBase } from "drizzle-orm/sqlite-core/query-builders/update"
|
||||
import type { PreparedQueryConfig, SQLiteSession, SQLiteTransaction, SQLiteTransactionConfig } from "drizzle-orm/sqlite-core/session"
|
||||
import { SQLitePreparedQuery } from "drizzle-orm/sqlite-core/session"
|
||||
import type { DrizzleConfig } from "drizzle-orm/utils"
|
||||
import { Cause, Effect, Exit, Schema } from "effect"
|
||||
import { pipeArguments } from "effect/Pipeable"
|
||||
|
||||
export class EffectDrizzleQueryError extends Schema.TaggedErrorClass<EffectDrizzleQueryError>()(
|
||||
"EffectDrizzleQueryError",
|
||||
{
|
||||
query: Schema.String,
|
||||
params: Schema.Array(Schema.Unknown),
|
||||
cause: Schema.Unknown,
|
||||
},
|
||||
) {
|
||||
override get message() {
|
||||
return `Failed query: ${this.query}\nparams: ${JSON.stringify(this.params)}`
|
||||
}
|
||||
}
|
||||
|
||||
export type EffectSQLiteDatabase<
|
||||
TSchema extends Record<string, unknown> = Record<string, never>,
|
||||
TRelations extends AnyRelations = EmptyRelations,
|
||||
> = SQLiteBunDatabase<TSchema, TRelations> & {
|
||||
readonly $client: Database
|
||||
readonly withTransaction: <A, E, R>(
|
||||
effect: Effect.Effect<A, E, R>,
|
||||
config?: SQLiteTransactionConfig,
|
||||
) => Effect.Effect<A, E, R>
|
||||
}
|
||||
|
||||
export type MakeConfig<
|
||||
TSchema extends Record<string, unknown> = Record<string, never>,
|
||||
TRelations extends AnyRelations = EmptyRelations,
|
||||
> = DrizzleConfig<TSchema, TRelations> & {
|
||||
readonly client?: Database
|
||||
}
|
||||
|
||||
type EffectLikeQuery<A = unknown> = {
|
||||
readonly asEffect?: () => Effect.Effect<A, EffectDrizzleQueryError>
|
||||
readonly toSQL?: () => { readonly sql: string; readonly params?: readonly unknown[] }
|
||||
}
|
||||
|
||||
type PreparedLike<A = unknown> = EffectLikeQuery<A> & {
|
||||
readonly execute: () => unknown
|
||||
readonly getQuery?: () => { readonly sql: string; readonly params?: readonly unknown[] }
|
||||
}
|
||||
|
||||
type SelectLike<A = unknown> = EffectLikeQuery<A> & {
|
||||
readonly all: () => A
|
||||
}
|
||||
|
||||
type MutationLike<A = unknown> = EffectLikeQuery<A> & {
|
||||
readonly all: () => A
|
||||
readonly run: () => A
|
||||
readonly config?: { readonly returning?: unknown }
|
||||
}
|
||||
|
||||
type CountLike = EffectLikeQuery<number> & {
|
||||
readonly session: { readonly values: (sql: unknown) => unknown[][] }
|
||||
readonly sql: unknown
|
||||
}
|
||||
|
||||
class TransactionFailure extends Error {
|
||||
constructor(readonly effectCause: Cause.Cause<unknown>) {
|
||||
super("Effect transaction failed")
|
||||
}
|
||||
}
|
||||
|
||||
// These keys are Effect runtime internals (effect/internal/core.ts). They are
|
||||
// not exported from the `effect` public API. We rely on them to make Drizzle
|
||||
// query builders directly yieldable. If a future Effect version renames or
|
||||
// removes them, the module-load assertion below fails loudly instead of
|
||||
// failing silently with "Effect.evaluate: Not implemented" defects deep in
|
||||
// the fiber executor.
|
||||
const EffectTypeId = "~effect/Effect"
|
||||
const EffectIdentifier = `${EffectTypeId}/identifier`
|
||||
const EffectEvaluate = `${EffectTypeId}/evaluate`
|
||||
|
||||
if (!(Effect.succeed(0) as unknown as Record<PropertyKey, unknown>)[EffectTypeId]) {
|
||||
throw new Error(
|
||||
"@opencode-ai/effect-drizzle-sqlite: Effect protocol keys are missing on Effect.succeed(0). " +
|
||||
"The installed `effect` version is incompatible with this adapter.",
|
||||
)
|
||||
}
|
||||
|
||||
const effectVariance = {
|
||||
_A: (value: unknown) => value,
|
||||
_E: (value: unknown) => value,
|
||||
_R: (value: unknown) => value,
|
||||
}
|
||||
|
||||
const queryInfo = (query: EffectLikeQuery | PreparedLike) => {
|
||||
const info = "getQuery" in query && typeof query.getQuery === "function" ? query.getQuery() : query.toSQL?.()
|
||||
return {
|
||||
query: info?.sql ?? "<unknown>",
|
||||
params: [...(info?.params ?? [])],
|
||||
}
|
||||
}
|
||||
|
||||
const queryError = (query: EffectLikeQuery | PreparedLike, cause: unknown) =>
|
||||
new EffectDrizzleQueryError({
|
||||
...queryInfo(query),
|
||||
cause,
|
||||
})
|
||||
|
||||
const fromSync = <A>(query: EffectLikeQuery, run: () => A) =>
|
||||
Effect.try({
|
||||
try: run,
|
||||
catch: (cause) => queryError(query, cause),
|
||||
})
|
||||
|
||||
const fromMutation = (query: MutationLike) => fromSync(query, () => (query.config?.returning ? query.all() : query.run()))
|
||||
|
||||
const fromCount = (query: CountLike) => fromSync(query, () => Number(query.session.values(query.sql)[0]?.[0] ?? 0))
|
||||
|
||||
const fromExecuteResult = (result: unknown) => {
|
||||
if (result && typeof result === "object" && "sync" in result && typeof result.sync === "function") {
|
||||
return result.sync()
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
const queryEffectProto = {
|
||||
[EffectTypeId]: effectVariance,
|
||||
pipe() {
|
||||
return pipeArguments(this, arguments)
|
||||
},
|
||||
[Symbol.iterator]() {
|
||||
let done = false
|
||||
const self = this
|
||||
return {
|
||||
next(value: unknown) {
|
||||
if (done) return { done: true, value }
|
||||
done = true
|
||||
return { done: false, value: self }
|
||||
},
|
||||
[Symbol.iterator]() {
|
||||
return this
|
||||
},
|
||||
}
|
||||
},
|
||||
[EffectIdentifier]: "DrizzleSqliteQuery",
|
||||
[EffectEvaluate](this: EffectLikeQuery) {
|
||||
return this.asEffect?.() ?? Effect.die("Drizzle SQLite query is missing asEffect()")
|
||||
},
|
||||
}
|
||||
|
||||
const patchClass = <A>(ctor: { readonly prototype: object }, asEffect: (self: A) => Effect.Effect<unknown, EffectDrizzleQueryError>) => {
|
||||
if (Object.prototype.hasOwnProperty.call(ctor.prototype, "asEffect")) return
|
||||
Object.assign(ctor.prototype, queryEffectProto, {
|
||||
asEffect(this: A) {
|
||||
return asEffect(this)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// `patchClass` is idempotent via `hasOwnProperty` check, so calling this
|
||||
// repeatedly is cheap. Patches are applied to Drizzle prototypes globally and
|
||||
// survive any Database close/reopen cycle.
|
||||
const patchQueryBuilders = () => {
|
||||
patchClass(SQLitePreparedQuery, (query: PreparedLike) => fromSync(query, () => fromExecuteResult(query.execute())))
|
||||
patchClass(SQLiteSelectBase, (query: SelectLike) => fromSync(query, () => query.all()))
|
||||
patchClass(SQLiteInsertBase, fromMutation)
|
||||
patchClass(SQLiteUpdateBase, fromMutation)
|
||||
patchClass(SQLiteDeleteBase, fromMutation)
|
||||
patchClass(SQLiteRelationalQuery, (query: EffectLikeQuery & { readonly executeRaw: () => unknown }) =>
|
||||
fromSync(query, () => query.executeRaw()),
|
||||
)
|
||||
patchClass(SQLiteSyncRelationalQuery, (query: EffectLikeQuery & { readonly executeRaw: () => unknown }) =>
|
||||
fromSync(query, () => query.executeRaw()),
|
||||
)
|
||||
patchClass(SQLiteCountBuilder, fromCount)
|
||||
}
|
||||
|
||||
const attachTransaction = <
|
||||
TSchema extends Record<string, unknown> = Record<string, never>,
|
||||
TRelations extends AnyRelations = EmptyRelations,
|
||||
>(db: SQLiteBunDatabase<TSchema, TRelations> & { readonly $client: Database }): EffectSQLiteDatabase<TSchema, TRelations> => {
|
||||
const txStack: Array<SQLiteTransaction<"sync", void, TSchema, TRelations>> = []
|
||||
const current = () => txStack.at(-1) ?? db
|
||||
const runTransaction = (target: SQLiteBunDatabase<TSchema, TRelations> | SQLiteTransaction<"sync", void, TSchema, TRelations>) =>
|
||||
target.transaction.bind(target) as (
|
||||
transaction: (tx: SQLiteTransaction<"sync", void, TSchema, TRelations>) => unknown,
|
||||
config?: SQLiteTransactionConfig,
|
||||
) => unknown
|
||||
|
||||
const withTransaction = <A, E, R>(
|
||||
effect: Effect.Effect<A, E, R>,
|
||||
config?: SQLiteTransactionConfig,
|
||||
): Effect.Effect<A, E, R> =>
|
||||
Effect.context<R>().pipe(
|
||||
Effect.flatMap((context) =>
|
||||
Effect.sync(
|
||||
() =>
|
||||
runTransaction(current())((tx) => {
|
||||
txStack.push(tx)
|
||||
try {
|
||||
const exit = Effect.runSyncExit(Effect.provideContext(effect, context))
|
||||
if (Exit.isSuccess(exit)) return exit.value
|
||||
throw new TransactionFailure(exit.cause)
|
||||
} finally {
|
||||
txStack.pop()
|
||||
}
|
||||
}, config) as A,
|
||||
).pipe(
|
||||
Effect.catchDefect((defect) =>
|
||||
defect instanceof TransactionFailure ? Effect.failCause(defect.effectCause as Cause.Cause<E>) : Effect.die(defect),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
return new Proxy(db, {
|
||||
get(_target, property) {
|
||||
if (property === "withTransaction") return withTransaction
|
||||
if (property === "$client") return db.$client
|
||||
|
||||
const target = current()
|
||||
const value = Reflect.get(target, property)
|
||||
return typeof value === "function" ? value.bind(target) : value
|
||||
},
|
||||
}) as EffectSQLiteDatabase<TSchema, TRelations>
|
||||
}
|
||||
|
||||
export const make = <
|
||||
TSchema extends Record<string, unknown> = Record<string, never>,
|
||||
TRelations extends AnyRelations = EmptyRelations,
|
||||
>(config: MakeConfig<TSchema, TRelations> = {}): EffectSQLiteDatabase<TSchema, TRelations> => {
|
||||
patchQueryBuilders()
|
||||
return attachTransaction(
|
||||
drizzleBun({
|
||||
...config,
|
||||
client: config.client ?? new Database(":memory:"),
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
export const drizzle = make
|
||||
|
||||
declare module "drizzle-orm/query-promise" {
|
||||
interface QueryPromise<T> extends Effect.Effect<T, EffectDrizzleQueryError> {
|
||||
asEffect(): Effect.Effect<T, EffectDrizzleQueryError>
|
||||
}
|
||||
}
|
||||
|
||||
declare module "drizzle-orm/sqlite-core/session" {
|
||||
interface SQLitePreparedQuery<T extends PreparedQueryConfig> extends Effect.Effect<T["execute"], EffectDrizzleQueryError> {
|
||||
asEffect(): Effect.Effect<T["execute"], EffectDrizzleQueryError>
|
||||
}
|
||||
}
|
||||
|
||||
declare module "drizzle-orm/sqlite-core/query-builders/count" {
|
||||
interface SQLiteCountBuilder<TSession extends SQLiteSession<any, any, any, any>>
|
||||
extends Effect.Effect<number, EffectDrizzleQueryError> {
|
||||
asEffect(): Effect.Effect<number, EffectDrizzleQueryError>
|
||||
}
|
||||
}
|
||||
237
packages/effect-drizzle-sqlite/test/sqlite.test.ts
Normal file
237
packages/effect-drizzle-sqlite/test/sqlite.test.ts
Normal file
@@ -0,0 +1,237 @@
|
||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test"
|
||||
import { Database } from "bun:sqlite"
|
||||
import { eq } from "drizzle-orm"
|
||||
import { relations } from "drizzle-orm/_relations"
|
||||
import { drizzle as drizzleBun } from "drizzle-orm/bun-sqlite"
|
||||
import { integer, sqliteTable, text } from "drizzle-orm/sqlite-core"
|
||||
import { Cause, Effect, Exit } from "effect"
|
||||
import { EffectDrizzleQueryError, make, type EffectSQLiteDatabase } from "../src"
|
||||
|
||||
const users = sqliteTable("users", {
|
||||
id: integer().primaryKey(),
|
||||
name: text().notNull(),
|
||||
})
|
||||
|
||||
const posts = sqliteTable("posts", {
|
||||
id: integer().primaryKey(),
|
||||
user_id: integer()
|
||||
.notNull()
|
||||
.references(() => users.id),
|
||||
title: text().notNull(),
|
||||
})
|
||||
|
||||
const usersRelations = relations(users, ({ many }) => ({
|
||||
posts: many(posts),
|
||||
}))
|
||||
|
||||
const postsRelations = relations(posts, ({ one }) => ({
|
||||
user: one(users, {
|
||||
fields: [posts.user_id],
|
||||
references: [users.id],
|
||||
}),
|
||||
}))
|
||||
|
||||
const schema = { users, posts, usersRelations, postsRelations }
|
||||
|
||||
let db: EffectSQLiteDatabase<typeof schema>
|
||||
|
||||
const testEffect = <A, E>(name: string, effect: () => Effect.Effect<A, E>) => test(name, () => Effect.runPromise(effect()))
|
||||
|
||||
beforeEach(() => {
|
||||
db = make({ schema })
|
||||
db.$client.run("PRAGMA foreign_keys = ON")
|
||||
db.$client.run("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
|
||||
db.$client.run(
|
||||
"CREATE TABLE posts (id INTEGER PRIMARY KEY, user_id INTEGER NOT NULL REFERENCES users(id), title TEXT NOT NULL)",
|
||||
)
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
db.$client.close()
|
||||
})
|
||||
|
||||
describe("effect drizzle sqlite", () => {
|
||||
test("keeps normal Drizzle Bun SQLite clients usable after patching", async () => {
|
||||
const sqlite = new Database(":memory:")
|
||||
try {
|
||||
const normal = drizzleBun({ client: sqlite })
|
||||
sqlite.run("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
|
||||
|
||||
normal.insert(users).values({ id: 1, name: "Ada" }).run()
|
||||
|
||||
expect(normal.select().from(users).all()).toEqual([{ id: 1, name: "Ada" }])
|
||||
expect(await normal.select().from(users)).toEqual([{ id: 1, name: "Ada" }])
|
||||
} finally {
|
||||
sqlite.close()
|
||||
}
|
||||
})
|
||||
|
||||
testEffect("makes select/insert/update/delete query builders yieldable Effects", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 1, name: "Ada" })
|
||||
yield* db.insert(users).values({ id: 2, name: "Grace" })
|
||||
|
||||
const selected = yield* db.select().from(users).orderBy(users.id)
|
||||
expect(selected).toEqual([
|
||||
{ id: 1, name: "Ada" },
|
||||
{ id: 2, name: "Grace" },
|
||||
])
|
||||
|
||||
const updated = yield* db.update(users).set({ name: "Lovelace" }).where(eq(users.id, 1)).returning()
|
||||
expect(updated).toEqual([{ id: 1, name: "Lovelace" }])
|
||||
|
||||
const deleted = yield* db.delete(users).where(eq(users.id, 2)).returning({ id: users.id })
|
||||
expect(deleted).toEqual([{ id: 2 }])
|
||||
|
||||
expect(yield* db.select().from(users)).toEqual([{ id: 1, name: "Lovelace" }])
|
||||
}),
|
||||
)
|
||||
|
||||
testEffect("supports direct Effect combinators on queries", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 1, name: "Ada" })
|
||||
|
||||
expect(
|
||||
yield* (db.select().from(users) as Effect.Effect<Array<{ readonly name: string }>, EffectDrizzleQueryError>).pipe(
|
||||
Effect.map((rows) => rows.map((row) => row.name)),
|
||||
),
|
||||
).toEqual(["Ada"])
|
||||
}),
|
||||
)
|
||||
|
||||
testEffect("supports relational query builders", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 1, name: "Ada" })
|
||||
yield* db.insert(posts).values({ id: 1, user_id: 1, title: "Notes" })
|
||||
expect(
|
||||
yield* db._query.users.findMany({
|
||||
with: {
|
||||
posts: true,
|
||||
},
|
||||
}),
|
||||
).toEqual([
|
||||
{
|
||||
id: 1,
|
||||
name: "Ada",
|
||||
posts: [{ id: 1, user_id: 1, title: "Notes" }],
|
||||
},
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
testEffect("runs synchronous Effect programs inside transactions", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 1, name: "Ada" })
|
||||
return yield* db.select().from(users)
|
||||
}).pipe(db.withTransaction)
|
||||
|
||||
expect(yield* db.select().from(users)).toEqual([{ id: 1, name: "Ada" }])
|
||||
|
||||
const exit = yield* Effect.exit(
|
||||
Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 2, name: "Grace" })
|
||||
return yield* Effect.fail("rollback")
|
||||
}).pipe(db.withTransaction),
|
||||
)
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
expect(yield* db.select().from(users).orderBy(users.id)).toEqual([{ id: 1, name: "Ada" }])
|
||||
}),
|
||||
)
|
||||
|
||||
testEffect("supports pipeable transactions using the same database service", () =>
|
||||
Effect.gen(function* () {
|
||||
const exit = yield* Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 1, name: "Ada" })
|
||||
return yield* Effect.fail("rollback")
|
||||
}).pipe(db.withTransaction, Effect.exit)
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
expect(yield* db.select().from(users)).toEqual([])
|
||||
|
||||
yield* Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 2, name: "Grace" })
|
||||
expect(yield* db.$count(users)).toBe(1)
|
||||
}).pipe(db.withTransaction)
|
||||
|
||||
expect(yield* db.select().from(users)).toEqual([{ id: 2, name: "Grace" }])
|
||||
}),
|
||||
)
|
||||
|
||||
testEffect("supports count builders and prepared queries", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* db.insert(users).values([
|
||||
{ id: 1, name: "Ada" },
|
||||
{ id: 2, name: "Grace" },
|
||||
])
|
||||
|
||||
expect(yield* db.$count(users)).toBe(2)
|
||||
|
||||
const prepared = db.select().from(users).orderBy(users.id).prepare()
|
||||
expect(yield* prepared).toEqual([
|
||||
{ id: 1, name: "Ada" },
|
||||
{ id: 2, name: "Grace" },
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
testEffect("nested pipeable transactions commit or roll back with the outer transaction", () =>
|
||||
Effect.gen(function* () {
|
||||
yield* Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 1, name: "Ada" })
|
||||
yield* Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 2, name: "Grace" })
|
||||
}).pipe(db.withTransaction)
|
||||
}).pipe(db.withTransaction)
|
||||
|
||||
expect(yield* db.select().from(users).orderBy(users.id)).toEqual([
|
||||
{ id: 1, name: "Ada" },
|
||||
{ id: 2, name: "Grace" },
|
||||
])
|
||||
|
||||
const exit = yield* Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 3, name: "Katherine" })
|
||||
yield* Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 4, name: "Dorothy" })
|
||||
return yield* Effect.fail("inner rollback")
|
||||
}).pipe(db.withTransaction)
|
||||
}).pipe(db.withTransaction, Effect.exit)
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
expect(yield* db.select().from(users).orderBy(users.id)).toEqual([
|
||||
{ id: 1, name: "Ada" },
|
||||
{ id: 2, name: "Grace" },
|
||||
])
|
||||
}),
|
||||
)
|
||||
|
||||
testEffect("defects inside transactions roll back and stay defects", () =>
|
||||
Effect.gen(function* () {
|
||||
const exit = yield* Effect.gen(function* () {
|
||||
yield* db.insert(users).values({ id: 1, name: "Ada" })
|
||||
return yield* Effect.die("boom")
|
||||
}).pipe(db.withTransaction, Effect.exit)
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
if (Exit.isFailure(exit)) {
|
||||
expect(exit.cause.reasons.some(Cause.isDieReason)).toBe(true)
|
||||
}
|
||||
expect(yield* db.select().from(users)).toEqual([])
|
||||
}),
|
||||
)
|
||||
|
||||
testEffect("wraps query failures with query text and parameters", () =>
|
||||
Effect.gen(function* () {
|
||||
const exit = yield* Effect.exit(db.insert(posts).values({ id: 1, user_id: 404, title: "Missing" }))
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
if (Exit.isFailure(exit)) {
|
||||
const error = exit.cause.reasons.filter(Cause.isFailReason)[0]?.error
|
||||
expect(error).toBeInstanceOf(EffectDrizzleQueryError)
|
||||
expect((error as EffectDrizzleQueryError).query).toContain("insert into")
|
||||
expect((error as EffectDrizzleQueryError).params).toEqual([1, 404, "Missing"])
|
||||
}
|
||||
}),
|
||||
)
|
||||
})
|
||||
15
packages/effect-drizzle-sqlite/tsconfig.json
Normal file
15
packages/effect-drizzle-sqlite/tsconfig.json
Normal file
@@ -0,0 +1,15 @@
|
||||
{
|
||||
"$schema": "https://json.schemastore.org/tsconfig",
|
||||
"extends": "@tsconfig/bun/tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"types": ["bun"],
|
||||
"noUncheckedIndexedAccess": false,
|
||||
"plugins": [
|
||||
{
|
||||
"name": "@effect/language-service",
|
||||
"transform": "@effect/language-service/transform",
|
||||
"namespaceImportPackages": ["effect", "@effect/*"]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -112,6 +112,7 @@
|
||||
"@octokit/graphql": "9.0.2",
|
||||
"@octokit/rest": "catalog:",
|
||||
"@openauthjs/openauth": "catalog:",
|
||||
"@opencode-ai/effect-drizzle-sqlite": "workspace:*",
|
||||
"@opencode-ai/plugin": "workspace:*",
|
||||
"@opencode-ai/script": "workspace:*",
|
||||
"@opencode-ai/sdk": "workspace:*",
|
||||
|
||||
@@ -17,10 +17,6 @@ export const WorkspaceContext = {
|
||||
},
|
||||
|
||||
get workspaceID() {
|
||||
try {
|
||||
return context.use().workspaceID
|
||||
} catch {
|
||||
return undefined
|
||||
}
|
||||
return context.peek()?.workspaceID
|
||||
},
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import { Layer, ManagedRuntime } from "effect"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
import { attach } from "./run-service"
|
||||
import { disposable } from "@/util/disposable"
|
||||
import * as Observability from "@opencode-ai/core/effect/observability"
|
||||
|
||||
import { AppFileSystem } from "@opencode-ai/core/filesystem"
|
||||
@@ -47,7 +49,6 @@ import { Installation } from "@/installation"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
import { SessionShare } from "@/share/session"
|
||||
import { Npm } from "@opencode-ai/core/npm"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
|
||||
export const AppLayer = Layer.mergeAll(
|
||||
Npm.defaultLayer,
|
||||
@@ -97,25 +98,23 @@ export const AppLayer = Layer.mergeAll(
|
||||
SessionShare.defaultLayer,
|
||||
).pipe(Layer.provideMerge(Observability.layer))
|
||||
|
||||
const rt = ManagedRuntime.make(AppLayer, { memoMap })
|
||||
type Runtime = Pick<typeof rt, "runSync" | "runPromise" | "runPromiseExit" | "runFork" | "runCallback" | "dispose">
|
||||
const wrap = (effect: Parameters<typeof rt.runSync>[0]) => attach(effect as never) as never
|
||||
const rt = disposable(() => ManagedRuntime.make(AppLayer, { memoMap }))
|
||||
type Runtime = Pick<
|
||||
ReturnType<typeof rt>,
|
||||
"runSync" | "runPromise" | "runPromiseExit" | "runFork" | "runCallback" | "dispose"
|
||||
>
|
||||
|
||||
// Each method wraps the effect through `attach()` which reads the current
|
||||
// Instance and Workspace from AsyncLocalStorage and provides them as
|
||||
// `InstanceRef` / `WorkspaceRef`. ALS state changes between calls, so this
|
||||
// has to happen per-call — a static layer would close over a single read.
|
||||
const wrap = (effect: Parameters<ReturnType<typeof rt>["runSync"]>[0]) => attach(effect as never) as never
|
||||
|
||||
export const AppRuntime: Runtime = {
|
||||
runSync(effect) {
|
||||
return rt.runSync(wrap(effect))
|
||||
},
|
||||
runPromise(effect, options) {
|
||||
return rt.runPromise(wrap(effect), options)
|
||||
},
|
||||
runPromiseExit(effect, options) {
|
||||
return rt.runPromiseExit(wrap(effect), options)
|
||||
},
|
||||
runFork(effect) {
|
||||
return rt.runFork(wrap(effect))
|
||||
},
|
||||
runCallback(effect) {
|
||||
return rt.runCallback(wrap(effect))
|
||||
},
|
||||
dispose: () => rt.dispose(),
|
||||
runSync: (effect) => rt().runSync(wrap(effect)),
|
||||
runPromise: (effect, options) => rt().runPromise(wrap(effect), options),
|
||||
runPromiseExit: (effect, options) => rt().runPromiseExit(wrap(effect), options),
|
||||
runFork: (effect) => rt().runFork(wrap(effect)),
|
||||
runCallback: (effect) => rt().runCallback(wrap(effect)),
|
||||
dispose: rt.dispose,
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Layer, ManagedRuntime } from "effect"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
|
||||
import { Plugin } from "@/plugin"
|
||||
import { LSP } from "@/lsp/lsp"
|
||||
@@ -10,9 +11,15 @@ import { Vcs } from "@/project/vcs"
|
||||
import { Snapshot } from "@/snapshot"
|
||||
import { Bus } from "@/bus"
|
||||
import { Config } from "@/config/config"
|
||||
import { disposable } from "@/util/disposable"
|
||||
import * as Observability from "@opencode-ai/core/effect/observability"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
|
||||
// BootstrapRuntime exists only to break a structural import cycle: AppLayer
|
||||
// imports `Worktree.defaultLayer`, and `Worktree.create` needs a runtime to
|
||||
// run `InstanceBootstrap` against in a freshly-provided directory. Going
|
||||
// through AppRuntime there would form `app-runtime → worktree → app-runtime`.
|
||||
// BootstrapRuntime carries the smaller set of services InstanceBootstrap
|
||||
// actually requires and imports nothing from `app-runtime.ts`.
|
||||
export const BootstrapLayer = Layer.mergeAll(
|
||||
Config.defaultLayer,
|
||||
Plugin.defaultLayer,
|
||||
@@ -26,4 +33,10 @@ export const BootstrapLayer = Layer.mergeAll(
|
||||
Bus.defaultLayer,
|
||||
).pipe(Layer.provide(Observability.layer))
|
||||
|
||||
export const BootstrapRuntime = ManagedRuntime.make(BootstrapLayer, { memoMap })
|
||||
const rt = disposable(() => ManagedRuntime.make(BootstrapLayer, { memoMap }))
|
||||
type Runtime = Pick<ReturnType<typeof rt>, "runPromise" | "dispose">
|
||||
|
||||
export const BootstrapRuntime: Runtime = {
|
||||
runPromise: (effect, options) => rt().runPromise(effect, options),
|
||||
dispose: rt.dispose,
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ import { Effect, Fiber } from "effect"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
import { Instance, type InstanceContext } from "@/project/instance"
|
||||
import type { WorkspaceID } from "@/control-plane/schema"
|
||||
import { LocalContext } from "@/util/local-context"
|
||||
import { InstanceRef, WorkspaceRef } from "./instance-ref"
|
||||
import { attachWith } from "./run-service"
|
||||
|
||||
@@ -23,16 +22,7 @@ function restore<R>(instance: InstanceContext | undefined, workspace: WorkspaceI
|
||||
export function make(): Effect.Effect<Shape> {
|
||||
return Effect.gen(function* () {
|
||||
const ctx = yield* Effect.context()
|
||||
const value = yield* InstanceRef
|
||||
const instance =
|
||||
value ??
|
||||
(() => {
|
||||
try {
|
||||
return Instance.current
|
||||
} catch (err) {
|
||||
if (!(err instanceof LocalContext.NotFound)) throw err
|
||||
}
|
||||
})()
|
||||
const instance = (yield* InstanceRef) ?? Instance.peekCurrent
|
||||
const workspace = (yield* WorkspaceRef) ?? WorkspaceContext.workspaceID
|
||||
const attach = <A, E, R>(effect: Effect.Effect<A, E, R>) => attachWith(effect, { instance, workspace })
|
||||
const wrap = <A, E, R>(effect: Effect.Effect<A, E, R>) =>
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import { Effect, Fiber, ScopedCache, Scope, Context } from "effect"
|
||||
import * as EffectLogger from "@opencode-ai/core/effect/logger"
|
||||
import { Instance, type InstanceContext } from "@/project/instance"
|
||||
import { LocalContext } from "@/util/local-context"
|
||||
import { InstanceRef, WorkspaceRef } from "./instance-ref"
|
||||
import { registerDisposer } from "./instance-registry"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
@@ -13,14 +12,14 @@ export interface InstanceState<A, E = never, R = never> {
|
||||
readonly cache: ScopedCache.ScopedCache<string, A, E, R>
|
||||
}
|
||||
|
||||
// Captures the current Instance context for native callbacks. Falls back to
|
||||
// the running fiber's `InstanceRef` when no ALS frame is present (e.g. when
|
||||
// invoked from inside an Effect that was provided InstanceRef directly).
|
||||
// Returns the original function untouched if neither source has a context.
|
||||
export const bind = <F extends (...args: any[]) => any>(fn: F): F => {
|
||||
try {
|
||||
return Instance.bind(fn)
|
||||
} catch (err) {
|
||||
if (!(err instanceof LocalContext.NotFound)) throw err
|
||||
}
|
||||
const fiber = Fiber.getCurrent()
|
||||
const ctx = fiber ? Context.getReferenceUnsafe(fiber.context, InstanceRef) : undefined
|
||||
const fromAls = Instance.peekCurrent
|
||||
const fromFiber = fromAls ? undefined : Fiber.getCurrent()
|
||||
const ctx = fromAls ?? (fromFiber ? Context.getReferenceUnsafe(fromFiber.context, InstanceRef) : undefined)
|
||||
if (!ctx) return fn
|
||||
return ((...args: any[]) => Instance.restore(ctx, () => fn(...args))) as F
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import { Effect, Layer, ManagedRuntime } from "effect"
|
||||
import * as Context from "effect/Context"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { LocalContext } from "@/util/local-context"
|
||||
import { InstanceRef, WorkspaceRef } from "./instance-ref"
|
||||
import * as Observability from "@opencode-ai/core/effect/observability"
|
||||
import { WorkspaceContext } from "@/control-plane/workspace-context"
|
||||
import type { InstanceContext } from "@/project/instance"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
import { disposable } from "@/util/disposable"
|
||||
|
||||
type Refs = {
|
||||
instance?: InstanceContext
|
||||
@@ -24,29 +24,24 @@ export function attachWith<A, E, R>(effect: Effect.Effect<A, E, R>, refs: Refs):
|
||||
}
|
||||
|
||||
export function attach<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> {
|
||||
try {
|
||||
return attachWith(effect, {
|
||||
instance: Instance.current,
|
||||
workspace: WorkspaceContext.workspaceID,
|
||||
})
|
||||
} catch (err) {
|
||||
if (!(err instanceof LocalContext.NotFound)) throw err
|
||||
}
|
||||
return effect
|
||||
return attachWith(effect, {
|
||||
instance: Instance.peekCurrent,
|
||||
workspace: WorkspaceContext.workspaceID,
|
||||
})
|
||||
}
|
||||
|
||||
export function makeRuntime<I, S, E>(service: Context.Service<I, S>, layer: Layer.Layer<I, E>) {
|
||||
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
|
||||
const getRuntime = () => (rt ??= ManagedRuntime.make(Layer.provideMerge(layer, Observability.layer), { memoMap }))
|
||||
|
||||
const rt = disposable(() =>
|
||||
ManagedRuntime.make<I, E>(Layer.provideMerge(layer, Observability.layer), { memoMap }),
|
||||
)
|
||||
return {
|
||||
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(attach(service.use(fn))),
|
||||
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => rt().runSync(attach(service.use(fn))),
|
||||
runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
|
||||
getRuntime().runPromiseExit(attach(service.use(fn)), options),
|
||||
rt().runPromiseExit(attach(service.use(fn)), options),
|
||||
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
|
||||
getRuntime().runPromise(attach(service.use(fn)), options),
|
||||
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(attach(service.use(fn))),
|
||||
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) =>
|
||||
getRuntime().runCallback(attach(service.use(fn))),
|
||||
rt().runPromise(attach(service.use(fn)), options),
|
||||
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => rt().runFork(attach(service.use(fn))),
|
||||
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => rt().runCallback(attach(service.use(fn))),
|
||||
dispose: rt.dispose,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -126,7 +126,7 @@ const cli = yargs(args)
|
||||
let last = -1
|
||||
if (tty) process.stderr.write("\x1b[?25l")
|
||||
try {
|
||||
await JsonMigration.run(drizzle({ client: Database.Client().$client }), {
|
||||
await JsonMigration.run(drizzle({ client: Database.client().$client }), {
|
||||
progress: (event) => {
|
||||
const percent = Math.floor((event.current / event.total) * 100)
|
||||
if (percent === last && event.current !== event.total) return
|
||||
|
||||
@@ -5,7 +5,7 @@ import { InstanceState } from "@/effect/instance-state"
|
||||
import { ProjectID } from "@/project/schema"
|
||||
import { MessageID, SessionID } from "@/session/schema"
|
||||
import { PermissionTable } from "@/session/session.sql"
|
||||
import { Database } from "@/storage/db"
|
||||
import { DatabaseEffect } from "@/storage/db-effect"
|
||||
import { eq } from "drizzle-orm"
|
||||
import { zod } from "@/util/effect-zod"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
@@ -154,14 +154,17 @@ export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const db = yield* DatabaseEffect.Service
|
||||
const state = yield* InstanceState.make<State>(
|
||||
Effect.fn("Permission.state")(function* (ctx) {
|
||||
const row = Database.use((db) =>
|
||||
db.select().from(PermissionTable).where(eq(PermissionTable.project_id, ctx.project.id)).get(),
|
||||
)
|
||||
const rows = yield* db
|
||||
.select()
|
||||
.from(PermissionTable)
|
||||
.where(eq(PermissionTable.project_id, ctx.project.id))
|
||||
.pipe(Effect.orDie)
|
||||
const state = {
|
||||
pending: new Map<PermissionID, PendingEntry>(),
|
||||
approved: row?.data ?? [],
|
||||
approved: rows[0]?.data ?? [],
|
||||
}
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
@@ -320,6 +323,6 @@ export function disabled(tools: string[], ruleset: Ruleset): Set<string> {
|
||||
return result
|
||||
}
|
||||
|
||||
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
|
||||
export const defaultLayer: Layer.Layer<Service> = layer.pipe(Layer.provide(Bus.layer), Layer.provide(DatabaseEffect.layer))
|
||||
|
||||
export * as Permission from "."
|
||||
|
||||
@@ -76,6 +76,14 @@ export const Instance = {
|
||||
get current() {
|
||||
return context.use()
|
||||
},
|
||||
/**
|
||||
* Returns the current instance context, or `undefined` when there is no
|
||||
* ALS frame. Use this when the caller needs to fall back to a different
|
||||
* source instead of throwing (e.g. reading from `InstanceRef` on a fiber).
|
||||
*/
|
||||
get peekCurrent(): InstanceContext | undefined {
|
||||
return context.peek()
|
||||
},
|
||||
get directory() {
|
||||
return context.use().directory
|
||||
},
|
||||
|
||||
@@ -9,7 +9,7 @@ import { InstanceBootstrap } from "@/project/bootstrap"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { Pty } from "@/pty"
|
||||
import { Session } from "@/session/session"
|
||||
import { lazy } from "@/util/lazy"
|
||||
import { disposable } from "@/util/disposable"
|
||||
import { Filesystem } from "@/util/filesystem"
|
||||
import { authorizationLayer } from "./auth"
|
||||
import { ConfigApi, configHandlers } from "./config"
|
||||
@@ -99,11 +99,13 @@ export const routes = Layer.mergeAll(
|
||||
Layer.provideMerge(Observability.layer),
|
||||
)
|
||||
|
||||
export const webHandler = lazy(() =>
|
||||
export const webHandler = disposable(() =>
|
||||
HttpRouter.toWebHandler(routes, {
|
||||
memoMap,
|
||||
middleware: disposeMiddleware,
|
||||
}),
|
||||
)
|
||||
|
||||
export const disposeWebHandler = webHandler.dispose
|
||||
|
||||
export * as ExperimentalHttpApiServer from "./server"
|
||||
|
||||
@@ -5,7 +5,7 @@ import { zod } from "@/util/effect-zod"
|
||||
import { withStatics } from "@/util/schema"
|
||||
import { Effect, Layer, Context, Schema } from "effect"
|
||||
import z from "zod"
|
||||
import { Database } from "@/storage/db"
|
||||
import { DatabaseEffect } from "@/storage/db-effect"
|
||||
import { eq } from "drizzle-orm"
|
||||
import { asc } from "drizzle-orm"
|
||||
import { TodoTable } from "./session.sql"
|
||||
@@ -42,34 +42,34 @@ export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const db = yield* DatabaseEffect.Service
|
||||
|
||||
const update = Effect.fn("Todo.update")(function* (input: { sessionID: SessionID; todos: Info[] }) {
|
||||
yield* Effect.sync(() =>
|
||||
Database.transaction((db) => {
|
||||
db.delete(TodoTable).where(eq(TodoTable.session_id, input.sessionID)).run()
|
||||
if (input.todos.length === 0) return
|
||||
db.insert(TodoTable)
|
||||
.values(
|
||||
input.todos.map((todo, position) => ({
|
||||
session_id: input.sessionID,
|
||||
content: todo.content,
|
||||
status: todo.status,
|
||||
priority: todo.priority,
|
||||
position,
|
||||
})),
|
||||
)
|
||||
.run()
|
||||
}),
|
||||
)
|
||||
yield* Effect.gen(function* () {
|
||||
yield* db.delete(TodoTable).where(eq(TodoTable.session_id, input.sessionID))
|
||||
if (input.todos.length === 0) return
|
||||
yield* db.insert(TodoTable).values(
|
||||
input.todos.map((todo, position) => ({
|
||||
session_id: input.sessionID,
|
||||
content: todo.content,
|
||||
status: todo.status,
|
||||
priority: todo.priority,
|
||||
position,
|
||||
})),
|
||||
)
|
||||
}).pipe(db.withTransaction, Effect.orDie)
|
||||
|
||||
yield* bus.publish(Event.Updated, input)
|
||||
})
|
||||
|
||||
const get = Effect.fn("Todo.get")(function* (sessionID: SessionID) {
|
||||
const rows = yield* Effect.sync(() =>
|
||||
Database.use((db) =>
|
||||
db.select().from(TodoTable).where(eq(TodoTable.session_id, sessionID)).orderBy(asc(TodoTable.position)).all(),
|
||||
),
|
||||
)
|
||||
const rows = yield* db
|
||||
.select()
|
||||
.from(TodoTable)
|
||||
.where(eq(TodoTable.session_id, sessionID))
|
||||
.orderBy(asc(TodoTable.position))
|
||||
.pipe(Effect.orDie)
|
||||
|
||||
return rows.map((row) => ({
|
||||
content: row.content,
|
||||
status: row.status,
|
||||
@@ -81,6 +81,6 @@ export const layer = Layer.effect(
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
|
||||
export const defaultLayer: Layer.Layer<Service> = layer.pipe(Layer.provide(Bus.layer), Layer.provide(DatabaseEffect.layer))
|
||||
|
||||
export * as Todo from "./todo"
|
||||
|
||||
@@ -9,7 +9,7 @@ import { ModelID, ProviderID } from "@/provider/schema"
|
||||
import { Session } from "@/session/session"
|
||||
import { MessageV2 } from "@/session/message-v2"
|
||||
import type { SessionID } from "@/session/schema"
|
||||
import { Database } from "@/storage/db"
|
||||
import { DatabaseEffect } from "@/storage/db-effect"
|
||||
import { eq } from "drizzle-orm"
|
||||
import { Config } from "@/config/config"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
@@ -76,9 +76,6 @@ export interface Interface {
|
||||
|
||||
export class Service extends Context.Service<Service, Interface>()("@opencode/ShareNext") {}
|
||||
|
||||
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
function api(resource: string): Api {
|
||||
return {
|
||||
create: `/api/${resource}`,
|
||||
@@ -116,6 +113,7 @@ export const layer = Layer.effect(
|
||||
const httpOk = HttpClient.filterStatusOk(http)
|
||||
const provider = yield* Provider.Service
|
||||
const session = yield* Session.Service
|
||||
const db = yield* DatabaseEffect.Service
|
||||
|
||||
function sync(sessionID: SessionID, data: Data[]): Effect.Effect<void> {
|
||||
return Effect.gen(function* () {
|
||||
@@ -226,9 +224,12 @@ export const layer = Layer.effect(
|
||||
})
|
||||
|
||||
const get = Effect.fnUntraced(function* (sessionID: SessionID) {
|
||||
const row = yield* db((db) =>
|
||||
db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(),
|
||||
)
|
||||
const rows = yield* db
|
||||
.select()
|
||||
.from(SessionShareTable)
|
||||
.where(eq(SessionShareTable.session_id, sessionID))
|
||||
.pipe(Effect.orDie)
|
||||
const row = rows[0]
|
||||
if (!row) return
|
||||
return { id: row.id, secret: row.secret, url: row.url } satisfies Share
|
||||
})
|
||||
@@ -314,16 +315,13 @@ export const layer = Layer.effect(
|
||||
Effect.flatMap((r) => httpOk.execute(r)),
|
||||
Effect.flatMap(HttpClientResponse.schemaBodyJson(ShareSchema)),
|
||||
)
|
||||
yield* db((db) =>
|
||||
db
|
||||
.insert(SessionShareTable)
|
||||
.values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url })
|
||||
.onConflictDoUpdate({
|
||||
target: SessionShareTable.session_id,
|
||||
set: { id: result.id, secret: result.secret, url: result.url },
|
||||
})
|
||||
.run(),
|
||||
)
|
||||
yield* db
|
||||
.insert(SessionShareTable)
|
||||
.values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url })
|
||||
.onConflictDoUpdate({
|
||||
target: SessionShareTable.session_id,
|
||||
set: { id: result.id, secret: result.secret, url: result.url },
|
||||
})
|
||||
const s = yield* InstanceState.get(state)
|
||||
s.shared.set(sessionID, result)
|
||||
yield* full(sessionID).pipe(
|
||||
@@ -355,7 +353,7 @@ export const layer = Layer.effect(
|
||||
Effect.flatMap((r) => httpOk.execute(r)),
|
||||
)
|
||||
|
||||
yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
|
||||
yield* db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID))
|
||||
s.shared.delete(sessionID)
|
||||
s.queue.delete(sessionID)
|
||||
})
|
||||
@@ -364,13 +362,14 @@ export const layer = Layer.effect(
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(
|
||||
export const defaultLayer: Layer.Layer<Service> = layer.pipe(
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Account.defaultLayer),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(FetchHttpClient.layer),
|
||||
Layer.provide(Provider.defaultLayer),
|
||||
Layer.provide(Session.defaultLayer),
|
||||
Layer.provide(DatabaseEffect.layer),
|
||||
)
|
||||
|
||||
export * as ShareNext from "./share-next"
|
||||
|
||||
113
packages/opencode/src/storage/db-effect.ts
Normal file
113
packages/opencode/src/storage/db-effect.ts
Normal file
@@ -0,0 +1,113 @@
|
||||
import { migrate } from "drizzle-orm/bun-sqlite/migrator"
|
||||
import type { EffectSQLiteDatabase } from "@opencode-ai/effect-drizzle-sqlite"
|
||||
import { Context, Effect, Layer } from "effect"
|
||||
import { existsSync, readdirSync, readFileSync } from "fs"
|
||||
import path from "path"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import { Global } from "@opencode-ai/core/global"
|
||||
import { InstallationChannel } from "@opencode-ai/core/installation/version"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { iife } from "@/util/iife"
|
||||
import * as StorageSchema from "@/storage/schema"
|
||||
import { init } from "#db"
|
||||
|
||||
declare const OPENCODE_MIGRATIONS: { sql: string; timestamp: number; name: string }[] | undefined
|
||||
|
||||
const log = Log.create({ service: "db" })
|
||||
|
||||
export function getChannelPath() {
|
||||
if (["latest", "beta", "prod"].includes(InstallationChannel) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
|
||||
return path.join(Global.Path.data, "opencode.db")
|
||||
const safe = InstallationChannel.replace(/[^a-zA-Z0-9._-]/g, "-")
|
||||
return path.join(Global.Path.data, `opencode-${safe}.db`)
|
||||
}
|
||||
|
||||
export const Path = iife(() => {
|
||||
if (Flag.OPENCODE_DB) {
|
||||
if (Flag.OPENCODE_DB === ":memory:" || path.isAbsolute(Flag.OPENCODE_DB)) return Flag.OPENCODE_DB
|
||||
return path.join(Global.Path.data, Flag.OPENCODE_DB)
|
||||
}
|
||||
return getChannelPath()
|
||||
})
|
||||
|
||||
type Journal = { sql: string; timestamp: number; name: string }[]
|
||||
|
||||
function timestampFromTag(tag: string) {
|
||||
const match = /^(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})/.exec(tag)
|
||||
if (!match) return 0
|
||||
return Date.UTC(
|
||||
Number(match[1]),
|
||||
Number(match[2]) - 1,
|
||||
Number(match[3]),
|
||||
Number(match[4]),
|
||||
Number(match[5]),
|
||||
Number(match[6]),
|
||||
)
|
||||
}
|
||||
|
||||
function readMigrations(dir: string): Journal {
|
||||
return readdirSync(dir, { withFileTypes: true })
|
||||
.filter((entry) => entry.isDirectory())
|
||||
.flatMap((entry) => {
|
||||
const file = path.join(dir, entry.name, "migration.sql")
|
||||
if (!existsSync(file)) return []
|
||||
return [{ sql: readFileSync(file, "utf-8"), timestamp: timestampFromTag(entry.name), name: entry.name }]
|
||||
})
|
||||
.sort((a, b) => a.timestamp - b.timestamp)
|
||||
}
|
||||
|
||||
export class Service extends Context.Service<Service, EffectSQLiteDatabase<typeof StorageSchema>>()(
|
||||
"@opencode/DatabaseEffect",
|
||||
) {}
|
||||
|
||||
// The layer owns the SQLite connection. `Effect.acquireRelease` opens the
|
||||
// handle, applies PRAGMAs and migrations, and registers a finalizer that
|
||||
// closes the handle when the layer's scope ends.
|
||||
//
|
||||
// Multiple runtimes (AppRuntime, BootstrapRuntime, the dedicated DbRuntime
|
||||
// behind `Database.use`) share this layer through the global memoMap. The
|
||||
// memoMap refcounts the entry by scope; the finalizer fires only after the
|
||||
// last runtime referencing it has been disposed. In production no runtime
|
||||
// is ever disposed so the connection lives for the process; in tests
|
||||
// `resetDatabase` disposes every runtime and the close fires automatically.
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.acquireRelease(
|
||||
Effect.sync(() => {
|
||||
log.info("opening database", { path: Path })
|
||||
const db = init(Path, StorageSchema)
|
||||
db.run("PRAGMA journal_mode = WAL")
|
||||
db.run("PRAGMA synchronous = NORMAL")
|
||||
db.run("PRAGMA busy_timeout = 5000")
|
||||
db.run("PRAGMA cache_size = -64000")
|
||||
db.run("PRAGMA foreign_keys = ON")
|
||||
db.run("PRAGMA wal_checkpoint(PASSIVE)")
|
||||
|
||||
const source =
|
||||
typeof OPENCODE_MIGRATIONS !== "undefined"
|
||||
? OPENCODE_MIGRATIONS
|
||||
: readMigrations(path.join(import.meta.dirname, "../../migration"))
|
||||
if (source.length > 0) {
|
||||
log.info("applying migrations", {
|
||||
count: source.length,
|
||||
mode: typeof OPENCODE_MIGRATIONS !== "undefined" ? "bundled" : "dev",
|
||||
})
|
||||
// Mapping rather than mutating preserves the bundled `OPENCODE_MIGRATIONS`
|
||||
// array so a subsequent acquire (after Database.close) sees the original
|
||||
// SQL even if `OPENCODE_SKIP_MIGRATIONS` was toggled mid-process.
|
||||
const entries = Flag.OPENCODE_SKIP_MIGRATIONS
|
||||
? source.map((item) => ({ ...item, sql: "select 1;" }))
|
||||
: source
|
||||
migrate(db, entries)
|
||||
}
|
||||
return db
|
||||
}),
|
||||
(db) =>
|
||||
Effect.sync(() => {
|
||||
log.info("closing database")
|
||||
db.$client.close()
|
||||
}),
|
||||
),
|
||||
)
|
||||
|
||||
export * as DatabaseEffect from "./db-effect"
|
||||
@@ -1,8 +1,6 @@
|
||||
import { Database } from "bun:sqlite"
|
||||
import { drizzle } from "drizzle-orm/bun-sqlite"
|
||||
import { drizzle } from "@opencode-ai/effect-drizzle-sqlite"
|
||||
|
||||
export function init(path: string) {
|
||||
const sqlite = new Database(path, { create: true })
|
||||
const db = drizzle({ client: sqlite })
|
||||
return db
|
||||
export function init<TSchema extends Record<string, unknown>>(path: string, schema: TSchema) {
|
||||
return drizzle({ client: new Database(path, { create: true }), schema })
|
||||
}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
import { DatabaseSync } from "node:sqlite"
|
||||
import { drizzle } from "drizzle-orm/node-sqlite"
|
||||
|
||||
export function init(path: string) {
|
||||
const sqlite = new DatabaseSync(path)
|
||||
const db = drizzle({ client: sqlite })
|
||||
return db
|
||||
export function init<TSchema extends Record<string, unknown>>(path: string, schema: TSchema) {
|
||||
return drizzle({ client: new DatabaseSync(path), schema })
|
||||
}
|
||||
|
||||
@@ -1,22 +1,14 @@
|
||||
import { type SQLiteBunDatabase } from "drizzle-orm/bun-sqlite"
|
||||
import { migrate } from "drizzle-orm/bun-sqlite/migrator"
|
||||
import { type SQLiteTransaction } from "drizzle-orm/sqlite-core"
|
||||
import { ManagedRuntime } from "effect"
|
||||
export * from "drizzle-orm"
|
||||
import { LocalContext } from "@/util/local-context"
|
||||
import { lazy } from "../util/lazy"
|
||||
import { Global } from "@opencode-ai/core/global"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
import { NamedError } from "@opencode-ai/core/util/error"
|
||||
import z from "zod"
|
||||
import path from "path"
|
||||
import { readFileSync, readdirSync, existsSync } from "fs"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import { InstallationChannel } from "@opencode-ai/core/installation/version"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { iife } from "@/util/iife"
|
||||
import { init } from "#db"
|
||||
import { LocalContext } from "@/util/local-context"
|
||||
import { disposable } from "@/util/disposable"
|
||||
import { DatabaseEffect } from "./db-effect"
|
||||
|
||||
declare const OPENCODE_MIGRATIONS: { sql: string; timestamp: number; name: string }[] | undefined
|
||||
export { Path, getChannelPath } from "./db-effect"
|
||||
|
||||
export const NotFoundError = NamedError.create(
|
||||
"NotFoundError",
|
||||
@@ -25,128 +17,59 @@ export const NotFoundError = NamedError.create(
|
||||
}),
|
||||
)
|
||||
|
||||
const log = Log.create({ service: "db" })
|
||||
// Dedicated runtime that owns the DB layer. It exists separately from
|
||||
// AppRuntime/BootstrapRuntime to keep the legacy sync `use` / `transaction`
|
||||
// helpers reachable without an import cycle from app-runtime.ts. All three
|
||||
// runtimes share the global memoMap so they resolve to the same Service.
|
||||
const runtime = disposable(() => ManagedRuntime.make(DatabaseEffect.layer, { memoMap }))
|
||||
|
||||
export function getChannelPath() {
|
||||
if (["latest", "beta", "prod"].includes(InstallationChannel) || Flag.OPENCODE_DISABLE_CHANNEL_DB)
|
||||
return path.join(Global.Path.data, "opencode.db")
|
||||
const safe = InstallationChannel.replace(/[^a-zA-Z0-9._-]/g, "-")
|
||||
return path.join(Global.Path.data, `opencode-${safe}.db`)
|
||||
// Resolves the current Drizzle handle. The Service value is stable for the
|
||||
// lifetime of the runtime, so cache it to avoid paying fiber-startup cost on
|
||||
// every `Database.use(cb)` call. `close()` clears the cache when the runtime
|
||||
// is disposed.
|
||||
let cached: Client | undefined
|
||||
function resolve() {
|
||||
return runtime().runSync(DatabaseEffect.Service.asEffect())
|
||||
}
|
||||
export function client(): Client {
|
||||
return (cached ??= resolve())
|
||||
}
|
||||
|
||||
export const Path = iife(() => {
|
||||
if (Flag.OPENCODE_DB) {
|
||||
if (Flag.OPENCODE_DB === ":memory:" || path.isAbsolute(Flag.OPENCODE_DB)) return Flag.OPENCODE_DB
|
||||
return path.join(Global.Path.data, Flag.OPENCODE_DB)
|
||||
}
|
||||
return getChannelPath()
|
||||
})
|
||||
export type Client = ReturnType<typeof resolve>
|
||||
|
||||
export type Transaction = SQLiteTransaction<"sync", void>
|
||||
|
||||
type Client = SQLiteBunDatabase
|
||||
|
||||
type Journal = { sql: string; timestamp: number; name: string }[]
|
||||
|
||||
function time(tag: string) {
|
||||
const match = /^(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})/.exec(tag)
|
||||
if (!match) return 0
|
||||
return Date.UTC(
|
||||
Number(match[1]),
|
||||
Number(match[2]) - 1,
|
||||
Number(match[3]),
|
||||
Number(match[4]),
|
||||
Number(match[5]),
|
||||
Number(match[6]),
|
||||
)
|
||||
}
|
||||
|
||||
function migrations(dir: string): Journal {
|
||||
const dirs = readdirSync(dir, { withFileTypes: true })
|
||||
.filter((entry) => entry.isDirectory())
|
||||
.map((entry) => entry.name)
|
||||
|
||||
const sql = dirs
|
||||
.map((name) => {
|
||||
const file = path.join(dir, name, "migration.sql")
|
||||
if (!existsSync(file)) return
|
||||
return {
|
||||
sql: readFileSync(file, "utf-8"),
|
||||
timestamp: time(name),
|
||||
name,
|
||||
}
|
||||
})
|
||||
.filter(Boolean) as Journal
|
||||
|
||||
return sql.sort((a, b) => a.timestamp - b.timestamp)
|
||||
}
|
||||
|
||||
export const Client = lazy(() => {
|
||||
log.info("opening database", { path: Path })
|
||||
|
||||
const db = init(Path)
|
||||
|
||||
db.run("PRAGMA journal_mode = WAL")
|
||||
db.run("PRAGMA synchronous = NORMAL")
|
||||
db.run("PRAGMA busy_timeout = 5000")
|
||||
db.run("PRAGMA cache_size = -64000")
|
||||
db.run("PRAGMA foreign_keys = ON")
|
||||
db.run("PRAGMA wal_checkpoint(PASSIVE)")
|
||||
|
||||
// Apply schema migrations
|
||||
const entries =
|
||||
typeof OPENCODE_MIGRATIONS !== "undefined"
|
||||
? OPENCODE_MIGRATIONS
|
||||
: migrations(path.join(import.meta.dirname, "../../migration"))
|
||||
if (entries.length > 0) {
|
||||
log.info("applying migrations", {
|
||||
count: entries.length,
|
||||
mode: typeof OPENCODE_MIGRATIONS !== "undefined" ? "bundled" : "dev",
|
||||
})
|
||||
if (Flag.OPENCODE_SKIP_MIGRATIONS) {
|
||||
for (const item of entries) {
|
||||
item.sql = "select 1;"
|
||||
}
|
||||
}
|
||||
migrate(db, entries)
|
||||
}
|
||||
|
||||
return db
|
||||
})
|
||||
|
||||
export function close() {
|
||||
Client().$client.close()
|
||||
Client.reset()
|
||||
}
|
||||
export type Transaction = Parameters<Parameters<Client["transaction"]>[0]>[0]
|
||||
|
||||
export type TxOrDb = Transaction | Client
|
||||
|
||||
// Disposes the dedicated DB runtime so its memoMap reference is released.
|
||||
// When every other runtime consuming the layer has also been disposed, the
|
||||
// memoMap drops the entry and the layer's finalizer closes the SQLite
|
||||
// handle. Used by `test/fixture/db.ts:resetDatabase`.
|
||||
export async function close() {
|
||||
cached = undefined
|
||||
await runtime.dispose()
|
||||
}
|
||||
|
||||
const ctx = LocalContext.create<{
|
||||
tx: TxOrDb
|
||||
effects: (() => void | Promise<void>)[]
|
||||
}>("database")
|
||||
|
||||
export function use<T>(callback: (trx: TxOrDb) => T): T {
|
||||
try {
|
||||
return callback(ctx.use().tx)
|
||||
} catch (err) {
|
||||
if (err instanceof LocalContext.NotFound) {
|
||||
const effects: (() => void | Promise<void>)[] = []
|
||||
const result = ctx.provide({ effects, tx: Client() }, () => callback(Client()))
|
||||
for (const effect of effects) effect()
|
||||
return result
|
||||
}
|
||||
throw err
|
||||
}
|
||||
const existing = ctx.peek()
|
||||
if (existing) return callback(existing.tx)
|
||||
const db = client()
|
||||
const effects: (() => void | Promise<void>)[] = []
|
||||
const result = ctx.provide({ effects, tx: db }, () => callback(db))
|
||||
for (const effect of effects) effect()
|
||||
return result
|
||||
}
|
||||
|
||||
export function effect(fn: () => any | Promise<any>) {
|
||||
const bound = InstanceState.bind(fn)
|
||||
try {
|
||||
ctx.use().effects.push(bound)
|
||||
} catch {
|
||||
bound()
|
||||
}
|
||||
const existing = ctx.peek()
|
||||
if (existing) existing.effects.push(bound)
|
||||
else bound()
|
||||
}
|
||||
|
||||
type NotPromise<T> = T extends Promise<any> ? never : T
|
||||
@@ -157,18 +80,14 @@ export function transaction<T>(
|
||||
behavior?: "deferred" | "immediate" | "exclusive"
|
||||
},
|
||||
): NotPromise<T> {
|
||||
try {
|
||||
return callback(ctx.use().tx)
|
||||
} catch (err) {
|
||||
if (err instanceof LocalContext.NotFound) {
|
||||
const effects: (() => void | Promise<void>)[] = []
|
||||
const txCallback = InstanceState.bind((tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx)))
|
||||
const result = Client().transaction(txCallback, { behavior: options?.behavior })
|
||||
for (const effect of effects) effect()
|
||||
return result as NotPromise<T>
|
||||
}
|
||||
throw err
|
||||
}
|
||||
const existing = ctx.peek()
|
||||
if (existing) return callback(existing.tx)
|
||||
const db = client()
|
||||
const effects: (() => void | Promise<void>)[] = []
|
||||
const txCallback = InstanceState.bind((tx: TxOrDb) => ctx.provide({ tx, effects }, () => callback(tx)))
|
||||
const result = db.transaction(txCallback, { behavior: options?.behavior }) as NotPromise<T>
|
||||
for (const effect of effects) effect()
|
||||
return result
|
||||
}
|
||||
|
||||
export * as Database from "./db"
|
||||
|
||||
22
packages/opencode/src/util/disposable.ts
Normal file
22
packages/opencode/src/util/disposable.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
import { lazy } from "./lazy"
|
||||
|
||||
// Lazy single-value cache plus a uniform `dispose()` that tears down only the
|
||||
// instance the factory built. Used for module-scoped disposables like
|
||||
// ManagedRuntimes or HttpRouter web handlers, where production never disposes
|
||||
// (the cached value lives for the process) and tests reset between runs.
|
||||
//
|
||||
// Example:
|
||||
// const rt = disposable(() => ManagedRuntime.make(layer, { memoMap }))
|
||||
// rt() // build/get
|
||||
// rt.peek() // current value or undefined (no build)
|
||||
// await rt.dispose() // tear down and clear the cache
|
||||
export function disposable<T extends { dispose(): Promise<void> }>(make: () => T) {
|
||||
const get = lazy(make)
|
||||
return Object.assign(get, {
|
||||
async dispose() {
|
||||
const old = get.peek()
|
||||
get.reset()
|
||||
await old?.dispose()
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -14,5 +14,7 @@ export function lazy<T>(fn: () => T) {
|
||||
value = undefined
|
||||
}
|
||||
|
||||
result.peek = () => (loaded ? value : undefined)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -11,11 +11,12 @@ export function create<T>(name: string) {
|
||||
return {
|
||||
use() {
|
||||
const result = storage.getStore()
|
||||
if (!result) {
|
||||
throw new NotFound(name)
|
||||
}
|
||||
if (!result) throw new NotFound(name)
|
||||
return result
|
||||
},
|
||||
peek() {
|
||||
return storage.getStore()
|
||||
},
|
||||
provide<R>(value: T, fn: () => R) {
|
||||
return storage.run(value, fn)
|
||||
},
|
||||
|
||||
@@ -3,18 +3,10 @@ import { Effect, Layer, Option } from "effect"
|
||||
|
||||
import { AccountRepo } from "../../src/account/repo"
|
||||
import { AccessToken, AccountID, OrgID, RefreshToken } from "../../src/account/schema"
|
||||
import { Database } from "@/storage/db"
|
||||
import { truncate } from "../lib/db"
|
||||
import { testEffect } from "../lib/effect"
|
||||
|
||||
const truncate = Layer.effectDiscard(
|
||||
Effect.sync(() => {
|
||||
const db = Database.Client()
|
||||
db.run(/*sql*/ `DELETE FROM account_state`)
|
||||
db.run(/*sql*/ `DELETE FROM account`)
|
||||
}),
|
||||
)
|
||||
|
||||
const it = testEffect(Layer.merge(AccountRepo.layer, truncate))
|
||||
const it = testEffect(Layer.merge(AccountRepo.layer, truncate("account_state", "account")))
|
||||
|
||||
it.live("list returns empty when no accounts exist", () =>
|
||||
Effect.gen(function* () {
|
||||
|
||||
@@ -15,18 +15,10 @@ import {
|
||||
RefreshToken,
|
||||
UserCode,
|
||||
} from "../../src/account/schema"
|
||||
import { Database } from "@/storage/db"
|
||||
import { truncate } from "../lib/db"
|
||||
import { testEffect } from "../lib/effect"
|
||||
|
||||
const truncate = Layer.effectDiscard(
|
||||
Effect.sync(() => {
|
||||
const db = Database.Client()
|
||||
db.run(/*sql*/ `DELETE FROM account_state`)
|
||||
db.run(/*sql*/ `DELETE FROM account`)
|
||||
}),
|
||||
)
|
||||
|
||||
const it = testEffect(Layer.merge(AccountRepo.layer, truncate))
|
||||
const it = testEffect(Layer.merge(AccountRepo.layer, truncate("account_state", "account")))
|
||||
|
||||
const insideEagerRefreshWindow = Duration.toMillis(Duration.minutes(1))
|
||||
const outsideEagerRefreshWindow = Duration.toMillis(Duration.minutes(10))
|
||||
|
||||
@@ -1,11 +1,30 @@
|
||||
import { rm } from "fs/promises"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { AppRuntime } from "@/effect/app-runtime"
|
||||
import { BootstrapRuntime } from "@/effect/bootstrap-runtime"
|
||||
import { ExperimentalHttpApiServer } from "@/server/routes/instance/httpapi/server"
|
||||
import { Database } from "@/storage/db"
|
||||
|
||||
// Disposes every runtime that consumes `DatabaseEffect.layer` so the shared
|
||||
// layer memoMap drops its entry. Once all consumers are released, the
|
||||
// layer's finalizer closes the SQLite handle and only then are the
|
||||
// on-disk files safe to remove.
|
||||
//
|
||||
// The four module-scoped consumers are independent and can dispose in
|
||||
// parallel. The DB runtime is disposed afterwards because its release is
|
||||
// what fires the layer finalizer once the refcount hits zero, and the file
|
||||
// removal must come last.
|
||||
export async function resetDatabase() {
|
||||
await Instance.disposeAll().catch(() => undefined)
|
||||
Database.close()
|
||||
await rm(Database.Path, { force: true }).catch(() => undefined)
|
||||
await rm(`${Database.Path}-wal`, { force: true }).catch(() => undefined)
|
||||
await rm(`${Database.Path}-shm`, { force: true }).catch(() => undefined)
|
||||
await Promise.allSettled([
|
||||
Instance.disposeAll(),
|
||||
AppRuntime.dispose(),
|
||||
BootstrapRuntime.dispose(),
|
||||
ExperimentalHttpApiServer.disposeWebHandler(),
|
||||
])
|
||||
await Database.close()
|
||||
await Promise.allSettled([
|
||||
rm(Database.Path, { force: true }),
|
||||
rm(`${Database.Path}-wal`, { force: true }),
|
||||
rm(`${Database.Path}-shm`, { force: true }),
|
||||
])
|
||||
}
|
||||
|
||||
14
packages/opencode/test/lib/db.ts
Normal file
14
packages/opencode/test/lib/db.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import { Effect, Layer } from "effect"
|
||||
import { Database } from "@/storage/db"
|
||||
|
||||
// Returns a layer that, on build, truncates the given tables. Useful as a
|
||||
// test setup layer to start each test from a known empty state without
|
||||
// tearing down the full DB connection.
|
||||
export const truncate = (...tables: string[]) =>
|
||||
Layer.effectDiscard(
|
||||
Effect.sync(() =>
|
||||
Database.use((db) => {
|
||||
for (const table of tables) db.run(`DELETE FROM ${table}`)
|
||||
}),
|
||||
),
|
||||
)
|
||||
@@ -3,14 +3,25 @@ import { Cause, Effect, Exit, Layer } from "effect"
|
||||
import type * as Scope from "effect/Scope"
|
||||
import * as TestClock from "effect/testing/TestClock"
|
||||
import * as TestConsole from "effect/testing/TestConsole"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
|
||||
type Body<A, E, R> = Effect.Effect<A, E, R> | (() => Effect.Effect<A, E, R>)
|
||||
|
||||
const body = <A, E, R>(value: Body<A, E, R>) => Effect.suspend(() => (typeof value === "function" ? value() : value))
|
||||
|
||||
// Tests share the global layer memoMap so a layer like `DatabaseEffect.layer`
|
||||
// resolves to the same Service value here, in `Database.use`'s runtime, and
|
||||
// in any other module-scoped runtime. Without this, a `:memory:` test DB
|
||||
// built by the test would be a different instance from the one
|
||||
// `Database.use` (sync legacy API) opens.
|
||||
const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2>) =>
|
||||
Effect.gen(function* () {
|
||||
const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit)
|
||||
const exit = yield* body(value).pipe(
|
||||
Effect.scoped,
|
||||
Effect.provide(layer),
|
||||
Effect.provideService(Layer.CurrentMemoMap, memoMap),
|
||||
Effect.exit,
|
||||
)
|
||||
if (Exit.isFailure(exit)) {
|
||||
for (const err of Cause.prettyErrors(exit.cause)) {
|
||||
yield* Effect.logError(err)
|
||||
|
||||
@@ -6,12 +6,17 @@ import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { Permission } from "../../src/permission"
|
||||
import { PermissionID } from "../../src/permission/schema"
|
||||
import { Instance } from "../../src/project/instance"
|
||||
import { DatabaseEffect } from "../../src/storage/db-effect"
|
||||
import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { MessageID, SessionID } from "../../src/session/schema"
|
||||
|
||||
const bus = Bus.layer
|
||||
const env = Layer.mergeAll(Permission.layer.pipe(Layer.provide(bus)), bus, CrossSpawnSpawner.defaultLayer)
|
||||
const env = Layer.mergeAll(
|
||||
Permission.layer.pipe(Layer.provide(bus), Layer.provide(DatabaseEffect.layer)),
|
||||
bus,
|
||||
CrossSpawnSpawner.defaultLayer,
|
||||
)
|
||||
const it = testEffect(env)
|
||||
|
||||
afterEach(async () => {
|
||||
|
||||
@@ -11,7 +11,7 @@ const dir = path.join(os.tmpdir(), "opencode-test-data-" + process.pid)
|
||||
await fs.mkdir(dir, { recursive: true })
|
||||
afterAll(async () => {
|
||||
const { Database } = await import("../src/storage/db")
|
||||
Database.close()
|
||||
await Database.close()
|
||||
const busy = (error: unknown) =>
|
||||
typeof error === "object" && error !== null && "code" in error && error.code === "EBUSY"
|
||||
const rm = async (left: number): Promise<void> => {
|
||||
|
||||
@@ -112,7 +112,7 @@ describe("experimental HttpApi", () => {
|
||||
|
||||
test("serves Console org switch through Hono bridge", async () => {
|
||||
await using tmp = await tmpdir({ config: { formatter: false, lsp: false } })
|
||||
Database.Client()
|
||||
Database.client()
|
||||
.$client.prepare(
|
||||
"INSERT INTO account (id, email, url, access_token, refresh_token, time_created, time_updated) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
)
|
||||
|
||||
@@ -37,6 +37,7 @@ import { Shell } from "../../src/shell/shell"
|
||||
import { Snapshot } from "../../src/snapshot"
|
||||
import { ToolRegistry } from "@/tool/registry"
|
||||
import { Truncate } from "@/tool/truncate"
|
||||
import { DatabaseEffect } from "@/storage/db-effect"
|
||||
import * as Log from "@opencode-ai/core/util/log"
|
||||
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { Ripgrep } from "../../src/file/ripgrep"
|
||||
@@ -167,6 +168,7 @@ function makeHttp() {
|
||||
lsp,
|
||||
mcp,
|
||||
AppFileSystem.defaultLayer,
|
||||
DatabaseEffect.layer,
|
||||
status,
|
||||
).pipe(Layer.provideMerge(infra))
|
||||
const question = Question.layer.pipe(Layer.provideMerge(deps))
|
||||
|
||||
@@ -51,6 +51,7 @@ import { SessionStatus } from "../../src/session/status"
|
||||
import { Snapshot } from "../../src/snapshot"
|
||||
import { ToolRegistry } from "@/tool/registry"
|
||||
import { Truncate } from "@/tool/truncate"
|
||||
import { DatabaseEffect } from "@/storage/db-effect"
|
||||
import { AppFileSystem } from "@opencode-ai/core/filesystem"
|
||||
import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner"
|
||||
import { Ripgrep } from "../../src/file/ripgrep"
|
||||
@@ -120,6 +121,7 @@ function makeHttp() {
|
||||
lsp,
|
||||
mcp,
|
||||
AppFileSystem.defaultLayer,
|
||||
DatabaseEffect.layer,
|
||||
status,
|
||||
).pipe(Layer.provideMerge(infra))
|
||||
const question = Question.layer.pipe(Layer.provideMerge(deps))
|
||||
|
||||
@@ -15,6 +15,7 @@ import type { SessionID } from "../../src/session/schema"
|
||||
import { ShareNext } from "@/share/share-next"
|
||||
import { SessionShareTable } from "../../src/share/share.sql"
|
||||
import { Database } from "@/storage/db"
|
||||
import { DatabaseEffect } from "@/storage/db-effect"
|
||||
import { eq } from "drizzle-orm"
|
||||
import { provideTmpdirInstance } from "../fixture/fixture"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
@@ -39,18 +40,6 @@ const json = (req: Parameters<typeof HttpClientResponse.fromWeb>[0], body: unkno
|
||||
|
||||
const none = HttpClient.make(() => Effect.die("unexpected http call"))
|
||||
|
||||
function live(client: HttpClient.HttpClient) {
|
||||
const http = Layer.succeed(HttpClient.HttpClient, client)
|
||||
return ShareNext.layer.pipe(
|
||||
Layer.provide(Bus.layer),
|
||||
Layer.provide(Account.layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(http))),
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(http),
|
||||
Layer.provide(Provider.defaultLayer),
|
||||
Layer.provide(Session.defaultLayer),
|
||||
)
|
||||
}
|
||||
|
||||
function wired(client: HttpClient.HttpClient) {
|
||||
const http = Layer.succeed(HttpClient.HttpClient, client)
|
||||
return Layer.mergeAll(
|
||||
@@ -66,6 +55,7 @@ function wired(client: HttpClient.HttpClient) {
|
||||
Layer.provide(Config.defaultLayer),
|
||||
Layer.provide(http),
|
||||
Layer.provide(Provider.defaultLayer),
|
||||
Layer.provide(DatabaseEffect.layer),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -104,7 +94,7 @@ describe("ShareNext", () => {
|
||||
expect(req.baseUrl).toBe("https://legacy-share.example.com")
|
||||
expect(req.headers).toEqual({})
|
||||
}),
|
||||
).pipe(Effect.provide(live(none))),
|
||||
).pipe(Effect.provide(wired(none))),
|
||||
{ config: { enterprise: { url: "https://legacy-share.example.com" } } },
|
||||
),
|
||||
)
|
||||
@@ -119,7 +109,7 @@ describe("ShareNext", () => {
|
||||
expect(req.api.create).toBe("/api/share")
|
||||
expect(req.headers).toEqual({})
|
||||
}),
|
||||
).pipe(Effect.provide(live(none))),
|
||||
).pipe(Effect.provide(wired(none))),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -128,7 +118,7 @@ describe("ShareNext", () => {
|
||||
Effect.gen(function* () {
|
||||
yield* seed("https://control.example.com", "org-1")
|
||||
|
||||
const req = yield* ShareNext.Service.use((svc) => svc.request()).pipe(Effect.provide(live(none)))
|
||||
const req = yield* ShareNext.Service.use((svc) => svc.request())
|
||||
|
||||
expect(req.api.create).toBe("/api/shares")
|
||||
expect(req.api.sync("shr_123")).toBe("/api/shares/shr_123/sync")
|
||||
@@ -139,33 +129,33 @@ describe("ShareNext", () => {
|
||||
authorization: "Bearer st_test_token",
|
||||
"x-org-id": "org-1",
|
||||
})
|
||||
}),
|
||||
}).pipe(Effect.provide(wired(none))),
|
||||
),
|
||||
)
|
||||
|
||||
it.live("create posts share, persists it, and returns the result", () =>
|
||||
provideTmpdirInstance(
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* Session.Service.use((svc) => svc.create({ title: "test" }))
|
||||
const seen: HttpClientRequest.HttpClientRequest[] = []
|
||||
const client = HttpClient.make((req) => {
|
||||
seen.push(req)
|
||||
if (req.url.endsWith("/api/share")) {
|
||||
return Effect.succeed(
|
||||
json(req, {
|
||||
id: "shr_abc",
|
||||
url: "https://legacy-share.example.com/share/abc",
|
||||
secret: "sec_123",
|
||||
}),
|
||||
)
|
||||
}
|
||||
return Effect.succeed(json(req, { ok: true }))
|
||||
})
|
||||
() => {
|
||||
const seen: HttpClientRequest.HttpClientRequest[] = []
|
||||
const client = HttpClient.make((req) => {
|
||||
seen.push(req)
|
||||
if (req.url.endsWith("/api/share")) {
|
||||
return Effect.succeed(
|
||||
json(req, {
|
||||
id: "shr_abc",
|
||||
url: "https://legacy-share.example.com/share/abc",
|
||||
secret: "sec_123",
|
||||
}),
|
||||
)
|
||||
}
|
||||
return Effect.succeed(json(req, { ok: true }))
|
||||
})
|
||||
|
||||
const result = yield* ShareNext.Service.use((svc) => svc.create(session.id)).pipe(
|
||||
Effect.provide(live(client)),
|
||||
)
|
||||
return Effect.gen(function* () {
|
||||
const sessions = yield* Session.Service
|
||||
const shareNext = yield* ShareNext.Service
|
||||
const session = yield* sessions.create({ title: "test" })
|
||||
const result = yield* shareNext.create(session.id)
|
||||
|
||||
expect(result.id).toBe("shr_abc")
|
||||
expect(result.url).toBe("https://legacy-share.example.com/share/abc")
|
||||
@@ -179,60 +169,61 @@ describe("ShareNext", () => {
|
||||
expect(seen).toHaveLength(1)
|
||||
expect(seen[0].method).toBe("POST")
|
||||
expect(seen[0].url).toBe("https://legacy-share.example.com/api/share")
|
||||
}),
|
||||
}).pipe(Effect.provide(wired(client)))
|
||||
},
|
||||
{ config: { enterprise: { url: "https://legacy-share.example.com" } } },
|
||||
),
|
||||
)
|
||||
|
||||
it.live("remove deletes the persisted share and calls the delete endpoint", () =>
|
||||
provideTmpdirInstance(
|
||||
() =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* Session.Service.use((svc) => svc.create({ title: "test" }))
|
||||
const seen: HttpClientRequest.HttpClientRequest[] = []
|
||||
const client = HttpClient.make((req) => {
|
||||
seen.push(req)
|
||||
if (req.method === "POST") {
|
||||
return Effect.succeed(
|
||||
json(req, {
|
||||
id: "shr_abc",
|
||||
url: "https://legacy-share.example.com/share/abc",
|
||||
secret: "sec_123",
|
||||
}),
|
||||
)
|
||||
}
|
||||
return Effect.succeed(HttpClientResponse.fromWeb(req, new Response(null, { status: 200 })))
|
||||
})
|
||||
|
||||
yield* Effect.gen(function* () {
|
||||
yield* ShareNext.Service.use((svc) => svc.create(session.id))
|
||||
yield* ShareNext.Service.use((svc) => svc.remove(session.id))
|
||||
}).pipe(Effect.provide(live(client)))
|
||||
() => {
|
||||
const seen: HttpClientRequest.HttpClientRequest[] = []
|
||||
const client = HttpClient.make((req) => {
|
||||
seen.push(req)
|
||||
if (req.method === "POST") {
|
||||
return Effect.succeed(
|
||||
json(req, {
|
||||
id: "shr_abc",
|
||||
url: "https://legacy-share.example.com/share/abc",
|
||||
secret: "sec_123",
|
||||
}),
|
||||
)
|
||||
}
|
||||
return Effect.succeed(HttpClientResponse.fromWeb(req, new Response(null, { status: 200 })))
|
||||
})
|
||||
|
||||
return Effect.gen(function* () {
|
||||
const sessions = yield* Session.Service
|
||||
const shareNext = yield* ShareNext.Service
|
||||
const session = yield* sessions.create({ title: "test" })
|
||||
yield* shareNext.create(session.id)
|
||||
yield* shareNext.remove(session.id)
|
||||
expect(share(session.id)).toBeUndefined()
|
||||
expect(seen.map((req) => [req.method, req.url])).toEqual([
|
||||
["POST", "https://legacy-share.example.com/api/share"],
|
||||
["DELETE", "https://legacy-share.example.com/api/share/shr_abc"],
|
||||
])
|
||||
}),
|
||||
}).pipe(Effect.provide(wired(client)))
|
||||
},
|
||||
{ config: { enterprise: { url: "https://legacy-share.example.com" } } },
|
||||
),
|
||||
)
|
||||
|
||||
it.live("create fails on a non-ok response and does not persist a share", () =>
|
||||
provideTmpdirInstance(() =>
|
||||
Effect.gen(function* () {
|
||||
const session = yield* Session.Service.use((svc) => svc.create({ title: "test" }))
|
||||
const client = HttpClient.make((req) => Effect.succeed(json(req, { error: "bad" }, 500)))
|
||||
provideTmpdirInstance(() => {
|
||||
const client = HttpClient.make((req) => Effect.succeed(json(req, { error: "bad" }, 500)))
|
||||
|
||||
const exit = yield* ShareNext.Service.use((svc) => Effect.exit(svc.create(session.id))).pipe(
|
||||
Effect.provide(live(client)),
|
||||
)
|
||||
return Effect.gen(function* () {
|
||||
const sessions = yield* Session.Service
|
||||
const shareNext = yield* ShareNext.Service
|
||||
const session = yield* sessions.create({ title: "test" })
|
||||
const exit = yield* Effect.exit(shareNext.create(session.id))
|
||||
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
expect(share(session.id)).toBeUndefined()
|
||||
}),
|
||||
),
|
||||
}).pipe(Effect.provide(wired(client)))
|
||||
}),
|
||||
)
|
||||
|
||||
it.live("ShareNext coalesces rapid diff events into one delayed sync with latest data", () =>
|
||||
|
||||
125
packages/opencode/test/storage/db-effect.test.ts
Normal file
125
packages/opencode/test/storage/db-effect.test.ts
Normal file
@@ -0,0 +1,125 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test"
|
||||
import { Effect, ManagedRuntime } from "effect"
|
||||
import { memoMap } from "@opencode-ai/core/effect/memo-map"
|
||||
import { DatabaseEffect } from "@/storage/db-effect"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
|
||||
afterEach(async () => {
|
||||
await resetDatabase()
|
||||
})
|
||||
|
||||
describe("DatabaseEffect.layer", () => {
|
||||
test("yields a working Service that round-trips a query", async () => {
|
||||
const rt = ManagedRuntime.make(DatabaseEffect.layer)
|
||||
try {
|
||||
const value = await rt.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const db = yield* DatabaseEffect.Service
|
||||
return db.$client.prepare("SELECT 42 as n").get() as { n: number }
|
||||
}),
|
||||
)
|
||||
expect(value).toEqual({ n: 42 })
|
||||
} finally {
|
||||
await rt.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
test("disposing the runtime closes the underlying SQLite handle", async () => {
|
||||
const rt = ManagedRuntime.make(DatabaseEffect.layer)
|
||||
const captured = await rt.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const db = yield* DatabaseEffect.Service
|
||||
return db.$client
|
||||
}),
|
||||
)
|
||||
expect(captured.prepare("SELECT 1 as n").get()).toEqual({ n: 1 })
|
||||
|
||||
await rt.dispose()
|
||||
|
||||
// The layer's release fires when the last consumer's scope closes, so
|
||||
// the underlying connection should now be closed.
|
||||
expect(() => captured.prepare("SELECT 1 as n").get()).toThrow()
|
||||
})
|
||||
|
||||
test("rebuilds a fresh handle after the previous runtime is disposed", async () => {
|
||||
const rt1 = ManagedRuntime.make(DatabaseEffect.layer)
|
||||
const first = await rt1.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const db = yield* DatabaseEffect.Service
|
||||
return db.$client
|
||||
}),
|
||||
)
|
||||
expect(first.prepare("SELECT 1 as n").get()).toEqual({ n: 1 })
|
||||
await rt1.dispose()
|
||||
|
||||
const rt2 = ManagedRuntime.make(DatabaseEffect.layer)
|
||||
try {
|
||||
const second = await rt2.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const db = yield* DatabaseEffect.Service
|
||||
return db.$client
|
||||
}),
|
||||
)
|
||||
expect(second).not.toBe(first)
|
||||
expect(second.prepare("SELECT 1 as n").get()).toEqual({ n: 1 })
|
||||
} finally {
|
||||
await rt2.dispose()
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// The shared layer memoMap refcounts the cached Service across every runtime
|
||||
// that consumes the layer. The release that closes the SQLite handle only
|
||||
// fires after the last consumer's scope has closed. These tests pin both
|
||||
// halves of that invariant — the property `test/fixture/db.ts:resetDatabase`
|
||||
// relies on by disposing every consumer before deleting the DB files.
|
||||
describe("DatabaseEffect.layer + shared memoMap lifecycle", () => {
|
||||
test("two runtimes consuming the layer share the same Service value", async () => {
|
||||
const rt1 = ManagedRuntime.make(DatabaseEffect.layer, { memoMap })
|
||||
const rt2 = ManagedRuntime.make(DatabaseEffect.layer, { memoMap })
|
||||
try {
|
||||
const a = await rt1.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const db = yield* DatabaseEffect.Service
|
||||
return db.$client
|
||||
}),
|
||||
)
|
||||
const b = await rt2.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const db = yield* DatabaseEffect.Service
|
||||
return db.$client
|
||||
}),
|
||||
)
|
||||
expect(a).toBe(b)
|
||||
} finally {
|
||||
await rt1.dispose()
|
||||
await rt2.dispose()
|
||||
}
|
||||
})
|
||||
|
||||
test("the handle stays open until the last consumer is disposed", async () => {
|
||||
const rt1 = ManagedRuntime.make(DatabaseEffect.layer, { memoMap })
|
||||
const rt2 = ManagedRuntime.make(DatabaseEffect.layer, { memoMap })
|
||||
|
||||
// Each runtime registers an observer on the memoMap entry only when it
|
||||
// first builds the layer, so both must run something through their
|
||||
// service tag before we can test refcounted release.
|
||||
const captured = await rt1.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const db = yield* DatabaseEffect.Service
|
||||
return db.$client
|
||||
}),
|
||||
)
|
||||
await rt2.runPromise(Effect.gen(function* () { yield* DatabaseEffect.Service }))
|
||||
expect(captured.prepare("SELECT 1 as n").get()).toEqual({ n: 1 })
|
||||
|
||||
// Disposing the first consumer must NOT close the handle while the
|
||||
// second still references the layer through the shared memoMap.
|
||||
await rt1.dispose()
|
||||
expect(captured.prepare("SELECT 1 as n").get()).toEqual({ n: 1 })
|
||||
|
||||
// Disposing the last consumer triggers the release.
|
||||
await rt2.dispose()
|
||||
expect(() => captured.prepare("SELECT 1 as n").get()).toThrow()
|
||||
})
|
||||
})
|
||||
@@ -9,11 +9,12 @@ import { EventTable } from "../../src/sync/event.sql"
|
||||
import { Identifier } from "../../src/id/id"
|
||||
import { Flag } from "@opencode-ai/core/flag/flag"
|
||||
import { initProjectors } from "../../src/server/projectors"
|
||||
import { resetDatabase } from "../fixture/db"
|
||||
|
||||
const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
|
||||
|
||||
beforeEach(() => {
|
||||
Database.close()
|
||||
beforeEach(async () => {
|
||||
await resetDatabase()
|
||||
|
||||
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
|
||||
})
|
||||
|
||||
@@ -25,8 +25,8 @@ void Log.init({ print: false })
|
||||
|
||||
const original = Flag.OPENCODE_EXPERIMENTAL_WORKSPACES
|
||||
|
||||
beforeEach(() => {
|
||||
Database.close()
|
||||
beforeEach(async () => {
|
||||
await resetDatabase()
|
||||
Flag.OPENCODE_EXPERIMENTAL_WORKSPACES = true
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user