Skip to content

Commit ce895ca

Browse files
committed
fix(webapp,sdk): keep chat.agent snapshots on one object store
When a webapp is configured with both a default object store (`OBJECT_STORE_BASE_URL`) and a named protocol provider (`OBJECT_STORE_DEFAULT_PROTOCOL=s3`), chat.agent session snapshot writes landed in the named provider but reads fell through to the default — so the recovery boot couldn't find the snapshot it had just written. After a mid-stream cancel, the missing snapshot triggered a fallback replay path that dropped the user's follow-up message, leaving the chat stuck in `submitted` indefinitely.

Fix:
- New `/api/v1/sessions/:id/snapshot-url` route handles PUT + GET symmetrically — both prefix unprefixed keys with `OBJECT_STORE_DEFAULT_PROTOCOL` so they always round-trip through the same store.
- `Session.chatSnapshotStoragePath` persists the resolved URI on first write so future protocol changes don't strand existing snapshots. Reads prefer the stored URI and fall back to the computed default for pre-column sessions.
- SDK calls `createChatSnapshotUploadUrl` / `getChatSnapshotUrl`; the generic v1/v2 packets endpoints are unchanged.
1 parent 6b46a34 commit ce895ca

8 files changed

Lines changed: 175 additions & 51 deletions

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Pin chat.agent session snapshots to a single object store so writes and reads
7+
always round-trip through the same provider when `OBJECT_STORE_DEFAULT_PROTOCOL`
8+
is set.
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
2+
import { json } from "@remix-run/server-runtime";
3+
import { z } from "zod";
4+
import { $replica, prisma } from "~/db.server";
5+
import { env } from "~/env.server";
6+
import { authenticateApiRequest } from "~/services/apiAuth.server";
7+
import { resolveSessionByIdOrExternalId } from "~/services/realtime/sessions.server";
8+
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
9+
import { generatePresignedUrl } from "~/v3/objectStore.server";
10+
11+
const ParamsSchema = z.object({
12+
sessionId: z.string(),
13+
});
14+
15+
/**
 * Builds the canonical snapshot key for a session.
 *
 * When `env.OBJECT_STORE_DEFAULT_PROTOCOL` is set, the key is returned as
 * `<protocol>://sessions/<sessionId>/snapshot.json` so that PUT and GET
 * resolve to the same store on multi-provider deployments. When it is unset
 * (or empty) the bare `sessions/<sessionId>/snapshot.json` path is returned.
 */
function defaultSnapshotKey(sessionId: string): string {
  const snapshotPath = `sessions/${sessionId}/snapshot.json`;
  const defaultProtocol = env.OBJECT_STORE_DEFAULT_PROTOCOL;

  if (!defaultProtocol) {
    return snapshotPath;
  }

  return `${defaultProtocol}://${snapshotPath}`;
}
22+
23+
/**
 * PUT handler: issues a presigned upload URL for a session's chat.agent
 * snapshot.
 *
 * Flow:
 *  1. Reject any method other than PUT with an HTTP 405.
 *  2. Authenticate the API key (401 on failure).
 *  3. Validate route params (400 on failure) and resolve the session by id or
 *     external id within the authenticated environment (404 if absent).
 *  4. Sign a PUT URL for the session's stored snapshot path, or for the
 *     computed default key when no path has been stored yet (500 if signing
 *     fails).
 *  5. On the first write, persist the key on the session so later changes to
 *     the default protocol don't strand the snapshot.
 *
 * @returns JSON `{ presignedUrl }` on success, or an error response.
 */
