Skip to content

Commit 71d98b4

Browse files
matt-aitkendevin-ai-integration[bot]claude
authored
Support for org-scoped ClickHouse (#3333)
Added `OrganizationDataStore` which allows orgs to have data stored in specific separate services. For now this is just used for ClickHouse. When using ClickHouse we get a client for the factory and pass in the org id. Particular care has to be made with two hot-insert paths: 1. RunReplicationService 2. OTLPExporter --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Claude <noreply@anthropic.com>
1 parent 832cf72 commit 71d98b4

87 files changed

Lines changed: 2909 additions & 1050 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.cursor/mcp.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11
{
2-
"mcpServers": {}
2+
"mcpServers": {
3+
"linear": {
4+
"url": "https://mcp.linear.app/mcp"
5+
}
6+
}
37
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Organization-scoped ClickHouse routing enables customers with HIPAA and other data security requirements to use dedicated database instances

CLAUDE.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,17 @@ containerTest("should use both", async ({ prisma, redisOptions }) => {
6969
});
7070
```
7171

72+
## Code Style
73+
74+
### Imports
75+
76+
**Prefer static imports over dynamic imports.** Only use dynamic `import()` when:
77+
- Circular dependencies cannot be resolved otherwise
78+
- Code splitting is genuinely needed for performance
79+
- The module must be loaded conditionally at runtime
80+
81+
Dynamic imports add unnecessary overhead in hot paths and make code harder to analyze. If you find yourself using `await import()`, ask if a regular `import` statement would work instead.
82+
7283
## Changesets and Server Changes
7384

7485
When modifying any public package (`packages/*` or `integrations/*`), add a changeset:

apps/webapp/app/entry.server.tsx

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,44 +24,12 @@ import {
2424
registerRunEngineEventBusHandlers,
2525
setupBatchQueueCallbacks,
2626
} from "./v3/runEngineHandlers.server";
27+
// Touch the sessions replication singleton at entry so it boots deterministically
28+
// on webapp startup. The singleton's initializer wires start (gated on
29+
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
30+
// runsReplicationInstance.
2731
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
28-
import { signalsEmitter } from "./services/signals.server";
29-
30-
// Start the sessions replication service (subscribes to the logical replication
31-
// slot, runs leader election, flushes to ClickHouse). Done at entry level so it
32-
// runs deterministically on webapp boot rather than lazily via a singleton
33-
// reference elsewhere in the module graph.
34-
if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") {
35-
// Capture a non-nullable reference so the shutdown closure below
36-
// doesn't need to re-null-check (TS narrowing doesn't follow through
37-
// an inner function scope).
38-
const replicator = sessionsReplicationInstance;
39-
replicator
40-
.start()
41-
.then(() => {
42-
console.log("🗃️ Sessions replication service started");
43-
})
44-
.catch((error) => {
45-
console.error("🗃️ Sessions replication service failed to start", {
46-
error,
47-
});
48-
});
49-
50-
// Wrap the async shutdown in a sync handler that catches rejections —
51-
// SIGTERM/SIGINT fire during process teardown, and an unhandled
52-
// promise rejection from `_replicationClient.stop()` there would
53-
// bubble up past the process exit. Matches the pattern in
54-
// dynamicFlushScheduler.server.ts.
55-
const shutdownSessionsReplication = () => {
56-
replicator.shutdown().catch((error) => {
57-
console.error("🗃️ Sessions replication service shutdown error", {
58-
error,
59-
});
60-
});
61-
};
62-
signalsEmitter.on("SIGTERM", shutdownSessionsReplication);
63-
signalsEmitter.on("SIGINT", shutdownSessionsReplication);
64-
}
32+
void sessionsReplicationInstance;
6533

6634
const ABORT_DELAY = 30000;
6735

apps/webapp/app/env.server.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,10 @@ const EnvironmentSchema = z
459459
// If specified, you must configure the corresponding provider using OBJECT_STORE_{PROTOCOL}_* env vars.
460460
// Example: OBJECT_STORE_DEFAULT_PROTOCOL=s3 requires OBJECT_STORE_S3_BASE_URL, OBJECT_STORE_S3_ACCESS_KEY_ID, etc.
461461
// Enables zero-downtime migration between providers (old data keeps working, new data uses new provider).
462-
OBJECT_STORE_DEFAULT_PROTOCOL: z.string().regex(/^[a-z0-9]+$/).optional(),
462+
OBJECT_STORE_DEFAULT_PROTOCOL: z
463+
.string()
464+
.regex(/^[a-z0-9]+$/)
465+
.optional(),
463466

464467
ARTIFACTS_OBJECT_STORE_BUCKET: z.string().optional(),
465468
ARTIFACTS_OBJECT_STORE_BASE_URL: z.string().optional(),
@@ -1489,9 +1492,18 @@ const EnvironmentSchema = z
14891492
EVENTS_CLICKHOUSE_MAX_TRACE_DETAILED_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(5_000),
14901493
EVENTS_CLICKHOUSE_MAX_LIVE_RELOADING_SETTING: z.coerce.number().int().default(2000),
14911494

1495+
// Organization data stores registry
1496+
ORGANIZATION_DATA_STORES_RELOAD_INTERVAL_MS: z.coerce
1497+
.number()
1498+
.int()
1499+
.default(60 * 1000), // 1 minute
1500+
14921501
// LLM cost tracking
14931502
LLM_COST_TRACKING_ENABLED: BoolEnv.default(true),
1494-
LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce.number().int().default(5 * 60 * 1000), // 5 minutes
1503+
LLM_PRICING_RELOAD_INTERVAL_MS: z.coerce
1504+
.number()
1505+
.int()
1506+
.default(5 * 60 * 1000), // 5 minutes
14951507
LLM_PRICING_RELOAD_CHANNEL: z.string().default("llm-registry:reload"),
14961508
LLM_PRICING_RELOAD_DEBOUNCE_MS: z.coerce.number().int().default(1000),
14971509
// Whether to subscribe this process to the LLM_PRICING_RELOAD_CHANNEL.

apps/webapp/app/presenters/v3/AgentListPresenter.server.ts

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ import {
33
type RuntimeEnvironmentType,
44
type TaskTriggerSource,
55
} from "@trigger.dev/database";
6-
import { ClickHouse } from "@internal/clickhouse";
6+
import { type ClickHouse } from "@internal/clickhouse";
77
import { z } from "zod";
88
import { $replica } from "~/db.server";
9-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
9+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
1010
import { singleton } from "~/utils/singleton";
1111
import { findCurrentWorkerFromEnvironment } from "~/v3/models/workerDeployment.server";
1212

@@ -24,10 +24,7 @@ export type AgentActiveState = {
2424
};
2525

2626
export class AgentListPresenter {
27-
constructor(
28-
private readonly clickhouse: ClickHouse,
29-
private readonly _replica: PrismaClientOrTransaction
30-
) {}
27+
constructor(private readonly _replica: PrismaClientOrTransaction) {}
3128

3229
public async call({
3330
organizationId,
@@ -40,6 +37,11 @@ export class AgentListPresenter {
4037
environmentId: string;
4138
environmentType: RuntimeEnvironmentType;
4239
}) {
40+
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(
41+
organizationId,
42+
"standard"
43+
);
44+
4345
const currentWorker = await findCurrentWorkerFromEnvironment(
4446
{
4547
id: environmentId,
@@ -89,20 +91,21 @@ export class AgentListPresenter {
8991
}
9092

9193
// All queries are deferred for streaming
92-
const activeStates = this.#getActiveStates(environmentId, slugs);
93-
const conversationSparklines = this.#getConversationSparklines(environmentId, slugs);
94-
const costSparklines = this.#getCostSparklines(environmentId, slugs);
95-
const tokenSparklines = this.#getTokenSparklines(environmentId, slugs);
94+
const activeStates = this.#getActiveStates(clickhouse, environmentId, slugs);
95+
const conversationSparklines = this.#getConversationSparklines(clickhouse, environmentId, slugs);
96+
const costSparklines = this.#getCostSparklines(clickhouse, environmentId, slugs);
97+
const tokenSparklines = this.#getTokenSparklines(clickhouse, environmentId, slugs);
9698

9799
return { agents, activeStates, conversationSparklines, costSparklines, tokenSparklines };
98100
}
99101

100102
/** Count runs currently executing vs suspended per agent */
101103
async #getActiveStates(
104+
clickhouse: ClickHouse,
102105
environmentId: string,
103106
slugs: string[]
104107
): Promise<Record<string, AgentActiveState>> {
105-
const queryFn = this.clickhouse.reader.query({
108+
const queryFn = clickhouse.reader.query({
106109
name: "agentActiveStates",
107110
query: `SELECT
108111
task_identifier,
@@ -140,10 +143,11 @@ export class AgentListPresenter {
140143

141144
/** 24h hourly sparkline of conversation (run) count per agent */
142145
async #getConversationSparklines(
146+
clickhouse: ClickHouse,
143147
environmentId: string,
144148
slugs: string[]
145149
): Promise<Record<string, number[]>> {
146-
const queryFn = this.clickhouse.reader.query({
150+
const queryFn = clickhouse.reader.query({
147151
name: "agentConversationSparklines",
148152
query: `SELECT
149153
task_identifier,
@@ -172,10 +176,11 @@ export class AgentListPresenter {
172176

173177
/** 24h hourly sparkline of LLM cost per agent */
174178
async #getCostSparklines(
179+
clickhouse: ClickHouse,
175180
environmentId: string,
176181
slugs: string[]
177182
): Promise<Record<string, number[]>> {
178-
const queryFn = this.clickhouse.reader.query({
183+
const queryFn = clickhouse.reader.query({
179184
name: "agentCostSparklines",
180185
query: `SELECT
181186
task_identifier,
@@ -203,10 +208,11 @@ export class AgentListPresenter {
203208

204209
/** 24h hourly sparkline of total tokens per agent */
205210
async #getTokenSparklines(
211+
clickhouse: ClickHouse,
206212
environmentId: string,
207213
slugs: string[]
208214
): Promise<Record<string, number[]>> {
209-
const queryFn = this.clickhouse.reader.query({
215+
const queryFn = clickhouse.reader.query({
210216
name: "agentTokenSparklines",
211217
query: `SELECT
212218
task_identifier,
@@ -284,5 +290,5 @@ export class AgentListPresenter {
284290
export const agentListPresenter = singleton("agentListPresenter", setupAgentListPresenter);
285291

286292
function setupAgentListPresenter() {
287-
return new AgentListPresenter(clickhouseClient, $replica);
293+
return new AgentListPresenter($replica);
288294
}

apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { type Project, type RuntimeEnvironment, type TaskRunStatus } from "@trig
99
import assertNever from "assert-never";
1010
import { z } from "zod";
1111
import { API_VERSIONS, RunStatusUnspecifiedApiVersion } from "~/api/versions";
12-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
12+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
1313
import { logger } from "~/services/logger.server";
1414
import { CoercedDate } from "~/utils/zod";
1515
import { ServiceValidationError } from "~/v3/services/baseService.server";
@@ -269,7 +269,8 @@ export class ApiRunListPresenter extends BasePresenter {
269269
options.machines = searchParams["filter[machine]"];
270270
}
271271

272-
const presenter = new NextRunListPresenter(this._replica, clickhouseClient);
272+
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
273+
const presenter = new NextRunListPresenter(this._replica, clickhouse);
273274

274275
logger.debug("Calling RunListPresenter", { options });
275276

apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { type PrismaClient } from "@trigger.dev/database";
22
import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction";
3-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
3+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
44
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
55
import { getRunFiltersFromRequest } from "../RunFilters.server";
66
import { BasePresenter } from "./basePresenter.server";
@@ -24,8 +24,9 @@ export class CreateBulkActionPresenter extends BasePresenter {
2424
Object.fromEntries(new URL(request.url).searchParams)
2525
);
2626

27+
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
2728
const runsRepository = new RunsRepository({
28-
clickhouse: clickhouseClient,
29+
clickhouse,
2930
prisma: this._replica as PrismaClient,
3031
});
3132

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/Tr
33
import { prisma, type PrismaClient } from "~/db.server";
44
import { createTimelineSpanEventsFromSpanEvents } from "~/utils/timelineSpanEvents";
55
import { getUsername } from "~/utils/username";
6-
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
76
import { SpanSummary } from "~/v3/eventRepository/eventRepository.types";
87
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
98
import { isFinalRunStatus } from "~/v3/taskStatus";
109
import { env } from "~/env.server";
10+
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1111

1212
type Result = Awaited<ReturnType<RunPresenter["call"]>>;
1313
export type Run = Result["run"];
@@ -145,10 +145,13 @@ export class RunPresenter {
145145
};
146146
}
147147

148-
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore);
148+
const repository = await getEventRepositoryForStore(
149+
run.taskEventStore,
150+
run.runtimeEnvironment.organizationId
151+
);
149152

150153
// get the events
151-
let traceSummary = await eventRepository.getTraceSummary(
154+
let traceSummary = await repository.getTraceSummary(
152155
getTaskEventStoreTableForRun(run),
153156
run.runtimeEnvironment.id,
154157
run.traceId,
@@ -272,7 +275,7 @@ export class RunPresenter {
272275
overridesBySpanId: traceSummary.overridesBySpanId,
273276
linkedRunIdBySpanId,
274277
},
275-
maximumLiveReloadingSetting: eventRepository.maximumLiveReloadingSetting,
278+
maximumLiveReloadingSetting: repository.maximumLiveReloadingSetting,
276279
};
277280
}
278281
}

apps/webapp/app/presenters/v3/RunTagListPresenter.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
22
import { BasePresenter } from "./basePresenter.server";
3-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
3+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server";
44
import { type PrismaClient } from "@trigger.dev/database";
55
import { timeFilters } from "~/components/runs/v3/SharedFilters";
66

@@ -37,8 +37,9 @@ export class RunTagListPresenter extends BasePresenter {
3737
}: TagListOptions) {
3838
const hasFilters = Boolean(name?.trim());
3939

40+
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "standard");
4041
const runsRepository = new RunsRepository({
41-
clickhouse: clickhouseClient,
42+
clickhouse,
4243
prisma: this._replica as PrismaClient,
4344
});
4445

0 commit comments

Comments
 (0)