From f6e2a82254c88f1f27b847b5a3cb8b8314d4adbc Mon Sep 17 00:00:00 2001 From: Chenxin Yan Date: Wed, 22 Apr 2026 15:50:08 -0400 Subject: [PATCH 1/6] add tests --- apps/tui/README.md | 2 +- packages/daemon/src/__tests__/daemon.test.ts | 50 ++-- packages/daemon/src/__tests__/env.test.ts | 4 +- .../daemon/src/__tests__/integration.test.ts | 78 +++++- .../daemon/src/__tests__/protocol.test.ts | 11 +- packages/daemon/src/__tests__/store.test.ts | 261 ++++++++++++------ 6 files changed, 273 insertions(+), 133 deletions(-) diff --git a/apps/tui/README.md b/apps/tui/README.md index d8da285..ff4b944 100644 --- a/apps/tui/README.md +++ b/apps/tui/README.md @@ -16,7 +16,7 @@ bun run src/cli.ts daemon health bun run src/cli.ts daemon stop ``` -The daemon listens on a Unix socket at `~/.ralph/ralphd.sock` and persists state in `~/.ralph/state.json`. +The daemon listens on a Unix socket at `~/.ralph/ralphd.sock` and persists state in `~/.ralph/state.sqlite` (SQLite). ## Run the TUI shell diff --git a/packages/daemon/src/__tests__/daemon.test.ts b/packages/daemon/src/__tests__/daemon.test.ts index 535e30a..2fcbad6 100644 --- a/packages/daemon/src/__tests__/daemon.test.ts +++ b/packages/daemon/src/__tests__/daemon.test.ts @@ -47,7 +47,7 @@ describe("Daemon", () => { beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "ralph-daemon-test-")); - store = new StateStore(join(tmpDir, "state.json")); + store = new StateStore(join(tmpDir, "state.sqlite")); registry = new FakeOpencodeRegistry(40); daemon = new Daemon(store, { registry }); await daemon.bootstrap(); @@ -266,32 +266,28 @@ describe("Daemon", () => { }); test("requeues running jobs after restart", async () => { - await store.save({ - instances: [ - { - id: "instance-1", - name: "One", - directory: "/tmp/project-one", - status: "running", - maxConcurrency: 1, - createdAt: "2026-01-01T00:00:00.000Z", - updatedAt: "2026-01-01T00:00:00.000Z", - }, - ], - jobs: [ - { - id: "job-1", - instanceId: "instance-1", - session: { type: "new" }, - task: { type: "prompt", prompt: "recover" }, - state: "running", - createdAt: "2026-01-01T00:00:00.000Z", - updatedAt: "2026-01-01T00:00:00.000Z", - }, - ], + // Shut down the beforeEach daemon so we can simulate a crashed state + // by writing directly to the SQLite file. + await daemon.shutdown(); + + const seedStore = new StateStore(join(tmpDir, "state.sqlite")); + await seedStore.open(); + const instance = seedStore.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + seedStore.setInstanceStatus(instance.id, "running"); + const job = seedStore.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "recover" }, }); + seedStore.markJobRunning(job.id); + seedStore.close(); - const nextDaemon = new Daemon(store, { + const nextStore = new StateStore(join(tmpDir, "state.sqlite")); + const nextDaemon = new Daemon(nextStore, { registry: new FakeOpencodeRegistry(10), }); await nextDaemon.bootstrap(); @@ -299,7 +295,7 @@ describe("Daemon", () => { req({ id: "job-get", method: "job.get", - params: { jobId: "job-1" }, + params: { jobId: job.id }, }), ); expect(["queued", "running", "succeeded"]).toContain( @@ -317,7 +313,7 @@ describe("Daemon streaming", () => { beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "ralph-daemon-stream-")); - store = new StateStore(join(tmpDir, "state.json")); + store = new StateStore(join(tmpDir, "state.sqlite")); registry = new FakeOpencodeRegistry(40); daemon = new Daemon(store, { registry }); await daemon.bootstrap(); diff --git a/packages/daemon/src/__tests__/env.test.ts b/packages/daemon/src/__tests__/env.test.ts index fcd3606..ab8be19 100644 --- a/packages/daemon/src/__tests__/env.test.ts +++ b/packages/daemon/src/__tests__/env.test.ts @@ -17,7 +17,7 @@ describe("env", () => { expect(env).toEqual({ ralphHome, socketPath: join(ralphHome, "ralphd.sock"), - statePath: join(ralphHome, "state.json"), + databasePath: join(ralphHome, "state.sqlite"), }); }); @@ -29,7 +29,7 @@ describe("env", () => { ).toEqual({ ralphHome: "/tmp/ralph", socketPath: "/tmp/ralph/ralphd.sock", - statePath: "/tmp/ralph/state.json", + databasePath: "/tmp/ralph/state.sqlite", }); expect( diff --git a/packages/daemon/src/__tests__/integration.test.ts b/packages/daemon/src/__tests__/integration.test.ts index 4002eb1..d7440e2 100644 --- a/packages/daemon/src/__tests__/integration.test.ts +++ b/packages/daemon/src/__tests__/integration.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { chmod, mkdtemp, rm } from "node:fs/promises"; +import { chmod, mkdtemp, rm, unlink } from "node:fs/promises"; import { createServer, type Server } from "node:net"; import { tmpdir } from "node:os"; import { join } from "node:path"; @@ -10,7 +10,43 @@ import { createConnectionHandler, Daemon } from "../server"; import { StateStore } from "../store"; import { FakeOpencodeRegistry } from "./helpers"; -describe("Integration: server + client over Unix socket", () => { +/** False in restricted environments (e.g. some sandboxes) where binding a Unix socket returns EPERM. */ +async function canBindUnixSocket(): Promise { + let probeDir: string | undefined; + try { + probeDir = await mkdtemp(join(tmpdir(), "ralph-unix-probe-")); + const path = join(probeDir, "p.sock"); + const srv = createServer(); + await new Promise((resolve, reject) => { + const onError = (err: NodeJS.ErrnoException) => reject(err); + srv.once("error", onError); + try { + srv.listen(path, () => { + srv.removeListener("error", onError); + srv.close((closeErr) => { + if (closeErr) reject(closeErr); + else resolve(); + }); + }); + } catch (err) { + reject(err); + } + }); + return true; + } catch { + return false; + } finally { + if (probeDir) { + await rm(probeDir, { recursive: true, force: true }).catch(() => {}); + } + } +} + +const unixSocketAvailable = await canBindUnixSocket(); + +const integrationDescribe = unixSocketAvailable ? describe : describe.skip; + +integrationDescribe("Integration: server + client over Unix socket", () => { let tmpDir: string; let testSocketPath: string; let server: Server; @@ -21,7 +57,12 @@ describe("Integration: server + client over Unix socket", () => { beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "ralph-integration-")); testSocketPath = join(tmpDir, "test.sock"); - const store = new StateStore(join(tmpDir, "state.json")); + try { + await unlink(testSocketPath); + } catch { + // ignore + } + const store = new StateStore(join(tmpDir, "state.sqlite")); registry = new FakeOpencodeRegistry(20); daemon = new Daemon(store, { registry }); await daemon.bootstrap(); @@ -29,16 +70,31 @@ describe("Integration: server + client over Unix socket", () => { server = createServer(createConnectionHandler(daemon)); client = new DaemonClient(testSocketPath); - await new Promise((resolve) => { - server.listen(testSocketPath, async () => { - await chmod(testSocketPath, 0o600); - resolve(); - }); + await new Promise((resolve, reject) => { + const onError = (err: NodeJS.ErrnoException) => { + reject(err); + }; + server.once("error", onError); + try { + server.listen(testSocketPath, async () => { + try { + await chmod(testSocketPath, 0o600); + server.removeListener("error", onError); + resolve(); + } catch (err) { + server.removeListener("error", onError); + reject(err); + } + }); + } catch (err) { + server.removeListener("error", onError); + reject(err); + } }); }); afterEach(async () => { - server.close(); + server?.close(); await daemon.shutdown(); await rm(tmpDir, { recursive: true, force: true }); }); @@ -119,7 +175,6 @@ describe("Integration: server + client over Unix socket", () => { }); test("client.streamJob returns immediately for an already-terminal job", async () => { - // No streaming deltas — fake completes quickly with default delay. const created = await client.createInstance({ name: "fast-instance", directory: "/tmp/project-fast", @@ -130,7 +185,6 @@ describe("Integration: server + client over Unix socket", () => { task: { type: "prompt", prompt: "fast" }, }); - // Wait for the job to finish. await Bun.sleep(100); const events: JobStreamEvent[] = []; @@ -138,8 +192,6 @@ describe("Integration: server + client over Unix socket", () => { events.push(event); } - // Terminal jobs short-circuit: just a done event, no snapshot or - // deltas. expect(events).toHaveLength(1); expect(events[0]?.type).toBe("done"); }); diff --git a/packages/daemon/src/__tests__/protocol.test.ts b/packages/daemon/src/__tests__/protocol.test.ts index ced81db..80e8386 100644 --- a/packages/daemon/src/__tests__/protocol.test.ts +++ b/packages/daemon/src/__tests__/protocol.test.ts @@ -1,6 +1,6 @@ import { describe, expect, test } from "bun:test"; -import { DaemonState, RequestMessage, ResponseMessage } from "../protocol"; +import { RequestMessage, ResponseMessage } from "../protocol"; describe("protocol schemas", () => { test("parses a valid submit request", () => { @@ -62,13 +62,4 @@ describe("protocol schemas", () => { expect(parsed.success).toBe(false); }); - - test("parses daemon state", () => { - const parsed = DaemonState.safeParse({ - instances: [], - jobs: [], - }); - - expect(parsed.success).toBe(true); - }); }); diff --git a/packages/daemon/src/__tests__/store.test.ts b/packages/daemon/src/__tests__/store.test.ts index ad7d72a..d5006a8 100644 --- a/packages/daemon/src/__tests__/store.test.ts +++ b/packages/daemon/src/__tests__/store.test.ts @@ -1,113 +1,214 @@ +import { Database } from "bun:sqlite"; import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { mkdtemp, readFile, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; -import type { DaemonJob, DaemonState, ManagedInstance } from "../protocol"; import { StateStore, StoreError } from "../store"; -function makeInstance( - overrides: Partial = {}, -): ManagedInstance { - return { - id: overrides.id ?? "instance-1", - name: overrides.name ?? "Instance One", - directory: overrides.directory ?? "/tmp/project-one", - status: overrides.status ?? "stopped", - maxConcurrency: overrides.maxConcurrency ?? 1, - createdAt: overrides.createdAt ?? "2026-01-01T00:00:00.000Z", - updatedAt: overrides.updatedAt ?? "2026-01-01T00:00:00.000Z", - ...overrides, - }; -} - -function makeJob(overrides: Partial = {}): DaemonJob { - return { - id: overrides.id ?? "job-1", - instanceId: overrides.instanceId ?? "instance-1", - session: overrides.session ?? { type: "new" }, - task: overrides.task ?? { type: "prompt", prompt: "test prompt" }, - state: overrides.state ?? "queued", - createdAt: overrides.createdAt ?? "2026-01-01T00:00:00.000Z", - updatedAt: overrides.updatedAt ?? "2026-01-01T00:00:00.000Z", - ...overrides, - }; -} - describe("StateStore", () => { let tmpDir: string; - let statePath: string; + let databasePath: string; let store: StateStore; beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "ralph-test-")); - statePath = join(tmpDir, "state.json"); - store = new StateStore(statePath); + databasePath = join(tmpDir, "state.sqlite"); + store = new StateStore(databasePath); + await store.open(); }); afterEach(async () => { + store.close(); await rm(tmpDir, { recursive: true, force: true }); }); - test("returns empty state when file does not exist", async () => { - const state = await store.load(); - expect(state).toEqual({ instances: [], jobs: [] }); + test("starts empty on a fresh database", () => { + expect(store.listInstances()).toEqual([]); + expect(store.listJobs()).toEqual([]); }); - test("writes state to disk as formatted JSON", async () => { - const state: DaemonState = { - instances: [makeInstance()], - jobs: [makeJob()], - }; - await store.save(state); - - const raw = await readFile(statePath, "utf8"); - expect(raw).toEndWith("\n"); - const parsed = JSON.parse(raw); - expect(parsed.instances).toHaveLength(1); - expect(parsed.jobs).toHaveLength(1); + test("writes a SQLite file on open", async () => { + const raw = await readFile(databasePath, "utf8"); + expect(raw.slice(0, 15)).toBe("SQLite format 3"); }); - test("adds and updates instances", () => { - let state: DaemonState = { instances: [], jobs: [] }; - state = store.createInstance(state, makeInstance()); - expect(state.instances).toHaveLength(1); - state = store.upsertInstance( - state, - makeInstance({ - status: "running", - updatedAt: "2026-01-02T00:00:00.000Z", - }), - ); - expect(state.instances[0]?.status).toBe("running"); + test("creates instances with a generated id and stopped status", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 2, + }); + expect(instance.id).toBeTruthy(); + expect(instance.status).toBe("stopped"); + expect(instance.maxConcurrency).toBe(2); }); test("rejects duplicate instance directories", () => { - const state: DaemonState = { - instances: [makeInstance()], - jobs: [], - }; + store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); expect(() => - store.createInstance( - state, - makeInstance({ id: "instance-2", directory: "/tmp/project-one" }), - ), + store.createInstance({ + name: "Two", + directory: "/tmp/project-one", + maxConcurrency: 1, + }), ).toThrow(StoreError); }); - test("filters jobs by instance and state", () => { - const state: DaemonState = { - instances: [ - makeInstance(), - makeInstance({ id: "instance-2", directory: "/tmp/project-two" }), - ], - jobs: [ - makeJob(), - makeJob({ id: "job-2", instanceId: "instance-2" }), - makeJob({ id: "job-3", state: "running" }), - ], + test("setInstanceStatus updates status and lastError", () => { + const created = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const running = store.setInstanceStatus(created.id, "running"); + expect(running.status).toBe("running"); + expect(running.lastError).toBeUndefined(); + + const failed = store.setInstanceStatus(created.id, "error", "boom"); + expect(failed.status).toBe("error"); + expect(failed.lastError).toBe("boom"); + }); + + test("createJob creates a queued job linked to a session row", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hello" }, + }); + expect(job.state).toBe("queued"); + expect(job.sessionId).toBeUndefined(); + expect(job.task).toEqual({ type: "prompt", prompt: "hello" }); + }); + + test("deduplicates sessions for two jobs sharing the same remote session", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + store.createJob({ + instanceId: instance.id, + session: { type: "existing", sessionId: "remote-sess-1" }, + task: { type: "prompt", prompt: "a" }, + }); + store.createJob({ + instanceId: instance.id, + session: { type: "existing", sessionId: "remote-sess-1" }, + task: { type: "prompt", prompt: "b" }, + }); + + const db = new Database(databasePath, { readonly: true }); + const row = db.query("SELECT COUNT(*) AS c FROM sessions").get() as { + c: number; }; - expect(store.listJobs(state, { instanceId: "instance-1" })).toHaveLength(2); - expect(store.listJobs(state, { state: "running" })).toHaveLength(1); + db.close(); + expect(row.c).toBe(1); + }); + + test("markJobRunning and markJobTerminal drive state transitions", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hi" }, + }); + + const running = store.markJobRunning(job.id); + expect(running.state).toBe("running"); + expect(running.startedAt).toBeTruthy(); + + const done = store.markJobTerminal(job.id, "succeeded", { + outputText: "result", + messageId: "msg-1", + }); + expect(done.state).toBe("succeeded"); + expect(done.outputText).toBe("result"); + expect(done.messageId).toBe("msg-1"); + expect(done.endedAt).toBeTruthy(); + }); + + test("appendJobOutput concatenates deltas", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hi" }, + }); + store.appendJobOutput(job.id, "foo "); + store.appendJobOutput(job.id, "bar"); + expect(store.assertJob(job.id).outputText).toBe("foo bar"); + }); + + test("listJobs filters by instance and state", () => { + const a = store.createInstance({ + name: "A", + directory: "/tmp/a", + maxConcurrency: 1, + }); + const b = store.createInstance({ + name: "B", + directory: "/tmp/b", + maxConcurrency: 1, + }); + store.createJob({ + instanceId: a.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "1" }, + }); + const j2 = store.createJob({ + instanceId: a.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "2" }, + }); + store.createJob({ + instanceId: b.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "3" }, + }); + store.markJobRunning(j2.id); + + expect(store.listJobs({ instanceId: a.id })).toHaveLength(2); + expect(store.listJobs({ state: "running" })).toHaveLength(1); + expect(store.listJobs({ instanceId: a.id, state: "queued" })).toHaveLength( + 1, + ); + }); + + test("recoverForBootstrap requeues running jobs and resets instances", () => { + const instance = store.createInstance({ + name: "A", + directory: "/tmp/a", + maxConcurrency: 1, + }); + store.setInstanceStatus(instance.id, "running"); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hi" }, + }); + store.markJobRunning(job.id); + + const requeued = store.recoverForBootstrap(); + expect(requeued).toEqual([{ id: job.id, instanceId: instance.id }]); + expect(store.assertJob(job.id).state).toBe("queued"); + expect(store.assertInstance(instance.id).status).toBe("stopped"); }); }); From 5e68eb6901399ffde3f4499fe73a74af41de69b2 Mon Sep 17 00:00:00 2001 From: Chenxin Yan Date: Wed, 22 Apr 2026 15:54:44 -0400 Subject: [PATCH 2/6] add db schemas --- packages/daemon/src/db.ts | 104 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 packages/daemon/src/db.ts diff --git a/packages/daemon/src/db.ts b/packages/daemon/src/db.ts new file mode 100644 index 0000000..1ea29b9 --- /dev/null +++ b/packages/daemon/src/db.ts @@ -0,0 +1,104 @@ +import { Database } from "bun:sqlite"; + +export const CURRENT_SCHEMA_VERSION = 1; + +function getUserVersion(db: Database): number { + const row = db.query("PRAGMA user_version").get() as { + user_version: number; + }; + return row?.user_version ?? 0; +} + +function setUserVersion(db: Database, version: number): void { + db.run(`PRAGMA user_version = ${version}`); +} + +/** + * Opens the daemon SQLite database, applies PRAGMAs, and runs migrations. + */ +export function openDaemonDatabase(databasePath: string): Database { + const db = new Database(databasePath, { create: true }); + db.run("PRAGMA foreign_keys = ON;"); + db.run("PRAGMA journal_mode = WAL;"); + + const from = getUserVersion(db); + if (from === CURRENT_SCHEMA_VERSION) { + return db; + } + + if (from > CURRENT_SCHEMA_VERSION) { + db.close(); + throw new Error( + `daemon database schema v${from} is newer than supported v${CURRENT_SCHEMA_VERSION}`, + ); + } + + db.transaction(() => { + if (from < 1) { + db.run(` + CREATE TABLE instances ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + directory TEXT NOT NULL UNIQUE, + status TEXT NOT NULL CHECK(status IN ('stopped','starting','running','error')), + max_concurrency INTEGER NOT NULL CHECK(max_concurrency > 0), + last_error TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + `); + + db.run(` + CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + instance_id TEXT NOT NULL REFERENCES instances(id) ON DELETE CASCADE, + remote_session_id TEXT, + kind TEXT NOT NULL CHECK(kind IN ('new','existing')), + title TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + `); + + db.run(` + CREATE UNIQUE INDEX idx_sessions_instance_remote + ON sessions(instance_id, remote_session_id) + WHERE remote_session_id IS NOT NULL; + `); + + db.run(` + CREATE TABLE jobs ( + id TEXT PRIMARY KEY, + instance_id TEXT NOT NULL REFERENCES instances(id) ON DELETE CASCADE, + session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + state TEXT NOT NULL CHECK(state IN ('queued','running','succeeded','failed','cancelled')), + prompt TEXT NOT NULL, + agent TEXT, + model_provider_id TEXT, + model_id TEXT, + system_prompt TEXT, + variant TEXT, + message_id TEXT, + error TEXT, + output_text TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + started_at TEXT, + ended_at TEXT + ); + `); + + db.run( + `CREATE INDEX idx_jobs_instance_created ON jobs(instance_id, created_at DESC);`, + ); + db.run(`CREATE INDEX idx_jobs_state ON jobs(state, created_at DESC);`); + db.run( + `CREATE INDEX idx_sessions_instance ON sessions(instance_id, updated_at DESC);`, + ); + + setUserVersion(db, 1); + } + })(); + + return db; +} From f1264ea90f75cf20f88412880f8daaa7018f34ca Mon Sep 17 00:00:00 2001 From: Chenxin Yan Date: Wed, 22 Apr 2026 16:14:51 -0400 Subject: [PATCH 3/6] update helpers --- packages/daemon/src/env.ts | 7 +- packages/daemon/src/index.ts | 2 +- packages/daemon/src/protocol.ts | 13 +- packages/daemon/src/server.ts | 424 ++++++------------ packages/daemon/src/store.ts | 736 +++++++++++++++++++++++++------- 5 files changed, 726 insertions(+), 456 deletions(-) diff --git a/packages/daemon/src/env.ts b/packages/daemon/src/env.ts index 2a8d9fa..e78fe52 100644 --- a/packages/daemon/src/env.ts +++ b/packages/daemon/src/env.ts @@ -6,7 +6,8 @@ export const DEFAULT_DAEMON_MAX_CONCURRENCY = 4; export interface DaemonPaths { ralphHome: string; socketPath: string; - statePath: string; + /** SQLite database file for daemon persisted state (instances, sessions, jobs). */ + databasePath: string; } export interface DaemonRuntimeEnv extends DaemonPaths { @@ -52,7 +53,7 @@ export function resolveDaemonPaths( return { ralphHome, socketPath: join(ralphHome, "ralphd.sock"), - statePath: join(ralphHome, "state.json"), + databasePath: join(ralphHome, "state.sqlite"), }; } @@ -81,4 +82,4 @@ const defaultPaths = resolveDaemonPaths(); export const RALPH_HOME = defaultPaths.ralphHome; export const SOCKET_PATH = defaultPaths.socketPath; -export const STATE_PATH = defaultPaths.statePath; +export const DATABASE_PATH = defaultPaths.databasePath; diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index 2362e54..846dd07 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -1,5 +1,5 @@ export { DaemonClient, daemon } from "./client"; -export { RALPH_HOME, resolveDaemonPaths } from "./env"; +export { DATABASE_PATH, RALPH_HOME, resolveDaemonPaths } from "./env"; export { ensureDaemonRunning, runForegroundDaemon, diff --git a/packages/daemon/src/protocol.ts b/packages/daemon/src/protocol.ts index bae0f54..2d95bda 100644 --- a/packages/daemon/src/protocol.ts +++ b/packages/daemon/src/protocol.ts @@ -73,11 +73,10 @@ const ManagedInstance = z.strictObject({ export type ManagedInstance = z.infer; /** A job that has been submitted to the daemon for execution. */ -const DaemonJob = z.strictObject({ +export const DaemonJob = z.strictObject({ id: z.string().min(1), instanceId: z.string().min(1), sessionId: z.string().min(1).optional(), - session: JobSession, task: JobTask, state: JobState, createdAt: IsoDateTime, @@ -596,16 +595,6 @@ export const ResponseMessage = z.union([ ]); export type ResponseMessage = z.infer; -// --------------------------------------------------------------------------- -// Daemon persisted state -// --------------------------------------------------------------------------- - -export const DaemonState = z.strictObject({ - instances: z.array(ManagedInstance), - jobs: z.array(DaemonJob), -}); -export type DaemonState = z.infer; - // --------------------------------------------------------------------------- // Utility types — type-level helpers for method-based dispatch // --------------------------------------------------------------------------- diff --git a/packages/daemon/src/server.ts b/packages/daemon/src/server.ts index f0e6a27..fb6b70e 100644 --- a/packages/daemon/src/server.ts +++ b/packages/daemon/src/server.ts @@ -14,15 +14,11 @@ import { import { type CancelResult, type DaemonJob, - type DaemonState, - DaemonState as DaemonStateSchema, type ErrorResponse, type GetResult, type HealthResult, - type InstanceHealth, type InstanceListResult, type InstanceResult, - type JobState, type JobStreamEvent, type ListResult, type ManagedInstance, @@ -41,6 +37,8 @@ import { } from "./protocol"; import { StateStore, StoreError } from "./store"; +const MAX_TERMINAL_JOBS = 100; + interface RunningJob { controller: AbortController; instanceId: string; @@ -82,16 +80,13 @@ function normalizeErrorMessage(error: unknown): string { } export class Daemon { - private state: DaemonState = structuredClone( - DaemonStateSchema.parse({ - instances: [], - jobs: [], - }), - ); private readonly registry: OpencodeRuntimeManager; + /** Per-instance queue of job ids waiting to be scheduled. */ private readonly queues = new Map(); private readonly runningJobs = new Map(); private readonly runningTasks = new Map>(); + /** Maps running job id → its OpenCode session id, for delta routing. */ + private readonly runningSessionIds = new Map(); private readonly jobStreams = new Map< string, Set<(event: JobStreamEvent) => void> @@ -129,9 +124,17 @@ export class Daemon { } async bootstrap(): Promise { - this.state = await this.store.load(); - this.state = await this.recoverPersistedState(this.state); - await this.store.save(this.state); + await this.store.open(); + const requeued = this.store.recoverForBootstrap(); + this.queues.clear(); + for (const { id, instanceId } of requeued) { + this.enqueueById(instanceId, id); + } + // Also enqueue any queued-at-startup jobs that were never running. + for (const job of this.store.listJobs({ state: "queued" })) { + this.enqueueById(job.instanceId, job.id); + } + this.store.pruneTerminalJobs(MAX_TERMINAL_JOBS); this.scheduleDrain(); } @@ -189,7 +192,7 @@ export class Daemon { await this.drainPromise; await Promise.allSettled([...this.runningTasks.values()]); await this.registry.stopAll(); - await this.store.save(this.state); + this.store.close(); })(); return this.shutdownPromise; @@ -199,61 +202,33 @@ export class Daemon { return { pid: process.pid, uptimeSeconds: Math.floor((Date.now() - this.startedAt) / 1000), - queued: this.jobCount("queued"), - running: this.jobCount("running"), - finished: this.state.jobs.filter((job: DaemonJob) => - ["succeeded", "failed", "cancelled"].includes(job.state), - ).length, - instances: this.state.instances.map( - (instance: ManagedInstance): InstanceHealth => ({ - instanceId: instance.id, - name: instance.name, - status: instance.status, - running: this.instanceJobCount(instance.id, "running"), - queued: this.instanceJobCount(instance.id, "queued"), - finished: this.state.jobs.filter( - (job: DaemonJob) => - job.instanceId === instance.id && - ["succeeded", "failed", "cancelled"].includes(job.state), - ).length, - lastError: instance.lastError, - }), - ), + queued: this.store.countJobsByState("queued"), + running: this.store.countJobsByState("running"), + finished: this.store.countFinishedJobs(), + instances: this.store.instanceHealth(), }; } - private async handleInstanceCreate( + private handleInstanceCreate( request: RequestByMethod<"instance.create">, - ): Promise { - const now = new Date().toISOString(); - const instance: ManagedInstance = { - id: randomUUID(), + ): InstanceResult { + const instance = this.store.createInstance({ name: request.params.name.trim(), directory: request.params.directory, - status: "stopped", maxConcurrency: request.params.maxConcurrency ?? 1, - createdAt: now, - updatedAt: now, - }; - this.state = this.store.createInstance(this.state, instance); - await this.store.save(this.state); + }); return { instance }; } private handleInstanceList(): InstanceListResult { - return { - instances: this.store.listInstances(this.state), - }; + return { instances: this.store.listInstances() }; } private handleInstanceGet( request: RequestByMethod<"instance.get">, ): InstanceResult { return { - instance: this.store.assertInstance( - this.state, - request.params.instanceId, - ), + instance: this.store.assertInstance(request.params.instanceId), }; } @@ -267,10 +242,7 @@ export class Daemon { private async handleInstanceStop( request: RequestByMethod<"instance.stop">, ): Promise { - const instance = this.store.assertInstance( - this.state, - request.params.instanceId, - ); + const instance = this.store.assertInstance(request.params.instanceId); if (this.runningCountForInstance(instance.id) > 0) { throw new StoreError( "conflict", @@ -279,29 +251,15 @@ export class Daemon { } await this.registry.stop(instance.id); - const stopped: ManagedInstance = { - ...instance, - status: "stopped", - updatedAt: new Date().toISOString(), - }; - this.state = this.store.upsertInstance(this.state, stopped); - await this.store.save(this.state); + const stopped = this.store.setInstanceStatus(instance.id, "stopped"); return { instance: stopped }; } private async handleInstanceRemove( request: RequestByMethod<"instance.remove">, ): Promise { - const instance = this.store.assertInstance( - this.state, - request.params.instanceId, - ); - const active = this.state.jobs.some( - (job: DaemonJob) => - job.instanceId === instance.id && - (job.state === "queued" || job.state === "running"), - ); - if (active) { + const instance = this.store.assertInstance(request.params.instanceId); + if (this.store.hasActiveJobs(instance.id)) { throw new StoreError( "conflict", `instance ${instance.id} has active jobs and cannot be removed`, @@ -310,8 +268,7 @@ export class Daemon { await this.registry.stop(instance.id); this.queues.delete(instance.id); - this.state = this.store.removeInstance(this.state, instance.id); - await this.store.save(this.state); + this.store.removeInstance(instance.id); return { instance }; } @@ -324,56 +281,45 @@ export class Daemon { ); } - private async handleJobSubmit( + private handleJobSubmit( request: RequestByMethod<"job.submit">, - ): Promise { + ): SubmitResult { if (this.shuttingDown) { throw new StoreError("shutdown", "daemon is shutting down"); } const { instanceId } = request.params; - this.store.assertInstance(this.state, instanceId); + this.store.assertInstance(instanceId); - const now = new Date().toISOString(); - const job: DaemonJob = { - id: randomUUID(), + const job = this.store.createJob({ instanceId, session: request.params.session, task: request.params.task, - state: "queued", - createdAt: now, - updatedAt: now, - }; - this.state = this.store.upsertJob(this.state, job); - this.enqueue(job); - await this.store.save(this.state); + }); + this.enqueueById(instanceId, job.id); this.scheduleDrain(); return { job }; } private handleJobList(request: RequestByMethod<"job.list">): ListResult { - return { - jobs: this.store.listJobs(this.state, request.params), - }; + return { jobs: this.store.listJobs(request.params) }; } private handleJobGet(request: RequestByMethod<"job.get">): GetResult { - return { - job: this.store.assertJob(this.state, request.params.jobId), - }; + return { job: this.store.assertJob(request.params.jobId) }; } private handleJobStream( request: RequestByMethod<"job.stream">, ): StreamAckResult { - this.store.assertJob(this.state, request.params.jobId); + this.store.assertJob(request.params.jobId); return { jobId: request.params.jobId }; } private async handleJobCancel( request: RequestByMethod<"job.cancel">, ): Promise { - const job = this.store.assertJob(this.state, request.params.jobId); + const job = this.store.assertJob(request.params.jobId); if ( job.state === "succeeded" || @@ -387,36 +333,25 @@ export class Daemon { } if (job.state === "queued") { - const queue = this.queues.get(job.instanceId); - if (queue) { - const index = queue.indexOf(job.id); - if (index >= 0) { - queue.splice(index, 1); - } - } - job.state = "cancelled"; - job.endedAt = new Date().toISOString(); - job.updatedAt = job.endedAt; - job.error = "Job cancelled"; - this.state = this.store.upsertJob(this.state, job); - await this.store.save(this.state); - return { job }; + this.removeFromQueue(job.instanceId, job.id); + const cancelled = this.store.markJobTerminal(job.id, "cancelled", { + error: "Job cancelled", + }); + return { job: cancelled }; } const running = this.runningJobs.get(job.id); + const cancelled = this.store.markJobTerminal(job.id, "cancelled", { + error: "Job cancelled", + }); if (running) { - job.state = "cancelled"; - job.error = "Job cancelled"; - job.updatedAt = new Date().toISOString(); - this.state = this.store.upsertJob(this.state, job); - await this.store.save(this.state); running.controller.abort(); - if (job.sessionId) { - void this.abortRemoteSession(running.instanceId, job.sessionId); + const remoteSessionId = this.runningSessionIds.get(job.id); + if (remoteSessionId) { + void this.abortRemoteSession(running.instanceId, remoteSessionId); } } - - return { job }; + return { job: cancelled }; } /** @@ -432,7 +367,7 @@ export class Daemon { * routeDeltaToJob. */ subscribeJob(jobId: string, cb: (event: JobStreamEvent) => void): () => void { - const job = this.store.getJob(this.state, jobId); + const job = this.store.getJob(jobId); if ( job && (job.state === "succeeded" || @@ -484,12 +419,13 @@ export class Daemon { /** * Route an incoming delta from the OpenCode event stream to the matching - * running job. Synchronously appends the delta to the job's accumulated - * `outputText` (only for `text` field deltas) BEFORE emitting the event, - * so the daemon's job state always reflects what subscribers have seen. + * running job. Synchronously appends the delta to the job's `output_text` + * in SQLite BEFORE emitting the event, so the daemon's stored state + * always reflects what subscribers have seen. * * MUST remain fully synchronous to preserve the snapshot/delta ordering - * guarantee — see the concurrency note in subscribeJob. + * guarantee — see the concurrency note in subscribeJob. `bun:sqlite` is + * synchronous, so this holds. */ private routeDeltaToJob( instanceId: string, @@ -499,62 +435,28 @@ export class Daemon { ): void { for (const [jobId, running] of this.runningJobs) { if (running.instanceId !== instanceId) continue; - const job = this.store.getJob(this.state, jobId); - if (job?.sessionId === sessionId) { - if (field === "text") { - job.outputText = (job.outputText ?? "") + delta; - job.updatedAt = new Date().toISOString(); - } - this.emitJobEvent(jobId, { type: "delta", jobId, field, delta }); - return; + if (this.runningSessionIds.get(jobId) !== sessionId) continue; + if (field === "text") { + this.store.appendJobOutput(jobId, delta); } + this.emitJobEvent(jobId, { type: "delta", jobId, field, delta }); + return; } } - private async recoverPersistedState( - state: DaemonState, - ): Promise { - let next: DaemonState = { - ...state, - instances: state.instances.map( - (instance: ManagedInstance): ManagedInstance => ({ - ...instance, - status: "stopped", - updatedAt: new Date().toISOString(), - }), - ), - }; - this.queues.clear(); - - for (const original of next.jobs) { - const job: DaemonJob = - original.state === "running" - ? { - ...original, - state: "queued", - error: [original.error, "Recovered after daemon restart"] - .filter(Boolean) - .join(" "), - updatedAt: new Date().toISOString(), - } - : original; - - next = this.store.upsertJob(next, job); - if (job.state === "queued" && job.instanceId) { - this.enqueue(job); - } + private enqueueById(instanceId: string, jobId: string): void { + const queue = this.queues.get(instanceId) ?? []; + if (!queue.includes(jobId)) { + queue.push(jobId); } - - next = this.store.pruneTerminalJobs(next); - return next; + this.queues.set(instanceId, queue); } - private enqueue(job: DaemonJob): void { - const queue = this.queues.get(job.instanceId) ?? []; - if (!queue.includes(job.id)) { - queue.push(job.id); - } - this.queues.set(job.instanceId, queue); + private removeFromQueue(instanceId: string, jobId: string): void { + const queue = this.queues.get(instanceId); + if (!queue) return; + const idx = queue.indexOf(jobId); + if (idx >= 0) queue.splice(idx, 1); } private async drainQueue(): Promise { @@ -578,7 +480,7 @@ export class Daemon { } private dequeueNextJob(): DaemonJob | undefined { - const instances = this.state.instances; + const instances = this.store.listInstances(); if (instances.length === 0) { return undefined; } @@ -601,10 +503,8 @@ export class Daemon { while (queue.length > 0) { const jobId = queue.shift(); - if (!jobId) { - break; - } - const job = this.store.getJob(this.state, jobId); + if (!jobId) break; + const job = this.store.getJob(jobId); if (job && job.state === "queued" && job.instanceId === instance.id) { return job; } @@ -621,18 +521,14 @@ export class Daemon { instanceId: job.instanceId, }); - job.state = "running"; - job.startedAt = new Date().toISOString(); - job.updatedAt = job.startedAt; - this.state = this.store.upsertJob(this.state, job); - await this.store.save(this.state); + const runningJob = this.store.markJobRunning(job.id); - const execution = this.executeJob(job, controller) + const execution = this.executeJob(runningJob, controller) .catch(() => undefined) - .finally(async () => { + .finally(() => { this.runningJobs.delete(job.id); this.runningTasks.delete(job.id); - await this.store.save(this.state); + this.runningSessionIds.delete(job.id); if (!this.shuttingDown) { this.scheduleDrain(); } @@ -644,6 +540,13 @@ export class Daemon { job: DaemonJob, controller: AbortController, ): Promise { + let terminalState: Extract< + DaemonJob["state"], + "succeeded" | "failed" | "cancelled" + >; + const patch: { error?: string; outputText?: string; messageId?: string } = + {}; + try { const instance = await this.startInstance(job.instanceId); const runtime = await this.registry.ensureStarted(instance.id); @@ -652,47 +555,42 @@ export class Daemon { instance, job, ); + this.runningSessionIds.set(job.id, sessionId); - switch (job.task.type) { - case "prompt": { - const response = await runtime.client.session.prompt({ - sessionID: sessionId, - directory: instance.directory, - agent: job.task.agent, - model: job.task.model - ? { - providerID: job.task.model.providerId, - modelID: job.task.model.modelId, - } - : undefined, - system: job.task.system, - variant: job.task.variant, - parts: [{ type: "text", text: job.task.prompt }], - }); - job.messageId = response.info.id; - // Prefer accumulated text from streamed deltas; fall back to - // the final parts payload if no deltas were received (e.g. a - // non-streaming provider). - const finalText = extractText(response.parts); - if (!job.outputText || job.outputText.length === 0) { - job.outputText = finalText; - } - job.error = undefined; - job.state = controller.signal.aborted ? "cancelled" : "succeeded"; - break; - } + const response = await runtime.client.session.prompt({ + sessionID: sessionId, + directory: instance.directory, + agent: job.task.agent, + model: job.task.model + ? { + providerID: job.task.model.providerId, + modelID: job.task.model.modelId, + } + : undefined, + system: job.task.system, + variant: job.task.variant, + parts: [{ type: "text", text: job.task.prompt }], + }); + patch.messageId = response.info.id; + // Prefer accumulated text from streamed deltas; fall back to the + // final parts payload if no deltas were received (non-streaming + // providers). Any deltas already landed in output_text via + // appendJobOutput, so only write the fallback when nothing was + // accumulated. + const current = this.store.getJob(job.id); + if (!current?.outputText || current.outputText.length === 0) { + patch.outputText = extractText(response.parts); } + terminalState = controller.signal.aborted ? "cancelled" : "succeeded"; } catch (error) { - job.state = controller.signal.aborted ? "cancelled" : "failed"; - job.error = controller.signal.aborted + terminalState = controller.signal.aborted ? "cancelled" : "failed"; + patch.error = controller.signal.aborted ? "Job cancelled" : normalizeErrorMessage(error); - } finally { - job.endedAt = new Date().toISOString(); - job.updatedAt = job.endedAt; - this.state = this.store.upsertJob(this.state, job); - this.emitJobEvent(job.id, { type: "done", jobId: job.id, job }); } + + const finalJob = this.store.markJobTerminal(job.id, terminalState, patch); + this.emitJobEvent(job.id, { type: "done", jobId: job.id, job: finalJob }); } private async resolveSession( @@ -700,74 +598,35 @@ export class Daemon { instance: ManagedInstance, job: DaemonJob, ): Promise { - if (job.sessionId) { - return job.sessionId; - } - - if (job.session.type === "existing") { - job.sessionId = job.session.sessionId; - return job.sessionId; - } + if (job.sessionId) return job.sessionId; + // No remote id yet — this is a `{type: 'new'}` submission. The + // sessions row was created at submit time; fill in its remote id. const session = await client.session.create({ directory: instance.directory, - title: job.session.title, }); - job.sessionId = session.id; - this.state = this.store.upsertJob(this.state, job); - await this.store.save(this.state); + this.store.assignRemoteSessionToJob(job.id, session.id); return session.id; } private async startInstance(instanceId: string): Promise { - const current = this.store.assertInstance(this.state, instanceId); + const current = this.store.assertInstance(instanceId); if (this.registry.isRunning(instanceId)) { if (current.status !== "running") { - const running = { - ...current, - status: "running" as const, - lastError: undefined, - updatedAt: new Date().toISOString(), - }; - this.state = this.store.upsertInstance(this.state, running); - await this.store.save(this.state); - return running; + return this.store.setInstanceStatus(instanceId, "running"); } return current; } - const starting: ManagedInstance = { - ...current, - status: "starting", - updatedAt: new Date().toISOString(), - }; - this.state = this.store.upsertInstance(this.state, starting); - await this.store.save(this.state); + this.store.setInstanceStatus(instanceId, "starting"); try { await this.registry.ensureStarted(instanceId); - const running: ManagedInstance = { - ...starting, - status: "running", - lastError: undefined, - updatedAt: new Date().toISOString(), - }; - this.state = this.store.upsertInstance(this.state, running); - await this.store.save(this.state); - return running; + return this.store.setInstanceStatus(instanceId, "running"); } catch (error) { - const failed: ManagedInstance = { - ...starting, - status: "error", - lastError: normalizeErrorMessage(error), - updatedAt: new Date().toISOString(), - }; - this.state = this.store.upsertInstance(this.state, failed); - await this.store.save(this.state); - throw new StoreError( - "instance_unavailable", - failed.lastError ?? "failed to start instance", - ); + const message = normalizeErrorMessage(error); + this.store.setInstanceStatus(instanceId, "error", message); + throw new StoreError("instance_unavailable", message); } } @@ -776,10 +635,8 @@ export class Daemon { sessionId: string, ): Promise { const runtime = this.registry.get(instanceId); - const instance = this.store.getInstance(this.state, instanceId); - if (!runtime || !instance) { - return; - } + const instance = this.store.getInstance(instanceId); + if (!runtime || !instance) return; try { await runtime.client.session.abort({ @@ -794,24 +651,11 @@ export class Daemon { private runningCountForInstance(instanceId: string): number { let total = 0; for (const running of this.runningJobs.values()) { - if (running.instanceId === instanceId) { - total += 1; - } + if (running.instanceId === instanceId) total += 1; } return total; } - private jobCount(state: JobState): number { - return this.state.jobs.filter((job: DaemonJob) => job.state === state) - .length; - } - - private instanceJobCount(instanceId: string, state: JobState): number { - return this.state.jobs.filter( - (job: DaemonJob) => job.instanceId === instanceId && job.state === state, - ).length; - } - private success( request: RequestByMethod, result: ResultByMethod, @@ -998,7 +842,7 @@ export async function runDaemonServer(): Promise { await ensureSocketDir(env.socketPath); await clearStaleSocket(env.socketPath); - const daemon = new Daemon(new StateStore(env.statePath), { + const daemon = new Daemon(new StateStore(env.databasePath), { maxConcurrency: env.maxConcurrency, }); await daemon.bootstrap(); diff --git a/packages/daemon/src/store.ts b/packages/daemon/src/store.ts index 94acbca..318e394 100644 --- a/packages/daemon/src/store.ts +++ b/packages/daemon/src/store.ts @@ -1,25 +1,25 @@ -import { mkdir, readFile } from "node:fs/promises"; +import type { Database, Statement } from "bun:sqlite"; +import { randomUUID } from "node:crypto"; +import { mkdir } from "node:fs/promises"; import { dirname } from "node:path"; import type { z } from "zod"; -import { - type DaemonJob, - type DaemonState, - DaemonState as DaemonStateSchema, - type ManagedInstance, - type ResponseError as ResponseErrorSchema, +import { openDaemonDatabase } from "./db"; +import type { + DaemonJob, + InstanceHealth, + JobSession, + JobState, + JobTask, + ManagedInstance, + ResponseError, } from "./protocol"; -const EMPTY_STATE: DaemonState = { - instances: [], - jobs: [], -}; - -const MAX_TERMINAL_JOBS = 100; +type ErrorCode = z.infer["code"]; export class StoreError extends Error { constructor( - public readonly code: z.infer["code"], + public readonly code: ErrorCode, message: string, ) { super(message); @@ -27,190 +27,626 @@ export class StoreError extends Error { } } +interface InstanceRow { + id: string; + name: string; + directory: string; + status: ManagedInstance["status"]; + max_concurrency: number; + last_error: string | null; + created_at: string; + updated_at: string; +} + +interface JobRow { + id: string; + instance_id: string; + state: JobState; + prompt: string; + agent: string | null; + model_provider_id: string | null; + model_id: string | null; + system_prompt: string | null; + variant: string | null; + message_id: string | null; + error: string | null; + output_text: string | null; + created_at: string; + updated_at: string; + started_at: string | null; + ended_at: string | null; + remote_session_id: string | null; +} + +function rowToInstance(row: InstanceRow): ManagedInstance { + const base: ManagedInstance = { + id: row.id, + name: row.name, + directory: row.directory, + status: row.status, + maxConcurrency: row.max_concurrency, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; + return row.last_error ? { ...base, lastError: row.last_error } : base; +} + +function rowToJob(row: JobRow): DaemonJob { + const task: JobTask = { + type: "prompt", + prompt: row.prompt, + ...(row.agent ? { agent: row.agent } : {}), + ...(row.model_provider_id && row.model_id + ? { + model: { + providerId: row.model_provider_id, + modelId: row.model_id, + }, + } + : {}), + ...(row.system_prompt ? { system: row.system_prompt } : {}), + ...(row.variant ? { variant: row.variant } : {}), + }; + + return { + id: row.id, + instanceId: row.instance_id, + task, + state: row.state, + createdAt: row.created_at, + updatedAt: row.updated_at, + ...(row.remote_session_id ? { sessionId: row.remote_session_id } : {}), + ...(row.started_at ? { startedAt: row.started_at } : {}), + ...(row.ended_at ? { endedAt: row.ended_at } : {}), + ...(row.error ? { error: row.error } : {}), + ...(row.output_text !== null ? { outputText: row.output_text } : {}), + ...(row.message_id ? { messageId: row.message_id } : {}), + }; +} + +const JOB_SELECT = ` + SELECT j.id, j.instance_id, j.state, j.prompt, j.agent, + j.model_provider_id, j.model_id, j.system_prompt, j.variant, + j.message_id, j.error, j.output_text, + j.created_at, j.updated_at, j.started_at, j.ended_at, + s.remote_session_id AS remote_session_id + FROM jobs j + JOIN sessions s ON j.session_id = s.id +`; + +/** + * Relational repository for daemon state: instances, sessions, and jobs. + * + * All queries hit SQLite via `bun:sqlite` and are synchronous once the store + * is open. Multi-statement mutations run inside a transaction so concurrent + * readers never observe torn state. + */ export class StateStore { - private pendingSave: Promise = Promise.resolve(); + private db: Database | undefined; private directoryReady: Promise | undefined; - constructor(private readonly statePath: string) {} + // Prepared statements, initialized lazily on first open(). + private stmts: + | { + listInstances: Statement; + getInstance: Statement; + insertInstance: Statement; + setInstanceStatus: Statement; + deleteInstance: Statement; + countInstanceActiveJobs: Statement<{ c: number }, [string]>; - async load(): Promise { - try { - const raw = await readFile(this.statePath, "utf8"); - const parsed = JSON.parse(raw) as unknown; - const current = DaemonStateSchema.safeParse(parsed); - if (current.success) { - return current.data; - } + insertSession: Statement; + findSessionByRemote: Statement< + { id: string; remote_session_id: string | null }, + [string, string] + >; + assignRemoteSessionToJob: Statement; - return structuredClone(EMPTY_STATE); - } catch (error) { - if ((error as NodeJS.ErrnoException).code === "ENOENT") { - return structuredClone(EMPTY_STATE); - } - throw error; - } - } + listAllJobs: Statement; + listJobsByInstance: Statement; + listJobsByState: Statement; + listJobsByInstanceAndState: Statement; + getJob: Statement; + insertJob: Statement; + setJobState: Statement; + setJobTerminal: Statement; + appendJobOutput: Statement; + setJobMessageId: Statement; + deleteOldTerminalJobs: Statement; - async save(state: DaemonState): Promise { - const payload = `${JSON.stringify(state, null, 2)}\n`; - const persist = async (): Promise => { - await this.ensureDirectory(); - await Bun.write(this.statePath, payload); - }; + countJobsByState: Statement<{ c: number }, [JobState]>; + countInstanceJobsByState: Statement<{ c: number }, [string, JobState]>; + countInstanceFinishedJobs: Statement<{ c: number }, [string]>; + resetInstancesToStopped: Statement; + requeueRunningJobs: Statement< + { id: string; instance_id: string }, + [string] + >; + } + | undefined; - const savePromise = this.pendingSave.then(persist, persist); - this.pendingSave = savePromise.catch(() => undefined); - await savePromise; - } + constructor(private readonly databasePath: string) {} - private async ensureDirectory(): Promise { + async open(): Promise { if (!this.directoryReady) { - this.directoryReady = mkdir(dirname(this.statePath), { + this.directoryReady = mkdir(dirname(this.databasePath), { recursive: true, }).then(() => undefined); } await this.directoryReady; - } + if (this.db) return; - upsertJob(state: DaemonState, job: DaemonJob): DaemonState { - const jobs = state.jobs.filter((item: DaemonJob) => item.id !== job.id); - jobs.push(job); - jobs.sort((a: DaemonJob, b: DaemonJob) => - a.createdAt < b.createdAt ? 1 : -1, - ); - return { - ...state, - jobs, + const db = openDaemonDatabase(this.databasePath); + this.db = db; + this.stmts = { + listInstances: db.query( + `SELECT * FROM instances ORDER BY datetime(created_at) DESC`, + ), + getInstance: db.query( + `SELECT * FROM instances WHERE id = ?`, + ), + insertInstance: db.query( + `INSERT INTO instances (id, name, directory, status, max_concurrency, last_error, created_at, updated_at) + VALUES ($id, $name, $directory, $status, $max_concurrency, $last_error, $created_at, $updated_at)`, + ), + setInstanceStatus: db.query( + `UPDATE instances + SET status = $status, last_error = $last_error, updated_at = $updated_at + WHERE id = $id`, + ), + deleteInstance: db.query(`DELETE FROM instances WHERE id = ?`), + countInstanceActiveJobs: db.query<{ c: number }, [string]>( + `SELECT COUNT(*) AS c FROM jobs + WHERE instance_id = ? AND state IN ('queued','running')`, + ), + + insertSession: db.query( + `INSERT INTO sessions (id, instance_id, remote_session_id, kind, title, created_at, updated_at) + VALUES ($id, $instance_id, $remote_session_id, $kind, $title, $created_at, $updated_at)`, + ), + findSessionByRemote: db.query< + { id: string; remote_session_id: string | null }, + [string, string] + >( + `SELECT id, remote_session_id FROM sessions + WHERE instance_id = ? AND remote_session_id = ?`, + ), + assignRemoteSessionToJob: db.query( + `UPDATE sessions + SET remote_session_id = $remote_session_id, updated_at = $updated_at + WHERE id = (SELECT session_id FROM jobs WHERE id = $job_id)`, + ), + + listAllJobs: db.query( + `${JOB_SELECT} ORDER BY datetime(j.created_at) DESC`, + ), + listJobsByInstance: db.query( + `${JOB_SELECT} WHERE j.instance_id = ? ORDER BY datetime(j.created_at) DESC`, + ), + listJobsByState: db.query( + `${JOB_SELECT} WHERE j.state = ? ORDER BY datetime(j.created_at) DESC`, + ), + listJobsByInstanceAndState: db.query( + `${JOB_SELECT} WHERE j.instance_id = ? AND j.state = ? ORDER BY datetime(j.created_at) DESC`, + ), + getJob: db.query(`${JOB_SELECT} WHERE j.id = ?`), + insertJob: db.query( + `INSERT INTO jobs (id, instance_id, session_id, state, prompt, agent, + model_provider_id, model_id, system_prompt, variant, + message_id, error, output_text, created_at, updated_at, + started_at, ended_at) + VALUES ($id, $instance_id, $session_id, $state, $prompt, $agent, + $model_provider_id, $model_id, $system_prompt, $variant, + $message_id, $error, $output_text, $created_at, $updated_at, + $started_at, $ended_at)`, + ), + setJobState: db.query( + `UPDATE jobs SET state = $state, updated_at = $updated_at, + started_at = COALESCE($started_at, started_at) + WHERE id = $id`, + ), + setJobTerminal: db.query( + `UPDATE jobs SET + state = $state, + error = $error, + output_text = COALESCE($output_text, output_text), + message_id = COALESCE($message_id, message_id), + updated_at = $updated_at, + ended_at = $ended_at + WHERE id = $id`, + ), + appendJobOutput: db.query( + `UPDATE jobs SET + output_text = COALESCE(output_text, '') || $delta, + updated_at = $updated_at + WHERE id = $id`, + ), + setJobMessageId: db.query( + `UPDATE jobs SET message_id = $message_id, updated_at = $updated_at WHERE id = $id`, + ), + deleteOldTerminalJobs: db.query( + `DELETE FROM jobs WHERE id IN ( + SELECT id FROM jobs + WHERE state IN ('succeeded','failed','cancelled') + ORDER BY datetime(created_at) DESC + LIMIT -1 OFFSET ? + )`, + ), + + countJobsByState: db.query<{ c: number }, [JobState]>( + `SELECT COUNT(*) AS c FROM jobs WHERE state = ?`, + ), + countInstanceJobsByState: db.query<{ c: number }, [string, JobState]>( + `SELECT COUNT(*) AS c FROM jobs WHERE instance_id = ? AND state = ?`, + ), + countInstanceFinishedJobs: db.query<{ c: number }, [string]>( + `SELECT COUNT(*) AS c FROM jobs + WHERE instance_id = ? AND state IN ('succeeded','failed','cancelled')`, + ), + resetInstancesToStopped: db.query( + `UPDATE instances SET status = 'stopped', updated_at = $updated_at + WHERE status IN ('starting','running','error')`, + ), + requeueRunningJobs: db.query< + { id: string; instance_id: string }, + [string] + >( + `UPDATE jobs SET + state = 'queued', + error = CASE + WHEN error IS NULL OR error = '' THEN 'Recovered after daemon restart' + ELSE error || ' Recovered after daemon restart' + END, + updated_at = ? + WHERE state = 'running' + RETURNING id, instance_id`, + ), }; } - getJob(state: DaemonState, jobId: string): DaemonJob | undefined { - return state.jobs.find((item: DaemonJob) => item.id === jobId); + close(): void { + this.db?.close(); + this.db = undefined; + this.stmts = undefined; } - listJobs( - state: DaemonState, - filter: { instanceId?: string; state?: DaemonJob["state"] }, - ): DaemonJob[] { - return state.jobs.filter((job: DaemonJob) => { - if (filter.instanceId && job.instanceId !== filter.instanceId) { - return false; - } - if (filter.state && job.state !== filter.state) { - return false; - } - return true; - }); + private s(): NonNullable { + if (!this.stmts) { + throw new Error("StateStore is not open; call open() first"); + } + return this.stmts; } - createInstance(state: DaemonState, instance: ManagedInstance): DaemonState { - if ( - state.instances.some( - (item: ManagedInstance) => item.directory === instance.directory, - ) - ) { - throw new StoreError( - "conflict", - `instance already exists for directory ${instance.directory}`, - ); - } + // -------------------------------------------------------------------- + // Instances + // -------------------------------------------------------------------- - const next = [...state.instances, instance].sort( - (a: ManagedInstance, b: ManagedInstance) => - a.createdAt < b.createdAt ? 1 : -1, - ); - return { - ...state, - instances: next, - }; + listInstances(): ManagedInstance[] { + return this.s().listInstances.all().map(rowToInstance); } - upsertInstance(state: DaemonState, instance: ManagedInstance): DaemonState { - const duplicate = state.instances.find( - (item: ManagedInstance) => - item.directory === instance.directory && item.id !== instance.id, - ); - if (duplicate) { - throw new StoreError( - "conflict", - `instance already exists for directory ${instance.directory}`, - ); + getInstance(id: string): ManagedInstance | undefined { + const row = this.s().getInstance.get(id); + return row ? rowToInstance(row) : undefined; + } + + assertInstance(id: string): ManagedInstance { + const instance = this.getInstance(id); + if (!instance) { + throw new StoreError("not_found", `instance ${id} not found`); } + return instance; + } - const instances = state.instances.filter( - (item: ManagedInstance) => item.id !== instance.id, - ); - instances.push(instance); - instances.sort((a: ManagedInstance, b: ManagedInstance) => - a.createdAt < b.createdAt ? 1 : -1, - ); - return { - ...state, - instances, + createInstance(input: { + name: string; + directory: string; + maxConcurrency: number; + }): ManagedInstance { + const now = new Date().toISOString(); + const instance: ManagedInstance = { + id: randomUUID(), + name: input.name, + directory: input.directory, + status: "stopped", + maxConcurrency: input.maxConcurrency, + createdAt: now, + updatedAt: now, }; + try { + this.s().insertInstance.run({ + $id: instance.id, + $name: instance.name, + $directory: instance.directory, + $status: instance.status, + $max_concurrency: instance.maxConcurrency, + $last_error: null, + $created_at: instance.createdAt, + $updated_at: instance.updatedAt, + }); + } catch (err) { + if (isUniqueConstraint(err)) { + throw new StoreError( + "conflict", + `instance already exists for directory ${instance.directory}`, + ); + } + throw err; + } + return instance; } - removeInstance(state: DaemonState, instanceId: string): DaemonState { - return { - ...state, - instances: state.instances.filter( - (item: ManagedInstance) => item.id !== instanceId, - ), - }; + setInstanceStatus( + id: string, + status: ManagedInstance["status"], + lastError?: string, + ): ManagedInstance { + this.s().setInstanceStatus.run({ + $id: id, + $status: status, + $last_error: lastError ?? null, + $updated_at: new Date().toISOString(), + }); + return this.assertInstance(id); + } + + removeInstance(id: string): void { + this.s().deleteInstance.run(id); } - getInstance( - state: DaemonState, + hasActiveJobs(instanceId: string): boolean { + const row = this.s().countInstanceActiveJobs.get(instanceId); + return (row?.c ?? 0) > 0; + } + + // -------------------------------------------------------------------- + // Sessions (internal; jobs own the reference to a session row) + // -------------------------------------------------------------------- + + /** + * Find-or-create a `sessions` row for an incoming submit request. + * + * - `{type: 'new', title?}`: always inserts a fresh session row with + * `remote_session_id = NULL`. + * - `{type: 'existing', sessionId}`: returns the existing session row for + * the given remote id, or inserts one if this is the first time we see it. + */ + upsertSessionForSubmit( instanceId: string, - ): ManagedInstance | undefined { - return state.instances.find( - (item: ManagedInstance) => item.id === instanceId, - ); + session: JobSession, + ): { id: string; remoteSessionId: string | null } { + const now = new Date().toISOString(); + if (session.type === "existing") { + const existing = this.s().findSessionByRemote.get( + instanceId, + session.sessionId, + ); + if (existing) { + return { + id: existing.id, + remoteSessionId: existing.remote_session_id, + }; + } + const id = randomUUID(); + this.s().insertSession.run({ + $id: id, + $instance_id: instanceId, + $remote_session_id: session.sessionId, + $kind: "existing", + $title: null, + $created_at: now, + $updated_at: now, + }); + return { id, remoteSessionId: session.sessionId }; + } + + const id = randomUUID(); + this.s().insertSession.run({ + $id: id, + $instance_id: instanceId, + $remote_session_id: null, + $kind: "new", + $title: session.title ?? null, + $created_at: now, + $updated_at: now, + }); + return { id, remoteSessionId: null }; } - listInstances(state: DaemonState): ManagedInstance[] { - return [...state.instances]; + /** + * Attach a remote OpenCode session id to the session row that backs + * the given job. Used once a new session has been created on the + * remote runtime during job execution. + */ + assignRemoteSessionToJob(jobId: string, remoteSessionId: string): void { + this.s().assignRemoteSessionToJob.run({ + $job_id: jobId, + $remote_session_id: remoteSessionId, + $updated_at: new Date().toISOString(), + }); } - assertInstance(state: DaemonState, instanceId: string): ManagedInstance { - const instance = this.getInstance(state, instanceId); - if (!instance) { - throw new StoreError("not_found", `instance ${instanceId} not found`); + // -------------------------------------------------------------------- + // Jobs + // -------------------------------------------------------------------- + + listJobs( + filter: { instanceId?: string; state?: JobState } = {}, + ): DaemonJob[] { + const { instanceId, state } = filter; + let rows: JobRow[]; + if (instanceId && state) { + rows = this.s().listJobsByInstanceAndState.all(instanceId, state); + } else if (instanceId) { + rows = this.s().listJobsByInstance.all(instanceId); + } else if (state) { + rows = this.s().listJobsByState.all(state); + } else { + rows = this.s().listAllJobs.all(); } - return instance; + return rows.map(rowToJob); } - assertJob(state: DaemonState, jobId: string): DaemonJob { - const job = this.getJob(state, jobId); + getJob(id: string): DaemonJob | undefined { + const row = this.s().getJob.get(id); + return row ? rowToJob(row) : undefined; + } + + assertJob(id: string): DaemonJob { + const job = this.getJob(id); if (!job) { - throw new StoreError("not_found", `job ${jobId} not found`); + throw new StoreError("not_found", `job ${id} not found`); } return job; } - pruneTerminalJobs(state: DaemonState): DaemonState { - const terminalStates = new Set(["succeeded", "failed", "cancelled"]); - const active: DaemonJob[] = []; - const terminal: DaemonJob[] = []; + /** + * Create a new job along with its backing session row, atomically. + */ + createJob(input: { + instanceId: string; + session: JobSession; + task: JobTask; + }): DaemonJob { + const now = new Date().toISOString(); + const jobId = randomUUID(); - for (const job of state.jobs) { - if (terminalStates.has(job.state)) { - terminal.push(job); - } else { - active.push(job); - } - } + this.db?.transaction(() => { + const { id: sessionRowId } = this.upsertSessionForSubmit( + input.instanceId, + input.session, + ); + this.s().insertJob.run({ + $id: jobId, + $instance_id: input.instanceId, + $session_id: sessionRowId, + $state: "queued", + $prompt: input.task.prompt, + $agent: input.task.agent ?? null, + $model_provider_id: input.task.model?.providerId ?? null, + $model_id: input.task.model?.modelId ?? null, + $system_prompt: input.task.system ?? null, + $variant: input.task.variant ?? null, + $message_id: null, + $error: null, + $output_text: null, + $created_at: now, + $updated_at: now, + $started_at: null, + $ended_at: null, + }); + })(); - if (terminal.length <= MAX_TERMINAL_JOBS) { - return state; - } + return this.assertJob(jobId); + } - // Jobs are already sorted newest-first by upsertJob - const kept = terminal.slice(0, MAX_TERMINAL_JOBS); - const jobs = [...active, ...kept].sort((a: DaemonJob, b: DaemonJob) => - a.createdAt < b.createdAt ? 1 : -1, + markJobRunning(id: string): DaemonJob { + const now = new Date().toISOString(); + this.s().setJobState.run({ + $id: id, + $state: "running", + $updated_at: now, + $started_at: now, + }); + return this.assertJob(id); + } + + markJobTerminal( + id: string, + state: Extract, + patch: { + error?: string; + outputText?: string; + messageId?: string; + } = {}, + ): DaemonJob { + const now = new Date().toISOString(); + this.s().setJobTerminal.run({ + $id: id, + $state: state, + $error: patch.error ?? null, + $output_text: patch.outputText ?? null, + $message_id: patch.messageId ?? null, + $updated_at: now, + $ended_at: now, + }); + return this.assertJob(id); + } + + appendJobOutput(id: string, delta: string): void { + this.s().appendJobOutput.run({ + $id: id, + $delta: delta, + $updated_at: new Date().toISOString(), + }); + } + + setJobMessageId(id: string, messageId: string): void { + this.s().setJobMessageId.run({ + $id: id, + $message_id: messageId, + $updated_at: new Date().toISOString(), + }); + } + + // -------------------------------------------------------------------- + // Recovery + aggregates + // -------------------------------------------------------------------- + + /** + * Reset crash-surviving instance/job state on daemon startup and return + * the ids of jobs that should be re-queued for scheduling. + */ + recoverForBootstrap(): Array<{ id: string; instanceId: string }> { + const now = new Date().toISOString(); + let requeued: Array<{ id: string; instance_id: string }> = []; + this.db?.transaction(() => { + this.s().resetInstancesToStopped.run({ $updated_at: now }); + requeued = this.s().requeueRunningJobs.all(now); + })(); + return requeued.map((r) => ({ id: r.id, instanceId: r.instance_id })); + } + + /** + * Keep at most `max` terminal jobs (succeeded/failed/cancelled), + * deleting the oldest excess rows. + */ + pruneTerminalJobs(max: number): void { + this.s().deleteOldTerminalJobs.run(max); + } + + countJobsByState(state: JobState): number { + return this.s().countJobsByState.get(state)?.c ?? 0; + } + + countFinishedJobs(): number { + return ( + this.countJobsByState("succeeded") + + this.countJobsByState("failed") + + this.countJobsByState("cancelled") ); + } - return { ...state, jobs }; + instanceHealth(): InstanceHealth[] { + return this.listInstances().map((instance) => ({ + instanceId: instance.id, + name: instance.name, + status: instance.status, + running: + this.s().countInstanceJobsByState.get(instance.id, "running")?.c ?? 0, + queued: + this.s().countInstanceJobsByState.get(instance.id, "queued")?.c ?? 0, + finished: this.s().countInstanceFinishedJobs.get(instance.id)?.c ?? 0, + ...(instance.lastError ? { lastError: instance.lastError } : {}), + })); } } + +function isUniqueConstraint(err: unknown): boolean { + if (!(err instanceof Error)) return false; + const code = (err as { code?: string }).code; + return ( + code === "SQLITE_CONSTRAINT_UNIQUE" || + /UNIQUE constraint failed/i.test(err.message) + ); +} From fa9e19b633d57cd65ed628e1e4e06618c8b979cb Mon Sep 17 00:00:00 2001 From: Chenxin Yan Date: Wed, 22 Apr 2026 18:47:10 -0400 Subject: [PATCH 4/6] feat: forward session title when creating remote sessions --- packages/daemon/src/__tests__/daemon.test.ts | 36 ++++++++++++++++++++ packages/daemon/src/__tests__/helpers.ts | 18 ++++++++-- packages/daemon/src/server.ts | 3 ++ packages/daemon/src/store.ts | 33 ++++++++++++++++++ 4 files changed, 88 insertions(+), 2 deletions(-) diff --git a/packages/daemon/src/__tests__/daemon.test.ts b/packages/daemon/src/__tests__/daemon.test.ts index 2fcbad6..7040307 100644 --- a/packages/daemon/src/__tests__/daemon.test.ts +++ b/packages/daemon/src/__tests__/daemon.test.ts @@ -103,6 +103,42 @@ describe("Daemon", () => { expect(result.job.instanceId).toBe(instance.instance.id); }); + test("forwards new session titles when creating remote sessions", async () => { + const created = await daemon.handleRequest( + req({ + id: "instance-create", + method: "instance.create", + params: { + name: "One", + directory: "/tmp/project-one", + }, + }), + ); + const instance = expectSuccess(created, "instance.create"); + + await daemon.handleRequest( + req({ + id: "job-submit", + method: "job.submit", + params: { + instanceId: instance.instance.id, + session: { type: "new", title: "Sprint Planning" }, + task: { + type: "prompt", + prompt: "hello world", + }, + }, + }), + ); + + await Bun.sleep(80); + expect(registry.sessionCreateCalls).toContainEqual({ + instanceId: instance.instance.id, + directory: "/tmp/project-one", + title: "Sprint Planning", + }); + }); + test("rejects submit with nonexistent instance", async () => { const submit = await daemon.handleRequest( req({ diff --git a/packages/daemon/src/__tests__/helpers.ts b/packages/daemon/src/__tests__/helpers.ts index ed36799..10ebd9c 100644 --- a/packages/daemon/src/__tests__/helpers.ts +++ b/packages/daemon/src/__tests__/helpers.ts @@ -60,6 +60,11 @@ export class FakeOpencodeRegistry implements OpencodeRuntimeManager { sessionId: string; prompt: string; }> = []; + readonly sessionCreateCalls: Array<{ + instanceId: string; + directory?: string; + title?: string; + }> = []; readonly abortCalls: Array<{ instanceId: string; sessionId: string }> = []; globalMaxConcurrent = 0; /** Configurable per-test: an array of text deltas the fake will emit @@ -96,9 +101,18 @@ export class FakeOpencodeRegistry implements OpencodeRuntimeManager { dispose: async () => undefined, }, session: { - create: async () => { + create: async (parameters) => { + this.sessionCreateCalls.push({ + instanceId, + directory: parameters.directory, + title: parameters.title, + }); const id = `session-${instanceId}-${this.sessionSequence++}`; - return fakeSession({ id }); + return fakeSession({ + id, + directory: parameters.directory ?? "/fake", + title: parameters.title ?? "fake", + }); }, prompt: async (parameters) => { const sessionId = parameters.sessionID; diff --git a/packages/daemon/src/server.ts b/packages/daemon/src/server.ts index fb6b70e..63c2a32 100644 --- a/packages/daemon/src/server.ts +++ b/packages/daemon/src/server.ts @@ -599,11 +599,14 @@ export class Daemon { job: DaemonJob, ): Promise { if (job.sessionId) return job.sessionId; + const sessionRef = this.store.getSessionForJob(job.id); + if (sessionRef.remoteSessionId) return sessionRef.remoteSessionId; // No remote id yet — this is a `{type: 'new'}` submission. The // sessions row was created at submit time; fill in its remote id. const session = await client.session.create({ directory: instance.directory, + title: sessionRef.kind === "new" ? sessionRef.title : undefined, }); this.store.assignRemoteSessionToJob(job.id, session.id); return session.id; diff --git a/packages/daemon/src/store.ts b/packages/daemon/src/store.ts index 318e394..80fb45b 100644 --- a/packages/daemon/src/store.ts +++ b/packages/daemon/src/store.ts @@ -58,6 +58,18 @@ interface JobRow { remote_session_id: string | null; } +interface SessionRefRow { + kind: "new" | "existing"; + title: string | null; + remote_session_id: string | null; +} + +export interface JobSessionRef { + kind: "new" | "existing"; + title?: string; + remoteSessionId?: string; +} + function rowToInstance(row: InstanceRow): ManagedInstance { const base: ManagedInstance = { id: row.id, @@ -140,6 +152,7 @@ export class StateStore { { id: string; remote_session_id: string | null }, [string, string] >; + getSessionForJob: Statement; assignRemoteSessionToJob: Statement; listAllJobs: Statement; @@ -211,6 +224,12 @@ export class StateStore { `SELECT id, remote_session_id FROM sessions WHERE instance_id = ? AND remote_session_id = ?`, ), + getSessionForJob: db.query( + `SELECT s.kind, s.title, s.remote_session_id + FROM sessions s + JOIN jobs j ON j.session_id = s.id + WHERE j.id = ?`, + ), assignRemoteSessionToJob: db.query( `UPDATE sessions SET remote_session_id = $remote_session_id, updated_at = $updated_at @@ -466,6 +485,20 @@ export class StateStore { }); } + getSessionForJob(jobId: string): JobSessionRef { + const row = this.s().getSessionForJob.get(jobId); + if (!row) { + throw new StoreError("not_found", `session for job ${jobId} not found`); + } + return { + kind: row.kind, + ...(row.title ? { title: row.title } : {}), + ...(row.remote_session_id + ? { remoteSessionId: row.remote_session_id } + : {}), + }; + } + // -------------------------------------------------------------------- // Jobs // -------------------------------------------------------------------- From 4fc2419813ac62e82a4e7ee52353345e463c5c8c Mon Sep 17 00:00:00 2001 From: Chenxin Yan Date: Thu, 23 Apr 2026 14:28:24 -0400 Subject: [PATCH 5/6] fix(daemon): correct cancel race, guard session.list, and harden store error paths Await executeJob completion before returning from job.cancel so the terminal row is written exactly once and the cancel error is never overwritten by a racing prompt() success path. Add assertInstance guard on session.list so unknown instance ids surface not_found instead of an empty list. Strengthen assignRemoteSessionToJob to throw when the job row is missing, improve createInstance unique-constraint messaging, and remove the unused setJobMessageId method. Tests cover all three new behaviours. --- packages/daemon/src/__tests__/daemon.test.ts | 124 ++++++++++++++++++- packages/daemon/src/__tests__/store.test.ts | 66 ++++++++++ packages/daemon/src/server.ts | 46 +++++-- packages/daemon/src/store.ts | 44 ++++--- 4 files changed, 250 insertions(+), 30 deletions(-) diff --git a/packages/daemon/src/__tests__/daemon.test.ts b/packages/daemon/src/__tests__/daemon.test.ts index ad16c1e..d2fb99a 100644 --- a/packages/daemon/src/__tests__/daemon.test.ts +++ b/packages/daemon/src/__tests__/daemon.test.ts @@ -301,6 +301,125 @@ describe("Daemon", () => { expect(expectSuccess(cancel, "job.cancel").job.state).toBe("cancelled"); }); + test("cancelling a running job preserves the cancel error on the terminal row", async () => { + const created = await daemon.handleRequest( + req({ + id: "instance-create", + method: "instance.create", + params: { + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }, + }), + ); + const createdResult = expectSuccess(created, "instance.create"); + + const submitted = await daemon.handleRequest( + req({ + id: "job-submit", + method: "job.submit", + params: { + instanceId: createdResult.instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "long-task" }, + }, + }), + ); + const submittedResult = expectSuccess(submitted, "job.submit"); + + // Wait until the job has transitioned to running inside the fake runtime. + await Bun.sleep(10); + + const cancel = await daemon.handleRequest( + req({ + id: "job-cancel", + method: "job.cancel", + params: { jobId: submittedResult.job.id }, + }), + ); + const cancelled = expectSuccess(cancel, "job.cancel").job; + expect(cancelled.state).toBe("cancelled"); + expect(cancelled.error).toBe("Job cancelled"); + + // Double-check via store after execution has fully settled — no later + // terminal write should have nulled out the error column. + await Bun.sleep(60); + const get = await daemon.handleRequest( + req({ + id: "job-get", + method: "job.get", + params: { jobId: submittedResult.job.id }, + }), + ); + const finalJob = expectSuccess(get, "job.get").job; + expect(finalJob.state).toBe("cancelled"); + expect(finalJob.error).toBe("Job cancelled"); + }); + + test("session.list rejects unknown instance ids", async () => { + const response = await daemon.handleRequest( + req({ + id: "session-list", + method: "session.list", + params: { instanceId: "does-not-exist" }, + }), + ); + expect(expectFailure(response).error.code).toBe("not_found"); + }); + + test("submitting a second job during an in-flight drain still runs it promptly", async () => { + const createOne = await daemon.handleRequest( + req({ + id: "inst-1", + method: "instance.create", + params: { name: "One", directory: "/tmp/project-one" }, + }), + ); + const createTwo = await daemon.handleRequest( + req({ + id: "inst-2", + method: "instance.create", + params: { name: "Two", directory: "/tmp/project-two" }, + }), + ); + const one = expectSuccess(createOne, "instance.create"); + const two = expectSuccess(createTwo, "instance.create"); + + // Submit two jobs back-to-back. The second submit happens while the + // first scheduleDrain()'s microtask/promise is still in flight and + // must be coalesced via drainPending so the second job runs without + // waiting for the first to complete. + await daemon.handleRequest( + req({ + id: "submit-1", + method: "job.submit", + params: { + instanceId: one.instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "first" }, + }, + }), + ); + await daemon.handleRequest( + req({ + id: "submit-2", + method: "job.submit", + params: { + instanceId: two.instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "second" }, + }, + }), + ); + + // Fake prompt delay is 40ms; if drain coalescing regressed, the + // second job would run strictly after the first and both jobs would + // not be concurrently active. + await Bun.sleep(30); + expect(registry.globalMaxConcurrent).toBeGreaterThanOrEqual(2); + }); + test("requeues running jobs after restart", async () => { // Shut down the beforeEach daemon so we can simulate a crashed state // by writing directly to the SQLite file. @@ -573,7 +692,8 @@ describe("Daemon sessions", () => { }), ); - // Verify sessions are gone + // After removal, session.list for that instance must fail with + // not_found — the cascade deletes sessions with the instance row. const after = await daemon.handleRequest( req({ id: "session-list-after", @@ -581,7 +701,7 @@ describe("Daemon sessions", () => { params: { instanceId }, }), ); - expect(expectSuccess(after, "session.list").sessions).toHaveLength(0); + expect(expectFailure(after).error.code).toBe("not_found"); }); }); diff --git a/packages/daemon/src/__tests__/store.test.ts b/packages/daemon/src/__tests__/store.test.ts index d88ea16..d2fd45f 100644 --- a/packages/daemon/src/__tests__/store.test.ts +++ b/packages/daemon/src/__tests__/store.test.ts @@ -256,4 +256,70 @@ describe("StateStore", () => { expect(store.assertJob(job.id).state).toBe("queued"); expect(store.assertInstance(instance.id).status).toBe("stopped"); }); + + test("recoverForBootstrap appends to existing job error on requeue", () => { + const instance = store.createInstance({ + name: "A", + directory: "/tmp/a", + maxConcurrency: 1, + }); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hi" }, + }); + // Seed a running job that already has a non-null error (e.g. from a + // prior partial terminal write that did not commit to a terminal state). + store.markJobRunning(job.id); + const db = new Database(databasePath); + db.run("UPDATE jobs SET error = 'prior error' WHERE id = ?", [job.id]); + db.close(); + + store.recoverForBootstrap(); + const recovered = store.assertJob(job.id); + expect(recovered.state).toBe("queued"); + expect(recovered.error).toBe("prior error Recovered after daemon restart"); + }); + + test("assignRemoteSessionToJob throws when job does not exist", () => { + expect(() => + store.assignRemoteSessionToJob( + "00000000-0000-0000-0000-000000000000", + "remote-xyz", + "title", + ), + ).toThrow(StoreError); + }); + + test("pruneTerminalJobs keeps the newest N terminal rows", () => { + const instance = store.createInstance({ + name: "A", + directory: "/tmp/a", + maxConcurrency: 1, + }); + const ids: string[] = []; + for (let i = 0; i < 5; i++) { + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: `p-${i}` }, + }); + store.markJobTerminal(job.id, "succeeded", { outputText: String(i) }); + ids.push(job.id); + // Space created_at so ordering is deterministic. + const db = new Database(databasePath); + db.run("UPDATE jobs SET created_at = ? WHERE id = ?", [ + new Date(Date.now() + i * 1000).toISOString(), + job.id, + ]); + db.close(); + } + store.pruneTerminalJobs(2); + const remaining = store.listJobs(); + expect(remaining).toHaveLength(2); + // Newest two should survive (ids[4], ids[3]). + const remainingIds = new Set(remaining.map((j) => j.id)); + expect(remainingIds.has(ids[4] as string)).toBe(true); + expect(remainingIds.has(ids[3] as string)).toBe(true); + }); }); diff --git a/packages/daemon/src/server.ts b/packages/daemon/src/server.ts index fdac571..ed4947d 100644 --- a/packages/daemon/src/server.ts +++ b/packages/daemon/src/server.ts @@ -312,6 +312,7 @@ export class Daemon { private handleSessionList( request: RequestByMethod<"session.list">, ): SessionListResult { + this.store.assertInstance(request.params.instanceId); return { sessions: this.store.listSessions({ instanceId: request.params.instanceId, @@ -386,18 +387,34 @@ export class Daemon { return { job: cancelled }; } + // Running branch: let `executeJob` own the single terminal write so + // there is exactly one state transition and one `done` event. We + // abort the controller (which triggers `executeJob`'s catch path or + // the aborted-after-success branch) and await completion before + // returning the terminal job row. const running = this.runningJobs.get(job.id); - const cancelled = this.store.markJobTerminal(job.id, "cancelled", { - error: "Job cancelled", - }); - if (running) { - running.controller.abort(); - const remoteSessionId = this.runningSessionIds.get(job.id); - if (remoteSessionId) { - void this.abortRemoteSession(running.instanceId, remoteSessionId); - } + if (!running) { + // Job was in state=running per the store, but no in-memory task — + // likely a stale state. Fall back to writing the terminal row + // directly so the caller still sees a cancelled job. + const cancelled = this.store.markJobTerminal(job.id, "cancelled", { + error: "Job cancelled", + }); + return { job: cancelled }; } - return { job: cancelled }; + + running.controller.abort(); + const remoteSessionId = this.runningSessionIds.get(job.id); + if (remoteSessionId) { + void this.abortRemoteSession(running.instanceId, remoteSessionId); + } + + const execution = this.runningTasks.get(job.id); + if (execution) { + await execution; + } + + return { job: this.store.assertJob(job.id) }; } /** @@ -635,7 +652,14 @@ export class Daemon { if (!current?.outputText || current.outputText.length === 0) { patch.outputText = extractText(response.parts); } - terminalState = controller.signal.aborted ? "cancelled" : "succeeded"; + if (controller.signal.aborted) { + // prompt() returned successfully but the job was cancelled before + // the abort was observed — record the cancellation reason. + terminalState = "cancelled"; + patch.error = "Job cancelled"; + } else { + terminalState = "succeeded"; + } } catch (error) { terminalState = controller.signal.aborted ? "cancelled" : "failed"; patch.error = controller.signal.aborted diff --git a/packages/daemon/src/store.ts b/packages/daemon/src/store.ts index 928f09d..202af40 100644 --- a/packages/daemon/src/store.ts +++ b/packages/daemon/src/store.ts @@ -187,7 +187,6 @@ export class StateStore { setJobState: Statement; setJobTerminal: Statement; appendJobOutput: Statement; - setJobMessageId: Statement; deleteOldTerminalJobs: Statement; countJobsByState: Statement<{ c: number }, [JobState]>; @@ -316,9 +315,9 @@ export class StateStore { updated_at = $updated_at WHERE id = $id`, ), - setJobMessageId: db.query( - `UPDATE jobs SET message_id = $message_id, updated_at = $updated_at WHERE id = $id`, - ), + // Keep the newest `$max` terminal rows; delete the rest. + // In SQLite, `LIMIT -1` means "no upper bound", so this selects + // every row past the first `$max` in DESC order. deleteOldTerminalJobs: db.query( `DELETE FROM jobs WHERE id IN ( SELECT id FROM jobs @@ -421,10 +420,18 @@ export class StateStore { }); } catch (err) { if (isUniqueConstraint(err)) { - throw new StoreError( - "conflict", - `instance already exists for directory ${instance.directory}`, - ); + // Today the only unique index on `instances` (aside from the + // primary key) is on `directory`. If that changes the error + // message would become misleading, so check the message + // before blaming the directory column. + const message = err instanceof Error ? err.message : String(err); + if (/instances\.directory/i.test(message)) { + throw new StoreError( + "conflict", + `instance already exists for directory ${instance.directory}`, + ); + } + throw new StoreError("conflict", message); } throw err; } @@ -512,18 +519,29 @@ export class StateStore { * Attach a remote OpenCode session id to the session row that backs * the given job. Used once a new session has been created on the * remote runtime during job execution. + * + * Throws `StoreError("not_found")` if the job id (or its backing + * session row) does not exist — this surfaces silent assignment + * failures that would otherwise leave the caller holding a remote + * session that SQLite never linked. */ assignRemoteSessionToJob( jobId: string, remoteSessionId: string, title: string, ): void { - this.s().assignRemoteSessionToJob.run({ + const result = this.s().assignRemoteSessionToJob.run({ $job_id: jobId, $remote_session_id: remoteSessionId, $title: title, $updated_at: new Date().toISOString(), }); + if (result.changes === 0) { + throw new StoreError( + "not_found", + `cannot assign remote session to job ${jobId}: job or session row missing`, + ); + } } /** Sessions visible to the TUI: only rows with a resolved remote OpenCode session id. */ @@ -703,14 +721,6 @@ export class StateStore { }); } - setJobMessageId(id: string, messageId: string): void { - this.s().setJobMessageId.run({ - $id: id, - $message_id: messageId, - $updated_at: new Date().toISOString(), - }); - } - // -------------------------------------------------------------------- // Recovery + aggregates // -------------------------------------------------------------------- From 9b0347c62757f8fa8495be5d245545d5b5703803 Mon Sep 17 00:00:00 2001 From: Chenxin Yan Date: Thu, 23 Apr 2026 14:41:29 -0400 Subject: [PATCH 6/6] fix(daemon): cap job.cancel wait on slow remotes, swallow session.list race in TUI --- .changeset/sqlite-state-migration.md | 17 +++++ apps/tui/src/components/app.tsx | 8 +- packages/daemon/src/__tests__/daemon.test.ts | 79 ++++++++++++++++++++ packages/daemon/src/server.ts | 46 +++++++++++- 4 files changed, 148 insertions(+), 2 deletions(-) create mode 100644 .changeset/sqlite-state-migration.md diff --git a/.changeset/sqlite-state-migration.md b/.changeset/sqlite-state-migration.md new file mode 100644 index 0000000..9df0924 --- /dev/null +++ b/.changeset/sqlite-state-migration.md @@ -0,0 +1,17 @@ +--- +"@techatnyu/ralphd": patch +"@techatnyu/ralph": patch +--- + +Migrate daemon persisted state from `~/.ralph/state.json` to a SQLite +database at `~/.ralph/state.sqlite`. + +The move enables transactional mutations, WAL-mode crash durability, and +indexed queries as the daemon scales. Schema is versioned via SQLite's +`PRAGMA user_version` so future migrations can be additive. + +**Upgrade note:** existing `state.json` files are not migrated automatically. +On first run after upgrade, the daemon starts with an empty database — you +will need to re-register any instances and resubmit in-flight jobs. Old +terminal job history is lost. If preserving state matters to you, hold off +upgrading until an explicit migration path ships. diff --git a/apps/tui/src/components/app.tsx b/apps/tui/src/components/app.tsx index 0cd22de..4010a56 100644 --- a/apps/tui/src/components/app.tsx +++ b/apps/tui/src/components/app.tsx @@ -167,8 +167,14 @@ function Dashboard({ const selectedInst = instanceList.instances[safeIndex]; const [jobs, sessionResult] = await Promise.all([ daemon.listJobs(selectedInst ? { instanceId: selectedInst.id } : {}), + // If the selected instance was removed between `listInstances` + // and this call, the daemon now throws `not_found` instead of + // returning an empty list. Swallow that narrow race so the + // whole refresh doesn't fail. selectedInst - ? daemon.listSessions(selectedInst.id) + ? daemon + .listSessions(selectedInst.id) + .catch(() => ({ sessions: [] })) : Promise.resolve({ sessions: [] }), ]); setSelectedIndex(safeIndex); diff --git a/packages/daemon/src/__tests__/daemon.test.ts b/packages/daemon/src/__tests__/daemon.test.ts index d2fb99a..bd0aa38 100644 --- a/packages/daemon/src/__tests__/daemon.test.ts +++ b/packages/daemon/src/__tests__/daemon.test.ts @@ -357,6 +357,85 @@ describe("Daemon", () => { expect(finalJob.error).toBe("Job cancelled"); }); + test("cancelling a running job returns within the timeout when the remote is slow", async () => { + // Recreate the daemon with a very long prompt delay and a short + // cancel wait timeout. The fake runtime's `abort` is a no-op so + // the prompt always runs to completion — this simulates a remote + // runtime that ignores `session.abort`. + await daemon.shutdown(); + const slowRegistry = new FakeOpencodeRegistry(1_000); + const slowStore = new StateStore(join(tmpDir, "slow.sqlite")); + const slowDaemon = new Daemon(slowStore, { + registry: slowRegistry, + cancelWaitTimeoutMs: 30, + }); + await slowDaemon.bootstrap(); + + const created = await slowDaemon.handleRequest( + req({ + id: "inst", + method: "instance.create", + params: { + name: "slow", + directory: "/tmp/project-slow", + maxConcurrency: 1, + }, + }), + ); + const instanceId = expectSuccess(created, "instance.create").instance.id; + + const submitted = await slowDaemon.handleRequest( + req({ + id: "sub", + method: "job.submit", + params: { + instanceId, + session: { type: "new" }, + task: { type: "prompt", prompt: "slow" }, + }, + }), + ); + const jobId = expectSuccess(submitted, "job.submit").job.id; + + // Wait until the job is running. + await Bun.sleep(20); + + // Cancel and measure. + const start = Date.now(); + const cancel = await slowDaemon.handleRequest( + req({ + id: "cancel", + method: "job.cancel", + params: { jobId }, + }), + ); + const elapsed = Date.now() - start; + + // Must return well before the 1s prompt delay completes. + expect(elapsed).toBeLessThan(500); + // Returned row may still be `running` (timeout fired first) — that + // is the point of the timeout; the terminal row will arrive via + // the `done` stream event. + const returned = expectSuccess(cancel, "job.cancel").job; + expect(["running", "cancelled"]).toContain(returned.state); + + // Allow the fake's prompt to complete and executeJob to record + // the terminal row. + await Bun.sleep(1_100); + const get = await slowDaemon.handleRequest( + req({ + id: "get", + method: "job.get", + params: { jobId }, + }), + ); + const settled = expectSuccess(get, "job.get").job; + expect(settled.state).toBe("cancelled"); + expect(settled.error).toBe("Job cancelled"); + + await slowDaemon.shutdown(); + }); + test("session.list rejects unknown instance ids", async () => { const response = await daemon.handleRequest( req({ diff --git a/packages/daemon/src/server.ts b/packages/daemon/src/server.ts index ed4947d..74e406e 100644 --- a/packages/daemon/src/server.ts +++ b/packages/daemon/src/server.ts @@ -40,6 +40,15 @@ import { import { type JobSessionRef, StateStore, StoreError } from "./store"; const MAX_TERMINAL_JOBS = 100; +/** + * Upper bound on how long `job.cancel` will wait for the in-flight execution + * to settle after the local abort + remote `session.abort` have been issued. + * If the remote runtime ignores or is slow to honor the abort, the RPC still + * returns promptly with the job's current (non-terminal) row, and the + * executor's eventual terminal write is delivered via the `done` stream + * event. + */ +const CANCEL_WAIT_TIMEOUT_MS = 2_000; interface RunningJob { controller: AbortController; @@ -49,6 +58,8 @@ interface RunningJob { interface DaemonOptions { registry?: OpencodeRuntimeManager; maxConcurrency?: number; + /** Override for `CANCEL_WAIT_TIMEOUT_MS`. Tests only. */ + cancelWaitTimeoutMs?: number; } function extractText(parts: Part[]): string { @@ -80,6 +91,30 @@ function deriveSessionTitle(sessionRef: JobSessionRef, job: DaemonJob): string { return "Untitled"; } +/** + * Resolves when either `promise` settles or `timeoutMs` elapses, whichever + * comes first. Never rejects. The caller should read downstream state from + * the store after this returns — the returned value is intentionally unused. + */ +function raceWithTimeout( + promise: Promise, + timeoutMs: number, +): Promise { + return new Promise((resolve) => { + let settled = false; + const done = () => { + if (settled) return; + settled = true; + resolve(); + }; + const timer = setTimeout(done, timeoutMs); + promise.finally(() => { + clearTimeout(timer); + done(); + }); + }); +} + function normalizeErrorMessage(error: unknown): string { if (error instanceof Error) { return error.message; @@ -121,6 +156,7 @@ export class Daemon { private shutdownPromise: Promise | undefined; private instanceCursor = 0; private readonly maxConcurrency: number; + private readonly cancelWaitTimeoutMs: number; constructor( private readonly store: StateStore, @@ -140,6 +176,8 @@ export class Daemon { }); this.maxConcurrency = options.maxConcurrency ?? resolveDaemonRuntimeEnv().maxConcurrency; + this.cancelWaitTimeoutMs = + options.cancelWaitTimeoutMs ?? CANCEL_WAIT_TIMEOUT_MS; } setShutdownHandler(handler: () => void): void { @@ -409,9 +447,15 @@ export class Daemon { void this.abortRemoteSession(running.instanceId, remoteSessionId); } + // Wait for `executeJob` to settle so the caller sees the terminal + // row, but cap the wait: the SDK prompt is not wired to our abort + // signal, so if the remote runtime is slow to honor `session.abort` + // we would otherwise block the RPC for the full prompt duration. + // On timeout, return the current (still-running) row — the caller + // can subscribe to `job.stream` for the eventual `done` event. const execution = this.runningTasks.get(job.id); if (execution) { - await execution; + await raceWithTimeout(execution, this.cancelWaitTimeoutMs); } return { job: this.store.assertJob(job.id) };