export async function action({ request, params }: ActionFunctionArgs) {
  if (request.method.toUpperCase() !== "PUT") {
    // Must be a real Response: a plain `{ status, body }` object is not a
    // Response, so its `status` field would never become the HTTP status.
    return new Response("Method Not Allowed", { status: 405 });
  }

  const auth = await authenticateApiRequest(request);
  if (!auth) {
    return json({ error: "Invalid or Missing API key" }, { status: 401 });
  }

  // safeParse so malformed params produce a 400 instead of an uncaught ZodError.
  const parsedParams = ParamsSchema.safeParse(params);
  if (!parsedParams.success) {
    return json({ error: "Invalid or missing sessionId" }, { status: 400 });
  }
  const { sessionId } = parsedParams.data;

  const session = await resolveSessionByIdOrExternalId($replica, auth.environment.id, sessionId);
  if (!session) {
    return json({ error: "Session not found" }, { status: 404 });
  }

  // Reuse the stored path on subsequent writes; persist on first write so
  // future default-protocol changes don't strand existing snapshots.
  const key = session.chatSnapshotStoragePath ?? defaultSnapshotKey(sessionId);

  const signed = await generatePresignedUrl(
    auth.environment.project.externalRef,
    auth.environment.slug,
    key,
    "PUT"
  );
  if (!signed.success) {
    return json({ error: `Failed to generate presigned URL: ${signed.error}` }, { status: 500 });
  }

  if (session.chatSnapshotStoragePath === null) {
    // The `chatSnapshotStoragePath: null` filter makes this a conditional
    // write: it only takes effect while the column is still unset.
    await prisma.session
      .updateMany({
        where: { id: session.id, chatSnapshotStoragePath: null },
        data: { chatSnapshotStoragePath: key },
      })
      .catch(() => {
        // Best-effort; concurrent writers may have already set it. A failure
        // here must not block handing out the upload URL.
      });
  }

  return json({ presignedUrl: signed.url });
}
70+
71+
/**
 * GET handler: issues a presigned download URL for a session's chat.agent
 * snapshot.
 *
 * The session is looked up by id or external id within the authenticated
 * environment. The URL is signed for the session's stored snapshot path when
 * one exists, otherwise for the computed default key.
 */
export const loader = createLoaderApiRoute(
  {
    params: ParamsSchema,
    allowJWT: true,
    corsStrategy: "all",
    findResource: async (params, auth) => {
      return resolveSessionByIdOrExternalId($replica, auth.environment.id, params.sessionId);
    },
  },
  async ({ params, authentication, resource }) => {
    if (!resource) {
      return json({ error: "Session not found" }, { status: 404 });
    }

    // Stored path wins; sessions without one fall back to the computed default.
    const storageKey = resource.chatSnapshotStoragePath ?? defaultSnapshotKey(params.sessionId);

    const presign = await generatePresignedUrl(
      authentication.environment.project.externalRef,
      authentication.environment.slug,
      storageKey,
      "GET"
    );

    if (presign.success) {
      return json({ presignedUrl: presign.url });
    }

    return json({ error: `Failed to generate presigned URL: ${presign.error}` }, { status: 500 });
  }
);

