Compare commits

...

3 Commits

Author SHA1 Message Date
Dax Raad
a0f7731ef4 progress 2026-04-19 20:33:46 -04:00
Dax Raad
64dc81cee0 sync 2026-04-19 20:16:25 -04:00
Dax Raad
eb36b4d381 sync 2026-04-19 20:14:01 -04:00
34 changed files with 3528 additions and 0 deletions

View File

@@ -190,6 +190,32 @@
"cloudflare": "5.2.0",
},
},
"packages/core": {
"name": "@opencode-ai/core",
"version": "1.4.11",
"bin": {
"opencode": "./bin/opencode",
},
"dependencies": {
"@effect/platform-node": "catalog:",
"@npmcli/arborist": "catalog:",
"effect": "catalog:",
"glob": "13.0.5",
"mime-types": "3.0.2",
"minimatch": "10.2.5",
"npm-package-arg": "13.0.2",
"semver": "catalog:",
"xdg-basedir": "5.1.0",
"zod": "catalog:",
},
"devDependencies": {
"@tsconfig/bun": "catalog:",
"@types/bun": "catalog:",
"@types/npm-package-arg": "6.1.4",
"@types/npmcli__arborist": "6.3.3",
"@types/semver": "catalog:",
},
},
"packages/desktop": {
"name": "@opencode-ai/desktop",
"version": "1.14.18",
@@ -1548,6 +1574,8 @@
"@opencode-ai/console-resource": ["@opencode-ai/console-resource@workspace:packages/console/resource"],
"@opencode-ai/core": ["@opencode-ai/core@workspace:packages/core"],
"@opencode-ai/desktop": ["@opencode-ai/desktop@workspace:packages/desktop"],
"@opencode-ai/desktop-electron": ["@opencode-ai/desktop-electron@workspace:packages/desktop-electron"],

View File

@@ -0,0 +1,41 @@
{
"$schema": "https://json.schemastore.org/package.json",
"version": "1.4.11",
"name": "@opencode-ai/core",
"type": "module",
"license": "MIT",
"private": true,
"scripts": {
"test": "bun test",
"typecheck": "tsgo --noEmit"
},
"bin": {
"opencode": "./bin/opencode"
},
"exports": {
"./*": "./src/*.ts"
},
"imports": {},
"devDependencies": {
"@tsconfig/bun": "catalog:",
"@types/semver": "catalog:",
"@types/bun": "catalog:",
"@types/npm-package-arg": "6.1.4",
"@types/npmcli__arborist": "6.3.3"
},
"dependencies": {
"@effect/platform-node": "catalog:",
"@npmcli/arborist": "catalog:",
"effect": "catalog:",
"glob": "13.0.5",
"mime-types": "3.0.2",
"minimatch": "10.2.5",
"npm-package-arg": "13.0.2",
"semver": "catalog:",
"xdg-basedir": "5.1.0",
"zod": "catalog:"
},
"overrides": {
"drizzle-orm": "catalog:"
}
}

View File

@@ -0,0 +1,2 @@
import { Layer } from "effect"
export const memoMap = Layer.makeMemoMapUnsafe()

View File

@@ -0,0 +1,107 @@
import { Effect, Layer, Logger } from "effect"
import { FetchHttpClient } from "effect/unstable/http"
import { OtlpLogger, OtlpSerialization } from "effect/unstable/observability"
import * as EffectLogger from "./logger"
import { Flag } from "@/flag/flag"
import { InstallationChannel, InstallationVersion } from "@/installation/version"
import { ensureProcessMetadata } from "@/util/opencode-process"
const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT
export const enabled = !!base
const processID = crypto.randomUUID()
const headers = Flag.OTEL_EXPORTER_OTLP_HEADERS
? Flag.OTEL_EXPORTER_OTLP_HEADERS.split(",").reduce(
(acc, x) => {
const [key, ...value] = x.split("=")
acc[key] = value.join("=")
return acc
},
{} as Record<string, string>,
)
: undefined
export function resource(): { serviceName: string; serviceVersion: string; attributes: Record<string, string> } {
const processMetadata = ensureProcessMetadata("main")
const attributes: Record<string, string> = (() => {
const value = process.env.OTEL_RESOURCE_ATTRIBUTES
if (!value) return {}
try {
return Object.fromEntries(
value.split(",").map((entry) => {
const index = entry.indexOf("=")
if (index < 1) throw new Error("Invalid OTEL_RESOURCE_ATTRIBUTES entry")
return [decodeURIComponent(entry.slice(0, index)), decodeURIComponent(entry.slice(index + 1))]
}),
)
} catch {
return {}
}
})()
return {
serviceName: "opencode",
serviceVersion: InstallationVersion,
attributes: {
...attributes,
"deployment.environment.name": InstallationChannel,
"opencode.client": Flag.OPENCODE_CLIENT,
"opencode.process_role": processMetadata.processRole,
"opencode.run_id": processMetadata.runID,
"service.instance.id": processID,
},
}
}
function logs() {
return Logger.layer(
[
EffectLogger.logger,
OtlpLogger.make({
url: `${base}/v1/logs`,
resource: resource(),
headers,
}),
],
{ mergeWithExisting: false },
).pipe(Layer.provide(OtlpSerialization.layerJson), Layer.provide(FetchHttpClient.layer))
}
const traces = async () => {
const NodeSdk = await import("@effect/opentelemetry/NodeSdk")
const OTLP = await import("@opentelemetry/exporter-trace-otlp-http")
const SdkBase = await import("@opentelemetry/sdk-trace-base")
// @effect/opentelemetry creates a NodeTracerProvider but never calls
// register(), so the global @opentelemetry/api context manager stays
// as the no-op default. Non-Effect code (like the AI SDK) that calls
// tracer.startActiveSpan() relies on context.active() to find the
// parent span — without a real context manager every span starts a
// new trace. Registering AsyncLocalStorageContextManager fixes this.
const { AsyncLocalStorageContextManager } = await import("@opentelemetry/context-async-hooks")
const { context } = await import("@opentelemetry/api")
const mgr = new AsyncLocalStorageContextManager()
mgr.enable()
context.setGlobalContextManager(mgr)
return NodeSdk.layer(() => ({
resource: resource(),
spanProcessor: new SdkBase.BatchSpanProcessor(
new OTLP.OTLPTraceExporter({
url: `${base}/v1/traces`,
headers,
}),
),
}))
}
export const layer = !base
? EffectLogger.layer
: Layer.unwrap(
Effect.gen(function* () {
const trace = yield* Effect.promise(traces)
return Layer.mergeAll(trace, logs())
}),
)
export const Observability = { enabled, layer }

View File

@@ -0,0 +1,19 @@
import { Observability } from "./observability"
import { Layer, type Context, ManagedRuntime, type Effect } from "effect"
import { memoMap } from "./memo-map"
export function makeRuntime<I, S, E>(service: Context.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
const getRuntime = () =>
(rt ??= ManagedRuntime.make(Layer.provideMerge(layer, Observability.layer) as Layer.Layer<I, E>, { memoMap }))
return {
runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
runPromiseExit: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromiseExit(service.use(fn), options),
runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
getRuntime().runPromise(service.use(fn), options),
runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)),
}
}

View File

@@ -0,0 +1,237 @@
export * as AppFileSystem from "./filesystem.js"
import { NodeFileSystem } from "@effect/platform-node"
import { dirname, join, relative, resolve as pathResolve } from "path"
import { realpathSync } from "fs"
import * as NFS from "fs/promises"
import { lookup } from "mime-types"
import { Effect, FileSystem, Layer, Schema, Context } from "effect"
import type { PlatformError } from "effect/PlatformError"
import { Glob } from "./util/glob.js"
export class FileSystemError extends Schema.TaggedErrorClass<FileSystemError>()("FileSystemError", {
method: Schema.String,
cause: Schema.optional(Schema.Defect),
}) {}
export type Error = PlatformError | FileSystemError
export interface DirEntry {
readonly name: string
readonly type: "file" | "directory" | "symlink" | "other"
}
export interface Interface extends FileSystem.FileSystem {
readonly isDir: (path: string) => Effect.Effect<boolean>
readonly isFile: (path: string) => Effect.Effect<boolean>
readonly existsSafe: (path: string) => Effect.Effect<boolean>
readonly readJson: (path: string) => Effect.Effect<unknown, Error>
readonly writeJson: (path: string, data: unknown, mode?: number) => Effect.Effect<void, Error>
readonly ensureDir: (path: string) => Effect.Effect<void, Error>
readonly writeWithDirs: (path: string, content: string | Uint8Array, mode?: number) => Effect.Effect<void, Error>
readonly readDirectoryEntries: (path: string) => Effect.Effect<DirEntry[], Error>
readonly findUp: (target: string, start: string, stop?: string) => Effect.Effect<string[], Error>
readonly up: (options: { targets: string[]; start: string; stop?: string }) => Effect.Effect<string[], Error>
readonly globUp: (pattern: string, start: string, stop?: string) => Effect.Effect<string[], Error>
readonly glob: (pattern: string, options?: Glob.Options) => Effect.Effect<string[], Error>
readonly globMatch: (pattern: string, filepath: string) => boolean
}
export class Service extends Context.Service<Service, Interface>()("@opencode/FileSystem") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem
const existsSafe = Effect.fn("FileSystem.existsSafe")(function* (path: string) {
return yield* fs.exists(path).pipe(Effect.orElseSucceed(() => false))
})
const isDir = Effect.fn("FileSystem.isDir")(function* (path: string) {
const info = yield* fs.stat(path).pipe(Effect.catch(() => Effect.void))
return info?.type === "Directory"
})
const isFile = Effect.fn("FileSystem.isFile")(function* (path: string) {
const info = yield* fs.stat(path).pipe(Effect.catch(() => Effect.void))
return info?.type === "File"
})
const readDirectoryEntries = Effect.fn("FileSystem.readDirectoryEntries")(function* (dirPath: string) {
return yield* Effect.tryPromise({
try: async () => {
const entries = await NFS.readdir(dirPath, { withFileTypes: true })
return entries.map(
(e): DirEntry => ({
name: e.name,
type: e.isDirectory() ? "directory" : e.isSymbolicLink() ? "symlink" : e.isFile() ? "file" : "other",
}),
)
},
catch: (cause) => new FileSystemError({ method: "readDirectoryEntries", cause }),
})
})
const readJson = Effect.fn("FileSystem.readJson")(function* (path: string) {
const text = yield* fs.readFileString(path)
return JSON.parse(text)
})
const writeJson = Effect.fn("FileSystem.writeJson")(function* (path: string, data: unknown, mode?: number) {
const content = JSON.stringify(data, null, 2)
yield* fs.writeFileString(path, content)
if (mode) yield* fs.chmod(path, mode)
})
const ensureDir = Effect.fn("FileSystem.ensureDir")(function* (path: string) {
yield* fs.makeDirectory(path, { recursive: true })
})
const writeWithDirs = Effect.fn("FileSystem.writeWithDirs")(function* (
path: string,
content: string | Uint8Array,
mode?: number,
) {
const write = typeof content === "string" ? fs.writeFileString(path, content) : fs.writeFile(path, content)
yield* write.pipe(
Effect.catchIf(
(e) => e.reason._tag === "NotFound",
() =>
Effect.gen(function* () {
yield* fs.makeDirectory(dirname(path), { recursive: true })
yield* write
}),
),
)
if (mode) yield* fs.chmod(path, mode)
})
const glob = Effect.fn("FileSystem.glob")(function* (pattern: string, options?: Glob.Options) {
return yield* Effect.tryPromise({
try: () => Glob.scan(pattern, options),
catch: (cause) => new FileSystemError({ method: "glob", cause }),
})
})
const findUp = Effect.fn("FileSystem.findUp")(function* (target: string, start: string, stop?: string) {
const result: string[] = []
let current = start
while (true) {
const search = join(current, target)
if (yield* fs.exists(search)) result.push(search)
if (stop === current) break
const parent = dirname(current)
if (parent === current) break
current = parent
}
return result
})
const up = Effect.fn("FileSystem.up")(function* (options: { targets: string[]; start: string; stop?: string }) {
const result: string[] = []
let current = options.start
while (true) {
for (const target of options.targets) {
const search = join(current, target)
if (yield* fs.exists(search)) result.push(search)
}
if (options.stop === current) break
const parent = dirname(current)
if (parent === current) break
current = parent
}
return result
})
const globUp = Effect.fn("FileSystem.globUp")(function* (pattern: string, start: string, stop?: string) {
const result: string[] = []
let current = start
while (true) {
const matches = yield* glob(pattern, { cwd: current, absolute: true, include: "file", dot: true }).pipe(
Effect.catch(() => Effect.succeed([] as string[])),
)
result.push(...matches)
if (stop === current) break
const parent = dirname(current)
if (parent === current) break
current = parent
}
return result
})
return Service.of({
...fs,
existsSafe,
isDir,
isFile,
readDirectoryEntries,
readJson,
writeJson,
ensureDir,
writeWithDirs,
findUp,
up,
globUp,
glob,
globMatch: Glob.match,
})
}),
)
export const defaultLayer = layer.pipe(Layer.provide(NodeFileSystem.layer))
// Pure helpers that don't need Effect (path manipulation, sync operations)
export function mimeType(p: string): string {
return lookup(p) || "application/octet-stream"
}
export function normalizePath(p: string): string {
if (process.platform !== "win32") return p
const resolved = pathResolve(windowsPath(p))
try {
return realpathSync.native(resolved)
} catch {
return resolved
}
}
export function normalizePathPattern(p: string): string {
if (process.platform !== "win32") return p
if (p === "*") return p
const match = p.match(/^(.*)[\\/]\*$/)
if (!match) return normalizePath(p)
const dir = /^[A-Za-z]:$/.test(match[1]) ? match[1] + "\\" : match[1]
return join(normalizePath(dir), "*")
}
export function resolve(p: string): string {
const resolved = pathResolve(windowsPath(p))
try {
return normalizePath(realpathSync(resolved))
} catch (e: any) {
if (e?.code === "ENOENT") return normalizePath(resolved)
throw e
}
}
export function windowsPath(p: string): string {
if (process.platform !== "win32") return p
return p
.replace(/^\/([a-zA-Z]):(?:[\\/]|$)/, (_, drive) => `${drive.toUpperCase()}:/`)
.replace(/^\/([a-zA-Z])(?:\/|$)/, (_, drive) => `${drive.toUpperCase()}:/`)
.replace(/^\/cygdrive\/([a-zA-Z])(?:\/|$)/, (_, drive) => `${drive.toUpperCase()}:/`)
.replace(/^\/mnt\/([a-zA-Z])(?:\/|$)/, (_, drive) => `${drive.toUpperCase()}:/`)
}
export function overlaps(a: string, b: string) {
const relA = relative(a, b)
const relB = relative(b, a)
return !relA || !relA.startsWith("..") || !relB || !relB.startsWith("..")
}
export function contains(parent: string, child: string) {
return !relative(parent, child).startsWith("..")
}

