Refactor terminal manager onto Effect runtime #1525
juliusmarminge wants to merge 4 commits into main from
Conversation
- Move terminal lifecycle and PTY callbacks onto Effect-managed layers - Update contracts and WebSocket paths for terminalId-aware operations - Expand tests for streaming, shutdown, and session retention behavior
- Run kill escalation inline during shutdown cleanup - Treat subprocess checks as optional and preserve state updates - Add coverage for SIGKILL escalation after grace period
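The kill-escalation commit above follows the common shutdown pattern: send `SIGTERM`, wait a grace period, then `SIGKILL` if the process is still alive. A minimal library-free sketch of that control flow (the `proc` shape and `schedule` callback are hypothetical stand-ins, not the PR's actual API):

```typescript
// Hedged sketch of grace-period kill escalation. `proc` and `schedule` are
// illustrative stand-ins for the real PTY process handle and Effect timer.
interface KillableProcess {
  alive: boolean;
  kill: (signal: "SIGTERM" | "SIGKILL") => void;
}

function escalateKill(
  proc: KillableProcess,
  graceMs: number,
  schedule: (fn: () => void, ms: number) => void,
): void {
  proc.kill("SIGTERM"); // polite request first
  schedule(() => {
    // Grace period elapsed; force-kill only if the process never exited.
    if (proc.alive) proc.kill("SIGKILL");
  }, graceMs);
}
```

Injecting the scheduler keeps the escalation testable without real timers, which mirrors why the PR moves this logic into scope cleanup rather than a detached `setTimeout`.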
Important: Review skipped. Auto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configuration. Configuration used: Repository UI. Review profile: CHILL. Plan: Pro.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Autofix Details
Bugbot Autofix prepared fixes for all 3 issues found in the latest run.
- ✅ Fixed: Default test helper path ignores historyLineLimit parameter
- Eliminated the dual-path approach in makeManager so it always uses makeTerminalManagerWithOptions with the provided historyLineLimit parameter instead of a fast path via TerminalManagerLive that silently defaulted to 5000.
- ✅ Fixed: Falsy zero-value options prevent custom runtime creation
- Replaced truthy checks with !== undefined checks for all numeric options (subprocessPollIntervalMs, processKillGraceMs, maxRetainedInactiveSessions) so explicit zero values are correctly propagated.
- ✅ Fixed: Eviction test wait condition is always true
- Changed the wait condition from the tautological ptyAdapter.processes.length === 2 to waiting for 2 'exited' events in the event stream, ensuring exit handlers and eviction complete before assertions.
Or push these changes by commenting:
@cursor push 3b9d72eea8
Preview (3b9d72eea8)
diff --git a/apps/server/src/terminal/Layers/Manager.test.ts b/apps/server/src/terminal/Layers/Manager.test.ts
--- a/apps/server/src/terminal/Layers/Manager.test.ts
+++ b/apps/server/src/terminal/Layers/Manager.test.ts
@@ -12,17 +12,15 @@
import { Effect, Encoding, Exit, Layer, ManagedRuntime, Ref, Scope, Stream } from "effect";
import { afterEach, describe, expect, it } from "vitest";
-import { ServerConfig } from "../../config";
import { TerminalManager } from "../Services/Manager";
import {
- PtyAdapter,
type PtyAdapterShape,
type PtyExitEvent,
type PtyProcess,
type PtySpawnInput,
PtySpawnError,
} from "../Services/PTY";
-import { makeTerminalManagerWithOptions, TerminalManagerLive } from "./Manager";
+import { makeTerminalManagerWithOptions } from "./Manager";
class FakePtyProcess implements PtyProcess {
readonly writes: string[] = [];
@@ -194,13 +192,29 @@
const logsDir = path.join(baseDir, "userdata", "logs", "terminals");
const ptyAdapter = options.ptyAdapter ?? new FakePtyAdapter();
- const terminalLayer = TerminalManagerLive.pipe(
- Layer.provideMerge(Layer.succeed(PtyAdapter, ptyAdapter)),
- Layer.provideMerge(ServerConfig.layerTest(process.cwd(), baseDir)),
- Layer.provideMerge(NodeServices.layer),
- );
+ const layer = Layer.effect(
+ TerminalManager,
+ makeTerminalManagerWithOptions({
+ logsDir,
+ historyLineLimit,
+ ptyAdapter,
+ ...(options.shellResolver !== undefined ? { shellResolver: options.shellResolver } : {}),
+ ...(options.subprocessChecker !== undefined
+ ? { subprocessChecker: options.subprocessChecker }
+ : {}),
+ ...(options.subprocessPollIntervalMs !== undefined
+ ? { subprocessPollIntervalMs: options.subprocessPollIntervalMs }
+ : {}),
+ ...(options.processKillGraceMs !== undefined
+ ? { processKillGraceMs: options.processKillGraceMs }
+ : {}),
+ ...(options.maxRetainedInactiveSessions !== undefined
+ ? { maxRetainedInactiveSessions: options.maxRetainedInactiveSessions }
+ : {}),
+ }),
+ ).pipe(Layer.provideMerge(NodeServices.layer));
- const runtime = ManagedRuntime.make(terminalLayer);
+ const runtime = ManagedRuntime.make(layer);
const manager = await runtime.runPromise(Effect.service(TerminalManager));
const eventsRef = await Effect.runPromise(Ref.make<TerminalEvent[]>([]));
const eventScope = await Effect.runPromise(Scope.make("sequential"));
@@ -210,60 +224,6 @@
).pipe(Effect.forkIn(eventScope)),
);
- if (
- historyLineLimit !== 5 ||
- options.shellResolver ||
- options.subprocessChecker ||
- options.subprocessPollIntervalMs ||
- options.processKillGraceMs ||
- options.maxRetainedInactiveSessions
- ) {
- await runtime.dispose();
-
- const customLayer = Layer.effect(
- TerminalManager,
- makeTerminalManagerWithOptions({
- logsDir,
- historyLineLimit,
- ptyAdapter,
- ...(options.shellResolver ? { shellResolver: options.shellResolver } : {}),
- ...(options.subprocessChecker ? { subprocessChecker: options.subprocessChecker } : {}),
- ...(options.subprocessPollIntervalMs
- ? { subprocessPollIntervalMs: options.subprocessPollIntervalMs }
- : {}),
- ...(options.processKillGraceMs ? { processKillGraceMs: options.processKillGraceMs } : {}),
- ...(options.maxRetainedInactiveSessions
- ? { maxRetainedInactiveSessions: options.maxRetainedInactiveSessions }
- : {}),
- }),
- ).pipe(Layer.provideMerge(NodeServices.layer));
-
- const customRuntime = ManagedRuntime.make(customLayer);
- const customManager = await customRuntime.runPromise(Effect.service(TerminalManager));
- const customEventsRef = await Effect.runPromise(Ref.make<TerminalEvent[]>([]));
- const customEventScope = await Effect.runPromise(Scope.make("sequential"));
- await customRuntime.runPromise(
- Stream.runForEach(customManager.streamEvents, (event) =>
- Ref.update(customEventsRef, (events) => [...events, event]),
- ).pipe(Effect.forkIn(customEventScope)),
- );
-
- return {
- baseDir,
- logsDir,
- ptyAdapter,
- runtime: customRuntime,
- manager: customManager,
- eventsRef: customEventsRef,
- run: <A, E>(effect: Effect.Effect<A, E>) => customRuntime.runPromise(effect),
- getEvents: () => Effect.runPromise(Ref.get(customEventsRef)),
- dispose: async () => {
- await Effect.runPromise(Scope.close(customEventScope, Exit.void));
- await customRuntime.dispose();
- },
- };
- }
-
return {
baseDir,
logsDir,
@@ -636,7 +596,7 @@
});
it("evicts oldest inactive terminal sessions when retention limit is exceeded", async () => {
- const { manager, ptyAdapter, run, logsDir } = await createManager(5, {
+ const { manager, ptyAdapter, run, logsDir, getEvents } = await createManager(5, {
maxRetainedInactiveSessions: 1,
});
@@ -656,7 +616,10 @@
await new Promise((resolve) => setTimeout(resolve, 5));
second.emitExit({ exitCode: 0, signal: 0 });
- await waitFor(() => ptyAdapter.processes.length === 2);
+ await waitFor(async () => {
+ const events = await getEvents();
+ return events.filter((e) => e.type === "exited").length === 2;
+ });
const reopenedSecond = await run(manager.open(openInput({ threadId: "thread-2" })));
  const reopenedFirst = await run(manager.open(openInput({ threadId: "thread-1" })));

- Treat omitted terminal IDs as the default session ID in manager operations
- Simplify terminal manager tests around retained session eviction
- Update terminal contract input types to use encoded schema types
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Autofix Details
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Persist worker exits without processing newly arrived data
- Changed the cleanup block to return a boolean indicating whether new pendingHistory arrived during the race window, and continue the loop instead of returning when it did, ensuring the worker processes newly queued data.
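The fix described above closes a classic shutdown race: between "no more work" and "remove my state", new data can arrive, and an unconditional `return` would strand it. A simplified synchronous sketch of the continue-on-pending loop (the `PersistState` shape and `runPersistWorker` name are hypothetical, distilled from the diff below):

```typescript
// Hypothetical per-session persistence state, modeled on the PR's pendingHistory field.
type PersistState = { pendingHistory: string | null };

// Drains pending history for one key. On observing no start state, the worker
// re-checks for data that raced in and loops again instead of exiting.
function runPersistWorker(states: Map<string, PersistState>, key: string): string[] {
  const persisted: string[] = [];
  while (true) {
    const state = states.get(key);
    if (!state) return persisted; // state already removed: nothing to do
    if (state.pendingHistory !== null) {
      persisted.push(state.pendingHistory); // "write" the pending chunk
      state.pendingHistory = null;
      continue; // data arrived during the cleanup window: keep processing
    }
    states.delete(key); // truly idle: drop the per-key state and exit
    return persisted;
  }
}
```

The key design point, matching the diff: cleanup reports *whether* pending data exists, and only the no-pending branch is allowed to delete the state and terminate.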
Or push these changes by commenting:
@cursor push 414a2e4f49
Preview (414a2e4f49)
diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts
--- a/apps/server/src/terminal/Layers/Manager.ts
+++ b/apps/server/src/terminal/Layers/Manager.ts
@@ -837,15 +837,21 @@
});
if (!startState) {
- yield* modifyManagerState((state) => {
+ const hasPending = yield* modifyManagerState((state) => {
const existing = state.persistStates.get(sessionKey);
- if (!existing || existing.pendingHistory !== null) {
- return [undefined, state] as const;
+ if (!existing) {
+ return [false, state] as const;
}
+ if (existing.pendingHistory !== null) {
+ return [true, state] as const;
+ }
const persistStates = new Map(state.persistStates);
persistStates.delete(sessionKey);
- return [undefined, { ...state, persistStates }] as const;
+ return [false, { ...state, persistStates }] as const;
});
+ if (hasPending) {
+ continue;
+ }
return;
    }

- Add shared CoalescingDrainableWorker utility and tests
- Simplify terminal history persistence to drain per key
- Persist session history on close before removing session state
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Worker error leaves key permanently stuck in activeKeys
- Added Effect.onError handler around processKey to remove the key from activeKeys when the process effect fails, preventing drainKey/drain from hanging forever.
- ✅ Fixed: Subprocess polling runs forever without active sessions
- Added a hasRunningSessions gate and early-return in pollSubprocessActivity so subprocess checker invocations are skipped entirely when no running sessions exist.
Or push these changes by commenting:
@cursor push 92c0e406e4
Preview (92c0e406e4)
diff --git a/apps/server/src/terminal/Layers/Manager.ts b/apps/server/src/terminal/Layers/Manager.ts
--- a/apps/server/src/terminal/Layers/Manager.ts
+++ b/apps/server/src/terminal/Layers/Manager.ts
@@ -1377,6 +1377,10 @@
session.status === "running" && Number.isInteger(session.pid),
);
+ if (runningSessions.length === 0) {
+ return;
+ }
+
yield* Effect.forEach(
runningSessions,
(session) =>
@@ -1434,8 +1438,20 @@
);
});
+ const hasRunningSessions = readManagerState.pipe(
+ Effect.map((state) => [...state.sessions.values()].some((s) => s.status === "running")),
+ );
+
yield* Effect.forever(
- Effect.sleep(subprocessPollIntervalMs).pipe(Effect.flatMap(() => pollSubprocessActivity())),
+ hasRunningSessions.pipe(
+ Effect.flatMap((active) =>
+ active
+ ? pollSubprocessActivity().pipe(
+ Effect.flatMap(() => Effect.sleep(subprocessPollIntervalMs)),
+ )
+ : Effect.sleep(subprocessPollIntervalMs),
+ ),
+ ),
).pipe(Effect.forkIn(workerScope));
yield* Effect.addFinalizer(() =>
diff --git a/packages/shared/src/CoalescingDrainableWorker.ts b/packages/shared/src/CoalescingDrainableWorker.ts
--- a/packages/shared/src/CoalescingDrainableWorker.ts
+++ b/packages/shared/src/CoalescingDrainableWorker.ts
@@ -77,7 +77,17 @@
] as const;
}).pipe(Effect.tx),
),
- Effect.flatMap((item) => (item === null ? Effect.void : processKey(item.key, item.value))),
+ Effect.flatMap((item) =>
+ item === null
+ ? Effect.void
+ : Effect.onError(processKey(item.key, item.value), () =>
+ TxRef.update(stateRef, (state) => {
+ const activeKeys = new Set(state.activeKeys);
+ activeKeys.delete(item.key);
+ return { ...state, activeKeys };
+ }),
+ ),
+ ),
Effect.forever,
Effect.forkScoped,
  );

Effect.flatMap((nextValue) =>
  nextValue === null ? Effect.void : processKey(key, nextValue),
),
);
Worker error leaves key permanently stuck in activeKeys
Medium Severity
If options.process fails with an error in processKey, the Effect.flatMap that removes the key from activeKeys never executes. The key stays permanently in activeKeys, causing drainKey and drain to hang forever (they retry until activeKeys is empty). The worker fiber also crashes, stopping all subsequent processing. The current terminal manager usage avoids this by using E = never, but any future consumer with a fallible process function would hit this.
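The invariant the comment describes is "the key must leave `activeKeys` on every exit path, including failure", which is what the `Effect.onError` fix restores. A simplified synchronous sketch of the same invariant using `try`/`finally` (names hypothetical; the real code uses Effect combinators rather than exceptions):

```typescript
// Sketch of the activeKeys invariant: if the key stays in the set after a
// failed process run, drain()/drainKey() would spin forever waiting for it.
function processWithCleanup(
  activeKeys: Set<string>,
  key: string,
  process: (key: string) => void,
): void {
  activeKeys.add(key); // mark the key as in-flight
  try {
    process(key);
  } finally {
    // Runs on error too — the moral equivalent of the Effect.onError handler.
    activeKeys.delete(key);
  }
}
```

In Effect terms the same guarantee is usually spelled `Effect.ensuring` or `Effect.onError`, as in the autofix diff above; the point is that cleanup is attached to the effect itself, not sequenced after it.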
Additional Locations (1)
if (this.subprocessPollInFlight) return;
yield* Effect.forever(
  Effect.sleep(subprocessPollIntervalMs).pipe(Effect.flatMap(() => pollSubprocessActivity())),
).pipe(Effect.forkIn(workerScope));
Subprocess polling runs forever without active sessions
Low Severity
The subprocess polling loop runs via Effect.forever regardless of whether any terminal sessions exist. The old implementation dynamically started and stopped the polling interval based on running sessions. Now the poll wakes up every subprocessPollIntervalMs (default 1 second) even when no sessions are active, performing a state read and filter on every tick for the entire lifetime of the server.
Summary
- SIGKILL after the grace period.

Testing
- apps/server/src/terminal/Layers/Manager.test.ts for lifecycle, persistence, subprocess activity, and shutdown behavior.
- apps/server/src/wsServer.test.ts for websocket protocol changes.

Note
High Risk
Large refactor of terminal lifecycle, persistence, and process-kill behavior plus a breaking TerminalManagerShape API change (callbacks/dispose to streamEvents), which can impact terminal reliability and websocket event delivery.

Overview
Refactors the server TerminalManager from an EventEmitter-backed class to an Effect-native implementation (PubSub + Stream, SynchronizedRef state, semaphores for per-thread locking, scoped fibers/finalizers), exposed via makeTerminalManagerWithOptions.

Reworks terminal history persistence to use a new shared makeCoalescingDrainableWorker (coalescing per-session writes + explicit drain on close/restart), adds structured errors for cwd/history/session/not-running and subprocess checks, and ensures shutdown escalation (SIGTERM then SIGKILL) runs under scope cleanup.

Updates consumers/tests to the new streamEvents API: wsServer now forwards terminal events by forking Stream.runForEach, terminal and websocket tests are rewritten around ManagedRuntime/scopes, and contracts are tightened to pass encoded terminal IPC inputs and adjust a couple of terminal input type exports.

Written by Cursor Bugbot for commit 0a77ebc. This will update automatically on new commits.
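For consumers migrating off the removed subscribe/dispose callback API, the shift is from "register a listener, remember to dispose" to "consume a stream, stop by ending consumption". A library-free sketch of the consumer side, using a plain `Iterable` as a stand-in for `Stream<TerminalEvent>` (the event shape and function names here are illustrative, not the PR's actual types):

```typescript
// Illustrative event shape; the real TerminalEvent union is richer.
type TerminalEvent = { type: "data" | "exited"; terminalId: string };

// Stream-style consumption: iterate events and forward each one, the moral
// equivalent of forking Stream.runForEach in wsServer.
function forwardEvents(
  events: Iterable<TerminalEvent>,
  send: (event: TerminalEvent) => void,
): void {
  for (const event of events) {
    send(event); // e.g. relay to the connected websocket client
  }
}

// Demo source standing in for terminalManager.streamEvents.
function* demoEvents(): Iterable<TerminalEvent> {
  yield { type: "data", terminalId: "t1" };
  yield { type: "exited", terminalId: "t1" };
}
```

In the real Effect code the consumption runs as a forked fiber inside a scope, so "dispose" becomes closing the scope (interrupting the fiber) rather than calling an explicit teardown method.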
Note
Refactor terminal manager onto Effect runtime with Stream-based event API
- Replaces the EventEmitter-based TerminalManagerRuntime class with a purely effectful implementation in Manager.ts, exposing streamEvents as a Stream<TerminalEvent> and all methods as typed Effect operations.
- Adds structured errors (TerminalCwdError, TerminalHistoryError, TerminalSessionLookupError, TerminalNotRunningError) replacing the previous generic TerminalError.
- Introduces a shared CoalescingDrainableWorker primitive that coalesces writes per session key and supports per-key draining.
- Replaces setInterval-based subprocess polling and setTimeout-based kill escalation with continuously running Effect fibers managed in a dedicated Scope.
- Updates wsServer.ts to consume terminalManager.streamEvents via Stream.runForEach instead of using the removed subscribe/dispose callback API.
- The TerminalManagerShape interface is a breaking change: consumers must switch from subscribe(listener) + dispose() to consuming a Stream<TerminalEvent>.

Macroscope summarized 0a77ebc.