Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
493 changes: 256 additions & 237 deletions apps/server/src/terminal/Layers/Manager.test.ts

Large diffs are not rendered by default.

2,094 changes: 1,212 additions & 882 deletions apps/server/src/terminal/Layers/Manager.ts

Large diffs are not rendered by default.

95 changes: 57 additions & 38 deletions apps/server/src/terminal/Services/Manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,71 @@ import {
TerminalResizeInput,
TerminalRestartInput,
TerminalSessionSnapshot,
TerminalSessionStatus,
TerminalWriteInput,
} from "@t3tools/contracts";
import { PtyProcess } from "./PTY";
import { Effect, Schema, ServiceMap } from "effect";
import { Effect, Schema, ServiceMap, Stream } from "effect";

export class TerminalError extends Schema.TaggedErrorClass<TerminalError>()("TerminalError", {
message: Schema.String,
cause: Schema.optional(Schema.Defect),
}) {}
export class TerminalCwdError extends Schema.TaggedErrorClass<TerminalCwdError>()(
"TerminalCwdError",
{
cwd: Schema.String,
reason: Schema.Literals(["notFound", "notDirectory"]),
cause: Schema.optional(Schema.Defect),
},
) {
override get message() {
return this.reason === "notDirectory"
? `Terminal cwd is not a directory: ${this.cwd}`
: `Terminal cwd does not exist: ${this.cwd}`;
}
}