View File

@@ -0,0 +1,42 @@
export * as Global from "./global.js"
import path from "path"
import { xdgData, xdgCache, xdgConfig, xdgState } from "xdg-basedir"
import os from "os"
import { Context, Effect, Layer } from "effect"
export class Service extends Context.Service<Service, Interface>()("@opencode/Global") {}
export interface Interface {
readonly home: string
readonly data: string
readonly cache: string
readonly config: string
readonly state: string
readonly bin: string
readonly log: string
}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const app = "opencode"
const home = process.env.OPENCODE_TEST_HOME ?? os.homedir()
const data = path.join(xdgData!, app)
const cache = path.join(xdgCache!, app)
const cfg = path.join(xdgConfig!, app)
const state = path.join(xdgState!, app)
const bin = path.join(cache, "bin")
const log = path.join(data, "log")
return Service.of({
home,
data,
cache,
config: cfg,
state,
bin,
log,
})
}),
)

294
packages/core/src/npm.ts Normal file
View File

@@ -0,0 +1,294 @@
export * as Npm from "./npm.js"
import path from "path"
import npa from "npm-package-arg"
import semver from "semver"
import { Effect, Schema, Context, Layer, Option, FileSystem } from "effect"
import { NodeFileSystem } from "@effect/platform-node"
import { AppFileSystem } from "./filesystem.js"
import { Global } from "./global.js"
import { EffectFlock } from "./util/effect-flock.js"
import { makeRuntime } from "../effect/runtime"
export class InstallFailedError extends Schema.TaggedErrorClass<InstallFailedError>()("NpmInstallFailedError", {
add: Schema.Array(Schema.String).pipe(Schema.optional),
dir: Schema.String,
cause: Schema.optional(Schema.Defect),
}) {}
export interface EntryPoint {
readonly directory: string
readonly entrypoint: Option.Option<string>
}
export interface Interface {
readonly add: (pkg: string) => Effect.Effect<EntryPoint, InstallFailedError | EffectFlock.LockError>
readonly install: (
dir: string,
input?: {
add: {
name: string
version?: string
}[]
},
) => Effect.Effect<void, EffectFlock.LockError | InstallFailedError>
readonly outdated: (pkg: string, cachedVersion: string) => Effect.Effect<boolean>
readonly which: (pkg: string) => Effect.Effect<Option.Option<string>>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/Npm") {}
const illegal = process.platform === "win32" ? new Set(["<", ">", ":", '"', "|", "?", "*"]) : undefined
export function sanitize(pkg: string) {
if (!illegal) return pkg
return Array.from(pkg, (char) => (illegal.has(char) || char.charCodeAt(0) < 32 ? "_" : char)).join("")
}
const resolveEntryPoint = (name: string, dir: string): EntryPoint => {
let entrypoint: Option.Option<string>
try {
const resolved = typeof Bun !== "undefined" ? import.meta.resolve(name, dir) : import.meta.resolve(dir)
entrypoint = Option.some(resolved)
} catch {
entrypoint = Option.none()
}
return {
directory: dir,
entrypoint,
}
}
interface ArboristNode {
name: string
path: string
}
interface ArboristTree {
edgesOut: Map<string, { to?: ArboristNode }>
}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const afs = yield* AppFileSystem.Service
const global = yield* Global.Service
const fs = yield* FileSystem.FileSystem
const flock = yield* EffectFlock.Service
const directory = (pkg: string) => path.join(global.cache, "packages", sanitize(pkg))
const reify = (input: { dir: string; add?: string[] }) =>
Effect.gen(function* () {
yield* flock.acquire(`npm-install:${input.dir}`)
const { Arborist } = yield* Effect.promise(() => import("@npmcli/arborist"))
const arborist = new Arborist({
path: input.dir,
binLinks: true,
progress: false,
savePrefix: "",
ignoreScripts: true,
})
return yield* Effect.tryPromise({
try: () =>
arborist.reify({
add: input?.add || [],
save: true,
saveType: "prod",
}),
catch: (cause) =>
new InstallFailedError({
cause,
add: input?.add,
dir: input.dir,
}),
}) as Effect.Effect<ArboristTree, InstallFailedError>
}).pipe(
Effect.withSpan("Npm.reify", {
attributes: input,
}),
)
const outdated = Effect.fn("Npm.outdated")(function* (pkg: string, cachedVersion: string) {
const response = yield* Effect.tryPromise({
try: () => fetch(`https://registry.npmjs.org/${pkg}`),
catch: () => undefined,
}).pipe(Effect.orElseSucceed(() => undefined))
if (!response || !response.ok) {
return false
}
const data = yield* Effect.tryPromise({
try: () => response.json() as Promise<{ "dist-tags"?: { latest?: string } }>,
catch: () => undefined,
}).pipe(Effect.orElseSucceed(() => undefined))
const latestVersion = data?.["dist-tags"]?.latest
if (!latestVersion) {
return false
}
const range = /[\s^~*xX<>|=]/.test(cachedVersion)
if (range) return !semver.satisfies(latestVersion, cachedVersion)
return semver.lt(cachedVersion, latestVersion)
})
const add = Effect.fn("Npm.add")(function* (pkg: string) {
const dir = directory(pkg)
const name = (() => {
try {
return npa(pkg).name ?? pkg
} catch {
return pkg
}
})()
if (yield* afs.existsSafe(dir)) {
return resolveEntryPoint(name, path.join(dir, "node_modules", name))
}
const tree = yield* reify({ dir, add: [pkg] })
const first = tree.edgesOut.values().next().value?.to
if (!first) return yield* new InstallFailedError({ add: [pkg], dir })
return resolveEntryPoint(first.name, first.path)
}, Effect.scoped)
const install: Interface["install"] = Effect.fn("Npm.install")(function* (dir, input) {
const canWrite = yield* afs.access(dir, { writable: true }).pipe(
Effect.as(true),
Effect.orElseSucceed(() => false),
)
if (!canWrite) return
const add = input?.add.map((pkg) => [pkg.name, pkg.version].filter(Boolean).join("@")) ?? []
if (
yield* Effect.gen(function* () {
const nodeModulesExists = yield* afs.existsSafe(path.join(dir, "node_modules"))
if (!nodeModulesExists) {
yield* reify({ add, dir })
return true
}
return false
}).pipe(Effect.withSpan("Npm.checkNodeModules"))
)
return
yield* Effect.gen(function* () {
const pkg = yield* afs.readJson(path.join(dir, "package.json")).pipe(Effect.orElseSucceed(() => ({})))
const lock = yield* afs.readJson(path.join(dir, "package-lock.json")).pipe(Effect.orElseSucceed(() => ({})))
const pkgAny = pkg as any
const lockAny = lock as any
const declared = new Set([
...Object.keys(pkgAny?.dependencies || {}),
...Object.keys(pkgAny?.devDependencies || {}),
...Object.keys(pkgAny?.peerDependencies || {}),
...Object.keys(pkgAny?.optionalDependencies || {}),
...(input?.add || []).map((pkg) => pkg.name),
])
const root = lockAny?.packages?.[""] || {}
const locked = new Set([
...Object.keys(root?.dependencies || {}),
...Object.keys(root?.devDependencies || {}),
...Object.keys(root?.peerDependencies || {}),
...Object.keys(root?.optionalDependencies || {}),
])
for (const name of declared) {
if (!locked.has(name)) {
yield* reify({ dir, add })
return
}
}
}).pipe(Effect.withSpan("Npm.checkDirty"))
return
}, Effect.scoped)
const which = Effect.fn("Npm.which")(function* (pkg: string) {
const dir = directory(pkg)
const binDir = path.join(dir, "node_modules", ".bin")
const pick = Effect.fnUntraced(function* () {
const files = yield* fs.readDirectory(binDir).pipe(Effect.catch(() => Effect.succeed([] as string[])))
if (files.length === 0) return Option.none<string>()
if (files.length === 1) return Option.some(files[0])
const pkgJson = yield* afs.readJson(path.join(dir, "node_modules", pkg, "package.json")).pipe(Effect.option)
if (Option.isSome(pkgJson)) {
const parsed = pkgJson.value as { bin?: string | Record<string, string> }
if (parsed?.bin) {
const unscoped = pkg.startsWith("@") ? pkg.split("/")[1] : pkg
const bin = parsed.bin
if (typeof bin === "string") return Option.some(unscoped)
const keys = Object.keys(bin)
if (keys.length === 1) return Option.some(keys[0])
return bin[unscoped] ? Option.some(unscoped) : Option.some(keys[0])
}
}
return Option.some(files[0])
})
return yield* Effect.gen(function* () {
const bin = yield* pick()
if (Option.isSome(bin)) {
return Option.some(path.join(binDir, bin.value))
}
yield* fs.remove(path.join(dir, "package-lock.json")).pipe(Effect.orElseSucceed(() => {}))
yield* add(pkg)
const resolved = yield* pick()
if (Option.isNone(resolved)) return Option.none<string>()
return Option.some(path.join(binDir, resolved.value))
}).pipe(
Effect.scoped,
Effect.orElseSucceed(() => Option.none<string>()),
)
})
return Service.of({
add,
install,
outdated,
which,
})
}),
)
export const defaultLayer = layer.pipe(
Layer.provide(EffectFlock.layer),
Layer.provide(AppFileSystem.layer),
Layer.provide(Global.layer),
Layer.provide(NodeFileSystem.layer),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
export async function install(...args: Parameters<Interface["install"]>) {
return runPromise((svc) => svc.install(...args))
}
export async function add(...args: Parameters<Interface["add"]>) {
const entry = await runPromise((svc) => svc.add(...args))
return {
directory: entry.directory,
entrypoint: Option.getOrUndefined(entry.entrypoint),
}
}
export async function outdated(...args: Parameters<Interface["outdated"]>) {
return runPromise((svc) => svc.outdated(...args))
}
export async function which(...args: Parameters<Interface["which"]>) {
const resolved = await runPromise((svc) => svc.which(...args))
return Option.getOrUndefined(resolved)
}

View File

@@ -0,0 +1,268 @@
import path from "path"
import npa from "npm-package-arg"
import semver from "semver"
import { Effect, Schema, Context, Layer, Option, FileSystem } from "effect"
import { NodeFileSystem } from "@effect/platform-node"
import { AppFileSystem } from "../filesystem"
import { Global } from "../global"
import { EffectFlock } from "../util/effect-flock"
export class InstallFailedError extends Schema.TaggedErrorClass<InstallFailedError>()("NpmInstallFailedError", {
add: Schema.Array(Schema.String).pipe(Schema.optional),
dir: Schema.String,
cause: Schema.optional(Schema.Defect),
}) {}
export interface EntryPoint {
readonly directory: string
readonly entrypoint: Option.Option<string>
}
export interface Interface {
readonly add: (pkg: string) => Effect.Effect<EntryPoint, InstallFailedError | EffectFlock.LockError>
readonly install: (
dir: string,
input?: {
add: {
name: string
version?: string
}[]
},
) => Effect.Effect<void, EffectFlock.LockError | InstallFailedError>
readonly outdated: (pkg: string, cachedVersion: string) => Effect.Effect<boolean>
readonly which: (pkg: string) => Effect.Effect<Option.Option<string>>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/Npm") {}
const illegal = process.platform === "win32" ? new Set(["<", ">", ":", '"', "|", "?", "*"]) : undefined
export function sanitize(pkg: string) {
if (!illegal) return pkg
return Array.from(pkg, (char) => (illegal.has(char) || char.charCodeAt(0) < 32 ? "_" : char)).join("")
}
const resolveEntryPoint = (name: string, dir: string): EntryPoint => {
let entrypoint: Option.Option<string>
try {
const resolved = typeof Bun !== "undefined" ? import.meta.resolve(name, dir) : import.meta.resolve(dir)
entrypoint = Option.some(resolved)
} catch {
entrypoint = Option.none()
}
return {
directory: dir,
entrypoint,
}
}
interface ArboristNode {
name: string
path: string
}
interface ArboristTree {
edgesOut: Map<string, { to?: ArboristNode }>
}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const afs = yield* AppFileSystem.Service
const global = yield* Global.Service
const fs = yield* FileSystem.FileSystem
const flock = yield* EffectFlock.Service
const directory = (pkg: string) => path.join(global.cache, "packages", sanitize(pkg))
const reify = (input: { dir: string; add?: string[] }) =>
Effect.gen(function* () {
yield* flock.acquire(`npm-install:${input.dir}`)
const { Arborist } = yield* Effect.promise(() => import("@npmcli/arborist"))
const arborist = new Arborist({
path: input.dir,
binLinks: true,
progress: false,
savePrefix: "",
ignoreScripts: true,
})
return yield* Effect.tryPromise({
try: () =>
arborist.reify({
add: input?.add || [],
save: true,
saveType: "prod",
}),
catch: (cause) =>
new InstallFailedError({
cause,
add: input?.add,
dir: input.dir,
}),
}) as Effect.Effect<ArboristTree, InstallFailedError>
}).pipe(
Effect.withSpan("Npm.reify", {
attributes: input,
}),
)
const outdated = Effect.fn("Npm.outdated")(function* (pkg: string, cachedVersion: string) {
const response = yield* Effect.tryPromise({
try: () => fetch(`https://registry.npmjs.org/${pkg}`),
catch: () => undefined,
}).pipe(Effect.orElseSucceed(() => undefined))
if (!response || !response.ok) {
return false
}
const data = yield* Effect.tryPromise({
try: () => response.json() as Promise<{ "dist-tags"?: { latest?: string } }>,
catch: () => undefined,
}).pipe(Effect.orElseSucceed(() => undefined))
const latestVersion = data?.["dist-tags"]?.latest
if (!latestVersion) {
return false
}
const range = /[\s^~*xX<>|=]/.test(cachedVersion)
if (range) return !semver.satisfies(latestVersion, cachedVersion)
return semver.lt(cachedVersion, latestVersion)
})
const add = Effect.fn("Npm.add")(function* (pkg: string) {
const dir = directory(pkg)
const name = (() => {
try {
return npa(pkg).name ?? pkg
} catch {
return pkg
}
})()
if (yield* afs.existsSafe(dir)) {
return resolveEntryPoint(name, path.join(dir, "node_modules", name))
}
const tree = yield* reify({ dir, add: [pkg] })
const first = tree.edgesOut.values().next().value?.to
if (!first) return yield* new InstallFailedError({ add: [pkg], dir })
return resolveEntryPoint(first.name, first.path)
}, Effect.scoped)
const install: Interface["install"] = Effect.fn("Npm.install")(function* (dir, input) {
const canWrite = yield* afs.access(dir, { writable: true }).pipe(
Effect.as(true),
Effect.orElseSucceed(() => false),
)
if (!canWrite) return
const add = input?.add.map((pkg) => [pkg.name, pkg.version].filter(Boolean).join("@")) ?? []
if (
yield* Effect.gen(function* () {
const nodeModulesExists = yield* afs.existsSafe(path.join(dir, "node_modules"))
if (!nodeModulesExists) {
yield* reify({ add, dir })
return true
}
return false
}).pipe(Effect.withSpan("Npm.checkNodeModules"))
)
return
yield* Effect.gen(function* () {
const pkg = yield* afs.readJson(path.join(dir, "package.json")).pipe(Effect.orElseSucceed(() => ({})))
const lock = yield* afs.readJson(path.join(dir, "package-lock.json")).pipe(Effect.orElseSucceed(() => ({})))
const pkgAny = pkg as any
const lockAny = lock as any
const declared = new Set([
...Object.keys(pkgAny?.dependencies || {}),
...Object.keys(pkgAny?.devDependencies || {}),
...Object.keys(pkgAny?.peerDependencies || {}),
...Object.keys(pkgAny?.optionalDependencies || {}),
...(input?.add || []).map((pkg) => pkg.name),
])
const root = lockAny?.packages?.[""] || {}
const locked = new Set([
...Object.keys(root?.dependencies || {}),
...Object.keys(root?.devDependencies || {}),
...Object.keys(root?.peerDependencies || {}),
...Object.keys(root?.optionalDependencies || {}),
])
for (const name of declared) {
if (!locked.has(name)) {
yield* reify({ dir, add })
return
}
}
}).pipe(Effect.withSpan("Npm.checkDirty"))
return
}, Effect.scoped)
const which = Effect.fn("Npm.which")(function* (pkg: string) {
const dir = directory(pkg)
const binDir = path.join(dir, "node_modules", ".bin")
const pick = Effect.fnUntraced(function* () {
const files = yield* fs.readDirectory(binDir).pipe(Effect.catch(() => Effect.succeed([] as string[])))
if (files.length === 0) return Option.none<string>()
if (files.length === 1) return Option.some(files[0])
const pkgJson = yield* afs.readJson(path.join(dir, "node_modules", pkg, "package.json")).pipe(Effect.option)
if (Option.isSome(pkgJson)) {
const parsed = pkgJson.value as { bin?: string | Record<string, string> }
if (parsed?.bin) {
const unscoped = pkg.startsWith("@") ? pkg.split("/")[1] : pkg
const bin = parsed.bin
if (typeof bin === "string") return Option.some(unscoped)
const keys = Object.keys(bin)
if (keys.length === 1) return Option.some(keys[0])
return bin[unscoped] ? Option.some(unscoped) : Option.some(keys[0])
}
}
return Option.some(files[0])
})
return yield* Effect.gen(function* () {
const bin = yield* pick()
if (Option.isSome(bin)) {
return Option.some(path.join(binDir, bin.value))
}
yield* fs.remove(path.join(dir, "package-lock.json")).pipe(Effect.orElseSucceed(() => {}))
yield* add(pkg)
const resolved = yield* pick()
if (Option.isNone(resolved)) return Option.none<string>()
return Option.some(path.join(binDir, resolved.value))
}).pipe(
Effect.scoped,
Effect.orElseSucceed(() => Option.none<string>()),
)
})
return Service.of({
add,
install,
outdated,
which,
})
}),
)
export const defaultLayer = layer.pipe(
Layer.provide(EffectFlock.layer),
Layer.provide(AppFileSystem.layer),
Layer.provide(Global.layer),
Layer.provide(NodeFileSystem.layer),
)
export * as Npm from "."

