Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions kernelci/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def __init__(self, config: kernelci.config.api.API, token: str):
Base.__init__(self, data)
self._node = self.Node(self.data) # type: ignore[abstract]
self._user = self.User(self.data) # type: ignore[abstract]
self._telemetry = self.Telemetry(self.data) # type: ignore[abstract]

@property
def config(self) -> kernelci.config.api.API:
Expand Down Expand Up @@ -361,6 +362,33 @@ def node(self) -> Node:
"""API.Node part of the interface"""
return self._node

# ---------
# Telemetry
# ---------

class Telemetry(abc.ABC, Base):
"""Interface to manage telemetry events"""

@abc.abstractmethod
def add(self, events: list) -> dict:
"""Bulk insert telemetry events"""

@abc.abstractmethod
def find(
self, attributes: Dict[str, str],
offset: Optional[int] = None, limit: Optional[int] = None
) -> Sequence[dict]:
"""Find telemetry events matching the provided attributes"""

@abc.abstractmethod
def stats(self, attributes: Dict[str, str]) -> list:
"""Get aggregated telemetry statistics"""

@property
def telemetry(self) -> Telemetry:
"""API.Telemetry part of the interface"""
return self._telemetry

# -------
# Pub/Sub
# -------
Expand Down
20 changes: 20 additions & 0 deletions kernelci/api/latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,26 @@ def bulkset(self, nodes: list, field: str, value: str):
}
return self._put('batch/nodeset', data=param)

class Telemetry(API.Telemetry):
"""Telemetry bindings for the latest API version"""

def add(self, events: list) -> dict:
"""Bulk insert telemetry events"""
return self._post('telemetry', events).json()

def find(
self, attributes: Dict[str, str],
offset: Optional[int] = None, limit: Optional[int] = None
) -> Sequence[dict]:
"""Find telemetry events matching the provided attributes"""
params = attributes.copy() if attributes else {}
return self._get_paginated(params, 'telemetry', offset, limit)

def stats(self, attributes: Dict[str, str]) -> list:
"""Get aggregated telemetry statistics"""
params = attributes.copy() if attributes else {}
return self._get('telemetry/stats', params=params).json()

def subscribe(self, channel: str, promisc: Optional[bool] = None) -> int:
params = {'promisc': promisc} if promisc else None
resp = self._post(f'subscribe/{channel}', params=params)
Expand Down
107 changes: 101 additions & 6 deletions kernelci/api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
any_url_adapter = TypeAdapter(AnyUrl)
any_http_url_adapter = TypeAdapter(AnyHttpUrl)

# TTL configuration for time-limited collections
# Set environment variables to override defaults
EVENT_HISTORY_TTL_SECONDS = int(os.getenv('EVENT_HISTORY_TTL_SECONDS', '604800'))
TELEMETRY_TTL_SECONDS = int(os.getenv('TELEMETRY_TTL_SECONDS', '1209600'))


class StateValues(str, enum.Enum):
"""Enumeration to declare values to be used for Node.state"""
Expand Down Expand Up @@ -203,7 +208,7 @@
description="Parent commit SHA",
default=None
)
state: StateValues = Field(

Check failure on line 211 in kernelci/api/models.py

View workflow job for this annotation

GitHub Actions / Lint

Incompatible types in assignment (expression has type "str", variable has type "StateValues") [assignment]
default=StateValues.RUNNING.value,
description="State of the node"
)
Expand Down Expand Up @@ -440,7 +445,7 @@
default='checkout',
description='Type of the object',
)
data: CheckoutData = Field(

Check failure on line 448 in kernelci/api/models.py

View workflow job for this annotation

GitHub Actions / Lint

Incompatible types in assignment (expression has type "CheckoutData", base class "Node" defined the type as "dict[str, Any] | None") [assignment]
description="Checkout details",
default=None
)
Expand Down Expand Up @@ -515,7 +520,7 @@
default='kbuild',
description='Type of the object',
)
data: KbuildData = Field(

Check failure on line 523 in kernelci/api/models.py

View workflow job for this annotation

GitHub Actions / Lint

Incompatible types in assignment (expression has type "KbuildData", base class "Node" defined the type as "dict[str, Any] | None") [assignment]
description="Kbuild details",
default=None
)
Expand Down Expand Up @@ -610,7 +615,7 @@
default='test',
description='Type of the object',
)
data: TestData = Field(

Check failure on line 618 in kernelci/api/models.py

View workflow job for this annotation

GitHub Actions / Lint

Incompatible types in assignment (expression has type "TestData", base class "Node" defined the type as "dict[str, Any] | None") [assignment]
description="Test details",
default=None
)
Expand All @@ -627,7 +632,7 @@
default='job',
description='Type of the object',
)
data: TestData = Field(

Check failure on line 635 in kernelci/api/models.py

View workflow job for this annotation

GitHub Actions / Lint

Incompatible types in assignment (expression has type "TestData", base class "Node" defined the type as "dict[str, Any] | None") [assignment]
description="Test suite details",
default=None
)
Expand Down Expand Up @@ -715,14 +720,14 @@
default='regression',
description='Type of the object',
)
result: Optional[ResultValues] = Field(

Check failure on line 723 in kernelci/api/models.py

View workflow job for this annotation

GitHub Actions / Lint

Incompatible types in assignment (expression has type "str", variable has type "ResultValues | None") [assignment]
default=ResultValues.FAIL.value,
description=("PASS if the regression is 'inactive', that is, if the "
"test has ever passed after the regression was created. "
"FAIL if the regression is still 'active', ie. the test "
"is still failing"),
)
data: RegressionData = Field(

Check failure on line 730 in kernelci/api/models.py

View workflow job for this annotation

GitHub Actions / Lint

Incompatible types in assignment (expression has type "RegressionData", base class "Node" defined the type as "dict[str, Any] | None") [assignment]
description="Regression details",
default=None
)
Expand Down Expand Up @@ -855,12 +860,6 @@
raise ValueError(f"Unsupported node kind: {node.kind}")