apps/webapp/app/services/apiRateLimit.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({
7070
// customer-facing surface so customer rate limits shouldn't apply.
7171
/^\/api\/v1\/packets\//,
7272
/^\/api\/v2\/packets\//,
73+
/^\/api\/v1\/sessions\/[^\/]+\/snapshot-url$/,
7374
],
7475
log: {
7576
rejections: env.API_RATE_LIMIT_REJECTION_LOGS_ENABLED === "1",
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- Adds the nullable TEXT column "chatSnapshotStoragePath" to "Session".
-- IF NOT EXISTS makes the statement a no-op when the column is already present.
ALTER TABLE "Session" ADD COLUMN IF NOT EXISTS "chatSnapshotStoragePath" TEXT;

internal-packages/database/prisma/schema.prisma

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,11 @@ model Session {
830830
/// (OSS, or pre-backfill); reads fall back to the global basin.
831831
streamBasinName String?
832832
833+
/// Storage URI (with protocol prefix) for this session's chat.agent
834+
/// snapshot blob. Set on first snapshot write. Null = pre-column session,
835+
/// fall back to computed default path.
836+
chatSnapshotStoragePath String?
837+
833838
runs SessionRun[]
834839
835840
/// Idempotency: `(env, externalId)` uniquely identifies a session.

packages/core/src/v3/apiClient/index.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,32 @@ export class ApiClient {
602602
);
603603
}
604604

605+
/**
 * Requests a presigned PUT URL for a `chat.agent` session snapshot.
 *
 * Issues `PUT /api/v1/sessions/{sessionId}/snapshot-url` (the session id is
 * URI-encoded) and validates the response against
 * `CreateUploadPayloadUrlResponseBody`.
 */
createChatSnapshotUploadUrl(sessionId: string, requestOptions?: ZodFetchOptions) {
  const url = `${this.baseUrl}/api/v1/sessions/${encodeURIComponent(sessionId)}/snapshot-url`;
  const init = {
    method: "PUT",
    headers: this.#getHeaders(false),
  };
  const options = mergeRequestOptions(this.defaultRequestOptions, requestOptions);

  return zodfetch(CreateUploadPayloadUrlResponseBody, url, init, options);
}
617+
618+
/**
 * Requests a presigned GET URL for a `chat.agent` session snapshot.
 *
 * Issues `GET /api/v1/sessions/{sessionId}/snapshot-url` (the session id is
 * URI-encoded) and validates the response against
 * `CreateUploadPayloadUrlResponseBody`.
 */
getChatSnapshotUrl(sessionId: string, requestOptions?: ZodFetchOptions) {
  const url = `${this.baseUrl}/api/v1/sessions/${encodeURIComponent(sessionId)}/snapshot-url`;
  const init = {
    method: "GET",
    headers: this.#getHeaders(false),
  };
  const options = mergeRequestOptions(this.defaultRequestOptions, requestOptions);

  return zodfetch(CreateUploadPayloadUrlResponseBody, url, init, options);
}
630+
605631
retrieveRun(runId: string, requestOptions?: ZodFetchOptions) {
606632
return zodfetch(
607633
RetrieveRunResponse,

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -219,20 +219,6 @@ async function findLatestSessionInCursor(
219219
*/
220220
export type { ChatSnapshotV1 } from "@trigger.dev/core/v3";
221221

222-
/**
223-
* S3 key suffix for a session's snapshot blob. The webapp's presigned-URL
224-
* routes prefix this with `packets/{projectRef}/{envSlug}/` server-side, so
225-
* the final S3 key lands at
226-
* `packets/{projectRef}/{envSlug}/sessions/{sessionId}/snapshot.json`.
227-
*
228-
* Stable per session: the friendlyId persists across `chat.requestUpgrade`
229-
* continuations and idle-suspend restarts.
230-
* @internal
231-
*/
232-
function snapshotFilename(sessionId: string): string {
233-
return `sessions/${sessionId}/snapshot.json`;
234-
}
235-
236222
/**
237223
* Test-only override hook — `mockChatAgent` installs a fake to return
238224
* synthetic snapshots without hitting S3. Mirrors the `__set*ImplForTests`
@@ -285,7 +271,7 @@ async function readChatSnapshot<TUIMessage extends UIMessage>(
285271
const apiClient = apiClientManager.clientOrThrow();
286272
let presignedUrl: string;
287273
try {
288-
const resp = await apiClient.getPayloadUrl(snapshotFilename(sessionId));
274+
const resp = await apiClient.getChatSnapshotUrl(sessionId);
289275
presignedUrl = resp.presignedUrl;
290276
} catch (error) {
291277
logger.warn("chat.agent: snapshot presign (read) failed; continuing without snapshot", {
@@ -360,7 +346,7 @@ async function writeChatSnapshot<TUIMessage extends UIMessage>(
360346
const apiClient = apiClientManager.clientOrThrow();
361347
let presignedUrl: string;
362348
try {
363-
const resp = await apiClient.createUploadPayloadUrl(snapshotFilename(sessionId));
349+
const resp = await apiClient.createChatSnapshotUploadUrl(sessionId);
364350
presignedUrl = resp.presignedUrl;
365351
} catch (error) {
366352
logger.warn("chat.agent: snapshot presign (write) failed; next run will replay further", {

packages/trigger-sdk/test/chat-snapshot.test.ts

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -34,28 +34,27 @@ function buildSnapshot(count = 1): ChatSnapshotV1 {
3434

3535
/**
3636
* Stub `apiClientManager.clientOrThrow()` so the helpers see a fake API
37-
* client whose `getPayloadUrl` / `createUploadPayloadUrl` resolve with the
38-
* presigned URLs the test wants. Returns spies for assertion.
37+
* client whose `getChatSnapshotUrl` / `createChatSnapshotUploadUrl` resolve
38+
* with the presigned URLs the test wants. Returns spies for assertion.
3939
*/
4040
function stubApiClient(opts: {
41-
getPayloadUrl?: (filename: string) => Promise<{ presignedUrl: string }>;
42-
createUploadPayloadUrl?: (filename: string) => Promise<{ presignedUrl: string }>;
41+
getChatSnapshotUrl?: (sessionId: string) => Promise<{ presignedUrl: string }>;
42+
createChatSnapshotUploadUrl?: (sessionId: string) => Promise<{ presignedUrl: string }>;
4343
}) {
44-
const getPayloadUrl = vi.fn(
45-
opts.getPayloadUrl ?? (async (_filename: string) => ({ presignedUrl: "https://example.invalid/get" }))
44+
const getChatSnapshotUrl = vi.fn(
45+
opts.getChatSnapshotUrl ??
46+
(async (_sessionId: string) => ({ presignedUrl: "https://example.invalid/get" }))
4647
);
47-
const createUploadPayloadUrl = vi.fn(
48-
opts.createUploadPayloadUrl ??
49-
(async (_filename: string) => ({ presignedUrl: "https://example.invalid/put" }))
48+
const createChatSnapshotUploadUrl = vi.fn(
49+
opts.createChatSnapshotUploadUrl ??
50+
(async (_sessionId: string) => ({ presignedUrl: "https://example.invalid/put" }))
5051
);
5152
const fakeClient = {
52-
getPayloadUrl,
53-
createUploadPayloadUrl,
53+
getChatSnapshotUrl,
54+
createChatSnapshotUploadUrl,
5455
};
55-
vi.spyOn(apiClientManager, "clientOrThrow").mockReturnValue(
56-
fakeClient as never
57-
);
58-
return { getPayloadUrl, createUploadPayloadUrl };
56+
vi.spyOn(apiClientManager, "clientOrThrow").mockReturnValue(fakeClient as never);
57+
return { getChatSnapshotUrl, createChatSnapshotUploadUrl };
5958
}
6059

6160
/**
@@ -87,7 +86,7 @@ describe("chat snapshot helpers", () => {
8786

8887
describe("readChatSnapshot", () => {
8988
it("returns the snapshot on a successful GET", async () => {
90-
const { getPayloadUrl } = stubApiClient({});
89+
const { getChatSnapshotUrl } = stubApiClient({});
9190
const snapshot = buildSnapshot(2);
9291
stubFetch(async () =>
9392
new Response(JSON.stringify(snapshot), {
@@ -97,7 +96,7 @@ describe("chat snapshot helpers", () => {
9796
);
9897

9998
const result = await readChatSnapshot("session-1");
100-
expect(getPayloadUrl).toHaveBeenCalledWith("sessions/session-1/snapshot.json");
99+
expect(getChatSnapshotUrl).toHaveBeenCalledWith("session-1");
101100
expect(result).toMatchObject({
102101
version: 1,
103102
messages: snapshot.messages,
@@ -177,7 +176,7 @@ describe("chat snapshot helpers", () => {
177176

178177
it("returns undefined when presign call fails", async () => {
179178
stubApiClient({
180-
getPayloadUrl: async () => {
179+
getChatSnapshotUrl: async () => {
181180
throw new Error("presign denied");
182181
},
183182
});
@@ -202,13 +201,13 @@ describe("chat snapshot helpers", () => {
202201

203202
describe("writeChatSnapshot", () => {
204203
it("PUTs the snapshot JSON to the presigned URL", async () => {
205-
const { createUploadPayloadUrl } = stubApiClient({});
204+
const { createChatSnapshotUploadUrl } = stubApiClient({});
206205
const fetchSpy = stubFetch(async () => new Response(null, { status: 200 }));
207206

208207
const snapshot = buildSnapshot(3);
209208
await writeChatSnapshot("session-2", snapshot);
210209

211-
expect(createUploadPayloadUrl).toHaveBeenCalledWith("sessions/session-2/snapshot.json");
210+
expect(createChatSnapshotUploadUrl).toHaveBeenCalledWith("session-2");
212211
expect(fetchSpy).toHaveBeenCalledOnce();
213212
const [url, init] = fetchSpy.mock.calls[0]!;
214213
expect(url).toBe("https://example.invalid/put");
@@ -239,7 +238,7 @@ describe("chat snapshot helpers", () => {
239238

240239
it("returns without throwing when presign fails (warns)", async () => {
241240
stubApiClient({
242-
createUploadPayloadUrl: async () => {
241+
createChatSnapshotUploadUrl: async () => {
243242
throw new Error("presign denied");
244243
},
245244
});
@@ -250,29 +249,28 @@ describe("chat snapshot helpers", () => {
250249
expect(fetchSpy).not.toHaveBeenCalled();
251250
});
252251

253-
it("uses the same `snapshotFilename(sessionId)` convention as the read path", async () => {
254-
// Round-trip check: read and write target the same key for a given
255-
// sessionId. The runtime relies on this to make read-after-write
256-
// coherent on subsequent boots.
257-
const { getPayloadUrl } = stubApiClient({
258-
getPayloadUrl: async () => ({ presignedUrl: "https://example.invalid/get" }),
252+
it("addresses reads and writes by the same sessionId", async () => {
253+
// Round-trip check: both presign methods receive the same sessionId.
254+
// The canonical key (`sessions/{id}/snapshot.json`) lives server-side
255+
// now, so the SDK has no key string to compare — sessionId equality
256+
// is the SDK-visible invariant.
257+
const { getChatSnapshotUrl } = stubApiClient({
258+
getChatSnapshotUrl: async () => ({ presignedUrl: "https://example.invalid/get" }),
259259
});
260260
stubFetch(async () => new Response(null, { status: 404 }));
261261

262-
// Trigger a read.
263262
await readChatSnapshot("round-trip-session");
264-
const [readKey] = getPayloadUrl.mock.calls[0]!;
263+
const [readArg] = getChatSnapshotUrl.mock.calls[0]!;
265264

266-
// Trigger a write to the same session.
267-
const { createUploadPayloadUrl } = stubApiClient({
268-
createUploadPayloadUrl: async () => ({ presignedUrl: "https://example.invalid/put" }),
265+
const { createChatSnapshotUploadUrl } = stubApiClient({
266+
createChatSnapshotUploadUrl: async () => ({ presignedUrl: "https://example.invalid/put" }),
269267
});
270268
stubFetch(async () => new Response(null, { status: 200 }));
271269
await writeChatSnapshot("round-trip-session", buildSnapshot());
272-
const [writeKey] = createUploadPayloadUrl.mock.calls[0]!;
270+
const [writeArg] = createChatSnapshotUploadUrl.mock.calls[0]!;
273271

274-
expect(readKey).toBe(writeKey);
275-
expect(readKey).toBe("sessions/round-trip-session/snapshot.json");
272+
expect(readArg).toBe(writeArg);
273+
expect(readArg).toBe("round-trip-session");
276274
});
277275
});
278276
});

0 commit comments

Comments
 (0)