44
packages/core/src/types.d.ts vendored Normal file
View File

@@ -0,0 +1,44 @@
declare module "@npmcli/arborist" {
export interface ArboristOptions {
path: string
binLinks?: boolean
progress?: boolean
savePrefix?: string
ignoreScripts?: boolean
}
export interface ArboristNode {
name: string
path: string
}
export interface ArboristEdge {
to?: ArboristNode
}
export interface ArboristTree {
edgesOut: Map<string, ArboristEdge>
}
export interface ReifyOptions {
add?: string[]
save?: boolean
saveType?: "prod" | "dev" | "optional" | "peer"
}
export class Arborist {
constructor(options: ArboristOptions)
loadVirtual(): Promise<ArboristTree | undefined>
reify(options?: ReifyOptions): Promise<ArboristTree>
}
}
declare var Bun:
| {
file(path: string): {
text(): Promise<string>
json(): Promise<unknown>
}
write(path: string, content: string | Uint8Array): Promise<void>
}
| undefined

View File

@@ -0,0 +1,10 @@
export function findLast<T>(
items: readonly T[],
predicate: (item: T, index: number, items: readonly T[]) => boolean,
): T | undefined {
for (let i = items.length - 1; i >= 0; i -= 1) {
const item = items[i]
if (predicate(item, i, items)) return item
}
return undefined
}

View File

@@ -0,0 +1,41 @@
export namespace Binary {
export function search<T>(array: T[], id: string, compare: (item: T) => string): { found: boolean; index: number } {
let left = 0
let right = array.length - 1
while (left <= right) {
const mid = Math.floor((left + right) / 2)
const midId = compare(array[mid])
if (midId === id) {
return { found: true, index: mid }
} else if (midId < id) {
left = mid + 1
} else {
right = mid - 1
}
}
return { found: false, index: left }
}
export function insert<T>(array: T[], item: T, compare: (item: T) => string): T[] {
const id = compare(item)
let left = 0
let right = array.length
while (left < right) {
const mid = Math.floor((left + right) / 2)
const midId = compare(array[mid])
if (midId < id) {
left = mid + 1
} else {
right = mid
}
}
array.splice(left, 0, item)
return array
}
}

View File

