diff --git a/kernelci/api/__init__.py b/kernelci/api/__init__.py
index 314b80cb83..360d40c211 100644
--- a/kernelci/api/__init__.py
+++ b/kernelci/api/__init__.py
@@ -244,6 +244,7 @@ def __init__(self, config: kernelci.config.api.API, token: str):
         Base.__init__(self, data)
         self._node = self.Node(self.data)  # type: ignore[abstract]
         self._user = self.User(self.data)  # type: ignore[abstract]
+        self._telemetry = self.Telemetry(self.data)  # type: ignore[abstract]
 
     @property
     def config(self) -> kernelci.config.api.API:
@@ -361,6 +362,33 @@ def node(self) -> Node:
         """API.Node part of the interface"""
         return self._node
 
+    # ---------
+    # Telemetry
+    # ---------
+
+    class Telemetry(abc.ABC, Base):
+        """Interface to manage telemetry events"""
+
+        @abc.abstractmethod
+        def add(self, events: list) -> dict:
+            """Bulk insert telemetry events"""
+
+        @abc.abstractmethod
+        def find(
+            self, attributes: Dict[str, str],
+            offset: Optional[int] = None, limit: Optional[int] = None
+        ) -> Sequence[dict]:
+            """Find telemetry events matching the provided attributes"""
+
+        @abc.abstractmethod
+        def stats(self, attributes: Dict[str, str]) -> list:
+            """Get aggregated telemetry statistics"""
+
+    @property
+    def telemetry(self) -> Telemetry:
+        """API.Telemetry part of the interface"""
+        return self._telemetry
+
     # -------
     # Pub/Sub
     # -------
diff --git a/kernelci/api/latest.py b/kernelci/api/latest.py
index e233719d9a..488e32ef51 100644
--- a/kernelci/api/latest.py
+++ b/kernelci/api/latest.py
@@ -152,6 +152,26 @@ def bulkset(self, nodes: list, field: str, value: str):
             }
             return self._put('batch/nodeset', data=param)
 
+    class Telemetry(API.Telemetry):
+        """Telemetry bindings for the latest API version"""
+
+        def add(self, events: list) -> dict:
+            """Bulk insert telemetry events"""
+            return self._post('telemetry', events).json()
+
+        def find(
+            self, attributes: Dict[str, str],
+            offset: Optional[int] = None, limit: Optional[int] = None
+        ) -> Sequence[dict]:
+            """Find telemetry events matching the provided attributes"""
+            params = attributes.copy() if attributes else {}
+            return self._get_paginated(params, 'telemetry', offset, limit)
+
+        def stats(self, attributes: Dict[str, str]) -> list:
+            """Get aggregated telemetry statistics"""
+            params = attributes.copy() if attributes else {}
+            return self._get('telemetry/stats', params=params).json()
+
     def subscribe(self, channel: str, promisc: Optional[bool] = None) -> int:
         params = {'promisc': promisc} if promisc else None
         resp = self._post(f'subscribe/{channel}', params=params)
diff --git a/kernelci/api/models.py b/kernelci/api/models.py
index e3e300fea1..6ba60d82c5 100644
--- a/kernelci/api/models.py
+++ b/kernelci/api/models.py
@@ -39,6 +39,11 @@
 any_url_adapter = TypeAdapter(AnyUrl)
 any_http_url_adapter = TypeAdapter(AnyHttpUrl)
 
+# TTL configuration for time-limited collections
+# Set environment variables to override defaults
+EVENT_HISTORY_TTL_SECONDS = int(os.getenv('EVENT_HISTORY_TTL_SECONDS', '604800'))
+TELEMETRY_TTL_SECONDS = int(os.getenv('TELEMETRY_TTL_SECONDS', '1209600'))
+
 
 class StateValues(str, enum.Enum):
     """Enumeration to declare values to be used for Node.state"""