# eventhistory model
# Environment variable to configure TTL (default 7 days = 604800 seconds)
# Set EVENT_HISTORY_TTL_SECONDS to override
EVENT_HISTORY_TTL_SECONDS = int(os.getenv('EVENT_HISTORY_TTL_SECONDS', '604800'))


class EventHistory(DatabaseModel):
"""Event history object model"""
timestamp: datetime = Field(
Expand Down Expand Up @@ -897,3 +896,99 @@
cls.Index('timestamp', {'expireAfterSeconds': EVENT_HISTORY_TTL_SECONDS}),
cls.Index([('channel', 1), ('sequence_id', 1)], {}),
]


class TelemetryEvent(DatabaseModel):
"""Telemetry event for tracking pipeline statistics.

Captures scheduler submissions, runtime errors, job skips,
and test results across all runtimes.
"""
ts: datetime = Field(
description='Timestamp of the event',
default_factory=datetime.utcnow
)
kind: str = Field(
description='Event type: runtime_error, job_submission, '
'job_skip, job_result, test_result'
)
runtime: str = Field(
description='Runtime environment name (e.g. lava-collabora, '
'k8s-gke-eu-west4)'
)
device_type: Optional[str] = Field(
default=None,
description='Device type (e.g. rk3588, qemu)'
)
device_id: Optional[str] = Field(
default=None,
description='Actual device identifier (LAVA only)'
)
job_name: Optional[str] = Field(
default=None,
description='Job configuration name'
)
test_name: Optional[str] = Field(
default=None,
description='Individual test case name'
)
job_id: Optional[str] = Field(
default=None,
description='Runtime job identifier (e.g. LAVA job ID)'
)
node_id: Optional[str] = Field(
default=None,
description='API node identifier'
)
tree: Optional[str] = Field(
default=None,
description='Kernel tree name'
)
branch: Optional[str] = Field(
default=None,
description='Kernel branch name'
)
arch: Optional[str] = Field(
default=None,
description='CPU architecture'
)
result: Optional[str] = Field(
default=None,
description='Result: pass, fail, skip, incomplete'
)
is_infra_error: bool = Field(
default=False,
description='Whether the failure is an infrastructure error'
)
error_type: Optional[str] = Field(
default=None,
description='Error category: online_check, submission, '
'queue_depth, etc.'
)
error_msg: Optional[str] = Field(
default=None,
description='Error message details'
)
retry: int = Field(
default=0,
description='Retry attempt counter'
)
extra: Dict[str, Any] = Field(
default={},
description='Additional fields for future extensibility'
)

@classmethod
def get_indexes(cls):
"""Create indexes for telemetry queries with TTL auto-cleanup.

Default TTL is 1209600 seconds (14 days), configurable via
TELEMETRY_TTL_SECONDS environment variable.
"""
return [
cls.Index('ts', {'expireAfterSeconds': TELEMETRY_TTL_SECONDS}),
cls.Index([('kind', 1), ('ts', -1)], {}),
cls.Index([('runtime', 1), ('device_type', 1), ('ts', -1)], {}),
cls.Index([('job_name', 1), ('ts', -1)], {}),
cls.Index([('result', 1), ('is_infra_error', 1), ('ts', -1)], {}),
]