From 6825b0bbc7f6f3863a5159b9153e0f3878ace76e Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Mon, 13 Apr 2026 11:47:05 -0400 Subject: [PATCH] refactor(pty): remove async facade exports (#22305) --- packages/opencode/src/pty/index.ts | 31 --- packages/opencode/src/server/instance/pty.ts | 64 +++++- .../test/pty/pty-output-isolation.test.ts | 193 +++++++++--------- .../opencode/test/pty/pty-session.test.ts | 98 +++++---- packages/opencode/test/pty/pty-shell.test.ts | 42 ++-- 5 files changed, 235 insertions(+), 193 deletions(-) diff --git a/packages/opencode/src/pty/index.ts b/packages/opencode/src/pty/index.ts index a563bb954b..1891721851 100644 --- a/packages/opencode/src/pty/index.ts +++ b/packages/opencode/src/pty/index.ts @@ -1,7 +1,6 @@ import { BusEvent } from "@/bus/bus-event" import { Bus } from "@/bus" import { InstanceState } from "@/effect/instance-state" -import { makeRuntime } from "@/effect/run-service" import { Instance } from "@/project/instance" import type { Proc } from "#pty" import z from "zod" @@ -361,34 +360,4 @@ export namespace Pty { ) export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Plugin.defaultLayer)) - - const { runPromise } = makeRuntime(Service, defaultLayer) - - export async function list() { - return runPromise((svc) => svc.list()) - } - - export async function get(id: PtyID) { - return runPromise((svc) => svc.get(id)) - } - - export async function write(id: PtyID, data: string) { - return runPromise((svc) => svc.write(id, data)) - } - - export async function connect(id: PtyID, ws: Socket, cursor?: number) { - return runPromise((svc) => svc.connect(id, ws, cursor)) - } - - export async function create(input: CreateInput) { - return runPromise((svc) => svc.create(input)) - } - - export async function update(id: PtyID, input: UpdateInput) { - return runPromise((svc) => svc.update(id, input)) - } - - export async function remove(id: PtyID) { - return runPromise((svc) => svc.remove(id)) - } } diff --git a/packages/opencode/src/server/instance/pty.ts b/packages/opencode/src/server/instance/pty.ts index c333f4dd69..576cbe5de6 100644 --- a/packages/opencode/src/server/instance/pty.ts +++ b/packages/opencode/src/server/instance/pty.ts @@ -1,7 +1,9 @@ import { Hono, type MiddlewareHandler } from "hono" import { describeRoute, validator, resolver } from "hono-openapi" import type { UpgradeWebSocket } from "hono/ws" +import { Effect } from "effect" import z from "zod" +import { AppRuntime } from "@/effect/app-runtime" import { Pty } from "@/pty" import { PtyID } from "@/pty/schema" import { NotFoundError } from "../../storage/db" @@ -27,7 +29,14 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) { }, }), async (c) => { - return c.json(await Pty.list()) + return c.json( + await AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + return yield* pty.list() + }), + ), + ) }, ) .post( @@ -50,7 +59,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) { }), validator("json", Pty.CreateInput), async (c) => { - const info = await Pty.create(c.req.valid("json")) + const info = await AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + return yield* pty.create(c.req.valid("json")) + }), + ) return c.json(info) }, ) @@ -74,7 +88,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) { }), validator("param", z.object({ ptyID: PtyID.zod })), async (c) => { - const info = await Pty.get(c.req.valid("param").ptyID) + const info = await AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + return yield* pty.get(c.req.valid("param").ptyID) + }), + ) if (!info) { throw new NotFoundError({ message: "Session not found" }) } @@ -102,7 +121,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) { validator("param", z.object({ ptyID: PtyID.zod })), validator("json", Pty.UpdateInput), async (c) => { - const info = await Pty.update(c.req.valid("param").ptyID, c.req.valid("json")) + const info = await AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + return yield* pty.update(c.req.valid("param").ptyID, c.req.valid("json")) + }), + ) return c.json(info) }, ) @@ -126,7 +150,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) { }), validator("param", z.object({ ptyID: PtyID.zod })), async (c) => { - await Pty.remove(c.req.valid("param").ptyID) + await AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + yield* pty.remove(c.req.valid("param").ptyID) + }), + ) return c.json(true) }, ) @@ -150,6 +179,11 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) { }), validator("param", z.object({ ptyID: PtyID.zod })), upgradeWebSocket(async (c) => { + type Handler = { + onMessage: (message: string | ArrayBuffer) => void + onClose: () => void + } + const id = PtyID.zod.parse(c.req.param("ptyID")) const cursor = (() => { const value = c.req.query("cursor") @@ -158,8 +192,17 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) { if (!Number.isSafeInteger(parsed) || parsed < -1) return return parsed })() - let handler: Awaited> - if (!(await Pty.get(id))) throw new Error("Session not found") + let handler: Handler | undefined + if ( + !(await AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + return yield* pty.get(id) + }), + )) + ) { + throw new Error("Session not found") + } type Socket = { readyState: number @@ -185,7 +228,12 @@ export function PtyRoutes(upgradeWebSocket: UpgradeWebSocket) { ws.close() return } - handler = await Pty.connect(id, socket, cursor) + handler = await AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + return yield* pty.connect(id, socket, cursor) + }), + ) ready = true for (const msg of pending) handler?.onMessage(msg) pending.length = 0 diff --git a/packages/opencode/test/pty/pty-output-isolation.test.ts b/packages/opencode/test/pty/pty-output-isolation.test.ts index ec1bbd4690..9ef9741bad 100644 --- a/packages/opencode/test/pty/pty-output-isolation.test.ts +++ b/packages/opencode/test/pty/pty-output-isolation.test.ts @@ -1,4 +1,6 @@ import { describe, expect, test } from "bun:test" +import { AppRuntime } from "../../src/effect/app-runtime" +import { Effect } from "effect" import { Instance } from "../../src/project/instance" import { Pty } from "../../src/pty" import { tmpdir } from "../fixture/fixture" @@ -10,48 +12,48 @@ describe("pty", () => { await Instance.provide({ directory: dir.path, - fn: async () => { - const a = await Pty.create({ command: "cat", title: "a" }) - const b = await Pty.create({ command: "cat", title: "b" }) - try { - const outA: string[] = [] - const outB: string[] = [] + fn: () => + AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + const a = yield* pty.create({ command: "cat", title: "a" }) + const b = yield* pty.create({ command: "cat", title: "b" }) + try { + const outA: string[] = [] + const outB: string[] = [] - const ws = { - readyState: 1, - data: { events: { connection: "a" } }, - send: (data: unknown) => { - outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - }, - close: () => { - // no-op (simulate abrupt drop) - }, - } + const ws = { + readyState: 1, + data: { events: { connection: "a" } }, + send: (data: unknown) => { + outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) + }, + close: () => { + // no-op (simulate abrupt drop) + }, + } - // Connect "a" first with ws. - Pty.connect(a.id, ws as any) + yield* pty.connect(a.id, ws as any) - // Now "reuse" the same ws object for another connection. - ws.data = { events: { connection: "b" } } - ws.send = (data: unknown) => { - outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - } - Pty.connect(b.id, ws as any) + ws.data = { events: { connection: "b" } } + ws.send = (data: unknown) => { + outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) + } + yield* pty.connect(b.id, ws as any) - // Clear connect metadata writes. - outA.length = 0 - outB.length = 0 + outA.length = 0 + outB.length = 0 - // Output from a must never show up in b. - Pty.write(a.id, "AAA\n") - await sleep(100) + yield* pty.write(a.id, "AAA\n") + yield* Effect.promise(() => sleep(100)) - expect(outB.join("")).not.toContain("AAA") - } finally { - await Pty.remove(a.id) - await Pty.remove(b.id) - } - }, + expect(outB.join("")).not.toContain("AAA") + } finally { + yield* pty.remove(a.id) + yield* pty.remove(b.id) + } + }), + ), }) }) @@ -60,42 +62,43 @@ describe("pty", () => { await Instance.provide({ directory: dir.path, - fn: async () => { - const a = await Pty.create({ command: "cat", title: "a" }) - try { - const outA: string[] = [] - const outB: string[] = [] + fn: () => + AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + const a = yield* pty.create({ command: "cat", title: "a" }) + try { + const outA: string[] = [] + const outB: string[] = [] - const ws = { - readyState: 1, - data: { events: { connection: "a" } }, - send: (data: unknown) => { - outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - }, - close: () => { - // no-op (simulate abrupt drop) - }, - } + const ws = { + readyState: 1, + data: { events: { connection: "a" } }, + send: (data: unknown) => { + outA.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) + }, + close: () => { + // no-op (simulate abrupt drop) + }, + } - // Connect "a" first. - Pty.connect(a.id, ws as any) - outA.length = 0 + yield* pty.connect(a.id, ws as any) + outA.length = 0 - // Simulate Bun reusing the same websocket object for another - // connection before the next onOpen calls Pty.connect. - ws.data = { events: { connection: "b" } } - ws.send = (data: unknown) => { - outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - } + ws.data = { events: { connection: "b" } } + ws.send = (data: unknown) => { + outB.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) + } - Pty.write(a.id, "AAA\n") - await sleep(100) + yield* pty.write(a.id, "AAA\n") + yield* Effect.promise(() => sleep(100)) - expect(outB.join("")).not.toContain("AAA") - } finally { - await Pty.remove(a.id) - } - }, + expect(outB.join("")).not.toContain("AAA") + } finally { + yield* pty.remove(a.id) + } + }), + ), }) }) @@ -104,38 +107,40 @@ describe("pty", () => { await Instance.provide({ directory: dir.path, - fn: async () => { - const a = await Pty.create({ command: "cat", title: "a" }) - try { - const out: string[] = [] + fn: () => + AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + const a = yield* pty.create({ command: "cat", title: "a" }) + try { + const out: string[] = [] - const ctx = { connId: 1 } - const ws = { - readyState: 1, - data: ctx, - send: (data: unknown) => { - out.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) - }, - close: () => { - // no-op - }, - } + const ctx = { connId: 1 } + const ws = { + readyState: 1, + data: ctx, + send: (data: unknown) => { + out.push(typeof data === "string" ? data : Buffer.from(data as Uint8Array).toString("utf8")) + }, + close: () => { + // no-op + }, + } - Pty.connect(a.id, ws as any) - out.length = 0 + yield* pty.connect(a.id, ws as any) + out.length = 0 - // Mutating fields on ws.data should not look like a new - // connection lifecycle when the object identity stays stable. - ctx.connId = 2 + ctx.connId = 2 - Pty.write(a.id, "AAA\n") - await sleep(100) + yield* pty.write(a.id, "AAA\n") + yield* Effect.promise(() => sleep(100)) - expect(out.join("")).toContain("AAA") - } finally { - await Pty.remove(a.id) - } - }, + expect(out.join("")).toContain("AAA") + } finally { + yield* pty.remove(a.id) + } + }), + ), }) }) }) diff --git a/packages/opencode/test/pty/pty-session.test.ts b/packages/opencode/test/pty/pty-session.test.ts index f7a949c921..3e4d658355 100644 --- a/packages/opencode/test/pty/pty-session.test.ts +++ b/packages/opencode/test/pty/pty-session.test.ts @@ -1,5 +1,7 @@ import { describe, expect, test } from "bun:test" +import { AppRuntime } from "../../src/effect/app-runtime" import { Bus } from "../../src/bus" +import { Effect } from "effect" import { Instance } from "../../src/project/instance" import { Pty } from "../../src/pty" import type { PtyID } from "../../src/pty/schema" @@ -27,33 +29,37 @@ describe("pty", () => { await Instance.provide({ directory: dir.path, - fn: async () => { - const log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }> = [] - const off = [ - Bus.subscribe(Pty.Event.Created, (evt) => log.push({ type: "created", id: evt.properties.info.id })), - Bus.subscribe(Pty.Event.Exited, (evt) => log.push({ type: "exited", id: evt.properties.id })), - Bus.subscribe(Pty.Event.Deleted, (evt) => log.push({ type: "deleted", id: evt.properties.id })), - ] + fn: () => + AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + const log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }> = [] + const off = [ + Bus.subscribe(Pty.Event.Created, (evt) => log.push({ type: "created", id: evt.properties.info.id })), + Bus.subscribe(Pty.Event.Exited, (evt) => log.push({ type: "exited", id: evt.properties.id })), + Bus.subscribe(Pty.Event.Deleted, (evt) => log.push({ type: "deleted", id: evt.properties.id })), + ] - let id: PtyID | undefined - try { - const info = await Pty.create({ - command: "/usr/bin/env", - args: ["sh", "-c", "sleep 0.1"], - title: "sleep", - }) - id = info.id + let id: PtyID | undefined + try { + const info = yield* pty.create({ + command: "/usr/bin/env", + args: ["sh", "-c", "sleep 0.1"], + title: "sleep", + }) + id = info.id - await wait(() => pick(log, id!).includes("exited")) + yield* Effect.promise(() => wait(() => pick(log, id!).includes("exited"))) - await Pty.remove(id) - await wait(() => pick(log, id!).length >= 3) - expect(pick(log, id!)).toEqual(["created", "exited", "deleted"]) - } finally { - off.forEach((x) => x()) - if (id) await Pty.remove(id) - } - }, + yield* pty.remove(id) + yield* Effect.promise(() => wait(() => pick(log, id!).length >= 3)) + expect(pick(log, id!)).toEqual(["created", "exited", "deleted"]) + } finally { + off.forEach((x) => x()) + if (id) yield* pty.remove(id) + } + }), + ), }) }) @@ -64,29 +70,33 @@ describe("pty", () => { await Instance.provide({ directory: dir.path, - fn: async () => { - const log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }> = [] - const off = [ - Bus.subscribe(Pty.Event.Created, (evt) => log.push({ type: "created", id: evt.properties.info.id })), - Bus.subscribe(Pty.Event.Exited, (evt) => log.push({ type: "exited", id: evt.properties.id })), - Bus.subscribe(Pty.Event.Deleted, (evt) => log.push({ type: "deleted", id: evt.properties.id })), - ] + fn: () => + AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + const log: Array<{ type: "created" | "exited" | "deleted"; id: PtyID }> = [] + const off = [ + Bus.subscribe(Pty.Event.Created, (evt) => log.push({ type: "created", id: evt.properties.info.id })), + Bus.subscribe(Pty.Event.Exited, (evt) => log.push({ type: "exited", id: evt.properties.id })), + Bus.subscribe(Pty.Event.Deleted, (evt) => log.push({ type: "deleted", id: evt.properties.id })), + ] - let id: PtyID | undefined - try { - const info = await Pty.create({ command: "/bin/sh", title: "sh" }) - id = info.id + let id: PtyID | undefined + try { + const info = yield* pty.create({ command: "/bin/sh", title: "sh" }) + id = info.id - await sleep(100) + yield* Effect.promise(() => sleep(100)) - await Pty.remove(id) - await wait(() => pick(log, id!).length >= 3) - expect(pick(log, id!)).toEqual(["created", "exited", "deleted"]) - } finally { - off.forEach((x) => x()) - if (id) await Pty.remove(id) - } - }, + yield* pty.remove(id) + yield* Effect.promise(() => wait(() => pick(log, id!).length >= 3)) + expect(pick(log, id!)).toEqual(["created", "exited", "deleted"]) + } finally { + off.forEach((x) => x()) + if (id) yield* pty.remove(id) + } + }), + ), }) }) }) diff --git a/packages/opencode/test/pty/pty-shell.test.ts b/packages/opencode/test/pty/pty-shell.test.ts index 65e7e1f901..d5182061d0 100644 --- a/packages/opencode/test/pty/pty-shell.test.ts +++ b/packages/opencode/test/pty/pty-shell.test.ts @@ -1,4 +1,6 @@ import { describe, expect, test } from "bun:test" +import { AppRuntime } from "../../src/effect/app-runtime" +import { Effect } from "effect" import { Instance } from "../../src/project/instance" import { Pty } from "../../src/pty" import { Shell } from "../../src/shell/shell" @@ -17,14 +19,18 @@ describe("pty shell args", () => { await using dir = await tmpdir() await Instance.provide({ directory: dir.path, - fn: async () => { - const info = await Pty.create({ command: ps, title: "pwsh" }) - try { - expect(info.args).toEqual([]) - } finally { - await Pty.remove(info.id) - } - }, + fn: () => + AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + const info = yield* pty.create({ command: ps, title: "pwsh" }) + try { + expect(info.args).toEqual([]) + } finally { + yield* pty.remove(info.id) + } + }), + ), }) }, { timeout: 30000 }, @@ -43,14 +49,18 @@ describe("pty shell args", () => { await using dir = await tmpdir() await Instance.provide({ directory: dir.path, - fn: async () => { - const info = await Pty.create({ command: bash, title: "bash" }) - try { - expect(info.args).toEqual(["-l"]) - } finally { - await Pty.remove(info.id) - } - }, + fn: () => + AppRuntime.runPromise( + Effect.gen(function* () { + const pty = yield* Pty.Service + const info = yield* pty.create({ command: bash, title: "bash" }) + try { + expect(info.args).toEqual(["-l"]) + } finally { + yield* pty.remove(info.id) + } + }), + ), }) }, { timeout: 30000 },