Skip to content
11 changes: 9 additions & 2 deletions sdks/python/src/agent_control/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@
from typing import TYPE_CHECKING, Any

import httpx
from agent_control_telemetry.sinks import BaseControlEventSink, ControlEventSink, SinkResult
from agent_control_telemetry.sinks import (
BaseControlEventSink,
ControlEventSink,
SinkResult,
)

from agent_control.settings import configure_settings, get_settings

Expand Down Expand Up @@ -888,7 +892,10 @@ def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult:
"""Write events through the active global sink."""
if _event_sink is None:
return SinkResult(accepted=0, dropped=len(events))
return _event_sink.write_events(events)
result = _event_sink.write_events(events)
if isinstance(result, SinkResult):
return result
raise RuntimeError("SDK observability sink must return a synchronous SinkResult.")


def sync_shutdown_observability() -> None:
Expand Down
34 changes: 23 additions & 11 deletions server/src/agent_control_server/observability/ingest/direct.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Direct event ingestor implementation.

This module provides the DirectEventIngestor, which processes events
immediately (synchronously) by storing them directly to the EventStore.
immediately by writing them to a control-event sink. Existing store-based
callers are preserved by wrapping EventStore instances in the default
EventStoreControlEventSink internally.

For high-throughput scenarios, users can implement their own buffered
ingestor (e.g., QueuedEventIngestor, RedisEventIngestor).
Expand All @@ -11,39 +13,48 @@
import logging

from agent_control_models.observability import ControlExecutionEvent
from agent_control_telemetry.sinks import ControlEventSink, resolve_sink_result

from ..sinks import EventStoreControlEventSink
from ..store.base import EventStore
from .base import EventIngestor, IngestResult

logger = logging.getLogger(__name__)


class DirectEventIngestor(EventIngestor):
"""Processes events immediately by storing them to the EventStore.
"""Processes events immediately by writing them to a control-event sink.

This is the simplest ingestor implementation. Events are stored
directly to the database, adding ~5-20ms latency per batch.
This is the simplest ingestor implementation. Events are written
directly to the configured sink, adding ~5-20ms latency per batch.

For use cases that require lower latency or higher throughput,
implement a custom buffered ingestor (e.g., QueuedEventIngestor).

Attributes:
store: The EventStore to write events to
sink: The ControlEventSink used to write events
log_to_stdout: Whether to log events as structured JSON
"""

def __init__(self, store: EventStore, log_to_stdout: bool = False):
def __init__(
self,
store: EventStore | ControlEventSink,
log_to_stdout: bool = False,
):
"""Initialize the ingestor.

Args:
store: The EventStore to write events to
store: Either an EventStore or a ControlEventSink implementation
log_to_stdout: Whether to log events as structured JSON (default: False)
"""
self.store = store
if isinstance(store, EventStore):
self.sink: ControlEventSink = EventStoreControlEventSink(store)
else:
self.sink = store
self.log_to_stdout = log_to_stdout

async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult:
"""Ingest events by storing them directly to the EventStore.
"""Ingest events by writing them directly to the configured sink.

Args:
events: List of control execution events to ingest
Expand All @@ -59,8 +70,9 @@ async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult:
dropped = 0

try:
# Store events
processed = await self.store.store(events)
sink_result = await resolve_sink_result(self.sink.write_events(events))
processed = sink_result.accepted
dropped = sink_result.dropped

# Log to stdout if enabled
if self.log_to_stdout:
Expand Down
23 changes: 23 additions & 0 deletions server/src/agent_control_server/observability/sinks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Server-side sink implementations for observability event delivery."""

from __future__ import annotations

from collections.abc import Sequence

from agent_control_models.observability import ControlExecutionEvent
from agent_control_telemetry.sinks import SinkResult

from .store.base import EventStore


class EventStoreControlEventSink:
"""Write events through an EventStore-backed sink."""

def __init__(self, store: EventStore):
self.store = store

async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult:
"""Write events to the underlying store and report accepted/dropped counts."""
stored = await self.store.store(list(events))
dropped = max(len(events) - stored, 0)
return SinkResult(accepted=stored, dropped=dropped)
37 changes: 37 additions & 0 deletions server/tests/test_observability_direct_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from uuid import uuid4