export interface TerminalSessionState {
threadId: string;
terminalId: string;
cwd: string;
status: TerminalSessionStatus;
pid: number | null;
history: string;
pendingHistoryControlSequence: string;
exitCode: number | null;
exitSignal: number | null;
updatedAt: string;
cols: number;
rows: number;
process: PtyProcess | null;
unsubscribeData: (() => void) | null;
unsubscribeExit: (() => void) | null;
hasRunningSubprocess: boolean;
runtimeEnv: Record<string, string> | null;
export class TerminalHistoryError extends Schema.TaggedErrorClass<TerminalHistoryError>()(
"TerminalHistoryError",
{
operation: Schema.Literals(["read", "truncate", "migrate"]),
threadId: Schema.String,
terminalId: Schema.String,
cause: Schema.optional(Schema.Defect),
},
) {
override get message() {
return `Failed to ${this.operation} terminal history for thread: ${this.threadId}, terminal: ${this.terminalId}`;
}
}

export interface ShellCandidate {
shell: string;
args?: string[];
export class TerminalSessionLookupError extends Schema.TaggedErrorClass<TerminalSessionLookupError>()(
"TerminalSessionLookupError",
{
threadId: Schema.String,
terminalId: Schema.String,
},
) {
override get message() {
return `Unknown terminal thread: ${this.threadId}, terminal: ${this.terminalId}`;
}
}

export interface TerminalStartInput extends TerminalOpenInput {
cols: number;
rows: number;
export class TerminalNotRunningError extends Schema.TaggedErrorClass<TerminalNotRunningError>()(
"TerminalNotRunningError",
{
threadId: Schema.String,
terminalId: Schema.String,
},
) {
override get message() {
return `Terminal is not running for thread: ${this.threadId}, terminal: ${this.terminalId}`;
}
}

export const TerminalError = Schema.Union([
TerminalCwdError,
TerminalHistoryError,
TerminalSessionLookupError,
TerminalNotRunningError,
]);
export type TerminalError = typeof TerminalError.Type;

/**
* TerminalManagerShape - Service API for terminal session lifecycle operations.
*/
Expand Down Expand Up @@ -101,14 +125,9 @@ export interface TerminalManagerShape {
readonly close: (input: TerminalCloseInput) => Effect.Effect<void, TerminalError>;

/**
* Subscribe to terminal runtime events.
*/
readonly subscribe: (listener: (event: TerminalEvent) => void) => Effect.Effect<() => void>;

/**
* Dispose all managed terminal resources.
* Stream terminal runtime events.
*/
readonly dispose: Effect.Effect<void>;
readonly streamEvents: Stream.Stream<TerminalEvent>;
}

/**
Expand Down
46 changes: 28 additions & 18 deletions apps/server/src/wsServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,19 @@ const defaultServerSettings = DEFAULT_SERVER_SETTINGS;

class MockTerminalManager implements TerminalManagerShape {
private readonly sessions = new Map<string, TerminalSessionSnapshot>();
private readonly listeners = new Set<(event: TerminalEvent) => void>();
private readonly eventPubSub = Effect.runSync(PubSub.unbounded<TerminalEvent>());
private activeSubscriptions = 0;

private key(threadId: string, terminalId: string): string {
return `${threadId}\u0000${terminalId}`;
}

emitEvent(event: TerminalEvent): void {
for (const listener of this.listeners) {
listener(event);
}
Effect.runSync(PubSub.publish(this.eventPubSub, event));
}

subscriptionCount(): number {
return this.listeners.size;
return this.activeSubscriptions;
}

readonly open: TerminalManagerShape["open"] = (input: TerminalOpenInput) =>
Expand Down Expand Up @@ -208,15 +207,20 @@ class MockTerminalManager implements TerminalManagerShape {
}
});

readonly subscribe: TerminalManagerShape["subscribe"] = (listener) =>
Effect.sync(() => {
this.listeners.add(listener);
return () => {
this.listeners.delete(listener);
};
});

readonly dispose: TerminalManagerShape["dispose"] = Effect.void;
get streamEvents(): TerminalManagerShape["streamEvents"] {
return Stream.unwrap(
Effect.acquireRelease(
Effect.sync(() => {
this.activeSubscriptions += 1;
return Stream.fromPubSub(this.eventPubSub);
}),
() =>
Effect.sync(() => {
this.activeSubscriptions -= 1;
}),
),
).pipe(Stream.scoped);
}
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -1454,19 +1458,25 @@ describe("WebSocket Server", () => {
expect(push.channel).toBe(WS_CHANNELS.terminalEvent);
});

it("detaches terminal event listener on stop for injected manager", async () => {
it("shuts down cleanly for injected terminal managers", async () => {
const terminalManager = new MockTerminalManager();
server = await createTestServer({
cwd: "/test",
terminalManager,
});

expect(terminalManager.subscriptionCount()).toBe(1);

await closeTestServer();
server = null;

expect(terminalManager.subscriptionCount()).toBe(0);
expect(() =>
terminalManager.emitEvent({
type: "output",
threadId: "thread-1",
terminalId: DEFAULT_TERMINAL_ID,
createdAt: new Date().toISOString(),
data: "after shutdown\n",
}),
).not.toThrow();
});

it("returns validation errors for invalid terminal open params", async () => {
Expand Down
7 changes: 3 additions & 4 deletions apps/server/src/wsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -718,10 +718,9 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return<
);
}

const unsubscribeTerminalEvents = yield* terminalManager.subscribe((event) =>
runPromise(pushBus.publishAll(WS_CHANNELS.terminalEvent, event)),
);
yield* Effect.addFinalizer(() => Effect.sync(() => unsubscribeTerminalEvents()));
yield* Stream.runForEach(terminalManager.streamEvents, (event) =>
pushBus.publishAll(WS_CHANNELS.terminalEvent, event),
).pipe(Effect.forkIn(subscriptionsScope));
yield* readiness.markTerminalSubscriptionsReady;

yield* NodeHttpServer.make(() => httpServer, listenOptions).pipe(
Expand Down
12 changes: 6 additions & 6 deletions packages/contracts/src/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ export interface NativeApi {
confirm: (message: string) => Promise<boolean>;
};
terminal: {
open: (input: TerminalOpenInput) => Promise<TerminalSessionSnapshot>;
write: (input: TerminalWriteInput) => Promise<void>;
resize: (input: TerminalResizeInput) => Promise<void>;
clear: (input: TerminalClearInput) => Promise<void>;
restart: (input: TerminalRestartInput) => Promise<TerminalSessionSnapshot>;
close: (input: TerminalCloseInput) => Promise<void>;
open: (input: typeof TerminalOpenInput.Encoded) => Promise<TerminalSessionSnapshot>;
write: (input: typeof TerminalWriteInput.Encoded) => Promise<void>;
resize: (input: typeof TerminalResizeInput.Encoded) => Promise<void>;
clear: (input: typeof TerminalClearInput.Encoded) => Promise<void>;
restart: (input: typeof TerminalRestartInput.Encoded) => Promise<TerminalSessionSnapshot>;
close: (input: typeof TerminalCloseInput.Encoded) => Promise<void>;
onEvent: (callback: (event: TerminalEvent) => void) => () => void;
};
projects: {
Expand Down
4 changes: 2 additions & 2 deletions packages/contracts/src/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const TerminalIdWithDefaultSchema = TerminalIdSchema.pipe(
export const TerminalThreadInput = Schema.Struct({
threadId: TrimmedNonEmptyStringSchema,
});
export type TerminalThreadInput = Schema.Codec.Encoded<typeof TerminalThreadInput>;
export type TerminalThreadInput = typeof TerminalThreadInput.Type;

const TerminalSessionInput = Schema.Struct({
...TerminalThreadInput.fields,
Expand Down Expand Up @@ -73,7 +73,7 @@ export const TerminalCloseInput = Schema.Struct({
terminalId: Schema.optional(TerminalIdSchema),
deleteHistory: Schema.optional(Schema.Boolean),
});
export type TerminalCloseInput = Schema.Codec.Encoded<typeof TerminalCloseInput>;
export type TerminalCloseInput = typeof TerminalCloseInput.Type;

export const TerminalSessionStatus = Schema.Literals(["starting", "running", "exited", "error"]);
export type TerminalSessionStatus = typeof TerminalSessionStatus.Type;
Expand Down
4 changes: 4 additions & 0 deletions packages/shared/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
"types": "./src/DrainableWorker.ts",
"import": "./src/DrainableWorker.ts"
},
"./CoalescingDrainableWorker": {
"types": "./src/CoalescingDrainableWorker.ts",
"import": "./src/CoalescingDrainableWorker.ts"
},
"./schemaJson": {
"types": "./src/schemaJson.ts",
"import": "./src/schemaJson.ts"
Expand Down
58 changes: 58 additions & 0 deletions packages/shared/src/CoalescingDrainableWorker.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { it } from "@effect/vitest";
import { describe, expect } from "vitest";
import { Deferred, Effect } from "effect";

import { makeCoalescingDrainableWorker } from "./CoalescingDrainableWorker";

describe("makeCoalescingDrainableWorker", () => {
it.live("waits for latest work enqueued during active processing before draining the key", () =>
Effect.scoped(
Effect.gen(function* () {
const processed: string[] = [];
const firstStarted = yield* Deferred.make<void>();
const releaseFirst = yield* Deferred.make<void>();
const secondStarted = yield* Deferred.make<void>();
const releaseSecond = yield* Deferred.make<void>();

const worker = yield* makeCoalescingDrainableWorker<string, string, never, never>({
merge: (_current, next) => next,
process: (key, value) =>
Effect.gen(function* () {
processed.push(`${key}:${value}`);

if (value === "first") {
yield* Deferred.succeed(firstStarted, undefined).pipe(Effect.orDie);
yield* Deferred.await(releaseFirst);
}

if (value === "second") {
yield* Deferred.succeed(secondStarted, undefined).pipe(Effect.orDie);
yield* Deferred.await(releaseSecond);
}
}),
});

yield* worker.enqueue("terminal-1", "first");
yield* Deferred.await(firstStarted);

const drained = yield* Deferred.make<void>();
yield* Effect.forkChild(
worker
.drainKey("terminal-1")
.pipe(Effect.tap(() => Deferred.succeed(drained, undefined).pipe(Effect.orDie))),
);

yield* worker.enqueue("terminal-1", "second");
yield* Deferred.succeed(releaseFirst, undefined);
yield* Deferred.await(secondStarted);

expect(yield* Deferred.isDone(drained)).toBe(false);

yield* Deferred.succeed(releaseSecond, undefined);
yield* Deferred.await(drained);

expect(processed).toEqual(["terminal-1:first", "terminal-1:second"]);
}),
),
);
});
Loading
Loading