@@ -855,12 +860,6 @@ def parse_node_obj(node: Node):
     raise ValueError(f"Unsupported node kind: {node.kind}")
 
 
-# eventhistory model
-# Environment variable to configure TTL (default 7 days = 604800 seconds)
-# Set EVENT_HISTORY_TTL_SECONDS to override
-EVENT_HISTORY_TTL_SECONDS = int(os.getenv('EVENT_HISTORY_TTL_SECONDS', '604800'))
-
-
 class EventHistory(DatabaseModel):
     """Event history object model"""
     timestamp: datetime = Field(
@@ -897,3 +896,99 @@ def get_indexes(cls):
             cls.Index('timestamp', {'expireAfterSeconds': EVENT_HISTORY_TTL_SECONDS}),
             cls.Index([('channel', 1), ('sequence_id', 1)], {}),
         ]
+
+
+class TelemetryEvent(DatabaseModel):
+    """Telemetry event for tracking pipeline statistics.
+
+    Captures scheduler submissions, runtime errors, job skips,
+    and test results across all runtimes.
+    """
+    ts: datetime = Field(
+        description='Timestamp of the event',
+        default_factory=datetime.utcnow
+    )
+    kind: str = Field(
+        description='Event type: runtime_error, job_submission, '
+                    'job_skip, job_result, test_result'
+    )
+    runtime: str = Field(
+        description='Runtime environment name (e.g. lava-collabora, '
+                    'k8s-gke-eu-west4)'
+    )
+    device_type: Optional[str] = Field(
+        default=None,
+        description='Device type (e.g. rk3588, qemu)'
+    )
+    device_id: Optional[str] = Field(
+        default=None,
+        description='Actual device identifier (LAVA only)'
+    )
+    job_name: Optional[str] = Field(
+        default=None,
+        description='Job configuration name'
+    )
+    test_name: Optional[str] = Field(
+        default=None,
+        description='Individual test case name'
+    )
+    job_id: Optional[str] = Field(
+        default=None,
+        description='Runtime job identifier (e.g. LAVA job ID)'
+    )
+    node_id: Optional[str] = Field(
+        default=None,
+        description='API node identifier'
+    )
+    tree: Optional[str] = Field(
+        default=None,
+        description='Kernel tree name'
+    )
+    branch: Optional[str] = Field(
+        default=None,
+        description='Kernel branch name'
+    )
+    arch: Optional[str] = Field(
+        default=None,
+        description='CPU architecture'
+    )
+    result: Optional[str] = Field(
+        default=None,
+        description='Result: pass, fail, skip, incomplete'
+    )
+    is_infra_error: bool = Field(
+        default=False,
+        description='Whether the failure is an infrastructure error'
+    )
+    error_type: Optional[str] = Field(
+        default=None,
+        description='Error category: online_check, submission, '
+                    'queue_depth, etc.'
+    )
+    error_msg: Optional[str] = Field(
+        default=None,
+        description='Error message details'
+    )
+    retry: int = Field(
+        default=0,
+        description='Retry attempt counter'
+    )
+    extra: Dict[str, Any] = Field(
+        default={},
+        description='Additional fields for future extensibility'
+    )
+
+    @classmethod
+    def get_indexes(cls):
+        """Create indexes for telemetry queries with TTL auto-cleanup.
+
+        Default TTL is 1209600 seconds (14 days), configurable via
+        TELEMETRY_TTL_SECONDS environment variable.
+        """
+        return [
+            cls.Index('ts', {'expireAfterSeconds': TELEMETRY_TTL_SECONDS}),
+            cls.Index([('kind', 1), ('ts', -1)], {}),
+            cls.Index([('runtime', 1), ('device_type', 1), ('ts', -1)], {}),
+            cls.Index([('job_name', 1), ('ts', -1)], {}),
+            cls.Index([('result', 1), ('is_infra_error', 1), ('ts', -1)], {}),
+        ]