diff --git a/.changeset/sqlite-state-migration.md b/.changeset/sqlite-state-migration.md new file mode 100644 index 0000000..9df0924 --- /dev/null +++ b/.changeset/sqlite-state-migration.md @@ -0,0 +1,17 @@ +--- +"@techatnyu/ralphd": patch +"@techatnyu/ralph": patch +--- + +Migrate daemon persisted state from `~/.ralph/state.json` to a SQLite +database at `~/.ralph/state.sqlite`. + +The move enables transactional mutations, WAL-mode crash durability, and +indexed queries as the daemon scales. Schema is versioned via SQLite's +`PRAGMA user_version` so future migrations can be additive. + +**Upgrade note:** existing `state.json` files are not migrated automatically. +On first run after upgrade, the daemon starts with an empty database — you +will need to re-register any instances and resubmit in-flight jobs. Old +terminal job history is lost. If preserving state matters to you, hold off +upgrading until an explicit migration path ships. diff --git a/apps/tui/README.md b/apps/tui/README.md index d8da285..ff4b944 100644 --- a/apps/tui/README.md +++ b/apps/tui/README.md @@ -16,7 +16,7 @@ bun run src/cli.ts daemon health bun run src/cli.ts daemon stop ``` -The daemon listens on a Unix socket at `~/.ralph/ralphd.sock` and persists state in `~/.ralph/state.json`. +The daemon listens on a Unix socket at `~/.ralph/ralphd.sock` and persists state in `~/.ralph/state.sqlite` (SQLite). ## Run the TUI shell diff --git a/apps/tui/src/components/app.tsx b/apps/tui/src/components/app.tsx index 0cd22de..4010a56 100644 --- a/apps/tui/src/components/app.tsx +++ b/apps/tui/src/components/app.tsx @@ -167,8 +167,14 @@ function Dashboard({ const selectedInst = instanceList.instances[safeIndex]; const [jobs, sessionResult] = await Promise.all([ daemon.listJobs(selectedInst ? { instanceId: selectedInst.id } : {}), + // If the selected instance was removed between `listInstances` + // and this call, the daemon now throws `not_found` instead of + // returning an empty list. 
Swallow that narrow race so the + // whole refresh doesn't fail. selectedInst - ? daemon.listSessions(selectedInst.id) + ? daemon + .listSessions(selectedInst.id) + .catch(() => ({ sessions: [] })) : Promise.resolve({ sessions: [] }), ]); setSelectedIndex(safeIndex); diff --git a/packages/daemon/src/__tests__/daemon.test.ts b/packages/daemon/src/__tests__/daemon.test.ts index 8af1cba..bd0aa38 100644 --- a/packages/daemon/src/__tests__/daemon.test.ts +++ b/packages/daemon/src/__tests__/daemon.test.ts @@ -47,7 +47,7 @@ describe("Daemon", () => { beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "ralph-daemon-test-")); - store = new StateStore(join(tmpDir, "state.json")); + store = new StateStore(join(tmpDir, "state.sqlite")); registry = new FakeOpencodeRegistry(40); daemon = new Daemon(store, { registry }); await daemon.bootstrap(); @@ -103,6 +103,42 @@ describe("Daemon", () => { expect(result.job.instanceId).toBe(instance.instance.id); }); + test("forwards new session titles when creating remote sessions", async () => { + const created = await daemon.handleRequest( + req({ + id: "instance-create", + method: "instance.create", + params: { + name: "One", + directory: "/tmp/project-one", + }, + }), + ); + const instance = expectSuccess(created, "instance.create"); + + await daemon.handleRequest( + req({ + id: "job-submit", + method: "job.submit", + params: { + instanceId: instance.instance.id, + session: { type: "new", title: "Sprint Planning" }, + task: { + type: "prompt", + prompt: "hello world", + }, + }, + }), + ); + + await Bun.sleep(80); + expect(registry.sessionCreateCalls).toContainEqual({ + instanceId: instance.instance.id, + directory: "/tmp/project-one", + title: "Sprint Planning", + }); + }); + test("rejects submit with nonexistent instance", async () => { const submit = await daemon.handleRequest( req({ @@ -265,34 +301,227 @@ describe("Daemon", () => { expect(expectSuccess(cancel, "job.cancel").job.state).toBe("cancelled"); }); - 
test("requeues running jobs after restart", async () => { - await store.save({ - instances: [ - { - id: "instance-1", + test("cancelling a running job preserves the cancel error on the terminal row", async () => { + const created = await daemon.handleRequest( + req({ + id: "instance-create", + method: "instance.create", + params: { name: "One", directory: "/tmp/project-one", - status: "running", maxConcurrency: 1, - createdAt: "2026-01-01T00:00:00.000Z", - updatedAt: "2026-01-01T00:00:00.000Z", - }, - ], - sessions: [], - jobs: [ - { - id: "job-1", - instanceId: "instance-1", + }, + }), + ); + const createdResult = expectSuccess(created, "instance.create"); + + const submitted = await daemon.handleRequest( + req({ + id: "job-submit", + method: "job.submit", + params: { + instanceId: createdResult.instance.id, session: { type: "new" }, - task: { type: "prompt", prompt: "recover" }, - state: "running", - createdAt: "2026-01-01T00:00:00.000Z", - updatedAt: "2026-01-01T00:00:00.000Z", + task: { type: "prompt", prompt: "long-task" }, + }, + }), + ); + const submittedResult = expectSuccess(submitted, "job.submit"); + + // Wait until the job has transitioned to running inside the fake runtime. + await Bun.sleep(10); + + const cancel = await daemon.handleRequest( + req({ + id: "job-cancel", + method: "job.cancel", + params: { jobId: submittedResult.job.id }, + }), + ); + const cancelled = expectSuccess(cancel, "job.cancel").job; + expect(cancelled.state).toBe("cancelled"); + expect(cancelled.error).toBe("Job cancelled"); + + // Double-check via store after execution has fully settled — no later + // terminal write should have nulled out the error column. 
+ await Bun.sleep(60); + const get = await daemon.handleRequest( + req({ + id: "job-get", + method: "job.get", + params: { jobId: submittedResult.job.id }, + }), + ); + const finalJob = expectSuccess(get, "job.get").job; + expect(finalJob.state).toBe("cancelled"); + expect(finalJob.error).toBe("Job cancelled"); + }); + + test("cancelling a running job returns within the timeout when the remote is slow", async () => { + // Recreate the daemon with a very long prompt delay and a short + // cancel wait timeout. The fake runtime's `abort` is a no-op so + // the prompt always runs to completion — this simulates a remote + // runtime that ignores `session.abort`. + await daemon.shutdown(); + const slowRegistry = new FakeOpencodeRegistry(1_000); + const slowStore = new StateStore(join(tmpDir, "slow.sqlite")); + const slowDaemon = new Daemon(slowStore, { + registry: slowRegistry, + cancelWaitTimeoutMs: 30, + }); + await slowDaemon.bootstrap(); + + const created = await slowDaemon.handleRequest( + req({ + id: "inst", + method: "instance.create", + params: { + name: "slow", + directory: "/tmp/project-slow", + maxConcurrency: 1, }, - ], + }), + ); + const instanceId = expectSuccess(created, "instance.create").instance.id; + + const submitted = await slowDaemon.handleRequest( + req({ + id: "sub", + method: "job.submit", + params: { + instanceId, + session: { type: "new" }, + task: { type: "prompt", prompt: "slow" }, + }, + }), + ); + const jobId = expectSuccess(submitted, "job.submit").job.id; + + // Wait until the job is running. + await Bun.sleep(20); + + // Cancel and measure. + const start = Date.now(); + const cancel = await slowDaemon.handleRequest( + req({ + id: "cancel", + method: "job.cancel", + params: { jobId }, + }), + ); + const elapsed = Date.now() - start; + + // Must return well before the 1s prompt delay completes. 
+ expect(elapsed).toBeLessThan(500); + // Returned row may still be `running` (timeout fired first) — that + // is the point of the timeout; the terminal row will arrive via + // the `done` stream event. + const returned = expectSuccess(cancel, "job.cancel").job; + expect(["running", "cancelled"]).toContain(returned.state); + + // Allow the fake's prompt to complete and executeJob to record + // the terminal row. + await Bun.sleep(1_100); + const get = await slowDaemon.handleRequest( + req({ + id: "get", + method: "job.get", + params: { jobId }, + }), + ); + const settled = expectSuccess(get, "job.get").job; + expect(settled.state).toBe("cancelled"); + expect(settled.error).toBe("Job cancelled"); + + await slowDaemon.shutdown(); + }); + + test("session.list rejects unknown instance ids", async () => { + const response = await daemon.handleRequest( + req({ + id: "session-list", + method: "session.list", + params: { instanceId: "does-not-exist" }, + }), + ); + expect(expectFailure(response).error.code).toBe("not_found"); + }); + + test("submitting a second job during an in-flight drain still runs it promptly", async () => { + const createOne = await daemon.handleRequest( + req({ + id: "inst-1", + method: "instance.create", + params: { name: "One", directory: "/tmp/project-one" }, + }), + ); + const createTwo = await daemon.handleRequest( + req({ + id: "inst-2", + method: "instance.create", + params: { name: "Two", directory: "/tmp/project-two" }, + }), + ); + const one = expectSuccess(createOne, "instance.create"); + const two = expectSuccess(createTwo, "instance.create"); + + // Submit two jobs back-to-back. The second submit happens while the + // first scheduleDrain()'s microtask/promise is still in flight and + // must be coalesced via drainPending so the second job runs without + // waiting for the first to complete. 
+ await daemon.handleRequest( + req({ + id: "submit-1", + method: "job.submit", + params: { + instanceId: one.instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "first" }, + }, + }), + ); + await daemon.handleRequest( + req({ + id: "submit-2", + method: "job.submit", + params: { + instanceId: two.instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "second" }, + }, + }), + ); + + // Fake prompt delay is 40ms; if drain coalescing regressed, the + // second job would run strictly after the first and both jobs would + // not be concurrently active. + await Bun.sleep(30); + expect(registry.globalMaxConcurrent).toBeGreaterThanOrEqual(2); + }); + + test("requeues running jobs after restart", async () => { + // Shut down the beforeEach daemon so we can simulate a crashed state + // by writing directly to the SQLite file. + await daemon.shutdown(); + + const seedStore = new StateStore(join(tmpDir, "state.sqlite")); + await seedStore.open(); + const instance = seedStore.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + seedStore.setInstanceStatus(instance.id, "running"); + const job = seedStore.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "recover" }, }); + seedStore.markJobRunning(job.id); + seedStore.close(); - const nextDaemon = new Daemon(store, { + const nextStore = new StateStore(join(tmpDir, "state.sqlite")); + const nextDaemon = new Daemon(nextStore, { registry: new FakeOpencodeRegistry(10), }); await nextDaemon.bootstrap(); @@ -300,7 +529,7 @@ describe("Daemon", () => { req({ id: "job-get", method: "job.get", - params: { jobId: "job-1" }, + params: { jobId: job.id }, }), ); expect(["queued", "running", "succeeded"]).toContain( @@ -318,7 +547,7 @@ describe("Daemon sessions", () => { beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "ralph-daemon-session-")); - store = new StateStore(join(tmpDir, 
"state.json")); + store = new StateStore(join(tmpDir, "state.sqlite")); registry = new FakeOpencodeRegistry(10); daemon = new Daemon(store, { registry }); await daemon.bootstrap(); @@ -542,7 +771,8 @@ describe("Daemon sessions", () => { }), ); - // Verify sessions are gone + // After removal, session.list for that instance must fail with + // not_found — the cascade deletes sessions with the instance row. const after = await daemon.handleRequest( req({ id: "session-list-after", @@ -550,7 +780,7 @@ describe("Daemon sessions", () => { params: { instanceId }, }), ); - expect(expectSuccess(after, "session.list").sessions).toHaveLength(0); + expect(expectFailure(after).error.code).toBe("not_found"); }); }); @@ -562,7 +792,7 @@ describe("Daemon streaming", () => { beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "ralph-daemon-stream-")); - store = new StateStore(join(tmpDir, "state.json")); + store = new StateStore(join(tmpDir, "state.sqlite")); registry = new FakeOpencodeRegistry(40); daemon = new Daemon(store, { registry }); await daemon.bootstrap(); diff --git a/packages/daemon/src/__tests__/env.test.ts b/packages/daemon/src/__tests__/env.test.ts index fcd3606..ab8be19 100644 --- a/packages/daemon/src/__tests__/env.test.ts +++ b/packages/daemon/src/__tests__/env.test.ts @@ -17,7 +17,7 @@ describe("env", () => { expect(env).toEqual({ ralphHome, socketPath: join(ralphHome, "ralphd.sock"), - statePath: join(ralphHome, "state.json"), + databasePath: join(ralphHome, "state.sqlite"), }); }); @@ -29,7 +29,7 @@ describe("env", () => { ).toEqual({ ralphHome: "/tmp/ralph", socketPath: "/tmp/ralph/ralphd.sock", - statePath: "/tmp/ralph/state.json", + databasePath: "/tmp/ralph/state.sqlite", }); expect( diff --git a/packages/daemon/src/__tests__/helpers.ts b/packages/daemon/src/__tests__/helpers.ts index a775ae2..714fc4e 100644 --- a/packages/daemon/src/__tests__/helpers.ts +++ b/packages/daemon/src/__tests__/helpers.ts @@ -60,6 +60,11 @@ export class 
FakeOpencodeRegistry implements OpencodeRuntimeManager { sessionId: string; prompt: string; }> = []; + readonly sessionCreateCalls: Array<{ + instanceId: string; + directory?: string; + title?: string; + }> = []; readonly abortCalls: Array<{ instanceId: string; sessionId: string }> = []; readonly directoriesStarted: string[] = []; readonly disposeCalls: Array<{ directory?: string }> = []; @@ -105,9 +110,18 @@ export class FakeOpencodeRegistry implements OpencodeRuntimeManager { }, }, session: { - create: async () => { + create: async (parameters) => { + this.sessionCreateCalls.push({ + instanceId, + directory: parameters.directory, + title: parameters.title, + }); const id = `session-${instanceId}-${this.sessionSequence++}`; - return fakeSession({ id }); + return fakeSession({ + id, + directory: parameters.directory ?? "/fake", + title: parameters.title ?? "fake", + }); }, prompt: async (parameters) => { const sessionId = parameters.sessionID; diff --git a/packages/daemon/src/__tests__/integration.test.ts b/packages/daemon/src/__tests__/integration.test.ts index 4002eb1..d7440e2 100644 --- a/packages/daemon/src/__tests__/integration.test.ts +++ b/packages/daemon/src/__tests__/integration.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; -import { chmod, mkdtemp, rm } from "node:fs/promises"; +import { chmod, mkdtemp, rm, unlink } from "node:fs/promises"; import { createServer, type Server } from "node:net"; import { tmpdir } from "node:os"; import { join } from "node:path"; @@ -10,7 +10,43 @@ import { createConnectionHandler, Daemon } from "../server"; import { StateStore } from "../store"; import { FakeOpencodeRegistry } from "./helpers"; -describe("Integration: server + client over Unix socket", () => { +/** False in restricted environments (e.g. some sandboxes) where binding a Unix socket returns EPERM. 
*/ +async function canBindUnixSocket(): Promise<boolean> { + let probeDir: string | undefined; + try { + probeDir = await mkdtemp(join(tmpdir(), "ralph-unix-probe-")); + const path = join(probeDir, "p.sock"); + const srv = createServer(); + await new Promise<void>((resolve, reject) => { + const onError = (err: NodeJS.ErrnoException) => reject(err); + srv.once("error", onError); + try { + srv.listen(path, () => { + srv.removeListener("error", onError); + srv.close((closeErr) => { + if (closeErr) reject(closeErr); + else resolve(); + }); + }); + } catch (err) { + reject(err); + } + }); + return true; + } catch { + return false; + } finally { + if (probeDir) { + await rm(probeDir, { recursive: true, force: true }).catch(() => {}); + } + } +} + +const unixSocketAvailable = await canBindUnixSocket(); + +const integrationDescribe = unixSocketAvailable ? describe : describe.skip; + +integrationDescribe("Integration: server + client over Unix socket", () => { let tmpDir: string; let testSocketPath: string; let server: Server; @@ -21,7 +57,12 @@ describe("Integration: server + client over Unix socket", () => { beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "ralph-integration-")); testSocketPath = join(tmpDir, "test.sock"); - const store = new StateStore(join(tmpDir, "state.json")); + try { + await unlink(testSocketPath); + } catch { + // ignore + } + const store = new StateStore(join(tmpDir, "state.sqlite")); registry = new FakeOpencodeRegistry(20); daemon = new Daemon(store, { registry }); await daemon.bootstrap(); @@ -29,16 +70,31 @@ describe("Integration: server + client over Unix socket", () => { server = createServer(createConnectionHandler(daemon)); client = new DaemonClient(testSocketPath); - await new Promise<void>((resolve) => { - server.listen(testSocketPath, async () => { - await chmod(testSocketPath, 0o600); - resolve(); - }); + await new Promise<void>((resolve, reject) => { + const onError = (err: NodeJS.ErrnoException) => { + reject(err); + }; + server.once("error",
onError); + try { + server.listen(testSocketPath, async () => { + try { + await chmod(testSocketPath, 0o600); + server.removeListener("error", onError); + resolve(); + } catch (err) { + server.removeListener("error", onError); + reject(err); + } + }); + } catch (err) { + server.removeListener("error", onError); + reject(err); + } }); }); afterEach(async () => { - server.close(); + server?.close(); await daemon.shutdown(); await rm(tmpDir, { recursive: true, force: true }); }); @@ -119,7 +175,6 @@ describe("Integration: server + client over Unix socket", () => { }); test("client.streamJob returns immediately for an already-terminal job", async () => { - // No streaming deltas — fake completes quickly with default delay. const created = await client.createInstance({ name: "fast-instance", directory: "/tmp/project-fast", @@ -130,7 +185,6 @@ describe("Integration: server + client over Unix socket", () => { task: { type: "prompt", prompt: "fast" }, }); - // Wait for the job to finish. await Bun.sleep(100); const events: JobStreamEvent[] = []; @@ -138,8 +192,6 @@ describe("Integration: server + client over Unix socket", () => { events.push(event); } - // Terminal jobs short-circuit: just a done event, no snapshot or - // deltas. 
expect(events).toHaveLength(1); expect(events[0]?.type).toBe("done"); }); diff --git a/packages/daemon/src/__tests__/protocol.test.ts b/packages/daemon/src/__tests__/protocol.test.ts index 03c6f22..80e8386 100644 --- a/packages/daemon/src/__tests__/protocol.test.ts +++ b/packages/daemon/src/__tests__/protocol.test.ts @@ -1,6 +1,6 @@ import { describe, expect, test } from "bun:test"; -import { DaemonState, RequestMessage, ResponseMessage } from "../protocol"; +import { RequestMessage, ResponseMessage } from "../protocol"; describe("protocol schemas", () => { test("parses a valid submit request", () => { @@ -62,14 +62,4 @@ describe("protocol schemas", () => { expect(parsed.success).toBe(false); }); - - test("parses daemon state", () => { - const parsed = DaemonState.safeParse({ - instances: [], - sessions: [], - jobs: [], - }); - - expect(parsed.success).toBe(true); - }); }); diff --git a/packages/daemon/src/__tests__/store.test.ts b/packages/daemon/src/__tests__/store.test.ts index a146117..d2fd45f 100644 --- a/packages/daemon/src/__tests__/store.test.ts +++ b/packages/daemon/src/__tests__/store.test.ts @@ -1,116 +1,325 @@ +import { Database } from "bun:sqlite"; import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { mkdtemp, readFile, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; -import type { DaemonJob, DaemonState, ManagedInstance } from "../protocol"; import { StateStore, StoreError } from "../store"; -function makeInstance( - overrides: Partial<ManagedInstance> = {}, -): ManagedInstance { - return { - id: overrides.id ?? "instance-1", - name: overrides.name ?? "Instance One", - directory: overrides.directory ?? "/tmp/project-one", - status: overrides.status ?? "stopped", - maxConcurrency: overrides.maxConcurrency ?? 1, - createdAt: overrides.createdAt ?? "2026-01-01T00:00:00.000Z", - updatedAt: overrides.updatedAt ??
"2026-01-01T00:00:00.000Z", - ...overrides, - }; -} - -function makeJob(overrides: Partial<DaemonJob> = {}): DaemonJob { - return { - id: overrides.id ?? "job-1", - instanceId: overrides.instanceId ?? "instance-1", - session: overrides.session ?? { type: "new" }, - task: overrides.task ?? { type: "prompt", prompt: "test prompt" }, - state: overrides.state ?? "queued", - createdAt: overrides.createdAt ?? "2026-01-01T00:00:00.000Z", - updatedAt: overrides.updatedAt ?? "2026-01-01T00:00:00.000Z", - ...overrides, - }; -} - describe("StateStore", () => { let tmpDir: string; - let statePath: string; + let databasePath: string; let store: StateStore; beforeEach(async () => { tmpDir = await mkdtemp(join(tmpdir(), "ralph-test-")); - statePath = join(tmpDir, "state.json"); - store = new StateStore(statePath); + databasePath = join(tmpDir, "state.sqlite"); + store = new StateStore(databasePath); + await store.open(); }); afterEach(async () => { + store.close(); await rm(tmpDir, { recursive: true, force: true }); }); - test("returns empty state when file does not exist", async () => { - const state = await store.load(); - expect(state).toEqual({ instances: [], sessions: [], jobs: [] }); + test("starts empty on a fresh database", () => { + expect(store.listInstances()).toEqual([]); + expect(store.listJobs()).toEqual([]); }); - test("writes state to disk as formatted JSON", async () => { - const state: DaemonState = { - instances: [makeInstance()], - sessions: [], - jobs: [makeJob()], - }; - await store.save(state); - - const raw = await readFile(statePath, "utf8"); - expect(raw).toEndWith("\n"); - const parsed = JSON.parse(raw); - expect(parsed.instances).toHaveLength(1); - expect(parsed.jobs).toHaveLength(1); - }); - - test("adds and updates instances", () => { - let state: DaemonState = { instances: [], sessions: [], jobs: [] }; - state = store.createInstance(state, makeInstance()); - expect(state.instances).toHaveLength(1); - state = store.upsertInstance( - state, - makeInstance({ -
status: "running", - updatedAt: "2026-01-02T00:00:00.000Z", - }), - ); - expect(state.instances[0]?.status).toBe("running"); + test("writes a SQLite file on open", async () => { + const raw = await readFile(databasePath, "utf8"); + expect(raw.slice(0, 15)).toBe("SQLite format 3"); + }); + + test("creates instances with a generated id and stopped status", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 2, + }); + expect(instance.id).toBeTruthy(); + expect(instance.status).toBe("stopped"); + expect(instance.maxConcurrency).toBe(2); }); test("rejects duplicate instance directories", () => { - const state: DaemonState = { - instances: [makeInstance()], - sessions: [], - jobs: [], + store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + expect(() => + store.createInstance({ + name: "Two", + directory: "/tmp/project-one", + maxConcurrency: 1, + }), + ).toThrow(StoreError); + }); + + test("setInstanceStatus updates status and lastError", () => { + const created = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const running = store.setInstanceStatus(created.id, "running"); + expect(running.status).toBe("running"); + expect(running.lastError).toBeUndefined(); + + const failed = store.setInstanceStatus(created.id, "error", "boom"); + expect(failed.status).toBe("error"); + expect(failed.lastError).toBe("boom"); + }); + + test("createJob creates a queued job linked to a session row", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hello" }, + }); + expect(job.state).toBe("queued"); + expect(job.sessionId).toBeUndefined(); + expect(job.task).toEqual({ type: "prompt", prompt: "hello" }); + }); + + test("deduplicates 
sessions for two jobs sharing the same remote session", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + store.createJob({ + instanceId: instance.id, + session: { type: "existing", sessionId: "remote-sess-1" }, + task: { type: "prompt", prompt: "a" }, + }); + store.createJob({ + instanceId: instance.id, + session: { type: "existing", sessionId: "remote-sess-1" }, + task: { type: "prompt", prompt: "b" }, + }); + + const db = new Database(databasePath, { readonly: true }); + const row = db.query("SELECT COUNT(*) AS c FROM sessions").get() as { + c: number; }; + db.close(); + expect(row.c).toBe(1); + }); + + test("listSessions and assertSession only include resolved remote ids", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hi" }, + }); + expect(store.listSessions({ instanceId: instance.id })).toHaveLength(0); + + store.assignRemoteSessionToJob(job.id, "remote-abc", "My title"); + const sessions = store.listSessions({ instanceId: instance.id }); + expect(sessions).toHaveLength(1); + expect(sessions[0]?.id).toBe("remote-abc"); + expect(sessions[0]?.title).toBe("My title"); + expect(store.assertSession("remote-abc").instanceId).toBe(instance.id); + }); + + test("listJobs filters by sessionId", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const j1 = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "a" }, + }); + const j2 = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "b" }, + }); + store.assignRemoteSessionToJob(j1.id, "sess-x", "t1"); + store.assignRemoteSessionToJob(j2.id, "sess-y", "t2"); + + 
const filtered = store.listJobs({ sessionId: "sess-x" }); + expect(filtered).toHaveLength(1); + expect(filtered[0]?.id).toBe(j1.id); + }); + + test("markJobRunning and markJobTerminal drive state transitions", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hi" }, + }); + + const running = store.markJobRunning(job.id); + expect(running.state).toBe("running"); + expect(running.startedAt).toBeTruthy(); + + const done = store.markJobTerminal(job.id, "succeeded", { + outputText: "result", + messageId: "msg-1", + }); + expect(done.state).toBe("succeeded"); + expect(done.outputText).toBe("result"); + expect(done.messageId).toBe("msg-1"); + expect(done.endedAt).toBeTruthy(); + }); + + test("appendJobOutput concatenates deltas", () => { + const instance = store.createInstance({ + name: "One", + directory: "/tmp/project-one", + maxConcurrency: 1, + }); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hi" }, + }); + store.appendJobOutput(job.id, "foo "); + store.appendJobOutput(job.id, "bar"); + expect(store.assertJob(job.id).outputText).toBe("foo bar"); + }); + + test("listJobs filters by instance and state", () => { + const a = store.createInstance({ + name: "A", + directory: "/tmp/a", + maxConcurrency: 1, + }); + const b = store.createInstance({ + name: "B", + directory: "/tmp/b", + maxConcurrency: 1, + }); + store.createJob({ + instanceId: a.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "1" }, + }); + const j2 = store.createJob({ + instanceId: a.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "2" }, + }); + store.createJob({ + instanceId: b.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "3" }, + }); + store.markJobRunning(j2.id); + + 
expect(store.listJobs({ instanceId: a.id })).toHaveLength(2); + expect(store.listJobs({ state: "running" })).toHaveLength(1); + expect(store.listJobs({ instanceId: a.id, state: "queued" })).toHaveLength( + 1, + ); + }); + + test("recoverForBootstrap requeues running jobs and resets instances", () => { + const instance = store.createInstance({ + name: "A", + directory: "/tmp/a", + maxConcurrency: 1, + }); + store.setInstanceStatus(instance.id, "running"); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hi" }, + }); + store.markJobRunning(job.id); + + const requeued = store.recoverForBootstrap(); + expect(requeued).toEqual([{ id: job.id, instanceId: instance.id }]); + expect(store.assertJob(job.id).state).toBe("queued"); + expect(store.assertInstance(instance.id).status).toBe("stopped"); + }); + + test("recoverForBootstrap appends to existing job error on requeue", () => { + const instance = store.createInstance({ + name: "A", + directory: "/tmp/a", + maxConcurrency: 1, + }); + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: "hi" }, + }); + // Seed a running job that already has a non-null error (e.g. from a + // prior partial terminal write that did not commit to a terminal state). 
+ store.markJobRunning(job.id); + const db = new Database(databasePath); + db.run("UPDATE jobs SET error = 'prior error' WHERE id = ?", [job.id]); + db.close(); + + store.recoverForBootstrap(); + const recovered = store.assertJob(job.id); + expect(recovered.state).toBe("queued"); + expect(recovered.error).toBe("prior error Recovered after daemon restart"); + }); + + test("assignRemoteSessionToJob throws when job does not exist", () => { expect(() => - store.createInstance( - state, - makeInstance({ id: "instance-2", directory: "/tmp/project-one" }), + store.assignRemoteSessionToJob( + "00000000-0000-0000-0000-000000000000", + "remote-xyz", + "title", ), ).toThrow(StoreError); }); - test("filters jobs by instance and state", () => { - const state: DaemonState = { - instances: [ - makeInstance(), - makeInstance({ id: "instance-2", directory: "/tmp/project-two" }), - ], - sessions: [], - jobs: [ - makeJob(), - makeJob({ id: "job-2", instanceId: "instance-2" }), - makeJob({ id: "job-3", state: "running" }), - ], - }; - expect(store.listJobs(state, { instanceId: "instance-1" })).toHaveLength(2); - expect(store.listJobs(state, { state: "running" })).toHaveLength(1); + test("pruneTerminalJobs keeps the newest N terminal rows", () => { + const instance = store.createInstance({ + name: "A", + directory: "/tmp/a", + maxConcurrency: 1, + }); + const ids: string[] = []; + for (let i = 0; i < 5; i++) { + const job = store.createJob({ + instanceId: instance.id, + session: { type: "new" }, + task: { type: "prompt", prompt: `p-${i}` }, + }); + store.markJobTerminal(job.id, "succeeded", { outputText: String(i) }); + ids.push(job.id); + // Space created_at so ordering is deterministic. + const db = new Database(databasePath); + db.run("UPDATE jobs SET created_at = ? 
WHERE id = ?", [ + new Date(Date.now() + i * 1000).toISOString(), + job.id, + ]); + db.close(); + } + store.pruneTerminalJobs(2); + const remaining = store.listJobs(); + expect(remaining).toHaveLength(2); + // Newest two should survive (ids[4], ids[3]). + const remainingIds = new Set(remaining.map((j) => j.id)); + expect(remainingIds.has(ids[4] as string)).toBe(true); + expect(remainingIds.has(ids[3] as string)).toBe(true); }); }); diff --git a/packages/daemon/src/db.ts b/packages/daemon/src/db.ts new file mode 100644 index 0000000..1ea29b9 --- /dev/null +++ b/packages/daemon/src/db.ts @@ -0,0 +1,104 @@ +import { Database } from "bun:sqlite"; + +export const CURRENT_SCHEMA_VERSION = 1; + +function getUserVersion(db: Database): number { + const row = db.query("PRAGMA user_version").get() as { + user_version: number; + }; + return row?.user_version ?? 0; +} + +function setUserVersion(db: Database, version: number): void { + db.run(`PRAGMA user_version = ${version}`); +} + +/** + * Opens the daemon SQLite database, applies PRAGMAs, and runs migrations. 
+ */ +export function openDaemonDatabase(databasePath: string): Database { + const db = new Database(databasePath, { create: true }); + db.run("PRAGMA foreign_keys = ON;"); + db.run("PRAGMA journal_mode = WAL;"); + + const from = getUserVersion(db); + if (from === CURRENT_SCHEMA_VERSION) { + return db; + } + + if (from > CURRENT_SCHEMA_VERSION) { + db.close(); + throw new Error( + `daemon database schema v${from} is newer than supported v${CURRENT_SCHEMA_VERSION}`, + ); + } + + db.transaction(() => { + if (from < 1) { + db.run(` + CREATE TABLE instances ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + directory TEXT NOT NULL UNIQUE, + status TEXT NOT NULL CHECK(status IN ('stopped','starting','running','error')), + max_concurrency INTEGER NOT NULL CHECK(max_concurrency > 0), + last_error TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + `); + + db.run(` + CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + instance_id TEXT NOT NULL REFERENCES instances(id) ON DELETE CASCADE, + remote_session_id TEXT, + kind TEXT NOT NULL CHECK(kind IN ('new','existing')), + title TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + `); + + db.run(` + CREATE UNIQUE INDEX idx_sessions_instance_remote + ON sessions(instance_id, remote_session_id) + WHERE remote_session_id IS NOT NULL; + `); + + db.run(` + CREATE TABLE jobs ( + id TEXT PRIMARY KEY, + instance_id TEXT NOT NULL REFERENCES instances(id) ON DELETE CASCADE, + session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE, + state TEXT NOT NULL CHECK(state IN ('queued','running','succeeded','failed','cancelled')), + prompt TEXT NOT NULL, + agent TEXT, + model_provider_id TEXT, + model_id TEXT, + system_prompt TEXT, + variant TEXT, + message_id TEXT, + error TEXT, + output_text TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + started_at TEXT, + ended_at TEXT + ); + `); + + db.run( + `CREATE INDEX idx_jobs_instance_created ON jobs(instance_id, created_at DESC);`, + ); + 
db.run(`CREATE INDEX idx_jobs_state ON jobs(state, created_at DESC);`); + db.run( + `CREATE INDEX idx_sessions_instance ON sessions(instance_id, updated_at DESC);`, + ); + + setUserVersion(db, 1); + } + })(); + + return db; +} diff --git a/packages/daemon/src/env.ts b/packages/daemon/src/env.ts index 2a8d9fa..e78fe52 100644 --- a/packages/daemon/src/env.ts +++ b/packages/daemon/src/env.ts @@ -6,7 +6,8 @@ export const DEFAULT_DAEMON_MAX_CONCURRENCY = 4; export interface DaemonPaths { ralphHome: string; socketPath: string; - statePath: string; + /** SQLite database file for daemon persisted state (instances, sessions, jobs). */ + databasePath: string; } export interface DaemonRuntimeEnv extends DaemonPaths { @@ -52,7 +53,7 @@ export function resolveDaemonPaths( return { ralphHome, socketPath: join(ralphHome, "ralphd.sock"), - statePath: join(ralphHome, "state.json"), + databasePath: join(ralphHome, "state.sqlite"), }; } @@ -81,4 +82,4 @@ const defaultPaths = resolveDaemonPaths(); export const RALPH_HOME = defaultPaths.ralphHome; export const SOCKET_PATH = defaultPaths.socketPath; -export const STATE_PATH = defaultPaths.statePath; +export const DATABASE_PATH = defaultPaths.databasePath; diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index 2362e54..846dd07 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -1,5 +1,5 @@ export { DaemonClient, daemon } from "./client"; -export { RALPH_HOME, resolveDaemonPaths } from "./env"; +export { DATABASE_PATH, RALPH_HOME, resolveDaemonPaths } from "./env"; export { ensureDaemonRunning, runForegroundDaemon, diff --git a/packages/daemon/src/protocol.ts b/packages/daemon/src/protocol.ts index cde0451..b8545f7 100644 --- a/packages/daemon/src/protocol.ts +++ b/packages/daemon/src/protocol.ts @@ -73,11 +73,10 @@ const ManagedInstance = z.strictObject({ export type ManagedInstance = z.infer; /** A job that has been submitted to the daemon for execution. 
*/ -const DaemonJob = z.strictObject({ +export const DaemonJob = z.strictObject({ id: z.string().min(1), instanceId: z.string().min(1), sessionId: z.string().min(1).optional(), - session: JobSession, task: JobTask, state: JobState, createdAt: IsoDateTime, @@ -90,8 +89,8 @@ const DaemonJob = z.strictObject({ }); export type DaemonJob = z.infer; -/** A conversation session belonging to an instance. */ -const DaemonSession = z.strictObject({ +/** A conversation session belonging to an instance (id is the remote OpenCode session id). */ +export const DaemonSession = z.strictObject({ id: z.string().min(1), instanceId: z.string().min(1), title: z.string().min(1), @@ -667,17 +666,6 @@ export const ResponseMessage = z.union([ ]); export type ResponseMessage = z.infer; -// --------------------------------------------------------------------------- -// Daemon persisted state -// --------------------------------------------------------------------------- - -export const DaemonState = z.strictObject({ - instances: z.array(ManagedInstance), - sessions: z.array(DaemonSession), - jobs: z.array(DaemonJob), -}); -export type DaemonState = z.infer; - // --------------------------------------------------------------------------- // Utility types — type-level helpers for method-based dispatch // --------------------------------------------------------------------------- diff --git a/packages/daemon/src/server.ts b/packages/daemon/src/server.ts index 19557a4..74e406e 100644 --- a/packages/daemon/src/server.ts +++ b/packages/daemon/src/server.ts @@ -14,16 +14,11 @@ import { import { type CancelResult, type DaemonJob, - type DaemonSession, - type DaemonState, - DaemonState as DaemonStateSchema, type ErrorResponse, type GetResult, type HealthResult, - type InstanceHealth, type InstanceListResult, type InstanceResult, - type JobState, type JobStreamEvent, type ListResult, type ManagedInstance, @@ -42,7 +37,18 @@ import { type StreamAckResult, type SubmitResult, } from "./protocol"; -import 
{ StateStore, StoreError } from "./store"; +import { type JobSessionRef, StateStore, StoreError } from "./store"; + +const MAX_TERMINAL_JOBS = 100; +/** + * Upper bound on how long `job.cancel` will wait for the in-flight execution + * to settle after the local abort + remote `session.abort` have been issued. + * If the remote runtime ignores or is slow to honor the abort, the RPC still + * returns promptly with the job's current (non-terminal) row, and the + * executor's eventual terminal write is delivered via the `done` stream + * event. + */ +const CANCEL_WAIT_TIMEOUT_MS = 2_000; interface RunningJob { controller: AbortController; @@ -52,6 +58,8 @@ interface RunningJob { interface DaemonOptions { registry?: OpencodeRuntimeManager; maxConcurrency?: number; + /** Override for `CANCEL_WAIT_TIMEOUT_MS`. Tests only. */ + cancelWaitTimeoutMs?: number; } function extractText(parts: Part[]): string { @@ -66,12 +74,15 @@ function extractText(parts: Part[]): string { const MAX_SESSION_TITLE_LENGTH = 80; -function deriveSessionTitle(job: DaemonJob): string { - if (job.session.type === "new" && job.session.title) { - return job.session.title; +function deriveSessionTitle(sessionRef: JobSessionRef, job: DaemonJob): string { + if (sessionRef.kind === "new" && sessionRef.title) { + return sessionRef.title; } if (job.task.type === "prompt") { const text = job.task.prompt.trim(); + if (text.length === 0) { + return "Untitled"; + } if (text.length > MAX_SESSION_TITLE_LENGTH) { return `${text.slice(0, MAX_SESSION_TITLE_LENGTH - 3)}...`; } @@ -80,6 +91,30 @@ function deriveSessionTitle(job: DaemonJob): string { return "Untitled"; } +/** + * Resolves when either `promise` settles or `timeoutMs` elapses, whichever + * comes first. Never rejects. The caller should read downstream state from + * the store after this returns — the returned value is intentionally unused. 
+ */ +function raceWithTimeout( + promise: Promise, + timeoutMs: number, +): Promise { + return new Promise((resolve) => { + let settled = false; + const done = () => { + if (settled) return; + settled = true; + resolve(); + }; + const timer = setTimeout(done, timeoutMs); + promise.finally(() => { + clearTimeout(timer); + done(); + }); + }); +} + function normalizeErrorMessage(error: unknown): string { if (error instanceof Error) { return error.message; @@ -101,17 +136,13 @@ function normalizeErrorMessage(error: unknown): string { } export class Daemon { - private state: DaemonState = structuredClone( - DaemonStateSchema.parse({ - instances: [], - sessions: [], - jobs: [], - }), - ); private readonly registry: OpencodeRuntimeManager; + /** Per-instance queue of job ids waiting to be scheduled. */ private readonly queues = new Map(); private readonly runningJobs = new Map(); private readonly runningTasks = new Map>(); + /** Maps running job id → its OpenCode session id, for delta routing. */ + private readonly runningSessionIds = new Map(); private readonly jobStreams = new Map< string, Set<(event: JobStreamEvent) => void> @@ -119,10 +150,13 @@ export class Daemon { private startedAt = Date.now(); private onShutdown: (() => void) | undefined; private drainPromise: Promise | undefined; + /** True if `scheduleDrain` ran while a drain was already in flight — run again when it finishes. */ + private drainPending = false; private shuttingDown = false; private shutdownPromise: Promise | undefined; private instanceCursor = 0; private readonly maxConcurrency: number; + private readonly cancelWaitTimeoutMs: number; constructor( private readonly store: StateStore, @@ -142,6 +176,8 @@ export class Daemon { }); this.maxConcurrency = options.maxConcurrency ?? resolveDaemonRuntimeEnv().maxConcurrency; + this.cancelWaitTimeoutMs = + options.cancelWaitTimeoutMs ?? 
CANCEL_WAIT_TIMEOUT_MS; } setShutdownHandler(handler: () => void): void { @@ -149,9 +185,17 @@ export class Daemon { } async bootstrap(): Promise { - this.state = await this.store.load(); - this.state = await this.recoverPersistedState(this.state); - await this.store.save(this.state); + await this.store.open(); + const requeued = this.store.recoverForBootstrap(); + this.queues.clear(); + for (const { id, instanceId } of requeued) { + this.enqueueById(instanceId, id); + } + // Also enqueue any queued-at-startup jobs that were never running. + for (const job of this.store.listJobs({ state: "queued" })) { + this.enqueueById(job.instanceId, job.id); + } + this.store.pruneTerminalJobs(MAX_TERMINAL_JOBS); this.scheduleDrain(); } @@ -184,7 +228,7 @@ export class Daemon { case "session.get": return this.success(raw, this.handleSessionGet(raw)); case "job.submit": - return this.success(raw, await this.handleJobSubmit(raw)); + return this.success(raw, this.handleJobSubmit(raw)); case "job.list": return this.success(raw, this.handleJobList(raw)); case "job.get": @@ -213,7 +257,7 @@ export class Daemon { await this.drainPromise; await Promise.allSettled([...this.runningTasks.values()]); await this.registry.stopAll(); - await this.store.save(this.state); + this.store.close(); })(); return this.shutdownPromise; @@ -223,61 +267,33 @@ export class Daemon { return { pid: process.pid, uptimeSeconds: Math.floor((Date.now() - this.startedAt) / 1000), - queued: this.jobCount("queued"), - running: this.jobCount("running"), - finished: this.state.jobs.filter((job: DaemonJob) => - ["succeeded", "failed", "cancelled"].includes(job.state), - ).length, - instances: this.state.instances.map( - (instance: ManagedInstance): InstanceHealth => ({ - instanceId: instance.id, - name: instance.name, - status: instance.status, - running: this.instanceJobCount(instance.id, "running"), - queued: this.instanceJobCount(instance.id, "queued"), - finished: this.state.jobs.filter( - (job: DaemonJob) => - 
job.instanceId === instance.id && - ["succeeded", "failed", "cancelled"].includes(job.state), - ).length, - lastError: instance.lastError, - }), - ), + queued: this.store.countJobsByState("queued"), + running: this.store.countJobsByState("running"), + finished: this.store.countFinishedJobs(), + instances: this.store.instanceHealth(), }; } - private async handleInstanceCreate( + private handleInstanceCreate( request: RequestByMethod<"instance.create">, - ): Promise { - const now = new Date().toISOString(); - const instance: ManagedInstance = { - id: randomUUID(), + ): InstanceResult { + const instance = this.store.createInstance({ name: request.params.name.trim(), directory: request.params.directory, - status: "stopped", maxConcurrency: request.params.maxConcurrency ?? 1, - createdAt: now, - updatedAt: now, - }; - this.state = this.store.createInstance(this.state, instance); - await this.store.save(this.state); + }); return { instance }; } private handleInstanceList(): InstanceListResult { - return { - instances: this.store.listInstances(this.state), - }; + return { instances: this.store.listInstances() }; } private handleInstanceGet( request: RequestByMethod<"instance.get">, ): InstanceResult { return { - instance: this.store.assertInstance( - this.state, - request.params.instanceId, - ), + instance: this.store.assertInstance(request.params.instanceId), }; } @@ -291,10 +307,7 @@ export class Daemon { private async handleInstanceStop( request: RequestByMethod<"instance.stop">, ): Promise { - const instance = this.store.assertInstance( - this.state, - request.params.instanceId, - ); + const instance = this.store.assertInstance(request.params.instanceId); if (this.runningCountForInstance(instance.id) > 0) { throw new StoreError( "conflict", @@ -303,29 +316,15 @@ export class Daemon { } await this.registry.stop(instance.id); - const stopped: ManagedInstance = { - ...instance, - status: "stopped", - updatedAt: new Date().toISOString(), - }; - this.state = 
this.store.upsertInstance(this.state, stopped); - await this.store.save(this.state); + const stopped = this.store.setInstanceStatus(instance.id, "stopped"); return { instance: stopped }; } private async handleInstanceRemove( request: RequestByMethod<"instance.remove">, ): Promise { - const instance = this.store.assertInstance( - this.state, - request.params.instanceId, - ); - const active = this.state.jobs.some( - (job: DaemonJob) => - job.instanceId === instance.id && - (job.state === "queued" || job.state === "running"), - ); - if (active) { + const instance = this.store.assertInstance(request.params.instanceId); + if (this.store.hasActiveJobs(instance.id)) { throw new StoreError( "conflict", `instance ${instance.id} has active jobs and cannot be removed`, @@ -334,9 +333,7 @@ export class Daemon { await this.registry.stop(instance.id); this.queues.delete(instance.id); - this.state = this.store.removeInstance(this.state, instance.id); - this.state = this.store.removeSessionsForInstance(this.state, instance.id); - await this.store.save(this.state); + this.store.removeInstance(instance.id); return { instance }; } @@ -344,9 +341,7 @@ export class Daemon { request: RequestByMethod<"provider.list">, ): Promise { return this.registry.queryProviders( - this.state.instances.map( - (instance: ManagedInstance) => instance.directory, - ), + this.store.listInstances().map((instance) => instance.directory), request.params.directory, request.params.refresh, ); @@ -355,8 +350,9 @@ export class Daemon { private handleSessionList( request: RequestByMethod<"session.list">, ): SessionListResult { + this.store.assertInstance(request.params.instanceId); return { - sessions: this.store.listSessions(this.state, { + sessions: this.store.listSessions({ instanceId: request.params.instanceId, }), }; @@ -366,60 +362,49 @@ export class Daemon { request: RequestByMethod<"session.get">, ): SessionGetResult { return { - session: this.store.assertSession(this.state, request.params.sessionId), + 
session: this.store.assertSession(request.params.sessionId), }; } - private async handleJobSubmit( + private handleJobSubmit( request: RequestByMethod<"job.submit">, - ): Promise { + ): SubmitResult { if (this.shuttingDown) { throw new StoreError("shutdown", "daemon is shutting down"); } const { instanceId } = request.params; - this.store.assertInstance(this.state, instanceId); + this.store.assertInstance(instanceId); - const now = new Date().toISOString(); - const job: DaemonJob = { - id: randomUUID(), + const job = this.store.createJob({ instanceId, session: request.params.session, task: request.params.task, - state: "queued", - createdAt: now, - updatedAt: now, - }; - this.state = this.store.upsertJob(this.state, job); - this.enqueue(job); - await this.store.save(this.state); + }); + this.enqueueById(instanceId, job.id); this.scheduleDrain(); return { job }; } private handleJobList(request: RequestByMethod<"job.list">): ListResult { - return { - jobs: this.store.listJobs(this.state, request.params), - }; + return { jobs: this.store.listJobs(request.params) }; } private handleJobGet(request: RequestByMethod<"job.get">): GetResult { - return { - job: this.store.assertJob(this.state, request.params.jobId), - }; + return { job: this.store.assertJob(request.params.jobId) }; } private handleJobStream( request: RequestByMethod<"job.stream">, ): StreamAckResult { - this.store.assertJob(this.state, request.params.jobId); + this.store.assertJob(request.params.jobId); return { jobId: request.params.jobId }; } private async handleJobCancel( request: RequestByMethod<"job.cancel">, ): Promise { - const job = this.store.assertJob(this.state, request.params.jobId); + const job = this.store.assertJob(request.params.jobId); if ( job.state === "succeeded" || @@ -433,36 +418,47 @@ export class Daemon { } if (job.state === "queued") { - const queue = this.queues.get(job.instanceId); - if (queue) { - const index = queue.indexOf(job.id); - if (index >= 0) { - queue.splice(index, 1); - 
} - } - job.state = "cancelled"; - job.endedAt = new Date().toISOString(); - job.updatedAt = job.endedAt; - job.error = "Job cancelled"; - this.state = this.store.upsertJob(this.state, job); - await this.store.save(this.state); - return { job }; + this.removeFromQueue(job.instanceId, job.id); + const cancelled = this.store.markJobTerminal(job.id, "cancelled", { + error: "Job cancelled", + }); + return { job: cancelled }; } + // Running branch: let `executeJob` own the single terminal write so + // there is exactly one state transition and one `done` event. We + // abort the controller (which triggers `executeJob`'s catch path or + // the aborted-after-success branch) and await completion before + // returning the terminal job row. const running = this.runningJobs.get(job.id); - if (running) { - job.state = "cancelled"; - job.error = "Job cancelled"; - job.updatedAt = new Date().toISOString(); - this.state = this.store.upsertJob(this.state, job); - await this.store.save(this.state); - running.controller.abort(); - if (job.sessionId) { - void this.abortRemoteSession(running.instanceId, job.sessionId); - } + if (!running) { + // Job was in state=running per the store, but no in-memory task — + // likely a stale state. Fall back to writing the terminal row + // directly so the caller still sees a cancelled job. + const cancelled = this.store.markJobTerminal(job.id, "cancelled", { + error: "Job cancelled", + }); + return { job: cancelled }; } - return { job }; + running.controller.abort(); + const remoteSessionId = this.runningSessionIds.get(job.id); + if (remoteSessionId) { + void this.abortRemoteSession(running.instanceId, remoteSessionId); + } + + // Wait for `executeJob` to settle so the caller sees the terminal + // row, but cap the wait: the SDK prompt is not wired to our abort + // signal, so if the remote runtime is slow to honor `session.abort` + // we would otherwise block the RPC for the full prompt duration. 
+ // On timeout, return the current (still-running) row — the caller + // can subscribe to `job.stream` for the eventual `done` event. + const execution = this.runningTasks.get(job.id); + if (execution) { + await raceWithTimeout(execution, this.cancelWaitTimeoutMs); + } + + return { job: this.store.assertJob(job.id) }; } /** @@ -478,7 +474,7 @@ export class Daemon { * routeDeltaToJob. */ subscribeJob(jobId: string, cb: (event: JobStreamEvent) => void): () => void { - const job = this.store.getJob(this.state, jobId); + const job = this.store.getJob(jobId); if ( job && (job.state === "succeeded" || @@ -530,12 +526,13 @@ export class Daemon { /** * Route an incoming delta from the OpenCode event stream to the matching - * running job. Synchronously appends the delta to the job's accumulated - * `outputText` (only for `text` field deltas) BEFORE emitting the event, - * so the daemon's job state always reflects what subscribers have seen. + * running job. Synchronously appends the delta to the job's `output_text` + * in SQLite BEFORE emitting the event, so the daemon's stored state + * always reflects what subscribers have seen. * * MUST remain fully synchronous to preserve the snapshot/delta ordering - * guarantee — see the concurrency note in subscribeJob. + * guarantee — see the concurrency note in subscribeJob. `bun:sqlite` is + * synchronous, so this holds. */ private routeDeltaToJob( instanceId: string, @@ -545,62 +542,28 @@ export class Daemon { ): void { for (const [jobId, running] of this.runningJobs) { if (running.instanceId !== instanceId) continue; - const job = this.store.getJob(this.state, jobId); - if (job?.sessionId === sessionId) { - if (field === "text") { - job.outputText = (job.outputText ?? 
"") + delta; - job.updatedAt = new Date().toISOString(); - } - this.emitJobEvent(jobId, { type: "delta", jobId, field, delta }); - return; + if (this.runningSessionIds.get(jobId) !== sessionId) continue; + if (field === "text") { + this.store.appendJobOutput(jobId, delta); } + this.emitJobEvent(jobId, { type: "delta", jobId, field, delta }); + return; } } - private async recoverPersistedState( - state: DaemonState, - ): Promise { - let next: DaemonState = { - ...state, - instances: state.instances.map( - (instance: ManagedInstance): ManagedInstance => ({ - ...instance, - status: "stopped", - updatedAt: new Date().toISOString(), - }), - ), - }; - this.queues.clear(); - - for (const original of next.jobs) { - const job: DaemonJob = - original.state === "running" - ? { - ...original, - state: "queued", - error: [original.error, "Recovered after daemon restart"] - .filter(Boolean) - .join(" "), - updatedAt: new Date().toISOString(), - } - : original; - - next = this.store.upsertJob(next, job); - if (job.state === "queued" && job.instanceId) { - this.enqueue(job); - } + private enqueueById(instanceId: string, jobId: string): void { + const queue = this.queues.get(instanceId) ?? []; + if (!queue.includes(jobId)) { + queue.push(jobId); } - - next = this.store.pruneTerminalJobs(next); - return next; + this.queues.set(instanceId, queue); } - private enqueue(job: DaemonJob): void { - const queue = this.queues.get(job.instanceId) ?? 
[]; - if (!queue.includes(job.id)) { - queue.push(job.id); - } - this.queues.set(job.instanceId, queue); + private removeFromQueue(instanceId: string, jobId: string): void { + const queue = this.queues.get(instanceId); + if (!queue) return; + const idx = queue.indexOf(jobId); + if (idx >= 0) queue.splice(idx, 1); } private async drainQueue(): Promise { @@ -615,16 +578,21 @@ export class Daemon { private scheduleDrain(): void { if (this.drainPromise) { + this.drainPending = true; return; } this.drainPromise = this.drainQueue().finally(() => { this.drainPromise = undefined; + if (this.drainPending) { + this.drainPending = false; + this.scheduleDrain(); + } }); } private dequeueNextJob(): DaemonJob | undefined { - const instances = this.state.instances; + const instances = this.store.listInstances(); if (instances.length === 0) { return undefined; } @@ -647,10 +615,8 @@ export class Daemon { while (queue.length > 0) { const jobId = queue.shift(); - if (!jobId) { - break; - } - const job = this.store.getJob(this.state, jobId); + if (!jobId) break; + const job = this.store.getJob(jobId); if (job && job.state === "queued" && job.instanceId === instance.id) { return job; } @@ -667,18 +633,14 @@ export class Daemon { instanceId: job.instanceId, }); - job.state = "running"; - job.startedAt = new Date().toISOString(); - job.updatedAt = job.startedAt; - this.state = this.store.upsertJob(this.state, job); - await this.store.save(this.state); + const runningJob = this.store.markJobRunning(job.id); - const execution = this.executeJob(job, controller) + const execution = this.executeJob(runningJob, controller) .catch(() => undefined) - .finally(async () => { + .finally(() => { this.runningJobs.delete(job.id); this.runningTasks.delete(job.id); - await this.store.save(this.state); + this.runningSessionIds.delete(job.id); if (!this.shuttingDown) { this.scheduleDrain(); } @@ -690,6 +652,13 @@ export class Daemon { job: DaemonJob, controller: AbortController, ): Promise { + let 
terminalState: Extract< + DaemonJob["state"], + "succeeded" | "failed" | "cancelled" + >; + const patch: { error?: string; outputText?: string; messageId?: string } = + {}; + try { const instance = await this.startInstance(job.instanceId); const runtime = await this.registry.ensureStarted( @@ -701,47 +670,49 @@ export class Daemon { instance, job, ); + this.runningSessionIds.set(job.id, sessionId); - switch (job.task.type) { - case "prompt": { - const response = await runtime.client.session.prompt({ - sessionID: sessionId, - directory: instance.directory, - agent: job.task.agent, - model: job.task.model - ? { - providerID: job.task.model.providerId, - modelID: job.task.model.modelId, - } - : undefined, - system: job.task.system, - variant: job.task.variant, - parts: [{ type: "text", text: job.task.prompt }], - }); - job.messageId = response.info.id; - // Prefer accumulated text from streamed deltas; fall back to - // the final parts payload if no deltas were received (e.g. a - // non-streaming provider). - const finalText = extractText(response.parts); - if (!job.outputText || job.outputText.length === 0) { - job.outputText = finalText; - } - job.error = undefined; - job.state = controller.signal.aborted ? "cancelled" : "succeeded"; - break; - } + const response = await runtime.client.session.prompt({ + sessionID: sessionId, + directory: instance.directory, + agent: job.task.agent, + model: job.task.model + ? { + providerID: job.task.model.providerId, + modelID: job.task.model.modelId, + } + : undefined, + system: job.task.system, + variant: job.task.variant, + parts: [{ type: "text", text: job.task.prompt }], + }); + patch.messageId = response.info.id; + // Prefer accumulated text from streamed deltas; fall back to the + // final parts payload if no deltas were received (non-streaming + // providers). Any deltas already landed in output_text via + // appendJobOutput, so only write the fallback when nothing was + // accumulated. 
+ const current = this.store.getJob(job.id); + if (!current?.outputText || current.outputText.length === 0) { + patch.outputText = extractText(response.parts); + } + if (controller.signal.aborted) { + // prompt() returned successfully but the job was cancelled before + // the abort was observed — record the cancellation reason. + terminalState = "cancelled"; + patch.error = "Job cancelled"; + } else { + terminalState = "succeeded"; } } catch (error) { - job.state = controller.signal.aborted ? "cancelled" : "failed"; - job.error = controller.signal.aborted + terminalState = controller.signal.aborted ? "cancelled" : "failed"; + patch.error = controller.signal.aborted ? "Job cancelled" : normalizeErrorMessage(error); - } finally { - job.endedAt = new Date().toISOString(); - job.updatedAt = job.endedAt; - this.state = this.store.upsertJob(this.state, job); - this.emitJobEvent(job.id, { type: "done", jobId: job.id, job }); } + + const finalJob = this.store.markJobTerminal(job.id, terminalState, patch); + this.emitJobEvent(job.id, { type: "done", jobId: job.id, job: finalJob }); } private async resolveSession( @@ -749,85 +720,37 @@ export class Daemon { instance: ManagedInstance, job: DaemonJob, ): Promise { - if (job.sessionId) { - return job.sessionId; - } - - if (job.session.type === "existing") { - job.sessionId = job.session.sessionId; - return job.sessionId; - } + if (job.sessionId) return job.sessionId; + const sessionRef = this.store.getSessionForJob(job.id); + if (sessionRef.remoteSessionId) return sessionRef.remoteSessionId; - const title = deriveSessionTitle(job); + const title = deriveSessionTitle(sessionRef, job); const session = await client.session.create({ directory: instance.directory, title, }); - job.sessionId = session.id; - - const now = new Date().toISOString(); - const daemonSession: DaemonSession = { - id: session.id, - instanceId: instance.id, - title, - createdAt: now, - updatedAt: now, - }; - this.state = this.store.upsertSession(this.state, 
daemonSession); - this.state = this.store.upsertJob(this.state, job); - await this.store.save(this.state); + this.store.assignRemoteSessionToJob(job.id, session.id, title); return session.id; } private async startInstance(instanceId: string): Promise { - const current = this.store.assertInstance(this.state, instanceId); + const current = this.store.assertInstance(instanceId); if (this.registry.isRunning(instanceId)) { if (current.status !== "running") { - const running = { - ...current, - status: "running" as const, - lastError: undefined, - updatedAt: new Date().toISOString(), - }; - this.state = this.store.upsertInstance(this.state, running); - await this.store.save(this.state); - return running; + return this.store.setInstanceStatus(instanceId, "running"); } return current; } - const starting: ManagedInstance = { - ...current, - status: "starting", - updatedAt: new Date().toISOString(), - }; - this.state = this.store.upsertInstance(this.state, starting); - await this.store.save(this.state); + this.store.setInstanceStatus(instanceId, "starting"); try { await this.registry.ensureStarted(instanceId, current.directory); - const running: ManagedInstance = { - ...starting, - status: "running", - lastError: undefined, - updatedAt: new Date().toISOString(), - }; - this.state = this.store.upsertInstance(this.state, running); - await this.store.save(this.state); - return running; + return this.store.setInstanceStatus(instanceId, "running"); } catch (error) { - const failed: ManagedInstance = { - ...starting, - status: "error", - lastError: normalizeErrorMessage(error), - updatedAt: new Date().toISOString(), - }; - this.state = this.store.upsertInstance(this.state, failed); - await this.store.save(this.state); - throw new StoreError( - "instance_unavailable", - failed.lastError ?? 
"failed to start instance", - ); + const message = normalizeErrorMessage(error); + this.store.setInstanceStatus(instanceId, "error", message); + throw new StoreError("instance_unavailable", message); } } @@ -836,10 +759,8 @@ export class Daemon { sessionId: string, ): Promise { const runtime = this.registry.get(instanceId); - const instance = this.store.getInstance(this.state, instanceId); - if (!runtime || !instance) { - return; - } + const instance = this.store.getInstance(instanceId); + if (!runtime || !instance) return; try { await runtime.client.session.abort({ @@ -854,24 +775,11 @@ export class Daemon { private runningCountForInstance(instanceId: string): number { let total = 0; for (const running of this.runningJobs.values()) { - if (running.instanceId === instanceId) { - total += 1; - } + if (running.instanceId === instanceId) total += 1; } return total; } - private jobCount(state: JobState): number { - return this.state.jobs.filter((job: DaemonJob) => job.state === state) - .length; - } - - private instanceJobCount(instanceId: string, state: JobState): number { - return this.state.jobs.filter( - (job: DaemonJob) => job.instanceId === instanceId && job.state === state, - ).length; - } - private success( request: RequestByMethod, result: ResultByMethod, @@ -1058,7 +966,7 @@ export async function runDaemonServer(): Promise { await ensureSocketDir(env.socketPath); await clearStaleSocket(env.socketPath); - const daemon = new Daemon(new StateStore(env.statePath), { + const daemon = new Daemon(new StateStore(env.databasePath), { maxConcurrency: env.maxConcurrency, }); await daemon.bootstrap(); diff --git a/packages/daemon/src/store.ts b/packages/daemon/src/store.ts index 3af494b..202af40 100644 --- a/packages/daemon/src/store.ts +++ b/packages/daemon/src/store.ts @@ -1,27 +1,25 @@ -import { mkdir, readFile } from "node:fs/promises"; +import type { Database, Statement } from "bun:sqlite"; +import { randomUUID } from "node:crypto"; +import { mkdir } from 
"node:fs/promises"; import { dirname } from "node:path"; -import type { z } from "zod"; - -import { - type DaemonJob, - type DaemonSession, - type DaemonState, - DaemonState as DaemonStateSchema, - type ManagedInstance, - type ResponseError as ResponseErrorSchema, -} from "./protocol"; -const EMPTY_STATE: DaemonState = { - instances: [], - sessions: [], - jobs: [], -}; +import { openDaemonDatabase } from "./db"; +import type { + DaemonJob, + DaemonSession, + InstanceHealth, + JobSession, + JobState, + JobTask, + ManagedInstance, + ResponseError, +} from "./protocol"; -const MAX_TERMINAL_JOBS = 100; +type ErrorCode = ResponseError["code"]; export class StoreError extends Error { constructor( - public readonly code: z.infer["code"], + public readonly code: ErrorCode, message: string, ) { super(message); @@ -29,253 +27,758 @@ export class StoreError extends Error { } } -export class StateStore { - private pendingSave: Promise = Promise.resolve(); - private directoryReady: Promise | undefined; +interface InstanceRow { + id: string; + name: string; + directory: string; + status: ManagedInstance["status"]; + max_concurrency: number; + last_error: string | null; + created_at: string; + updated_at: string; +} - constructor(private readonly statePath: string) {} +interface JobRow { + id: string; + instance_id: string; + state: JobState; + prompt: string; + agent: string | null; + model_provider_id: string | null; + model_id: string | null; + system_prompt: string | null; + variant: string | null; + message_id: string | null; + error: string | null; + output_text: string | null; + created_at: string; + updated_at: string; + started_at: string | null; + ended_at: string | null; + remote_session_id: string | null; +} - async load(): Promise { - try { - const raw = await readFile(this.statePath, "utf8"); - const parsed = JSON.parse(raw) as unknown; - - // Migrate legacy state files that predate the sessions array. 
- if ( - typeof parsed === "object" && - parsed !== null && - !("sessions" in parsed) - ) { - (parsed as Record).sessions = []; - } +interface SessionRefRow { + kind: "new" | "existing"; + title: string | null; + remote_session_id: string | null; +} - const current = DaemonStateSchema.safeParse(parsed); - if (current.success) { - return current.data; - } +export interface JobSessionRef { + kind: "new" | "existing"; + title?: string; + remoteSessionId?: string; +} - return structuredClone(EMPTY_STATE); - } catch (error) { - if ((error as NodeJS.ErrnoException).code === "ENOENT") { - return structuredClone(EMPTY_STATE); - } - throw error; - } - } +/** Row shape for sessions that already have a remote OpenCode id (public `DaemonSession.id`). */ +interface DaemonSessionRow { + remote_session_id: string; + instance_id: string; + title: string | null; + created_at: string; + updated_at: string; +} - async save(state: DaemonState): Promise { - const payload = `${JSON.stringify(state, null, 2)}\n`; - const persist = async (): Promise => { - await this.ensureDirectory(); - await Bun.write(this.statePath, payload); - }; +function rowToInstance(row: InstanceRow): ManagedInstance { + const base: ManagedInstance = { + id: row.id, + name: row.name, + directory: row.directory, + status: row.status, + maxConcurrency: row.max_concurrency, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; + return row.last_error ? { ...base, lastError: row.last_error } : base; +} - const savePromise = this.pendingSave.then(persist, persist); - this.pendingSave = savePromise.catch(() => undefined); - await savePromise; - } +function rowToJob(row: JobRow): DaemonJob { + const task: JobTask = { + type: "prompt", + prompt: row.prompt, + ...(row.agent ? { agent: row.agent } : {}), + ...(row.model_provider_id && row.model_id + ? { + model: { + providerId: row.model_provider_id, + modelId: row.model_id, + }, + } + : {}), + ...(row.system_prompt ? 
{ system: row.system_prompt } : {}), + ...(row.variant ? { variant: row.variant } : {}), + }; + + return { + id: row.id, + instanceId: row.instance_id, + task, + state: row.state, + createdAt: row.created_at, + updatedAt: row.updated_at, + ...(row.remote_session_id ? { sessionId: row.remote_session_id } : {}), + ...(row.started_at ? { startedAt: row.started_at } : {}), + ...(row.ended_at ? { endedAt: row.ended_at } : {}), + ...(row.error ? { error: row.error } : {}), + ...(row.output_text !== null ? { outputText: row.output_text } : {}), + ...(row.message_id ? { messageId: row.message_id } : {}), + }; +} + +function rowToDaemonSession(row: DaemonSessionRow): DaemonSession { + const title = + row.title && row.title.length > 0 ? row.title : row.remote_session_id; + return { + id: row.remote_session_id, + instanceId: row.instance_id, + title, + createdAt: row.created_at, + updatedAt: row.updated_at, + }; +} - private async ensureDirectory(): Promise { +const JOB_SELECT = ` + SELECT j.id, j.instance_id, j.state, j.prompt, j.agent, + j.model_provider_id, j.model_id, j.system_prompt, j.variant, + j.message_id, j.error, j.output_text, + j.created_at, j.updated_at, j.started_at, j.ended_at, + s.remote_session_id AS remote_session_id + FROM jobs j + JOIN sessions s ON j.session_id = s.id +`; + +/** + * Relational repository for daemon state: instances, sessions, and jobs. + * + * All queries hit SQLite via `bun:sqlite` and are synchronous once the store + * is open. Multi-statement mutations run inside a transaction so concurrent + * readers never observe torn state. + */ +export class StateStore { + private db: Database | undefined; + private directoryReady: Promise | undefined; + + // Prepared statements, initialized lazily on first open(). 
+ private stmts: + | { + listInstances: Statement; + getInstance: Statement; + insertInstance: Statement; + setInstanceStatus: Statement; + deleteInstance: Statement; + countInstanceActiveJobs: Statement<{ c: number }, [string]>; + + insertSession: Statement; + findSessionByRemote: Statement< + { id: string; remote_session_id: string | null }, + [string, string] + >; + getSessionForJob: Statement; + assignRemoteSessionToJob: Statement; + listSessionsByInstance: Statement; + getSessionByRemoteId: Statement; + + listAllJobs: Statement; + listJobsByInstance: Statement; + listJobsByState: Statement; + listJobsByInstanceAndState: Statement; + getJob: Statement; + insertJob: Statement; + setJobState: Statement; + setJobTerminal: Statement; + appendJobOutput: Statement; + deleteOldTerminalJobs: Statement; + + countJobsByState: Statement<{ c: number }, [JobState]>; + countInstanceJobsByState: Statement<{ c: number }, [string, JobState]>; + countInstanceFinishedJobs: Statement<{ c: number }, [string]>; + resetInstancesToStopped: Statement; + requeueRunningJobs: Statement< + { id: string; instance_id: string }, + [string] + >; + } + | undefined; + + constructor(private readonly databasePath: string) {} + + async open(): Promise { if (!this.directoryReady) { - this.directoryReady = mkdir(dirname(this.statePath), { + this.directoryReady = mkdir(dirname(this.databasePath), { recursive: true, }).then(() => undefined); } await this.directoryReady; - } + if (this.db) return; - upsertJob(state: DaemonState, job: DaemonJob): DaemonState { - const jobs = state.jobs.filter((item: DaemonJob) => item.id !== job.id); - jobs.push(job); - jobs.sort((a: DaemonJob, b: DaemonJob) => - a.createdAt < b.createdAt ? 
1 : -1, - ); - return { - ...state, - jobs, + const db = openDaemonDatabase(this.databasePath); + this.db = db; + this.stmts = { + listInstances: db.query( + `SELECT * FROM instances ORDER BY datetime(created_at) DESC`, + ), + getInstance: db.query( + `SELECT * FROM instances WHERE id = ?`, + ), + insertInstance: db.query( + `INSERT INTO instances (id, name, directory, status, max_concurrency, last_error, created_at, updated_at) + VALUES ($id, $name, $directory, $status, $max_concurrency, $last_error, $created_at, $updated_at)`, + ), + setInstanceStatus: db.query( + `UPDATE instances + SET status = $status, last_error = $last_error, updated_at = $updated_at + WHERE id = $id`, + ), + deleteInstance: db.query(`DELETE FROM instances WHERE id = ?`), + countInstanceActiveJobs: db.query<{ c: number }, [string]>( + `SELECT COUNT(*) AS c FROM jobs + WHERE instance_id = ? AND state IN ('queued','running')`, + ), + + insertSession: db.query( + `INSERT INTO sessions (id, instance_id, remote_session_id, kind, title, created_at, updated_at) + VALUES ($id, $instance_id, $remote_session_id, $kind, $title, $created_at, $updated_at)`, + ), + findSessionByRemote: db.query< + { id: string; remote_session_id: string | null }, + [string, string] + >( + `SELECT id, remote_session_id FROM sessions + WHERE instance_id = ? AND remote_session_id = ?`, + ), + getSessionForJob: db.query( + `SELECT s.kind, s.title, s.remote_session_id + FROM sessions s + JOIN jobs j ON j.session_id = s.id + WHERE j.id = ?`, + ), + assignRemoteSessionToJob: db.query( + `UPDATE sessions + SET remote_session_id = $remote_session_id, + title = $title, + updated_at = $updated_at + WHERE id = (SELECT session_id FROM jobs WHERE id = $job_id)`, + ), + listSessionsByInstance: db.query( + `SELECT remote_session_id, instance_id, title, created_at, updated_at + FROM sessions + WHERE instance_id = ? 
AND remote_session_id IS NOT NULL + ORDER BY datetime(updated_at) DESC`, + ), + getSessionByRemoteId: db.query( + `SELECT remote_session_id, instance_id, title, created_at, updated_at + FROM sessions + WHERE remote_session_id = ?`, + ), + + listAllJobs: db.query( + `${JOB_SELECT} ORDER BY datetime(j.created_at) DESC`, + ), + listJobsByInstance: db.query( + `${JOB_SELECT} WHERE j.instance_id = ? ORDER BY datetime(j.created_at) DESC`, + ), + listJobsByState: db.query( + `${JOB_SELECT} WHERE j.state = ? ORDER BY datetime(j.created_at) DESC`, + ), + listJobsByInstanceAndState: db.query( + `${JOB_SELECT} WHERE j.instance_id = ? AND j.state = ? ORDER BY datetime(j.created_at) DESC`, + ), + getJob: db.query(`${JOB_SELECT} WHERE j.id = ?`), + insertJob: db.query( + `INSERT INTO jobs (id, instance_id, session_id, state, prompt, agent, + model_provider_id, model_id, system_prompt, variant, + message_id, error, output_text, created_at, updated_at, + started_at, ended_at) + VALUES ($id, $instance_id, $session_id, $state, $prompt, $agent, + $model_provider_id, $model_id, $system_prompt, $variant, + $message_id, $error, $output_text, $created_at, $updated_at, + $started_at, $ended_at)`, + ), + setJobState: db.query( + `UPDATE jobs SET state = $state, updated_at = $updated_at, + started_at = COALESCE($started_at, started_at) + WHERE id = $id`, + ), + setJobTerminal: db.query( + `UPDATE jobs SET + state = $state, + error = $error, + output_text = COALESCE($output_text, output_text), + message_id = COALESCE($message_id, message_id), + updated_at = $updated_at, + ended_at = $ended_at + WHERE id = $id`, + ), + appendJobOutput: db.query( + `UPDATE jobs SET + output_text = COALESCE(output_text, '') || $delta, + updated_at = $updated_at + WHERE id = $id`, + ), + // Keep the newest `$max` terminal rows; delete the rest. + // In SQLite, `LIMIT -1` means "no upper bound", so this selects + // every row past the first `$max` in DESC order. 
+ deleteOldTerminalJobs: db.query( + `DELETE FROM jobs WHERE id IN ( + SELECT id FROM jobs + WHERE state IN ('succeeded','failed','cancelled') + ORDER BY datetime(created_at) DESC + LIMIT -1 OFFSET ? + )`, + ), + + countJobsByState: db.query<{ c: number }, [JobState]>( + `SELECT COUNT(*) AS c FROM jobs WHERE state = ?`, + ), + countInstanceJobsByState: db.query<{ c: number }, [string, JobState]>( + `SELECT COUNT(*) AS c FROM jobs WHERE instance_id = ? AND state = ?`, + ), + countInstanceFinishedJobs: db.query<{ c: number }, [string]>( + `SELECT COUNT(*) AS c FROM jobs + WHERE instance_id = ? AND state IN ('succeeded','failed','cancelled')`, + ), + resetInstancesToStopped: db.query( + `UPDATE instances SET status = 'stopped', updated_at = $updated_at + WHERE status IN ('starting','running','error')`, + ), + requeueRunningJobs: db.query< + { id: string; instance_id: string }, + [string] + >( + `UPDATE jobs SET + state = 'queued', + error = CASE + WHEN error IS NULL OR error = '' THEN 'Recovered after daemon restart' + ELSE error || ' Recovered after daemon restart' + END, + updated_at = ? 
+ WHERE state = 'running' + RETURNING id, instance_id`, + ), }; } - getJob(state: DaemonState, jobId: string): DaemonJob | undefined { - return state.jobs.find((item: DaemonJob) => item.id === jobId); + close(): void { + this.db?.close(); + this.db = undefined; + this.stmts = undefined; } - listJobs( - state: DaemonState, - filter: { - instanceId?: string; - sessionId?: string; - state?: DaemonJob["state"]; - }, - ): DaemonJob[] { - return state.jobs.filter((job: DaemonJob) => { - if (filter.instanceId && job.instanceId !== filter.instanceId) { - return false; - } - if (filter.sessionId && job.sessionId !== filter.sessionId) { - return false; - } - if (filter.state && job.state !== filter.state) { - return false; - } - return true; - }); + private s(): NonNullable { + if (!this.stmts) { + throw new Error("StateStore is not open; call open() first"); + } + return this.stmts; } - createInstance(state: DaemonState, instance: ManagedInstance): DaemonState { - if ( - state.instances.some( - (item: ManagedInstance) => item.directory === instance.directory, - ) - ) { - throw new StoreError( - "conflict", - `instance already exists for directory ${instance.directory}`, - ); - } + // -------------------------------------------------------------------- + // Instances + // -------------------------------------------------------------------- - const next = [...state.instances, instance].sort( - (a: ManagedInstance, b: ManagedInstance) => - a.createdAt < b.createdAt ? 
1 : -1, - ); - return { - ...state, - instances: next, - }; + listInstances(): ManagedInstance[] { + return this.s().listInstances.all().map(rowToInstance); } - upsertInstance(state: DaemonState, instance: ManagedInstance): DaemonState { - const duplicate = state.instances.find( - (item: ManagedInstance) => - item.directory === instance.directory && item.id !== instance.id, - ); - if (duplicate) { - throw new StoreError( - "conflict", - `instance already exists for directory ${instance.directory}`, - ); + getInstance(id: string): ManagedInstance | undefined { + const row = this.s().getInstance.get(id); + return row ? rowToInstance(row) : undefined; + } + + assertInstance(id: string): ManagedInstance { + const instance = this.getInstance(id); + if (!instance) { + throw new StoreError("not_found", `instance ${id} not found`); } + return instance; + } - const instances = state.instances.filter( - (item: ManagedInstance) => item.id !== instance.id, - ); - instances.push(instance); - instances.sort((a: ManagedInstance, b: ManagedInstance) => - a.createdAt < b.createdAt ? 1 : -1, - ); - return { - ...state, - instances, + createInstance(input: { + name: string; + directory: string; + maxConcurrency: number; + }): ManagedInstance { + const now = new Date().toISOString(); + const instance: ManagedInstance = { + id: randomUUID(), + name: input.name, + directory: input.directory, + status: "stopped", + maxConcurrency: input.maxConcurrency, + createdAt: now, + updatedAt: now, }; + try { + this.s().insertInstance.run({ + $id: instance.id, + $name: instance.name, + $directory: instance.directory, + $status: instance.status, + $max_concurrency: instance.maxConcurrency, + $last_error: null, + $created_at: instance.createdAt, + $updated_at: instance.updatedAt, + }); + } catch (err) { + if (isUniqueConstraint(err)) { + // Today the only unique index on `instances` (aside from the + // primary key) is on `directory`. 
If that changes the error + // message would become misleading, so check the message + // before blaming the directory column. + const message = err instanceof Error ? err.message : String(err); + if (/instances\.directory/i.test(message)) { + throw new StoreError( + "conflict", + `instance already exists for directory ${instance.directory}`, + ); + } + throw new StoreError("conflict", message); + } + throw err; + } + return instance; } - removeInstance(state: DaemonState, instanceId: string): DaemonState { - return { - ...state, - instances: state.instances.filter( - (item: ManagedInstance) => item.id !== instanceId, - ), - }; + setInstanceStatus( + id: string, + status: ManagedInstance["status"], + lastError?: string, + ): ManagedInstance { + this.s().setInstanceStatus.run({ + $id: id, + $status: status, + $last_error: lastError ?? null, + $updated_at: new Date().toISOString(), + }); + return this.assertInstance(id); } - // Session operations + removeInstance(id: string): void { + this.s().deleteInstance.run(id); + } - upsertSession(state: DaemonState, session: DaemonSession): DaemonState { - const sessions = state.sessions.filter( - (item: DaemonSession) => item.id !== session.id, - ); - sessions.push(session); - sessions.sort((a: DaemonSession, b: DaemonSession) => - a.createdAt < b.createdAt ? 1 : -1, - ); - return { ...state, sessions }; + hasActiveJobs(instanceId: string): boolean { + const row = this.s().countInstanceActiveJobs.get(instanceId); + return (row?.c ?? 0) > 0; } - getSession(state: DaemonState, sessionId: string): DaemonSession | undefined { - return state.sessions.find((item: DaemonSession) => item.id === sessionId); + // -------------------------------------------------------------------- + // Sessions (internal; jobs own the reference to a session row) + // -------------------------------------------------------------------- + + /** + * Find-or-create a `sessions` row for an incoming submit request. 
+ * + * - `{type: 'new', title?}`: always inserts a fresh session row with + * `remote_session_id = NULL`. + * - `{type: 'existing', sessionId}`: returns the existing session row for + * the given remote id, or inserts one if this is the first time we see it. + */ + upsertSessionForSubmit( + instanceId: string, + session: JobSession, + ): { id: string; remoteSessionId: string | null } { + const now = new Date().toISOString(); + if (session.type === "existing") { + const existing = this.s().findSessionByRemote.get( + instanceId, + session.sessionId, + ); + if (existing) { + return { + id: existing.id, + remoteSessionId: existing.remote_session_id, + }; + } + const id = randomUUID(); + this.s().insertSession.run({ + $id: id, + $instance_id: instanceId, + $remote_session_id: session.sessionId, + $kind: "existing", + $title: null, + $created_at: now, + $updated_at: now, + }); + return { id, remoteSessionId: session.sessionId }; + } + + const id = randomUUID(); + this.s().insertSession.run({ + $id: id, + $instance_id: instanceId, + $remote_session_id: null, + $kind: "new", + $title: session.title ?? null, + $created_at: now, + $updated_at: now, + }); + return { id, remoteSessionId: null }; } - listSessions( - state: DaemonState, - filter: { instanceId: string }, - ): DaemonSession[] { - return state.sessions.filter( - (item: DaemonSession) => item.instanceId === filter.instanceId, - ); + /** + * Attach a remote OpenCode session id to the session row that backs + * the given job. Used once a new session has been created on the + * remote runtime during job execution. + * + * Throws `StoreError("not_found")` if the job id (or its backing + * session row) does not exist — this surfaces silent assignment + * failures that would otherwise leave the caller holding a remote + * session that SQLite never linked. 
+ */ + assignRemoteSessionToJob( + jobId: string, + remoteSessionId: string, + title: string, + ): void { + const result = this.s().assignRemoteSessionToJob.run({ + $job_id: jobId, + $remote_session_id: remoteSessionId, + $title: title, + $updated_at: new Date().toISOString(), + }); + if (result.changes === 0) { + throw new StoreError( + "not_found", + `cannot assign remote session to job ${jobId}: job or session row missing`, + ); + } + } + + /** Sessions visible to the TUI: only rows with a resolved remote OpenCode session id. */ + listSessions(filter: { instanceId: string }): DaemonSession[] { + return this.s() + .listSessionsByInstance.all(filter.instanceId) + .map(rowToDaemonSession); + } + + getSession(remoteSessionId: string): DaemonSession | undefined { + const row = this.s().getSessionByRemoteId.get(remoteSessionId); + return row ? rowToDaemonSession(row) : undefined; } - assertSession(state: DaemonState, sessionId: string): DaemonSession { - const session = this.getSession(state, sessionId); + assertSession(remoteSessionId: string): DaemonSession { + const session = this.getSession(remoteSessionId); if (!session) { - throw new StoreError("not_found", `session ${sessionId} not found`); + throw new StoreError("not_found", `session ${remoteSessionId} not found`); } return session; } - removeSessionsForInstance( - state: DaemonState, - instanceId: string, - ): DaemonState { + getSessionForJob(jobId: string): JobSessionRef { + const row = this.s().getSessionForJob.get(jobId); + if (!row) { + throw new StoreError("not_found", `session for job ${jobId} not found`); + } return { - ...state, - sessions: state.sessions.filter( - (item: DaemonSession) => item.instanceId !== instanceId, - ), + kind: row.kind, + ...(row.title ? { title: row.title } : {}), + ...(row.remote_session_id + ? 
{ remoteSessionId: row.remote_session_id } + : {}), }; } - getInstance( - state: DaemonState, - instanceId: string, - ): ManagedInstance | undefined { - return state.instances.find( - (item: ManagedInstance) => item.id === instanceId, - ); - } + // -------------------------------------------------------------------- + // Jobs + // -------------------------------------------------------------------- - listInstances(state: DaemonState): ManagedInstance[] { - return [...state.instances]; + listJobs( + filter: { instanceId?: string; state?: JobState; sessionId?: string } = {}, + ): DaemonJob[] { + const { instanceId, state, sessionId } = filter; + let rows: JobRow[]; + if (sessionId) { + rows = this.listJobsWithFilters({ instanceId, state, sessionId }); + } else if (instanceId && state) { + rows = this.s().listJobsByInstanceAndState.all(instanceId, state); + } else if (instanceId) { + rows = this.s().listJobsByInstance.all(instanceId); + } else if (state) { + rows = this.s().listJobsByState.all(state); + } else { + rows = this.s().listAllJobs.all(); + } + return rows.map(rowToJob); } - assertInstance(state: DaemonState, instanceId: string): ManagedInstance { - const instance = this.getInstance(state, instanceId); - if (!instance) { - throw new StoreError("not_found", `instance ${instanceId} not found`); + private listJobsWithFilters(filter: { + instanceId?: string; + state?: JobState; + sessionId: string; + }): JobRow[] { + const db = this.db; + if (!db) { + throw new Error("StateStore is not open; call open() first"); } - return instance; + const conditions: string[] = []; + const params: Array = []; + if (filter.instanceId) { + conditions.push("j.instance_id = ?"); + params.push(filter.instanceId); + } + if (filter.state) { + conditions.push("j.state = ?"); + params.push(filter.state); + } + conditions.push("s.remote_session_id = ?"); + params.push(filter.sessionId); + const where = `WHERE ${conditions.join(" AND ")}`; + const sql = `${JOB_SELECT} ${where} ORDER BY 
datetime(j.created_at) DESC`; + return db.query>(sql).all(...params); + } + + getJob(id: string): DaemonJob | undefined { + const row = this.s().getJob.get(id); + return row ? rowToJob(row) : undefined; } - assertJob(state: DaemonState, jobId: string): DaemonJob { - const job = this.getJob(state, jobId); + assertJob(id: string): DaemonJob { + const job = this.getJob(id); if (!job) { - throw new StoreError("not_found", `job ${jobId} not found`); + throw new StoreError("not_found", `job ${id} not found`); } return job; } - pruneTerminalJobs(state: DaemonState): DaemonState { - const terminalStates = new Set(["succeeded", "failed", "cancelled"]); - const active: DaemonJob[] = []; - const terminal: DaemonJob[] = []; + /** + * Create a new job along with its backing session row, atomically. + */ + createJob(input: { + instanceId: string; + session: JobSession; + task: JobTask; + }): DaemonJob { + const now = new Date().toISOString(); + const jobId = randomUUID(); + + this.db?.transaction(() => { + const { id: sessionRowId } = this.upsertSessionForSubmit( + input.instanceId, + input.session, + ); + this.s().insertJob.run({ + $id: jobId, + $instance_id: input.instanceId, + $session_id: sessionRowId, + $state: "queued", + $prompt: input.task.prompt, + $agent: input.task.agent ?? null, + $model_provider_id: input.task.model?.providerId ?? null, + $model_id: input.task.model?.modelId ?? null, + $system_prompt: input.task.system ?? null, + $variant: input.task.variant ?? 
null, + $message_id: null, + $error: null, + $output_text: null, + $created_at: now, + $updated_at: now, + $started_at: null, + $ended_at: null, + }); + })(); + + return this.assertJob(jobId); + } - for (const job of state.jobs) { - if (terminalStates.has(job.state)) { - terminal.push(job); - } else { - active.push(job); - } - } + markJobRunning(id: string): DaemonJob { + const now = new Date().toISOString(); + this.s().setJobState.run({ + $id: id, + $state: "running", + $updated_at: now, + $started_at: now, + }); + return this.assertJob(id); + } - if (terminal.length <= MAX_TERMINAL_JOBS) { - return state; - } + markJobTerminal( + id: string, + state: Extract, + patch: { + error?: string; + outputText?: string; + messageId?: string; + } = {}, + ): DaemonJob { + const now = new Date().toISOString(); + this.s().setJobTerminal.run({ + $id: id, + $state: state, + $error: patch.error ?? null, + $output_text: patch.outputText ?? null, + $message_id: patch.messageId ?? null, + $updated_at: now, + $ended_at: now, + }); + return this.assertJob(id); + } - // Jobs are already sorted newest-first by upsertJob - const kept = terminal.slice(0, MAX_TERMINAL_JOBS); - const jobs = [...active, ...kept].sort((a: DaemonJob, b: DaemonJob) => - a.createdAt < b.createdAt ? 1 : -1, + appendJobOutput(id: string, delta: string): void { + this.s().appendJobOutput.run({ + $id: id, + $delta: delta, + $updated_at: new Date().toISOString(), + }); + } + + // -------------------------------------------------------------------- + // Recovery + aggregates + // -------------------------------------------------------------------- + + /** + * Reset crash-surviving instance/job state on daemon startup and return + * the ids of jobs that should be re-queued for scheduling. 
+ */ + recoverForBootstrap(): Array<{ id: string; instanceId: string }> { + const now = new Date().toISOString(); + let requeued: Array<{ id: string; instance_id: string }> = []; + this.db?.transaction(() => { + this.s().resetInstancesToStopped.run({ $updated_at: now }); + requeued = this.s().requeueRunningJobs.all(now); + })(); + return requeued.map((r) => ({ id: r.id, instanceId: r.instance_id })); + } + + /** + * Keep at most `max` terminal jobs (succeeded/failed/cancelled), + * deleting the oldest excess rows. + */ + pruneTerminalJobs(max: number): void { + this.s().deleteOldTerminalJobs.run(max); + } + + countJobsByState(state: JobState): number { + return this.s().countJobsByState.get(state)?.c ?? 0; + } + + countFinishedJobs(): number { + return ( + this.countJobsByState("succeeded") + + this.countJobsByState("failed") + + this.countJobsByState("cancelled") ); + } - return { ...state, jobs }; + instanceHealth(): InstanceHealth[] { + return this.listInstances().map((instance) => ({ + instanceId: instance.id, + name: instance.name, + status: instance.status, + running: + this.s().countInstanceJobsByState.get(instance.id, "running")?.c ?? 0, + queued: + this.s().countInstanceJobsByState.get(instance.id, "queued")?.c ?? 0, + finished: this.s().countInstanceFinishedJobs.get(instance.id)?.c ?? 0, + ...(instance.lastError ? { lastError: instance.lastError } : {}), + })); } } + +function isUniqueConstraint(err: unknown): boolean { + if (!(err instanceof Error)) return false; + const code = (err as { code?: string }).code; + return ( + code === "SQLITE_CONSTRAINT_UNIQUE" || + /UNIQUE constraint failed/i.test(err.message) + ); +}