@@ -0,0 +1,283 @@
import path from "path"
import os from "os"
import { randomUUID } from "crypto"
import { Context, Effect, Function, Layer, Option, Schedule, Schema } from "effect"
import type { FileSystem, Scope } from "effect"
import type { PlatformError } from "effect/PlatformError"
import { AppFileSystem } from "../filesystem"
import { Global } from "../global"
import { Hash } from "./hash"
export namespace EffectFlock {
// ---------------------------------------------------------------------------
// Errors
// ---------------------------------------------------------------------------
export class LockTimeoutError extends Schema.TaggedErrorClass<LockTimeoutError>()("LockTimeoutError", {
key: Schema.String,
}) {}
export class LockCompromisedError extends Schema.TaggedErrorClass<LockCompromisedError>()("LockCompromisedError", {
detail: Schema.String,
}) {}
class ReleaseError extends Schema.TaggedErrorClass<ReleaseError>()("ReleaseError", {
detail: Schema.String,
cause: Schema.optional(Schema.Defect),
}) {
override get message() {
return this.detail
}
}
/** Internal: signals "lock is held, retry later". Never leaks to callers. */
class NotAcquired extends Schema.TaggedErrorClass<NotAcquired>()("NotAcquired", {}) {}
export type LockError = LockTimeoutError | LockCompromisedError
// ---------------------------------------------------------------------------
// Timing (baked in — no caller ever overrides these)
// ---------------------------------------------------------------------------
const STALE_MS = 60_000
const TIMEOUT_MS = 5 * 60_000
const BASE_DELAY_MS = 100
const MAX_DELAY_MS = 2_000
const HEARTBEAT_MS = Math.max(100, Math.floor(STALE_MS / 3))
const retrySchedule = Schedule.exponential(BASE_DELAY_MS, 1.7).pipe(
Schedule.either(Schedule.spaced(MAX_DELAY_MS)),
Schedule.jittered,
Schedule.while((meta) => meta.elapsed < TIMEOUT_MS),
)
// ---------------------------------------------------------------------------
// Lock metadata schema
// ---------------------------------------------------------------------------
const LockMetaJson = Schema.fromJsonString(
Schema.Struct({
token: Schema.String,
pid: Schema.Number,
hostname: Schema.String,
createdAt: Schema.String,
}),
)
const decodeMeta = Schema.decodeUnknownSync(LockMetaJson)
const encodeMeta = Schema.encodeSync(LockMetaJson)
// ---------------------------------------------------------------------------
// Service
// ---------------------------------------------------------------------------
export interface Interface {
readonly acquire: (key: string, dir?: string) => Effect.Effect<void, LockError, Scope.Scope>
readonly withLock: {
(key: string, dir?: string): <A, E, R>(body: Effect.Effect<A, E, R>) => Effect.Effect<A, E | LockError, R>
<A, E, R>(body: Effect.Effect<A, E, R>, key: string, dir?: string): Effect.Effect<A, E | LockError, R>
}
}
export class Service extends Context.Service<Service, Interface>()("EffectFlock") {}
// ---------------------------------------------------------------------------
// Layer
// ---------------------------------------------------------------------------
function wall() {
return performance.timeOrigin + performance.now()
}
const mtimeMs = (info: FileSystem.File.Info) => Option.getOrElse(info.mtime, () => new Date(0)).getTime()
const isPathGone = (e: PlatformError) => e.reason._tag === "NotFound" || e.reason._tag === "Unknown"
export const layer: Layer.Layer<Service, never, Global.Service | AppFileSystem.Service> = Layer.effect(
Service,
Effect.gen(function* () {
const global = yield* Global.Service
const fs = yield* AppFileSystem.Service
const lockRoot = path.join(global.state, "locks")
const hostname = os.hostname()
const ensuredDirs = new Set<string>()
// -- helpers (close over fs) --
const safeStat = (file: string) =>
fs.stat(file).pipe(
Effect.catchIf(isPathGone, () => Effect.void),
Effect.orDie,
)
const forceRemove = (target: string) => fs.remove(target, { recursive: true }).pipe(Effect.ignore)
/** Atomic mkdir — returns true if created, false if already exists, dies on other errors. */
const atomicMkdir = (dir: string) =>
fs.makeDirectory(dir, { mode: 0o700 }).pipe(
Effect.as(true),
Effect.catchIf(
(e) => e.reason._tag === "AlreadyExists",
() => Effect.succeed(false),
),
Effect.orDie,
)
/** Write with exclusive create — compromised error if file already exists. */
const exclusiveWrite = (filePath: string, content: string, lockDir: string, detail: string) =>
fs.writeFileString(filePath, content, { flag: "wx" }).pipe(
Effect.catch(() =>
Effect.gen(function* () {
yield* forceRemove(lockDir)
return yield* new LockCompromisedError({ detail })
}),
),
)
const cleanStaleBreaker = Effect.fnUntraced(function* (breakerPath: string) {
const bs = yield* safeStat(breakerPath)
if (bs && wall() - mtimeMs(bs) > STALE_MS) yield* forceRemove(breakerPath)
return false
})
const ensureDir = Effect.fnUntraced(function* (dir: string) {
if (ensuredDirs.has(dir)) return
yield* fs.makeDirectory(dir, { recursive: true }).pipe(Effect.orDie)
ensuredDirs.add(dir)
})
const isStale = Effect.fnUntraced(function* (lockDir: string, heartbeatPath: string, metaPath: string) {
const now = wall()
const hb = yield* safeStat(heartbeatPath)
if (hb) return now - mtimeMs(hb) > STALE_MS
const meta = yield* safeStat(metaPath)
if (meta) return now - mtimeMs(meta) > STALE_MS
const dir = yield* safeStat(lockDir)
if (!dir) return false
return now - mtimeMs(dir) > STALE_MS
})
// -- single lock attempt --
type Handle = { token: string; metaPath: string; heartbeatPath: string; lockDir: string }
const tryAcquireLockDir = (lockDir: string, key: string) =>
Effect.gen(function* () {
const token = randomUUID()
const metaPath = path.join(lockDir, "meta.json")
const heartbeatPath = path.join(lockDir, "heartbeat")
// Atomic mkdir — the POSIX lock primitive
const created = yield* atomicMkdir(lockDir)
if (!created) {
if (!(yield* isStale(lockDir, heartbeatPath, metaPath))) return yield* new NotAcquired()
// Stale — race for breaker ownership
const breakerPath = lockDir + ".breaker"
const claimed = yield* fs.makeDirectory(breakerPath, { mode: 0o700 }).pipe(
Effect.as(true),
Effect.catchIf(
(e) => e.reason._tag === "AlreadyExists",
() => cleanStaleBreaker(breakerPath),
),
Effect.catchIf(isPathGone, () => Effect.succeed(false)),
Effect.orDie,
)
if (!claimed) return yield* new NotAcquired()
// We own the breaker — double-check staleness, nuke, recreate
const recreated = yield* Effect.gen(function* () {
if (!(yield* isStale(lockDir, heartbeatPath, metaPath))) return false
yield* forceRemove(lockDir)
return yield* atomicMkdir(lockDir)
}).pipe(Effect.ensuring(forceRemove(breakerPath)))
if (!recreated) return yield* new NotAcquired()
}
// We own the lock dir — write heartbeat + meta with exclusive create
yield* exclusiveWrite(heartbeatPath, "", lockDir, "heartbeat already existed")
const metaJson = encodeMeta({ token, pid: process.pid, hostname, createdAt: new Date().toISOString() })
yield* exclusiveWrite(metaPath, metaJson, lockDir, "meta.json already existed")
return { token, metaPath, heartbeatPath, lockDir } satisfies Handle
}).pipe(
Effect.withSpan("EffectFlock.tryAcquire", {
attributes: { key },
}),
)
// -- retry wrapper (preserves Handle type) --
const acquireHandle = (lockfile: string, key: string): Effect.Effect<Handle, LockError> =>
tryAcquireLockDir(lockfile, key).pipe(
Effect.retry({
while: (err) => err._tag === "NotAcquired",
schedule: retrySchedule,
}),
Effect.catchTag("NotAcquired", () => Effect.fail(new LockTimeoutError({ key }))),
)
// -- release --
const release = (handle: Handle) =>
Effect.gen(function* () {
const raw = yield* fs.readFileString(handle.metaPath).pipe(
Effect.catch((err) => {
if (isPathGone(err)) return Effect.die(new ReleaseError({ detail: "metadata missing" }))
return Effect.die(err)
}),
)
const parsed = yield* Effect.try({
try: () => decodeMeta(raw),
catch: (cause) => new ReleaseError({ detail: "metadata invalid", cause }),
}).pipe(Effect.orDie)
if (parsed.token !== handle.token) return yield* Effect.die(new ReleaseError({ detail: "token mismatch" }))
yield* forceRemove(handle.lockDir)
})
// -- build service --
const acquire = Effect.fn("EffectFlock.acquire")(function* (key: string, dir?: string) {
const lockDir = dir ?? lockRoot
yield* ensureDir(lockDir)
const lockfile = path.join(lockDir, Hash.fast(key) + ".lock")
// acquireRelease: acquire is uninterruptible, release is guaranteed
const handle = yield* Effect.acquireRelease(acquireHandle(lockfile, key), (handle) => release(handle))
// Heartbeat fiber — scoped, so it's interrupted before release runs
yield* fs
.utimes(handle.heartbeatPath, new Date(), new Date())
.pipe(Effect.ignore, Effect.repeat(Schedule.spaced(HEARTBEAT_MS)), Effect.forkScoped)
})
const withLock: Interface["withLock"] = Function.dual(
(args) => Effect.isEffect(args[0]),
<A, E, R>(body: Effect.Effect<A, E, R>, key: string, dir?: string): Effect.Effect<A, E | LockError, R> =>
Effect.scoped(
Effect.gen(function* () {
yield* acquire(key, dir)
return yield* body
}),
),
)
return Service.of({ acquire, withLock })
}),
)
export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer), Layer.provide(Global.layer))
}

View File

