From c80ce7985a6cead9bb5895130e67d1416d404b47 Mon Sep 17 00:00:00 2001 From: Kuchizu Date: Wed, 4 Feb 2026 20:44:20 +0300 Subject: [PATCH 1/5] feat(grouper): add Prometheus metrics --- lib/metrics.ts | 39 ++++++++++++++++ runner.ts | 89 ++++++++---------------------------- workers/grouper/src/index.ts | 46 +++++++++++++++++++ 3 files changed, 103 insertions(+), 71 deletions(-) create mode 100644 lib/metrics.ts diff --git a/lib/metrics.ts b/lib/metrics.ts new file mode 100644 index 00000000..380d78d7 --- /dev/null +++ b/lib/metrics.ts @@ -0,0 +1,39 @@ +import * as client from 'prom-client'; +import os from 'os'; +import { nanoid } from 'nanoid'; + +const register = new client.Registry(); + +client.collectDefaultMetrics({ register }); + +export { register, client }; + +/** + * Start periodic push to pushgateway + * + * @param workerName - name of the worker for grouping + */ +export function startMetricsPushing(workerName: string): void { + const url = process.env.PROMETHEUS_PUSHGATEWAY_URL; + const interval = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL || '10000'); + + if (!url) { + return; + } + + const hostname = os.hostname(); + const ID_SIZE = 5; + const id = nanoid(ID_SIZE); + + const gateway = new client.Pushgateway(url, [], register); + + console.log(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id})`); + + setInterval(() => { + gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname, id } }, (err) => { + if (err) { + console.error('Metrics push error:', err); + } + }); + }, interval); +} diff --git a/runner.ts b/runner.ts index 443beea8..9147b54b 100644 --- a/runner.ts +++ b/runner.ts @@ -9,6 +9,7 @@ import * as utils from './lib/utils'; import { Worker } from './lib/worker'; import HawkCatcher from '@hawk.so/nodejs'; import * as dotenv from 'dotenv'; +import { startMetricsPushing } from './lib/metrics'; dotenv.config(); @@ -57,19 +58,17 @@ class WorkerRunner { .then((workerConstructors) 
=> { this.constructWorkers(workerConstructors); }) - // .then(() => { - // try { - // this.startMetrics(); - // } catch (e) { - // HawkCatcher.send(e); - // console.error(`Metrics not started: ${e}`); - // } - // - // return Promise.resolve(); - // }) .then(() => { return this.startWorkers(); }) + .then(() => { + try { + this.startMetrics(); + } catch (e) { + HawkCatcher.send(e); + console.error(`Metrics not started: ${e}`); + } + }) .then(() => { this.observeProcess(); }) @@ -82,67 +81,15 @@ class WorkerRunner { /** * Run metrics exporter */ - // private startMetrics(): void { - // if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) { - // return; - // } - // - // const PUSH_INTERVAL = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL); - // - // if (isNaN(PUSH_INTERVAL)) { - // throw new Error('PROMETHEUS_PUSHGATEWAY_INTERVAL is invalid or not set'); - // } - // - // const collectDefaultMetrics = promClient.collectDefaultMetrics; - // const Registry = promClient.Registry; - // - // const register = new Registry(); - // const startGcStats = gcStats(register); - // - // const hostname = os.hostname(); - // - // const ID_SIZE = 5; - // const id = nanoid(ID_SIZE); - // - // // eslint-disable-next-line node/no-deprecated-api - // const instance = url.parse(process.env.PROMETHEUS_PUSHGATEWAY_URL).host; - // - // // Initialize metrics for workers - // this.workers.forEach((worker) => { - // // worker.initMetrics(); - // worker.getMetrics().forEach((metric: promClient.Counter) => register.registerMetric(metric)); - // }); - // - // collectDefaultMetrics({ register }); - // startGcStats(); - // - // this.gateway = new promClient.Pushgateway(process.env.PROMETHEUS_PUSHGATEWAY_URL, null, register); - // - // console.log(`Start pushing metrics to ${process.env.PROMETHEUS_PUSHGATEWAY_URL}`); - // - // // Pushing metrics to the pushgateway every PUSH_INTERVAL - // this.pushIntervalNumber = setInterval(() => { - // this.workers.forEach((worker) => { - // if (!this.gateway || 
!instance) { - // return; - // } - // // Use pushAdd not to overwrite previous metrics - // this.gateway.pushAdd({ - // jobName: 'workers', - // groupings: { - // worker: worker.type.replace('/', '_'), - // host: hostname, - // id, - // }, - // }, (err?: Error) => { - // if (err) { - // HawkCatcher.send(err); - // console.log(`Error of pushing metrics to gateway: ${err}`); - // } - // }); - // }); - // }, PUSH_INTERVAL); - // } + private startMetrics(): void { + if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) { + return; + } + + this.workers.forEach((worker) => { + startMetricsPushing(worker.type.replace('/', '_')); + }); + } /** * Dynamically loads workers through the yarn workspaces diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 73f16fc7..03fdbd27 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; /* eslint-disable-next-line no-unused-vars */ import { memoize } from '../../../lib/memoize'; +import { register, client } from '../../../lib/metrics'; /** * eslint does not count decorators as a variable usage @@ -72,6 +73,28 @@ export default class GrouperWorker extends Worker { */ private redis = new RedisHelper(); + /** + * Prometheus metrics + */ + private metricsEventsTotal = new client.Counter({ + name: 'hawk_grouper_events_total', + help: 'Total number of events processed by grouper', + labelNames: ['type'], + registers: [register], + }); + + private metricsEventDuration = new client.Histogram({ + name: 'hawk_grouper_event_duration_seconds', + help: 'Duration of event processing in seconds', + registers: [register], + }); + + private metricsErrorsTotal = new client.Counter({ + name: 'hawk_grouper_errors_total', + help: 'Total number of errors during event processing', + registers: [register], + }); + /** * Start consuming messages */ @@ -105,6 +128,24 @@ export default class 
GrouperWorker extends Worker { * @param task - event to handle */ public async handle(task: GroupWorkerTask): Promise { + const endTimer = this.metricsEventDuration.startTimer(); + + try { + await this.handleInternal(task); + endTimer(); + } catch (error) { + endTimer(); + this.metricsErrorsTotal.inc(); + throw error; + } + } + + /** + * Internal task handling function + * + * @param task - event to handle + */ + private async handleInternal(task: GroupWorkerTask): Promise { let uniqueEventHash = await this.getUniqueEventHash(task); // FIX RELEASE TYPE @@ -147,6 +188,11 @@ export default class GrouperWorker extends Worker { */ const isFirstOccurrence = !existedEvent && !similarEvent; + /** + * Increment metrics counter + */ + this.metricsEventsTotal.inc({ type: isFirstOccurrence ? 'new' : 'repeated' }); + let repetitionId = null; let incrementDailyAffectedUsers = false; From b7db107a786b1d545ae1d13252fb975e284f1990 Mon Sep 17 00:00:00 2001 From: Kuchizu Date: Wed, 11 Feb 2026 00:16:45 +0300 Subject: [PATCH 2/5] feat(grouper): OOM-debug logging Add MongoDB, payload and delta size metrics with OOM-debug logging --- workers/grouper/src/index.ts | 105 +++++++++++++++++++++++++++++++---- 1 file changed, 95 insertions(+), 10 deletions(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 03fdbd27..0717abd9 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -83,9 +83,9 @@ export default class GrouperWorker extends Worker { registers: [register], }); - private metricsEventDuration = new client.Histogram({ - name: 'hawk_grouper_event_duration_seconds', - help: 'Duration of event processing in seconds', + private metricsHandleDuration = new client.Histogram({ + name: 'hawk_grouper_handle_duration_seconds', + help: 'Duration of handle() call in seconds', registers: [register], }); @@ -95,6 +95,33 @@ export default class GrouperWorker extends Worker { registers: [register], }); + private metricsMongoDuration = new 
client.Histogram({ + name: 'hawk_grouper_mongo_duration_seconds', + help: 'Duration of MongoDB operations in seconds', + labelNames: ['operation'], + registers: [register], + }); + + private metricsDeltaSize = new client.Histogram({ + name: 'hawk_grouper_delta_size_bytes', + help: 'Size of computed repetition delta in bytes', + buckets: [100, 500, 1000, 5000, 10000, 50000, 100000, 500000], + registers: [register], + }); + + private metricsPayloadSize = new client.Histogram({ + name: 'hawk_grouper_payload_size_bytes', + help: 'Size of incoming event payload in bytes', + buckets: [100, 500, 1000, 5000, 10000, 50000, 100000, 500000], + registers: [register], + }); + + private metricsDuplicateRetries = new client.Counter({ + name: 'hawk_grouper_duplicate_retries_total', + help: 'Number of retries due to duplicate key errors', + registers: [register], + }); + /** * Start consuming messages */ @@ -128,7 +155,7 @@ export default class GrouperWorker extends Worker { * @param task - event to handle */ public async handle(task: GroupWorkerTask): Promise { - const endTimer = this.metricsEventDuration.startTimer(); + const endTimer = this.metricsHandleDuration.startTimer(); try { await this.handleInternal(task); @@ -146,6 +173,12 @@ export default class GrouperWorker extends Worker { * @param task - event to handle */ private async handleInternal(task: GroupWorkerTask): Promise { + const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload)); + + this.metricsPayloadSize.observe(taskPayloadSize); + + this.logger.info(`[handle] project=${task.projectId} catcher=${task.catcherType} title="${task.payload.title}" payloadSize=${taskPayloadSize}b backtraceFrames=${task.payload.backtrace?.length ?? 
0}`); + let uniqueEventHash = await this.getUniqueEventHash(task); // FIX RELEASE TYPE @@ -211,6 +244,8 @@ export default class GrouperWorker extends Worker { try { const incrementAffectedUsers = !!task.payload.user; + this.logger.info(`[saveEvent] new event, payloadSize=${taskPayloadSize}b`); + /** * Insert new event */ @@ -240,6 +275,8 @@ export default class GrouperWorker extends Worker { * and we need to process this event as repetition */ if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) { + this.metricsDuplicateRetries.inc(); + this.logger.info(`[saveEvent] duplicate key, retrying as repetition`); await this.handle(task); return; @@ -266,6 +303,10 @@ export default class GrouperWorker extends Worker { let delta: RepetitionDelta; + const existedPayloadSize = Buffer.byteLength(JSON.stringify(existedEvent.payload)); + + this.logger.info(`[computeDelta] existedPayloadSize=${existedPayloadSize}b taskPayloadSize=${taskPayloadSize}b`); + try { /** * Calculate delta between original event and repetition @@ -276,9 +317,16 @@ export default class GrouperWorker extends Worker { throw new DiffCalculationError(e, existedEvent.payload, task.payload); } + const deltaStr = JSON.stringify(delta); + const deltaSize = Buffer.byteLength(deltaStr); + + this.metricsDeltaSize.observe(deltaSize); + + this.logger.info(`[computeDelta] deltaSize=${deltaSize}b`); + const newRepetition = { groupHash: uniqueEventHash, - delta: JSON.stringify(delta), + delta: deltaStr, timestamp: task.timestamp, } as RepetitionDBScheme; @@ -296,6 +344,10 @@ export default class GrouperWorker extends Worker { incrementDailyAffectedUsers ); + const mem = process.memoryUsage(); + + this.logger.info(`[handle] done, heapUsed=${Math.round(mem.heapUsed / 1024 / 1024)}MB heapTotal=${Math.round(mem.heapTotal / 1024 / 1024)}MB rss=${Math.round(mem.rss / 1024 / 1024)}MB`); + /** * Add task for NotifierWorker only if event is not ignored */ @@ -394,7 +446,9 @@ export default class GrouperWorker extends Worker { try { 
const originalEvent = await this.findFirstEventByPattern(matchingPattern.pattern, projectId); - this.logger.info(`original event for pattern: ${JSON.stringify(originalEvent)}`); + const originalEventSize = Buffer.byteLength(JSON.stringify(originalEvent)); + + this.logger.info(`[findSimilarEvent] found by pattern, originalEventSize=${originalEventSize}b`); if (originalEvent) { return originalEvent; @@ -564,7 +618,9 @@ export default class GrouperWorker extends Worker { const eventCacheKey = await this.getEventCacheKey(projectId, groupHash); return this.cache.get(eventCacheKey, async () => { - return this.eventsDb.getConnection() + const endTimer = this.metricsMongoDuration.startTimer({ operation: 'getEvent' }); + + const result = await this.eventsDb.getConnection() .collection(`events:${projectId}`) .findOne({ groupHash, @@ -572,6 +628,10 @@ export default class GrouperWorker extends Worker { .catch((err) => { throw new DatabaseReadWriteError(err); }); + + endTimer(); + + return result; }); } @@ -599,12 +659,18 @@ export default class GrouperWorker extends Worker { throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed'); } + const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveEvent' }); + const collection = this.eventsDb.getConnection().collection(`events:${projectId}`); encodeUnsafeFields(groupedEventData); - return (await collection + const result = (await collection .insertOne(groupedEventData)).insertedId as mongodb.ObjectID; + + endTimer(); + + return result; } /** @@ -618,13 +684,20 @@ export default class GrouperWorker extends Worker { throw new ValidationError('Controller.saveRepetition: Project ID is invalid or missing'); } + const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveRepetition' }); + try { const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`); encodeUnsafeFields(repetition); - return (await collection.insertOne(repetition)).insertedId as 
mongodb.ObjectID; + const result = (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID; + + endTimer(); + + return result; } catch (err) { + endTimer(); throw new DatabaseReadWriteError(err, { repetition: repetition as unknown as Record, projectId, @@ -644,6 +717,8 @@ export default class GrouperWorker extends Worker { throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed'); } + const endTimer = this.metricsMongoDuration.startTimer({ operation: 'incrementCounter' }); + try { const updateQuery = incrementAffected ? { @@ -658,10 +733,15 @@ export default class GrouperWorker extends Worker { }, }; - return (await this.eventsDb.getConnection() + const result = (await this.eventsDb.getConnection() .collection(`events:${projectId}`) .updateOne(query, updateQuery)).modifiedCount; + + endTimer(); + + return result; } catch (err) { + endTimer(); throw new DatabaseReadWriteError(err); } } @@ -687,6 +767,8 @@ export default class GrouperWorker extends Worker { throw new ValidationError('GrouperWorker.saveDailyEvents: Project ID is invalid or missed'); } + const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveDailyEvents' }); + try { const midnight = this.getMidnightByEventTimestamp(eventTimestamp); @@ -710,7 +792,10 @@ export default class GrouperWorker extends Worker { }, }, { upsert: true }); + + endTimer(); } catch (err) { + endTimer(); throw new DatabaseReadWriteError(err); } } From 38db3136cf0373b7df1c5e3c0c22e1c028069c66 Mon Sep 17 00:00:00 2001 From: Kuchizu Date: Wed, 11 Feb 2026 01:15:32 +0300 Subject: [PATCH 3/5] fix(grouper): handle undefined delta --- workers/grouper/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 0717abd9..0eb756db 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -318,7 +318,7 @@ export default class GrouperWorker extends Worker { } const deltaStr = 
JSON.stringify(delta); - const deltaSize = Buffer.byteLength(deltaStr); + const deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0; this.metricsDeltaSize.observe(deltaSize); From ba3f4b90f5ce85597802d83ae2a9c40cbe03a49f Mon Sep 17 00:00:00 2001 From: Kuchizu Date: Wed, 18 Feb 2026 20:09:52 +0300 Subject: [PATCH 4/5] feat(grouper): add memory leak diagnostics logs --- .env.sample | 8 ++- workers/grouper/src/index.ts | 114 +++++++++++++++++++++++++++++++++-- 2 files changed, 117 insertions(+), 5 deletions(-) diff --git a/.env.sample b/.env.sample index f8e8b6ef..b45010f3 100644 --- a/.env.sample +++ b/.env.sample @@ -31,6 +31,12 @@ PROMETHEUS_PUSHGATEWAY_URL= # pushgateway push interval in ms PROMETHEUS_PUSHGATEWAY_INTERVAL=10000 +# Grouper memory log controls +GROUPER_MEMORY_LOG_EVERY_TASKS=50 +GROUPER_MEMORY_GROWTH_WINDOW_TASKS=200 +GROUPER_MEMORY_GROWTH_WARN_MB=64 +GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB=16 + # project token for error catching HAWK_CATCHER_TOKEN= @@ -40,4 +46,4 @@ HAWK_CATCHER_TOKEN= IS_NOTIFIER_WORKER_ENABLED=false ## Url for telegram notifications about workspace blocks and unblocks -TELEGRAM_LIMITER_CHAT_URL= \ No newline at end of file +TELEGRAM_LIMITER_CHAT_URL= diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 932b7973..3507149f 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -54,6 +54,26 @@ const DB_DUPLICATE_KEY_ERROR = '11000'; */ const MAX_CODE_LINE_LENGTH = 140; +const MB_IN_BYTES = 1_048_576; +const HUNDRED = 100; +const DEFAULT_MEMORY_LOG_EVERY_TASKS = 50; +const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200; +const DEFAULT_MEMORY_GROWTH_WARN_MB = 64; +const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16; +// eslint-disable-next-line @typescript-eslint/no-magic-numbers +const METRICS_SIZE_BUCKETS = [100, 500, 1000, 5000, 10000, 50000, 100000, 500000]; + +function asPositiveNumber(value: string | undefined, fallback: number): number { + const parsed = Number(value); + + return 
Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; +} + +const MEMORY_LOG_EVERY_TASKS = asPositiveNumber(process.env.GROUPER_MEMORY_LOG_EVERY_TASKS, DEFAULT_MEMORY_LOG_EVERY_TASKS); +const MEMORY_GROWTH_WINDOW_TASKS = asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WINDOW_TASKS, DEFAULT_MEMORY_GROWTH_WINDOW_TASKS); +const MEMORY_GROWTH_WARN_MB = asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB); +const MEMORY_HANDLE_GROWTH_WARN_MB = asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB); + /** * Worker for handling Javascript events */ @@ -115,14 +135,14 @@ export default class GrouperWorker extends Worker { private metricsDeltaSize = new client.Histogram({ name: 'hawk_grouper_delta_size_bytes', help: 'Size of computed repetition delta in bytes', - buckets: [100, 500, 1000, 5000, 10000, 50000, 100000, 500000], + buckets: METRICS_SIZE_BUCKETS, registers: [register], }); private metricsPayloadSize = new client.Histogram({ name: 'hawk_grouper_payload_size_bytes', help: 'Size of incoming event payload in bytes', - buckets: [100, 500, 1000, 5000, 10000, 50000, 100000, 500000], + buckets: METRICS_SIZE_BUCKETS, registers: [register], }); @@ -137,6 +157,17 @@ export default class GrouperWorker extends Worker { */ private cacheCleanupInterval: NodeJS.Timeout | null = null; + /** + * Number of handled tasks in current worker process. + */ + private handledTasksCount = 0; + + /** + * Baseline for memory growth checks. 
+ */ + private memoryCheckpointTask = 0; + private memoryCheckpointHeapUsed = 0; + /** * Start consuming messages */ @@ -158,6 +189,11 @@ export default class GrouperWorker extends Worker { this.cacheCleanupInterval = setInterval(() => { this.clearCache(); }, CACHE_CLEANUP_INTERVAL_MINUTES * TimeMs.MINUTE); + const startupMemory = process.memoryUsage(); + + this.memoryCheckpointTask = 0; + this.memoryCheckpointHeapUsed = startupMemory.heapUsed; + this.logMemoryCheckpoint('startup', startupMemory, this.handledTasksCount); await super.start(); } @@ -174,6 +210,7 @@ export default class GrouperWorker extends Worker { this.cacheCleanupInterval = null; } + this.logMemoryCheckpoint('shutdown', process.memoryUsage(), this.handledTasksCount); await super.finish(); this.prepareCache(); await this.eventsDb.close(); @@ -195,6 +232,7 @@ export default class GrouperWorker extends Worker { } catch (error) { endTimer(); this.metricsErrorsTotal.inc(); + this.logMemoryCheckpoint('handle-error', process.memoryUsage(), this.handledTasksCount, `title="${task.payload?.title}"`); throw error; } } @@ -206,9 +244,15 @@ export default class GrouperWorker extends Worker { */ private async handleInternal(task: GroupWorkerTask): Promise { const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload)); + const handledTasksCount = ++this.handledTasksCount; + const memoryBeforeHandle = process.memoryUsage(); this.metricsPayloadSize.observe(taskPayloadSize); + if (handledTasksCount === 1 || handledTasksCount % MEMORY_LOG_EVERY_TASKS === 0) { + this.logMemoryCheckpoint('before-handle', memoryBeforeHandle, handledTasksCount, `payloadSize=${taskPayloadSize}b`); + } + this.logger.info(`[handle] project=${task.projectId} catcher=${task.catcherType} title="${task.payload.title}" payloadSize=${taskPayloadSize}b backtraceFrames=${task.payload.backtrace?.length ?? 
0}`); let uniqueEventHash = await this.getUniqueEventHash(task); @@ -382,9 +426,21 @@ export default class GrouperWorker extends Worker { incrementDailyAffectedUsers ); - const mem = process.memoryUsage(); + const memoryAfterHandle = process.memoryUsage(); + const heapDeltaBytes = memoryAfterHandle.heapUsed - memoryBeforeHandle.heapUsed; + const heapDeltaMb = this.bytesToMegabytes(heapDeltaBytes); + + this.logger.info( + `[handle] done, ${this.formatMemoryUsage(memoryAfterHandle)} heapDelta=${heapDeltaMb}MB handled=${handledTasksCount}` + ); + + if (heapDeltaBytes > MEMORY_HANDLE_GROWTH_WARN_MB * MB_IN_BYTES) { + this.logger.warn( + `[memory] high heap growth in single handle: heapDelta=${heapDeltaMb}MB payloadSize=${taskPayloadSize}b title="${task.payload.title}" project=${task.projectId}` + ); + } - this.logger.info(`[handle] done, heapUsed=${Math.round(mem.heapUsed / 1024 / 1024)}MB heapTotal=${Math.round(mem.heapTotal / 1024 / 1024)}MB rss=${Math.round(mem.rss / 1024 / 1024)}MB`); + this.checkMemoryGrowthWindow(memoryAfterHandle, handledTasksCount); /** * Add task for NotifierWorker only if event is not ignored @@ -640,6 +696,56 @@ export default class GrouperWorker extends Worker { return [shouldIncrementRepetitionAffectedUsers, shouldIncrementDailyAffectedUsers]; } + /** + * Logs sustained heap growth over a configurable number of handled tasks. 
+ */ + private checkMemoryGrowthWindow(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number): void { + const tasksInWindow = handledTasksCount - this.memoryCheckpointTask; + + if (tasksInWindow < MEMORY_GROWTH_WINDOW_TASKS) { + return; + } + + const heapGrowthBytes = memoryUsage.heapUsed - this.memoryCheckpointHeapUsed; + const heapGrowthMb = this.bytesToMegabytes(heapGrowthBytes); + + this.logger.info( + `[memory] growth window tasks=${tasksInWindow} handled=${this.memoryCheckpointTask + 1}-${handledTasksCount} heapGrowth=${heapGrowthMb}MB heapUsedNow=${this.bytesToMegabytes(memoryUsage.heapUsed)}MB` + ); + + if (heapGrowthBytes > MEMORY_GROWTH_WARN_MB * MB_IN_BYTES) { + this.logger.warn( + `[memory] possible leak detected: heap grew by ${heapGrowthMb}MB in ${tasksInWindow} handled tasks` + ); + } + + this.memoryCheckpointTask = handledTasksCount; + this.memoryCheckpointHeapUsed = memoryUsage.heapUsed; + } + + /** + * Format memory usage for consistent logs. + */ + private formatMemoryUsage(memoryUsage: NodeJS.MemoryUsage): string { + return `heapUsed=${this.bytesToMegabytes(memoryUsage.heapUsed)}MB heapTotal=${this.bytesToMegabytes(memoryUsage.heapTotal)}MB rss=${this.bytesToMegabytes(memoryUsage.rss)}MB external=${this.bytesToMegabytes(memoryUsage.external)}MB arrayBuffers=${this.bytesToMegabytes(memoryUsage.arrayBuffers)}MB`; + } + + /** + * Writes one memory checkpoint record. + */ + private logMemoryCheckpoint(stage: string, memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, suffix = ''): void { + const extra = suffix ? ` ${suffix}` : ''; + + this.logger.info(`[memory] stage=${stage} handled=${handledTasksCount} ${this.formatMemoryUsage(memoryUsage)}${extra}`); + } + + /** + * Convert bytes to megabytes with two fractional digits. 
+ */ + private bytesToMegabytes(bytes: number): number { + return Math.round((bytes / MB_IN_BYTES) * HUNDRED) / HUNDRED; + } + /** * Returns finds event by query from project with passed ID * From 0d9f7198152849e9d6e564721c45e3d13d14f71c Mon Sep 17 00:00:00 2001 From: Kuchizu Date: Wed, 25 Feb 2026 15:14:26 +0300 Subject: [PATCH 5/5] fix(metrics): validate push interval, add push cleanup, and prevent retry double-counting in grouper --- lib/metrics.ts | 80 ++- runner.ts | 25 +- workers/grouper/src/index.ts | 492 ++++++------------ workers/grouper/src/metrics/config.ts | 42 ++ workers/grouper/src/metrics/grouperMetrics.ts | 165 ++++++ workers/grouper/src/metrics/memoryMonitor.ts | 171 ++++++ 6 files changed, 638 insertions(+), 337 deletions(-) create mode 100644 workers/grouper/src/metrics/config.ts create mode 100644 workers/grouper/src/metrics/grouperMetrics.ts create mode 100644 workers/grouper/src/metrics/memoryMonitor.ts diff --git a/lib/metrics.ts b/lib/metrics.ts index 380d78d7..9ba78584 100644 --- a/lib/metrics.ts +++ b/lib/metrics.ts @@ -1,39 +1,97 @@ import * as client from 'prom-client'; import os from 'os'; import { nanoid } from 'nanoid'; +import createLogger from './logger'; const register = new client.Registry(); +const logger = createLogger(); + +const DEFAULT_PUSH_INTERVAL_MS = 10_000; +const ID_SIZE = 5; +const METRICS_JOB_NAME = 'workers'; + +let pushInterval: NodeJS.Timeout | null = null; +let currentWorkerName = ''; client.collectDefaultMetrics({ register }); export { register, client }; /** - * Start periodic push to pushgateway + * Parse push interval from environment. + */ +function getPushIntervalMs(): number { + const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL; + const parsedInterval = rawInterval === undefined + ? DEFAULT_PUSH_INTERVAL_MS + : Number(rawInterval); + + const interval = Number.isFinite(parsedInterval) && parsedInterval > 0 + ? 
parsedInterval + : DEFAULT_PUSH_INTERVAL_MS; + + if (rawInterval !== undefined && interval !== parsedInterval) { + logger.warn(`[metrics] invalid PROMETHEUS_PUSHGATEWAY_INTERVAL="${rawInterval}", fallback to ${DEFAULT_PUSH_INTERVAL_MS}ms`); + } + + return interval; +} + +/** + * Stop periodic push to pushgateway. + */ +export function stopMetricsPushing(): void { + if (!pushInterval) { + return; + } + + clearInterval(pushInterval); + pushInterval = null; + logger.info(`[metrics] stopped pushing metrics for worker=${currentWorkerName}`); + currentWorkerName = ''; +} + +/** + * Start periodic push to pushgateway. * - * @param workerName - name of the worker for grouping + * @param workerName - name of the worker for grouping. */ -export function startMetricsPushing(workerName: string): void { +export function startMetricsPushing(workerName: string): () => void { const url = process.env.PROMETHEUS_PUSHGATEWAY_URL; - const interval = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL || '10000'); if (!url) { - return; + return stopMetricsPushing; } + if (pushInterval) { + logger.warn(`[metrics] pushing is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`); + + return stopMetricsPushing; + } + + const interval = getPushIntervalMs(); const hostname = os.hostname(); - const ID_SIZE = 5; const id = nanoid(ID_SIZE); - const gateway = new client.Pushgateway(url, [], register); - console.log(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id})`); + currentWorkerName = workerName; + + logger.info(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id}, worker: ${workerName})`); - setInterval(() => { - gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname, id } }, (err) => { + pushInterval = setInterval(() => { + gateway.pushAdd({ + jobName: METRICS_JOB_NAME, + groupings: { + worker: workerName, + host: hostname, + id, + }, + }, (err) => { if 
(err) { - console.error('Metrics push error:', err); + logger.error(`Metrics push error: ${err.message || err}`); } }); }, interval); + + return stopMetricsPushing; } diff --git a/runner.ts b/runner.ts index 9147b54b..c79ada24 100644 --- a/runner.ts +++ b/runner.ts @@ -41,9 +41,9 @@ class WorkerRunner { // private gateway?: promClient.Pushgateway; /** - * number returned by setInterval() of metrics push function + * Metrics push cleanup callback. */ - private pushIntervalNumber?: ReturnType; + private stopMetricsPushing?: () => void; /** * Create runner instance @@ -86,9 +86,21 @@ class WorkerRunner { return; } - this.workers.forEach((worker) => { - startMetricsPushing(worker.type.replace('/', '_')); - }); + if (this.workers.length === 0) { + return; + } + + const workerTypes = Array.from(new Set(this.workers.map((worker) => { + return worker.type.replace('/', '_'); + }))); + + const workerTypeForMetrics = workerTypes.length === 1 ? workerTypes[0] : 'multi_worker_process'; + + if (workerTypes.length > 1) { + console.warn(`[metrics] ${workerTypes.length} workers are running in one process; pushing metrics as "${workerTypeForMetrics}" to avoid duplicated attribution`); + } + + this.stopMetricsPushing = startMetricsPushing(workerTypeForMetrics); } /** @@ -224,7 +236,8 @@ class WorkerRunner { private async stopWorker(worker: Worker): Promise { try { // stop pushing metrics - clearInterval(this.pushIntervalNumber); + this.stopMetricsPushing?.(); + this.stopMetricsPushing = undefined; await worker.finish(); console.log( diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 3507149f..76fc7a5b 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -25,18 +25,20 @@ import RedisHelper from './redisHelper'; import { computeDelta } from './utils/repetitionDiff'; import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; +import GrouperMetrics from './metrics/grouperMetrics'; 
+import GrouperMemoryMonitor from './metrics/memoryMonitor'; +import { grouperMemoryConfig } from './metrics/config'; /** * eslint does not count decorators as a variable usage */ -/* eslint-disable-next-line no-unused-vars */ +/* eslint-disable-next-line no-unused-vars, @typescript-eslint/no-unused-vars */ import { memoize } from '../../../lib/memoize'; -import { register, client } from '../../../lib/metrics'; /** * eslint does not count decorators as a variable usage */ -/* eslint-disable-next-line no-unused-vars */ +/* eslint-disable-next-line no-unused-vars, @typescript-eslint/no-unused-vars */ const MEMOIZATION_TTL = 600_000; /** @@ -54,26 +56,6 @@ const DB_DUPLICATE_KEY_ERROR = '11000'; */ const MAX_CODE_LINE_LENGTH = 140; -const MB_IN_BYTES = 1_048_576; -const HUNDRED = 100; -const DEFAULT_MEMORY_LOG_EVERY_TASKS = 50; -const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200; -const DEFAULT_MEMORY_GROWTH_WARN_MB = 64; -const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16; -// eslint-disable-next-line @typescript-eslint/no-magic-numbers -const METRICS_SIZE_BUCKETS = [100, 500, 1000, 5000, 10000, 50000, 100000, 500000]; - -function asPositiveNumber(value: string | undefined, fallback: number): number { - const parsed = Number(value); - - return Number.isFinite(parsed) && parsed > 0 ? 
parsed : fallback; -} - -const MEMORY_LOG_EVERY_TASKS = asPositiveNumber(process.env.GROUPER_MEMORY_LOG_EVERY_TASKS, DEFAULT_MEMORY_LOG_EVERY_TASKS); -const MEMORY_GROWTH_WINDOW_TASKS = asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WINDOW_TASKS, DEFAULT_MEMORY_GROWTH_WINDOW_TASKS); -const MEMORY_GROWTH_WARN_MB = asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB); -const MEMORY_HANDLE_GROWTH_WARN_MB = asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB); - /** * Worker for handling Javascript events */ @@ -104,53 +86,14 @@ export default class GrouperWorker extends Worker { private redis = new RedisHelper(); /** - * Prometheus metrics + * Prometheus metrics facade. */ - private metricsEventsTotal = new client.Counter({ - name: 'hawk_grouper_events_total', - help: 'Total number of events processed by grouper', - labelNames: ['type'], - registers: [register], - }); - - private metricsHandleDuration = new client.Histogram({ - name: 'hawk_grouper_handle_duration_seconds', - help: 'Duration of handle() call in seconds', - registers: [register], - }); - - private metricsErrorsTotal = new client.Counter({ - name: 'hawk_grouper_errors_total', - help: 'Total number of errors during event processing', - registers: [register], - }); - - private metricsMongoDuration = new client.Histogram({ - name: 'hawk_grouper_mongo_duration_seconds', - help: 'Duration of MongoDB operations in seconds', - labelNames: ['operation'], - registers: [register], - }); - - private metricsDeltaSize = new client.Histogram({ - name: 'hawk_grouper_delta_size_bytes', - help: 'Size of computed repetition delta in bytes', - buckets: METRICS_SIZE_BUCKETS, - registers: [register], - }); - - private metricsPayloadSize = new client.Histogram({ - name: 'hawk_grouper_payload_size_bytes', - help: 'Size of incoming event payload in bytes', - buckets: METRICS_SIZE_BUCKETS, - registers: [register], - }); - - private 
metricsDuplicateRetries = new client.Counter({ - name: 'hawk_grouper_duplicate_retries_total', - help: 'Number of retries due to duplicate key errors', - registers: [register], - }); + private grouperMetrics = new GrouperMetrics(); + + /** + * Memory leak monitoring helper. + */ + private memoryMonitor = new GrouperMemoryMonitor(this.logger, grouperMemoryConfig); /** * Interval for periodic cache cleanup to prevent memory leaks from unbounded cache growth @@ -162,12 +105,6 @@ export default class GrouperWorker extends Worker { */ private handledTasksCount = 0; - /** - * Baseline for memory growth checks. - */ - private memoryCheckpointTask = 0; - private memoryCheckpointHeapUsed = 0; - /** * Start consuming messages */ @@ -189,11 +126,7 @@ export default class GrouperWorker extends Worker { this.cacheCleanupInterval = setInterval(() => { this.clearCache(); }, CACHE_CLEANUP_INTERVAL_MINUTES * TimeMs.MINUTE); - const startupMemory = process.memoryUsage(); - - this.memoryCheckpointTask = 0; - this.memoryCheckpointHeapUsed = startupMemory.heapUsed; - this.logMemoryCheckpoint('startup', startupMemory, this.handledTasksCount); + this.memoryMonitor.initialize(this.handledTasksCount); await super.start(); } @@ -210,7 +143,7 @@ export default class GrouperWorker extends Worker { this.cacheCleanupInterval = null; } - this.logMemoryCheckpoint('shutdown', process.memoryUsage(), this.handledTasksCount); + this.memoryMonitor.logShutdown(this.handledTasksCount); await super.finish(); this.prepareCache(); await this.eventsDb.close(); @@ -224,15 +157,13 @@ export default class GrouperWorker extends Worker { * @param task - event to handle */ public async handle(task: GroupWorkerTask): Promise { - const endTimer = this.metricsHandleDuration.startTimer(); - try { - await this.handleInternal(task); - endTimer(); + await this.grouperMetrics.observeHandleDuration(async () => { + await this.handleInternal(task); + }); } catch (error) { - endTimer(); - this.metricsErrorsTotal.inc(); - 
this.logMemoryCheckpoint('handle-error', process.memoryUsage(), this.handledTasksCount, `title="${task.payload?.title}"`); + this.grouperMetrics.incrementErrorsTotal(); + this.memoryMonitor.logHandleError(this.handledTasksCount, task.payload?.title); throw error; } } @@ -247,16 +178,11 @@ export default class GrouperWorker extends Worker { const handledTasksCount = ++this.handledTasksCount; const memoryBeforeHandle = process.memoryUsage(); - this.metricsPayloadSize.observe(taskPayloadSize); - - if (handledTasksCount === 1 || handledTasksCount % MEMORY_LOG_EVERY_TASKS === 0) { - this.logMemoryCheckpoint('before-handle', memoryBeforeHandle, handledTasksCount, `payloadSize=${taskPayloadSize}b`); - } + this.grouperMetrics.observePayloadSize(taskPayloadSize); + this.memoryMonitor.logBeforeHandle(memoryBeforeHandle, handledTasksCount, taskPayloadSize); this.logger.info(`[handle] project=${task.projectId} catcher=${task.catcherType} title="${task.payload.title}" payloadSize=${taskPayloadSize}b backtraceFrames=${task.payload.backtrace?.length ?? 
0}`); - let uniqueEventHash = await this.getUniqueEventHash(task); - // FIX RELEASE TYPE // TODO: REMOVE AFTER 01.01.2026, after the most of the users update to new js catcher if (task.payload && task.payload.release !== undefined) { @@ -266,44 +192,10 @@ export default class GrouperWorker extends Worker { }; } + let uniqueEventHash = ''; let existedEvent: GroupedEventDBScheme; - - /** - * Find similar events by grouping pattern - */ - const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title); - - if (similarEvent) { - this.logger.info(`[handle] similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`); - - /** - * Override group hash with found event's group hash - */ - uniqueEventHash = similarEvent.groupHash; - - existedEvent = similarEvent; - } else { - /** - * If we couldn't group by grouping pattern — try grouping by hash (title) - */ - /** - * Find event by group hash. - */ - existedEvent = await this.getEvent(task.projectId, uniqueEventHash); - } - - /** - * Event happened for the first time - */ - const isFirstOccurrence = !existedEvent && !similarEvent; - - /** - * Increment metrics counter - */ - this.metricsEventsTotal.inc({ type: isFirstOccurrence ? 'new' : 'repeated' }); - + let isFirstOccurrence = false; let repetitionId = null; - let incrementDailyAffectedUsers = false; /** @@ -316,51 +208,84 @@ export default class GrouperWorker extends Worker { */ this.dataFilter.processEvent(task.payload); - if (isFirstOccurrence) { - try { - const incrementAffectedUsers = !!task.payload.user; - - this.logger.info(`[saveEvent] new event, payloadSize=${taskPayloadSize}b`); + while (true) { + uniqueEventHash = await this.getUniqueEventHash(task); - /** - * Insert new event - */ - await this.saveEvent(task.projectId, { - groupHash: uniqueEventHash, - totalCount: 1, - catcherType: task.catcherType, - payload: task.payload, - timestamp: task.timestamp, - usersAffected: incrementAffectedUsers ? 
1 : 0, - } as GroupedEventDBScheme); + /** + * Find similar events by grouping pattern + */ + const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title); - const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash); + if (similarEvent) { + this.logger.info(`[handle] similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`); /** - * If event is saved, then cached event state is no longer actual, so we should remove it + * Override group hash with found event's group hash */ - this.cache.del(eventCacheKey); + uniqueEventHash = similarEvent.groupHash; + existedEvent = similarEvent; + } else { /** - * Increment daily affected users for the first event + * If we couldn't group by grouping pattern — try grouping by hash (title) */ - incrementDailyAffectedUsers = incrementAffectedUsers; - } catch (e) { /** - * If we caught Database duplication error, then another worker thread has already saved it to the database - * and we need to process this event as repetition + * Find event by group hash. */ - if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) { - this.metricsDuplicateRetries.inc(); - this.logger.info(`[saveEvent] duplicate key, retrying as repetition`); - await this.handle(task); + existedEvent = await this.getEvent(task.projectId, uniqueEventHash); + } + + /** + * Event happened for the first time + */ + isFirstOccurrence = !existedEvent && !similarEvent; + + if (isFirstOccurrence) { + try { + const incrementAffectedUsers = !!task.payload.user; + + this.logger.info(`[saveEvent] new event, payloadSize=${taskPayloadSize}b`); + + /** + * Insert new event + */ + await this.saveEvent(task.projectId, { + groupHash: uniqueEventHash, + totalCount: 1, + catcherType: task.catcherType, + payload: task.payload, + timestamp: task.timestamp, + usersAffected: incrementAffectedUsers ? 
1 : 0, + } as GroupedEventDBScheme); + + const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash); + + /** + * If event is saved, then cached event state is no longer actual, so we should remove it + */ + this.cache.del(eventCacheKey); + + /** + * Increment daily affected users for the first event + */ + incrementDailyAffectedUsers = incrementAffectedUsers; + break; + } catch (e) { + /** + * If we caught Database duplication error, then another worker thread has already saved it to the database + * and we need to process this event as repetition + */ + if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) { + this.grouperMetrics.incrementDuplicateRetriesTotal(); + this.logger.info(`[saveEvent] duplicate key, retrying as repetition`); + + continue; + } - return; - } else { throw e; } } - } else { + const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent); incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers; @@ -396,7 +321,7 @@ export default class GrouperWorker extends Worker { const deltaStr = JSON.stringify(delta); const deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0; - this.metricsDeltaSize.observe(deltaSize); + this.grouperMetrics.observeDeltaSize(deltaSize); this.logger.info(`[computeDelta] deltaSize=${deltaSize}b`); @@ -413,8 +338,14 @@ export default class GrouperWorker extends Worker { * This prevents memory leaks from retaining full event objects after delta is computed */ delta = undefined; + break; } + /** + * Increment metrics counter once per handled task + */ + this.grouperMetrics.incrementEventsTotal(isFirstOccurrence ? 
'new' : 'repeated'); + /** * Store events counter by days */ @@ -426,22 +357,14 @@ export default class GrouperWorker extends Worker { incrementDailyAffectedUsers ); - const memoryAfterHandle = process.memoryUsage(); - const heapDeltaBytes = memoryAfterHandle.heapUsed - memoryBeforeHandle.heapUsed; - const heapDeltaMb = this.bytesToMegabytes(heapDeltaBytes); - - this.logger.info( - `[handle] done, ${this.formatMemoryUsage(memoryAfterHandle)} heapDelta=${heapDeltaMb}MB handled=${handledTasksCount}` + this.memoryMonitor.logHandleCompletion( + memoryBeforeHandle, + handledTasksCount, + taskPayloadSize, + task.payload.title, + task.projectId ); - if (heapDeltaBytes > MEMORY_HANDLE_GROWTH_WARN_MB * MB_IN_BYTES) { - this.logger.warn( - `[memory] high heap growth in single handle: heapDelta=${heapDeltaMb}MB payloadSize=${taskPayloadSize}b title="${task.payload.title}" project=${task.projectId}` - ); - } - - this.checkMemoryGrowthWindow(memoryAfterHandle, handledTasksCount); - /** * Add task for NotifierWorker only if event is not ignored */ @@ -696,56 +619,6 @@ export default class GrouperWorker extends Worker { return [shouldIncrementRepetitionAffectedUsers, shouldIncrementDailyAffectedUsers]; } - /** - * Logs sustained heap growth over a configurable number of handled tasks. 
- */ - private checkMemoryGrowthWindow(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number): void { - const tasksInWindow = handledTasksCount - this.memoryCheckpointTask; - - if (tasksInWindow < MEMORY_GROWTH_WINDOW_TASKS) { - return; - } - - const heapGrowthBytes = memoryUsage.heapUsed - this.memoryCheckpointHeapUsed; - const heapGrowthMb = this.bytesToMegabytes(heapGrowthBytes); - - this.logger.info( - `[memory] growth window tasks=${tasksInWindow} handled=${this.memoryCheckpointTask + 1}-${handledTasksCount} heapGrowth=${heapGrowthMb}MB heapUsedNow=${this.bytesToMegabytes(memoryUsage.heapUsed)}MB` - ); - - if (heapGrowthBytes > MEMORY_GROWTH_WARN_MB * MB_IN_BYTES) { - this.logger.warn( - `[memory] possible leak detected: heap grew by ${heapGrowthMb}MB in ${tasksInWindow} handled tasks` - ); - } - - this.memoryCheckpointTask = handledTasksCount; - this.memoryCheckpointHeapUsed = memoryUsage.heapUsed; - } - - /** - * Format memory usage for consistent logs. - */ - private formatMemoryUsage(memoryUsage: NodeJS.MemoryUsage): string { - return `heapUsed=${this.bytesToMegabytes(memoryUsage.heapUsed)}MB heapTotal=${this.bytesToMegabytes(memoryUsage.heapTotal)}MB rss=${this.bytesToMegabytes(memoryUsage.rss)}MB external=${this.bytesToMegabytes(memoryUsage.external)}MB arrayBuffers=${this.bytesToMegabytes(memoryUsage.arrayBuffers)}MB`; - } - - /** - * Writes one memory checkpoint record. - */ - private logMemoryCheckpoint(stage: string, memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, suffix = ''): void { - const extra = suffix ? ` ${suffix}` : ''; - - this.logger.info(`[memory] stage=${stage} handled=${handledTasksCount} ${this.formatMemoryUsage(memoryUsage)}${extra}`); - } - - /** - * Convert bytes to megabytes with two fractional digits. 
- */ - private bytesToMegabytes(bytes: number): number { - return Math.round((bytes / MB_IN_BYTES) * HUNDRED) / HUNDRED; - } - /** * Returns finds event by query from project with passed ID * @@ -760,20 +633,16 @@ export default class GrouperWorker extends Worker { const eventCacheKey = await this.getEventCacheKey(projectId, groupHash); return this.cache.get(eventCacheKey, async () => { - const endTimer = this.metricsMongoDuration.startTimer({ operation: 'getEvent' }); - - const result = await this.eventsDb.getConnection() - .collection(`events:${projectId}`) - .findOne({ - groupHash, - }) - .catch((err) => { - throw new DatabaseReadWriteError(err); - }); - - endTimer(); - - return result; + return this.grouperMetrics.observeMongoDuration('getEvent', async () => { + return this.eventsDb.getConnection() + .collection(`events:${projectId}`) + .findOne({ + groupHash, + }) + .catch((err) => { + throw new DatabaseReadWriteError(err); + }); + }); }); } @@ -801,18 +670,14 @@ export default class GrouperWorker extends Worker { throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed'); } - const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveEvent' }); - - const collection = this.eventsDb.getConnection().collection(`events:${projectId}`); - - encodeUnsafeFields(groupedEventData); - - const result = (await collection - .insertOne(groupedEventData)).insertedId as mongodb.ObjectID; + return this.grouperMetrics.observeMongoDuration('saveEvent', async () => { + const collection = this.eventsDb.getConnection().collection(`events:${projectId}`); - endTimer(); + encodeUnsafeFields(groupedEventData); - return result; + return (await collection + .insertOne(groupedEventData)).insertedId as mongodb.ObjectID; + }); } /** @@ -826,25 +691,20 @@ export default class GrouperWorker extends Worker { throw new ValidationError('Controller.saveRepetition: Project ID is invalid or missing'); } - const endTimer = this.metricsMongoDuration.startTimer({ 
operation: 'saveRepetition' }); - - try { - const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`); - - encodeUnsafeFields(repetition); - - const result = (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID; + return this.grouperMetrics.observeMongoDuration('saveRepetition', async () => { + try { + const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`); - endTimer(); + encodeUnsafeFields(repetition); - return result; - } catch (err) { - endTimer(); - throw new DatabaseReadWriteError(err, { - repetition: repetition as unknown as Record, - projectId, - }); - } + return (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID; + } catch (err) { + throw new DatabaseReadWriteError(err, { + repetition: repetition as unknown as Record, + projectId, + }); + } + }); } /** @@ -859,33 +719,28 @@ export default class GrouperWorker extends Worker { throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed'); } - const endTimer = this.metricsMongoDuration.startTimer({ operation: 'incrementCounter' }); - - try { - const updateQuery = incrementAffected - ? { - $inc: { - totalCount: 1, - usersAffected: 1, - }, - } - : { - $inc: { - totalCount: 1, - }, - }; - - const result = (await this.eventsDb.getConnection() - .collection(`events:${projectId}`) - .updateOne(query, updateQuery)).modifiedCount; - - endTimer(); + return this.grouperMetrics.observeMongoDuration('incrementCounter', async () => { + try { + const updateQuery = incrementAffected + ? 
{ + $inc: { + totalCount: 1, + usersAffected: 1, + }, + } + : { + $inc: { + totalCount: 1, + }, + }; - return result; - } catch (err) { - endTimer(); - throw new DatabaseReadWriteError(err); - } + return (await this.eventsDb.getConnection() + .collection(`events:${projectId}`) + .updateOne(query, updateQuery)).modifiedCount; + } catch (err) { + throw new DatabaseReadWriteError(err); + } + }); } /** @@ -909,37 +764,34 @@ export default class GrouperWorker extends Worker { throw new ValidationError('GrouperWorker.saveDailyEvents: Project ID is invalid or missed'); } - const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveDailyEvents' }); + await this.grouperMetrics.observeMongoDuration('saveDailyEvents', async () => { + try { + const midnight = this.getMidnightByEventTimestamp(eventTimestamp); - try { - const midnight = this.getMidnightByEventTimestamp(eventTimestamp); - - await this.eventsDb.getConnection() - .collection(`dailyEvents:${projectId}`) - .updateOne( - { - groupHash: eventHash, - groupingTimestamp: midnight, - }, - { - $set: { + await this.eventsDb.getConnection() + .collection(`dailyEvents:${projectId}`) + .updateOne( + { groupHash: eventHash, groupingTimestamp: midnight, - lastRepetitionTime: eventTimestamp, - lastRepetitionId: repetitionId, }, - $inc: { - count: 1, - affectedUsers: shouldIncrementAffectedUsers ? 1 : 0, + { + $set: { + groupHash: eventHash, + groupingTimestamp: midnight, + lastRepetitionTime: eventTimestamp, + lastRepetitionId: repetitionId, + }, + $inc: { + count: 1, + affectedUsers: shouldIncrementAffectedUsers ? 
1 : 0, + }, }, - }, - { upsert: true }); - - endTimer(); - } catch (err) { - endTimer(); - throw new DatabaseReadWriteError(err); - } + { upsert: true }); + } catch (err) { + throw new DatabaseReadWriteError(err); + } + }); } /** diff --git a/workers/grouper/src/metrics/config.ts b/workers/grouper/src/metrics/config.ts new file mode 100644 index 00000000..30fc5541 --- /dev/null +++ b/workers/grouper/src/metrics/config.ts @@ -0,0 +1,42 @@ +/** + * Parsed config for grouper memory monitoring. + */ +export interface GrouperMemoryConfig { + logEveryTasks: number; + growthWindowTasks: number; + growthWarnMb: number; + handleGrowthWarnMb: number; +} + +const DEFAULT_MEMORY_LOG_EVERY_TASKS = 50; +const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200; +const DEFAULT_MEMORY_GROWTH_WARN_MB = 64; +const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16; + +/** + * Histogram buckets for payload and delta sizes (bytes). + */ +// eslint-disable-next-line @typescript-eslint/no-magic-numbers +export const GROUPER_METRICS_SIZE_BUCKETS = [100, 500, 1000, 5000, 10000, 50000, 100000, 500000]; + +/** + * Parse positive numeric env value. + * + * @param value - env string value. + * @param fallback - default numeric fallback. + */ +function asPositiveNumber(value: string | undefined, fallback: number): number { + const parsed = Number(value); + + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback; +} + +/** + * Memory monitoring config from environment. 
+ */
+export const grouperMemoryConfig: GrouperMemoryConfig = {
+  logEveryTasks: asPositiveNumber(process.env.GROUPER_MEMORY_LOG_EVERY_TASKS, DEFAULT_MEMORY_LOG_EVERY_TASKS),
+  growthWindowTasks: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WINDOW_TASKS, DEFAULT_MEMORY_GROWTH_WINDOW_TASKS),
+  growthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB),
+  handleGrowthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB),
+};
diff --git a/workers/grouper/src/metrics/grouperMetrics.ts b/workers/grouper/src/metrics/grouperMetrics.ts
new file mode 100644
index 00000000..4be19e87
--- /dev/null
+++ b/workers/grouper/src/metrics/grouperMetrics.ts
@@ -0,0 +1,165 @@
+import { client, register } from '../../../../lib/metrics';
+import { GROUPER_METRICS_SIZE_BUCKETS } from './config';
+
+type EventType = 'new' | 'repeated';
+type MongoOperation = 'getEvent' | 'saveEvent' | 'saveRepetition' | 'incrementCounter' | 'saveDailyEvents';
+
+/**
+ * Reuse already registered metric by name, or create one.
+ *
+ * @param name - metric name.
+ * @param createMetric - metric factory.
+ */
+function getOrCreateMetric<MetricType>(name: string, createMetric: () => MetricType): MetricType {
+  const existing = register.getSingleMetric(name);
+
+  if (existing) {
+    return existing as unknown as MetricType;
+  }
+
+  return createMetric();
+}
+
+/**
+ * Grouper-specific Prometheus metrics facade.
+ */ +export default class GrouperMetrics { + private readonly eventsTotal = getOrCreateMetric( + 'hawk_grouper_events_total', + () => new client.Counter({ + name: 'hawk_grouper_events_total', + help: 'Total number of events processed by grouper', + labelNames: [ 'type' ], + registers: [ register ], + }) + ); + + private readonly handleDuration = getOrCreateMetric( + 'hawk_grouper_handle_duration_seconds', + () => new client.Histogram({ + name: 'hawk_grouper_handle_duration_seconds', + help: 'Duration of handle() call in seconds', + registers: [ register ], + }) + ); + + private readonly errorsTotal = getOrCreateMetric( + 'hawk_grouper_errors_total', + () => new client.Counter({ + name: 'hawk_grouper_errors_total', + help: 'Total number of errors during event processing', + registers: [ register ], + }) + ); + + private readonly mongoDuration = getOrCreateMetric( + 'hawk_grouper_mongo_duration_seconds', + () => new client.Histogram({ + name: 'hawk_grouper_mongo_duration_seconds', + help: 'Duration of MongoDB operations in seconds', + labelNames: [ 'operation' ], + registers: [ register ], + }) + ); + + private readonly deltaSize = getOrCreateMetric( + 'hawk_grouper_delta_size_bytes', + () => new client.Histogram({ + name: 'hawk_grouper_delta_size_bytes', + help: 'Size of computed repetition delta in bytes', + buckets: GROUPER_METRICS_SIZE_BUCKETS, + registers: [ register ], + }) + ); + + private readonly payloadSize = getOrCreateMetric( + 'hawk_grouper_payload_size_bytes', + () => new client.Histogram({ + name: 'hawk_grouper_payload_size_bytes', + help: 'Size of incoming event payload in bytes', + buckets: GROUPER_METRICS_SIZE_BUCKETS, + registers: [ register ], + }) + ); + + private readonly duplicateRetriesTotal = getOrCreateMetric( + 'hawk_grouper_duplicate_retries_total', + () => new client.Counter({ + name: 'hawk_grouper_duplicate_retries_total', + help: 'Number of retries due to duplicate key errors', + registers: [ register ], + }) + ); + + /** + * Measure 
top-level handle() duration.
+   *
+   * @param callback - callback to execute under timer.
+   */
+  public async observeHandleDuration<T>(callback: () => Promise<T>): Promise<T> {
+    const endTimer = this.handleDuration.startTimer();
+
+    try {
+      return await callback();
+    } finally {
+      endTimer();
+    }
+  }
+
+  /**
+   * Increment events counter by event type.
+   *
+   * @param type - event type label.
+   */
+  public incrementEventsTotal(type: EventType): void {
+    this.eventsTotal.inc({ type });
+  }
+
+  /**
+   * Increment total processing errors counter.
+   */
+  public incrementErrorsTotal(): void {
+    this.errorsTotal.inc();
+  }
+
+  /**
+   * Observe incoming payload size.
+   *
+   * @param sizeBytes - payload size in bytes.
+   */
+  public observePayloadSize(sizeBytes: number): void {
+    this.payloadSize.observe(sizeBytes);
+  }
+
+  /**
+   * Observe computed delta size.
+   *
+   * @param sizeBytes - delta size in bytes.
+   */
+  public observeDeltaSize(sizeBytes: number): void {
+    this.deltaSize.observe(sizeBytes);
+  }
+
+  /**
+   * Increment retries caused by duplicate key races.
+   */
+  public incrementDuplicateRetriesTotal(): void {
+    this.duplicateRetriesTotal.inc();
+  }
+
+  /**
+   * Measure Mongo operation duration.
+   *
+   * @param operation - mongodb operation label.
+   * @param callback - callback to execute under timer.
+   */
+  public async observeMongoDuration<T>(operation: MongoOperation, callback: () => Promise<T>): Promise<T> {
+    const endTimer = this.mongoDuration.startTimer({ operation });
+
+    try {
+      return await callback();
+    } finally {
+      endTimer();
+    }
+  }
+}
diff --git a/workers/grouper/src/metrics/memoryMonitor.ts b/workers/grouper/src/metrics/memoryMonitor.ts
new file mode 100644
index 00000000..0d86e840
--- /dev/null
+++ b/workers/grouper/src/metrics/memoryMonitor.ts
@@ -0,0 +1,171 @@
+import type { GrouperMemoryConfig } from './config';
+
+interface LoggerLike {
+  info(message: string): void;
+  warn(message: string): void;
+}
+
+const ROUND_PRECISION = 100;
+// eslint-disable-next-line @typescript-eslint/no-magic-numbers
+const BYTES_IN_MEBIBYTE = 1024 * 1024;
+
+/**
+ * Handles memory checkpoints and leak-oriented logging for Grouper worker.
+ */
+export default class GrouperMemoryMonitor {
+  private readonly logger: LoggerLike;
+  private readonly config: GrouperMemoryConfig;
+  private memoryCheckpointTask = 0;
+  private memoryCheckpointHeapUsed = 0;
+
+  /**
+   * @param logger - logger instance.
+   * @param config - memory monitor thresholds.
+   */
+  constructor(logger: LoggerLike, config: GrouperMemoryConfig) {
+    this.logger = logger;
+    this.config = config;
+  }
+
+  /**
+   * Initialize baseline memory state on worker startup.
+   *
+   * @param handledTasksCount - currently handled tasks count.
+   */
+  public initialize(handledTasksCount: number): void {
+    const startupMemory = process.memoryUsage();
+
+    this.memoryCheckpointTask = 0;
+    this.memoryCheckpointHeapUsed = startupMemory.heapUsed;
+    this.logCheckpoint('startup', startupMemory, handledTasksCount);
+  }
+
+  /**
+   * Log shutdown memory checkpoint.
+   *
+   * @param handledTasksCount - handled tasks count on shutdown.
+   */
+  public logShutdown(handledTasksCount: number): void {
+    this.logCheckpoint('shutdown', process.memoryUsage(), handledTasksCount);
+  }
+
+  /**
+   * Log memory checkpoint on handle() error.
+ * + * @param handledTasksCount - currently handled tasks count. + * @param title - event title if available. + */ + public logHandleError(handledTasksCount: number, title: string | undefined): void { + const suffix = title ? `title="${title}"` : ''; + + this.logCheckpoint('handle-error', process.memoryUsage(), handledTasksCount, suffix); + } + + /** + * Periodic memory checkpoint before handling task payload. + * + * @param memoryUsage - process memory usage. + * @param handledTasksCount - currently handled tasks count. + * @param payloadSizeBytes - task payload size. + */ + public logBeforeHandle(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, payloadSizeBytes: number): void { + if (handledTasksCount !== 1 && handledTasksCount % this.config.logEveryTasks !== 0) { + return; + } + + this.logCheckpoint('before-handle', memoryUsage, handledTasksCount, `payloadSize=${payloadSizeBytes}b`); + } + + /** + * Log handle() completion memory details and growth checks. + * + * @param memoryBeforeHandle - memory usage before handling. + * @param handledTasksCount - currently handled tasks count. + * @param payloadSizeBytes - task payload size. + * @param title - event title. + * @param projectId - project id. 
+ */ + public logHandleCompletion( + memoryBeforeHandle: NodeJS.MemoryUsage, + handledTasksCount: number, + payloadSizeBytes: number, + title: string, + projectId: string + ): void { + const memoryAfterHandle = process.memoryUsage(); + const heapDeltaBytes = memoryAfterHandle.heapUsed - memoryBeforeHandle.heapUsed; + const heapDeltaMb = Math.round((heapDeltaBytes / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + + this.logger.info( + `[handle] done, ${this.formatMemoryUsage(memoryAfterHandle)} heapDelta=${heapDeltaMb}MB handled=${handledTasksCount}` + ); + + if (heapDeltaBytes > this.config.handleGrowthWarnMb * BYTES_IN_MEBIBYTE) { + this.logger.warn( + `[memory] high heap growth in single handle: heapDelta=${heapDeltaMb}MB payloadSize=${payloadSizeBytes}b title="${title}" project=${projectId}` + ); + } + + this.checkMemoryGrowthWindow(memoryAfterHandle, handledTasksCount); + } + + /** + * Logs sustained heap growth over a configurable number of handled tasks. + * + * @param memoryUsage - current process memory usage. + * @param handledTasksCount - currently handled tasks count. 
+ */ + private checkMemoryGrowthWindow(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number): void { + const tasksInWindow = handledTasksCount - this.memoryCheckpointTask; + + if (tasksInWindow < this.config.growthWindowTasks) { + return; + } + + const heapGrowthBytes = memoryUsage.heapUsed - this.memoryCheckpointHeapUsed; + const heapGrowthMb = Math.round((heapGrowthBytes / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const heapUsedNowMb = Math.round((memoryUsage.heapUsed / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + + this.logger.info( + `[memory] growth window tasks=${tasksInWindow} handled=${this.memoryCheckpointTask + 1}-${handledTasksCount} heapGrowth=${heapGrowthMb}MB heapUsedNow=${heapUsedNowMb}MB` + ); + + if (heapGrowthBytes > this.config.growthWarnMb * BYTES_IN_MEBIBYTE) { + this.logger.warn( + `[memory] possible leak detected: heap grew by ${heapGrowthMb}MB in ${tasksInWindow} handled tasks` + ); + } + + this.memoryCheckpointTask = handledTasksCount; + this.memoryCheckpointHeapUsed = memoryUsage.heapUsed; + } + + /** + * Format memory usage for consistent logs. + * + * @param memoryUsage - current process memory usage. 
+ */ + private formatMemoryUsage(memoryUsage: NodeJS.MemoryUsage): string { + const heapUsedMb = Math.round((memoryUsage.heapUsed / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const heapTotalMb = Math.round((memoryUsage.heapTotal / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const rssMb = Math.round((memoryUsage.rss / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const externalMb = Math.round((memoryUsage.external / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const arrayBuffersMb = Math.round((memoryUsage.arrayBuffers / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + + return `heapUsed=${heapUsedMb}MB heapTotal=${heapTotalMb}MB rss=${rssMb}MB external=${externalMb}MB arrayBuffers=${arrayBuffersMb}MB`; + } + + /** + * Writes one memory checkpoint record. + * + * @param stage - lifecycle stage. + * @param memoryUsage - current process memory usage. + * @param handledTasksCount - currently handled tasks count. + * @param suffix - optional extra suffix. + */ + private logCheckpoint(stage: string, memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, suffix = ''): void { + const extra = suffix ? ` ${suffix}` : ''; + + this.logger.info(`[memory] stage=${stage} handled=${handledTasksCount} ${this.formatMemoryUsage(memoryUsage)}${extra}`); + } +}