from agent_control_models.observability import ControlExecutionEvent
from agent_control_telemetry.sinks import SinkResult
from agent_control_server.observability.ingest.direct import DirectEventIngestor
from agent_control_server.observability.store.base import EventStore

Expand Down Expand Up @@ -37,6 +38,15 @@ async def query_events(self, query): # pragma: no cover - not used
raise NotImplementedError


class CountingSink:
def __init__(self) -> None:
self.calls: list[list[ControlExecutionEvent]] = []

async def write_events(self, events: list[ControlExecutionEvent]) -> SinkResult:
self.calls.append(events)
return SinkResult(accepted=len(events), dropped=0)


@pytest.mark.asyncio
async def test_direct_ingestor_drops_on_store_error() -> None:
# Given: an ingestor with a failing store
Expand Down Expand Up @@ -117,3 +127,30 @@ async def test_direct_ingestor_flush_noop() -> None:

# Then: no error is raised
assert True


@pytest.mark.asyncio
async def test_direct_ingestor_accepts_control_event_sink() -> None:
sink = CountingSink()
ingestor = DirectEventIngestor(sink)
events = [
ControlExecutionEvent(
trace_id="a" * 32,
span_id="b" * 16,
agent_name="agent-test-01",
control_id=1,
control_name="c",
check_stage="pre",
applies_to="llm_call",
action="observe",
matched=True,
confidence=0.9,
)
]

result = await ingestor.ingest(events)

assert result.received == 1
assert result.processed == 1
assert result.dropped == 0
assert sink.calls == [events]
15 changes: 13 additions & 2 deletions telemetry/src/agent_control_telemetry/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from __future__ import annotations

from collections.abc import Sequence
import inspect
from collections.abc import Awaitable, Sequence
from dataclasses import dataclass
from typing import Protocol

Expand All @@ -22,10 +23,13 @@ def success(self) -> bool:
return self.accepted > 0


type SinkWriteResult = SinkResult | Awaitable[SinkResult]


class ControlEventSink(Protocol):
"""Write-side abstraction for delivering control execution events."""

def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult:
def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkWriteResult:
"""Write a batch of control execution events."""


Expand All @@ -47,6 +51,13 @@ def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult:
return SinkResult(accepted=accepted, dropped=dropped)


async def resolve_sink_result(result: SinkWriteResult) -> SinkResult:
"""Resolve a sync or async sink result into a SinkResult."""
if inspect.isawaitable(result):
return await result
return result


class AsyncControlEventSink(Protocol):
"""Async write-side abstraction for delivering control execution events."""

Expand Down
24 changes: 24 additions & 0 deletions telemetry/tests/test_sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
BaseControlEventSink,
SinkResult,
)
from agent_control_telemetry.sinks import resolve_sink_result


def _make_event(*, control_id: int) -> ControlExecutionEvent:
Expand Down Expand Up @@ -101,6 +102,29 @@ async def write_event(self, event: ControlExecutionEvent) -> SinkResult:
assert result == SinkResult(accepted=2, dropped=1)


def test_resolve_sink_result_returns_sync_result_directly() -> None:
# Given: a plain SinkResult (not awaitable)
result = SinkResult(accepted=3, dropped=1)

# When: resolved
resolved = asyncio.run(resolve_sink_result(result))

# Then: the same result is returned unchanged
assert resolved == SinkResult(accepted=3, dropped=1)


def test_resolve_sink_result_awaits_async_result() -> None:
# Given: a coroutine that returns a SinkResult
async def async_result() -> SinkResult:
return SinkResult(accepted=2, dropped=0)

# When: resolved
resolved = asyncio.run(resolve_sink_result(async_result()))

# Then: the awaited result is returned
assert resolved == SinkResult(accepted=2, dropped=0)


def test_base_async_control_event_sink_single_event_delegates_to_batch_writer() -> None:
# Given: an async sink that only implements batch writes
class RecordingAsyncSink(BaseAsyncControlEventSink):
Expand Down
Loading