3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default.

73 changes: 73 additions & 0 deletions services/apps/automatic_projects_discovery_worker/README.md
@@ -0,0 +1,73 @@
# Automatic Projects Discovery Worker

Temporal worker that discovers open-source projects from external data sources and writes them to the `projectCatalog` table.

## Architecture

### Source abstraction

Every data source implements the `IDiscoverySource` interface (`src/sources/types.ts`):

| Method | Purpose |
| ----------------------------- | --------------------------------------------------------------------------- |
| `listAvailableDatasets()` | Returns available dataset snapshots, sorted newest-first |
| `fetchDatasetStream(dataset)` | Returns a readable stream for the dataset (e.g. HTTP response) |
| `parseRow(rawRow)`           | Converts a raw CSV/JSON row into an `IDiscoverySourceRow`, or `null` to skip |

Sources are registered in `src/sources/registry.ts` as a simple name → factory map.

**To add a new source:** create a class implementing `IDiscoverySource`, then add one line to the registry.
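
The interface and registry files themselves are not part of this diff, so the following is a minimal sketch of how the contract and the factory map fit together. The interface names and the descriptor/row fields are taken from the README and the activity code below; the `OssfCriticalityScoreSource` export name and the raw-row parameter type are assumptions.

```ts
import { OssfCriticalityScoreSource } from './ossf-criticality-score/source' // assumed export name

// Sketch of src/sources/types.ts, inferred from how the activities use it.
export interface IDatasetDescriptor {
  id: string // snapshot identifier, e.g. a date/time prefix
  url: string // where the dataset CSV can be streamed from
}

export interface IDiscoverySourceRow {
  projectSlug: string
  repoName: string
  repoUrl: string
  criticalityScore: number
}

export interface IDiscoverySource {
  listAvailableDatasets(): Promise<IDatasetDescriptor[]> // newest-first
  fetchDatasetStream(dataset: IDatasetDescriptor): Promise<NodeJS.ReadableStream>
  parseRow(rawRow: Record<string, string>): IDiscoverySourceRow | null
}

// Sketch of src/sources/registry.ts: adding a source is one new map entry.
const registry: Record<string, () => IDiscoverySource> = {
  'ossf-criticality-score': () => new OssfCriticalityScoreSource(),
}

export function getSource(name: string): IDiscoverySource {
  const factory = registry[name]
  if (!factory) {
    throw new Error(`Unknown discovery source: ${name}`)
  }
  return factory()
}
```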

### Current sources

| Name | Folder | Description |
| ------------------------ | ------------------------------------- | ------------------------------------------------------------------------------------ |
| `ossf-criticality-score` | `src/sources/ossf-criticality-score/` | OSSF Criticality Score snapshots from a public GCS bucket (~750K repos per snapshot) |

### Workflow

```
discoverProjects({ mode: 'incremental' | 'full' })
├─ Activity: listDatasets(sourceName)
│ → returns dataset descriptors sorted newest-first
├─ Selection: incremental → latest only, full → all datasets
└─ For each dataset:
└─ Activity: processDataset(sourceName, dataset)
→ HTTP stream → csv-parse → batches of 5000 → bulkUpsertProjectCatalog
```

### Timeouts

| Activity | startToCloseTimeout | retries |
| ------------------ | ------------------- | ------- |
| `listDatasets` | 2 min | 3 |
| `processDataset` | 30 min | 3 |
| Workflow execution | 2 hours | 3 |
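
Taken together, the orchestration diagram and the timeouts above could be wired up roughly as in the sketch below. The actual `src/workflows/discoverProjects.ts` is not shown in this diff, so treat the retry shape and local variable names as assumptions; the activity names, mode handling, and timeout values come from this README.

```ts
import { proxyActivities } from '@temporalio/workflow'

import type * as activities from '../activities'

// Activity stubs using the timeouts from the table above.
const { listDatasets } = proxyActivities<typeof activities>({
  startToCloseTimeout: '2 minutes',
  retry: { maximumAttempts: 3 },
})

const { processDataset } = proxyActivities<typeof activities>({
  startToCloseTimeout: '30 minutes',
  retry: { maximumAttempts: 3 },
})

export async function discoverProjects(args: { mode: 'incremental' | 'full' }): Promise<void> {
  const sourceName = 'ossf-criticality-score'

  // Datasets come back newest-first, so incremental mode only needs the head.
  const datasets = await listDatasets(sourceName)
  const selected = args.mode === 'incremental' ? datasets.slice(0, 1) : datasets

  for (const dataset of selected) {
    await processDataset(sourceName, dataset)
  }
}
```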

### Schedule

Runs daily at midnight via Temporal cron (`0 0 * * *`).

## File structure

```
src/
├── main.ts # Service bootstrap (postgres enabled)
├── activities.ts # Barrel re-export
├── workflows.ts # Barrel re-export
├── activities/
│ └── activities.ts # listDatasets, processDataset
├── workflows/
│ └── discoverProjects.ts # Orchestration with mode selection
├── schedules/
│ └── scheduleProjectsDiscovery.ts # Temporal cron schedule
└── sources/
├── types.ts # IDiscoverySource, IDatasetDescriptor
├── registry.ts # Source factory map
└── ossf-criticality-score/
├── source.ts # IDiscoverySource implementation
└── bucketClient.ts # GCS public bucket HTTP client
```
services/apps/automatic_projects_discovery_worker/package.json
@@ -2,7 +2,7 @@
"name": "@crowd/automatic-projects-discovery-worker",
"scripts": {
"start": "CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker tsx src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
@@ -24,6 +24,7 @@
"@temporalio/activity": "~1.11.8",
"@temporalio/client": "~1.11.8",
"@temporalio/workflow": "~1.11.8",
"csv-parse": "^5.5.6",
"tsx": "^4.7.1",
"typescript": "^5.6.3"
},
services/apps/automatic_projects_discovery_worker/src/activities.ts
@@ -1 +1,3 @@
export * from './activities/activities'
import { listDatasets, processDataset } from './activities/activities'

export { listDatasets, processDataset }
services/apps/automatic_projects_discovery_worker/src/activities/activities.ts
@@ -1,7 +1,113 @@
import { parse } from 'csv-parse'

import { bulkUpsertProjectCatalog } from '@crowd/data-access-layer'
import { IDbProjectCatalogCreate } from '@crowd/data-access-layer/src/project-catalog/types'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getServiceLogger } from '@crowd/logging'

import { svc } from '../main'
import { getSource } from '../sources/registry'
import { IDatasetDescriptor } from '../sources/types'

const log = getServiceLogger()

export async function logDiscoveryRun(): Promise<void> {
log.info('Automatic projects discovery workflow executed successfully.')
const BATCH_SIZE = 5000

export async function listDatasets(sourceName: string): Promise<IDatasetDescriptor[]> {
const source = getSource(sourceName)
const datasets = await source.listAvailableDatasets()

log.info({ sourceName, count: datasets.length, newest: datasets[0]?.id }, 'Datasets listed.')

return datasets
}

export async function processDataset(
sourceName: string,
dataset: IDatasetDescriptor,
): Promise<void> {
const qx = pgpQx(svc.postgres.writer.connection())
const startTime = Date.now()

log.info({ sourceName, datasetId: dataset.id, url: dataset.url }, 'Processing dataset...')

const source = getSource(sourceName)

// We use streaming (not full download) because each CSV is ~119MB / ~750K rows.
// Streaming keeps memory usage low (only one batch in memory at a time) and leverages
// Node.js backpressure: if DB writes are slow, the HTTP stream pauses automatically.
const httpStream = await source.fetchDatasetStream(dataset)

httpStream.on('error', (err: Error) => {
log.error({ datasetId: dataset.id, error: err.message }, 'HTTP stream error.')
})

// Pipe the raw HTTP response directly into csv-parse.
// Data flows as: HTTP response → csv-parse → for-await → batch → DB
const parser = httpStream.pipe(
parse({
columns: true,
skip_empty_lines: true,
trim: true,
}),
)

parser.on('error', (err) => {
log.error({ datasetId: dataset.id, error: err.message }, 'CSV parser error.')
})

let batch: IDbProjectCatalogCreate[] = []
let totalProcessed = 0
let totalSkipped = 0
let batchNumber = 0
let totalRows = 0

for await (const rawRow of parser) {
totalRows++

const parsed = source.parseRow(rawRow)
if (!parsed) {
totalSkipped++
continue
}

batch.push({
projectSlug: parsed.projectSlug,
repoName: parsed.repoName,
repoUrl: parsed.repoUrl,
criticalityScore: parsed.criticalityScore,
})

if (batch.length >= BATCH_SIZE) {
batchNumber++

await bulkUpsertProjectCatalog(qx, batch)
totalProcessed += batch.length
batch = []

log.info({ totalProcessed, batchNumber, datasetId: dataset.id }, 'Batch upserted.')
}
}

// Flush remaining rows that didn't fill a complete batch
if (batch.length > 0) {
batchNumber++
await bulkUpsertProjectCatalog(qx, batch)
totalProcessed += batch.length
}

const elapsedSeconds = ((Date.now() - startTime) / 1000).toFixed(1)

log.info(
{
sourceName,
datasetId: dataset.id,
totalRows,
totalProcessed,
totalSkipped,
totalBatches: batchNumber,
elapsedSeconds,
},
'Dataset processing complete.',
)
}
services/apps/automatic_projects_discovery_worker/src/main.ts
@@ -18,7 +18,7 @@ const config: Config = {

const options: Options = {
postgres: {
enabled: false,
enabled: true,
},
opensearch: {
enabled: false,
services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts
@@ -3,18 +3,15 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client'
import { svc } from '../main'
import { discoverProjects } from '../workflows'

const DEFAULT_CRON = '0 2 * * *' // Daily at 2:00 AM

export const scheduleProjectsDiscovery = async () => {
const cronExpression = process.env.CROWD_AUTOMATIC_PROJECTS_DISCOVERY_CRON || DEFAULT_CRON

svc.log.info(`Scheduling projects discovery with cron: ${cronExpression}`)
svc.log.info(`Scheduling projects discovery`)

try {
await svc.temporal.schedule.create({
scheduleId: 'automaticProjectsDiscovery',
spec: {
cronExpressions: [cronExpression],
// Run every day at midnight
cronExpressions: ['0 0 * * *'],
},
policies: {
overlap: ScheduleOverlapPolicy.SKIP,
@@ -24,6 +21,8 @@ export const scheduleProjectsDiscovery = async () => {
type: 'startWorkflow',
workflowType: discoverProjects,
taskQueue: 'automatic-projects-discovery',
args: [{ mode: 'incremental' as const }],
workflowExecutionTimeout: '2 hours',
retry: {
initialInterval: '15 seconds',
backoffCoefficient: 2,
services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts
@@ -0,0 +1,96 @@
import https from 'https'

const BUCKET_URL = 'https://commondatastorage.googleapis.com/ossf-criticality-score'

function httpsGet(url: string): Promise<string> {
return new Promise((resolve, reject) => {
https
.get(url, (res) => {
if (
res.statusCode &&
res.statusCode >= 300 &&
res.statusCode < 400 &&
res.headers.location
) {
httpsGet(res.headers.location).then(resolve, reject)
return
}

if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
reject(new Error(`HTTP ${res.statusCode} for ${url}`))
return
}

const chunks: Uint8Array[] = []
res.on('data', (chunk: Uint8Array) => chunks.push(chunk))
res.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8')))
res.on('error', reject)
})
.on('error', reject)
})
}

function extractPrefixes(xml: string): string[] {
const prefixes: string[] = []
const regex = /<Prefix>([^<]+)<\/Prefix>/g
let match: RegExpExecArray | null

while ((match = regex.exec(xml)) !== null) {
prefixes.push(match[1])
}

return prefixes
}

/**
* List all date prefixes in the OSSF Criticality Score bucket.
* Returns prefixes like ['2024.07.01/', '2024.07.08/', ...]
*/
export async function listDatePrefixes(): Promise<string[]> {
const xml = await httpsGet(`${BUCKET_URL}?delimiter=/`)
return extractPrefixes(xml).filter((p) => /^\d{4}\.\d{2}\.\d{2}\/$/.test(p))
}

/**
* List time sub-prefixes for a given date prefix.
* E.g., for '2024.07.01/' returns ['2024.07.01/060102/', ...]
*/
export async function listTimePrefixes(datePrefix: string): Promise<string[]> {
const xml = await httpsGet(`${BUCKET_URL}?prefix=${encodeURIComponent(datePrefix)}&delimiter=/`)
return extractPrefixes(xml).filter((p) => p !== datePrefix)
}

/**
* Build the full URL for the all.csv file within a given dataset prefix.
*/
export function buildDatasetUrl(prefix: string): string {
return `${BUCKET_URL}/${prefix}all.csv`
}

/**
* Get an HTTPS readable stream for a given URL.
*/
export function getHttpsStream(url: string): Promise<NodeJS.ReadableStream> {
return new Promise((resolve, reject) => {
https
.get(url, (res) => {
if (
res.statusCode &&
res.statusCode >= 300 &&
res.statusCode < 400 &&
res.headers.location
) {
getHttpsStream(res.headers.location).then(resolve, reject)
return
}

if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
reject(new Error(`HTTP ${res.statusCode} for ${url}`))
return
}

resolve(res)
})
.on('error', reject)
})
}
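
For context, `source.ts` (not included in this diff) presumably composes these helpers into the `IDiscoverySource` contract along the lines of the sketch below. The descriptor shape (`id`, `url`) matches the README; everything else is a hypothetical illustration, not the actual contents of `source.ts`.

```ts
import { buildDatasetUrl, getHttpsStream, listDatePrefixes, listTimePrefixes } from './bucketClient'

// Hypothetical composition of the bucket helpers; not the actual source.ts.
export async function listAvailableDatasets(): Promise<{ id: string, url: string }[]> {
  const datasets: { id: string, url: string }[] = []

  for (const datePrefix of await listDatePrefixes()) {
    for (const timePrefix of await listTimePrefixes(datePrefix)) {
      // e.g. id '2024.07.01/060102/' → url '<bucket>/2024.07.01/060102/all.csv'
      datasets.push({ id: timePrefix, url: buildDatasetUrl(timePrefix) })
    }
  }

  // Newest first, as the workflow's incremental mode expects.
  return datasets.sort((a, b) => b.id.localeCompare(a.id))
}

export function fetchDatasetStream(dataset: { url: string }): Promise<NodeJS.ReadableStream> {
  return getHttpsStream(dataset.url)
}
```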