diff --git a/.env.sample b/.env.sample
index f8e8b6ef..b45010f3 100644
--- a/.env.sample
+++ b/.env.sample
@@ -31,6 +31,12 @@ PROMETHEUS_PUSHGATEWAY_URL=
 # pushgateway push interval in ms
 PROMETHEUS_PUSHGATEWAY_INTERVAL=10000
 
+# Grouper memory log controls
+GROUPER_MEMORY_LOG_EVERY_TASKS=50
+GROUPER_MEMORY_GROWTH_WINDOW_TASKS=200
+GROUPER_MEMORY_GROWTH_WARN_MB=64
+GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB=16
+
 # project token for error catching
 HAWK_CATCHER_TOKEN=
 
@@ -40,4 +46,4 @@ HAWK_CATCHER_TOKEN=
 IS_NOTIFIER_WORKER_ENABLED=false
 
 ## Url for telegram notifications about workspace blocks and unblocks
-TELEGRAM_LIMITER_CHAT_URL=
\ No newline at end of file
+TELEGRAM_LIMITER_CHAT_URL=
diff --git a/lib/metrics.ts b/lib/metrics.ts
new file mode 100644
index 00000000..9ba78584
--- /dev/null
+++ b/lib/metrics.ts
@@ -0,0 +1,97 @@
+import * as client from 'prom-client';
+import os from 'os';
+import { nanoid } from 'nanoid';
+import createLogger from './logger';
+
+const register = new client.Registry();
+const logger = createLogger();
+
+const DEFAULT_PUSH_INTERVAL_MS = 10_000;
+const ID_SIZE = 5;
+const METRICS_JOB_NAME = 'workers';
+
+let pushInterval: NodeJS.Timeout | null = null;
+let currentWorkerName = '';
+
+client.collectDefaultMetrics({ register });
+
+export { register, client };
+
+/**
+ * Parse push interval from environment.
+ */
+function getPushIntervalMs(): number {
+  const rawInterval = process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL;
+  const parsedInterval = rawInterval === undefined
+    ? DEFAULT_PUSH_INTERVAL_MS
+    : Number(rawInterval);
+
+  const interval = Number.isFinite(parsedInterval) && parsedInterval > 0
+    ? parsedInterval
+    : DEFAULT_PUSH_INTERVAL_MS;
+
+  if (rawInterval !== undefined && interval !== parsedInterval) {
+    logger.warn(`[metrics] invalid PROMETHEUS_PUSHGATEWAY_INTERVAL="${rawInterval}", fallback to ${DEFAULT_PUSH_INTERVAL_MS}ms`);
+  }
+
+  return interval;
+}
+
+/**
+ * Stop periodic push to pushgateway.
+ */
+export function stopMetricsPushing(): void {
+  if (!pushInterval) {
+    return;
+  }
+
+  clearInterval(pushInterval);
+  pushInterval = null;
+  logger.info(`[metrics] stopped pushing metrics for worker=${currentWorkerName}`);
+  currentWorkerName = '';
+}
+
+/**
+ * Start periodic push to pushgateway.
+ *
+ * @param workerName - name of the worker for grouping.
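+ * @returns cleanup callback that stops the periodic push.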
+ */
+export function startMetricsPushing(workerName: string): () => void {
+  const url = process.env.PROMETHEUS_PUSHGATEWAY_URL;
+
+  if (!url) {
+    return stopMetricsPushing;
+  }
+
+  if (pushInterval) {
+    logger.warn(`[metrics] pushing is already started for worker=${currentWorkerName}, skip duplicate start for worker=${workerName}`);
+
+    return stopMetricsPushing;
+  }
+
+  const interval = getPushIntervalMs();
+  const hostname = os.hostname();
+  const id = nanoid(ID_SIZE);
+  const gateway = new client.Pushgateway(url, [], register);
+
+  currentWorkerName = workerName;
+
+  logger.info(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id}, worker: ${workerName})`);
+
+  pushInterval = setInterval(() => {
+    gateway.pushAdd({
+      jobName: METRICS_JOB_NAME,
+      groupings: {
+        worker: workerName,
+        host: hostname,
+        id,
+      },
+    }, (err) => {
+      if (err) {
+        logger.error(`Metrics push error: ${err.message || err}`);
+      }
+    });
+  }, interval);
+
+  return stopMetricsPushing;
+}
diff --git a/runner.ts b/runner.ts
index 443beea8..c79ada24 100644
--- a/runner.ts
+++ b/runner.ts
@@ -9,6 +9,7 @@ import * as utils from './lib/utils';
 import { Worker } from './lib/worker';
 import HawkCatcher from '@hawk.so/nodejs';
 import * as dotenv from 'dotenv';
+import { startMetricsPushing } from './lib/metrics';
 
 dotenv.config();
 
@@ -40,9 +41,9 @@ class WorkerRunner {
   // private gateway?: promClient.Pushgateway;
 
   /**
-   * number returned by setInterval() of metrics push function
+   * Metrics push cleanup callback.
    */
-  private pushIntervalNumber?: ReturnType<typeof setInterval>;
+  private stopMetricsPushing?: () => void;
 
   /**
    * Create runner instance
@@ -57,19 +58,17 @@
       .then((workerConstructors) => {
         this.constructWorkers(workerConstructors);
       })
-      // .then(() => {
-      //   try {
-      //     this.startMetrics();
-      //   } catch (e) {
-      //     HawkCatcher.send(e);
-      //     console.error(`Metrics not started: ${e}`);
-      //   }
-      //
-      //   return Promise.resolve();
-      // })
       .then(() => {
         return this.startWorkers();
       })
+      .then(() => {
+        try {
+          this.startMetrics();
+        } catch (e) {
+          HawkCatcher.send(e);
+          console.error(`Metrics not started: ${e}`);
+        }
+      })
       .then(() => {
         this.observeProcess();
       })
@@ -82,67 +81,27 @@
   /**
    * Run metrics exporter
    */
-  // private startMetrics(): void {
-  //   if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) {
-  //     return;
-  //   }
-  //
-  //   const PUSH_INTERVAL = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL);
-  //
-  //   if (isNaN(PUSH_INTERVAL)) {
-  //     throw new Error('PROMETHEUS_PUSHGATEWAY_INTERVAL is invalid or not set');
-  //   }
-  //
-  //   const collectDefaultMetrics = promClient.collectDefaultMetrics;
-  //   const Registry = promClient.Registry;
-  //
-  //   const register = new Registry();
-  //   const startGcStats = gcStats(register);
-  //
-  //   const hostname = os.hostname();
-  //
-  //   const ID_SIZE = 5;
-  //   const id = nanoid(ID_SIZE);
-  //
-  //   // eslint-disable-next-line node/no-deprecated-api
-  //   const instance = url.parse(process.env.PROMETHEUS_PUSHGATEWAY_URL).host;
-  //
-  //   // Initialize metrics for workers
-  //   this.workers.forEach((worker) => {
-  //     // worker.initMetrics();
-  //     worker.getMetrics().forEach((metric: promClient.Counter) => register.registerMetric(metric));
-  //   });
-  //
-  //   collectDefaultMetrics({ register });
-  //   startGcStats();
-  //
-  //   this.gateway = new promClient.Pushgateway(process.env.PROMETHEUS_PUSHGATEWAY_URL, null, register);
-  //
-  //   console.log(`Start pushing metrics to ${process.env.PROMETHEUS_PUSHGATEWAY_URL}`);
-  //
-  //   // Pushing metrics to the pushgateway every PUSH_INTERVAL
-  //   this.pushIntervalNumber = setInterval(() => {
-  //     this.workers.forEach((worker) => {
-  //       if (!this.gateway || !instance) {
-  //         return;
-  //       }
-  //       // Use pushAdd not to overwrite previous metrics
-  //       this.gateway.pushAdd({
-  //         jobName: 'workers',
-  //         groupings: {
-  //           worker: worker.type.replace('/', '_'),
-  //           host: hostname,
-  //           id,
-  //         },
-  //       }, (err?: Error) => {
-  //         if (err) {
-  //           HawkCatcher.send(err);
-  //           console.log(`Error of pushing metrics to gateway: ${err}`);
-  //         }
-  //       });
-  //     });
-  //   }, PUSH_INTERVAL);
-  // }
+  private startMetrics(): void {
+    if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) {
+      return;
+    }
+
+    if (this.workers.length === 0) {
+      return;
+    }
+
+    const workerTypes = Array.from(new Set(this.workers.map((worker) => {
+      return worker.type.replace('/', '_');
+    })));
+
+    const workerTypeForMetrics = workerTypes.length === 1 ? workerTypes[0] : 'multi_worker_process';
+
+    if (workerTypes.length > 1) {
+      console.warn(`[metrics] ${workerTypes.length} workers are running in one process; pushing metrics as "${workerTypeForMetrics}" to avoid duplicated attribution`);
+    }
+
+    this.stopMetricsPushing = startMetricsPushing(workerTypeForMetrics);
+  }
 
   /**
    * Dynamically loads workers through the yarn workspaces
@@ -277,7 +236,8 @@
   private async stopWorker(worker: Worker): Promise<void> {
     try {
       // stop pushing metrics
-      clearInterval(this.pushIntervalNumber);
+      this.stopMetricsPushing?.();
+      this.stopMetricsPushing = undefined;
 
       await worker.finish();
 
       console.log(
diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts
index 52c71ca8..76fc7a5b 100644
--- a/workers/grouper/src/index.ts
+++ b/workers/grouper/src/index.ts
@@ -25,17 +25,20 @@ import RedisHelper from './redisHelper';
 import { computeDelta } from './utils/repetitionDiff';
 import { rightTrim } from '../../../lib/utils/string';
 import { hasValue } from '../../../lib/utils/hasValue';
+import GrouperMetrics from './metrics/grouperMetrics';
+import GrouperMemoryMonitor from './metrics/memoryMonitor';
+import { grouperMemoryConfig } from './metrics/config';
 
 /**
  * eslint does not count decorators as a variable usage
  */
-/* eslint-disable-next-line no-unused-vars */
+/* eslint-disable-next-line no-unused-vars, @typescript-eslint/no-unused-vars */
 import { memoize } from '../../../lib/memoize';
 
 /**
  * eslint does not count decorators as a variable usage
  */
-/* eslint-disable-next-line no-unused-vars */
+/* eslint-disable-next-line no-unused-vars, @typescript-eslint/no-unused-vars */
 const MEMOIZATION_TTL = 600_000;
 
 /**
@@ -82,11 +85,26 @@ export default class GrouperWorker extends Worker {
    */
   private redis = new RedisHelper();
 
+  /**
+   * Prometheus metrics facade.
+   */
+  private grouperMetrics = new GrouperMetrics();
+
+  /**
+   * Memory leak monitoring helper.
+   */
+  private memoryMonitor = new GrouperMemoryMonitor(this.logger, grouperMemoryConfig);
+
   /**
    * Interval for periodic cache cleanup to prevent memory leaks from unbounded cache growth
    */
   private cacheCleanupInterval: NodeJS.Timeout | null = null;
 
+  /**
+   * Number of handled tasks in current worker process.
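+   * Used by the memory monitor for periodic checkpoints and growth windows.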
+   */
+  private handledTasksCount = 0;
+
   /**
    * Start consuming messages
    */
@@ -108,6 +126,7 @@ export default class GrouperWorker extends Worker {
     this.cacheCleanupInterval = setInterval(() => {
       this.clearCache();
     }, CACHE_CLEANUP_INTERVAL_MINUTES * TimeMs.MINUTE);
+    this.memoryMonitor.initialize(this.handledTasksCount);
 
     await super.start();
   }
@@ -124,6 +143,7 @@ export default class GrouperWorker extends Worker {
       this.cacheCleanupInterval = null;
     }
 
+    this.memoryMonitor.logShutdown(this.handledTasksCount);
     await super.finish();
     this.prepareCache();
     await this.eventsDb.close();
@@ -137,7 +157,31 @@
    * @param task - event to handle
    */
   public async handle(task: GroupWorkerTask): Promise<void> {
-    let uniqueEventHash = await this.getUniqueEventHash(task);
+    try {
+      await this.grouperMetrics.observeHandleDuration(async () => {
+        await this.handleInternal(task);
+      });
+    } catch (error) {
+      this.grouperMetrics.incrementErrorsTotal();
+      this.memoryMonitor.logHandleError(this.handledTasksCount, task.payload?.title);
+      throw error;
+    }
+  }
+
+  /**
+   * Internal task handling function
+   *
+   * @param task - event to handle
+   */
+  private async handleInternal(task: GroupWorkerTask): Promise<void> {
+    const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload));
+    const handledTasksCount = ++this.handledTasksCount;
+    const memoryBeforeHandle = process.memoryUsage();
+
+    this.grouperMetrics.observePayloadSize(taskPayloadSize);
+    this.memoryMonitor.logBeforeHandle(memoryBeforeHandle, handledTasksCount, taskPayloadSize);
+
+    this.logger.info(`[handle] project=${task.projectId} catcher=${task.catcherType} title="${task.payload.title}" payloadSize=${taskPayloadSize}b backtraceFrames=${task.payload.backtrace?.length ?? 0}`);
 
     // FIX RELEASE TYPE
     // TODO: REMOVE AFTER 01.01.2026, after the most of the users update to new js catcher
@@ -148,39 +192,10 @@
       };
     }
 
+    let uniqueEventHash = '';
     let existedEvent: GroupedEventDBScheme;
-
-    /**
-     * Find similar events by grouping pattern
-     */
-    const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title);
-
-    if (similarEvent) {
-      this.logger.info(`[handle] similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`);
-
-      /**
-       * Override group hash with found event's group hash
-       */
-      uniqueEventHash = similarEvent.groupHash;
-
-      existedEvent = similarEvent;
-    } else {
-      /**
-       * If we couldn't group by grouping pattern — try grouping by hash (title)
-       */
-      /**
-       * Find event by group hash.
-       */
-      existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
-    }
-
-    /**
-     * Event happened for the first time
-     */
-    const isFirstOccurrence = !existedEvent && !similarEvent;
-
+    let isFirstOccurrence = false;
     let repetitionId = null;
-
     let incrementDailyAffectedUsers = false;
 
     /**
@@ -193,47 +208,84 @@
      */
    this.dataFilter.processEvent(task.payload);
 
-    if (isFirstOccurrence) {
-      try {
-        const incrementAffectedUsers = !!task.payload.user;
+    while (true) {
+      uniqueEventHash = await this.getUniqueEventHash(task);
 
-        /**
-         * Insert new event
-         */
-        await this.saveEvent(task.projectId, {
-          groupHash: uniqueEventHash,
-          totalCount: 1,
-          catcherType: task.catcherType,
-          payload: task.payload,
-          timestamp: task.timestamp,
-          usersAffected: incrementAffectedUsers ? 1 : 0,
-        } as GroupedEventDBScheme);
+      /**
+       * Find similar events by grouping pattern
+       */
+      const similarEvent = await this.findSimilarEvent(task.projectId, task.payload.title);
 
-        const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash);
+      if (similarEvent) {
+        this.logger.info(`[handle] similar event found, groupHash=${similarEvent.groupHash} totalCount=${similarEvent.totalCount}`);
 
         /**
-         * If event is saved, then cached event state is no longer actual, so we should remove it
+         * Override group hash with found event's group hash
          */
-        this.cache.del(eventCacheKey);
+        uniqueEventHash = similarEvent.groupHash;
 
+        existedEvent = similarEvent;
+      } else {
         /**
-         * Increment daily affected users for the first event
+         * If we couldn't group by grouping pattern — try grouping by hash (title)
          */
-        incrementDailyAffectedUsers = incrementAffectedUsers;
-      } catch (e) {
         /**
-         * If we caught Database duplication error, then another worker thread has already saved it to the database
-         * and we need to process this event as repetition
+         * Find event by group hash.
          */
-        if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
-          await this.handle(task);
+        existedEvent = await this.getEvent(task.projectId, uniqueEventHash);
+      }
+
+      /**
+       * Event happened for the first time
+       */
+      isFirstOccurrence = !existedEvent && !similarEvent;
+
+      if (isFirstOccurrence) {
+        try {
+          const incrementAffectedUsers = !!task.payload.user;
+
+          this.logger.info(`[saveEvent] new event, payloadSize=${taskPayloadSize}b`);
+
+          /**
+           * Insert new event
+           */
+          await this.saveEvent(task.projectId, {
+            groupHash: uniqueEventHash,
+            totalCount: 1,
+            catcherType: task.catcherType,
+            payload: task.payload,
+            timestamp: task.timestamp,
+            usersAffected: incrementAffectedUsers ? 1 : 0,
+          } as GroupedEventDBScheme);
+
+          const eventCacheKey = await this.getEventCacheKey(task.projectId, uniqueEventHash);
+
+          /**
+           * If event is saved, then cached event state is no longer actual, so we should remove it
+           */
+          this.cache.del(eventCacheKey);
+
+          /**
+           * Increment daily affected users for the first event
+           */
+          incrementDailyAffectedUsers = incrementAffectedUsers;
+          break;
+        } catch (e) {
+          /**
+           * If we caught Database duplication error, then another worker thread has already saved it to the database
+           * and we need to process this event as repetition
+           */
+          if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
+            this.grouperMetrics.incrementDuplicateRetriesTotal();
+            this.logger.info(`[saveEvent] duplicate key, retrying as repetition`);
+
+            continue;
+          }
 
-          return;
-        } else {
           throw e;
         }
       }
-    } else {
+
       const [incrementAffectedUsers, shouldIncrementDailyAffectedUsers] = await this.shouldIncrementAffectedUsers(task, existedEvent);
 
       incrementDailyAffectedUsers = shouldIncrementDailyAffectedUsers;
@@ -252,6 +304,10 @@
 
       let delta: RepetitionDelta;
 
+      const existedPayloadSize = Buffer.byteLength(JSON.stringify(existedEvent.payload));
+
+      this.logger.info(`[computeDelta] existedPayloadSize=${existedPayloadSize}b taskPayloadSize=${taskPayloadSize}b`);
+
       try {
         /**
          * Calculate delta between original event and repetition
         */
         delta = computeDelta(existedEvent.payload, task.payload);
       } catch (e) {
         throw new DiffCalculationError(e, existedEvent.payload, task.payload);
       }
@@ -262,9 +318,16 @@
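+
+      // JSON.stringify returns undefined for an undefined input, so guard before measuring the delta size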
+      const deltaStr = JSON.stringify(delta);
+      const deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0;
+
+      this.grouperMetrics.observeDeltaSize(deltaSize);
+
+      this.logger.info(`[computeDelta] deltaSize=${deltaSize}b`);
+
       const newRepetition = {
         groupHash: uniqueEventHash,
-        delta: JSON.stringify(delta),
+        delta: deltaStr,
         timestamp: task.timestamp,
       } as RepetitionDBScheme;
@@ -275,8 +338,14 @@
        * This prevents memory leaks from retaining full event objects after delta is computed
        */
       delta = undefined;
+      break;
     }
 
+    /**
+     * Increment metrics counter once per handled task
+     */
+    this.grouperMetrics.incrementEventsTotal(isFirstOccurrence ? 'new' : 'repeated');
+
     /**
      * Store events counter by days
      */
@@ -288,6 +357,14 @@
       incrementDailyAffectedUsers
     );
 
+    this.memoryMonitor.logHandleCompletion(
+      memoryBeforeHandle,
+      handledTasksCount,
+      taskPayloadSize,
+      task.payload.title,
+      task.projectId
+    );
+
     /**
      * Add task for NotifierWorker only if event is not ignored
      */
@@ -556,14 +633,16 @@
     const eventCacheKey = await this.getEventCacheKey(projectId, groupHash);
 
     return this.cache.get(eventCacheKey, async () => {
-      return this.eventsDb.getConnection()
-        .collection(`events:${projectId}`)
-        .findOne({
-          groupHash,
-        })
-        .catch((err) => {
-          throw new DatabaseReadWriteError(err);
-        });
+      return this.grouperMetrics.observeMongoDuration('getEvent', async () => {
+        return this.eventsDb.getConnection()
+          .collection(`events:${projectId}`)
+          .findOne({
+            groupHash,
+          })
+          .catch((err) => {
+            throw new DatabaseReadWriteError(err);
+          });
+      });
     });
   }
 
@@ -591,12 +670,14 @@
       throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed');
     }
 
-    const collection = this.eventsDb.getConnection().collection(`events:${projectId}`);
+    return this.grouperMetrics.observeMongoDuration('saveEvent', async () => {
+      const collection = this.eventsDb.getConnection().collection(`events:${projectId}`);
 
-    encodeUnsafeFields(groupedEventData);
+      encodeUnsafeFields(groupedEventData);
 
-    return (await collection
-      .insertOne(groupedEventData)).insertedId as mongodb.ObjectID;
+      return (await collection
+        .insertOne(groupedEventData)).insertedId as mongodb.ObjectID;
+    });
   }
 
@@ -610,18 +691,20 @@
       throw new ValidationError('Controller.saveRepetition: Project ID is invalid or missing');
     }
 
-    try {
-      const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`);
+    return this.grouperMetrics.observeMongoDuration('saveRepetition', async () => {
+      try {
+        const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`);
 
-      encodeUnsafeFields(repetition);
+        encodeUnsafeFields(repetition);
 
-      return (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID;
-    } catch (err) {
-      throw new DatabaseReadWriteError(err, {
-        repetition: repetition as unknown as Record<string, unknown>,
-        projectId,
-      });
-    }
+        return (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID;
+      } catch (err) {
+        throw new DatabaseReadWriteError(err, {
+          repetition: repetition as unknown as Record<string, unknown>,
+          projectId,
+        });
+      }
+    });
   }
 
@@ -636,26 +719,28 @@
       throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed');
     }
 
-    try {
-      const updateQuery = incrementAffected
-        ? {
-          $inc: {
-            totalCount: 1,
-            usersAffected: 1,
-          },
-        }
-        : {
-          $inc: {
-            totalCount: 1,
-          },
-        };
-
-      return (await this.eventsDb.getConnection()
-        .collection(`events:${projectId}`)
-        .updateOne(query, updateQuery)).modifiedCount;
-    } catch (err) {
-      throw new DatabaseReadWriteError(err);
-    }
+    return this.grouperMetrics.observeMongoDuration('incrementCounter', async () => {
+      try {
+        const updateQuery = incrementAffected
+          ? {
+            $inc: {
+              totalCount: 1,
+              usersAffected: 1,
+            },
+          }
+          : {
+            $inc: {
+              totalCount: 1,
+            },
+          };
+
+        return (await this.eventsDb.getConnection()
+          .collection(`events:${projectId}`)
+          .updateOne(query, updateQuery)).modifiedCount;
+      } catch (err) {
+        throw new DatabaseReadWriteError(err);
+      }
+    });
   }
 
   /**
@@ -679,32 +764,34 @@
       throw new ValidationError('GrouperWorker.saveDailyEvents: Project ID is invalid or missed');
     }
 
-    try {
-      const midnight = this.getMidnightByEventTimestamp(eventTimestamp);
-
-      await this.eventsDb.getConnection()
-        .collection(`dailyEvents:${projectId}`)
-        .updateOne(
-          {
-            groupHash: eventHash,
-            groupingTimestamp: midnight,
-          },
-          {
-            $set: {
-              groupHash: eventHash,
-              groupingTimestamp: midnight,
-              lastRepetitionTime: eventTimestamp,
-              lastRepetitionId: repetitionId,
-            },
-            $inc: {
-              count: 1,
-              affectedUsers: shouldIncrementAffectedUsers ? 1 : 0,
-            },
-          },
-          { upsert: true });
-    } catch (err) {
-      throw new DatabaseReadWriteError(err);
-    }
+    await this.grouperMetrics.observeMongoDuration('saveDailyEvents', async () => {
+      try {
+        const midnight = this.getMidnightByEventTimestamp(eventTimestamp);
+
+        await this.eventsDb.getConnection()
+          .collection(`dailyEvents:${projectId}`)
+          .updateOne(
+            {
+              groupHash: eventHash,
+              groupingTimestamp: midnight,
+            },
+            {
+              $set: {
+                groupHash: eventHash,
+                groupingTimestamp: midnight,
+                lastRepetitionTime: eventTimestamp,
+                lastRepetitionId: repetitionId,
+              },
+              $inc: {
+                count: 1,
+                affectedUsers: shouldIncrementAffectedUsers ? 1 : 0,
+              },
+            },
+            { upsert: true });
+      } catch (err) {
+        throw new DatabaseReadWriteError(err);
+      }
+    });
   }
 
   /**
diff --git a/workers/grouper/src/metrics/config.ts b/workers/grouper/src/metrics/config.ts
new file mode 100644
index 00000000..30fc5541
--- /dev/null
+++ b/workers/grouper/src/metrics/config.ts
@@ -0,0 +1,42 @@
+/**
+ * Parsed config for grouper memory monitoring.
+ */
+export interface GrouperMemoryConfig {
+  logEveryTasks: number;
+  growthWindowTasks: number;
+  growthWarnMb: number;
+  handleGrowthWarnMb: number;
+}
+
+const DEFAULT_MEMORY_LOG_EVERY_TASKS = 50;
+const DEFAULT_MEMORY_GROWTH_WINDOW_TASKS = 200;
+const DEFAULT_MEMORY_GROWTH_WARN_MB = 64;
+const DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB = 16;
+
+/**
+ * Histogram buckets for payload and delta sizes (bytes).
+ */
+// eslint-disable-next-line @typescript-eslint/no-magic-numbers
+export const GROUPER_METRICS_SIZE_BUCKETS = [100, 500, 1000, 5000, 10000, 50000, 100000, 500000];
+
+/**
+ * Parse positive numeric env value.
+ *
+ * @param value - env string value.
+ * @param fallback - default numeric fallback.
+ */
+function asPositiveNumber(value: string | undefined, fallback: number): number {
+  const parsed = Number(value);
+
+  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
+}
+
+/**
+ * Memory monitoring config from environment.
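+ * Falls back to the defaults above when a variable is unset or invalid.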
+ */
+export const grouperMemoryConfig: GrouperMemoryConfig = {
+  logEveryTasks: asPositiveNumber(process.env.GROUPER_MEMORY_LOG_EVERY_TASKS, DEFAULT_MEMORY_LOG_EVERY_TASKS),
+  growthWindowTasks: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WINDOW_TASKS, DEFAULT_MEMORY_GROWTH_WINDOW_TASKS),
+  growthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_GROWTH_WARN_MB, DEFAULT_MEMORY_GROWTH_WARN_MB),
+  handleGrowthWarnMb: asPositiveNumber(process.env.GROUPER_MEMORY_HANDLE_GROWTH_WARN_MB, DEFAULT_MEMORY_HANDLE_GROWTH_WARN_MB),
+};
diff --git a/workers/grouper/src/metrics/grouperMetrics.ts b/workers/grouper/src/metrics/grouperMetrics.ts
new file mode 100644
index 00000000..4be19e87
--- /dev/null
+++ b/workers/grouper/src/metrics/grouperMetrics.ts
@@ -0,0 +1,165 @@
+import { client, register } from '../../../../lib/metrics';
+import { GROUPER_METRICS_SIZE_BUCKETS } from './config';
+
+type EventType = 'new' | 'repeated';
+type MongoOperation = 'getEvent' | 'saveEvent' | 'saveRepetition' | 'incrementCounter' | 'saveDailyEvents';
+
+/**
+ * Reuse already registered metric by name, or create one.
+ *
+ * @param name - metric name.
+ * @param createMetric - metric factory.
+ */
+function getOrCreateMetric<MetricType>(name: string, createMetric: () => MetricType): MetricType {
+  const existing = register.getSingleMetric(name);
+
+  if (existing) {
+    return existing as unknown as MetricType;
+  }
+
+  return createMetric();
+}
+
+/**
+ * Grouper-specific Prometheus metrics facade.
+ */
+export default class GrouperMetrics {
+  private readonly eventsTotal = getOrCreateMetric(
+    'hawk_grouper_events_total',
+    () => new client.Counter({
+      name: 'hawk_grouper_events_total',
+      help: 'Total number of events processed by grouper',
+      labelNames: [ 'type' ],
+      registers: [ register ],
+    })
+  );
+
+  private readonly handleDuration = getOrCreateMetric(
+    'hawk_grouper_handle_duration_seconds',
+    () => new client.Histogram({
+      name: 'hawk_grouper_handle_duration_seconds',
+      help: 'Duration of handle() call in seconds',
+      registers: [ register ],
+    })
+  );
+
+  private readonly errorsTotal = getOrCreateMetric(
+    'hawk_grouper_errors_total',
+    () => new client.Counter({
+      name: 'hawk_grouper_errors_total',
+      help: 'Total number of errors during event processing',
+      registers: [ register ],
+    })
+  );
+
+  private readonly mongoDuration = getOrCreateMetric(
+    'hawk_grouper_mongo_duration_seconds',
+    () => new client.Histogram({
+      name: 'hawk_grouper_mongo_duration_seconds',
+      help: 'Duration of MongoDB operations in seconds',
+      labelNames: [ 'operation' ],
+      registers: [ register ],
+    })
+  );
+
+  private readonly deltaSize = getOrCreateMetric(
+    'hawk_grouper_delta_size_bytes',
+    () => new client.Histogram({
+      name: 'hawk_grouper_delta_size_bytes',
+      help: 'Size of computed repetition delta in bytes',
+      buckets: GROUPER_METRICS_SIZE_BUCKETS,
+      registers: [ register ],
+    })
+  );
+
+  private readonly payloadSize = getOrCreateMetric(
+    'hawk_grouper_payload_size_bytes',
+    () => new client.Histogram({
+      name: 'hawk_grouper_payload_size_bytes',
+      help: 'Size of incoming event payload in bytes',
+      buckets: GROUPER_METRICS_SIZE_BUCKETS,
+      registers: [ register ],
+    })
+  );
+
+  private readonly duplicateRetriesTotal = getOrCreateMetric(
+    'hawk_grouper_duplicate_retries_total',
+    () => new client.Counter({
+      name: 'hawk_grouper_duplicate_retries_total',
+      help: 'Number of retries due to duplicate key errors',
+      registers: [ register ],
+    })
+  );
+
+  /**
+   * Measure top-level handle() duration.
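+   * The timer is stopped in a finally block, so failed calls are measured too.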
+   *
+   * @param callback - callback to execute under timer.
+   */
+  public async observeHandleDuration<T>(callback: () => Promise<T>): Promise<T> {
+    const endTimer = this.handleDuration.startTimer();
+
+    try {
+      return await callback();
+    } finally {
+      endTimer();
+    }
+  }
+
+  /**
+   * Increment events counter by event type.
+   *
+   * @param type - event type label.
+   */
+  public incrementEventsTotal(type: EventType): void {
+    this.eventsTotal.inc({ type });
+  }
+
+  /**
+   * Increment total processing errors counter.
+   */
+  public incrementErrorsTotal(): void {
+    this.errorsTotal.inc();
+  }
+
+  /**
+   * Observe incoming payload size.
+   *
+   * @param sizeBytes - payload size in bytes.
+   */
+  public observePayloadSize(sizeBytes: number): void {
+    this.payloadSize.observe(sizeBytes);
+  }
+
+  /**
+   * Observe computed delta size.
+   *
+   * @param sizeBytes - delta size in bytes.
+   */
+  public observeDeltaSize(sizeBytes: number): void {
+    this.deltaSize.observe(sizeBytes);
+  }
+
+  /**
+   * Increment retries caused by duplicate key races.
+   */
+  public incrementDuplicateRetriesTotal(): void {
+    this.duplicateRetriesTotal.inc();
+  }
+
+  /**
+   * Measure Mongo operation duration.
+   *
+   * @param operation - mongodb operation label.
+   * @param callback - callback to execute under timer.
+   */
+  public async observeMongoDuration<T>(operation: MongoOperation, callback: () => Promise<T>): Promise<T> {
+    const endTimer = this.mongoDuration.startTimer({ operation });
+
+    try {
+      return await callback();
+    } finally {
+      endTimer();
+    }
+  }
+}
diff --git a/workers/grouper/src/metrics/memoryMonitor.ts b/workers/grouper/src/metrics/memoryMonitor.ts
new file mode 100644
index 00000000..0d86e840
--- /dev/null
+++ b/workers/grouper/src/metrics/memoryMonitor.ts
@@ -0,0 +1,171 @@
+import type { GrouperMemoryConfig } from './config';
+
+interface LoggerLike {
+  info(message: string): void;
+  warn(message: string): void;
+}
+
+const ROUND_PRECISION = 100;
+// eslint-disable-next-line @typescript-eslint/no-magic-numbers
+const BYTES_IN_MEBIBYTE = 1024 * 1024;
+
+/**
+ * Handles memory checkpoints and leak-oriented logging for Grouper worker.
+ */
+export default class GrouperMemoryMonitor {
+  private readonly logger: LoggerLike;
+  private readonly config: GrouperMemoryConfig;
+  private memoryCheckpointTask = 0;
+  private memoryCheckpointHeapUsed = 0;
+
+  /**
+   * @param logger - logger instance.
+   * @param config - memory monitor thresholds.
+   */
+  constructor(logger: LoggerLike, config: GrouperMemoryConfig) {
+    this.logger = logger;
+    this.config = config;
+  }
+
+  /**
+   * Initialize baseline memory state on worker startup.
+   *
+   * @param handledTasksCount - currently handled tasks count.
+   */
+  public initialize(handledTasksCount: number): void {
+    const startupMemory = process.memoryUsage();
+
+    this.memoryCheckpointTask = 0;
+    this.memoryCheckpointHeapUsed = startupMemory.heapUsed;
+    this.logCheckpoint('startup', startupMemory, handledTasksCount);
+  }
+
+  /**
+   * Log shutdown memory checkpoint.
+   *
+   * @param handledTasksCount - handled tasks count on shutdown.
+   */
+  public logShutdown(handledTasksCount: number): void {
+    this.logCheckpoint('shutdown', process.memoryUsage(), handledTasksCount);
+  }
+
+  /**
+   * Log memory checkpoint on handle() error.
+   *
+   * @param handledTasksCount - currently handled tasks count.
+   * @param title - event title if available.
+   */
+  public logHandleError(handledTasksCount: number, title: string | undefined): void {
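+    // Attach the event title to the checkpoint only when it is known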
+    const suffix = title ? `title="${title}"` : '';
+
+    this.logCheckpoint('handle-error', process.memoryUsage(), handledTasksCount, suffix);
+  }
+
+  /**
+   * Periodic memory checkpoint before handling task payload.
+   *
+   * @param memoryUsage - process memory usage.
+   * @param handledTasksCount - currently handled tasks count.
+   * @param payloadSizeBytes - task payload size.
+   */
+  public logBeforeHandle(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, payloadSizeBytes: number): void {
+    if (handledTasksCount !== 1 && handledTasksCount % this.config.logEveryTasks !== 0) {
+      return;
+    }
+
+    this.logCheckpoint('before-handle', memoryUsage, handledTasksCount, `payloadSize=${payloadSizeBytes}b`);
+  }
+
+  /**
+   * Log handle() completion memory details and growth checks.
+   *
+   * @param memoryBeforeHandle - memory usage before handling.
+   * @param handledTasksCount - currently handled tasks count.
+   * @param payloadSizeBytes - task payload size.
+   * @param title - event title.
+   * @param projectId - project id.
+   */
+  public logHandleCompletion(
+    memoryBeforeHandle: NodeJS.MemoryUsage,
+    handledTasksCount: number,
+    payloadSizeBytes: number,
+    title: string,
+    projectId: string
+  ): void {
+    const memoryAfterHandle = process.memoryUsage();
+    const heapDeltaBytes = memoryAfterHandle.heapUsed - memoryBeforeHandle.heapUsed;
+    const heapDeltaMb = Math.round((heapDeltaBytes / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION;
+
+    this.logger.info(
+      `[handle] done, ${this.formatMemoryUsage(memoryAfterHandle)} heapDelta=${heapDeltaMb}MB handled=${handledTasksCount}`
+    );
+
+    if (heapDeltaBytes > this.config.handleGrowthWarnMb * BYTES_IN_MEBIBYTE) {
+      this.logger.warn(
+        `[memory] high heap growth in single handle: heapDelta=${heapDeltaMb}MB payloadSize=${payloadSizeBytes}b title="${title}" project=${projectId}`
+      );
+    }
+
+    this.checkMemoryGrowthWindow(memoryAfterHandle, handledTasksCount);
+  }
+
+  /**
+   * Logs sustained heap growth over a configurable number of handled tasks.
+   *
+   * @param memoryUsage - current process memory usage.
+   * @param handledTasksCount - currently handled tasks count.
+   */
+  private checkMemoryGrowthWindow(memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number): void {
+    const tasksInWindow = handledTasksCount - this.memoryCheckpointTask;
+
+    if (tasksInWindow < this.config.growthWindowTasks) {
+      return;
+    }
+
+    const heapGrowthBytes = memoryUsage.heapUsed - this.memoryCheckpointHeapUsed;
+    const heapGrowthMb = Math.round((heapGrowthBytes / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION;
+    const heapUsedNowMb = Math.round((memoryUsage.heapUsed / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION;
+
+    this.logger.info(
+      `[memory] growth window tasks=${tasksInWindow} handled=${this.memoryCheckpointTask + 1}-${handledTasksCount} heapGrowth=${heapGrowthMb}MB heapUsedNow=${heapUsedNowMb}MB`
+    );
+
+    if (heapGrowthBytes > this.config.growthWarnMb * BYTES_IN_MEBIBYTE) {
+      this.logger.warn(
+        `[memory] possible leak detected: heap grew by ${heapGrowthMb}MB in ${tasksInWindow} handled tasks`
+      );
+    }
+
+    this.memoryCheckpointTask = handledTasksCount;
+    this.memoryCheckpointHeapUsed = memoryUsage.heapUsed;
+  }
+
+  /**
+   * Format memory usage for consistent logs.
+   *
+   * @param memoryUsage - current process memory usage.
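+   * @returns single line with values rounded to hundredths of a megabyte.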
+ */ + private formatMemoryUsage(memoryUsage: NodeJS.MemoryUsage): string { + const heapUsedMb = Math.round((memoryUsage.heapUsed / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const heapTotalMb = Math.round((memoryUsage.heapTotal / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const rssMb = Math.round((memoryUsage.rss / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const externalMb = Math.round((memoryUsage.external / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + const arrayBuffersMb = Math.round((memoryUsage.arrayBuffers / BYTES_IN_MEBIBYTE) * ROUND_PRECISION) / ROUND_PRECISION; + + return `heapUsed=${heapUsedMb}MB heapTotal=${heapTotalMb}MB rss=${rssMb}MB external=${externalMb}MB arrayBuffers=${arrayBuffersMb}MB`; + } + + /** + * Writes one memory checkpoint record. + * + * @param stage - lifecycle stage. + * @param memoryUsage - current process memory usage. + * @param handledTasksCount - currently handled tasks count. + * @param suffix - optional extra suffix. + */ + private logCheckpoint(stage: string, memoryUsage: NodeJS.MemoryUsage, handledTasksCount: number, suffix = ''): void { + const extra = suffix ? ` ${suffix}` : ''; + + this.logger.info(`[memory] stage=${stage} handled=${handledTasksCount} ${this.formatMemoryUsage(memoryUsage)}${extra}`); + } +}