@@ -0,0 +1,51 @@
export function base64Encode(value: string) {
const bytes = new TextEncoder().encode(value)
const binary = Array.from(bytes, (b) => String.fromCharCode(b)).join("")
return btoa(binary).replace(/\+/g, "-").replace(/\//g, "_").replace(/=/g, "")
}
export function base64Decode(value: string) {
const binary = atob(value.replace(/-/g, "+").replace(/_/g, "/"))
const bytes = Uint8Array.from(binary, (c) => c.charCodeAt(0))
return new TextDecoder().decode(bytes)
}
export async function hash(content: string, algorithm = "SHA-256"): Promise<string> {
const encoder = new TextEncoder()
const data = encoder.encode(content)
const hashBuffer = await crypto.subtle.digest(algorithm, data)
const hashArray = Array.from(new Uint8Array(hashBuffer))
const hashHex = hashArray.map((b) => b.toString(16).padStart(2, "0")).join("")
return hashHex
}
export function checksum(content: string): string | undefined {
if (!content) return undefined
let hash = 0x811c9dc5
for (let i = 0; i < content.length; i++) {
hash ^= content.charCodeAt(i)
hash = Math.imul(hash, 0x01000193)
}
return (hash >>> 0).toString(36)
}
export function sampledChecksum(content: string, limit = 500_000): string | undefined {
if (!content) return undefined
if (content.length <= limit) return checksum(content)
const size = 4096
const points = [
0,
Math.floor(content.length * 0.25),
Math.floor(content.length * 0.5),
Math.floor(content.length * 0.75),
content.length - size,
]
const hashes = points
.map((point) => {
const start = Math.max(0, Math.min(content.length - size, point - Math.floor(size / 2)))
return checksum(content.slice(start, start + size)) ?? ""
})
.join(":")
return `${content.length}:${hashes}`
}

View File

@@ -0,0 +1,60 @@
import z from "zod"
export abstract class NamedError extends Error {
abstract schema(): z.core.$ZodType
abstract toObject(): { name: string; data: any }
static hasName(error: unknown, name: string): boolean {
return (
typeof error === "object" && error !== null && "name" in error && (error as Record<string, unknown>).name === name
)
}
static create<Name extends string, Data extends z.core.$ZodType>(name: Name, data: Data) {
const schema = z
.object({
name: z.literal(name),
data,
})
.meta({
ref: name,
})
const result = class extends NamedError {
public static readonly Schema = schema
public override readonly name = name as Name
constructor(
public readonly data: z.input<Data>,
options?: ErrorOptions,
) {
super(name, options)
this.name = name
}
static isInstance(input: any): input is InstanceType<typeof result> {
return typeof input === "object" && "name" in input && input.name === name
}
schema() {
return schema
}
toObject() {
return {
name: name,
data: this.data,
}
}
}
Object.defineProperty(result, "name", { value: name })
return result
}
public static readonly Unknown = NamedError.create(
"UnknownError",
z.object({
message: z.string(),
}),
)
}

View File

@@ -0,0 +1,358 @@
import path from "path"
import os from "os"
import { randomBytes, randomUUID } from "crypto"
import { mkdir, readFile, rm, stat, utimes, writeFile } from "fs/promises"
import { Hash } from "./hash"
import { Effect } from "effect"
export type FlockGlobal = {
state: string
}
export namespace Flock {
let global: FlockGlobal | undefined
export function setGlobal(g: FlockGlobal) {
global = g
}
const root = () => {
if (!global) throw new Error("Flock global not set")
return path.join(global.state, "locks")
}
// Defaults for callers that do not provide timing options.
const defaultOpts = {
staleMs: 60_000,
timeoutMs: 5 * 60_000,
baseDelayMs: 100,
maxDelayMs: 2_000,
}
export interface WaitEvent {
key: string
attempt: number
delay: number
waited: number
}
export type Wait = (input: WaitEvent) => void | Promise<void>
export interface Options {
dir?: string
signal?: AbortSignal
staleMs?: number
timeoutMs?: number
baseDelayMs?: number
maxDelayMs?: number
onWait?: Wait
}
type Opts = {
staleMs: number
timeoutMs: number
baseDelayMs: number
maxDelayMs: number
}
type Owned = {
acquired: true
startHeartbeat: (intervalMs?: number) => void
release: () => Promise<void>
}
export interface Lease {
release: () => Promise<void>
[Symbol.asyncDispose]: () => Promise<void>
}
function code(err: unknown) {
if (typeof err !== "object" || err === null || !("code" in err)) return
const value = err.code
if (typeof value !== "string") return
return value
}
function sleep(ms: number, signal?: AbortSignal) {
return new Promise<void>((resolve, reject) => {
if (signal?.aborted) {
reject(signal.reason ?? new Error("Aborted"))
return
}
let timer: NodeJS.Timeout | undefined
const done = () => {
signal?.removeEventListener("abort", abort)
resolve()
}
const abort = () => {
if (timer) {
clearTimeout(timer)
}
signal?.removeEventListener("abort", abort)
reject(signal?.reason ?? new Error("Aborted"))
}
signal?.addEventListener("abort", abort, { once: true })
timer = setTimeout(done, ms)
})
}
function jitter(ms: number) {
const j = Math.floor(ms * 0.3)
const d = Math.floor(Math.random() * (2 * j + 1)) - j
return Math.max(0, ms + d)
}
function mono() {
return performance.now()
}
function wall() {
return performance.timeOrigin + mono()
}
async function stats(file: string) {
try {
return await stat(file)
} catch (err) {
const errCode = code(err)
if (errCode === "ENOENT" || errCode === "ENOTDIR") return
throw err
}
}
async function stale(lockDir: string, heartbeatPath: string, metaPath: string, staleMs: number) {
// Stale detection allows automatic recovery after crashed owners.
const now = wall()
const heartbeat = await stats(heartbeatPath)
if (heartbeat) {
return now - heartbeat.mtimeMs > staleMs
}
const meta = await stats(metaPath)
if (meta) {
return now - meta.mtimeMs > staleMs
}
const dir = await stats(lockDir)
if (!dir) {
return false
}
return now - dir.mtimeMs > staleMs
}
async function tryAcquireLockDir(lockDir: string, opts: Opts): Promise<Owned | { acquired: false }> {
const token = randomUUID?.() ?? randomBytes(16).toString("hex")
const metaPath = path.join(lockDir, "meta.json")
const heartbeatPath = path.join(lockDir, "heartbeat")
try {
await mkdir(lockDir, { mode: 0o700 })
} catch (err) {
if (code(err) !== "EEXIST") {
throw err
}
if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) {
return { acquired: false }
}
const breakerPath = lockDir + ".breaker"
try {
await mkdir(breakerPath, { mode: 0o700 })
} catch (claimErr) {
const errCode = code(claimErr)
if (errCode === "EEXIST") {
const breaker = await stats(breakerPath)
if (breaker && wall() - breaker.mtimeMs > opts.staleMs) {
await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined)
}
return { acquired: false }
}
if (errCode === "ENOENT" || errCode === "ENOTDIR") {
return { acquired: false }
}
throw claimErr
}
try {
// Breaker ownership ensures only one contender performs stale cleanup.
if (!(await stale(lockDir, heartbeatPath, metaPath, opts.staleMs))) {
return { acquired: false }
}
await rm(lockDir, { recursive: true, force: true })
try {
await mkdir(lockDir, { mode: 0o700 })
} catch (retryErr) {
const errCode = code(retryErr)
if (errCode === "EEXIST" || errCode === "ENOTEMPTY") {
return { acquired: false }
}
throw retryErr
}
} finally {
await rm(breakerPath, { recursive: true, force: true }).catch(() => undefined)
}
}
const meta = {
token,
pid: process.pid,
hostname: os.hostname(),
createdAt: new Date().toISOString(),
}
await writeFile(heartbeatPath, "", { flag: "wx" }).catch(async () => {
await rm(lockDir, { recursive: true, force: true })
throw new Error("Lock acquired but heartbeat already existed (possible compromise).")
})
await writeFile(metaPath, JSON.stringify(meta, null, 2), { flag: "wx" }).catch(async () => {
await rm(lockDir, { recursive: true, force: true })
throw new Error("Lock acquired but meta.json already existed (possible compromise).")
})
let timer: NodeJS.Timeout | undefined
const startHeartbeat = (intervalMs = Math.max(100, Math.floor(opts.staleMs / 3))) => {
if (timer) return
// Heartbeat prevents long critical sections from being evicted as stale.
timer = setInterval(() => {
const t = new Date()
void utimes(heartbeatPath, t, t).catch(() => undefined)
}, intervalMs)
timer.unref?.()
}
const release = async () => {
if (timer) {
clearInterval(timer)
timer = undefined
}
const current = await readFile(metaPath, "utf8")
.then((raw) => {
const parsed = JSON.parse(raw)
if (!parsed || typeof parsed !== "object") return {}
return {
token: "token" in parsed && typeof parsed.token === "string" ? parsed.token : undefined,
}
})
.catch((err) => {
const errCode = code(err)
if (errCode === "ENOENT" || errCode === "ENOTDIR") {
throw new Error("Refusing to release: lock is compromised (metadata missing).")
}
if (err instanceof SyntaxError) {
throw new Error("Refusing to release: lock is compromised (metadata invalid).")
}
throw err
})
// Token check prevents deleting a lock that was re-acquired by another process.
if (current.token !== token) {
throw new Error("Refusing to release: lock token mismatch (not the owner).")
}
await rm(lockDir, { recursive: true, force: true })
}
return {
acquired: true,
startHeartbeat,
release,
}
}
async function acquireLockDir(
lockDir: string,
input: { key: string; onWait?: Wait; signal?: AbortSignal },
opts: Opts,
) {
const stop = mono() + opts.timeoutMs
let attempt = 0
let waited = 0
let delay = opts.baseDelayMs
while (true) {
input.signal?.throwIfAborted()
const res = await tryAcquireLockDir(lockDir, opts)
if (res.acquired) {
return res
}
if (mono() > stop) {
throw new Error(`Timed out waiting for lock: ${input.key}`)
}
attempt += 1
const ms = jitter(delay)
await input.onWait?.({
key: input.key,
attempt,
delay: ms,
waited,
})
await sleep(ms, input.signal)
waited += ms
delay = Math.min(opts.maxDelayMs, Math.floor(delay * 1.7))
}
}
export async function acquire(key: string, input: Options = {}): Promise<Lease> {
input.signal?.throwIfAborted()
const cfg: Opts = {
staleMs: input.staleMs ?? defaultOpts.staleMs,
timeoutMs: input.timeoutMs ?? defaultOpts.timeoutMs,
baseDelayMs: input.baseDelayMs ?? defaultOpts.baseDelayMs,
maxDelayMs: input.maxDelayMs ?? defaultOpts.maxDelayMs,
}
const dir = input.dir ?? root()
await mkdir(dir, { recursive: true })
const lockfile = path.join(dir, Hash.fast(key) + ".lock")
const lock = await acquireLockDir(
lockfile,
{
key,
onWait: input.onWait,
signal: input.signal,
},
cfg,
)
lock.startHeartbeat()
const release = () => lock.release()
return {
release,
[Symbol.asyncDispose]() {
return release()
},
}
}
export async function withLock<T>(key: string, fn: () => Promise<T>, input: Options = {}) {
await using _ = await acquire(key, input)
input.signal?.throwIfAborted()
return await fn()
}
export const effect = Effect.fn("Flock.effect")(function* (key: string, input: Options = {}) {
return yield* Effect.acquireRelease(
Effect.promise((signal) => Flock.acquire(key, { ...input, signal })).pipe(
Effect.withSpan("Flock.acquire", {
attributes: { key },
}),
),
(lock) => Effect.promise(() => lock.release()).pipe(Effect.withSpan("Flock.release")),
).pipe(Effect.asVoid)
})
}

View File

@@ -0,0 +1,11 @@
import { z } from "zod"
export function fn<T extends z.ZodType, Result>(schema: T, cb: (input: z.infer<T>) => Result) {
const result = (input: z.infer<T>) => {
const parsed = schema.parse(input)
return cb(parsed)
}
result.force = (input: z.infer<T>) => cb(input)
result.schema = schema
return result
}

View File

@@ -0,0 +1,34 @@
import { glob, globSync, type GlobOptions } from "glob"
import { minimatch } from "minimatch"
export namespace Glob {
export interface Options {
cwd?: string
absolute?: boolean
include?: "file" | "all"
dot?: boolean
symlink?: boolean
}
function toGlobOptions(options: Options): GlobOptions {
return {
cwd: options.cwd,
absolute: options.absolute,
dot: options.dot,
follow: options.symlink ?? false,
nodir: options.include !== "all",
}
}
export async function scan(pattern: string, options: Options = {}): Promise<string[]> {
return glob(pattern, toGlobOptions(options)) as Promise<string[]>
}
export function scanSync(pattern: string, options: Options = {}): string[] {
return globSync(pattern, toGlobOptions(options)) as string[]
}
export function match(pattern: string, filepath: string): boolean {
return minimatch(filepath, pattern, { dot: true })
}
}

View File

@@ -0,0 +1,7 @@
import { createHash } from "crypto"
export namespace Hash {
export function fast(input: string | Buffer): string {
return createHash("sha1").update(input).digest("hex")
}
}

View File

@@ -0,0 +1,48 @@
import { randomBytes } from "crypto"
export namespace Identifier {
const LENGTH = 26
// State for monotonic ID generation
let lastTimestamp = 0
let counter = 0
export function ascending() {
return create(false)
}
export function descending() {
return create(true)
}
function randomBase62(length: number): string {
const chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
let result = ""
const bytes = randomBytes(length)
for (let i = 0; i < length; i++) {
result += chars[bytes[i] % 62]
}
return result
}
export function create(descending: boolean, timestamp?: number): string {
const currentTimestamp = timestamp ?? Date.now()
if (currentTimestamp !== lastTimestamp) {
lastTimestamp = currentTimestamp
counter = 0
}
counter++
let now = BigInt(currentTimestamp) * BigInt(0x1000) + BigInt(counter)
now = descending ? ~now : now
const timeBytes = Buffer.alloc(6)
for (let i = 0; i < 6; i++) {
timeBytes[i] = Number((now >> BigInt(40 - 8 * i)) & BigInt(0xff))
}
return timeBytes.toString("hex") + randomBase62(LENGTH - 12)
}
}

View File

@@ -0,0 +1,3 @@
export function iife<T>(fn: () => T) {
return fn()
}

View File

@@ -0,0 +1,11 @@
export function lazy<T>(fn: () => T) {
let value: T | undefined
let loaded = false
return (): T => {
if (loaded) return value as T
loaded = true
value = fn()
return value as T
}
}

View File

@@ -0,0 +1,10 @@
import { createRequire } from "node:module"
import path from "node:path"
export namespace Module {
export function resolve(id: string, dir: string) {
try {
return createRequire(path.join(dir, "package.json")).resolve(id)
} catch {}
}
}

View File

@@ -0,0 +1,37 @@
export function getFilename(path: string | undefined) {
if (!path) return ""
const trimmed = path.replace(/[/\\]+$/, "")
const parts = trimmed.split(/[/\\]/)
return parts[parts.length - 1] ?? ""
}
export function getDirectory(path: string | undefined) {
if (!path) return ""
const trimmed = path.replace(/[/\\]+$/, "")
const parts = trimmed.split(/[/\\]/)
return parts.slice(0, parts.length - 1).join("/") + "/"
}
export function getFileExtension(path: string | undefined) {
if (!path) return ""
const parts = path.split(".")
return parts[parts.length - 1]
}
export function getFilenameTruncated(path: string | undefined, maxLength: number = 20) {
const filename = getFilename(path)
if (filename.length <= maxLength) return filename
const lastDot = filename.lastIndexOf(".")
const ext = lastDot <= 0 ? "" : filename.slice(lastDot)
const available = maxLength - ext.length - 1 // -1 for ellipsis
if (available <= 0) return filename.slice(0, maxLength - 1) + "…"
return filename.slice(0, available) + "…" + ext
}
export function truncateMiddle(text: string, maxLength: number = 20) {
if (text.length <= maxLength) return text
const available = maxLength - 1 // -1 for ellipsis
const start = Math.ceil(available / 2)
const end = Math.floor(available / 2)
return text.slice(0, start) + "…" + text.slice(-end)
}

