diff --git a/apps/server/src/terminal/Layers/Manager.test.ts b/apps/server/src/terminal/Layers/Manager.test.ts index 5717fda39e..977357a9f2 100644 --- a/apps/server/src/terminal/Layers/Manager.test.ts +++ b/apps/server/src/terminal/Layers/Manager.test.ts @@ -1,24 +1,39 @@ -import fs from "node:fs"; -import os from "node:os"; import path from "node:path"; +import * as NodeServices from "@effect/platform-node/NodeServices"; +import { assert, it } from "@effect/vitest"; import { DEFAULT_TERMINAL_ID, type TerminalEvent, type TerminalOpenInput, type TerminalRestartInput, } from "@t3tools/contracts"; -import { afterEach, describe, expect, it } from "vitest"; - import { - PtySpawnError, + Duration, + Effect, + Encoding, + Exit, + Fiber, + FileSystem, + Option, + PlatformError, + Ref, + Schedule, + Scope, + Stream, +} from "effect"; +import { TestClock } from "effect/testing"; +import { expect } from "vitest"; + +import type { TerminalManagerShape } from "../Services/Manager"; +import { type PtyAdapterShape, type PtyExitEvent, type PtyProcess, type PtySpawnInput, + PtySpawnError, } from "../Services/PTY"; -import { TerminalManagerRuntime } from "./Manager"; -import { Effect, Encoding } from "effect"; +import { makeTerminalManagerWithOptions } from "./Manager"; class FakePtyProcess implements PtyProcess { readonly writes: string[] = []; @@ -107,27 +122,29 @@ class FakePtyAdapter implements PtyAdapterShape { } } -function waitFor(predicate: () => boolean, timeoutMs = 800): Promise { - const started = Date.now(); - return new Promise((resolve, reject) => { - const poll = () => { - if (predicate()) { - resolve(); - return; - } - if (Date.now() - started > timeoutMs) { - reject(new Error("Timed out waiting for condition")); - return; - } - setTimeout(poll, 15); - }; - poll(); - }); -} +const waitFor = ( + predicate: Effect.Effect, + timeout: Duration.Input = 800, +): Effect.Effect => + predicate.pipe( + Effect.filterOrFail( + (done) => done, + () => new Error("Condition not met"), + ), + Effect.retry(Schedule.spaced("15 millis")), + Effect.timeoutOption(timeout), + Effect.flatMap((result) => + Option.match(result, { + onNone: () => Effect.fail(new Error("Timed out waiting for condition")), + onSome: () => Effect.void, + }), + ), + ); function openInput(overrides: Partial = {}): TerminalOpenInput { return { threadId: "thread-1", + terminalId: DEFAULT_TERMINAL_ID, cwd: process.cwd(), cols: 100, rows: 24, @@ -138,6 +155,7 @@ function openInput(overrides: Partial = {}): TerminalOpenInpu function restartInput(overrides: Partial = {}): TerminalRestartInput { return { threadId: "thread-1", + terminalId: DEFAULT_TERMINAL_ID, cwd: process.cwd(), cols: 100, rows: 24, @@ -169,598 +187,710 @@ function multiTerminalHistoryLogPath( return path.join(logsDir, multiTerminalHistoryLogName(threadId, terminalId)); } -describe("TerminalManager", () => { - const tempDirs: string[] = []; - - afterEach(() => { - for (const dir of tempDirs.splice(0, tempDirs.length)) { - fs.rmSync(dir, { recursive: true, force: true }); - } - }); - - function makeManager( - historyLineLimit = 5, - options: { - shellResolver?: () => string; - subprocessChecker?: (terminalPid: number) => Promise; - subprocessPollIntervalMs?: number; - processKillGraceMs?: number; - maxRetainedInactiveSessions?: number; - ptyAdapter?: FakePtyAdapter; - } = {}, - ) { - const logsDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3code-terminal-")); - tempDirs.push(logsDir); - const ptyAdapter = options.ptyAdapter ?? new FakePtyAdapter(); - const manager = new TerminalManagerRuntime({ - logsDir, - ptyAdapter, - historyLineLimit, - shellResolver: options.shellResolver ?? (() => "/bin/bash"), - ...(options.subprocessChecker ? { subprocessChecker: options.subprocessChecker } : {}), - ...(options.subprocessPollIntervalMs - ? { subprocessPollIntervalMs: options.subprocessPollIntervalMs } - : {}), - ...(options.processKillGraceMs ? { processKillGraceMs: options.processKillGraceMs } : {}), - ...(options.maxRetainedInactiveSessions - ? { maxRetainedInactiveSessions: options.maxRetainedInactiveSessions } - : {}), - }); - return { logsDir, ptyAdapter, manager }; - } - - it("spawns lazily and reuses running terminal per thread", async () => { - const { manager, ptyAdapter } = makeManager(); - const [first, second] = await Promise.all([ - manager.open(openInput()), - manager.open(openInput()), - ]); - const third = await manager.open(openInput()); - - expect(first.threadId).toBe("thread-1"); - expect(first.terminalId).toBe("default"); - expect(second.threadId).toBe("thread-1"); - expect(third.threadId).toBe("thread-1"); - expect(ptyAdapter.spawnInputs).toHaveLength(1); - - manager.dispose(); - }); - - it("supports asynchronous PTY spawn effects", async () => { - const { manager, ptyAdapter } = makeManager(5, { ptyAdapter: new FakePtyAdapter("async") }); - - const snapshot = await manager.open(openInput()); - - expect(snapshot.status).toBe("running"); - expect(ptyAdapter.spawnInputs).toHaveLength(1); - expect(ptyAdapter.processes).toHaveLength(1); - - manager.dispose(); - }); - - it("forwards write and resize to active pty process", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - - await manager.write({ threadId: "thread-1", data: "ls\n" }); - await manager.resize({ threadId: "thread-1", cols: 120, rows: 30 }); - - expect(process.writes).toEqual(["ls\n"]); - expect(process.resizeCalls).toEqual([{ cols: 120, rows: 30 }]); - - manager.dispose(); - }); - - it("resizes running terminal on open when a different size is requested", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput({ cols: 100, rows: 24 })); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - - await manager.open(openInput({ cols: 140, rows: 40 })); - - expect(process.resizeCalls).toEqual([{ cols: 140, rows: 40 }]); - - manager.dispose(); - }); - - it("preserves existing terminal size on open when size is omitted", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput({ cols: 100, rows: 24 })); - const ptyProcess = ptyAdapter.processes[0]; - expect(ptyProcess).toBeDefined(); - if (!ptyProcess) return; - - await manager.open({ - threadId: "thread-1", - cwd: globalThis.process.cwd(), - }); - - expect(ptyProcess.resizeCalls).toEqual([]); - - ptyProcess.emitExit({ exitCode: 0, signal: 0 }); - await manager.open({ - threadId: "thread-1", - cwd: globalThis.process.cwd(), - }); - - const resumedSpawn = ptyAdapter.spawnInputs[1]; - expect(resumedSpawn).toBeDefined(); - if (!resumedSpawn) return; - expect(resumedSpawn.cols).toBe(100); - expect(resumedSpawn.rows).toBe(24); - - manager.dispose(); - }); - - it("uses default dimensions when opening a new terminal without size hints", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open({ - threadId: "thread-1", - cwd: process.cwd(), - }); - - const spawned = ptyAdapter.spawnInputs[0]; - expect(spawned).toBeDefined(); - if (!spawned) return; - expect(spawned.cols).toBe(120); - expect(spawned.rows).toBe(30); - - manager.dispose(); - }); - - it("supports multiple terminals per thread with isolated sessions", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput({ terminalId: "default" })); - await manager.open(openInput({ terminalId: "term-2" })); - - const first = ptyAdapter.processes[0]; - const second = ptyAdapter.processes[1]; - expect(first).toBeDefined(); - expect(second).toBeDefined(); - if (!first || !second) return; - - await manager.write({ threadId: "thread-1", terminalId: "default", data: "pwd\n" }); - await manager.write({ threadId: "thread-1", terminalId: "term-2", data: "ls\n" }); - - expect(first.writes).toEqual(["pwd\n"]); - expect(second.writes).toEqual(["ls\n"]); - expect(ptyAdapter.spawnInputs).toHaveLength(2); - - manager.dispose(); - }); - - it("clears transcript and emits cleared event", async () => { - const { manager, ptyAdapter, logsDir } = makeManager(); - const events: TerminalEvent[] = []; - manager.on("event", (event) => { - events.push(event); - }); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - - process.emitData("hello\n"); - await waitFor(() => fs.existsSync(historyLogPath(logsDir))); - await manager.clear({ threadId: "thread-1" }); - await waitFor(() => fs.readFileSync(historyLogPath(logsDir), "utf8") === ""); - - expect(events.some((event) => event.type === "cleared")).toBe(true); - expect( - events.some( - (event) => - event.type === "cleared" && - event.threadId === "thread-1" && - event.terminalId === "default", - ), - ).toBe(true); - - manager.dispose(); - }); - - it("restarts terminal with empty transcript and respawns pty", async () => { - const { manager, ptyAdapter, logsDir } = makeManager(); - await manager.open(openInput()); - const firstProcess = ptyAdapter.processes[0]; - expect(firstProcess).toBeDefined(); - if (!firstProcess) return; - firstProcess.emitData("before restart\n"); - await waitFor(() => fs.existsSync(historyLogPath(logsDir))); - - const snapshot = await manager.restart(restartInput()); - expect(snapshot.history).toBe(""); - expect(snapshot.status).toBe("running"); - expect(ptyAdapter.spawnInputs).toHaveLength(2); - await waitFor(() => fs.readFileSync(historyLogPath(logsDir), "utf8") === ""); - - manager.dispose(); - }); - - it("emits exited event and reopens with clean transcript after exit", async () => { - const { manager, ptyAdapter, logsDir } = makeManager(); - const events: TerminalEvent[] = []; - manager.on("event", (event) => { - events.push(event); - }); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - process.emitData("old data\n"); - await waitFor(() => fs.existsSync(historyLogPath(logsDir))); - process.emitExit({ exitCode: 0, signal: 0 }); - - await waitFor(() => events.some((event) => event.type === "exited")); - const reopened = await manager.open(openInput()); - - expect(reopened.history).toBe(""); - expect(ptyAdapter.spawnInputs).toHaveLength(2); - expect(fs.readFileSync(historyLogPath(logsDir), "utf8")).toBe(""); - - manager.dispose(); - }); - - it("ignores trailing writes after terminal exit", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - - process.emitExit({ exitCode: 0, signal: 0 }); - - await expect(manager.write({ threadId: "thread-1", data: "\r" })).resolves.toBeUndefined(); - expect(process.writes).toEqual([]); - - manager.dispose(); - }); - - it("emits subprocess activity events when child-process state changes", async () => { - let hasRunningSubprocess = false; - const { manager } = makeManager(5, { - subprocessChecker: async () => hasRunningSubprocess, - subprocessPollIntervalMs: 20, - }); - const events: TerminalEvent[] = []; - manager.on("event", (event) => { - events.push(event); - }); - - await manager.open(openInput()); - await waitFor(() => events.some((event) => event.type === "started")); - expect(events.some((event) => event.type === "activity")).toBe(false); - - hasRunningSubprocess = true; - await waitFor( - () => - events.some((event) => event.type === "activity" && event.hasRunningSubprocess === true), - 1_200, - ); +interface CreateManagerOptions { + shellResolver?: () => string; + subprocessChecker?: (terminalPid: number) => Effect.Effect; + subprocessPollIntervalMs?: number; + processKillGraceMs?: number; + maxRetainedInactiveSessions?: number; + ptyAdapter?: FakePtyAdapter; +} - hasRunningSubprocess = false; - await waitFor( - () => - events.some((event) => event.type === "activity" && event.hasRunningSubprocess === false), - 1_200, - ); +interface ManagerFixture { + readonly baseDir: string; + readonly logsDir: string; + readonly ptyAdapter: FakePtyAdapter; + readonly manager: TerminalManagerShape; + readonly getEvents: Effect.Effect>; +} - manager.dispose(); - }); - - it("caps persisted history to configured line limit", async () => { - const { manager, ptyAdapter } = makeManager(3); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - - process.emitData("line1\nline2\nline3\nline4\n"); - await manager.close({ threadId: "thread-1" }); - - const reopened = await manager.open(openInput()); - const nonEmptyLines = reopened.history.split("\n").filter((line) => line.length > 0); - expect(nonEmptyLines).toEqual(["line2", "line3", "line4"]); - - manager.dispose(); - }); - - it("strips replay-unsafe terminal query and reply sequences from persisted history", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - - process.emitData("prompt "); - process.emitData("\u001b[32mok\u001b[0m "); - process.emitData("\u001b]11;rgb:ffff/ffff/ffff\u0007"); - process.emitData("\u001b[1;1R"); - process.emitData("done\n"); - - await manager.close({ threadId: "thread-1" }); - - const reopened = await manager.open(openInput()); - expect(reopened.history).toBe("prompt \u001b[32mok\u001b[0m done\n"); - - manager.dispose(); - }); - - it("preserves clear and style control sequences while dropping chunk-split query traffic", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - - process.emitData("before clear\n"); - process.emitData("\u001b[H\u001b[2J"); - process.emitData("prompt "); - process.emitData("\u001b]11;"); - process.emitData("rgb:ffff/ffff/ffff\u0007\u001b[1;1"); - process.emitData("R\u001b[36mdone\u001b[0m\n"); - - await manager.close({ threadId: "thread-1" }); - - const reopened = await manager.open(openInput()); - expect(reopened.history).toBe( - "before clear\n\u001b[H\u001b[2Jprompt \u001b[36mdone\u001b[0m\n", +const createManager = ( + historyLineLimit = 5, + options: CreateManagerOptions = {}, +): Effect.Effect< + ManagerFixture, + PlatformError.PlatformError, + FileSystem.FileSystem | Scope.Scope +> => + Effect.flatMap(Effect.service(FileSystem.FileSystem), (fs) => + Effect.gen(function* () { + const baseDir = yield* fs.makeTempDirectoryScoped({ prefix: "t3code-terminal-" }); + const logsDir = path.join(baseDir, "userdata", "logs", "terminals"); + const ptyAdapter = options.ptyAdapter ?? new FakePtyAdapter(); + + const manager = yield* makeTerminalManagerWithOptions({ + logsDir, + historyLineLimit, + ptyAdapter, + ...(options.shellResolver !== undefined ? { shellResolver: options.shellResolver } : {}), + ...(options.subprocessChecker !== undefined + ? { subprocessChecker: options.subprocessChecker } + : {}), + ...(options.subprocessPollIntervalMs !== undefined + ? { subprocessPollIntervalMs: options.subprocessPollIntervalMs } + : {}), + ...(options.processKillGraceMs !== undefined + ? { processKillGraceMs: options.processKillGraceMs } + : {}), + ...(options.maxRetainedInactiveSessions !== undefined + ? { maxRetainedInactiveSessions: options.maxRetainedInactiveSessions } + : {}), + }); + const eventsRef = yield* Ref.make>([]); + + yield* Stream.runForEach(manager.streamEvents, (event) => + Ref.update(eventsRef, (events) => [...events, event]), + ).pipe(Effect.forkScoped); + + return { + baseDir, + logsDir, + ptyAdapter, + manager, + getEvents: Ref.get(eventsRef), + }; + }), + ); + +it.layer(NodeServices.layer, { excludeTestServices: true })("TerminalManager", (it) => { + it.effect("spawns lazily and reuses running terminal per thread", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + const [first, second] = yield* Effect.all( + [manager.open(openInput()), manager.open(openInput())], + { concurrency: "unbounded" }, + ); + const third = yield* manager.open(openInput()); + + assert.equal(first.threadId, "thread-1"); + assert.equal(first.terminalId, "default"); + assert.equal(second.threadId, "thread-1"); + assert.equal(third.threadId, "thread-1"); + expect(ptyAdapter.spawnInputs).toHaveLength(1); + }), + ); + + const makeDirectory = (filePath: string) => + Effect.flatMap(Effect.service(FileSystem.FileSystem), (fs) => + fs.makeDirectory(filePath, { recursive: true }), ); - manager.dispose(); - }); - - it("does not leak final bytes from ESC sequences with intermediate bytes", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - - process.emitData("before "); - process.emitData("\u001b(B"); - process.emitData("after\n"); + const chmod = (filePath: string, mode: number) => + Effect.flatMap(Effect.service(FileSystem.FileSystem), (fs) => fs.chmod(filePath, mode)); - await manager.close({ threadId: "thread-1" }); - - const reopened = await manager.open(openInput()); - expect(reopened.history).toBe("before \u001b(Bafter\n"); - - manager.dispose(); - }); + const pathExists = (filePath: string) => + Effect.flatMap(Effect.service(FileSystem.FileSystem), (fs) => fs.exists(filePath)); - it("preserves chunk-split ESC sequences with intermediate bytes without leaking final bytes", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - - process.emitData("before "); - process.emitData("\u001b("); - process.emitData("Bafter\n"); + const readFileString = (filePath: string) => + Effect.flatMap(Effect.service(FileSystem.FileSystem), (fs) => fs.readFileString(filePath)); - await manager.close({ threadId: "thread-1" }); - - const reopened = await manager.open(openInput()); - expect(reopened.history).toBe("before \u001b(Bafter\n"); - - manager.dispose(); - }); - - it("deletes history file when close(deleteHistory=true)", async () => { - const { manager, ptyAdapter, logsDir } = makeManager(); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; - process.emitData("bye\n"); - await waitFor(() => fs.existsSync(historyLogPath(logsDir))); - - await manager.close({ threadId: "thread-1", deleteHistory: true }); - expect(fs.existsSync(historyLogPath(logsDir))).toBe(false); - - manager.dispose(); - }); - - it("closes all terminals for a thread when close omits terminalId", async () => { - const { manager, ptyAdapter, logsDir } = makeManager(); - await manager.open(openInput({ terminalId: "default" })); - await manager.open(openInput({ terminalId: "sidecar" })); - const defaultProcess = ptyAdapter.processes[0]; - const sidecarProcess = ptyAdapter.processes[1]; - expect(defaultProcess).toBeDefined(); - expect(sidecarProcess).toBeDefined(); - if (!defaultProcess || !sidecarProcess) return; - - defaultProcess.emitData("default\n"); - sidecarProcess.emitData("sidecar\n"); - await waitFor(() => fs.existsSync(multiTerminalHistoryLogPath(logsDir, "thread-1", "default"))); - await waitFor(() => fs.existsSync(multiTerminalHistoryLogPath(logsDir, "thread-1", "sidecar"))); - - await manager.close({ threadId: "thread-1", deleteHistory: true }); - - expect(defaultProcess.killed).toBe(true); - expect(sidecarProcess.killed).toBe(true); - expect(fs.existsSync(multiTerminalHistoryLogPath(logsDir, "thread-1", "default"))).toBe(false); - expect(fs.existsSync(multiTerminalHistoryLogPath(logsDir, "thread-1", "sidecar"))).toBe(false); + const writeFileString = (filePath: string, contents: string) => + Effect.flatMap(Effect.service(FileSystem.FileSystem), (fs) => + fs.writeFileString(filePath, contents), + ); - manager.dispose(); - }); + it.effect("preserves non-notFound cwd stat failures", () => + Effect.gen(function* () { + const { manager, baseDir } = yield* createManager(); + const blockedRoot = path.join(baseDir, "blocked-root"); + const blockedCwd = path.join(blockedRoot, "cwd"); + yield* makeDirectory(blockedCwd); + yield* chmod(blockedRoot, 0o000); - it("escalates terminal shutdown to SIGKILL when process does not exit in time", async () => { - const { manager, ptyAdapter } = makeManager(5, { processKillGraceMs: 10 }); - await manager.open(openInput()); - const process = ptyAdapter.processes[0]; - expect(process).toBeDefined(); - if (!process) return; + const error = yield* Effect.flip(manager.open(openInput({ cwd: blockedCwd }))).pipe( + Effect.ensuring(chmod(blockedRoot, 0o755).pipe(Effect.ignore)), + ); - await manager.close({ threadId: "thread-1" }); - await waitFor(() => process.killSignals.includes("SIGKILL")); + expect(error).toMatchObject({ + _tag: "TerminalCwdError", + cwd: blockedCwd, + reason: "statFailed", + }); + }), + ); - expect(process.killSignals[0]).toBe("SIGTERM"); - expect(process.killSignals).toContain("SIGKILL"); + it.effect("supports asynchronous PTY spawn effects", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(5, { + ptyAdapter: new FakePtyAdapter("async"), + }); - manager.dispose(); - }); + const snapshot = yield* manager.open(openInput()); + + assert.equal(snapshot.status, "running"); + expect(ptyAdapter.spawnInputs).toHaveLength(1); + expect(ptyAdapter.processes).toHaveLength(1); + }), + ); + + it.effect("forwards write and resize to active pty process", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + yield* manager.write({ + threadId: "thread-1", + terminalId: DEFAULT_TERMINAL_ID, + data: "ls\n", + }); + yield* manager.resize({ + threadId: "thread-1", + terminalId: DEFAULT_TERMINAL_ID, + cols: 120, + rows: 30, + }); - it("evicts oldest inactive terminal sessions when retention limit is exceeded", async () => { - const { manager, ptyAdapter } = makeManager(5, { maxRetainedInactiveSessions: 1 }); + expect(process.writes).toEqual(["ls\n"]); + expect(process.resizeCalls).toEqual([{ cols: 120, rows: 30 }]); + }), + ); + + it.effect("resizes running terminal on open when a different size is requested", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open(openInput({ cols: 100, rows: 24 })); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + const reopened = yield* manager.open(openInput({ cols: 120, rows: 30 })); + + assert.equal(reopened.status, "running"); + expect(process.resizeCalls).toEqual([{ cols: 120, rows: 30 }]); + }), + ); + + it.effect("supports multiple terminals per thread independently", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open(openInput({ terminalId: "default" })); + yield* manager.open(openInput({ terminalId: "term-2" })); + + const first = ptyAdapter.processes[0]; + const second = ptyAdapter.processes[1]; + expect(first).toBeDefined(); + expect(second).toBeDefined(); + if (!first || !second) return; + + yield* manager.write({ threadId: "thread-1", terminalId: "default", data: "pwd\n" }); + yield* manager.write({ threadId: "thread-1", terminalId: "term-2", data: "ls\n" }); + + expect(first.writes).toEqual(["pwd\n"]); + expect(second.writes).toEqual(["ls\n"]); + expect(ptyAdapter.spawnInputs).toHaveLength(2); + }), + ); + + it.effect("clears transcript and emits cleared event", () => + Effect.gen(function* () { + const { manager, ptyAdapter, logsDir, getEvents } = yield* createManager(); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitData("hello\n"); + yield* waitFor(pathExists(historyLogPath(logsDir))); + yield* manager.clear({ threadId: "thread-1", terminalId: DEFAULT_TERMINAL_ID }); + yield* waitFor(Effect.map(readFileString(historyLogPath(logsDir)), (text) => text === "")); + + const events = yield* getEvents; + expect(events.some((event) => event.type === "cleared")).toBe(true); + expect( + events.some( + (event) => + event.type === "cleared" && + event.threadId === "thread-1" && + event.terminalId === "default", + ), + ).toBe(true); + }), + ); + + it.effect("restarts terminal with empty transcript and respawns pty", () => + Effect.gen(function* () { + const { manager, ptyAdapter, logsDir } = yield* createManager(); + yield* manager.open(openInput()); + const firstProcess = ptyAdapter.processes[0]; + expect(firstProcess).toBeDefined(); + if (!firstProcess) return; + firstProcess.emitData("before restart\n"); + yield* waitFor(pathExists(historyLogPath(logsDir))); + + const snapshot = yield* manager.restart(restartInput()); + assert.equal(snapshot.history, ""); + assert.equal(snapshot.status, "running"); + expect(ptyAdapter.spawnInputs).toHaveLength(2); + yield* waitFor(Effect.map(readFileString(historyLogPath(logsDir)), (text) => text === "")); + }), + ); + + it.effect("emits exited event and reopens with clean transcript after exit", () => + Effect.gen(function* () { + const { manager, ptyAdapter, logsDir, getEvents } = yield* createManager(); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + process.emitData("old data\n"); + yield* waitFor(pathExists(historyLogPath(logsDir))); + process.emitExit({ exitCode: 0, signal: 0 }); + + yield* waitFor( + Effect.map(getEvents, (events) => events.some((event) => event.type === "exited")), + ); + const reopened = yield* manager.open(openInput()); + + assert.equal(reopened.history, ""); + expect(ptyAdapter.spawnInputs).toHaveLength(2); + expect(yield* readFileString(historyLogPath(logsDir))).toBe(""); + }), + ); + + it.effect("ignores trailing writes after terminal exit", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitExit({ exitCode: 0, signal: 0 }); + + yield* manager.write({ + threadId: "thread-1", + terminalId: DEFAULT_TERMINAL_ID, + data: "\r", + }); + expect(process.writes).toEqual([]); + }), + ); + + it.effect("emits subprocess activity events when child-process state changes", () => + Effect.gen(function* () { + let hasRunningSubprocess = false; + const { manager, getEvents } = yield* createManager(5, { + subprocessChecker: () => Effect.succeed(hasRunningSubprocess), + subprocessPollIntervalMs: 20, + }); - await manager.open(openInput({ threadId: "thread-1" })); - await manager.open(openInput({ threadId: "thread-2" })); - - const first = ptyAdapter.processes[0]; - const second = ptyAdapter.processes[1]; - expect(first).toBeDefined(); - expect(second).toBeDefined(); - if (!first || !second) return; + yield* manager.open(openInput()); + expect((yield* getEvents).some((event) => event.type === "activity")).toBe(false); - first.emitExit({ exitCode: 0, signal: 0 }); - await new Promise((resolve) => setTimeout(resolve, 5)); - second.emitExit({ exitCode: 0, signal: 0 }); + hasRunningSubprocess = true; + yield* waitFor( + Effect.map(getEvents, (events) => + events.some((event) => event.type === "activity" && event.hasRunningSubprocess === true), + ), + "1200 millis", + ); - await waitFor(() => { - const sessions = (manager as unknown as { sessions: Map }).sessions; - return sessions.size === 1; - }); - - const sessions = (manager as unknown as { sessions: Map }).sessions; - const keys = [...sessions.keys()]; - expect(keys).toEqual(["thread-2\u0000default"]); - - manager.dispose(); - }); - - it("migrates legacy transcript filenames to terminal-scoped history path on open", async () => { - const { manager, logsDir } = makeManager(); - const legacyPath = path.join(logsDir, "thread-1.log"); - const nextPath = historyLogPath(logsDir); - fs.writeFileSync(legacyPath, "legacy-line\n", "utf8"); - - const snapshot = await manager.open(openInput()); + hasRunningSubprocess = false; + yield* waitFor( + Effect.map(getEvents, (events) => + events.some((event) => event.type === "activity" && event.hasRunningSubprocess === false), + ), + "1200 millis", + ); + }), + ); + + it.effect("does not invoke subprocess polling until a terminal session is running", () => + Effect.gen(function* () { + let checks = 0; + const { manager } = yield* createManager(5, { + subprocessChecker: () => { + checks += 1; + return Effect.succeed(false); + }, + subprocessPollIntervalMs: 20, + }); - expect(snapshot.history).toBe("legacy-line\n"); - expect(fs.existsSync(nextPath)).toBe(true); - expect(fs.readFileSync(nextPath, "utf8")).toBe("legacy-line\n"); - expect(fs.existsSync(legacyPath)).toBe(false); - - manager.dispose(); - }); - - it("retries with fallback shells when preferred shell spawn fails", async () => { - const { manager, ptyAdapter } = makeManager(5, { - shellResolver: () => "/definitely/missing-shell -l", - }); - ptyAdapter.spawnFailures.push(new Error("posix_spawnp failed.")); - - const snapshot = await manager.open(openInput()); + yield* Effect.sleep("80 millis"); + assert.equal(checks, 0); - expect(snapshot.status).toBe("running"); - expect(ptyAdapter.spawnInputs.length).toBeGreaterThanOrEqual(2); - expect(ptyAdapter.spawnInputs[0]?.shell).toBe("/definitely/missing-shell"); + yield* manager.open(openInput()); + yield* waitFor( + Effect.sync(() => checks > 0), + "1200 millis", + ); + }), + ); + + it.effect("caps persisted history to configured line limit", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(3); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitData("line1\nline2\nline3\nline4\n"); + yield* manager.close({ threadId: "thread-1" }); + + const reopened = yield* manager.open(openInput()); + const nonEmptyLines = reopened.history.split("\n").filter((line) => line.length > 0); + expect(nonEmptyLines).toEqual(["line2", "line3", "line4"]); + }), + ); + + it.effect("strips replay-unsafe terminal query and reply sequences from persisted history", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitData("prompt "); + process.emitData("\u001b[32mok\u001b[0m "); + process.emitData("\u001b]11;rgb:ffff/ffff/ffff\u0007"); + process.emitData("\u001b[1;1R"); + process.emitData("done\n"); + + yield* manager.close({ threadId: "thread-1" }); + + const reopened = yield* manager.open(openInput()); + assert.equal(reopened.history, "prompt \u001b[32mok\u001b[0m done\n"); + }), + ); + + it.effect( + "preserves clear and style control sequences while dropping chunk-split query traffic", + () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitData("before clear\n"); + process.emitData("\u001b[H\u001b[2J"); + process.emitData("prompt "); + process.emitData("\u001b]11;"); + process.emitData("rgb:ffff/ffff/ffff\u0007\u001b[1;1"); + process.emitData("R\u001b[36mdone\u001b[0m\n"); + + yield* manager.close({ threadId: "thread-1" }); + + const reopened = yield* manager.open(openInput()); + assert.equal( + reopened.history, + "before clear\n\u001b[H\u001b[2Jprompt \u001b[36mdone\u001b[0m\n", + ); + }), + ); + + it.effect("does not leak final bytes from ESC sequences with intermediate bytes", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitData("before "); + process.emitData("\u001b(B"); + process.emitData("after\n"); + + yield* manager.close({ threadId: "thread-1" }); + + const reopened = yield* manager.open(openInput()); + assert.equal(reopened.history, "before \u001b(Bafter\n"); + }), + ); + + it.effect( + "preserves chunk-split ESC sequences with intermediate bytes without leaking final bytes", + () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitData("before "); + process.emitData("\u001b("); + process.emitData("Bafter\n"); + + yield* manager.close({ threadId: "thread-1" }); + + const reopened = yield* manager.open(openInput()); + assert.equal(reopened.history, "before \u001b(Bafter\n"); + }), + ); + + it.effect("deletes history file when close(deleteHistory=true)", () => + Effect.gen(function* () { + const { manager, ptyAdapter, logsDir } = yield* createManager(); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + process.emitData("bye\n"); + yield* waitFor(pathExists(historyLogPath(logsDir))); + + yield* manager.close({ threadId: "thread-1", deleteHistory: true }); + expect(yield* pathExists(historyLogPath(logsDir))).toBe(false); + }), + ); + + it.effect("closes all terminals for a thread when close omits terminalId", () => + Effect.gen(function* () { + const { manager, ptyAdapter, logsDir } = yield* createManager(); + yield* manager.open(openInput({ terminalId: "default" })); + yield* manager.open(openInput({ terminalId: "sidecar" })); + const defaultProcess = ptyAdapter.processes[0]; + const sidecarProcess = ptyAdapter.processes[1]; + expect(defaultProcess).toBeDefined(); + expect(sidecarProcess).toBeDefined(); + if (!defaultProcess || !sidecarProcess) return; + + defaultProcess.emitData("default\n"); + sidecarProcess.emitData("sidecar\n"); + yield* waitFor(pathExists(multiTerminalHistoryLogPath(logsDir, "thread-1", "default"))); + yield* waitFor(pathExists(multiTerminalHistoryLogPath(logsDir, "thread-1", "sidecar"))); + + yield* manager.close({ threadId: "thread-1", deleteHistory: true }); + + assert.equal(defaultProcess.killed, true); + assert.equal(sidecarProcess.killed, true); + expect(yield* pathExists(multiTerminalHistoryLogPath(logsDir, "thread-1", "default"))).toBe( + false, + ); + expect(yield* pathExists(multiTerminalHistoryLogPath(logsDir, "thread-1", "sidecar"))).toBe( + false, + ); + }), + ); + + it.effect("escalates terminal shutdown to SIGKILL when process does not exit in time", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(5, { processKillGraceMs: 10 }); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + const closeFiber = yield* manager.close({ threadId: "thread-1" }).pipe(Effect.forkScoped); + yield* Effect.yieldNow; + yield* TestClock.adjust("10 millis"); + yield* Fiber.join(closeFiber); + + assert.equal(process.killSignals[0], "SIGTERM"); + expect(process.killSignals).toContain("SIGKILL"); + }).pipe(Effect.provide(TestClock.layer())), + ); + + it.effect("evicts oldest inactive terminal sessions when retention limit is exceeded", () => + Effect.gen(function* () { + const { manager, ptyAdapter, logsDir, getEvents } = yield* createManager(5, { + maxRetainedInactiveSessions: 1, + }); - if (process.platform === "win32") { - expect( - ptyAdapter.spawnInputs.some( - (input) => input.shell === "cmd.exe" || input.shell === "powershell.exe", + yield* manager.open(openInput({ threadId: "thread-1" })); + yield* manager.open(openInput({ threadId: "thread-2" })); + + const first = ptyAdapter.processes[0]; + const second = ptyAdapter.processes[1]; + expect(first).toBeDefined(); + expect(second).toBeDefined(); + if (!first || !second) return; + + first.emitData("first-history\n"); + second.emitData("second-history\n"); + yield* waitFor(pathExists(historyLogPath(logsDir, "thread-1"))); + first.emitExit({ exitCode: 0, signal: 0 }); + yield* Effect.sleep(Duration.millis(5)); + second.emitExit({ exitCode: 0, signal: 0 }); + + yield* waitFor( + Effect.map( + getEvents, + (events) => events.filter((event) => event.type === "exited").length === 2, ), - ).toBe(true); - } else { - expect( - ptyAdapter.spawnInputs - .slice(1) - .some((input) => input.shell !== "/definitely/missing-shell"), - ).toBe(true); - } - - manager.dispose(); - }); + ); - it("filters app runtime env variables from terminal sessions", async () => { - const originalValues = new Map(); - const setEnv = (key: string, value: string | undefined) => { - if (!originalValues.has(key)) { - originalValues.set(key, process.env[key]); - } - if (value === undefined) { - delete process.env[key]; - return; + const reopenedSecond = yield* manager.open(openInput({ threadId: "thread-2" })); + const reopenedFirst = yield* manager.open(openInput({ threadId: "thread-1" })); + + assert.equal(reopenedFirst.history, "first-history\n"); + assert.equal(reopenedSecond.history, ""); + }), + ); + + it.effect("migrates legacy transcript filenames to terminal-scoped history path on open", () => + Effect.gen(function* () { + const { manager, logsDir } = yield* createManager(); + const legacyPath = path.join(logsDir, "thread-1.log"); + const nextPath = historyLogPath(logsDir); + yield* writeFileString(legacyPath, "legacy-line\n"); + + const snapshot = yield* manager.open(openInput()); + + assert.equal(snapshot.history, "legacy-line\n"); + expect(yield* pathExists(nextPath)).toBe(true); + expect(yield* readFileString(nextPath)).toBe("legacy-line\n"); + expect(yield* pathExists(legacyPath)).toBe(false); + }), + ); + + it.effect("retries with fallback shells when preferred shell spawn fails", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(5, { + shellResolver: () => "/definitely/missing-shell -l", + }); + ptyAdapter.spawnFailures.push(new Error("posix_spawnp failed.")); + + const snapshot = yield* manager.open(openInput()); + + assert.equal(snapshot.status, "running"); + expect(ptyAdapter.spawnInputs.length).toBeGreaterThanOrEqual(2); + expect(ptyAdapter.spawnInputs[0]?.shell).toBe("/definitely/missing-shell"); + + if (process.platform === "win32") { + expect( + ptyAdapter.spawnInputs.some( + (input) => input.shell === "cmd.exe" || input.shell === "powershell.exe", + ), + ).toBe(true); + } else { + expect( + ptyAdapter.spawnInputs + .slice(1) + .some((input) => input.shell !== "/definitely/missing-shell"), + ).toBe(true); } - process.env[key] = value; - }; - const restoreEnv = () => { - for (const [key, value] of originalValues) { + }), + ); + + it.effect("filters app runtime env variables from terminal sessions", () => + Effect.gen(function* () { + const originalValues = new Map(); + const setEnv = (key: string, value: string | undefined) => { + if (!originalValues.has(key)) { + originalValues.set(key, process.env[key]); + } if (value === undefined) { delete process.env[key]; - } else { - process.env[key] = value; + return; + } + process.env[key] = value; + }; + const restoreEnv = () => { + for (const [key, value] of originalValues) { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } } + }; + + setEnv("PORT", "5173"); + setEnv("T3CODE_PORT", "3773"); + setEnv("VITE_DEV_SERVER_URL", "http://localhost:5173"); + setEnv("TEST_TERMINAL_KEEP", "keep-me"); + + try { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open(openInput()); + const spawnInput = ptyAdapter.spawnInputs[0]; + expect(spawnInput).toBeDefined(); + if (!spawnInput) return; + + expect(spawnInput.env.PORT).toBeUndefined(); + expect(spawnInput.env.T3CODE_PORT).toBeUndefined(); + expect(spawnInput.env.VITE_DEV_SERVER_URL).toBeUndefined(); + expect(spawnInput.env.TEST_TERMINAL_KEEP).toBe("keep-me"); + } finally { + restoreEnv(); } - }; - - setEnv("PORT", "5173"); - setEnv("T3CODE_PORT", "3773"); - setEnv("VITE_DEV_SERVER_URL", "http://localhost:5173"); - setEnv("TEST_TERMINAL_KEEP", "keep-me"); + }), + ); + + it.effect("injects runtime env overrides into spawned terminals", () => + Effect.gen(function* () { + const { manager, ptyAdapter } = yield* createManager(); + yield* manager.open( + openInput({ + env: { + T3CODE_PROJECT_ROOT: "/repo", + T3CODE_WORKTREE_PATH: "/repo/worktree-a", + CUSTOM_FLAG: "1", + }, + }), + ); + const spawnInput = ptyAdapter.spawnInputs[0]; + expect(spawnInput).toBeDefined(); + if (!spawnInput) return; - try { - const { manager, ptyAdapter } = makeManager(); - await manager.open(openInput()); + assert.equal(spawnInput.env.T3CODE_PROJECT_ROOT, "/repo"); + assert.equal(spawnInput.env.T3CODE_WORKTREE_PATH, "/repo/worktree-a"); + assert.equal(spawnInput.env.CUSTOM_FLAG, "1"); + }), + ); + + it.effect("starts zsh with prompt spacer disabled to avoid `%` end markers", () => + Effect.gen(function* () { + if (process.platform === "win32") return; + const { manager, ptyAdapter } = yield* createManager(5, { + shellResolver: () => "/bin/zsh", + }); + yield* manager.open(openInput()); const spawnInput = ptyAdapter.spawnInputs[0]; expect(spawnInput).toBeDefined(); if (!spawnInput) return; - expect(spawnInput.env.PORT).toBeUndefined(); - expect(spawnInput.env.T3CODE_PORT).toBeUndefined(); - expect(spawnInput.env.VITE_DEV_SERVER_URL).toBeUndefined(); - expect(spawnInput.env.TEST_TERMINAL_KEEP).toBe("keep-me"); + expect(spawnInput.args).toEqual(["-o", "nopromptsp"]); + }), + ); - manager.dispose(); - } finally { - restoreEnv(); - } - }); - - it("injects runtime env overrides into spawned terminals", async () => { - const { manager, ptyAdapter } = makeManager(); - await manager.open( - openInput({ - env: { - T3CODE_PROJECT_ROOT: "/repo", - T3CODE_WORKTREE_PATH: "/repo/worktree-a", - CUSTOM_FLAG: "1", - }, - }), - ); - const spawnInput = ptyAdapter.spawnInputs[0]; - expect(spawnInput).toBeDefined(); - if (!spawnInput) return; - - expect(spawnInput.env.T3CODE_PROJECT_ROOT).toBe("/repo"); - expect(spawnInput.env.T3CODE_WORKTREE_PATH).toBe("/repo/worktree-a"); - expect(spawnInput.env.CUSTOM_FLAG).toBe("1"); - - manager.dispose(); - }); - - it("starts zsh with prompt spacer disabled to avoid `%` end markers", async () => { - if (process.platform === "win32") return; - const { manager, ptyAdapter } = makeManager(5, { - shellResolver: () => "/bin/zsh", - }); - await manager.open(openInput()); - const spawnInput = ptyAdapter.spawnInputs[0]; - expect(spawnInput).toBeDefined(); - if (!spawnInput) return; - - expect(spawnInput.shell).toBe("/bin/zsh"); - expect(spawnInput.args).toEqual(["-o", "nopromptsp"]); - - manager.dispose(); - }); + it.effect("bridges PTY callbacks back into Effect-managed event streaming", () => + Effect.gen(function* () { + const { manager, ptyAdapter, getEvents } = yield* createManager(5, { + ptyAdapter: new FakePtyAdapter("async"), + }); + + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + process.emitData("hello from callback\n"); + + yield* waitFor( + Effect.map(getEvents, (events) => + events.some((event) => event.type === "output" && event.data === "hello from callback\n"), + ), + "1200 millis", + ); + }), + ); + + it.effect("scoped runtime shutdown stops active terminals cleanly", () => + Effect.gen(function* () { + const scope = yield* Scope.make("sequential"); + const { manager, ptyAdapter } = yield* createManager(5, { + processKillGraceMs: 10, + }).pipe(Effect.provideService(Scope.Scope, scope)); + yield* manager.open(openInput()); + const process = ptyAdapter.processes[0]; + expect(process).toBeDefined(); + if (!process) return; + + const closeScope = yield* Scope.close(scope, Exit.void).pipe(Effect.forkScoped); + yield* Effect.yieldNow; + yield* TestClock.adjust("10 millis"); + yield* Fiber.join(closeScope); + + assert.equal(process.killSignals[0], "SIGTERM"); + expect(process.killSignals).toContain("SIGKILL"); + }).pipe(Effect.provide(TestClock.layer())), + ); }); diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts index b5085220c2..4e15fa2c9d 100644 --- a/apps/server/src/terminal/Layers/Manager.ts +++ b/apps/server/src/terminal/Layers/Manager.ts @@ -1,32 +1,46 @@ -import { EventEmitter } from "node:events"; -import fs from "node:fs"; import path from "node:path"; import { DEFAULT_TERMINAL_ID, - TerminalClearInput, - TerminalCloseInput, - TerminalOpenInput, - TerminalResizeInput, - TerminalRestartInput, - TerminalWriteInput, type TerminalEvent, type TerminalSessionSnapshot, + type TerminalSessionStatus, } from "@t3tools/contracts"; -import { Effect, Encoding, Layer, Schema } from "effect"; +import { makeKeyedCoalescingWorker } from "@t3tools/shared/KeyedCoalescingWorker"; +import { + Data, + Effect, + Encoding, + Equal, + Exit, + Fiber, + FileSystem, + Layer, + Option, + PubSub, + Scope, + Semaphore, + Stream, + SynchronizedRef, +} from "effect"; -import { createLogger } from "../../logger"; -import { PtyAdapter, PtyAdapterShape, type PtyExitEvent, type PtyProcess } from "../Services/PTY"; -import { runProcess } from "../../processRunner"; import { ServerConfig } from "../../config"; +import { runProcess } from "../../processRunner"; import { - ShellCandidate, - TerminalError, + TerminalCwdError, + TerminalHistoryError, TerminalManager, - TerminalManagerShape, - TerminalSessionState, - TerminalStartInput, + TerminalNotRunningError, + TerminalSessionLookupError, + type TerminalManagerShape, } from "../Services/Manager"; +import { + PtyAdapter, + PtySpawnError, + type PtyAdapterShape, + type PtyExitEvent, + type PtyProcess, +} from "../Services/PTY"; const DEFAULT_HISTORY_LINE_LIMIT = 5_000; const DEFAULT_PERSIST_DEBOUNCE_MS = 40; @@ -37,14 +51,87 @@ const DEFAULT_OPEN_COLS = 120; const DEFAULT_OPEN_ROWS = 30; const TERMINAL_ENV_BLOCKLIST = new Set(["PORT", "ELECTRON_RENDERER_PORT", "ELECTRON_RUN_AS_NODE"]); -const decodeTerminalOpenInput = Schema.decodeUnknownSync(TerminalOpenInput); -const decodeTerminalRestartInput = Schema.decodeUnknownSync(TerminalRestartInput); -const decodeTerminalWriteInput = Schema.decodeUnknownSync(TerminalWriteInput); -const decodeTerminalResizeInput = Schema.decodeUnknownSync(TerminalResizeInput); -const decodeTerminalClearInput = Schema.decodeUnknownSync(TerminalClearInput); -const decodeTerminalCloseInput = Schema.decodeUnknownSync(TerminalCloseInput); +type TerminalSubprocessChecker = ( + terminalPid: number, +) => Effect.Effect; + +class TerminalSubprocessCheckError extends Data.TaggedError("TerminalSubprocessCheckError")<{ + readonly message: string; + readonly cause?: unknown; + readonly terminalPid: number; + readonly command: "powershell" | "pgrep" | "ps"; +}> {} + +class TerminalProcessSignalError extends Data.TaggedError("TerminalProcessSignalError")<{ + readonly message: string; + readonly cause?: unknown; + readonly signal: "SIGTERM" | "SIGKILL"; +}> {} + +interface ShellCandidate { + shell: string; + args?: string[]; +} + +interface TerminalStartInput { + threadId: string; + terminalId: string; + cwd: string; + cols: number; + rows: number; + env?: Record; +} + +interface TerminalSessionState { + threadId: string; + terminalId: string; + cwd: string; + status: TerminalSessionStatus; + pid: number | null; + history: string; + pendingHistoryControlSequence: string; + exitCode: number | null; + exitSignal: number | null; + updatedAt: string; + cols: number; + rows: number; + process: PtyProcess | null; + unsubscribeData: (() => void) | null; + unsubscribeExit: (() => void) | null; + hasRunningSubprocess: boolean; + runtimeEnv: Record | null; +} + +interface PersistHistoryRequest { + history: string; + immediate: boolean; +} + +interface TerminalManagerState { + sessions: Map; + killFibers: Map>; +} + +function snapshot(session: TerminalSessionState): TerminalSessionSnapshot { + return { + threadId: session.threadId, + terminalId: session.terminalId, + cwd: session.cwd, + status: session.status, + pid: session.pid, + history: session.history, + exitCode: session.exitCode, + exitSignal: session.exitSignal, + updatedAt: session.updatedAt, + }; +} -type TerminalSubprocessChecker = (terminalPid: number) => Promise; +function cleanupProcessHandles(session: TerminalSessionState): void { + session.unsubscribeData?.(); + session.unsubscribeData = null; + session.unsubscribeExit?.(); + session.unsubscribeExit = null; +} function defaultShellResolver(): string { if (process.platform === "win32") { @@ -118,7 +205,7 @@ function resolveShellCandidates(shellResolver: () => string): ShellCandidate[] { ]); } -function isRetryableShellSpawnError(error: unknown): boolean { +function isRetryableShellSpawnError(error: PtySpawnError): boolean { const queue: unknown[] = [error]; const seen = new Set(); const messages: string[] = []; @@ -165,82 +252,107 @@ function isRetryableShellSpawnError(error: unknown): boolean { ); } -async function checkWindowsSubprocessActivity(terminalPid: number): Promise { +function checkWindowsSubprocessActivity( + terminalPid: number, +): Effect.Effect { const command = [ `$children = Get-CimInstance Win32_Process -Filter "ParentProcessId = ${terminalPid}" -ErrorAction SilentlyContinue`, "if ($children) { exit 0 }", "exit 1", ].join("; "); - try { - const result = await runProcess( - "powershell.exe", - ["-NoProfile", "-NonInteractive", "-Command", command], - { + return Effect.tryPromise({ + try: () => + runProcess("powershell.exe", ["-NoProfile", "-NonInteractive", "-Command", command], { timeoutMs: 1_500, allowNonZeroExit: true, maxBufferBytes: 32_768, outputMode: "truncate", - }, - ); - return result.code === 0; - } catch { - return false; - } + }), + catch: (cause) => + new TerminalSubprocessCheckError({ + message: "Failed to check Windows terminal subprocess activity.", + cause, + terminalPid, + command: "powershell", + }), + }).pipe(Effect.map((result) => result.code === 0)); } -async function checkPosixSubprocessActivity(terminalPid: number): Promise { - try { - const pgrepResult = await runProcess("pgrep", ["-P", String(terminalPid)], { - timeoutMs: 1_000, - allowNonZeroExit: true, - maxBufferBytes: 32_768, - outputMode: "truncate", - }); - if (pgrepResult.code === 0) { - return pgrepResult.stdout.trim().length > 0; +const checkPosixSubprocessActivity = Effect.fn("terminal.checkPosixSubprocessActivity")(function* ( + terminalPid: number, +): Effect.fn.Return { + const runPgrep = Effect.tryPromise({ + try: () => + runProcess("pgrep", ["-P", String(terminalPid)], { + timeoutMs: 1_000, + allowNonZeroExit: true, + maxBufferBytes: 32_768, + outputMode: "truncate", + }), + catch: (cause) => + new TerminalSubprocessCheckError({ + message: "Failed to inspect terminal subprocesses with pgrep.", + cause, + terminalPid, + command: "pgrep", + }), + }); + + const runPs = Effect.tryPromise({ + try: () => + runProcess("ps", ["-eo", "pid=,ppid="], { + timeoutMs: 1_000, + allowNonZeroExit: true, + maxBufferBytes: 262_144, + outputMode: "truncate", + }), + catch: (cause) => + new TerminalSubprocessCheckError({ + message: "Failed to inspect terminal subprocesses with ps.", + cause, + terminalPid, + command: "ps", + }), + }); + + const pgrepResult = yield* Effect.exit(runPgrep); + if (pgrepResult._tag === "Success") { + if (pgrepResult.value.code === 0) { + return pgrepResult.value.stdout.trim().length > 0; } - if (pgrepResult.code === 1) { + if (pgrepResult.value.code === 1) { return false; } - } catch { - // Fall back to ps when pgrep is unavailable. } - try { - const psResult = await runProcess("ps", ["-eo", "pid=,ppid="], { - timeoutMs: 1_000, - allowNonZeroExit: true, - maxBufferBytes: 262_144, - outputMode: "truncate", - }); - if (psResult.code !== 0) { - return false; - } + const psResult = yield* Effect.exit(runPs); + if (psResult._tag === "Failure" || psResult.value.code !== 0) { + return false; + } - for (const line of psResult.stdout.split(/\r?\n/g)) { - const [pidRaw, ppidRaw] = line.trim().split(/\s+/g); - const pid = Number(pidRaw); - const ppid = Number(ppidRaw); - if (!Number.isInteger(pid) || !Number.isInteger(ppid)) continue; - if (ppid === terminalPid) { - return true; - } + for (const line of psResult.value.stdout.split(/\r?\n/g)) { + const [pidRaw, ppidRaw] = line.trim().split(/\s+/g); + const pid = Number(pidRaw); + const ppid = Number(ppidRaw); + if (!Number.isInteger(pid) || !Number.isInteger(ppid)) continue; + if (ppid === terminalPid) { + return true; } - return false; - } catch { - return false; } -} + return false; +}); -async function defaultSubprocessChecker(terminalPid: number): Promise { +const defaultSubprocessChecker = Effect.fn("terminal.defaultSubprocessChecker")(function* ( + terminalPid: number, +): Effect.fn.Return { if (!Number.isInteger(terminalPid) || terminalPid <= 0) { return false; } if (process.platform === "win32") { - return checkWindowsSubprocessActivity(terminalPid); + return yield* checkWindowsSubprocessActivity(terminalPid); } - return checkPosixSubprocessActivity(terminalPid); -} + return yield* checkPosixSubprocessActivity(terminalPid); +}); function capHistory(history: string, maxLines: number): string { if (history.length === 0) return history; @@ -482,12 +594,8 @@ function normalizedRuntimeEnv( return Object.fromEntries(entries.toSorted(([left], [right]) => left.localeCompare(right))); } -interface TerminalManagerEvents { - event: [event: TerminalEvent]; -} - interface TerminalManagerOptions { - logsDir?: string; + logsDir: string; historyLineLimit?: number; ptyAdapter: PtyAdapterShape; shellResolver?: () => string; @@ -497,917 +605,1148 @@ interface TerminalManagerOptions { maxRetainedInactiveSessions?: number; } -export class TerminalManagerRuntime extends EventEmitter { - private readonly sessions = new Map(); - private readonly logsDir: string; - private readonly historyLineLimit: number; - private readonly ptyAdapter: PtyAdapterShape; - private readonly shellResolver: () => string; - private readonly persistQueues = new Map>(); - private readonly persistTimers = new Map>(); - private readonly pendingPersistHistory = new Map(); - private readonly threadLocks = new Map>(); - private readonly persistDebounceMs: number; - private readonly subprocessChecker: TerminalSubprocessChecker; - private readonly subprocessPollIntervalMs: number; - private readonly processKillGraceMs: number; - private readonly maxRetainedInactiveSessions: number; - private subprocessPollTimer: ReturnType | null = null; - private subprocessPollInFlight = false; - private readonly killEscalationTimers = new Map>(); - private readonly logger = createLogger("terminal"); - - constructor(options: TerminalManagerOptions) { - super(); - this.logsDir = options.logsDir ?? path.resolve(process.cwd(), ".logs", "terminals"); - this.historyLineLimit = options.historyLineLimit ?? DEFAULT_HISTORY_LINE_LIMIT; - this.ptyAdapter = options.ptyAdapter; - this.shellResolver = options.shellResolver ?? defaultShellResolver; - this.persistDebounceMs = DEFAULT_PERSIST_DEBOUNCE_MS; - this.subprocessChecker = options.subprocessChecker ?? defaultSubprocessChecker; - this.subprocessPollIntervalMs = +const makeTerminalManager = Effect.fn("makeTerminalManager")(function* () { + const { terminalLogsDir } = yield* ServerConfig; + const ptyAdapter = yield* PtyAdapter; + return yield* makeTerminalManagerWithOptions({ + logsDir: terminalLogsDir, + ptyAdapter, + }); +}); + +export const makeTerminalManagerWithOptions = Effect.fn("makeTerminalManagerWithOptions")( + function* (options: TerminalManagerOptions) { + const fileSystem = yield* FileSystem.FileSystem; + const services = yield* Effect.services(); + const runFork = Effect.runForkWith(services); + + const logsDir = options.logsDir; + const historyLineLimit = options.historyLineLimit ?? DEFAULT_HISTORY_LINE_LIMIT; + const shellResolver = options.shellResolver ?? defaultShellResolver; + const subprocessChecker = options.subprocessChecker ?? defaultSubprocessChecker; + const subprocessPollIntervalMs = options.subprocessPollIntervalMs ?? DEFAULT_SUBPROCESS_POLL_INTERVAL_MS; - this.processKillGraceMs = options.processKillGraceMs ?? DEFAULT_PROCESS_KILL_GRACE_MS; - this.maxRetainedInactiveSessions = + const processKillGraceMs = options.processKillGraceMs ?? DEFAULT_PROCESS_KILL_GRACE_MS; + const maxRetainedInactiveSessions = options.maxRetainedInactiveSessions ?? DEFAULT_MAX_RETAINED_INACTIVE_SESSIONS; - fs.mkdirSync(this.logsDir, { recursive: true }); - } - async open(raw: TerminalOpenInput): Promise { - const input = decodeTerminalOpenInput(raw); - return this.runWithThreadLock(input.threadId, async () => { - await this.assertValidCwd(input.cwd); - - const sessionKey = toSessionKey(input.threadId, input.terminalId); - const existing = this.sessions.get(sessionKey); - if (!existing) { - await this.flushPersistQueue(input.threadId, input.terminalId); - const history = await this.readHistory(input.threadId, input.terminalId); - const cols = input.cols ?? DEFAULT_OPEN_COLS; - const rows = input.rows ?? DEFAULT_OPEN_ROWS; - const session: TerminalSessionState = { - threadId: input.threadId, - terminalId: input.terminalId, - cwd: input.cwd, - status: "starting", - pid: null, - history, - pendingHistoryControlSequence: "", - exitCode: null, - exitSignal: null, - updatedAt: new Date().toISOString(), - cols, - rows, - process: null, - unsubscribeData: null, - unsubscribeExit: null, - hasRunningSubprocess: false, - runtimeEnv: normalizedRuntimeEnv(input.env), - }; - this.sessions.set(sessionKey, session); - this.evictInactiveSessionsIfNeeded(); - await this.startSession(session, { ...input, cols, rows }, "started"); - return this.snapshot(session); - } + yield* fileSystem.makeDirectory(logsDir, { recursive: true }).pipe(Effect.orDie); + + const managerStateRef = yield* SynchronizedRef.make({ + sessions: new Map(), + killFibers: new Map(), + }); + const threadLocksRef = yield* SynchronizedRef.make(new Map()); + const terminalEvents = yield* Effect.acquireRelease( + PubSub.unbounded(), + PubSub.shutdown, + ); + const workerScope = yield* Scope.make("sequential"); + yield* Effect.addFinalizer(() => Scope.close(workerScope, Exit.void)); - const nextRuntimeEnv = normalizedRuntimeEnv(input.env); - const currentRuntimeEnv = existing.runtimeEnv; - const targetCols = input.cols ?? existing.cols; - const targetRows = input.rows ?? existing.rows; - const runtimeEnvChanged = - JSON.stringify(currentRuntimeEnv) !== JSON.stringify(nextRuntimeEnv); - - if (existing.cwd !== input.cwd || runtimeEnvChanged) { - this.stopProcess(existing); - existing.cwd = input.cwd; - existing.runtimeEnv = nextRuntimeEnv; - existing.history = ""; - existing.pendingHistoryControlSequence = ""; - await this.persistHistory(existing.threadId, existing.terminalId, existing.history); - } else if (existing.status === "exited" || existing.status === "error") { - existing.runtimeEnv = nextRuntimeEnv; - existing.history = ""; - existing.pendingHistoryControlSequence = ""; - await this.persistHistory(existing.threadId, existing.terminalId, existing.history); - } else if (currentRuntimeEnv !== nextRuntimeEnv) { - existing.runtimeEnv = nextRuntimeEnv; + const publishEvent = (event: TerminalEvent) => + PubSub.publish(terminalEvents, event).pipe(Effect.asVoid); + + const historyPath = (threadId: string, terminalId: string) => { + const threadPart = toSafeThreadId(threadId); + if (terminalId === DEFAULT_TERMINAL_ID) { + return path.join(logsDir, `${threadPart}.log`); } + return path.join(logsDir, `${threadPart}_${toSafeTerminalId(terminalId)}.log`); + }; + + const legacyHistoryPath = (threadId: string) => + path.join(logsDir, `${legacySafeThreadId(threadId)}.log`); + + const toTerminalHistoryError = + (operation: "read" | "truncate" | "migrate", threadId: string, terminalId: string) => + (cause: unknown) => + new TerminalHistoryError({ + operation, + threadId, + terminalId, + cause, + }); + + const readManagerState = SynchronizedRef.get(managerStateRef); + + const modifyManagerState = ( + f: (state: TerminalManagerState) => readonly [A, TerminalManagerState], + ) => SynchronizedRef.modify(managerStateRef, f); - if (!existing.process) { - await this.startSession( - existing, - { ...input, cols: targetCols, rows: targetRows }, - "started", + const getThreadSemaphore = (threadId: string) => + SynchronizedRef.modifyEffect(threadLocksRef, (current) => { + const existing: Option.Option = Option.fromNullishOr( + current.get(threadId), ); - return this.snapshot(existing); - } + return Option.match(existing, { + onNone: () => + Semaphore.make(1).pipe( + Effect.map((semaphore) => { + const next = new Map(current); + next.set(threadId, semaphore); + return [semaphore, next] as const; + }), + ), + onSome: (semaphore) => Effect.succeed([semaphore, current] as const), + }); + }); - if (existing.cols !== targetCols || existing.rows !== targetRows) { - existing.cols = targetCols; - existing.rows = targetRows; - existing.process.resize(targetCols, targetRows); - existing.updatedAt = new Date().toISOString(); + const withThreadLock = ( + threadId: string, + effect: Effect.Effect, + ): Effect.Effect => + Effect.flatMap(getThreadSemaphore(threadId), (semaphore) => semaphore.withPermit(effect)); + + const clearKillFiber = Effect.fn("terminal.clearKillFiber")(function* ( + process: PtyProcess | null, + ) { + if (!process) return; + const fiber: Option.Option> = yield* modifyManagerState< + Option.Option> + >((state) => { + const existing: Option.Option> = Option.fromNullishOr( + state.killFibers.get(process), + ); + if (Option.isNone(existing)) { + return [Option.none>(), state] as const; + } + const killFibers = new Map(state.killFibers); + killFibers.delete(process); + return [existing, { ...state, killFibers }] as const; + }); + if (Option.isSome(fiber)) { + yield* Fiber.interrupt(fiber.value).pipe(Effect.ignore); } + }); - return this.snapshot(existing); + const registerKillFiber = Effect.fn("terminal.registerKillFiber")(function* ( + process: PtyProcess, + fiber: Fiber.Fiber, + ) { + yield* modifyManagerState((state) => { + const killFibers = new Map(state.killFibers); + killFibers.set(process, fiber); + return [undefined, { ...state, killFibers }] as const; + }); }); - } - async write(raw: TerminalWriteInput): Promise { - const input = decodeTerminalWriteInput(raw); - const session = this.requireSession(input.threadId, input.terminalId); - if (!session.process || session.status !== "running") { - if (session.status === "exited") { + const runKillEscalation = Effect.fn("terminal.runKillEscalation")(function* ( + process: PtyProcess, + threadId: string, + terminalId: string, + ) { + const terminated = yield* Effect.try({ + try: () => process.kill("SIGTERM"), + catch: (cause) => + new TerminalProcessSignalError({ + message: "Failed to send SIGTERM to terminal process.", + cause, + signal: "SIGTERM", + }), + }).pipe( + Effect.as(true), + Effect.catch((error) => + Effect.logWarning("failed to kill terminal process", { + threadId, + terminalId, + signal: "SIGTERM", + error: error.message, + }).pipe(Effect.as(false)), + ), + ); + if (!terminated) { return; } - throw new Error( - `Terminal is not running for thread: ${input.threadId}, terminal: ${input.terminalId}`, + + yield* Effect.sleep(processKillGraceMs); + + yield* Effect.try({ + try: () => process.kill("SIGKILL"), + catch: (cause) => + new TerminalProcessSignalError({ + message: "Failed to send SIGKILL to terminal process.", + cause, + signal: "SIGKILL", + }), + }).pipe( + Effect.catch((error) => + Effect.logWarning("failed to force-kill terminal process", { + threadId, + terminalId, + signal: "SIGKILL", + error: error.message, + }), + ), ); - } - session.process.write(input.data); - } + }); - async resize(raw: TerminalResizeInput): Promise { - const input = decodeTerminalResizeInput(raw); - const session = this.requireSession(input.threadId, input.terminalId); - if (!session.process || session.status !== "running") { - throw new Error( - `Terminal is not running for thread: ${input.threadId}, terminal: ${input.terminalId}`, + const startKillEscalation = Effect.fn("terminal.startKillEscalation")(function* ( + process: PtyProcess, + threadId: string, + terminalId: string, + ) { + const fiber = yield* runKillEscalation(process, threadId, terminalId).pipe( + Effect.ensuring( + modifyManagerState((state) => { + if (!state.killFibers.has(process)) { + return [undefined, state] as const; + } + const killFibers = new Map(state.killFibers); + killFibers.delete(process); + return [undefined, { ...state, killFibers }] as const; + }), + ), + Effect.forkIn(workerScope), ); - } - session.cols = input.cols; - session.rows = input.rows; - session.updatedAt = new Date().toISOString(); - session.process.resize(input.cols, input.rows); - } - async clear(raw: TerminalClearInput): Promise { - const input = decodeTerminalClearInput(raw); - await this.runWithThreadLock(input.threadId, async () => { - const session = this.requireSession(input.threadId, input.terminalId); - session.history = ""; - session.pendingHistoryControlSequence = ""; - session.updatedAt = new Date().toISOString(); - await this.persistHistory(input.threadId, input.terminalId, session.history); - this.emitEvent({ - type: "cleared", - threadId: input.threadId, - terminalId: input.terminalId, - createdAt: new Date().toISOString(), + yield* registerKillFiber(process, fiber); + }); + + const persistWorker = yield* makeKeyedCoalescingWorker< + string, + PersistHistoryRequest, + never, + never + >({ + merge: (current, next) => ({ + history: next.history, + immediate: current.immediate || next.immediate, + }), + process: Effect.fn("terminal.persistHistoryWorker")(function* (sessionKey, request) { + if (!request.immediate) { + yield* Effect.sleep(DEFAULT_PERSIST_DEBOUNCE_MS); + } + + const [threadId, terminalId] = sessionKey.split("\u0000"); + if (!threadId || !terminalId) { + return; + } + + yield* fileSystem.writeFileString(historyPath(threadId, terminalId), request.history).pipe( + Effect.catch((error) => + Effect.logWarning("failed to persist terminal history", { + threadId, + terminalId, + error: error instanceof Error ? error.message : String(error), + }), + ), + ); + }), + }); + + const queuePersist = Effect.fn("terminal.queuePersist")(function* ( + threadId: string, + terminalId: string, + history: string, + ) { + yield* persistWorker.enqueue(toSessionKey(threadId, terminalId), { + history, + immediate: false, }); }); - } - async restart(raw: TerminalRestartInput): Promise { - const input = decodeTerminalRestartInput(raw); - return this.runWithThreadLock(input.threadId, async () => { - await this.assertValidCwd(input.cwd); - - const sessionKey = toSessionKey(input.threadId, input.terminalId); - let session = this.sessions.get(sessionKey); - if (!session) { - const cols = input.cols ?? DEFAULT_OPEN_COLS; - const rows = input.rows ?? DEFAULT_OPEN_ROWS; - session = { - threadId: input.threadId, - terminalId: input.terminalId, - cwd: input.cwd, - status: "starting", - pid: null, - history: "", - pendingHistoryControlSequence: "", - exitCode: null, - exitSignal: null, - updatedAt: new Date().toISOString(), - cols, - rows, - process: null, - unsubscribeData: null, - unsubscribeExit: null, - hasRunningSubprocess: false, - runtimeEnv: normalizedRuntimeEnv(input.env), - }; - this.sessions.set(sessionKey, session); - this.evictInactiveSessionsIfNeeded(); - } else { - this.stopProcess(session); - session.cwd = input.cwd; - session.runtimeEnv = normalizedRuntimeEnv(input.env); + const flushPersist = Effect.fn("terminal.flushPersist")(function* ( + threadId: string, + terminalId: string, + ) { + yield* persistWorker.drainKey(toSessionKey(threadId, terminalId)); + }); + + const persistHistory = Effect.fn("terminal.persistHistory")(function* ( + threadId: string, + terminalId: string, + history: string, + ) { + yield* persistWorker.enqueue(toSessionKey(threadId, terminalId), { + history, + immediate: true, + }); + yield* flushPersist(threadId, terminalId); + }); + + const readHistory = Effect.fn("terminal.readHistory")(function* ( + threadId: string, + terminalId: string, + ) { + const nextPath = historyPath(threadId, terminalId); + if ( + yield* fileSystem + .exists(nextPath) + .pipe(Effect.mapError(toTerminalHistoryError("read", threadId, terminalId))) + ) { + const raw = yield* fileSystem + .readFileString(nextPath) + .pipe(Effect.mapError(toTerminalHistoryError("read", threadId, terminalId))); + const capped = capHistory(raw, historyLineLimit); + if (capped !== raw) { + yield* fileSystem + .writeFileString(nextPath, capped) + .pipe(Effect.mapError(toTerminalHistoryError("truncate", threadId, terminalId))); + } + return capped; + } + + if (terminalId !== DEFAULT_TERMINAL_ID) { + return ""; } - const cols = input.cols ?? session.cols; - const rows = input.rows ?? session.rows; + const legacyPath = legacyHistoryPath(threadId); + if ( + !(yield* fileSystem + .exists(legacyPath) + .pipe(Effect.mapError(toTerminalHistoryError("migrate", threadId, terminalId)))) + ) { + return ""; + } - session.history = ""; - session.pendingHistoryControlSequence = ""; - await this.persistHistory(input.threadId, input.terminalId, session.history); - await this.startSession(session, { ...input, cols, rows }, "restarted"); - return this.snapshot(session); + const raw = yield* fileSystem + .readFileString(legacyPath) + .pipe(Effect.mapError(toTerminalHistoryError("migrate", threadId, terminalId))); + const capped = capHistory(raw, historyLineLimit); + yield* fileSystem + .writeFileString(nextPath, capped) + .pipe(Effect.mapError(toTerminalHistoryError("migrate", threadId, terminalId))); + yield* fileSystem.remove(legacyPath, { force: true }).pipe( + Effect.catch((cleanupError) => + Effect.logWarning("failed to remove legacy terminal history", { + threadId, + error: cleanupError instanceof Error ? cleanupError.message : String(cleanupError), + }), + ), + ); + return capped; }); - } - async close(raw: TerminalCloseInput): Promise { - const input = decodeTerminalCloseInput(raw); - await this.runWithThreadLock(input.threadId, async () => { - if (input.terminalId) { - await this.closeSession(input.threadId, input.terminalId, input.deleteHistory === true); - return; + const deleteHistory = Effect.fn("terminal.deleteHistory")(function* ( + threadId: string, + terminalId: string, + ) { + yield* fileSystem.remove(historyPath(threadId, terminalId), { force: true }).pipe( + Effect.catch((error) => + Effect.logWarning("failed to delete terminal history", { + threadId, + terminalId, + error: error instanceof Error ? error.message : String(error), + }), + ), + ); + if (terminalId === DEFAULT_TERMINAL_ID) { + yield* fileSystem.remove(legacyHistoryPath(threadId), { force: true }).pipe( + Effect.catch((error) => + Effect.logWarning("failed to delete terminal history", { + threadId, + terminalId, + error: error instanceof Error ? error.message : String(error), + }), + ), + ); } + }); - const threadSessions = this.sessionsForThread(input.threadId); - for (const session of threadSessions) { - this.stopProcess(session); - this.sessions.delete(toSessionKey(session.threadId, session.terminalId)); - } - await Promise.all( - threadSessions.map((session) => - this.flushPersistQueue(session.threadId, session.terminalId), + const deleteAllHistoryForThread = Effect.fn("terminal.deleteAllHistoryForThread")(function* ( + threadId: string, + ) { + const threadPrefix = `${toSafeThreadId(threadId)}_`; + const entries = yield* fileSystem + .readDirectory(logsDir, { recursive: false }) + .pipe(Effect.catch(() => Effect.succeed([] as Array))); + yield* Effect.forEach( + entries.filter( + (name) => + name === `${toSafeThreadId(threadId)}.log` || + name === `${legacySafeThreadId(threadId)}.log` || + name.startsWith(threadPrefix), ), + (name) => + fileSystem.remove(path.join(logsDir, name), { force: true }).pipe( + Effect.catch((error) => + Effect.logWarning("failed to delete terminal histories for thread", { + threadId, + error: error instanceof Error ? error.message : String(error), + }), + ), + ), + { discard: true }, ); + }); - if (input.deleteHistory) { - await this.deleteAllHistoryForThread(input.threadId); + const assertValidCwd = Effect.fn("terminal.assertValidCwd")(function* (cwd: string) { + const stats = yield* fileSystem.stat(cwd).pipe( + Effect.mapError( + (cause) => + new TerminalCwdError({ + cwd, + reason: cause.reason._tag === "NotFound" ? "notFound" : "statFailed", + cause, + }), + ), + ); + if (stats.type !== "Directory") { + return yield* new TerminalCwdError({ + cwd, + reason: "notDirectory", + }); } - this.updateSubprocessPollingState(); }); - } - dispose(): void { - this.stopSubprocessPolling(); - const sessions = [...this.sessions.values()]; - this.sessions.clear(); - for (const session of sessions) { - this.stopProcess(session); - } - for (const timer of this.persistTimers.values()) { - clearTimeout(timer); - } - this.persistTimers.clear(); - for (const timer of this.killEscalationTimers.values()) { - clearTimeout(timer); - } - this.killEscalationTimers.clear(); - this.pendingPersistHistory.clear(); - this.threadLocks.clear(); - this.persistQueues.clear(); - } + const getSession = Effect.fn("terminal.getSession")(function* ( + threadId: string, + terminalId: string, + ): Effect.fn.Return> { + return yield* Effect.map(readManagerState, (state) => + Option.fromNullishOr(state.sessions.get(toSessionKey(threadId, terminalId))), + ); + }); - private async startSession( - session: TerminalSessionState, - input: TerminalStartInput, - eventType: "started" | "restarted", - ): Promise { - this.stopProcess(session); - - session.status = "starting"; - session.cwd = input.cwd; - session.cols = input.cols; - session.rows = input.rows; - session.exitCode = null; - session.exitSignal = null; - session.hasRunningSubprocess = false; - session.updatedAt = new Date().toISOString(); - - let ptyProcess: PtyProcess | null = null; - let startedShell: string | null = null; - try { - const shellCandidates = resolveShellCandidates(this.shellResolver); - const terminalEnv = createTerminalSpawnEnv(process.env, session.runtimeEnv); - let lastSpawnError: unknown = null; - - const spawnWithCandidate = (candidate: ShellCandidate) => - Effect.runPromise( - this.ptyAdapter.spawn({ - shell: candidate.shell, - ...(candidate.args ? { args: candidate.args } : {}), - cwd: session.cwd, - cols: session.cols, - rows: session.rows, - env: terminalEnv, - }), - ); + const requireSession = Effect.fn("terminal.requireSession")(function* ( + threadId: string, + terminalId: string, + ): Effect.fn.Return { + return yield* Effect.flatMap(getSession(threadId, terminalId), (session) => + Option.match(session, { + onNone: () => + Effect.fail( + new TerminalSessionLookupError({ + threadId, + terminalId, + }), + ), + onSome: Effect.succeed, + }), + ); + }); - const trySpawn = async ( - candidates: ShellCandidate[], - index = 0, - ): Promise<{ process: PtyProcess; shellLabel: string } | null> => { - if (index >= candidates.length) { - return null; - } - const candidate = candidates[index]; - if (!candidate) { - return null; - } + const sessionsForThread = Effect.fn("terminal.sessionsForThread")(function* (threadId: string) { + return yield* readManagerState.pipe( + Effect.map((state) => + [...state.sessions.values()].filter((session) => session.threadId === threadId), + ), + ); + }); + + const evictInactiveSessionsIfNeeded = Effect.fn("terminal.evictInactiveSessionsIfNeeded")( + function* () { + yield* modifyManagerState((state) => { + const inactiveSessions = [...state.sessions.values()].filter( + (session) => session.status !== "running", + ); + if (inactiveSessions.length <= maxRetainedInactiveSessions) { + return [undefined, state] as const; + } - try { - const process = await spawnWithCandidate(candidate); - return { process, shellLabel: formatShellCandidate(candidate) }; - } catch (error) { - lastSpawnError = error; - if (!isRetryableShellSpawnError(error)) { - throw error; + inactiveSessions.sort( + (left, right) => + left.updatedAt.localeCompare(right.updatedAt) || + left.threadId.localeCompare(right.threadId) || + left.terminalId.localeCompare(right.terminalId), + ); + + const sessions = new Map(state.sessions); + + const toEvict = inactiveSessions.length - maxRetainedInactiveSessions; + for (const session of inactiveSessions.slice(0, toEvict)) { + const key = toSessionKey(session.threadId, session.terminalId); + sessions.delete(key); } - return trySpawn(candidates, index + 1); + + return [undefined, { ...state, sessions }] as const; + }); + }, + ); + + const handleProcessData = Effect.fn("terminal.handleProcessData")(function* ( + sessionKey: string, + expectedPid: number, + data: string, + ) { + const update = yield* modifyManagerState((state) => { + const session = state.sessions.get(sessionKey); + if ( + !session || + !session.process || + session.status !== "running" || + session.pid !== expectedPid + ) { + return [null, state] as const; + } + + const sanitized = sanitizeTerminalHistoryChunk(session.pendingHistoryControlSequence, data); + session.pendingHistoryControlSequence = sanitized.pendingControlSequence; + if (sanitized.visibleText.length > 0) { + session.history = capHistory( + `${session.history}${sanitized.visibleText}`, + historyLineLimit, + ); } - }; + session.updatedAt = new Date().toISOString(); + + return [ + { + threadId: session.threadId, + terminalId: session.terminalId, + history: sanitized.visibleText.length > 0 ? session.history : null, + }, + state, + ] as const; + }); - const spawnResult = await trySpawn(shellCandidates); - if (spawnResult) { - ptyProcess = spawnResult.process; - startedShell = spawnResult.shellLabel; + if (!update) { + return; } - if (!ptyProcess) { - const detail = - lastSpawnError instanceof Error ? lastSpawnError.message : "Terminal start failed"; - const tried = - shellCandidates.length > 0 - ? ` Tried shells: ${shellCandidates.map((candidate) => formatShellCandidate(candidate)).join(", ")}.` - : ""; - throw new Error(`${detail}.${tried}`.trim()); + if (update.history !== null) { + yield* queuePersist(update.threadId, update.terminalId, update.history); } - session.process = ptyProcess; - session.pid = ptyProcess.pid; - session.status = "running"; - session.updatedAt = new Date().toISOString(); - session.unsubscribeData = ptyProcess.onData((data) => { - this.onProcessData(session, data); - }); - session.unsubscribeExit = ptyProcess.onExit((event) => { - this.onProcessExit(session, event); - }); - this.updateSubprocessPollingState(); - this.emitEvent({ - type: eventType, - threadId: session.threadId, - terminalId: session.terminalId, + yield* publishEvent({ + type: "output", + threadId: update.threadId, + terminalId: update.terminalId, createdAt: new Date().toISOString(), - snapshot: this.snapshot(session), + data, + }); + }); + + const handleProcessExit = Effect.fn("terminal.handleProcessExit")(function* ( + sessionKey: string, + expectedPid: number, + event: PtyExitEvent, + ) { + const update = yield* modifyManagerState((state) => { + const session = state.sessions.get(sessionKey); + if (!session || session.pid !== expectedPid) { + return [null, state] as const; + } + + const process = session.process; + cleanupProcessHandles(session); + session.process = null; + session.pid = null; + session.hasRunningSubprocess = false; + session.status = "exited"; + session.pendingHistoryControlSequence = ""; + session.exitCode = Number.isInteger(event.exitCode) ? event.exitCode : null; + session.exitSignal = Number.isInteger(event.signal) ? event.signal : null; + session.updatedAt = new Date().toISOString(); + + return [ + { + process, + threadId: session.threadId, + terminalId: session.terminalId, + exitCode: session.exitCode, + exitSignal: session.exitSignal, + }, + state, + ] as const; }); - } catch (error) { - if (ptyProcess) { - this.killProcessWithEscalation(ptyProcess, session.threadId, session.terminalId); + + if (!update) { + return; } - session.status = "error"; - session.pid = null; - session.process = null; - session.hasRunningSubprocess = false; - session.updatedAt = new Date().toISOString(); - this.evictInactiveSessionsIfNeeded(); - this.updateSubprocessPollingState(); - const message = error instanceof Error ? error.message : "Terminal start failed"; - this.emitEvent({ - type: "error", - threadId: session.threadId, - terminalId: session.terminalId, + + yield* clearKillFiber(update.process); + yield* publishEvent({ + type: "exited", + threadId: update.threadId, + terminalId: update.terminalId, createdAt: new Date().toISOString(), - message, + exitCode: update.exitCode, + exitSignal: update.exitSignal, }); - this.logger.error("failed to start terminal", { - threadId: session.threadId, - terminalId: session.terminalId, - error: message, - ...(startedShell ? { shell: startedShell } : {}), - }); - } - } - - private onProcessData(session: TerminalSessionState, data: string): void { - const sanitized = sanitizeTerminalHistoryChunk(session.pendingHistoryControlSequence, data); - session.pendingHistoryControlSequence = sanitized.pendingControlSequence; - if (sanitized.visibleText.length > 0) { - session.history = capHistory( - `${session.history}${sanitized.visibleText}`, - this.historyLineLimit, - ); - this.queuePersist(session.threadId, session.terminalId, session.history); - } - session.updatedAt = new Date().toISOString(); - this.emitEvent({ - type: "output", - threadId: session.threadId, - terminalId: session.terminalId, - createdAt: new Date().toISOString(), - data, + yield* evictInactiveSessionsIfNeeded(); }); - } - private onProcessExit(session: TerminalSessionState, event: PtyExitEvent): void { - this.clearKillEscalationTimer(session.process); - this.cleanupProcessHandles(session); - session.process = null; - session.pid = null; - session.hasRunningSubprocess = false; - session.status = "exited"; - session.pendingHistoryControlSequence = ""; - session.exitCode = Number.isInteger(event.exitCode) ? event.exitCode : null; - session.exitSignal = Number.isInteger(event.signal) ? event.signal : null; - session.updatedAt = new Date().toISOString(); - this.emitEvent({ - type: "exited", - threadId: session.threadId, - terminalId: session.terminalId, - createdAt: new Date().toISOString(), - exitCode: session.exitCode, - exitSignal: session.exitSignal, + const stopProcess = Effect.fn("terminal.stopProcess")(function* ( + session: TerminalSessionState, + ) { + const process = session.process; + if (!process) return; + + yield* modifyManagerState((state) => { + cleanupProcessHandles(session); + session.process = null; + session.pid = null; + session.hasRunningSubprocess = false; + session.status = "exited"; + session.pendingHistoryControlSequence = ""; + session.updatedAt = new Date().toISOString(); + return [undefined, state] as const; + }); + + yield* clearKillFiber(process); + yield* startKillEscalation(process, session.threadId, session.terminalId); + yield* evictInactiveSessionsIfNeeded(); }); - this.evictInactiveSessionsIfNeeded(); - this.updateSubprocessPollingState(); - } - private stopProcess(session: TerminalSessionState): void { - const process = session.process; - if (!process) return; - this.cleanupProcessHandles(session); - session.process = null; - session.pid = null; - session.hasRunningSubprocess = false; - session.status = "exited"; - session.pendingHistoryControlSequence = ""; - session.updatedAt = new Date().toISOString(); - this.killProcessWithEscalation(process, session.threadId, session.terminalId); - this.evictInactiveSessionsIfNeeded(); - this.updateSubprocessPollingState(); - } + const trySpawn = Effect.fn("terminal.trySpawn")(function* ( + shellCandidates: ReadonlyArray, + spawnEnv: NodeJS.ProcessEnv, + session: TerminalSessionState, + index = 0, + lastError: PtySpawnError | null = null, + ): Effect.fn.Return<{ process: PtyProcess; shellLabel: string }, PtySpawnError> { + if (index >= shellCandidates.length) { + const detail = lastError?.message ?? "Failed to spawn PTY process"; + const tried = + shellCandidates.length > 0 + ? ` Tried shells: ${shellCandidates.map((candidate) => formatShellCandidate(candidate)).join(", ")}.` + : ""; + return yield* new PtySpawnError({ + adapter: "terminal-manager", + message: `${detail}.${tried}`.trim(), + ...(lastError ? { cause: lastError } : {}), + }); + } - private cleanupProcessHandles(session: TerminalSessionState): void { - session.unsubscribeData?.(); - session.unsubscribeData = null; - session.unsubscribeExit?.(); - session.unsubscribeExit = null; - } + const candidate = shellCandidates[index]; + if (!candidate) { + return yield* ( + lastError ?? + new PtySpawnError({ + adapter: "terminal-manager", + message: "No shell candidate available for PTY spawn.", + }) + ); + } - private clearKillEscalationTimer(process: PtyProcess | null): void { - if (!process) return; - const timer = this.killEscalationTimers.get(process); - if (!timer) return; - clearTimeout(timer); - this.killEscalationTimers.delete(process); - } + const attempt = yield* Effect.result( + options.ptyAdapter.spawn({ + shell: candidate.shell, + ...(candidate.args ? { args: candidate.args } : {}), + cwd: session.cwd, + cols: session.cols, + rows: session.rows, + env: spawnEnv, + }), + ); - private killProcessWithEscalation( - process: PtyProcess, - threadId: string, - terminalId: string, - ): void { - this.clearKillEscalationTimer(process); - try { - process.kill("SIGTERM"); - } catch (error) { - const message = error instanceof Error ? error.message : String(error); - this.logger.warn("failed to kill terminal process", { - threadId, - terminalId, - signal: "SIGTERM", - error: message, - }); - return; - } + if (attempt._tag === "Success") { + return { + process: attempt.success, + shellLabel: formatShellCandidate(candidate), + }; + } - const timer = setTimeout(() => { - this.killEscalationTimers.delete(process); - try { - process.kill("SIGKILL"); - } catch (error) { - const message = error instanceof Error ? error.message : String(error); - this.logger.warn("failed to force-kill terminal process", { - threadId, - terminalId, - signal: "SIGKILL", - error: message, - }); + const spawnError = attempt.failure; + if (!isRetryableShellSpawnError(spawnError)) { + return yield* spawnError; } - }, this.processKillGraceMs); - timer.unref?.(); - this.killEscalationTimers.set(process, timer); - } - private evictInactiveSessionsIfNeeded(): void { - const inactiveSessions = [...this.sessions.values()].filter( - (session) => session.status !== "running", - ); - if (inactiveSessions.length <= this.maxRetainedInactiveSessions) { - return; - } + return yield* trySpawn(shellCandidates, spawnEnv, session, index + 1, spawnError); + }); - inactiveSessions.sort( - (left, right) => - left.updatedAt.localeCompare(right.updatedAt) || - left.threadId.localeCompare(right.threadId) || - left.terminalId.localeCompare(right.terminalId), - ); - const toEvict = inactiveSessions.length - this.maxRetainedInactiveSessions; - for (const session of inactiveSessions.slice(0, toEvict)) { - const key = toSessionKey(session.threadId, session.terminalId); - this.sessions.delete(key); - this.clearPersistTimer(session.threadId, session.terminalId); - this.pendingPersistHistory.delete(key); - this.persistQueues.delete(key); - this.clearKillEscalationTimer(session.process); - } - } + const startSession = Effect.fn("terminal.startSession")(function* ( + session: TerminalSessionState, + input: TerminalStartInput, + eventType: "started" | "restarted", + ) { + yield* stopProcess(session); - private queuePersist(threadId: string, terminalId: string, history: string): void { - const persistenceKey = toSessionKey(threadId, terminalId); - this.pendingPersistHistory.set(persistenceKey, history); - this.schedulePersist(threadId, terminalId); - } + yield* modifyManagerState((state) => { + session.status = "starting"; + session.cwd = input.cwd; + session.cols = input.cols; + session.rows = input.rows; + session.exitCode = null; + session.exitSignal = null; + session.hasRunningSubprocess = false; + session.updatedAt = new Date().toISOString(); + return [undefined, state] as const; + }); - private async persistHistory( - threadId: string, - terminalId: string, - history: string, - ): Promise { - const persistenceKey = toSessionKey(threadId, terminalId); - this.clearPersistTimer(threadId, terminalId); - this.pendingPersistHistory.delete(persistenceKey); - await this.enqueuePersistWrite(threadId, terminalId, history); - } + let ptyProcess: PtyProcess | null = null; + let startedShell: string | null = null; - private enqueuePersistWrite( - threadId: string, - terminalId: string, - history: string, - ): Promise { - const persistenceKey = toSessionKey(threadId, terminalId); - const task = async () => { - await fs.promises.writeFile(this.historyPath(threadId, terminalId), history, "utf8"); - }; - const previous = this.persistQueues.get(persistenceKey) ?? Promise.resolve(); - const next = previous - .catch(() => undefined) - .then(task) - .catch((error) => { - this.logger.warn("failed to persist terminal history", { - threadId, - terminalId, - error: error instanceof Error ? error.message : String(error), - }); - }); - this.persistQueues.set(persistenceKey, next); - const finalized = next.finally(() => { - if (this.persistQueues.get(persistenceKey) === next) { - this.persistQueues.delete(persistenceKey); - } - if ( - this.pendingPersistHistory.has(persistenceKey) && - !this.persistTimers.has(persistenceKey) - ) { - this.schedulePersist(threadId, terminalId); - } - }); - void finalized.catch(() => undefined); - return finalized; - } + const sessionKey = toSessionKey(session.threadId, session.terminalId); - private schedulePersist(threadId: string, terminalId: string): void { - const persistenceKey = toSessionKey(threadId, terminalId); - if (this.persistTimers.has(persistenceKey)) return; - const timer = setTimeout(() => { - this.persistTimers.delete(persistenceKey); - const pendingHistory = this.pendingPersistHistory.get(persistenceKey); - if (pendingHistory === undefined) return; - this.pendingPersistHistory.delete(persistenceKey); - void this.enqueuePersistWrite(threadId, terminalId, pendingHistory); - }, this.persistDebounceMs); - this.persistTimers.set(persistenceKey, timer); - } + const startResult = yield* Effect.result( + Effect.gen(function* () { + const shellCandidates = resolveShellCandidates(shellResolver); + const terminalEnv = createTerminalSpawnEnv(process.env, session.runtimeEnv); + const spawnResult = yield* trySpawn(shellCandidates, terminalEnv, session); + ptyProcess = spawnResult.process; + startedShell = spawnResult.shellLabel; - private clearPersistTimer(threadId: string, terminalId: string): void { - const persistenceKey = toSessionKey(threadId, terminalId); - const timer = this.persistTimers.get(persistenceKey); - if (!timer) return; - clearTimeout(timer); - this.persistTimers.delete(persistenceKey); - } + const processPid = ptyProcess.pid; + const unsubscribeData = ptyProcess.onData((data) => { + runFork(handleProcessData(sessionKey, processPid, data)); + }); + const unsubscribeExit = ptyProcess.onExit((event) => { + runFork(handleProcessExit(sessionKey, processPid, event)); + }); - private async readHistory(threadId: string, terminalId: string): Promise { - const nextPath = this.historyPath(threadId, terminalId); - try { - const raw = await fs.promises.readFile(nextPath, "utf8"); - const capped = capHistory(raw, this.historyLineLimit); - if (capped !== raw) { - await fs.promises.writeFile(nextPath, capped, "utf8"); - } - return capped; - } catch (error) { - if ((error as NodeJS.ErrnoException).code !== "ENOENT") { - throw error; - } - } + yield* modifyManagerState((state) => { + session.process = ptyProcess; + session.pid = processPid; + session.status = "running"; + session.updatedAt = new Date().toISOString(); + session.unsubscribeData = unsubscribeData; + session.unsubscribeExit = unsubscribeExit; + return [undefined, state] as const; + }); - if (terminalId !== DEFAULT_TERMINAL_ID) { - return ""; - } + yield* publishEvent({ + type: eventType, + threadId: session.threadId, + terminalId: session.terminalId, + createdAt: new Date().toISOString(), + snapshot: snapshot(session), + }); + }), + ); - const legacyPath = this.legacyHistoryPath(threadId); - try { - const raw = await fs.promises.readFile(legacyPath, "utf8"); - const capped = capHistory(raw, this.historyLineLimit); - - // Migrate legacy transcript filename to the terminal-scoped path. - await fs.promises.writeFile(nextPath, capped, "utf8"); - try { - await fs.promises.rm(legacyPath, { force: true }); - } catch (cleanupError) { - this.logger.warn("failed to remove legacy terminal history", { - threadId, - error: cleanupError instanceof Error ? cleanupError.message : String(cleanupError), - }); + if (startResult._tag === "Success") { + return; } - return capped; - } catch (error) { - if ((error as NodeJS.ErrnoException).code === "ENOENT") { - return ""; - } - throw error; - } - } + { + const error = startResult.failure; + if (ptyProcess) { + yield* startKillEscalation(ptyProcess, session.threadId, session.terminalId); + } - private async deleteHistory(threadId: string, terminalId: string): Promise { - const deletions = [fs.promises.rm(this.historyPath(threadId, terminalId), { force: true })]; - if (terminalId === DEFAULT_TERMINAL_ID) { - deletions.push(fs.promises.rm(this.legacyHistoryPath(threadId), { force: true })); - } - try { - await Promise.all(deletions); - } catch (error) { - this.logger.warn("failed to delete terminal history", { - threadId, - terminalId, - error: error instanceof Error ? error.message : String(error), - }); - } - } + yield* modifyManagerState((state) => { + session.status = "error"; + session.pid = null; + session.process = null; + session.unsubscribeData = null; + session.unsubscribeExit = null; + session.hasRunningSubprocess = false; + session.updatedAt = new Date().toISOString(); + return [undefined, state] as const; + }); - private async flushPersistQueue(threadId: string, terminalId: string): Promise { - const persistenceKey = toSessionKey(threadId, terminalId); - this.clearPersistTimer(threadId, terminalId); + yield* evictInactiveSessionsIfNeeded(); - while (true) { - const pendingHistory = this.pendingPersistHistory.get(persistenceKey); - if (pendingHistory !== undefined) { - this.pendingPersistHistory.delete(persistenceKey); - await this.enqueuePersistWrite(threadId, terminalId, pendingHistory); + const message = error.message; + yield* publishEvent({ + type: "error", + threadId: session.threadId, + terminalId: session.terminalId, + createdAt: new Date().toISOString(), + message, + }); + yield* Effect.logError("failed to start terminal", { + threadId: session.threadId, + terminalId: session.terminalId, + error: message, + ...(startedShell ? { shell: startedShell } : {}), + }); } + }); - const pending = this.persistQueues.get(persistenceKey); - if (!pending) { - return; + const closeSession = Effect.fn("terminal.closeSession")(function* ( + threadId: string, + terminalId: string, + deleteHistoryOnClose: boolean, + ) { + const key = toSessionKey(threadId, terminalId); + const session = yield* getSession(threadId, terminalId); + + if (Option.isSome(session)) { + yield* stopProcess(session.value); + yield* persistHistory(threadId, terminalId, session.value.history); } - await pending.catch(() => undefined); - } - } - private updateSubprocessPollingState(): void { - const hasRunningSessions = [...this.sessions.values()].some( - (session) => session.status === "running" && session.pid !== null, - ); - if (hasRunningSessions) { - this.ensureSubprocessPolling(); - return; - } - this.stopSubprocessPolling(); - } + yield* flushPersist(threadId, terminalId); - private ensureSubprocessPolling(): void { - if (this.subprocessPollTimer) return; - this.subprocessPollTimer = setInterval(() => { - void this.pollSubprocessActivity(); - }, this.subprocessPollIntervalMs); - this.subprocessPollTimer.unref?.(); - void this.pollSubprocessActivity(); - } + yield* modifyManagerState((state) => { + if (!state.sessions.has(key)) { + return [undefined, state] as const; + } + const sessions = new Map(state.sessions); + sessions.delete(key); + return [undefined, { ...state, sessions }] as const; + }); - private stopSubprocessPolling(): void { - if (!this.subprocessPollTimer) return; - clearInterval(this.subprocessPollTimer); - this.subprocessPollTimer = null; - } + if (deleteHistoryOnClose) { + yield* deleteHistory(threadId, terminalId); + } + }); - private async pollSubprocessActivity(): Promise { - if (this.subprocessPollInFlight) return; + const pollSubprocessActivity = Effect.fn("terminal.pollSubprocessActivity")(function* () { + const state = yield* readManagerState; + const runningSessions = [...state.sessions.values()].filter( + (session): session is TerminalSessionState & { pid: number } => + session.status === "running" && Number.isInteger(session.pid), + ); - const runningSessions = [...this.sessions.values()].filter( - (session): session is TerminalSessionState & { pid: number } => - session.status === "running" && Number.isInteger(session.pid), - ); - if (runningSessions.length === 0) { - this.stopSubprocessPolling(); - return; - } + if (runningSessions.length === 0) { + return; + } - this.subprocessPollInFlight = true; - try { - await Promise.all( - runningSessions.map(async (session) => { - const terminalPid = session.pid; - let hasRunningSubprocess = false; - try { - hasRunningSubprocess = await this.subprocessChecker(terminalPid); - } catch (error) { - this.logger.warn("failed to check terminal subprocess activity", { + const checkSubprocessActivity = Effect.fn("terminal.checkSubprocessActivity")(function* ( + session: TerminalSessionState & { pid: number }, + ) { + const terminalPid = session.pid; + const hasRunningSubprocess = yield* subprocessChecker(terminalPid).pipe( + Effect.map(Option.some), + Effect.catch((error) => + Effect.logWarning("failed to check terminal subprocess activity", { threadId: session.threadId, terminalId: session.terminalId, terminalPid, error: error instanceof Error ? error.message : String(error), - }); - return; - } + }).pipe(Effect.as(Option.none())), + ), + ); - const liveSession = this.sessions.get(toSessionKey(session.threadId, session.terminalId)); - if (!liveSession || liveSession.status !== "running" || liveSession.pid !== terminalPid) { - return; - } - if (liveSession.hasRunningSubprocess === hasRunningSubprocess) { - return; + if (Option.isNone(hasRunningSubprocess)) { + return; + } + + const event = yield* modifyManagerState((state) => { + const liveSession: Option.Option = Option.fromNullishOr( + state.sessions.get(toSessionKey(session.threadId, session.terminalId)), + ); + if ( + Option.isNone(liveSession) || + liveSession.value.status !== "running" || + liveSession.value.pid !== terminalPid || + liveSession.value.hasRunningSubprocess === hasRunningSubprocess.value + ) { + return [Option.none(), state] as const; } - liveSession.hasRunningSubprocess = hasRunningSubprocess; - liveSession.updatedAt = new Date().toISOString(); - this.emitEvent({ - type: "activity", - threadId: liveSession.threadId, - terminalId: liveSession.terminalId, - createdAt: new Date().toISOString(), - hasRunningSubprocess, - }); - }), - ); - } finally { - this.subprocessPollInFlight = false; - } - } + liveSession.value.hasRunningSubprocess = hasRunningSubprocess.value; + liveSession.value.updatedAt = new Date().toISOString(); + + return [ + Option.some({ + type: "activity" as const, + threadId: liveSession.value.threadId, + terminalId: liveSession.value.terminalId, + createdAt: new Date().toISOString(), + hasRunningSubprocess: hasRunningSubprocess.value, + }), + state, + ] as const; + }); - private async assertValidCwd(cwd: string): Promise { - let stats: fs.Stats; - try { - stats = await fs.promises.stat(cwd); - } catch (error) { - if ((error as NodeJS.ErrnoException).code === "ENOENT") { - throw new Error(`Terminal cwd does not exist: ${cwd}`, { cause: error }); - } - throw error; - } - if (!stats.isDirectory()) { - throw new Error(`Terminal cwd is not a directory: ${cwd}`); - } - } + if (Option.isSome(event)) { + yield* publishEvent(event.value); + } + }); - private async closeSession( - threadId: string, - terminalId: string, - deleteHistory: boolean, - ): Promise { - const key = toSessionKey(threadId, terminalId); - const session = this.sessions.get(key); - if (session) { - this.stopProcess(session); - this.sessions.delete(key); - } - this.updateSubprocessPollingState(); - await this.flushPersistQueue(threadId, terminalId); - if (deleteHistory) { - await this.deleteHistory(threadId, terminalId); - } - } + yield* Effect.forEach(runningSessions, checkSubprocessActivity, { + concurrency: "unbounded", + discard: true, + }); + }); - private sessionsForThread(threadId: string): TerminalSessionState[] { - return [...this.sessions.values()].filter((session) => session.threadId === threadId); - } + const hasRunningSessions = readManagerState.pipe( + Effect.map((state) => + [...state.sessions.values()].some((session) => session.status === "running"), + ), + ); - private async deleteAllHistoryForThread(threadId: string): Promise { - const threadPrefix = `${toSafeThreadId(threadId)}_`; - try { - const entries = await fs.promises.readdir(this.logsDir, { withFileTypes: true }); - const removals = entries - .filter((entry) => entry.isFile()) - .map((entry) => entry.name) - .filter( - (name) => - name === `${toSafeThreadId(threadId)}.log` || - name === `${legacySafeThreadId(threadId)}.log` || - name.startsWith(threadPrefix), - ) - .map((name) => fs.promises.rm(path.join(this.logsDir, name), { force: true })); - await Promise.all(removals); - } catch (error) { - this.logger.warn("failed to delete terminal histories for thread", { - threadId, - error: error instanceof Error ? error.message : String(error), - }); - } - } + yield* Effect.forever( + hasRunningSessions.pipe( + Effect.flatMap((active) => + active + ? pollSubprocessActivity().pipe( + Effect.flatMap(() => Effect.sleep(subprocessPollIntervalMs)), + ) + : Effect.sleep(subprocessPollIntervalMs), + ), + ), + ).pipe(Effect.forkIn(workerScope)); + + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + const sessions = yield* modifyManagerState( + (state) => + [ + [...state.sessions.values()], + { + ...state, + sessions: new Map(), + }, + ] as const, + ); - private requireSession(threadId: string, terminalId: string): TerminalSessionState { - const session = this.sessions.get(toSessionKey(threadId, terminalId)); - if (!session) { - throw new Error(`Unknown terminal thread: ${threadId}, terminal: ${terminalId}`); - } - return session; - } + const cleanupSession = Effect.fn("terminal.cleanupSession")(function* ( + session: TerminalSessionState, + ) { + cleanupProcessHandles(session); + if (!session.process) return; + yield* clearKillFiber(session.process); + yield* runKillEscalation(session.process, session.threadId, session.terminalId); + }); - private snapshot(session: TerminalSessionState): TerminalSessionSnapshot { - return { - threadId: session.threadId, - terminalId: session.terminalId, - cwd: session.cwd, - status: session.status, - pid: session.pid, - history: session.history, - exitCode: session.exitCode, - exitSignal: session.exitSignal, - updatedAt: session.updatedAt, - }; - } + yield* Effect.forEach(sessions, cleanupSession, { + concurrency: "unbounded", + discard: true, + }); + }).pipe(Effect.ignoreCause({ log: true })), + ); - private emitEvent(event: TerminalEvent): void { - this.emit("event", event); - } + const open: TerminalManagerShape["open"] = (input) => + withThreadLock( + input.threadId, + Effect.gen(function* () { + const terminalId = input.terminalId ?? DEFAULT_TERMINAL_ID; + yield* assertValidCwd(input.cwd); + + const sessionKey = toSessionKey(input.threadId, terminalId); + const existing = yield* getSession(input.threadId, terminalId); + if (Option.isNone(existing)) { + yield* flushPersist(input.threadId, terminalId); + const history = yield* readHistory(input.threadId, terminalId); + const cols = input.cols ?? DEFAULT_OPEN_COLS; + const rows = input.rows ?? DEFAULT_OPEN_ROWS; + const session: TerminalSessionState = { + threadId: input.threadId, + terminalId, + cwd: input.cwd, + status: "starting", + pid: null, + history, + pendingHistoryControlSequence: "", + exitCode: null, + exitSignal: null, + updatedAt: new Date().toISOString(), + cols, + rows, + process: null, + unsubscribeData: null, + unsubscribeExit: null, + hasRunningSubprocess: false, + runtimeEnv: normalizedRuntimeEnv(input.env), + }; + + const createdSession = session; + yield* modifyManagerState((state) => { + const sessions = new Map(state.sessions); + sessions.set(sessionKey, createdSession); + return [undefined, { ...state, sessions }] as const; + }); - private historyPath(threadId: string, terminalId: string): string { - const threadPart = toSafeThreadId(threadId); - if (terminalId === DEFAULT_TERMINAL_ID) { - return path.join(this.logsDir, `${threadPart}.log`); - } - return path.join(this.logsDir, `${threadPart}_${toSafeTerminalId(terminalId)}.log`); - } + yield* evictInactiveSessionsIfNeeded(); + yield* startSession( + session, + { + threadId: input.threadId, + terminalId, + cwd: input.cwd, + cols, + rows, + ...(input.env ? { env: input.env } : {}), + }, + "started", + ); + return snapshot(session); + } - private legacyHistoryPath(threadId: string): string { - return path.join(this.logsDir, `${legacySafeThreadId(threadId)}.log`); - } + const liveSession = existing.value; + const nextRuntimeEnv = normalizedRuntimeEnv(input.env); + const currentRuntimeEnv = liveSession.runtimeEnv; + const targetCols = input.cols ?? liveSession.cols; + const targetRows = input.rows ?? liveSession.rows; + const runtimeEnvChanged = !Equal.equals(currentRuntimeEnv, nextRuntimeEnv); + + if (liveSession.cwd !== input.cwd || runtimeEnvChanged) { + yield* stopProcess(liveSession); + liveSession.cwd = input.cwd; + liveSession.runtimeEnv = nextRuntimeEnv; + liveSession.history = ""; + liveSession.pendingHistoryControlSequence = ""; + yield* persistHistory( + liveSession.threadId, + liveSession.terminalId, + liveSession.history, + ); + } else if (liveSession.status === "exited" || liveSession.status === "error") { + liveSession.runtimeEnv = nextRuntimeEnv; + liveSession.history = ""; + liveSession.pendingHistoryControlSequence = ""; + yield* persistHistory( + liveSession.threadId, + liveSession.terminalId, + liveSession.history, + ); + } + + if (!liveSession.process) { + yield* startSession( + liveSession, + { + threadId: input.threadId, + terminalId, + cwd: input.cwd, + cols: targetCols, + rows: targetRows, + ...(input.env ? { env: input.env } : {}), + }, + "started", + ); + return snapshot(liveSession); + } + + if (liveSession.cols !== targetCols || liveSession.rows !== targetRows) { + liveSession.cols = targetCols; + liveSession.rows = targetRows; + liveSession.updatedAt = new Date().toISOString(); + liveSession.process.resize(targetCols, targetRows); + } + + return snapshot(liveSession); + }), + ); - private async runWithThreadLock(threadId: string, task: () => Promise): Promise { - const previous = this.threadLocks.get(threadId) ?? Promise.resolve(); - let release!: () => void; - const current = new Promise((resolve) => { - release = resolve; + const write: TerminalManagerShape["write"] = Effect.fn("terminal.write")(function* (input) { + const terminalId = input.terminalId ?? DEFAULT_TERMINAL_ID; + const session = yield* requireSession(input.threadId, terminalId); + const process = session.process; + if (!process || session.status !== "running") { + if (session.status === "exited") return; + return yield* new TerminalNotRunningError({ + threadId: input.threadId, + terminalId, + }); + } + yield* Effect.sync(() => process.write(input.data)); }); - this.threadLocks.set(threadId, current); - await previous.catch(() => undefined); - try { - return await task(); - } finally { - release(); - if (this.threadLocks.get(threadId) === current) { - this.threadLocks.delete(threadId); + + const resize: TerminalManagerShape["resize"] = Effect.fn("terminal.resize")(function* (input) { + const terminalId = input.terminalId ?? DEFAULT_TERMINAL_ID; + const session = yield* requireSession(input.threadId, terminalId); + const process = session.process; + if (!process || session.status !== "running") { + return yield* new TerminalNotRunningError({ + threadId: input.threadId, + terminalId, + }); } - } - } -} + session.cols = input.cols; + session.rows = input.rows; + session.updatedAt = new Date().toISOString(); + yield* Effect.sync(() => process.resize(input.cols, input.rows)); + }); -export const TerminalManagerLive = Layer.effect( - TerminalManager, - Effect.gen(function* () { - const { terminalLogsDir } = yield* ServerConfig; + const clear: TerminalManagerShape["clear"] = (input) => + withThreadLock( + input.threadId, + Effect.gen(function* () { + const terminalId = input.terminalId ?? DEFAULT_TERMINAL_ID; + const session = yield* requireSession(input.threadId, terminalId); + session.history = ""; + session.pendingHistoryControlSequence = ""; + session.updatedAt = new Date().toISOString(); + yield* persistHistory(input.threadId, terminalId, session.history); + yield* publishEvent({ + type: "cleared", + threadId: input.threadId, + terminalId, + createdAt: new Date().toISOString(), + }); + }), + ); - const ptyAdapter = yield* PtyAdapter; - const runtime = yield* Effect.acquireRelease( - Effect.sync(() => new TerminalManagerRuntime({ logsDir: terminalLogsDir, ptyAdapter })), - (r) => Effect.sync(() => r.dispose()), - ); + const restart: TerminalManagerShape["restart"] = (input) => + withThreadLock( + input.threadId, + Effect.gen(function* () { + const terminalId = input.terminalId ?? DEFAULT_TERMINAL_ID; + yield* assertValidCwd(input.cwd); + + const sessionKey = toSessionKey(input.threadId, terminalId); + const existingSession = yield* getSession(input.threadId, terminalId); + let session: TerminalSessionState; + if (Option.isNone(existingSession)) { + const cols = input.cols ?? DEFAULT_OPEN_COLS; + const rows = input.rows ?? DEFAULT_OPEN_ROWS; + session = { + threadId: input.threadId, + terminalId, + cwd: input.cwd, + status: "starting", + pid: null, + history: "", + pendingHistoryControlSequence: "", + exitCode: null, + exitSignal: null, + updatedAt: new Date().toISOString(), + cols, + rows, + process: null, + unsubscribeData: null, + unsubscribeExit: null, + hasRunningSubprocess: false, + runtimeEnv: normalizedRuntimeEnv(input.env), + }; + const createdSession = session; + yield* modifyManagerState((state) => { + const sessions = new Map(state.sessions); + sessions.set(sessionKey, createdSession); + return [undefined, { ...state, sessions }] as const; + }); + yield* evictInactiveSessionsIfNeeded(); + } else { + session = existingSession.value; + yield* stopProcess(session); + session.cwd = input.cwd; + session.runtimeEnv = normalizedRuntimeEnv(input.env); + } - return { - open: (input) => - Effect.tryPromise({ - try: () => runtime.open(input), - catch: (cause) => new TerminalError({ message: "Failed to open terminal", cause }), + const cols = input.cols ?? session.cols; + const rows = input.rows ?? session.rows; + + session.history = ""; + session.pendingHistoryControlSequence = ""; + yield* persistHistory(input.threadId, terminalId, session.history); + yield* startSession( + session, + { + threadId: input.threadId, + terminalId, + cwd: input.cwd, + cols, + rows, + ...(input.env ? { env: input.env } : {}), + }, + "restarted", + ); + return snapshot(session); }), - write: (input) => - Effect.tryPromise({ - try: () => runtime.write(input), - catch: (cause) => new TerminalError({ message: "Failed to write to terminal", cause }), - }), - resize: (input) => - Effect.tryPromise({ - try: () => runtime.resize(input), - catch: (cause) => new TerminalError({ message: "Failed to resize terminal", cause }), - }), - clear: (input) => - Effect.tryPromise({ - try: () => runtime.clear(input), - catch: (cause) => new TerminalError({ message: "Failed to clear terminal", cause }), - }), - restart: (input) => - Effect.tryPromise({ - try: () => runtime.restart(input), - catch: (cause) => new TerminalError({ message: "Failed to restart terminal", cause }), - }), - close: (input) => - Effect.tryPromise({ - try: () => runtime.close(input), - catch: (cause) => new TerminalError({ message: "Failed to close terminal", cause }), - }), - subscribe: (listener) => - Effect.sync(() => { - runtime.on("event", listener); - return () => { - runtime.off("event", listener); - }; + ); + + const close: TerminalManagerShape["close"] = (input) => + withThreadLock( + input.threadId, + Effect.gen(function* () { + if (input.terminalId) { + yield* closeSession(input.threadId, input.terminalId, input.deleteHistory === true); + return; + } + + const threadSessions = yield* sessionsForThread(input.threadId); + yield* Effect.forEach( + threadSessions, + (session) => closeSession(input.threadId, session.terminalId, false), + { discard: true }, + ); + + if (input.deleteHistory) { + yield* deleteAllHistoryForThread(input.threadId); + } }), - dispose: Effect.sync(() => runtime.dispose()), + ); + + return { + open, + write, + resize, + clear, + restart, + close, + get streamEvents(): TerminalManagerShape["streamEvents"] { + return Stream.fromPubSub(terminalEvents); + }, } satisfies TerminalManagerShape; - }), + }, ); + +export const TerminalManagerLive = Layer.effect(TerminalManager, makeTerminalManager()); diff --git a/apps/server/src/terminal/Services/Manager.ts b/apps/server/src/terminal/Services/Manager.ts index c2539da4b6..222dbe06c6 100644 --- a/apps/server/src/terminal/Services/Manager.ts +++ b/apps/server/src/terminal/Services/Manager.ts @@ -14,47 +14,81 @@ import { TerminalResizeInput, TerminalRestartInput, TerminalSessionSnapshot, - TerminalSessionStatus, TerminalWriteInput, } from "@t3tools/contracts"; -import { PtyProcess } from "./PTY"; -import { Effect, Schema, ServiceMap } from "effect"; +import { Effect, Schema, ServiceMap, Stream } from "effect"; -export class TerminalError extends Schema.TaggedErrorClass()("TerminalError", { - message: Schema.String, - cause: Schema.optional(Schema.Defect), -}) {} +export class TerminalCwdError extends Schema.TaggedErrorClass()( + "TerminalCwdError", + { + cwd: Schema.String, + reason: Schema.Literals(["notFound", "notDirectory", "statFailed"]), + cause: Schema.optional(Schema.Defect), + }, +) { + override get message() { + if (this.reason === "notDirectory") { + return `Terminal cwd is not a directory: ${this.cwd}`; + } + if (this.reason === "notFound") { + return `Terminal cwd does not exist: ${this.cwd}`; + } + const causeMessage = + this.cause && typeof this.cause === "object" && "message" in this.cause + ? this.cause.message + : undefined; + return causeMessage + ? `Failed to access terminal cwd: ${this.cwd} (${causeMessage})` + : `Failed to access terminal cwd: ${this.cwd}`; + } +} -export interface TerminalSessionState { - threadId: string; - terminalId: string; - cwd: string; - status: TerminalSessionStatus; - pid: number | null; - history: string; - pendingHistoryControlSequence: string; - exitCode: number | null; - exitSignal: number | null; - updatedAt: string; - cols: number; - rows: number; - process: PtyProcess | null; - unsubscribeData: (() => void) | null; - unsubscribeExit: (() => void) | null; - hasRunningSubprocess: boolean; - runtimeEnv: Record | null; +export class TerminalHistoryError extends Schema.TaggedErrorClass()( + "TerminalHistoryError", + { + operation: Schema.Literals(["read", "truncate", "migrate"]), + threadId: Schema.String, + terminalId: Schema.String, + cause: Schema.optional(Schema.Defect), + }, +) { + override get message() { + return `Failed to ${this.operation} terminal history for thread: ${this.threadId}, terminal: ${this.terminalId}`; + } } -export interface ShellCandidate { - shell: string; - args?: string[]; +export class TerminalSessionLookupError extends Schema.TaggedErrorClass()( + "TerminalSessionLookupError", + { + threadId: Schema.String, + terminalId: Schema.String, + }, +) { + override get message() { + return `Unknown terminal thread: ${this.threadId}, terminal: ${this.terminalId}`; + } } -export interface TerminalStartInput extends TerminalOpenInput { - cols: number; - rows: number; +export class TerminalNotRunningError extends Schema.TaggedErrorClass()( + "TerminalNotRunningError", + { + threadId: Schema.String, + terminalId: Schema.String, + }, +) { + override get message() { + return `Terminal is not running for thread: ${this.threadId}, terminal: ${this.terminalId}`; + } } +export const TerminalError = Schema.Union([ + TerminalCwdError, + TerminalHistoryError, + TerminalSessionLookupError, + TerminalNotRunningError, +]); +export type TerminalError = typeof TerminalError.Type; + /** * TerminalManagerShape - Service API for terminal session lifecycle operations. */ @@ -101,14 +135,9 @@ export interface TerminalManagerShape { readonly close: (input: TerminalCloseInput) => Effect.Effect; /** - * Subscribe to terminal runtime events. - */ - readonly subscribe: (listener: (event: TerminalEvent) => void) => Effect.Effect<() => void>; - - /** - * Dispose all managed terminal resources. + * Stream terminal runtime events. */ - readonly dispose: Effect.Effect; + readonly streamEvents: Stream.Stream; } /** diff --git a/apps/server/src/wsServer.test.ts b/apps/server/src/wsServer.test.ts index 826b9ad6fd..4a181a8118 100644 --- a/apps/server/src/wsServer.test.ts +++ b/apps/server/src/wsServer.test.ts @@ -90,20 +90,19 @@ const defaultServerSettings = DEFAULT_SERVER_SETTINGS; class MockTerminalManager implements TerminalManagerShape { private readonly sessions = new Map(); - private readonly listeners = new Set<(event: TerminalEvent) => void>(); + private readonly eventPubSub = Effect.runSync(PubSub.unbounded()); + private activeSubscriptions = 0; private key(threadId: string, terminalId: string): string { return `${threadId}\u0000${terminalId}`; } emitEvent(event: TerminalEvent): void { - for (const listener of this.listeners) { - listener(event); - } + Effect.runSync(PubSub.publish(this.eventPubSub, event)); } subscriptionCount(): number { - return this.listeners.size; + return this.activeSubscriptions; } readonly open: TerminalManagerShape["open"] = (input: TerminalOpenInput) => @@ -208,15 +207,20 @@ class MockTerminalManager implements TerminalManagerShape { } }); - readonly subscribe: TerminalManagerShape["subscribe"] = (listener) => - Effect.sync(() => { - this.listeners.add(listener); - return () => { - this.listeners.delete(listener); - }; - }); - - readonly dispose: TerminalManagerShape["dispose"] = Effect.void; + get streamEvents(): TerminalManagerShape["streamEvents"] { + return Stream.unwrap( + Effect.acquireRelease( + Effect.sync(() => { + this.activeSubscriptions += 1; + return Stream.fromPubSub(this.eventPubSub); + }), + () => + Effect.sync(() => { + this.activeSubscriptions -= 1; + }), + ), + ).pipe(Stream.scoped); + } } // --------------------------------------------------------------------------- @@ -1454,19 +1458,25 @@ describe("WebSocket Server", () => { expect(push.channel).toBe(WS_CHANNELS.terminalEvent); }); - it("detaches terminal event listener on stop for injected manager", async () => { + it("shuts down cleanly for injected terminal managers", async () => { const terminalManager = new MockTerminalManager(); server = await createTestServer({ cwd: "/test", terminalManager, }); - expect(terminalManager.subscriptionCount()).toBe(1); - await closeTestServer(); server = null; - expect(terminalManager.subscriptionCount()).toBe(0); + expect(() => + terminalManager.emitEvent({ + type: "output", + threadId: "thread-1", + terminalId: DEFAULT_TERMINAL_ID, + createdAt: new Date().toISOString(), + data: "after shutdown\n", + }), + ).not.toThrow(); }); it("returns validation errors for invalid terminal open params", async () => { diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts index c04d913d52..580662c37b 100644 --- a/apps/server/src/wsServer.ts +++ b/apps/server/src/wsServer.ts @@ -718,10 +718,9 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< ); } - const unsubscribeTerminalEvents = yield* terminalManager.subscribe((event) => - runPromise(pushBus.publishAll(WS_CHANNELS.terminalEvent, event)), - ); - yield* Effect.addFinalizer(() => Effect.sync(() => unsubscribeTerminalEvents())); + yield* Stream.runForEach(terminalManager.streamEvents, (event) => + pushBus.publishAll(WS_CHANNELS.terminalEvent, event), + ).pipe(Effect.forkIn(subscriptionsScope)); yield* readiness.markTerminalSubscriptionsReady; yield* NodeHttpServer.make(() => httpServer, listenOptions).pipe( diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index 5585e7f309..e8b1c46274 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -130,12 +130,12 @@ export interface NativeApi { confirm: (message: string) => Promise; }; terminal: { - open: (input: TerminalOpenInput) => Promise; - write: (input: TerminalWriteInput) => Promise; - resize: (input: TerminalResizeInput) => Promise; - clear: (input: TerminalClearInput) => Promise; - restart: (input: TerminalRestartInput) => Promise; - close: (input: TerminalCloseInput) => Promise; + open: (input: typeof TerminalOpenInput.Encoded) => Promise; + write: (input: typeof TerminalWriteInput.Encoded) => Promise; + resize: (input: typeof TerminalResizeInput.Encoded) => Promise; + clear: (input: typeof TerminalClearInput.Encoded) => Promise; + restart: (input: typeof TerminalRestartInput.Encoded) => Promise; + close: (input: typeof TerminalCloseInput.Encoded) => Promise; onEvent: (callback: (event: TerminalEvent) => void) => () => void; }; projects: { diff --git a/packages/contracts/src/terminal.ts b/packages/contracts/src/terminal.ts index b0493d95c2..f9729da66f 100644 --- a/packages/contracts/src/terminal.ts +++ b/packages/contracts/src/terminal.ts @@ -26,7 +26,7 @@ const TerminalIdWithDefaultSchema = TerminalIdSchema.pipe( export const TerminalThreadInput = Schema.Struct({ threadId: TrimmedNonEmptyStringSchema, }); -export type TerminalThreadInput = Schema.Codec.Encoded; +export type TerminalThreadInput = typeof TerminalThreadInput.Type; const TerminalSessionInput = Schema.Struct({ ...TerminalThreadInput.fields, @@ -73,7 +73,7 @@ export const TerminalCloseInput = Schema.Struct({ terminalId: Schema.optional(TerminalIdSchema), deleteHistory: Schema.optional(Schema.Boolean), }); -export type TerminalCloseInput = Schema.Codec.Encoded; +export type TerminalCloseInput = typeof TerminalCloseInput.Type; export const TerminalSessionStatus = Schema.Literals(["starting", "running", "exited", "error"]); export type TerminalSessionStatus = typeof TerminalSessionStatus.Type; diff --git a/packages/shared/package.json b/packages/shared/package.json index 40ffbf35c2..b35d23ef15 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -28,6 +28,10 @@ "types": "./src/DrainableWorker.ts", "import": "./src/DrainableWorker.ts" }, + "./KeyedCoalescingWorker": { + "types": "./src/KeyedCoalescingWorker.ts", + "import": "./src/KeyedCoalescingWorker.ts" + }, "./schemaJson": { "types": "./src/schemaJson.ts", "import": "./src/schemaJson.ts" diff --git a/packages/shared/src/KeyedCoalescingWorker.test.ts b/packages/shared/src/KeyedCoalescingWorker.test.ts new file mode 100644 index 0000000000..2226bbd003 --- /dev/null +++ b/packages/shared/src/KeyedCoalescingWorker.test.ts @@ -0,0 +1,96 @@ +import { it } from "@effect/vitest"; +import { describe, expect } from "vitest"; +import { Deferred, Effect } from "effect"; + +import { makeKeyedCoalescingWorker } from "./KeyedCoalescingWorker"; + +describe("makeKeyedCoalescingWorker", () => { + it.live("waits for latest work enqueued during active processing before draining the key", () => + Effect.scoped( + Effect.gen(function* () { + const processed: string[] = []; + const firstStarted = yield* Deferred.make(); + const releaseFirst = yield* Deferred.make(); + const secondStarted = yield* Deferred.make(); + const releaseSecond = yield* Deferred.make(); + + const worker = yield* makeKeyedCoalescingWorker({ + merge: (_current, next) => next, + process: (key, value) => + Effect.gen(function* () { + processed.push(`${key}:${value}`); + + if (value === "first") { + yield* Deferred.succeed(firstStarted, undefined).pipe(Effect.orDie); + yield* Deferred.await(releaseFirst); + } + + if (value === "second") { + yield* Deferred.succeed(secondStarted, undefined).pipe(Effect.orDie); + yield* Deferred.await(releaseSecond); + } + }), + }); + + yield* worker.enqueue("terminal-1", "first"); + yield* Deferred.await(firstStarted); + + const drained = yield* Deferred.make(); + yield* Effect.forkChild( + worker + .drainKey("terminal-1") + .pipe(Effect.tap(() => Deferred.succeed(drained, undefined).pipe(Effect.orDie))), + ); + + yield* worker.enqueue("terminal-1", "second"); + yield* Deferred.succeed(releaseFirst, undefined); + yield* Deferred.await(secondStarted); + + expect(yield* Deferred.isDone(drained)).toBe(false); + + yield* Deferred.succeed(releaseSecond, undefined); + yield* Deferred.await(drained); + + expect(processed).toEqual(["terminal-1:first", "terminal-1:second"]); + }), + ), + ); + + it.live("requeues pending work for a key after a processor failure and keeps draining", () => + Effect.scoped( + Effect.gen(function* () { + const processed: string[] = []; + const firstStarted = yield* Deferred.make(); + const releaseFailure = yield* Deferred.make(); + const secondProcessed = yield* Deferred.make(); + + const worker = yield* makeKeyedCoalescingWorker({ + merge: (_current, next) => next, + process: (key, value) => + Effect.gen(function* () { + processed.push(`${key}:${value}`); + + if (value === "first") { + yield* Deferred.succeed(firstStarted, undefined).pipe(Effect.orDie); + yield* Deferred.await(releaseFailure); + yield* Effect.fail("boom"); + } + + if (value === "second") { + yield* Deferred.succeed(secondProcessed, undefined).pipe(Effect.orDie); + } + }), + }); + + yield* worker.enqueue("terminal-1", "first"); + yield* Deferred.await(firstStarted); + yield* worker.enqueue("terminal-1", "second"); + yield* Deferred.succeed(releaseFailure, undefined); + yield* Deferred.await(secondProcessed); + yield* worker.drainKey("terminal-1"); + + expect(processed).toEqual(["terminal-1:first", "terminal-1:second"]); + }), + ), + ); +}); diff --git a/packages/shared/src/KeyedCoalescingWorker.ts b/packages/shared/src/KeyedCoalescingWorker.ts new file mode 100644 index 0000000000..567c1dac17 --- /dev/null +++ b/packages/shared/src/KeyedCoalescingWorker.ts @@ -0,0 +1,140 @@ +/** + * KeyedCoalescingWorker - A keyed worker that keeps only the latest value per key. + * + * Enqueues for an active or already-queued key are merged atomically instead of + * creating duplicate queued items. `drainKey()` resolves only when that key has + * no queued, pending, or active work left. + * + * @module KeyedCoalescingWorker + */ +import type { Scope } from "effect"; +import { Effect, TxQueue, TxRef } from "effect"; + +export interface KeyedCoalescingWorker { + readonly enqueue: (key: K, value: V) => Effect.Effect; + readonly drainKey: (key: K) => Effect.Effect; +} + +interface KeyedCoalescingWorkerState { + readonly latestByKey: Map; + readonly queuedKeys: Set; + readonly activeKeys: Set; +} + +export const makeKeyedCoalescingWorker = (options: { + readonly merge: (current: V, next: V) => V; + readonly process: (key: K, value: V) => Effect.Effect; +}): Effect.Effect, never, Scope.Scope | R> => + Effect.gen(function* () { + const queue = yield* Effect.acquireRelease(TxQueue.unbounded(), TxQueue.shutdown); + const stateRef = yield* TxRef.make>({ + latestByKey: new Map(), + queuedKeys: new Set(), + activeKeys: new Set(), + }); + + const processKey = (key: K, value: V): Effect.Effect => + options.process(key, value).pipe( + Effect.flatMap(() => + TxRef.modify(stateRef, (state) => { + const nextValue = state.latestByKey.get(key); + if (nextValue === undefined) { + const activeKeys = new Set(state.activeKeys); + activeKeys.delete(key); + return [null, { ...state, activeKeys }] as const; + } + + const latestByKey = new Map(state.latestByKey); + latestByKey.delete(key); + return [nextValue, { ...state, latestByKey }] as const; + }).pipe(Effect.tx), + ), + Effect.flatMap((nextValue) => + nextValue === null ? Effect.void : processKey(key, nextValue), + ), + ); + + const cleanupFailedKey = (key: K): Effect.Effect => + TxRef.modify(stateRef, (state) => { + const activeKeys = new Set(state.activeKeys); + activeKeys.delete(key); + + if (state.latestByKey.has(key) && !state.queuedKeys.has(key)) { + const queuedKeys = new Set(state.queuedKeys); + queuedKeys.add(key); + return [true, { ...state, activeKeys, queuedKeys }] as const; + } + + return [false, { ...state, activeKeys }] as const; + }).pipe( + Effect.tx, + Effect.flatMap((shouldRequeue) => + shouldRequeue ? TxQueue.offer(queue, key) : Effect.void, + ), + ); + + yield* TxQueue.take(queue).pipe( + Effect.flatMap((key) => + TxRef.modify(stateRef, (state) => { + const queuedKeys = new Set(state.queuedKeys); + queuedKeys.delete(key); + + const value = state.latestByKey.get(key); + if (value === undefined) { + return [null, { ...state, queuedKeys }] as const; + } + + const latestByKey = new Map(state.latestByKey); + latestByKey.delete(key); + const activeKeys = new Set(state.activeKeys); + activeKeys.add(key); + + return [ + { key, value } as const, + { ...state, latestByKey, queuedKeys, activeKeys }, + ] as const; + }).pipe(Effect.tx), + ), + Effect.flatMap((item) => + item === null + ? Effect.void + : processKey(item.key, item.value).pipe( + Effect.catchCause(() => cleanupFailedKey(item.key)), + ), + ), + Effect.forever, + Effect.forkScoped, + ); + + const enqueue: KeyedCoalescingWorker["enqueue"] = (key, value) => + TxRef.modify(stateRef, (state) => { + const latestByKey = new Map(state.latestByKey); + const existing = latestByKey.get(key); + latestByKey.set(key, existing === undefined ? value : options.merge(existing, value)); + + if (state.queuedKeys.has(key) || state.activeKeys.has(key)) { + return [false, { ...state, latestByKey }] as const; + } + + const queuedKeys = new Set(state.queuedKeys); + queuedKeys.add(key); + return [true, { ...state, latestByKey, queuedKeys }] as const; + }).pipe( + Effect.flatMap((shouldOffer) => (shouldOffer ? TxQueue.offer(queue, key) : Effect.void)), + Effect.tx, + Effect.asVoid, + ); + + const drainKey: KeyedCoalescingWorker["drainKey"] = (key) => + TxRef.get(stateRef).pipe( + Effect.tap((state) => + state.latestByKey.has(key) || state.queuedKeys.has(key) || state.activeKeys.has(key) + ? Effect.txRetry + : Effect.void, + ), + Effect.asVoid, + Effect.tx, + ); + + return { enqueue, drainKey } satisfies KeyedCoalescingWorker; + });