diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index 1568f13e..53fd00c3 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -54,7 +54,11 @@ from typing import TYPE_CHECKING, Any import httpx -from agent_control_telemetry.sinks import BaseControlEventSink, ControlEventSink, SinkResult +from agent_control_telemetry.sinks import ( + BaseControlEventSink, + ControlEventSink, + SinkResult, +) from agent_control.settings import configure_settings, get_settings @@ -888,7 +892,10 @@ def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult: """Write events through the active global sink.""" if _event_sink is None: return SinkResult(accepted=0, dropped=len(events)) - return _event_sink.write_events(events) + result = _event_sink.write_events(events) + if isinstance(result, SinkResult): + return result + raise RuntimeError("SDK observability sink must return a synchronous SinkResult.") def sync_shutdown_observability() -> None: diff --git a/server/src/agent_control_server/observability/ingest/direct.py b/server/src/agent_control_server/observability/ingest/direct.py index b2b09e6d..9d6a2faa 100644 --- a/server/src/agent_control_server/observability/ingest/direct.py +++ b/server/src/agent_control_server/observability/ingest/direct.py @@ -1,7 +1,9 @@ """Direct event ingestor implementation. This module provides the DirectEventIngestor, which processes events -immediately (synchronously) by storing them directly to the EventStore. +immediately by writing them to a control-event sink. Existing store-based +callers are preserved by wrapping EventStore instances in the default +EventStoreControlEventSink internally. For high-throughput scenarios, users can implement their own buffered ingestor (e.g., QueuedEventIngestor, RedisEventIngestor). @@ -11,7 +13,9 @@ import logging from agent_control_models.observability import ControlExecutionEvent +from agent_control_telemetry.sinks import ControlEventSink, resolve_sink_result +from ..sinks import EventStoreControlEventSink from ..store.base import EventStore from .base import EventIngestor, IngestResult @@ -19,31 +23,38 @@ class DirectEventIngestor(EventIngestor): - """Processes events immediately by storing them to the EventStore. + """Processes events immediately by writing them to a control-event sink. - This is the simplest ingestor implementation. Events are stored - directly to the database, adding ~5-20ms latency per batch. + This is the simplest ingestor implementation. Events are written + directly to the configured sink, adding ~5-20ms latency per batch. For use cases that require lower latency or higher throughput, implement a custom buffered ingestor (e.g., QueuedEventIngestor). Attributes: - store: The EventStore to write events to + sink: The ControlEventSink used to write events log_to_stdout: Whether to log events as structured JSON """ - def __init__(self, store: EventStore, log_to_stdout: bool = False): + def __init__( + self, + store: EventStore | ControlEventSink, + log_to_stdout: bool = False, + ): """Initialize the ingestor. Args: - store: The EventStore to write events to + store: Either an EventStore or a ControlEventSink implementation log_to_stdout: Whether to log events as structured JSON (default: False) """ - self.store = store + if isinstance(store, EventStore): + self.sink: ControlEventSink = EventStoreControlEventSink(store) + else: + self.sink = store self.log_to_stdout = log_to_stdout async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult: - """Ingest events by storing them directly to the EventStore. + """Ingest events by writing them directly to the configured sink. Args: events: List of control execution events to ingest @@ -59,8 +70,9 @@ async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult: dropped = 0 try: - # Store events - processed = await self.store.store(events) + sink_result = await resolve_sink_result(self.sink.write_events(events)) + processed = sink_result.accepted + dropped = sink_result.dropped # Log to stdout if enabled if self.log_to_stdout: diff --git a/server/src/agent_control_server/observability/sinks.py b/server/src/agent_control_server/observability/sinks.py new file mode 100644 index 00000000..b96f4eca --- /dev/null +++ b/server/src/agent_control_server/observability/sinks.py @@ -0,0 +1,23 @@ +"""Server-side sink implementations for observability event delivery.""" + +from __future__ import annotations + +from collections.abc import Sequence + +from agent_control_models.observability import ControlExecutionEvent +from agent_control_telemetry.sinks import SinkResult + +from .store.base import EventStore + + +class EventStoreControlEventSink: + """Write events through an EventStore-backed sink.""" + + def __init__(self, store: EventStore): + self.store = store + + async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write events to the underlying store and report accepted/dropped counts.""" + stored = await self.store.store(list(events)) + dropped = max(len(events) - stored, 0) + return SinkResult(accepted=stored, dropped=dropped) diff --git a/server/tests/test_observability_direct_ingest.py b/server/tests/test_observability_direct_ingest.py index 1e83f816..f3f6db81 100644 --- a/server/tests/test_observability_direct_ingest.py +++ b/server/tests/test_observability_direct_ingest.py @@ -7,6 +7,7 @@ from uuid import uuid4 from agent_control_models.observability import ControlExecutionEvent +from agent_control_telemetry.sinks import SinkResult from agent_control_server.observability.ingest.direct import DirectEventIngestor from agent_control_server.observability.store.base import EventStore @@ -37,6 +38,15 @@ async def query_events(self, query): # pragma: no cover - not used raise NotImplementedError +class CountingSink: + def __init__(self) -> None: + self.calls: list[list[ControlExecutionEvent]] = [] + + async def write_events(self, events: list[ControlExecutionEvent]) -> SinkResult: + self.calls.append(events) + return SinkResult(accepted=len(events), dropped=0) + + @pytest.mark.asyncio async def test_direct_ingestor_drops_on_store_error() -> None: # Given: an ingestor with a failing store @@ -117,3 +127,30 @@ async def test_direct_ingestor_flush_noop() -> None: # Then: no error is raised assert True + + +@pytest.mark.asyncio +async def test_direct_ingestor_accepts_control_event_sink() -> None: + sink = CountingSink() + ingestor = DirectEventIngestor(sink) + events = [ + ControlExecutionEvent( + trace_id="a" * 32, + span_id="b" * 16, + agent_name="agent-test-01", + control_id=1, + control_name="c", + check_stage="pre", + applies_to="llm_call", + action="observe", + matched=True, + confidence=0.9, + ) + ] + + result = await ingestor.ingest(events) + + assert result.received == 1 + assert result.processed == 1 + assert result.dropped == 0 + assert sink.calls == [events] diff --git a/telemetry/src/agent_control_telemetry/sinks.py b/telemetry/src/agent_control_telemetry/sinks.py index 4c77c640..fca59a50 100644 --- a/telemetry/src/agent_control_telemetry/sinks.py +++ b/telemetry/src/agent_control_telemetry/sinks.py @@ -2,7 +2,8 @@ from __future__ import annotations -from collections.abc import Sequence +import inspect +from collections.abc import Awaitable, Sequence from dataclasses import dataclass from typing import Protocol @@ -22,10 +23,13 @@ def success(self) -> bool: return self.accepted > 0 +type SinkWriteResult = SinkResult | Awaitable[SinkResult] + + class ControlEventSink(Protocol): """Write-side abstraction for delivering control execution events.""" - def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkWriteResult: """Write a batch of control execution events.""" @@ -47,6 +51,13 @@ def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: return SinkResult(accepted=accepted, dropped=dropped) +async def resolve_sink_result(result: SinkWriteResult) -> SinkResult: + """Resolve a sync or async sink result into a SinkResult.""" + if inspect.isawaitable(result): + return await result + return result + + class AsyncControlEventSink(Protocol): """Async write-side abstraction for delivering control execution events.""" diff --git a/telemetry/tests/test_sinks.py b/telemetry/tests/test_sinks.py index cb04f179..90e6d765 100644 --- a/telemetry/tests/test_sinks.py +++ b/telemetry/tests/test_sinks.py @@ -10,6 +10,7 @@ BaseControlEventSink, SinkResult, ) +from agent_control_telemetry.sinks import resolve_sink_result def _make_event(*, control_id: int) -> ControlExecutionEvent: @@ -101,6 +102,29 @@ async def write_event(self, event: ControlExecutionEvent) -> SinkResult: assert result == SinkResult(accepted=2, dropped=1) +def test_resolve_sink_result_returns_sync_result_directly() -> None: + # Given: a plain SinkResult (not awaitable) + result = SinkResult(accepted=3, dropped=1) + + # When: resolved + resolved = asyncio.run(resolve_sink_result(result)) + + # Then: the same result is returned unchanged + assert resolved == SinkResult(accepted=3, dropped=1) + + +def test_resolve_sink_result_awaits_async_result() -> None: + # Given: a coroutine that returns a SinkResult + async def async_result() -> SinkResult: + return SinkResult(accepted=2, dropped=0) + + # When: resolved + resolved = asyncio.run(resolve_sink_result(async_result())) + + # Then: the awaited result is returned + assert resolved == SinkResult(accepted=2, dropped=0) + + def test_base_async_control_event_sink_single_event_delegates_to_batch_writer() -> None: # Given: an async sink that only implements batch writes class RecordingAsyncSink(BaseAsyncControlEventSink):