View File

@@ -0,0 +1,42 @@
export interface RetryOptions {
attempts?: number
delay?: number
factor?: number
maxDelay?: number
retryIf?: (error: unknown) => boolean
}
const TRANSIENT_MESSAGES = [
"load failed",
"network connection was lost",
"network request failed",
"failed to fetch",
"econnreset",
"econnrefused",
"etimedout",
"socket hang up",
]
function isTransientError(error: unknown): boolean {
if (!error) return false
// oxlint-disable-next-line no-base-to-string -- error is unknown, intentional coercion for message matching
const message = String(error instanceof Error ? error.message : error).toLowerCase()
return TRANSIENT_MESSAGES.some((m) => message.includes(m))
}
export async function retry<T>(fn: () => Promise<T>, options: RetryOptions = {}): Promise<T> {
const { attempts = 3, delay = 500, factor = 2, maxDelay = 10000, retryIf = isTransientError } = options
let lastError: unknown
for (let attempt = 0; attempt < attempts; attempt++) {
try {
return await fn()
} catch (error) {
lastError = error
if (attempt === attempts - 1 || !retryIf(error)) throw error
const wait = Math.min(delay * Math.pow(factor, attempt), maxDelay)
await new Promise((resolve) => setTimeout(resolve, wait))
}
}
throw lastError
}

View File

@@ -0,0 +1,74 @@
export namespace Slug {
const ADJECTIVES = [
"brave",
"calm",
"clever",
"cosmic",
"crisp",
"curious",
"eager",
"gentle",
"glowing",
"happy",
"hidden",
"jolly",
"kind",
"lucky",
"mighty",
"misty",
"neon",
"nimble",
"playful",
"proud",
"quick",
"quiet",
"shiny",
"silent",
"stellar",
"sunny",
"swift",
"tidy",
"witty",
] as const
const NOUNS = [
"cabin",
"cactus",
"canyon",
"circuit",
"comet",
"eagle",
"engine",
"falcon",
"forest",
"garden",
"harbor",
"island",
"knight",
"lagoon",
"meadow",
"moon",
"mountain",
"nebula",
"orchid",
"otter",
"panda",
"pixel",
"planet",
"river",
"rocket",
"sailor",
"squid",
"star",
"tiger",
"wizard",
"wolf",
] as const
export function create() {
return [
ADJECTIVES[Math.floor(Math.random() * ADJECTIVES.length)],
NOUNS[Math.floor(Math.random() * NOUNS.length)],
].join("-")
}
}

10
packages/core/sst-env.d.ts vendored Normal file
View File

@@ -0,0 +1,10 @@
/* This file is auto-generated by SST. Do not edit. */
/* tslint:disable */
/* eslint-disable */
/* deno-fmt-ignore-file */
/* biome-ignore-all lint: auto-generated */
/// <reference path="../../sst-env.d.ts" />
import "sst"
export {}

View File

@@ -0,0 +1,338 @@
import { describe, test, expect } from "bun:test"
import { Effect, Layer, FileSystem } from "effect"
import { NodeFileSystem } from "@effect/platform-node"
import { AppFileSystem } from "@opencode-ai/core/filesystem"
import { testEffect } from "../lib/effect"
import path from "path"
const live = AppFileSystem.layer.pipe(Layer.provideMerge(NodeFileSystem.layer))
const { effect: it } = testEffect(live)
describe("AppFileSystem", () => {
describe("isDir", () => {
it(
"returns true for directories",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
expect(yield* fs.isDir(tmp)).toBe(true)
}),
)
it(
"returns false for files",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const file = path.join(tmp, "test.txt")
yield* filesys.writeFileString(file, "hello")
expect(yield* fs.isDir(file)).toBe(false)
}),
)
it(
"returns false for non-existent paths",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
expect(yield* fs.isDir("/tmp/nonexistent-" + Math.random())).toBe(false)
}),
)
})
describe("isFile", () => {
it(
"returns true for files",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const file = path.join(tmp, "test.txt")
yield* filesys.writeFileString(file, "hello")
expect(yield* fs.isFile(file)).toBe(true)
}),
)
it(
"returns false for directories",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
expect(yield* fs.isFile(tmp)).toBe(false)
}),
)
})
describe("readJson / writeJson", () => {
it(
"round-trips JSON data",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const file = path.join(tmp, "data.json")
const data = { name: "test", count: 42, nested: { ok: true } }
yield* fs.writeJson(file, data)
const result = yield* fs.readJson(file)
expect(result).toEqual(data)
}),
)
})
describe("ensureDir", () => {
it(
"creates nested directories",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const nested = path.join(tmp, "a", "b", "c")
yield* fs.ensureDir(nested)
const info = yield* filesys.stat(nested)
expect(info.type).toBe("Directory")
}),
)
it(
"is idempotent",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const dir = path.join(tmp, "existing")
yield* filesys.makeDirectory(dir)
yield* fs.ensureDir(dir)
const info = yield* filesys.stat(dir)
expect(info.type).toBe("Directory")
}),
)
})
describe("writeWithDirs", () => {
it(
"creates parent directories if missing",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const file = path.join(tmp, "deep", "nested", "file.txt")
yield* fs.writeWithDirs(file, "hello")
expect(yield* filesys.readFileString(file)).toBe("hello")
}),
)
it(
"writes directly when parent exists",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const file = path.join(tmp, "direct.txt")
yield* fs.writeWithDirs(file, "world")
expect(yield* filesys.readFileString(file)).toBe("world")
}),
)
it(
"writes Uint8Array content",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const file = path.join(tmp, "binary.bin")
const content = new Uint8Array([0x00, 0x01, 0x02, 0x03])
yield* fs.writeWithDirs(file, content)
const result = yield* filesys.readFile(file)
expect(new Uint8Array(result)).toEqual(content)
}),
)
})
describe("findUp", () => {
it(
"finds target in start directory",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
yield* filesys.writeFileString(path.join(tmp, "target.txt"), "found")
const result = yield* fs.findUp("target.txt", tmp)
expect(result).toEqual([path.join(tmp, "target.txt")])
}),
)
it(
"finds target in parent directories",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
yield* filesys.writeFileString(path.join(tmp, "marker"), "root")
const child = path.join(tmp, "a", "b")
yield* filesys.makeDirectory(child, { recursive: true })
const result = yield* fs.findUp("marker", child, tmp)
expect(result).toEqual([path.join(tmp, "marker")])
}),
)
it(
"returns empty array when not found",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const result = yield* fs.findUp("nonexistent", tmp, tmp)
expect(result).toEqual([])
}),
)
})
describe("up", () => {
it(
"finds multiple targets walking up",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
yield* filesys.writeFileString(path.join(tmp, "a.txt"), "a")
yield* filesys.writeFileString(path.join(tmp, "b.txt"), "b")
const child = path.join(tmp, "sub")
yield* filesys.makeDirectory(child)
yield* filesys.writeFileString(path.join(child, "a.txt"), "a-child")
const result = yield* fs.up({ targets: ["a.txt", "b.txt"], start: child, stop: tmp })
expect(result).toContain(path.join(child, "a.txt"))
expect(result).toContain(path.join(tmp, "a.txt"))
expect(result).toContain(path.join(tmp, "b.txt"))
}),
)
})
describe("glob", () => {
it(
"finds files matching pattern",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
yield* filesys.writeFileString(path.join(tmp, "a.ts"), "a")
yield* filesys.writeFileString(path.join(tmp, "b.ts"), "b")
yield* filesys.writeFileString(path.join(tmp, "c.json"), "c")
const result = yield* fs.glob("*.ts", { cwd: tmp })
expect(result.sort()).toEqual(["a.ts", "b.ts"])
}),
)
it(
"supports absolute paths",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
yield* filesys.writeFileString(path.join(tmp, "file.txt"), "hello")
const result = yield* fs.glob("*.txt", { cwd: tmp, absolute: true })
expect(result).toEqual([path.join(tmp, "file.txt")])
}),
)
})
describe("globMatch", () => {
it(
"matches patterns",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
expect(fs.globMatch("*.ts", "foo.ts")).toBe(true)
expect(fs.globMatch("*.ts", "foo.json")).toBe(false)
expect(fs.globMatch("src/**", "src/a/b.ts")).toBe(true)
}),
)
})
describe("globUp", () => {
it(
"finds files walking up directories",
Effect.gen(function* () {
const fs = yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
yield* filesys.writeFileString(path.join(tmp, "root.md"), "root")
const child = path.join(tmp, "a", "b")
yield* filesys.makeDirectory(child, { recursive: true })
yield* filesys.writeFileString(path.join(child, "leaf.md"), "leaf")
const result = yield* fs.globUp("*.md", child, tmp)
expect(result).toContain(path.join(child, "leaf.md"))
expect(result).toContain(path.join(tmp, "root.md"))
}),
)
})
describe("built-in passthrough", () => {
it(
"exists works",
Effect.gen(function* () {
yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const file = path.join(tmp, "exists.txt")
yield* filesys.writeFileString(file, "yes")
expect(yield* filesys.exists(file)).toBe(true)
expect(yield* filesys.exists(file + ".nope")).toBe(false)
}),
)
it(
"remove works",
Effect.gen(function* () {
yield* AppFileSystem.Service
const filesys = yield* FileSystem.FileSystem
const tmp = yield* filesys.makeTempDirectoryScoped()
const file = path.join(tmp, "delete-me.txt")
yield* filesys.writeFileString(file, "bye")
yield* filesys.remove(file)
expect(yield* filesys.exists(file)).toBe(false)
}),
)
})
describe("pure helpers", () => {
test("mimeType returns correct types", () => {
expect(AppFileSystem.mimeType("file.json")).toBe("application/json")
expect(AppFileSystem.mimeType("image.png")).toBe("image/png")
expect(AppFileSystem.mimeType("unknown.qzx")).toBe("application/octet-stream")
})
test("contains checks path containment", () => {
expect(AppFileSystem.contains("/a/b", "/a/b/c")).toBe(true)
expect(AppFileSystem.contains("/a/b", "/a/c")).toBe(false)
})
test("overlaps detects overlapping paths", () => {
expect(AppFileSystem.overlaps("/a/b", "/a/b/c")).toBe(true)
expect(AppFileSystem.overlaps("/a/b/c", "/a/b")).toBe(true)
expect(AppFileSystem.overlaps("/a", "/b")).toBe(false)
})
})
})

View File

@@ -0,0 +1,63 @@
import fs from "fs/promises"
import os from "os"
import { Effect, Layer } from "effect"
import { AppFileSystem } from "@opencode-ai/core/filesystem"
import { EffectFlock } from "@opencode-ai/core/util/effect-flock"
import { Global } from "@opencode-ai/core/global"
type Msg = {
key: string
dir: string
holdMs?: number
ready?: string
active?: string
done?: string
}
function sleep(ms: number) {
return new Promise<void>((resolve) => setTimeout(resolve, ms))
}
const msg: Msg = JSON.parse(process.argv[2]!)
const testGlobal = Layer.succeed(
Global.Service,
Global.Service.of({
home: os.homedir(),
data: os.tmpdir(),
cache: os.tmpdir(),
config: os.tmpdir(),
state: os.tmpdir(),
bin: os.tmpdir(),
log: os.tmpdir(),
}),
)
const testLayer = EffectFlock.layer.pipe(Layer.provide(testGlobal), Layer.provide(AppFileSystem.defaultLayer))
async function job() {
if (msg.ready) await fs.writeFile(msg.ready, String(process.pid))
if (msg.active) await fs.writeFile(msg.active, String(process.pid), { flag: "wx" })
try {
if (msg.holdMs && msg.holdMs > 0) await sleep(msg.holdMs)
if (msg.done) await fs.appendFile(msg.done, "1\n")
} finally {
if (msg.active) await fs.rm(msg.active, { force: true })
}
}
await Effect.runPromise(
Effect.gen(function* () {
const flock = yield* EffectFlock.Service
yield* flock.withLock(
Effect.promise(() => job()),
msg.key,
msg.dir,
)
}).pipe(Effect.provide(testLayer)),
).catch((err) => {
const text = err instanceof Error ? (err.stack ?? err.message) : String(err)
process.stderr.write(text)
process.exit(1)
})

