17 changes: 16 additions & 1 deletion api/db.py
@@ -11,7 +11,9 @@
from fastapi_pagination.ext.motor import paginate
from motor import motor_asyncio
from redis import asyncio as aioredis
from kernelci.api.models import EventHistory, Hierarchy, Node, parse_node_obj
from kernelci.api.models import (

EventHistory, Hierarchy, Node, TelemetryEvent, parse_node_obj
)
from .models import User, UserGroup


@@ -28,6 +30,7 @@
Node: 'node',
UserGroup: 'usergroup',
EventHistory: 'eventhistory',
TelemetryEvent: 'telemetry',
}

OPERATOR_MAP = {
@@ -242,6 +245,12 @@
obj.id = res.inserted_id
return obj

async def insert_many(self, model, documents):
"""Create multiple documents in a collection."""
col = self._get_collection(model)
result = await col.insert_many(documents)
return result.inserted_ids

async def _create_recursively(self, hierarchy: Hierarchy, parent: Node,
cls, col):
obj = parse_node_obj(hierarchy.node)
@@ -294,6 +303,12 @@
raise ValueError(f"No object found with id: {obj.id}")
return obj.__class__(**await col.find_one(ObjectId(obj.id)))

async def aggregate(self, model, pipeline):
"""Run an aggregation pipeline on a model's collection"""
col = self._get_collection(model)
cursor = col.aggregate(pipeline)
return await cursor.to_list(length=None)

async def delete_by_id(self, model, obj_id):
"""Delete one object matching a given id"""
col = self._get_collection(model)
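
A minimal usage sketch for the two new Database helpers, insert_many and
aggregate (illustrative only, not part of the diff; the database instance
and the event documents below are assumed):

from kernelci.api.models import TelemetryEvent

async def example_db_usage(database):
    """Insert raw telemetry documents, then aggregate them per runtime."""
    docs = [
        {'kind': 'job_result', 'runtime': 'lava-collabora', 'result': 'pass'},
        {'kind': 'job_result', 'runtime': 'lava-collabora', 'result': 'fail'},
    ]
    inserted_ids = await database.insert_many(TelemetryEvent, docs)

    # Count events per runtime with the new aggregate() wrapper
    pipeline = [{'$group': {'_id': '$runtime', 'total': {'$sum': 1}}}]
    per_runtime = await database.aggregate(TelemetryEvent, pipeline)
    return inserted_ids, per_runtime
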
319 changes: 319 additions & 0 deletions api/main.py
@@ -46,13 +46,14 @@
from pydantic import BaseModel
from jose import jwt
from jose.exceptions import JWTError
from kernelci.api.models import (

Node,
Hierarchy,
PublishEvent,
parse_node_obj,
KernelVersion,
EventHistory,
TelemetryEvent,
)
from .auth import Authentication
from .db import Database
@@ -953,6 +954,324 @@
return JSONResponse(content=json_comp)


# -----------------------------------------------------------------------------
# Telemetry of pipeline execution and other events (not node-related).
# This is a separate collection from EventHistory since it may have a much
# higher volume and different query patterns, which allows indexes and
# storage to be optimized separately.

@app.post('/telemetry', response_model=dict, tags=["telemetry"])
async def post_telemetry(
events: List[dict],
current_user: User = Depends(get_current_user),
):
"""Bulk insert telemetry events.

Accepts a list of telemetry event dicts. Each event must have at
least 'kind' and 'runtime' fields. Events are validated against
the TelemetryEvent model before insertion.
"""
metrics.add('http_requests_total', 1)
if not events:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Events list cannot be empty",
)
docs = []
for event in events:
try:
obj = TelemetryEvent(**event)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid telemetry event: {exc}",
) from exc
doc = obj.model_dump(by_alias=True)
doc.pop('_id', None)
docs.append(doc)
inserted_ids = await db.insert_many(TelemetryEvent, docs)
return {"inserted": len(inserted_ids)}


@app.get('/telemetry', response_model=PageModel, tags=["telemetry"])
async def get_telemetry(request: Request):
"""Query telemetry events with optional filters.

Supports filtering by any TelemetryEvent field, plus time range
via 'since' and 'until' parameters (ISO 8601 format).
Results are paginated (default limit=50).
"""
metrics.add('http_requests_total', 1)
query_params = dict(request.query_params)

for pg_key in ['limit', 'offset']:
query_params.pop(pg_key, None)

since = query_params.pop('since', None)
until = query_params.pop('until', None)
if since or until:
ts_filter = {}
if since:
ts_filter['$gte'] = datetime.fromisoformat(since)
if until:
ts_filter['$lte'] = datetime.fromisoformat(until)
query_params['ts'] = ts_filter

# Convert string 'true'/'false' for boolean fields
if 'is_infra_error' in query_params:
val = query_params['is_infra_error'].lower()
if val not in ['true', 'false']:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Bad is_infra_error value, use 'true' or 'false'",
)
if val == 'true':
query_params['is_infra_error'] = True
else:
query_params['is_infra_error'] = False

paginated_resp = await db.find_by_attributes(
TelemetryEvent, query_params
)
paginated_resp.items = serialize_paginated_data(
TelemetryEvent, paginated_resp.items
)
return paginated_resp
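
# Illustrative query sketch for GET /telemetry above (not part of this
# diff): the base URL and filter values are assumptions; 'items' is the
# paginated list carried by the PageModel response.
def example_get_telemetry(base_url='http://localhost:8001'):
    import requests  # assumed to be available on the client side
    params = {
        'kind': 'job_result',
        'runtime': 'lava-collabora',
        'is_infra_error': 'true',
        'since': '2024-01-01T00:00:00',
        'limit': 50,
    }
    resp = requests.get(f'{base_url}/telemetry', params=params, timeout=30)
    resp.raise_for_status()
    return resp.json()['items']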


TELEMETRY_STATS_GROUP_FIELDS = {
'runtime', 'device_type', 'job_name', 'tree', 'branch',
'arch', 'kind', 'error_type',
}


@app.get('/telemetry/stats', tags=["telemetry"])
async def get_telemetry_stats(request: Request):
"""Get aggregated telemetry statistics.

Together with the /telemetry/anomalies endpoint below, this provides
rule-based anomaly detection using thresholded empirical rates computed
over a sliding (rolling) time window. It is not a full anomaly detection
system with baselines or machine learning, but at least something to
start with.

Query parameters:
- group_by: Comma-separated fields to group by
(runtime, device_type, job_name, tree, branch, arch,
kind, error_type)
- kind: Filter by event kind before aggregating
- runtime: Filter by runtime name
- since/until: Time range (ISO 8601)

Returns grouped counts with pass/fail/incomplete/infra_error
breakdowns for result-bearing events.
"""
metrics.add('http_requests_total', 1)
query_params = dict(request.query_params)

group_by_str = query_params.pop('group_by', None)
if not group_by_str:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="'group_by' parameter is required",
)
group_by = [f.strip() for f in group_by_str.split(',')]
invalid = set(group_by) - TELEMETRY_STATS_GROUP_FIELDS
if invalid:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid group_by fields: {invalid}",
)

match_stage = {
key: query_params.pop(key)
for key in ('kind', 'runtime', 'device_type', 'job_name',
'tree', 'branch', 'arch')
if query_params.get(key)
}

since = query_params.pop('since', None)
until = query_params.pop('until', None)
if since or until:
match_stage['ts'] = {
**({'$gte': datetime.fromisoformat(since)} if since else {}),
**({'$lte': datetime.fromisoformat(until)} if until else {}),
}

pipeline = [{'$match': match_stage}] if match_stage else []
pipeline.append({
'$group': {
'_id': {f: f'${f}' for f in group_by},
'total': {'$sum': 1},
'pass': {'$sum': {
'$cond': [{'$eq': ['$result', 'pass']}, 1, 0]
}},
'fail': {'$sum': {
'$cond': [{'$eq': ['$result', 'fail']}, 1, 0]
}},
'incomplete': {'$sum': {
'$cond': [{'$eq': ['$result', 'incomplete']}, 1, 0]
}},
'skip': {'$sum': {
'$cond': [{'$eq': ['$result', 'skip']}, 1, 0]
}},
'infra_error': {'$sum': {
'$cond': ['$is_infra_error', 1, 0]
}},
}
})
pipeline.append({'$sort': {'total': -1}})

results = await db.aggregate(TelemetryEvent, pipeline)
return JSONResponse(content=jsonable_encoder([
{
**doc['_id'].copy(),
'total': doc['total'],
'pass': doc['pass'],
'fail': doc['fail'],
'incomplete': doc['incomplete'],
'skip': doc['skip'],
'infra_error': doc['infra_error'],
}
for doc in results
]))
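
# Illustrative query sketch for GET /telemetry/stats above (not part of
# this diff): the base URL and parameter values are assumptions.
def example_get_telemetry_stats(base_url='http://localhost:8001'):
    import requests  # assumed to be available on the client side
    params = {
        'group_by': 'runtime,device_type',
        'kind': 'job_result',
        'since': '2024-01-01T00:00:00',
    }
    resp = requests.get(f'{base_url}/telemetry/stats', params=params,
                        timeout=30)
    resp.raise_for_status()
    # Each entry carries the group fields plus the counters computed in
    # the $group stage, e.g.:
    # {'runtime': 'lava-collabora', 'device_type': 'qemu-x86', 'total': 42,
    #  'pass': 30, 'fail': 8, 'incomplete': 2, 'skip': 2, 'infra_error': 3}
    return resp.json()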

# These are test values; adjust based on expected query patterns and volumes.
ANOMALY_WINDOW_MAP = {
'1h': 1, '3h': 3, '6h': 6, '12h': 12, '24h': 24, '48h': 48,
}


@app.get('/telemetry/anomalies', tags=["telemetry"])
async def get_telemetry_anomalies(
window: str = Query(
'6h', description='Time window: 1h, 3h, 6h, 12h, 24h, 48h'
),
threshold: float = Query(
0.5, ge=0.0, le=1.0,
description='Min failure/infra error rate to flag (0.0-1.0)'
),
min_total: int = Query(
3, ge=1,
description='Min events in window to consider (avoids noise)'
),
):
"""Detect anomalies in telemetry data.

Finds runtime+device_type combinations where the infra error
rate or failure rate exceeds the threshold within the given
time window. Also detects runtimes with submission errors.

Returns a list sorted by severity (highest error rate first).
"""
metrics.add('http_requests_total', 1)

hours = ANOMALY_WINDOW_MAP.get(window)
if not hours:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid window '{window}'. "
f"Use: {', '.join(ANOMALY_WINDOW_MAP.keys())}",
)
since = datetime.utcnow() - timedelta(hours=hours)

# Anomaly 1: High infra error / failure rate per runtime+device_type
result_pipeline = [
{'$match': {
'kind': {'$in': ['job_result', 'test_result']},
'ts': {'$gte': since},
}},
{'$group': {
'_id': {
'runtime': '$runtime',
'device_type': '$device_type',
},
'total': {'$sum': 1},
'fail': {'$sum': {
'$cond': [{'$eq': ['$result', 'fail']}, 1, 0]
}},
'incomplete': {'$sum': {
'$cond': [{'$eq': ['$result', 'incomplete']}, 1, 0]
}},
'infra_error': {'$sum': {
'$cond': ['$is_infra_error', 1, 0]
}},
}},
{'$match': {'total': {'$gte': min_total}}},
{'$addFields': {
'infra_rate': {
'$divide': ['$infra_error', '$total']
},
'fail_rate': {
'$divide': [
{'$add': ['$fail', '$incomplete']}, '$total'
]
},
}},
{'$match': {
'$or': [
{'infra_rate': {'$gte': threshold}},
{'fail_rate': {'$gte': threshold}},
]
}},
{'$sort': {'infra_rate': -1, 'fail_rate': -1}},
]

# Anomaly 2: Submission/connectivity errors per runtime
error_pipeline = [
{'$match': {
'kind': {'$in': ['runtime_error', 'job_skip']},
'ts': {'$gte': since},
}},
{'$group': {
'_id': {
'runtime': '$runtime',
'error_type': '$error_type',
},
'count': {'$sum': 1},
}},
{'$match': {'count': {'$gte': min_total}}},
{'$sort': {'count': -1}},
]

result_anomalies = await db.aggregate(
TelemetryEvent, result_pipeline
)
error_anomalies = await db.aggregate(
TelemetryEvent, error_pipeline
)

output = {
'window': window,
'threshold': threshold,
'min_total': min_total,
'since': since.isoformat(),
'result_anomalies': [],
'error_anomalies': [],
}

for doc in result_anomalies:
row = doc['_id'].copy()
row['total'] = doc['total']
row['fail'] = doc['fail']
row['incomplete'] = doc['incomplete']
row['infra_error'] = doc['infra_error']
row['infra_rate'] = round(doc['infra_rate'], 3)
row['fail_rate'] = round(doc['fail_rate'], 3)
output['result_anomalies'].append(row)

for doc in error_anomalies:
row = doc['_id'].copy()
row['count'] = doc['count']
output['error_anomalies'].append(row)

return JSONResponse(content=jsonable_encoder(output))
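
# Equivalent check in plain Python (a sketch of what the aggregation
# pipelines above implement, not part of this diff): a runtime/device_type
# bucket with at least min_total events is flagged when either its infra
# error rate or its fail+incomplete rate reaches the threshold.
def is_anomalous_bucket(bucket, threshold=0.5, min_total=3):
    total = bucket['total']
    if total < min_total:
        return False
    infra_rate = bucket['infra_error'] / total
    fail_rate = (bucket['fail'] + bucket['incomplete']) / total
    return infra_rate >= threshold or fail_rate >= threshold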


# -----------------------------------------------------------------------------
# Nodes
def _get_node_event_data(operation, node, is_hierarchy=False):