From 09a7af03eda6ff735d60333b63b9d5a3e55aba2c Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Wed, 11 Feb 2026 11:38:31 +0200 Subject: [PATCH 1/2] api/models: Add TelemetryEvent model for pipeline statistics Add a new TelemetryEvent database model to capture structured telemetry events from pipeline services. This enables tracking of scheduler submissions, runtime errors, job skips, and test results across all runtimes, we will send this telemetry events from pipeline to api. The collection uses a 14-day TTL index (configurable via TELEMETRY_TTL_SECONDS env var) for automatic cleanup, plus compound indexes optimized for common query patterns (by kind, runtime+device_type, job_name, and result+infra_error). Also moves EVENT_HISTORY_TTL_SECONDS to the top of the file alongside the new TELEMETRY_TTL_SECONDS constant for consistency. Signed-off-by: Denys Fedoryshchenko --- kernelci/api/models.py | 107 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 101 insertions(+), 6 deletions(-) diff --git a/kernelci/api/models.py b/kernelci/api/models.py index e3e300fea1..5f4f702034 100644 --- a/kernelci/api/models.py +++ b/kernelci/api/models.py @@ -39,6 +39,11 @@ any_url_adapter = TypeAdapter(AnyUrl) any_http_url_adapter = TypeAdapter(AnyHttpUrl) +# TTL configuration for time-limited collections +# Set environment variables to override defaults +EVENT_HISTORY_TTL_SECONDS = int(os.getenv('EVENT_HISTORY_TTL_SECONDS', '604800')) +TELEMETRY_TTL_SECONDS = int(os.getenv('TELEMETRY_TTL_SECONDS', '1209600')) + class StateValues(str, enum.Enum): """Enumeration to declare values to be used for Node.state""" @@ -855,12 +860,6 @@ def parse_node_obj(node: Node): raise ValueError(f"Unsupported node kind: {node.kind}") -# eventhistory model -# Environment variable to configure TTL (default 7 days = 604800 seconds) -# Set EVENT_HISTORY_TTL_SECONDS to override -EVENT_HISTORY_TTL_SECONDS = int(os.getenv('EVENT_HISTORY_TTL_SECONDS', '604800')) - - class EventHistory(DatabaseModel): """Event history object model""" timestamp: datetime = Field( @@ -897,3 +896,99 @@ def get_indexes(cls): cls.Index('timestamp', {'expireAfterSeconds': EVENT_HISTORY_TTL_SECONDS}), cls.Index([('channel', 1), ('sequence_id', 1)], {}), ] + + +class TelemetryEvent(DatabaseModel): + """Telemetry event for tracking pipeline statistics. + + Captures scheduler submissions, runtime errors, job skips, + and test results across all runtimes (LAVA, Kubernetes, Docker, shell). + """ + ts: datetime = Field( + description='Timestamp of the event', + default_factory=datetime.utcnow + ) + kind: str = Field( + description='Event type: runtime_error, job_submission, ' + 'job_skip, job_result, test_result' + ) + runtime: str = Field( + description='Runtime environment name (e.g. lava-collabora, ' + 'k8s-gke-eu-west4)' + ) + device_type: Optional[str] = Field( + default=None, + description='Device type (e.g. rk3588, qemu)' + ) + device_id: Optional[str] = Field( + default=None, + description='Actual device identifier (LAVA only)' + ) + job_name: Optional[str] = Field( + default=None, + description='Job configuration name' + ) + test_name: Optional[str] = Field( + default=None, + description='Individual test case name' + ) + job_id: Optional[str] = Field( + default=None, + description='Runtime job identifier (e.g. LAVA job ID)' + ) + node_id: Optional[str] = Field( + default=None, + description='API node identifier' + ) + tree: Optional[str] = Field( + default=None, + description='Kernel tree name' + ) + branch: Optional[str] = Field( + default=None, + description='Kernel branch name' + ) + arch: Optional[str] = Field( + default=None, + description='CPU architecture' + ) + result: Optional[str] = Field( + default=None, + description='Result: pass, fail, skip, incomplete' + ) + is_infra_error: bool = Field( + default=False, + description='Whether the failure is an infrastructure error' + ) + error_type: Optional[str] = Field( + default=None, + description='Error category: online_check, submission, ' + 'queue_depth, etc.' + ) + error_msg: Optional[str] = Field( + default=None, + description='Error message details' + ) + retry: int = Field( + default=0, + description='Retry attempt counter' + ) + extra: Dict[str, Any] = Field( + default={}, + description='Additional fields for future extensibility' + ) + + @classmethod + def get_indexes(cls): + """Create indexes for telemetry queries with TTL auto-cleanup. + + Default TTL is 1209600 seconds (14 days), configurable via + TELEMETRY_TTL_SECONDS environment variable. + """ + return [ + cls.Index('ts', {'expireAfterSeconds': TELEMETRY_TTL_SECONDS}), + cls.Index([('kind', 1), ('ts', -1)], {}), + cls.Index([('runtime', 1), ('device_type', 1), ('ts', -1)], {}), + cls.Index([('job_name', 1), ('ts', -1)], {}), + cls.Index([('result', 1), ('is_infra_error', 1), ('ts', -1)], {}), + ] From a60ab8f290960d5cf1edfa45919f51634f9a7360 Mon Sep 17 00:00:00 2001 From: Denys Fedoryshchenko Date: Wed, 11 Feb 2026 11:42:51 +0200 Subject: [PATCH 2/2] api: Add telemetry API client methods Add abstract Telemetry interface and concrete LatestAPI implementation for pipeline telemetry endpoints: add(events): Bulk insert telemetry events via POST /telemetry find(attributes): Query events with filters via GET /telemetry stats(attributes): Get aggregated statistics via GET /telemetry/stats Signed-off-by: Denys Fedoryshchenko --- kernelci/api/__init__.py | 28 ++++++++++++++++++++++++++++ kernelci/api/latest.py | 20 ++++++++++++++++++++ kernelci/api/models.py | 2 +- 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/kernelci/api/__init__.py b/kernelci/api/__init__.py index 314b80cb83..360d40c211 100644 --- a/kernelci/api/__init__.py +++ b/kernelci/api/__init__.py @@ -244,6 +244,7 @@ def __init__(self, config: kernelci.config.api.API, token: str): Base.__init__(self, data) self._node = self.Node(self.data) # type: ignore[abstract] self._user = self.User(self.data) # type: ignore[abstract] + self._telemetry = self.Telemetry(self.data) # type: ignore[abstract] @property def config(self) -> kernelci.config.api.API: @@ -361,6 +362,33 @@ def node(self) -> Node: """API.Node part of the interface""" return self._node + # --------- + # Telemetry + # --------- + + class Telemetry(abc.ABC, Base): + """Interface to manage telemetry events""" + + @abc.abstractmethod + def add(self, events: list) -> dict: + """Bulk insert telemetry events""" + + @abc.abstractmethod + def find( + self, attributes: Dict[str, str], + offset: Optional[int] = None, limit: Optional[int] = None + ) -> Sequence[dict]: + """Find telemetry events matching the provided attributes""" + + @abc.abstractmethod + def stats(self, attributes: Dict[str, str]) -> list: + """Get aggregated telemetry statistics""" + + @property + def telemetry(self) -> Telemetry: + """API.Telemetry part of the interface""" + return self._telemetry + # ------- # Pub/Sub # ------- diff --git a/kernelci/api/latest.py b/kernelci/api/latest.py index e233719d9a..488e32ef51 100644 --- a/kernelci/api/latest.py +++ b/kernelci/api/latest.py @@ -152,6 +152,26 @@ def bulkset(self, nodes: list, field: str, value: str): } return self._put('batch/nodeset', data=param) + class Telemetry(API.Telemetry): + """Telemetry bindings for the latest API version""" + + def add(self, events: list) -> dict: + """Bulk insert telemetry events""" + return self._post('telemetry', events).json() + + def find( + self, attributes: Dict[str, str], + offset: Optional[int] = None, limit: Optional[int] = None + ) -> Sequence[dict]: + """Find telemetry events matching the provided attributes""" + params = attributes.copy() if attributes else {} + return self._get_paginated(params, 'telemetry', offset, limit) + + def stats(self, attributes: Dict[str, str]) -> list: + """Get aggregated telemetry statistics""" + params = attributes.copy() if attributes else {} + return self._get('telemetry/stats', params=params).json() + def subscribe(self, channel: str, promisc: Optional[bool] = None) -> int: params = {'promisc': promisc} if promisc else None resp = self._post(f'subscribe/{channel}', params=params) diff --git a/kernelci/api/models.py b/kernelci/api/models.py index 5f4f702034..6ba60d82c5 100644 --- a/kernelci/api/models.py +++ b/kernelci/api/models.py @@ -902,7 +902,7 @@ class TelemetryEvent(DatabaseModel): """Telemetry event for tracking pipeline statistics. Captures scheduler submissions, runtime errors, job skips, - and test results across all runtimes (LAVA, Kubernetes, Docker, shell). + and test results across all runtimes. """ ts: datetime = Field( description='Timestamp of the event',