View File

@@ -0,0 +1,72 @@
import fs from "fs/promises"
import { Flock } from "@opencode-ai/core/util/flock"
type Msg = {
key: string
dir: string
staleMs?: number
timeoutMs?: number
baseDelayMs?: number
maxDelayMs?: number
holdMs?: number
ready?: string
active?: string
done?: string
}
function sleep(ms: number) {
return new Promise<void>((resolve) => {
setTimeout(resolve, ms)
})
}
function input() {
const raw = process.argv[2]
if (!raw) {
throw new Error("Missing flock worker input")
}
return JSON.parse(raw) as Msg
}
async function job(input: Msg) {
if (input.ready) {
await fs.writeFile(input.ready, String(process.pid))
}
if (input.active) {
await fs.writeFile(input.active, String(process.pid), { flag: "wx" })
}
try {
if (input.holdMs && input.holdMs > 0) {
await sleep(input.holdMs)
}
if (input.done) {
await fs.appendFile(input.done, "1\n")
}
} finally {
if (input.active) {
await fs.rm(input.active, { force: true })
}
}
}
async function main() {
const msg = input()
await Flock.withLock(msg.key, () => job(msg), {
dir: msg.dir,
staleMs: msg.staleMs,
timeoutMs: msg.timeoutMs,
baseDelayMs: msg.baseDelayMs,
maxDelayMs: msg.maxDelayMs,
})
}
await main().catch((err) => {
const text = err instanceof Error ? (err.stack ?? err.message) : String(err)
process.stderr.write(text)
process.exit(1)
})

View File

@@ -0,0 +1,53 @@
import { test, type TestOptions } from "bun:test"
import { Cause, Effect, Exit, Layer } from "effect"
import type * as Scope from "effect/Scope"
import * as TestClock from "effect/testing/TestClock"
import * as TestConsole from "effect/testing/TestConsole"
type Body<A, E, R> = Effect.Effect<A, E, R> | (() => Effect.Effect<A, E, R>)
const body = <A, E, R>(value: Body<A, E, R>) => Effect.suspend(() => (typeof value === "function" ? value() : value))
const run = <A, E, R, E2>(value: Body<A, E, R | Scope.Scope>, layer: Layer.Layer<R, E2>) =>
Effect.gen(function* () {
const exit = yield* body(value).pipe(Effect.scoped, Effect.provide(layer), Effect.exit)
if (Exit.isFailure(exit)) {
for (const err of Cause.prettyErrors(exit.cause)) {
yield* Effect.logError(err)
}
}
return yield* exit
}).pipe(Effect.runPromise)
const make = <R, E>(testLayer: Layer.Layer<R, E>, liveLayer: Layer.Layer<R, E>) => {
const effect = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
test(name, () => run(value, testLayer), opts)
effect.only = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
test.only(name, () => run(value, testLayer), opts)
effect.skip = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
test.skip(name, () => run(value, testLayer), opts)
const live = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
test(name, () => run(value, liveLayer), opts)
live.only = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
test.only(name, () => run(value, liveLayer), opts)
live.skip = <A, E2>(name: string, value: Body<A, E2, R | Scope.Scope>, opts?: number | TestOptions) =>
test.skip(name, () => run(value, liveLayer), opts)
return { effect, live }
}
// Test environment with TestClock and TestConsole
const testEnv = Layer.mergeAll(TestConsole.layer, TestClock.layer())
// Live environment - uses real clock, but keeps TestConsole for output capture
const liveEnv = TestConsole.layer
export const it = make(testEnv, liveEnv)
export const testEffect = <R, E>(layer: Layer.Layer<R, E>) =>
make(Layer.provideMerge(layer, testEnv), Layer.provideMerge(layer, liveEnv))

View File

@@ -0,0 +1,389 @@
import { describe, expect } from "bun:test"
import { spawn } from "child_process"
import fs from "fs/promises"
import path from "path"
import os from "os"
import { Cause, Effect, Exit, Layer } from "effect"
import { testEffect } from "../lib/effect"
import { AppFileSystem } from "@opencode-ai/core/filesystem"
import { EffectFlock } from "@opencode-ai/core/util/effect-flock"
import { Global } from "@opencode-ai/core/global"
import { Hash } from "@opencode-ai/core/util/hash"
function lock(dir: string, key: string) {
return path.join(dir, Hash.fast(key) + ".lock")
}
function sleep(ms: number) {
return new Promise<void>((resolve) => setTimeout(resolve, ms))
}
async function exists(file: string) {
return fs
.stat(file)
.then(() => true)
.catch(() => false)
}
async function readJson<T>(p: string): Promise<T> {
return JSON.parse(await fs.readFile(p, "utf8"))
}
// ---------------------------------------------------------------------------
// Worker subprocess helpers
// ---------------------------------------------------------------------------
type Msg = {
key: string
dir: string
holdMs?: number
ready?: string
active?: string
done?: string
}
const root = path.join(import.meta.dir, "../..")
const worker = path.join(import.meta.dir, "../fixture/effect-flock-worker.ts")
function run(msg: Msg) {
return new Promise<{ code: number; stdout: Buffer; stderr: Buffer }>((resolve) => {
const proc = spawn(process.execPath, [worker, JSON.stringify(msg)], { cwd: root })
const stdout: Buffer[] = []
const stderr: Buffer[] = []
proc.stdout?.on("data", (data) => stdout.push(Buffer.from(data)))
proc.stderr?.on("data", (data) => stderr.push(Buffer.from(data)))
proc.on("close", (code) => {
resolve({ code: code ?? 1, stdout: Buffer.concat(stdout), stderr: Buffer.concat(stderr) })
})
})
}
function spawnWorker(msg: Msg) {
return spawn(process.execPath, [worker, JSON.stringify(msg)], {
cwd: root,
stdio: ["ignore", "pipe", "pipe"],
})
}
function stopWorker(proc: ReturnType<typeof spawnWorker>) {
if (proc.exitCode !== null || proc.signalCode !== null) return Promise.resolve()
if (process.platform !== "win32" || !proc.pid) {
proc.kill()
return Promise.resolve()
}
return new Promise<void>((resolve) => {
const killProc = spawn("taskkill", ["/pid", String(proc.pid), "/T", "/F"])
killProc.on("close", () => {
proc.kill()
resolve()
})
})
}
async function waitForFile(file: string, timeout = 3_000) {
const stop = Date.now() + timeout
while (Date.now() < stop) {
if (await exists(file)) return
await sleep(20)
}
throw new Error(`Timed out waiting for file: ${file}`)
}
// ---------------------------------------------------------------------------
// Test layer
// ---------------------------------------------------------------------------
const testGlobal = Layer.succeed(
Global.Service,
Global.Service.of({
home: os.homedir(),
data: os.tmpdir(),
cache: os.tmpdir(),
config: os.tmpdir(),
state: os.tmpdir(),
bin: os.tmpdir(),
log: os.tmpdir(),
}),
)
const testLayer = EffectFlock.layer.pipe(Layer.provide(testGlobal), Layer.provide(AppFileSystem.defaultLayer))
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("util.effect-flock", () => {
const it = testEffect(testLayer)
it.live(
"acquire and release via scoped Effect",
Effect.gen(function* () {
const flock = yield* EffectFlock.Service
const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
const dir = path.join(tmp, "locks")
const lockDir = lock(dir, "eflock:acquire")
yield* Effect.scoped(flock.acquire("eflock:acquire", dir))
expect(yield* Effect.promise(() => exists(lockDir))).toBe(false)
yield* Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
}),
)
it.live(
"withLock data-first",
Effect.gen(function* () {
const flock = yield* EffectFlock.Service
const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
const dir = path.join(tmp, "locks")
let hit = false
yield* flock.withLock(
Effect.sync(() => {
hit = true
}),
"eflock:df",
dir,
)
expect(hit).toBe(true)
yield* Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
}),
)
it.live(
"withLock pipeable",
Effect.gen(function* () {
const flock = yield* EffectFlock.Service
const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
const dir = path.join(tmp, "locks")
let hit = false
yield* Effect.sync(() => {
hit = true
}).pipe(flock.withLock("eflock:pipe", dir))
expect(hit).toBe(true)
yield* Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
}),
)
it.live(
"writes owner metadata",
Effect.gen(function* () {
const flock = yield* EffectFlock.Service
const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
const dir = path.join(tmp, "locks")
const key = "eflock:meta"
const file = path.join(lock(dir, key), "meta.json")
yield* Effect.scoped(
Effect.gen(function* () {
yield* flock.acquire(key, dir)
const json = yield* Effect.promise(() =>
readJson<{ token?: unknown; pid?: unknown; hostname?: unknown; createdAt?: unknown }>(file),
)
expect(typeof json.token).toBe("string")
expect(typeof json.pid).toBe("number")
expect(typeof json.hostname).toBe("string")
expect(typeof json.createdAt).toBe("string")
}),
)
yield* Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
}),
)
it.live(
"breaks stale lock dirs",
Effect.gen(function* () {
const flock = yield* EffectFlock.Service
const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
const dir = path.join(tmp, "locks")
const key = "eflock:stale"
const lockDir = lock(dir, key)
yield* Effect.promise(async () => {
await fs.mkdir(lockDir, { recursive: true })
const old = new Date(Date.now() - 120_000)
await fs.utimes(lockDir, old, old)
})
let hit = false
yield* flock.withLock(
Effect.sync(() => {
hit = true
}),
key,
dir,
)
expect(hit).toBe(true)
yield* Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
}),
)
it.live(
"recovers from stale breaker",
Effect.gen(function* () {
const flock = yield* EffectFlock.Service
const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
const dir = path.join(tmp, "locks")
const key = "eflock:stale-breaker"
const lockDir = lock(dir, key)
const breaker = lockDir + ".breaker"
yield* Effect.promise(async () => {
await fs.mkdir(lockDir, { recursive: true })
await fs.mkdir(breaker)
const old = new Date(Date.now() - 120_000)
await fs.utimes(lockDir, old, old)
await fs.utimes(breaker, old, old)
})
let hit = false
yield* flock.withLock(
Effect.sync(() => {
hit = true
}),
key,
dir,
)
expect(hit).toBe(true)
expect(yield* Effect.promise(() => exists(breaker))).toBe(false)
yield* Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
}),
)
it.live(
"detects compromise when lock dir removed",
Effect.gen(function* () {
const flock = yield* EffectFlock.Service
const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
const dir = path.join(tmp, "locks")
const key = "eflock:compromised"
const lockDir = lock(dir, key)
const result = yield* flock
.withLock(
Effect.promise(() => fs.rm(lockDir, { recursive: true, force: true })),
key,
dir,
)
.pipe(Effect.exit)
expect(Exit.isFailure(result)).toBe(true)
expect(Exit.isFailure(result) ? Cause.pretty(result.cause) : "").toContain("missing")
yield* Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
}),
)
it.live(
"detects token mismatch",
Effect.gen(function* () {
const flock = yield* EffectFlock.Service
const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
const dir = path.join(tmp, "locks")
const key = "eflock:token"
const lockDir = lock(dir, key)
const meta = path.join(lockDir, "meta.json")
const result = yield* flock
.withLock(
Effect.promise(async () => {
const json = await readJson<{ token?: string }>(meta)
json.token = "tampered"
await fs.writeFile(meta, JSON.stringify(json, null, 2))
}),
key,
dir,
)
.pipe(Effect.exit)
expect(Exit.isFailure(result)).toBe(true)
expect(Exit.isFailure(result) ? Cause.pretty(result.cause) : "").toContain("token mismatch")
expect(yield* Effect.promise(() => exists(lockDir))).toBe(true)
yield* Effect.promise(() => fs.rm(tmp, { recursive: true, force: true }))
}),
)
it.live(
"fails on unwritable lock roots",
Effect.gen(function* () {
if (process.platform === "win32") return
const flock = yield* EffectFlock.Service
const tmp = yield* Effect.promise(() => fs.mkdtemp(path.join(os.tmpdir(), "eflock-test-")))
const dir = path.join(tmp, "locks")
yield* Effect.promise(async () => {
await fs.mkdir(dir, { recursive: true })
await fs.chmod(dir, 0o500)
})
const result = yield* flock.withLock(Effect.void, "eflock:perm", dir).pipe(Effect.exit)
// oxlint-disable-next-line no-base-to-string -- Exit has a useful toString for test assertions
expect(String(result)).toContain("PermissionDenied")
yield* Effect.promise(() => fs.chmod(dir, 0o700).then(() => fs.rm(tmp, { recursive: true, force: true })))
}),
)
it.live(
"enforces mutual exclusion under process contention",
() =>
Effect.promise(async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "eflock-stress-"))
const dir = path.join(tmp, "locks")
const done = path.join(tmp, "done.log")
const active = path.join(tmp, "active")
const n = 16
try {
const out = await Promise.all(
Array.from({ length: n }, () => run({ key: "eflock:stress", dir, done, active, holdMs: 30 })),
)
expect(out.map((x) => x.code)).toEqual(Array.from({ length: n }, () => 0))
expect(out.map((x) => x.stderr.toString()).filter(Boolean)).toEqual([])
const lines = (await fs.readFile(done, "utf8"))
.split("\n")
.map((x) => x.trim())
.filter(Boolean)
expect(lines.length).toBe(n)
} finally {
await fs.rm(tmp, { recursive: true, force: true })
}
}),
60_000,
)
it.live(
"recovers after a crashed lock owner",
() =>
Effect.promise(async () => {
const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "eflock-crash-"))
const dir = path.join(tmp, "locks")
const ready = path.join(tmp, "ready")
const proc = spawnWorker({ key: "eflock:crash", dir, ready, holdMs: 120_000 })
try {
await waitForFile(ready, 5_000)
await stopWorker(proc)
await new Promise((resolve) => proc.on("close", resolve))
// Backdate lock files so they're past STALE_MS (60s)
const lockDir = lock(dir, "eflock:crash")
const old = new Date(Date.now() - 120_000)
await fs.utimes(lockDir, old, old).catch(() => {})
await fs.utimes(path.join(lockDir, "heartbeat"), old, old).catch(() => {})
await fs.utimes(path.join(lockDir, "meta.json"), old, old).catch(() => {})
const done = path.join(tmp, "done.log")
const result = await run({ key: "eflock:crash", dir, done, holdMs: 10 })
expect(result.code).toBe(0)
expect(result.stderr.toString()).toBe("")
} finally {
await stopWorker(proc).catch(() => {})
await fs.rm(tmp, { recursive: true, force: true })
}
}),
30_000,
)
})

