@@ -45,6 +45,11 @@ import {
4545 removePrivateProperties ,
4646 isEmptyObject ,
4747} from "./common.server" ;
48+ import {
49+ isClickHouseJsonParseError ,
50+ parseRowNumberFromError ,
51+ sanitizeRows ,
52+ } from "./sanitizeRowsOnParseError.server" ;
4853import type {
4954 CompleteableTaskRun ,
5055 CreateEventInput ,
@@ -104,6 +109,13 @@ export class ClickhouseEventRepository implements IEventRepository {
104109 private readonly _llmMetricsFlushScheduler : DynamicFlushScheduler < LlmMetricsV1Input > ;
105110 private _tracer : Tracer ;
106111 private _version : "v1" | "v2" ;
112+ /**
113+ * Counts batches that hit a ClickHouse JSON parse failure that survived
114+ * one sanitize-retry. These batches are dropped on the floor (the scheduler
115+ * is told the flush "succeeded" so its queue counter doesn't leak), and we
116+ * track the drop count for observability.
117+ */
118+ private _permanentlyDroppedBatches = 0 ;
107119
108120 constructor ( config : ClickhouseEventRepositoryConfig ) {
109121 this . _clickhouse = config . clickhouse ;
@@ -147,6 +159,11 @@ export class ClickhouseEventRepository implements IEventRepository {
147159 return this . _config . maximumLiveReloadingSetting ?? 1000 ;
148160 }
149161
162+ /** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. */
163+ get permanentlyDroppedBatches ( ) {
164+ return this . _permanentlyDroppedBatches ;
165+ }
166+
150167 /**
151168 * Clamps a start time (in nanoseconds) to now if it's too far in the past.
152169 * Returns the clamped value as a bigint.
@@ -215,19 +232,32 @@ export class ClickhouseEventRepository implements IEventRepository {
215232 ? this . _clickhouse . taskEventsV2 . insert
216233 : this . _clickhouse . taskEvents . insert ;
217234
218- const [ insertError , insertResult ] = await insertFn ( events , {
219- params : {
220- clickhouse_settings : this . #getClickhouseInsertSettings( ) ,
221- } ,
222- } ) ;
235+ const doInsert = async ( ) => {
236+ const [ insertError , insertResult ] = await insertFn ( events , {
237+ params : {
238+ clickhouse_settings : this . #getClickhouseInsertSettings( ) ,
239+ } ,
240+ } ) ;
241+ if ( insertError ) throw insertError ;
242+ return insertResult ;
243+ } ;
244+
245+ const outcome = await this . #insertWithJsonParseRecovery(
246+ flushId ,
247+ events ,
248+ doInsert ,
249+ `task_events_${ this . _version } `
250+ ) ;
223251
224- if ( insertError ) {
225- throw insertError ;
252+ if ( outcome . kind === "dropped" ) {
253+ // Loud log already emitted; nothing landed in ClickHouse — don't publish to Redis.
254+ return ;
226255 }
227256
228257 logger . info ( "ClickhouseEventRepository.flushBatch Inserted batch into clickhouse" , {
229258 events : events . length ,
230- insertResult,
259+ insertResult : outcome . insertResult ,
260+ sanitized : outcome . kind === "sanitized" ,
231261 version : this . _version ,
232262 } ) ;
233263
@@ -236,22 +266,134 @@ export class ClickhouseEventRepository implements IEventRepository {
236266 }
237267
238268 async #flushLlmMetricsBatch( flushId : string , rows : LlmMetricsV1Input [ ] ) {
269+ const doInsert = async ( ) => {
270+ const [ insertError , insertResult ] = await this . _clickhouse . llmMetrics . insert ( rows , {
271+ params : {
272+ clickhouse_settings : this . #getClickhouseInsertSettings( ) ,
273+ } ,
274+ } ) ;
275+ if ( insertError ) throw insertError ;
276+ return insertResult ;
277+ } ;
239278
240- const [ insertError ] = await this . _clickhouse . llmMetrics . insert ( rows , {
241- params : {
242- clickhouse_settings : this . #getClickhouseInsertSettings( ) ,
243- } ,
244- } ) ;
279+ const outcome = await this . #insertWithJsonParseRecovery(
280+ flushId ,
281+ rows ,
282+ doInsert ,
283+ "llm_metrics_v1"
284+ ) ;
245285
246- if ( insertError ) {
247- throw insertError ;
286+ if ( outcome . kind === "dropped" ) {
287+ return ;
248288 }
249289
250290 logger . info ( "ClickhouseEventRepository.flushLlmMetricsBatch Inserted LLM metrics batch" , {
251291 rows : rows . length ,
292+ sanitized : outcome . kind === "sanitized" ,
252293 } ) ;
253294 }
254295
296+ /**
297+ * Wraps a ClickHouse insert callable with reactive UTF-16 sanitization.
298+ *
299+ * On a `Cannot parse JSON object` failure:
300+ * 1. Sanitize the batch from `max(0, parsedRowN - 1)` onwards (rows
301+ * before the failing one parsed fine — known good).
302+ * 2. Retry the insert once with the sanitized batch.
303+ * 3. If the retry still fails with the same error class, log loudly,
304+ * increment `permanentlyDroppedBatches`, and return without
305+ * throwing — the scheduler's transient-retry path would just repeat
306+ * the same deterministic failure.
307+ *
308+ * Non-parse errors propagate unchanged so the scheduler's existing
309+ * backoff/retry behaviour still handles transient network or CH issues.
310+ */
311+ async #insertWithJsonParseRecovery< T extends object > (
312+ flushId : string ,
313+ rows : T [ ] ,
314+ doInsert : ( ) => Promise < unknown > ,
315+ contextLabel : string
316+ ) : Promise <
317+ | { kind : "inserted" ; insertResult : unknown }
318+ | { kind : "sanitized" ; insertResult : unknown }
319+ | { kind : "dropped" }
320+ > {
321+ try {
322+ return { kind : "inserted" , insertResult : await doInsert ( ) } ;
323+ } catch ( firstError ) {
324+ if ( ! isClickHouseJsonParseError ( firstError ) ) throw firstError ;
325+
326+ const firstMessage =
327+ typeof firstError === "object" && firstError !== null && "message" in firstError
328+ ? String ( ( firstError as { message ?: unknown } ) . message ?? "" )
329+ : String ( firstError ) ;
330+
331+ // Sanitize the whole batch. ClickHouse's `at row N` index is logged
332+ // for observability but not used to slice — its semantics under
333+ // parallel parsing are not stable enough to safely skip rows.
334+ const rowHint = parseRowNumberFromError ( firstMessage ) ;
335+ const { rowsTouched, fieldsSanitized } = sanitizeRows ( rows ) ;
336+
337+ // Sanitizer found nothing to fix → retrying the exact same batch is
338+ // guaranteed to hit the same deterministic parse failure. Skip the
339+ // wasted ClickHouse round-trip and drop loudly. Throwing instead would
340+ // hand the failure back to the scheduler's 3× transient-retry loop —
341+ // exactly the retry storm this wrapper is designed to avoid.
342+ if ( fieldsSanitized === 0 ) {
343+ this . _permanentlyDroppedBatches += 1 ;
344+ logger . error (
345+ "Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix" ,
346+ {
347+ flushId,
348+ contextLabel,
349+ batchSize : rows . length ,
350+ clickhouseRowHint : rowHint ,
351+ permanentlyDroppedBatches : this . _permanentlyDroppedBatches ,
352+ sampleRow : JSON . stringify ( rows [ 0 ] ?? null ) . slice ( 0 , 1024 ) ,
353+ clickhouseError : firstMessage . split ( "\n" ) [ 0 ] ,
354+ }
355+ ) ;
356+ return { kind : "dropped" } ;
357+ }
358+
359+ logger . warn ( "Sanitizing batch after ClickHouse JSON parse error" , {
360+ flushId,
361+ contextLabel,
362+ batchSize : rows . length ,
363+ clickhouseRowHint : rowHint ,
364+ rowsTouched,
365+ fieldsSanitized,
366+ clickhouseError : firstMessage . split ( "\n" ) [ 0 ] ,
367+ } ) ;
368+
369+ try {
370+ return { kind : "sanitized" , insertResult : await doInsert ( ) } ;
371+ } catch ( retryError ) {
372+ if ( ! isClickHouseJsonParseError ( retryError ) ) throw retryError ;
373+
374+ this . _permanentlyDroppedBatches += 1 ;
375+ const retryMessage =
376+ typeof retryError === "object" && retryError !== null && "message" in retryError
377+ ? String ( ( retryError as { message ?: unknown } ) . message ?? "" )
378+ : String ( retryError ) ;
379+ logger . error (
380+ "Dropped batch after sanitize-retry still hit ClickHouse JSON parse error" ,
381+ {
382+ flushId,
383+ contextLabel,
384+ batchSize : rows . length ,
385+ permanentlyDroppedBatches : this . _permanentlyDroppedBatches ,
386+ sampleRow : JSON . stringify ( rows [ 0 ] ?? null ) . slice ( 0 , 1024 ) ,
387+ firstError : firstMessage . split ( "\n" ) [ 0 ] ,
388+ retryError : retryMessage . split ( "\n" ) [ 0 ] ,
389+ }
390+ ) ;
391+
392+ return { kind : "dropped" } ;
393+ }
394+ }
395+ }
396+
255397 #createLlmMetricsInput( event : CreateEventInput ) : LlmMetricsV1Input {
256398 const llmMetrics = event . _llmMetrics ! ;
257399
0 commit comments