From 79c9acd074f5982a889aad927f8604d8f3f8ac61 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Tue, 10 Feb 2026 17:17:34 +0100 Subject: [PATCH 1/8] feat: schedule structure --- pnpm-lock.yaml | 3 + .../README.md | 73 ++++++++++++ .../package.json | 3 +- .../src/activities.ts | 4 +- .../src/activities/activities.ts | 108 +++++++++++++++++- .../src/main.ts | 2 +- .../schedules/scheduleProjectsDiscovery.ts | 9 +- .../ossf-criticality-score/bucketClient.ts | 86 ++++++++++++++ .../sources/ossf-criticality-score/source.ts | 75 ++++++++++++ .../src/sources/registry.ts | 19 +++ .../src/sources/types.ts | 21 ++++ .../src/workflows/discoverProjects.ts | 50 +++++++- 12 files changed, 438 insertions(+), 15 deletions(-) create mode 100644 services/apps/automatic_projects_discovery_worker/README.md create mode 100644 services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts create mode 100644 services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts create mode 100644 services/apps/automatic_projects_discovery_worker/src/sources/registry.ts create mode 100644 services/apps/automatic_projects_discovery_worker/src/sources/types.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1fce3b1f03..32e0ea48b0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -557,6 +557,9 @@ importers: '@temporalio/workflow': specifier: ~1.11.8 version: 1.11.8 + csv-parse: + specifier: ^5.5.6 + version: 5.5.6 tsx: specifier: ^4.7.1 version: 4.7.3 diff --git a/services/apps/automatic_projects_discovery_worker/README.md b/services/apps/automatic_projects_discovery_worker/README.md new file mode 100644 index 0000000000..85162f9974 --- /dev/null +++ b/services/apps/automatic_projects_discovery_worker/README.md @@ -0,0 +1,73 @@ +# Automatic Projects Discovery Worker + +Temporal worker that discovers open-source projects from external data sources and writes them to the `projectCatalog` table. + +## Architecture + +### Source abstraction + +Every data source implements the `IDiscoverySource` interface (`src/sources/types.ts`): + +| Method | Purpose | +|--------|---------| +| `listAvailableDatasets()` | Returns available dataset snapshots, sorted newest-first | +| `fetchDatasetStream(dataset)` | Returns a readable stream for the dataset (e.g. HTTP response) | +| `parseRow(rawRow)` | Converts a raw CSV/JSON row into a `IDiscoverySourceRow`, or `null` to skip | + +Sources are registered in `src/sources/registry.ts` as a simple name → factory map. + +**To add a new source:** create a class implementing `IDiscoverySource`, then add one line to the registry. + +### Current sources + +| Name | Folder | Description | +|------|--------|-------------| +| `ossf-criticality-score` | `src/sources/ossf-criticality-score/` | OSSF Criticality Score snapshots from a public GCS bucket (~750K repos per snapshot) | + +### Workflow + +``` +discoverProjects({ mode: 'incremental' | 'full' }) + │ + ├─ Activity: listDatasets(sourceName) + │ → returns dataset descriptors sorted newest-first + │ + ├─ Selection: incremental → latest only, full → all datasets + │ + └─ For each dataset: + └─ Activity: processDataset(sourceName, dataset) + → HTTP stream → csv-parse → batches of 5000 → bulkUpsertProjectCatalog +``` + +### Timeouts + +| Activity | startToCloseTimeout | retries | +|----------|-------------------|---------| +| `listDatasets` | 2 min | 3 | +| `processDataset` | 30 min | 3 | +| Workflow execution | 2 hours | 3 | + +### Schedule + +Runs daily via Temporal cron. 
The cron expression can be overridden with the `CROWD_AUTOMATIC_PROJECTS_DISCOVERY_CRON` env var. + +## File structure + +``` +src/ +├── main.ts # Service bootstrap (postgres enabled) +├── activities.ts # Barrel re-export +├── workflows.ts # Barrel re-export +├── activities/ +│ └── activities.ts # listDatasets, processDataset +├── workflows/ +│ └── discoverProjects.ts # Orchestration with mode selection +├── schedules/ +│ └── scheduleProjectsDiscovery.ts # Temporal cron schedule +└── sources/ + ├── types.ts # IDiscoverySource, IDatasetDescriptor + ├── registry.ts # Source factory map + └── ossf-criticality-score/ + ├── source.ts # IDiscoverySource implementation + └── bucketClient.ts # GCS public bucket HTTP client +``` diff --git a/services/apps/automatic_projects_discovery_worker/package.json b/services/apps/automatic_projects_discovery_worker/package.json index 1c79505f89..022c1a6297 100644 --- a/services/apps/automatic_projects_discovery_worker/package.json +++ b/services/apps/automatic_projects_discovery_worker/package.json @@ -2,7 +2,7 @@ "name": "@crowd/automatic-projects-discovery-worker", "scripts": { "start": "CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker tsx src/main.ts", - "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", + "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker tsx --inspect=0.0.0.0:9232 src/main.ts", "start:debug": "CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", "dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug", @@ -24,6 +24,7 @@ "@temporalio/activity": "~1.11.8", "@temporalio/client": "~1.11.8", "@temporalio/workflow": "~1.11.8", + "csv-parse": "^5.5.6", "tsx": "^4.7.1", "typescript": "^5.6.3" }, diff --git a/services/apps/automatic_projects_discovery_worker/src/activities.ts b/services/apps/automatic_projects_discovery_worker/src/activities.ts index 3662234550..1718218b3e 100644 --- a/services/apps/automatic_projects_discovery_worker/src/activities.ts +++ b/services/apps/automatic_projects_discovery_worker/src/activities.ts @@ -1 +1,3 @@ -export * from './activities/activities' +import { listDatasets, processDataset } from './activities/activities' + +export { listDatasets, processDataset } diff --git a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts index 3aea7f8200..bc337a3516 100644 --- a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts +++ b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts @@ -1,7 +1,111 @@ +import { parse } from 'csv-parse' + +import { bulkUpsertProjectCatalog } from '@crowd/data-access-layer' +import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { IDbProjectCatalogCreate } from '@crowd/data-access-layer/src/project-catalog/types' import { getServiceLogger } from 
'@crowd/logging'
 
+import { svc } from '../main'
+import { getSource } from '../sources/registry'
+import { IDatasetDescriptor, IDiscoverySourceRow } from '../sources/types'
+
 const log = getServiceLogger()
 
-export async function logDiscoveryRun(): Promise<void> {
-  log.info('Automatic projects discovery workflow executed successfully.')
+const BATCH_SIZE = 5000
+
+export async function listDatasets(sourceName: string): Promise<IDatasetDescriptor[]> {
+  const source = getSource(sourceName)
+  const datasets = await source.listAvailableDatasets()
+
+  log.info({ sourceName, count: datasets.length, newest: datasets[0]?.id }, 'Datasets listed.')
+
+  return datasets
+}
+
+export async function processDataset(
+  sourceName: string,
+  dataset: IDatasetDescriptor,
+): Promise<void> {
+  const source = getSource(sourceName)
+  const qx = pgpQx(svc.postgres.writer.connection())
+  const startTime = Date.now()
+
+  log.info({ sourceName, datasetId: dataset.id, url: dataset.url }, 'Processing dataset...')
+
+  // We use streaming (not full download) because each CSV is ~119MB / ~750K rows.
+  // Streaming keeps memory usage low (only one batch in memory at a time) and leverages
+  // Node.js backpressure: if DB writes are slow, the HTTP stream pauses automatically.
+  const httpStream = await source.fetchDatasetStream(dataset)
+
+  // Pipe the raw HTTP response directly into csv-parse.
+  // Data flows as: HTTP response → csv-parse → for-await → batch → DB
+  const parser = httpStream.pipe(
+    parse({
+      columns: true,
+      skip_empty_lines: true,
+      trim: true,
+    }),
+  )
+
+  parser.on('error', (err) => {
+    log.error({ datasetId: dataset.id, error: err.message }, 'CSV parser error.')
+  })
+
+  httpStream.on('error', (err: Error) => {
+    log.error({ datasetId: dataset.id, error: err.message }, 'HTTP stream error.')
+  })
+
+  let batch: IDbProjectCatalogCreate[] = []
+  let totalProcessed = 0
+  let totalSkipped = 0
+  let batchNumber = 0
+  let totalRows = 0
+
+  for await (const rawRow of parser) {
+    totalRows++
+
+    const parsed: IDiscoverySourceRow | null = source.parseRow(rawRow)
+    if (!parsed) {
+      totalSkipped++
+      continue
+    }
+
+    batch.push({
+      projectSlug: parsed.projectSlug,
+      repoName: parsed.repoName,
+      repoUrl: parsed.repoUrl,
+      criticalityScore: parsed.criticalityScore,
+    })
+
+    if (batch.length >= BATCH_SIZE) {
+      batchNumber++
+      await bulkUpsertProjectCatalog(qx, batch)
+      totalProcessed += batch.length
+      batch = []
+
+      log.info({ totalProcessed, batchNumber, datasetId: dataset.id }, 'Batch upserted.')
+    }
+  }
+
+  // Flush remaining rows that didn't fill a complete batch
+  if (batch.length > 0) {
+    batchNumber++
+    await bulkUpsertProjectCatalog(qx, batch)
+    totalProcessed += batch.length
+  }
+
+  const elapsedSeconds = ((Date.now() - startTime) / 1000).toFixed(1)
+
+  log.info(
+    {
+      sourceName,
+      datasetId: dataset.id,
+      totalRows,
+      totalProcessed,
+      totalSkipped,
+      totalBatches: batchNumber,
+      elapsedSeconds,
+    },
+    'Dataset processing complete.',
+  )
 }
diff --git a/services/apps/automatic_projects_discovery_worker/src/main.ts b/services/apps/automatic_projects_discovery_worker/src/main.ts
index 326c3a361a..0345c420f8 100644
--- a/services/apps/automatic_projects_discovery_worker/src/main.ts
+++ b/services/apps/automatic_projects_discovery_worker/src/main.ts
@@ -18,7 +18,7 @@ const config: Config = {
 
 const options: Options = {
   postgres: {
-    enabled: false,
+    enabled: true,
   },
   opensearch: {
     enabled: false,
diff --git a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts
b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts
index 847c2e4ce9..3366470d75 100644
--- a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts
+++ b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts
@@ -3,18 +3,15 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/clien
 import { svc } from '../main'
 import { discoverProjects } from '../workflows'
 
-const DEFAULT_CRON = '0 2 * * *' // Daily at 2:00 AM
-
 export const scheduleProjectsDiscovery = async () => {
-  const cronExpression = process.env.CROWD_AUTOMATIC_PROJECTS_DISCOVERY_CRON || DEFAULT_CRON
-  svc.log.info(`Scheduling projects discovery with cron: ${cronExpression}`)
+  svc.log.info(`Scheduling projects discovery`)
 
   try {
     await svc.temporal.schedule.create({
       scheduleId: 'automaticProjectsDiscovery',
       spec: {
-        cronExpressions: [cronExpression],
+        cronExpressions: ['55 14 * * *'],
       },
       policies: {
         overlap: ScheduleOverlapPolicy.SKIP,
@@ -24,6 +21,8 @@ export const scheduleProjectsDiscovery = async () => {
         type: 'startWorkflow',
         workflowType: discoverProjects,
         taskQueue: 'automatic-projects-discovery',
+        args: [{ mode: 'full' as const }],
+        workflowExecutionTimeout: '2 hours',
         retry: {
           initialInterval: '15 seconds',
           backoffCoefficient: 2,
diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts
new file mode 100644
index 0000000000..7d6cb7f561
--- /dev/null
+++ b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts
@@ -0,0 +1,86 @@
+import https from 'https'
+
+const BUCKET_URL = 'https://commondatastorage.googleapis.com/ossf-criticality-score'
+
+function httpsGet(url: string): Promise<string> {
+  return new Promise((resolve, reject) => {
+    https
+      .get(url, (res) => {
+        if (res.statusCode && res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) {
+          httpsGet(res.headers.location).then(resolve, reject)
+          return
+        }
+
+        if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
+          reject(new Error(`HTTP ${res.statusCode} for ${url}`))
+          return
+        }
+
+        const chunks: Uint8Array[] = []
+        res.on('data', (chunk: Uint8Array) => chunks.push(chunk))
+        res.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8')))
+        res.on('error', reject)
+      })
+      .on('error', reject)
+  })
+}
+
+function extractPrefixes(xml: string): string[] {
+  const prefixes: string[] = []
+  const regex = /<Prefix>([^<]+)<\/Prefix>/g
+  let match: RegExpExecArray | null
+
+  while ((match = regex.exec(xml)) !== null) {
+    prefixes.push(match[1])
+  }
+
+  return prefixes
+}
+
+/**
+ * List all date prefixes in the OSSF Criticality Score bucket.
+ * Returns prefixes like ['2024.07.01/', '2024.07.08/', ...]
+ */
+export async function listDatePrefixes(): Promise<string[]> {
+  const xml = await httpsGet(`${BUCKET_URL}?delimiter=/`)
+  return extractPrefixes(xml).filter((p) => /^\d{4}\.\d{2}\.\d{2}\/$/.test(p))
+}
+
+/**
+ * List time sub-prefixes for a given date prefix.
+ * E.g., for '2024.07.01/' returns ['2024.07.01/060102/', ...]
+ */
+export async function listTimePrefixes(datePrefix: string): Promise<string[]> {
+  const xml = await httpsGet(`${BUCKET_URL}?prefix=${encodeURIComponent(datePrefix)}&delimiter=/`)
+  return extractPrefixes(xml).filter((p) => p !== datePrefix)
+}
+
+/**
+ * Build the full URL for the all.csv file within a given dataset prefix.
+ */
+export function buildDatasetUrl(prefix: string): string {
+  return `${BUCKET_URL}/${prefix}all.csv`
+}
+
+/**
+ * Get an HTTPS readable stream for a given URL.
+ */
+export function getHttpsStream(url: string): Promise<NodeJS.ReadableStream> {
+  return new Promise((resolve, reject) => {
+    https
+      .get(url, (res) => {
+        if (res.statusCode && res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) {
+          getHttpsStream(res.headers.location).then(resolve, reject)
+          return
+        }
+
+        if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
+          reject(new Error(`HTTP ${res.statusCode} for ${url}`))
+          return
+        }
+
+        resolve(res)
+      })
+      .on('error', reject)
+  })
+}
diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts
new file mode 100644
index 0000000000..9b3338b867
--- /dev/null
+++ b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts
@@ -0,0 +1,75 @@
+import { Readable } from 'stream'
+
+import { IDatasetDescriptor, IDiscoverySource, IDiscoverySourceRow } from '../types'
+
+import { buildDatasetUrl, getHttpsStream, listDatePrefixes, listTimePrefixes } from './bucketClient'
+
+export class OssfCriticalityScoreSource implements IDiscoverySource {
+  public readonly name = 'ossf-criticality-score'
+
+  async listAvailableDatasets(): Promise<IDatasetDescriptor[]> {
+    const datePrefixes = await listDatePrefixes()
+
+    const datasets: IDatasetDescriptor[] = []
+
+    for (const datePrefix of datePrefixes) {
+      const timePrefixes = await listTimePrefixes(datePrefix)
+
+      for (const timePrefix of timePrefixes) {
+        const date = datePrefix.replace(/\/$/, '')
+        const url = buildDatasetUrl(timePrefix)
+
+        datasets.push({
+          id: timePrefix.replace(/\/$/, ''),
+          date,
+          url,
+        })
+      }
+    }
+
+    // Sort newest-first by date
+    datasets.sort((a, b) => b.date.localeCompare(a.date))
+
+    return datasets
+  }
+
+  async fetchDatasetStream(dataset: IDatasetDescriptor): Promise<Readable> {
+    const stream = await getHttpsStream(dataset.url)
+    return stream as Readable
+  }
+
+  // CSV columns use dot notation (e.g. "repo.url", "default_score")
+  parseRow(rawRow: Record<string, string>): IDiscoverySourceRow | null {
+    const repoUrl = rawRow['repo.url']
+    if (!repoUrl) {
+      return null
+    }
+
+    let repoName = ''
+    let projectSlug = ''
+
+    try {
+      const urlPath = new URL(repoUrl).pathname.replace(/^\//, '').replace(/\/$/, '')
+      projectSlug = urlPath
+      repoName = urlPath.split('/').pop() || ''
+    } catch {
+      const parts = repoUrl.replace(/\/$/, '').split('/')
+      projectSlug = parts.slice(-2).join('/')
+      repoName = parts.pop() || ''
+    }
+
+    if (!projectSlug || !repoName) {
+      return null
+    }
+
+    const criticalityScoreRaw = rawRow['default_score']
+    const criticalityScore = criticalityScoreRaw ? parseFloat(criticalityScoreRaw) : undefined
+
+    return {
+      projectSlug,
+      repoName,
+      repoUrl,
+      criticalityScore: Number.isNaN(criticalityScore) ?
undefined : criticalityScore,
+    }
+  }
+}
diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts b/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts
new file mode 100644
index 0000000000..0d05783c71
--- /dev/null
+++ b/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts
@@ -0,0 +1,19 @@
+import { IDiscoverySource } from './types'
+import { OssfCriticalityScoreSource } from './ossf-criticality-score/source'
+
+// To add a new source: instantiate it here.
+const sources: IDiscoverySource[] = [
+  new OssfCriticalityScoreSource(),
+]
+
+export function getSource(name: string): IDiscoverySource {
+  const source = sources.find((s) => s.name === name)
+  if (!source) {
+    throw new Error(`Unknown source: ${name}. Available: ${sources.map((s) => s.name).join(', ')}`)
+  }
+  return source
+}
+
+export function getAvailableSourceNames(): string[] {
+  return sources.map((s) => s.name)
+}
diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/types.ts b/services/apps/automatic_projects_discovery_worker/src/sources/types.ts
new file mode 100644
index 0000000000..c2b30afa83
--- /dev/null
+++ b/services/apps/automatic_projects_discovery_worker/src/sources/types.ts
@@ -0,0 +1,21 @@
+import { Readable } from 'stream'
+
+export interface IDatasetDescriptor {
+  id: string
+  date: string
+  url: string
+}
+
+export interface IDiscoverySource {
+  name: string
+  listAvailableDatasets(): Promise<IDatasetDescriptor[]>
+  fetchDatasetStream(dataset: IDatasetDescriptor): Promise<Readable>
+  parseRow(rawRow: Record<string, string>): IDiscoverySourceRow | null
+}
+
+export interface IDiscoverySourceRow {
+  projectSlug: string
+  repoName: string
+  repoUrl: string
+  criticalityScore?: number
+}
diff --git a/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts b/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts
index f43a9b5a12..17b8706e89 100644
--- a/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts
+++ b/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts
@@ -1,11 +1,51 @@
-import { proxyActivities } from '@temporalio/workflow'
+import { log, proxyActivities } from '@temporalio/workflow'
 
 import type * as activities from '../activities'
 
-const activity = proxyActivities<typeof activities>({
-  startToCloseTimeout: '1 minutes',
+const listActivities = proxyActivities<typeof activities>({
+  startToCloseTimeout: '2 minutes',
+  retry: {
+    maximumAttempts: 3,
+  },
 })
 
-export async function discoverProjects(): Promise<void> {
-  await activity.logDiscoveryRun()
+// processDataset is long-running (10-20 min for ~119MB / ~750K rows).
+const processActivities = proxyActivities<typeof activities>({
+  startToCloseTimeout: '30 minutes',
+  retry: {
+    maximumAttempts: 3,
+  },
+})
+
+export interface DiscoverProjectsInput {
+  mode: 'incremental' | 'full'
+}
+
+export async function discoverProjects(
+  input: DiscoverProjectsInput = { mode: 'incremental' },
+): Promise<void> {
+  const sourceName = 'ossf-criticality-score'
+  const { mode } = input
+
+  const allDatasets = await listActivities.listDatasets(sourceName)
+
+  if (allDatasets.length === 0) {
+    log.warn('No datasets found. Nothing to process.')
+    return
+  }
+
+  // allDatasets is sorted newest-first.
+  // Incremental: process only the latest snapshot.
+  // Full: process oldest-first so the newest data wins the final upsert.
+  const datasets = mode === 'incremental' ?
[allDatasets[0]] : [...allDatasets].reverse() + + log.info(`mode=${mode}, ${datasets.length}/${allDatasets.length} datasets to process.`) + + for (let i = 0; i < datasets.length; i++) { + const dataset = datasets[i] + log.info(`Processing dataset ${i + 1}/${datasets.length}: ${dataset.id}`) + await processActivities.processDataset(sourceName, dataset) + } + + log.info(`Done. Processed ${datasets.length} dataset(s).`) } From 37efe6a36ee223059be3ac678f15a8581ba292de Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Tue, 10 Feb 2026 17:23:21 +0100 Subject: [PATCH 2/8] fix: lint --- .../src/activities/activities.ts | 2 +- .../src/schedules/scheduleProjectsDiscovery.ts | 1 - .../sources/ossf-criticality-score/bucketClient.ts | 14 ++++++++++++-- .../src/sources/registry.ts | 6 ++---- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts index bc337a3516..98176c1745 100644 --- a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts +++ b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts @@ -1,8 +1,8 @@ import { parse } from 'csv-parse' import { bulkUpsertProjectCatalog } from '@crowd/data-access-layer' -import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { IDbProjectCatalogCreate } from '@crowd/data-access-layer/src/project-catalog/types' +import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getServiceLogger } from '@crowd/logging' import { svc } from '../main' diff --git a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts index 3366470d75..11a3801ef6 100644 --- a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts +++ b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts @@ -4,7 +4,6 @@ import { svc } from '../main' import { discoverProjects } from '../workflows' export const scheduleProjectsDiscovery = async () => { - svc.log.info(`Scheduling projects discovery`) try { diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts index 7d6cb7f561..71b2066ae7 100644 --- a/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts +++ b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts @@ -6,7 +6,12 @@ function httpsGet(url: string): Promise { return new Promise((resolve, reject) => { https .get(url, (res) => { - if (res.statusCode && res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) { + if ( + res.statusCode && + res.statusCode >= 300 && + res.statusCode < 400 && + res.headers.location + ) { httpsGet(res.headers.location).then(resolve, reject) return } @@ -69,7 +74,12 @@ export function getHttpsStream(url: string): Promise { return new Promise((resolve, reject) => { https .get(url, (res) => { - if (res.statusCode && res.statusCode >= 300 && res.statusCode < 400 && res.headers.location) { + if ( + res.statusCode && + res.statusCode >= 300 && + res.statusCode < 400 && + res.headers.location + ) { getHttpsStream(res.headers.location).then(resolve, 
reject) return } diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts b/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts index 0d05783c71..7c8796094f 100644 --- a/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts +++ b/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts @@ -1,10 +1,8 @@ -import { IDiscoverySource } from './types' import { OssfCriticalityScoreSource } from './ossf-criticality-score/source' +import { IDiscoverySource } from './types' // To add a new source: instantiate it here. -const sources: IDiscoverySource[] = [ - new OssfCriticalityScoreSource(), -] +const sources: IDiscoverySource[] = [new OssfCriticalityScoreSource()] export function getSource(name: string): IDiscoverySource { const source = sources.find((s) => s.name === name) From a048ef15856c5b5887e68acb14e80fd93f4cb4be Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Tue, 10 Feb 2026 17:52:51 +0100 Subject: [PATCH 3/8] fix: lint --- .../README.md | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/services/apps/automatic_projects_discovery_worker/README.md b/services/apps/automatic_projects_discovery_worker/README.md index 85162f9974..ff4ce16302 100644 --- a/services/apps/automatic_projects_discovery_worker/README.md +++ b/services/apps/automatic_projects_discovery_worker/README.md @@ -8,11 +8,11 @@ Temporal worker that discovers open-source projects from external data sources a Every data source implements the `IDiscoverySource` interface (`src/sources/types.ts`): -| Method | Purpose | -|--------|---------| -| `listAvailableDatasets()` | Returns available dataset snapshots, sorted newest-first | -| `fetchDatasetStream(dataset)` | Returns a readable stream for the dataset (e.g. HTTP response) | -| `parseRow(rawRow)` | Converts a raw CSV/JSON row into a `IDiscoverySourceRow`, or `null` to skip | +| Method | Purpose | +| ----------------------------- | --------------------------------------------------------------------------- | +| `listAvailableDatasets()` | Returns available dataset snapshots, sorted newest-first | +| `fetchDatasetStream(dataset)` | Returns a readable stream for the dataset (e.g. HTTP response) | +| `parseRow(rawRow)` | Converts a raw CSV/JSON row into a `IDiscoverySourceRow`, or `null` to skip | Sources are registered in `src/sources/registry.ts` as a simple name → factory map. 
@@ -20,8 +20,8 @@ Sources are registered in `src/sources/registry.ts` as a simple name → factory ### Current sources -| Name | Folder | Description | -|------|--------|-------------| +| Name | Folder | Description | +| ------------------------ | ------------------------------------- | ------------------------------------------------------------------------------------ | | `ossf-criticality-score` | `src/sources/ossf-criticality-score/` | OSSF Criticality Score snapshots from a public GCS bucket (~750K repos per snapshot) | ### Workflow @@ -41,11 +41,11 @@ discoverProjects({ mode: 'incremental' | 'full' }) ### Timeouts -| Activity | startToCloseTimeout | retries | -|----------|-------------------|---------| -| `listDatasets` | 2 min | 3 | -| `processDataset` | 30 min | 3 | -| Workflow execution | 2 hours | 3 | +| Activity | startToCloseTimeout | retries | +| ------------------ | ------------------- | ------- | +| `listDatasets` | 2 min | 3 | +| `processDataset` | 30 min | 3 | +| Workflow execution | 2 hours | 3 | ### Schedule From a757414c3885ba85059521ce18b9cd522c13b820 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 11 Feb 2026 10:37:40 +0100 Subject: [PATCH 4/8] fix: format --- .../src/activities/activities.ts | 14 ++++++++------ .../src/workflows/discoverProjects.ts | 17 +++++------------ 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts index 98176c1745..fc82cadea5 100644 --- a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts +++ b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts @@ -26,17 +26,22 @@ export async function processDataset( sourceName: string, dataset: IDatasetDescriptor, ): Promise { - const source = getSource(sourceName) const qx = pgpQx(svc.postgres.writer.connection()) const startTime = Date.now() log.info({ sourceName, datasetId: dataset.id, url: dataset.url }, 'Processing dataset...') + const source = getSource(sourceName) + // We use streaming (not full download) because each CSV is ~119MB / ~750K rows. // Streaming keeps memory usage low (only one batch in memory at a time) and leverages // Node.js backpressure: if DB writes are slow, the HTTP stream pauses automatically. const httpStream = await source.fetchDatasetStream(dataset) + httpStream.on('error', (err: Error) => { + log.error({ datasetId: dataset.id, error: err.message }, 'HTTP stream error.') + }) + // Pipe the raw HTTP response directly into csv-parse. 
// Data flows as: HTTP response → csv-parse → for-await → batch → DB const parser = httpStream.pipe( @@ -51,10 +56,6 @@ export async function processDataset( log.error({ datasetId: dataset.id, error: err.message }, 'CSV parser error.') }) - httpStream.on('error', (err: Error) => { - log.error({ datasetId: dataset.id, error: err.message }, 'HTTP stream error.') - }) - let batch: IDbProjectCatalogCreate[] = [] let totalProcessed = 0 let totalSkipped = 0 @@ -64,7 +65,7 @@ export async function processDataset( for await (const rawRow of parser) { totalRows++ - const parsed: IDiscoverySourceRow | null = source.parseRow(rawRow) + const parsed = source.parseRow(rawRow) if (!parsed) { totalSkipped++ continue @@ -79,6 +80,7 @@ export async function processDataset( if (batch.length >= BATCH_SIZE) { batchNumber++ + await bulkUpsertProjectCatalog(qx, batch) totalProcessed += batch.length batch = [] diff --git a/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts b/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts index 17b8706e89..6e9893949b 100644 --- a/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts +++ b/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts @@ -4,27 +4,20 @@ import type * as activities from '../activities' const listActivities = proxyActivities({ startToCloseTimeout: '2 minutes', - retry: { - maximumAttempts: 3, - }, + retry: { maximumAttempts: 3 }, }) // processDataset is long-running (10-20 min for ~119MB / ~750K rows). const processActivities = proxyActivities({ startToCloseTimeout: '30 minutes', - retry: { - maximumAttempts: 3, - }, + retry: { maximumAttempts: 3 }, }) -export interface DiscoverProjectsInput { - mode: 'incremental' | 'full' -} - export async function discoverProjects( - input: DiscoverProjectsInput = { mode: 'incremental' }, + input: { mode: 'incremental' | 'full' } = { mode: 'incremental' }, ): Promise { const sourceName = 'ossf-criticality-score' + const { mode } = input const allDatasets = await listActivities.listDatasets(sourceName) @@ -34,7 +27,7 @@ export async function discoverProjects( return } - // allDatasets is sorted newest-first. + // allDatasets is sorted newest-first, that is the reason we need the .reverse(). // Incremental: process only the latest snapshot. // Full: process oldest-first so the newest data wins the final upsert. const datasets = mode === 'incremental' ? 
[allDatasets[0]] : [...allDatasets].reverse() From 8a73dfa349f121dcb361c9e900498cd18086d551 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 11 Feb 2026 10:42:20 +0100 Subject: [PATCH 5/8] fix: lint --- .../src/activities/activities.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts index fc82cadea5..54ce9feb64 100644 --- a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts +++ b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts @@ -7,7 +7,7 @@ import { getServiceLogger } from '@crowd/logging' import { svc } from '../main' import { getSource } from '../sources/registry' -import { IDatasetDescriptor, IDiscoverySourceRow } from '../sources/types' +import { IDatasetDescriptor } from '../sources/types' const log = getServiceLogger() From cd182c51700a2820c7c5ee5aefee4fc035a72bd9 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 11 Feb 2026 10:49:30 +0100 Subject: [PATCH 6/8] fix: update cron expression --- .../src/schedules/scheduleProjectsDiscovery.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts index 11a3801ef6..f3b29a2d7a 100644 --- a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts +++ b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts @@ -10,7 +10,8 @@ export const scheduleProjectsDiscovery = async () => { await svc.temporal.schedule.create({ scheduleId: 'automaticProjectsDiscovery', spec: { - cronExpressions: ['55 14 * * *'], + // Run every day at midnight + cronExpressions: ['0 0 * * *'], }, policies: { overlap: ScheduleOverlapPolicy.SKIP, From d16533f928da1fd440abf9724e1617fbc1a286c3 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 11 Feb 2026 11:32:41 +0100 Subject: [PATCH 7/8] feat: mode incremental --- .../src/schedules/scheduleProjectsDiscovery.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts index f3b29a2d7a..b173126a78 100644 --- a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts +++ b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts @@ -21,7 +21,7 @@ export const scheduleProjectsDiscovery = async () => { type: 'startWorkflow', workflowType: discoverProjects, taskQueue: 'automatic-projects-discovery', - args: [{ mode: 'full' as const }], + args: [{ mode: 'incremental' as const }], workflowExecutionTimeout: '2 hours', retry: { initialInterval: '15 seconds', From c516e6aec047bc6a7e65cd284320b781c9041720 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Wed, 11 Feb 2026 12:52:36 +0100 Subject: [PATCH 8/8] fix: update readme --- services/apps/automatic_projects_discovery_worker/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/apps/automatic_projects_discovery_worker/README.md b/services/apps/automatic_projects_discovery_worker/README.md index ff4ce16302..77623513cc 100644 --- 
a/services/apps/automatic_projects_discovery_worker/README.md +++ b/services/apps/automatic_projects_discovery_worker/README.md @@ -49,7 +49,7 @@ discoverProjects({ mode: 'incremental' | 'full' }) ### Schedule -Runs daily via Temporal cron. The cron expression can be overridden with the `CROWD_AUTOMATIC_PROJECTS_DISCOVERY_CRON` env var. +Runs daily at midnight via Temporal cron (`0 0 * * *`). ## File structure
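
A minimal sketch of the extension point the README describes ("create a class implementing `IDiscoverySource`, then add one line to the registry"), for reference alongside the patches above. Only the `IDiscoverySource` contract, the `IDatasetDescriptor` / `IDiscoverySourceRow` types, and the `getHttpsStream` helper come from this patch series; the `example-csv` source name, dataset URL, and CSV column names (`url`, `score`) are hypothetical.

```
// Hypothetical example (not part of the patches above): a second IDiscoverySource
// implementation, e.g. src/sources/example-csv/source.ts. The source name, dataset
// URL, and CSV column names ("url", "score") are invented for illustration.
import { Readable } from 'stream'

import { getHttpsStream } from '../ossf-criticality-score/bucketClient'
import { IDatasetDescriptor, IDiscoverySource, IDiscoverySourceRow } from '../types'

export class ExampleCsvSource implements IDiscoverySource {
  public readonly name = 'example-csv'

  // A single static snapshot; a real source would discover snapshots dynamically.
  async listAvailableDatasets(): Promise<IDatasetDescriptor[]> {
    return [
      {
        id: '2026.01.01',
        date: '2026.01.01',
        url: 'https://example.com/projects/2026.01.01/all.csv',
      },
    ]
  }

  // Reuses the redirect-following HTTPS helper from the OSSF bucket client.
  async fetchDatasetStream(dataset: IDatasetDescriptor): Promise<Readable> {
    const stream = await getHttpsStream(dataset.url)
    return stream as Readable
  }

  // Maps one CSV row to the catalog shape; returns null to skip unusable rows.
  parseRow(rawRow: Record<string, string>): IDiscoverySourceRow | null {
    const repoUrl = rawRow['url']
    if (!repoUrl) {
      return null
    }

    let projectSlug = ''
    try {
      projectSlug = new URL(repoUrl).pathname.replace(/^\/|\/$/g, '')
    } catch {
      return null
    }

    const repoName = projectSlug.split('/').pop() || ''
    if (!projectSlug || !repoName) {
      return null
    }

    const score = rawRow['score'] ? parseFloat(rawRow['score']) : undefined

    return {
      projectSlug,
      repoName,
      repoUrl,
      criticalityScore: score !== undefined && Number.isNaN(score) ? undefined : score,
    }
  }
}
```

Registering it is the one-line change the README mentions: add `new ExampleCsvSource()` to the `sources` array in `src/sources/registry.ts`.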