View File

@@ -0,0 +1,426 @@
import { describe, expect, test } from "bun:test"
import fs from "fs/promises"
import { spawn } from "child_process"
import path from "path"
import os from "os"
import { Flock } from "@opencode-ai/core/util/flock"
import { Hash } from "@opencode-ai/core/util/hash"
type Msg = {
key: string
dir: string
staleMs?: number
timeoutMs?: number
baseDelayMs?: number
maxDelayMs?: number
holdMs?: number
ready?: string
active?: string
done?: string
}
const root = path.join(import.meta.dir, "../..")
const worker = path.join(import.meta.dir, "../fixture/flock-worker.ts")
async function tmpdir() {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "flock-test-"))
return {
path: dir,
async [Symbol.asyncDispose]() {
await fs.rm(dir, { recursive: true, force: true })
},
}
}
function lock(dir: string, key: string) {
return path.join(dir, Hash.fast(key) + ".lock")
}
function sleep(ms: number) {
return new Promise<void>((resolve) => {
setTimeout(resolve, ms)
})
}
async function exists(file: string) {
return fs
.stat(file)
.then(() => true)
.catch(() => false)
}
async function wait(file: string, timeout = 3_000) {
const stop = Date.now() + timeout
while (Date.now() < stop) {
if (await exists(file)) return
await sleep(20)
}
throw new Error(`Timed out waiting for file: ${file}`)
}
function run(msg: Msg) {
return new Promise<{ code: number; stdout: Buffer; stderr: Buffer }>((resolve) => {
const proc = spawn(process.execPath, [worker, JSON.stringify(msg)], {
cwd: root,
})
const stdout: Buffer[] = []
const stderr: Buffer[] = []
proc.stdout?.on("data", (data) => stdout.push(Buffer.from(data)))
proc.stderr?.on("data", (data) => stderr.push(Buffer.from(data)))
proc.on("close", (code) => {
resolve({
code: code ?? 1,
stdout: Buffer.concat(stdout),
stderr: Buffer.concat(stderr),
})
})
})
}
function spawnWorker(msg: Msg) {
return spawn(process.execPath, [worker, JSON.stringify(msg)], {
cwd: root,
stdio: ["ignore", "pipe", "pipe"],
})
}
function stopWorker(proc: ReturnType<typeof spawnWorker>) {
if (proc.exitCode !== null || proc.signalCode !== null) return Promise.resolve()
if (process.platform !== "win32" || !proc.pid) {
proc.kill()
return Promise.resolve()
}
return new Promise<void>((resolve) => {
const killProc = spawn("taskkill", ["/pid", String(proc.pid), "/T", "/F"])
killProc.on("close", () => {
proc.kill()
resolve()
})
})
}
async function readJson<T>(p: string): Promise<T> {
return JSON.parse(await fs.readFile(p, "utf8"))
}
describe("util.flock", () => {
test("enforces mutual exclusion under process contention", async () => {
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const done = path.join(tmp.path, "done.log")
const active = path.join(tmp.path, "active")
const key = "flock:stress"
const n = 16
const out = await Promise.all(
Array.from({ length: n }, () =>
run({
key,
dir,
done,
active,
holdMs: 30,
staleMs: 1_000,
timeoutMs: 15_000,
}),
),
)
expect(out.map((x) => x.code)).toEqual(Array.from({ length: n }, () => 0))
expect(out.map((x) => x.stderr.toString()).filter(Boolean)).toEqual([])
const lines = (await fs.readFile(done, "utf8"))
.split("\n")
.map((x) => x.trim())
.filter(Boolean)
expect(lines.length).toBe(n)
}, 20_000)
test("times out while waiting when lock is still healthy", async () => {
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const key = "flock:timeout"
const ready = path.join(tmp.path, "ready")
const proc = spawnWorker({
key,
dir,
ready,
holdMs: 20_000,
staleMs: 10_000,
timeoutMs: 30_000,
})
try {
await wait(ready, 5_000)
const seen: string[] = []
const err = await Flock.withLock(key, async () => {}, {
dir,
staleMs: 10_000,
timeoutMs: 1_000,
onWait: (tick) => {
seen.push(tick.key)
},
}).catch((err) => err)
expect(err).toBeInstanceOf(Error)
if (!(err instanceof Error)) throw err
expect(err.message).toContain("Timed out waiting for lock")
expect(seen.length).toBeGreaterThan(0)
expect(seen.every((x) => x === key)).toBe(true)
} finally {
await stopWorker(proc).catch(() => undefined)
await new Promise((resolve) => proc.on("close", resolve))
}
}, 15_000)
test("recovers after a crashed lock owner", async () => {
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const key = "flock:crash"
const ready = path.join(tmp.path, "ready")
const proc = spawnWorker({
key,
dir,
ready,
holdMs: 20_000,
staleMs: 500,
timeoutMs: 30_000,
})
await wait(ready, 5_000)
await stopWorker(proc)
await new Promise((resolve) => proc.on("close", resolve))
let hit = false
await Flock.withLock(
key,
async () => {
hit = true
},
{
dir,
staleMs: 500,
timeoutMs: 8_000,
},
)
expect(hit).toBe(true)
}, 20_000)
test("breaks stale lock dirs when heartbeat is missing", async () => {
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const key = "flock:missing-heartbeat"
const lockDir = lock(dir, key)
await fs.mkdir(lockDir, { recursive: true })
const old = new Date(Date.now() - 2_000)
await fs.utimes(lockDir, old, old)
let hit = false
await Flock.withLock(
key,
async () => {
hit = true
},
{
dir,
staleMs: 200,
timeoutMs: 3_000,
},
)
expect(hit).toBe(true)
})
test("recovers when a stale breaker claim was left behind", async () => {
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const key = "flock:stale-breaker"
const lockDir = lock(dir, key)
const breaker = lockDir + ".breaker"
await fs.mkdir(lockDir, { recursive: true })
await fs.mkdir(breaker)
const old = new Date(Date.now() - 2_000)
await fs.utimes(lockDir, old, old)
await fs.utimes(breaker, old, old)
let hit = false
await Flock.withLock(
key,
async () => {
hit = true
},
{
dir,
staleMs: 200,
timeoutMs: 3_000,
},
)
expect(hit).toBe(true)
expect(await exists(breaker)).toBe(false)
})
test("fails clearly if lock dir is removed while held", async () => {
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const key = "flock:compromised"
const lockDir = lock(dir, key)
const err = await Flock.withLock(
key,
async () => {
await fs.rm(lockDir, {
recursive: true,
force: true,
})
},
{
dir,
staleMs: 1_000,
timeoutMs: 3_000,
},
).catch((err) => err)
expect(err).toBeInstanceOf(Error)
if (!(err instanceof Error)) throw err
expect(err.message).toContain("compromised")
let hit = false
await Flock.withLock(
key,
async () => {
hit = true
},
{
dir,
staleMs: 200,
timeoutMs: 3_000,
},
)
expect(hit).toBe(true)
})
test("writes owner metadata while lock is held", async () => {
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const key = "flock:meta"
const file = path.join(lock(dir, key), "meta.json")
await Flock.withLock(
key,
async () => {
const json = await readJson<{
token?: unknown
pid?: unknown
hostname?: unknown
createdAt?: unknown
}>(file)
expect(typeof json.token).toBe("string")
expect(typeof json.pid).toBe("number")
expect(typeof json.hostname).toBe("string")
expect(typeof json.createdAt).toBe("string")
},
{
dir,
staleMs: 1_000,
timeoutMs: 3_000,
},
)
})
test("supports acquire with await using", async () => {
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const key = "flock:acquire"
const lockDir = lock(dir, key)
{
await using _ = await Flock.acquire(key, {
dir,
staleMs: 1_000,
timeoutMs: 3_000,
})
expect(await exists(lockDir)).toBe(true)
}
expect(await exists(lockDir)).toBe(false)
})
test("refuses token mismatch release and recovers from stale", async () => {
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const key = "flock:token"
const lockDir = lock(dir, key)
const meta = path.join(lockDir, "meta.json")
const err = await Flock.withLock(
key,
async () => {
const json = await readJson<{ token?: string }>(meta)
json.token = "tampered"
await fs.writeFile(meta, JSON.stringify(json, null, 2))
},
{
dir,
staleMs: 500,
timeoutMs: 3_000,
},
).catch((err) => err)
expect(err).toBeInstanceOf(Error)
if (!(err instanceof Error)) throw err
expect(err.message).toContain("token mismatch")
expect(await exists(lockDir)).toBe(true)
let hit = false
await Flock.withLock(
key,
async () => {
hit = true
},
{
dir,
staleMs: 500,
timeoutMs: 6_000,
},
)
expect(hit).toBe(true)
})
test("fails clearly on unwritable lock roots", async () => {
if (process.platform === "win32") return
await using tmp = await tmpdir()
const dir = path.join(tmp.path, "locks")
const key = "flock:perm"
await fs.mkdir(dir, { recursive: true })
await fs.chmod(dir, 0o500)
try {
const err = await Flock.withLock(key, async () => {}, {
dir,
staleMs: 100,
timeoutMs: 500,
}).catch((err) => err)
expect(err).toBeInstanceOf(Error)
if (!(err instanceof Error)) throw err
const text = err.message
expect(text.includes("EACCES") || text.includes("EPERM")).toBe(true)
} finally {
await fs.chmod(dir, 0o700)
}
})
})

View File

@@ -0,0 +1,15 @@
{
"$schema": "https://www.schemastore.org/tsconfig",
"_version": "24.0.0",
"compilerOptions": {
"lib": ["es2024", "ESNext.Array", "ESNext.Collection", "ESNext.Error", "ESNext.Iterator", "ESNext.Promise"],
"module": "nodenext",
"moduleResolution": "nodenext",
"target": "es2024",
"noUncheckedIndexedAccess": false,
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true
}
}