Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .changeset/sqlite-state-migration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
"@techatnyu/ralphd": patch
"@techatnyu/ralph": patch
---

Migrate daemon persisted state from `~/.ralph/state.json` to a SQLite
database at `~/.ralph/state.sqlite`.

The move enables transactional mutations, WAL-mode crash durability, and
indexed queries as the daemon scales. Schema is versioned via SQLite's
`PRAGMA user_version` so future migrations can be additive.

**Upgrade note:** existing `state.json` files are not migrated automatically.
On first run after upgrade, the daemon starts with an empty database — you
will need to re-register any instances and resubmit in-flight jobs. Old
terminal job history is lost. If preserving state matters to you, hold off
upgrading until an explicit migration path ships.
2 changes: 1 addition & 1 deletion apps/tui/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ bun run src/cli.ts daemon health
bun run src/cli.ts daemon stop
```

The daemon listens on a Unix socket at `~/.ralph/ralphd.sock` and persists state in `~/.ralph/state.json`.
The daemon listens on a Unix socket at `~/.ralph/ralphd.sock` and persists state in `~/.ralph/state.sqlite` (SQLite).

## Run the TUI shell

Expand Down
8 changes: 7 additions & 1 deletion apps/tui/src/components/app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,14 @@ function Dashboard({
const selectedInst = instanceList.instances[safeIndex];
const [jobs, sessionResult] = await Promise.all([
daemon.listJobs(selectedInst ? { instanceId: selectedInst.id } : {}),
// If the selected instance was removed between `listInstances`
// and this call, the daemon now throws `not_found` instead of
// returning an empty list. Swallow that narrow race so the
// whole refresh doesn't fail.
selectedInst
? daemon.listSessions(selectedInst.id)
? daemon
.listSessions(selectedInst.id)
.catch(() => ({ sessions: [] }))
: Promise.resolve({ sessions: [] }),
]);
setSelectedIndex(safeIndex);
Expand Down
284 changes: 257 additions & 27 deletions packages/daemon/src/__tests__/daemon.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ describe("Daemon", () => {

beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "ralph-daemon-test-"));
store = new StateStore(join(tmpDir, "state.json"));
store = new StateStore(join(tmpDir, "state.sqlite"));
registry = new FakeOpencodeRegistry(40);
daemon = new Daemon(store, { registry });
await daemon.bootstrap();
Expand Down Expand Up @@ -103,6 +103,42 @@ describe("Daemon", () => {
expect(result.job.instanceId).toBe(instance.instance.id);
});

test("forwards new session titles when creating remote sessions", async () => {
const created = await daemon.handleRequest(
req({
id: "instance-create",
method: "instance.create",
params: {
name: "One",
directory: "/tmp/project-one",
},
}),
);
const instance = expectSuccess(created, "instance.create");

await daemon.handleRequest(
req({
id: "job-submit",
method: "job.submit",
params: {
instanceId: instance.instance.id,
session: { type: "new", title: "Sprint Planning" },
task: {
type: "prompt",
prompt: "hello world",
},
},
}),
);

await Bun.sleep(80);
expect(registry.sessionCreateCalls).toContainEqual({
instanceId: instance.instance.id,
directory: "/tmp/project-one",
title: "Sprint Planning",
});
});

test("rejects submit with nonexistent instance", async () => {
const submit = await daemon.handleRequest(
req({
Expand Down Expand Up @@ -265,42 +301,235 @@ describe("Daemon", () => {
expect(expectSuccess(cancel, "job.cancel").job.state).toBe("cancelled");
});

test("requeues running jobs after restart", async () => {
await store.save({
instances: [
{
id: "instance-1",
test("cancelling a running job preserves the cancel error on the terminal row", async () => {
const created = await daemon.handleRequest(
req({
id: "instance-create",
method: "instance.create",
params: {
name: "One",
directory: "/tmp/project-one",
status: "running",
maxConcurrency: 1,
createdAt: "2026-01-01T00:00:00.000Z",
updatedAt: "2026-01-01T00:00:00.000Z",
},
],
sessions: [],
jobs: [
{
id: "job-1",
instanceId: "instance-1",
},
}),
);
const createdResult = expectSuccess(created, "instance.create");

const submitted = await daemon.handleRequest(
req({
id: "job-submit",
method: "job.submit",
params: {
instanceId: createdResult.instance.id,
session: { type: "new" },
task: { type: "prompt", prompt: "recover" },
state: "running",
createdAt: "2026-01-01T00:00:00.000Z",
updatedAt: "2026-01-01T00:00:00.000Z",
task: { type: "prompt", prompt: "long-task" },
},
}),
);
const submittedResult = expectSuccess(submitted, "job.submit");

// Wait until the job has transitioned to running inside the fake runtime.
await Bun.sleep(10);

const cancel = await daemon.handleRequest(
req({
id: "job-cancel",
method: "job.cancel",
params: { jobId: submittedResult.job.id },
}),
);
const cancelled = expectSuccess(cancel, "job.cancel").job;
expect(cancelled.state).toBe("cancelled");
expect(cancelled.error).toBe("Job cancelled");

// Double-check via store after execution has fully settled — no later
// terminal write should have nulled out the error column.
await Bun.sleep(60);
const get = await daemon.handleRequest(
req({
id: "job-get",
method: "job.get",
params: { jobId: submittedResult.job.id },
}),
);
const finalJob = expectSuccess(get, "job.get").job;
expect(finalJob.state).toBe("cancelled");
expect(finalJob.error).toBe("Job cancelled");
});

test("cancelling a running job returns within the timeout when the remote is slow", async () => {
// Recreate the daemon with a very long prompt delay and a short
// cancel wait timeout. The fake runtime's `abort` is a no-op so
// the prompt always runs to completion — this simulates a remote
// runtime that ignores `session.abort`.
await daemon.shutdown();
const slowRegistry = new FakeOpencodeRegistry(1_000);
const slowStore = new StateStore(join(tmpDir, "slow.sqlite"));
const slowDaemon = new Daemon(slowStore, {
registry: slowRegistry,
cancelWaitTimeoutMs: 30,
});
await slowDaemon.bootstrap();

const created = await slowDaemon.handleRequest(
req({
id: "inst",
method: "instance.create",
params: {
name: "slow",
directory: "/tmp/project-slow",
maxConcurrency: 1,
},
],
}),
);
const instanceId = expectSuccess(created, "instance.create").instance.id;

const submitted = await slowDaemon.handleRequest(
req({
id: "sub",
method: "job.submit",
params: {
instanceId,
session: { type: "new" },
task: { type: "prompt", prompt: "slow" },
},
}),
);
const jobId = expectSuccess(submitted, "job.submit").job.id;

// Wait until the job is running.
await Bun.sleep(20);

// Cancel and measure.
const start = Date.now();
const cancel = await slowDaemon.handleRequest(
req({
id: "cancel",
method: "job.cancel",
params: { jobId },
}),
);
const elapsed = Date.now() - start;

// Must return well before the 1s prompt delay completes.
expect(elapsed).toBeLessThan(500);
// Returned row may still be `running` (timeout fired first) — that
// is the point of the timeout; the terminal row will arrive via
// the `done` stream event.
const returned = expectSuccess(cancel, "job.cancel").job;
expect(["running", "cancelled"]).toContain(returned.state);

// Allow the fake's prompt to complete and executeJob to record
// the terminal row.
await Bun.sleep(1_100);
const get = await slowDaemon.handleRequest(
req({
id: "get",
method: "job.get",
params: { jobId },
}),
);
const settled = expectSuccess(get, "job.get").job;
expect(settled.state).toBe("cancelled");
expect(settled.error).toBe("Job cancelled");

await slowDaemon.shutdown();
});

test("session.list rejects unknown instance ids", async () => {
const response = await daemon.handleRequest(
req({
id: "session-list",
method: "session.list",
params: { instanceId: "does-not-exist" },
}),
);
expect(expectFailure(response).error.code).toBe("not_found");
});

test("submitting a second job during an in-flight drain still runs it promptly", async () => {
const createOne = await daemon.handleRequest(
req({
id: "inst-1",
method: "instance.create",
params: { name: "One", directory: "/tmp/project-one" },
}),
);
const createTwo = await daemon.handleRequest(
req({
id: "inst-2",
method: "instance.create",
params: { name: "Two", directory: "/tmp/project-two" },
}),
);
const one = expectSuccess(createOne, "instance.create");
const two = expectSuccess(createTwo, "instance.create");

// Submit two jobs back-to-back. The second submit happens while the
// first scheduleDrain()'s microtask/promise is still in flight and
// must be coalesced via drainPending so the second job runs without
// waiting for the first to complete.
await daemon.handleRequest(
req({
id: "submit-1",
method: "job.submit",
params: {
instanceId: one.instance.id,
session: { type: "new" },
task: { type: "prompt", prompt: "first" },
},
}),
);
await daemon.handleRequest(
req({
id: "submit-2",
method: "job.submit",
params: {
instanceId: two.instance.id,
session: { type: "new" },
task: { type: "prompt", prompt: "second" },
},
}),
);

// Fake prompt delay is 40ms; if drain coalescing regressed, the
// second job would run strictly after the first and both jobs would
// not be concurrently active.
await Bun.sleep(30);
expect(registry.globalMaxConcurrent).toBeGreaterThanOrEqual(2);
});

test("requeues running jobs after restart", async () => {
// Shut down the beforeEach daemon so we can simulate a crashed state
// by writing directly to the SQLite file.
await daemon.shutdown();

const seedStore = new StateStore(join(tmpDir, "state.sqlite"));
await seedStore.open();
const instance = seedStore.createInstance({
name: "One",
directory: "/tmp/project-one",
maxConcurrency: 1,
});
seedStore.setInstanceStatus(instance.id, "running");
const job = seedStore.createJob({
instanceId: instance.id,
session: { type: "new" },
task: { type: "prompt", prompt: "recover" },
});
seedStore.markJobRunning(job.id);
seedStore.close();

const nextDaemon = new Daemon(store, {
const nextStore = new StateStore(join(tmpDir, "state.sqlite"));
const nextDaemon = new Daemon(nextStore, {
registry: new FakeOpencodeRegistry(10),
});
await nextDaemon.bootstrap();
const response = await nextDaemon.handleRequest(
req({
id: "job-get",
method: "job.get",
params: { jobId: "job-1" },
params: { jobId: job.id },
}),
);
expect(["queued", "running", "succeeded"]).toContain(
Expand All @@ -318,7 +547,7 @@ describe("Daemon sessions", () => {

beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "ralph-daemon-session-"));
store = new StateStore(join(tmpDir, "state.json"));
store = new StateStore(join(tmpDir, "state.sqlite"));
registry = new FakeOpencodeRegistry(10);
daemon = new Daemon(store, { registry });
await daemon.bootstrap();
Expand Down Expand Up @@ -542,15 +771,16 @@ describe("Daemon sessions", () => {
}),
);

// Verify sessions are gone
// After removal, session.list for that instance must fail with
// not_found — the cascade deletes sessions with the instance row.
const after = await daemon.handleRequest(
req({
id: "session-list-after",
method: "session.list",
params: { instanceId },
}),
);
expect(expectSuccess(after, "session.list").sessions).toHaveLength(0);
expect(expectFailure(after).error.code).toBe("not_found");
});
});

Expand All @@ -562,7 +792,7 @@ describe("Daemon streaming", () => {

beforeEach(async () => {
tmpDir = await mkdtemp(join(tmpdir(), "ralph-daemon-stream-"));
store = new StateStore(join(tmpDir, "state.json"));
store = new StateStore(join(tmpDir, "state.sqlite"));
registry = new FakeOpencodeRegistry(40);
daemon = new Daemon(store, { registry });
await daemon.bootstrap();
Expand Down
Loading
Loading