From afe9246d86e5a98a9ca830c6b92d3bcaa7946c3d Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Sun, 5 Apr 2026 16:08:58 -0700 Subject: [PATCH 1/7] Add telemetry package to support sinks --- pyproject.toml | 3 ++ scripts/build.py | 32 +++++++++--- sdks/python/pyproject.toml | 15 ++++-- sdks/python/src/agent_control/__init__.py | 14 ++++-- .../src/agent_control/evaluation_events.py | 5 +- .../python/src/agent_control/observability.py | 46 ++++++++++++++--- sdks/python/src/agent_control/tracing.py | 2 +- sdks/python/tests/test_observability.py | 44 +++++++++++++++- .../tests/test_observability_updates.py | 6 +-- sdks/python/tests/test_trace_context.py | 2 +- sdks/python/tests/test_tracing.py | 5 +- server/pyproject.toml | 15 ++++-- telemetry/pyproject.toml | 50 +++++++++++++++++++ .../src/agent_control_telemetry}/__init__.py | 7 ++- .../src/agent_control_telemetry/py.typed | 1 + .../src/agent_control_telemetry/sinks.py | 46 +++++++++++++++++ .../agent_control_telemetry}/trace_context.py | 1 - 17 files changed, 256 insertions(+), 38 deletions(-) create mode 100644 telemetry/pyproject.toml rename {sdks/python/src/agent_control/telemetry => telemetry/src/agent_control_telemetry}/__init__.py (64%) create mode 100644 telemetry/src/agent_control_telemetry/py.typed create mode 100644 telemetry/src/agent_control_telemetry/sinks.py rename {sdks/python/src/agent_control/telemetry => telemetry/src/agent_control_telemetry}/trace_context.py (95%) diff --git a/pyproject.toml b/pyproject.toml index c9da2cd2..35d26b30 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ members = [ "models", "server", "sdks/python", + "telemetry", "engine", "evaluators/builtin", # NOTE: evaluators/contrib/* excluded - install separately when needed @@ -36,6 +37,7 @@ required-environments = [ agent-control-models = { workspace = true } agent-control-engine = { workspace = true } agent-control-evaluators = { workspace = true } +agent-control-telemetry = { workspace = true } [tool.ruff] line-length = 100 @@ -68,6 +70,7 @@ version_toml = [ "models/pyproject.toml:project.version", "engine/pyproject.toml:project.version", "sdks/python/pyproject.toml:project.version", + "telemetry/pyproject.toml:project.version", "server/pyproject.toml:project.version", "evaluators/builtin/pyproject.toml:project.version", "evaluators/contrib/galileo/pyproject.toml:project.version", diff --git a/scripts/build.py b/scripts/build.py index 498239ca..43095af2 100644 --- a/scripts/build.py +++ b/scripts/build.py @@ -2,8 +2,8 @@ """Build packages for PyPI distribution. This script builds all publishable packages. For SDK and server, it copies internal -packages (models, engine) into the source directories before building, then cleans up -afterward. This allows the published wheels to be self-contained. +packages (models, engine, telemetry) into the source directories before building, +then cleans up afterward. This allows the published wheels to be self-contained. Usage: python scripts/build.py [models|evaluators|sdk|server|galileo|all] @@ -81,7 +81,7 @@ def build_sdk() -> None: print(f"Building agent-control-sdk v{version}") # Clean previous builds and vendored code - for pkg in ["agent_control_models", "agent_control_engine"]: + for pkg in ["agent_control_models", "agent_control_engine", "agent_control_telemetry"]: target = sdk_src / pkg if target.exists(): shutil.rmtree(target) @@ -99,6 +99,10 @@ def build_sdk() -> None: ROOT / "engine" / "src" / "agent_control_engine", sdk_src / "agent_control_engine", ) + shutil.copytree( + ROOT / "telemetry" / "src" / "agent_control_telemetry", + sdk_src / "agent_control_telemetry", + ) # Inject bundle metadata for conflict detection inject_bundle_metadata( @@ -111,6 +115,11 @@ def build_sdk() -> None: "agent-control-sdk", version, ) + inject_bundle_metadata( + sdk_src / "agent_control_telemetry" / "__init__.py", + "agent-control-sdk", + version, + ) # Set version set_package_version(sdk_dir / "pyproject.toml", version) @@ -120,7 +129,7 @@ def build_sdk() -> None: print(f" Built agent-control-sdk v{version}") finally: # Clean up vendored code (don't commit it) - for pkg in ["agent_control_models", "agent_control_engine"]: + for pkg in ["agent_control_models", "agent_control_engine", "agent_control_telemetry"]: target = sdk_src / pkg if target.exists(): shutil.rmtree(target) @@ -139,7 +148,7 @@ def build_server() -> None: print(f"Building agent-control-server v{version}") # Clean previous builds and vendored code - for pkg in ["agent_control_models", "agent_control_engine"]: + for pkg in ["agent_control_models", "agent_control_engine", "agent_control_telemetry"]: target = server_src / pkg if target.exists(): shutil.rmtree(target) @@ -148,7 +157,7 @@ def build_server() -> None: if dist_dir.exists(): shutil.rmtree(dist_dir) - # Copy vendored packages (models and engine only, NOT evaluators) + # Copy vendored packages (models, engine, and telemetry only, NOT evaluators) shutil.copytree( ROOT / "models" / "src" / "agent_control_models", server_src / "agent_control_models", @@ -157,6 +166,10 @@ def build_server() -> None: ROOT / "engine" / "src" / "agent_control_engine", server_src / "agent_control_engine", ) + shutil.copytree( + ROOT / "telemetry" / "src" / "agent_control_telemetry", + server_src / "agent_control_telemetry", + ) # Inject bundle metadata for conflict detection inject_bundle_metadata( @@ -169,6 +182,11 @@ def build_server() -> None: "agent-control-server", version, ) + inject_bundle_metadata( + server_src / "agent_control_telemetry" / "__init__.py", + "agent-control-server", + version, + ) # Set version set_package_version(server_dir / "pyproject.toml", version) @@ -178,7 +196,7 @@ def build_server() -> None: print(f" Built agent-control-server v{version}") finally: # Clean up vendored code (don't commit it) - for pkg in ["agent_control_models", "agent_control_engine"]: + for pkg in ["agent_control_models", "agent_control_engine", "agent_control_telemetry"]: target = server_src / pkg if target.exists(): shutil.rmtree(target) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index f5c9734e..fe4db670 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -3,7 +3,8 @@ name = "agent-control-sdk" version = "7.3.2" description = "Python SDK for Agent Control - protect your AI agents with controls" requires-python = ">=3.12" -# Note: agent-control-models and agent-control-engine are bundled at build time +# Note: agent-control-models, agent-control-engine, and agent-control-telemetry +# are bundled at build time # Note: agent-control-evaluators is a runtime dependency (NOT vendored) to avoid # duplicate module conflict when galileo extras are installed dependencies = [ @@ -48,6 +49,7 @@ dev = [ "mypy>=1.8.0", "agent-control-models", "agent-control-engine", + "agent-control-telemetry", "agent-control-evaluators", "strands-agents>=1.26.0", # For strands integration tests ] @@ -57,8 +59,14 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -# Note: agent_control_models and agent_control_engine are copied by scripts/build.py -packages = ["src/agent_control", "src/agent_control_models", "src/agent_control_engine"] +# Note: agent_control_models, agent_control_engine, and agent_control_telemetry +# are copied by scripts/build.py +packages = [ + "src/agent_control", + "src/agent_control_models", + "src/agent_control_engine", + "src/agent_control_telemetry", +] [tool.ruff] line-length = 100 @@ -80,6 +88,7 @@ known-first-party = ["agent_control"] [tool.uv.sources] agent-control-models = { workspace = true } agent-control-engine = { workspace = true } +agent-control-telemetry = { workspace = true } agent-control-evaluators = { workspace = true } # For local dev: use local galileo package instead of PyPI agent-control-evaluator-galileo = { path = "../../evaluators/contrib/galileo", editable = true } diff --git a/sdks/python/src/agent_control/__init__.py b/sdks/python/src/agent_control/__init__.py index 24353411..024eaf98 100644 --- a/sdks/python/src/agent_control/__init__.py +++ b/sdks/python/src/agent_control/__init__.py @@ -68,6 +68,11 @@ async def handle_input(user_message: str) -> str: Step, StepSchema, ) +from agent_control_telemetry.trace_context import ( + clear_trace_context_provider, + get_trace_context_from_provider, + set_trace_context_provider, +) from . import agents, controls, evaluation, evaluators, policies from ._control_registry import ( @@ -86,6 +91,7 @@ async def handle_input(user_message: str) -> str: add_event, configure_logging, get_event_batcher, + get_event_sink, get_log_config, get_logger, init_observability, @@ -93,11 +99,7 @@ async def handle_input(user_message: str) -> str: log_control_evaluation, shutdown_observability, sync_shutdown_observability, -) -from .telemetry import ( - clear_trace_context_provider, - get_trace_context_from_provider, - set_trace_context_provider, + write_events, ) from .tracing import ( get_current_span_id, @@ -1310,9 +1312,11 @@ async def main(): # Observability "init_observability", "add_event", + "write_events", "shutdown_observability", "is_observability_enabled", "get_event_batcher", + "get_event_sink", "configure_logging", "get_log_config", "log_control_evaluation", diff --git a/sdks/python/src/agent_control/evaluation_events.py b/sdks/python/src/agent_control/evaluation_events.py index 0c3f9438..9ce8a249 100644 --- a/sdks/python/src/agent_control/evaluation_events.py +++ b/sdks/python/src/agent_control/evaluation_events.py @@ -11,7 +11,7 @@ EvaluationResponse, ) -from .observability import add_event, get_logger, is_observability_enabled +from .observability import get_logger, is_observability_enabled, write_events _logger = get_logger(__name__) @@ -210,5 +210,4 @@ def enqueue_observability_events(events: list[ControlExecutionEvent]) -> None: if not is_observability_enabled(): return - for event in events: - add_event(event) + write_events(events) diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index 96d26813..1568f13e 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -49,10 +49,12 @@ import logging import threading import time +from collections.abc import Sequence from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any import httpx +from agent_control_telemetry.sinks import BaseControlEventSink, ControlEventSink, SinkResult from agent_control.settings import configure_settings, get_settings @@ -791,6 +793,24 @@ def get_stats(self) -> dict: # Global batcher instance _batcher: EventBatcher | None = None +_event_sink: ControlEventSink | None = None + + +class _BatcherControlEventSink(BaseControlEventSink): + """Default SDK sink backed by the existing queue-based EventBatcher.""" + + def __init__(self, batcher: EventBatcher): + self._batcher = batcher + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + accepted = 0 + dropped = 0 + for event in events: + if self._batcher.add_event(event): + accepted += 1 + else: + dropped += 1 + return SinkResult(accepted=accepted, dropped=dropped) def get_event_batcher() -> EventBatcher | None: @@ -803,6 +823,11 @@ def get_event_batcher() -> EventBatcher | None: return _batcher +def get_event_sink() -> ControlEventSink | None: + """Get the active global control-event sink.""" + return _event_sink + + def init_observability( server_url: str | None = None, api_key: str | None = None, @@ -821,7 +846,7 @@ def init_observability( Returns: EventBatcher instance if enabled, None otherwise """ - global _batcher + global _batcher, _event_sink # Check if enabled is_enabled = enabled if enabled is not None else get_settings().observability_enabled @@ -830,13 +855,14 @@ def init_observability( logger.debug("Observability disabled") return None - if _batcher is not None: + if _event_sink is not None: logger.debug("Observability already initialized") return _batcher # Create batcher _batcher = EventBatcher(server_url=server_url, api_key=api_key) _batcher.start() + _event_sink = _BatcherControlEventSink(_batcher) # Register shutdown handler atexit.register(_batcher.shutdown) @@ -855,17 +881,23 @@ def add_event(event: ControlExecutionEvent) -> bool: Returns: True if added, False if observability disabled or event dropped """ - if _batcher is None: - return False - return _batcher.add_event(event) + return write_events([event]).accepted == 1 + + +def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write events through the active global sink.""" + if _event_sink is None: + return SinkResult(accepted=0, dropped=len(events)) + return _event_sink.write_events(events) def sync_shutdown_observability() -> None: """Synchronously shut down observability and flush remaining events.""" - global _batcher + global _batcher, _event_sink if _batcher is not None: _batcher.shutdown() _batcher = None + _event_sink = None async def shutdown_observability() -> None: @@ -880,7 +912,7 @@ async def shutdown_observability() -> None: def is_observability_enabled() -> bool: """Check if observability is enabled and initialized.""" - return _batcher is not None + return _event_sink is not None def log_span_start( diff --git a/sdks/python/src/agent_control/tracing.py b/sdks/python/src/agent_control/tracing.py index 47696b15..43be5b5c 100644 --- a/sdks/python/src/agent_control/tracing.py +++ b/sdks/python/src/agent_control/tracing.py @@ -31,7 +31,7 @@ from contextlib import contextmanager from contextvars import ContextVar, Token -from .telemetry.trace_context import get_trace_context_from_provider +from agent_control_telemetry.trace_context import get_trace_context_from_provider # Context variables for trace/span propagation _trace_id_var: ContextVar[str | None] = ContextVar("trace_id", default=None) diff --git a/sdks/python/tests/test_observability.py b/sdks/python/tests/test_observability.py index 04115c2b..740e22f9 100644 --- a/sdks/python/tests/test_observability.py +++ b/sdks/python/tests/test_observability.py @@ -13,6 +13,7 @@ EventBatcher, add_event, get_event_batcher, + get_event_sink, init_observability, is_observability_enabled, log_span_end, @@ -605,35 +606,58 @@ def test_get_event_batcher_not_initialized(self): # Reset global state import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: assert get_event_batcher() is None finally: obs._batcher = old_batcher + obs._event_sink = old_sink + + def test_get_event_sink_not_initialized(self): + """Test get_event_sink returns None when not initialized.""" + import agent_control.observability as obs + old_batcher = obs._batcher + old_sink = obs._event_sink + obs._batcher = None + obs._event_sink = None + + try: + assert get_event_sink() is None + finally: + obs._batcher = old_batcher + obs._event_sink = old_sink def test_is_observability_enabled_false(self): """Test is_observability_enabled returns False when not initialized.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: assert is_observability_enabled() is False finally: obs._batcher = old_batcher + obs._event_sink = old_sink def test_add_event_without_batcher(self): """Test add_event returns False when batcher not initialized.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: result = add_event(create_mock_event()) assert result is False finally: obs._batcher = old_batcher + obs._event_sink = old_sink class TestInitObservability: @@ -643,19 +667,24 @@ def test_init_disabled_when_explicitly_off(self): """Test that init_observability returns None when explicitly disabled.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: result = init_observability(enabled=False) assert result is None finally: obs._batcher = old_batcher + obs._event_sink = old_sink def test_init_enabled_creates_batcher(self): """Test that init_observability creates batcher when enabled.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: result = init_observability( @@ -666,17 +695,21 @@ def test_init_enabled_creates_batcher(self): assert result is not None assert isinstance(result, EventBatcher) assert result._running is True + assert get_event_sink() is not None # Cleanup - result.stop() + obs.sync_shutdown_observability() finally: obs._batcher = old_batcher + obs._event_sink = old_sink def test_init_idempotent(self): """Test that init_observability is idempotent.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: batcher1 = init_observability(enabled=True) @@ -684,9 +717,10 @@ def test_init_idempotent(self): assert batcher1 is batcher2 - batcher1.stop() + obs.sync_shutdown_observability() finally: obs._batcher = old_batcher + obs._event_sink = old_sink class TestShutdownObservability: @@ -697,6 +731,7 @@ async def test_shutdown_flushes_and_stops(self): """Test that shutdown flushes remaining events and stops batcher.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink try: batcher = init_observability(enabled=True) @@ -710,20 +745,25 @@ async def test_shutdown_flushes_and_stops(self): # Batcher should be stopped and cleared assert obs._batcher is None + assert obs._event_sink is None finally: obs._batcher = old_batcher + obs._event_sink = old_sink @pytest.mark.asyncio async def test_shutdown_without_batcher(self): """Test that shutdown is safe when batcher not initialized.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: await shutdown_observability() # Should not raise finally: obs._batcher = old_batcher + obs._event_sink = old_sink class TestEventBatcherShutdownConfig: diff --git a/sdks/python/tests/test_observability_updates.py b/sdks/python/tests/test_observability_updates.py index a90ea785..1e7a1025 100644 --- a/sdks/python/tests/test_observability_updates.py +++ b/sdks/python/tests/test_observability_updates.py @@ -15,7 +15,7 @@ enqueue_observability_events, map_applies_to, ) -from agent_control.telemetry.trace_context import ( +from agent_control_telemetry.trace_context import ( clear_trace_context_provider, set_trace_context_provider, ) @@ -322,10 +322,10 @@ def test_enqueue_observability_events_uses_existing_batcher(self): ] with patch("agent_control.evaluation_events.is_observability_enabled", return_value=True), \ - patch("agent_control.evaluation_events.add_event") as mock_add: + patch("agent_control.evaluation_events.write_events") as mock_write: enqueue_observability_events(events) - mock_add.assert_called_once_with(events[0]) + mock_write.assert_called_once_with(events) class TestCheckEvaluationWithLocal: diff --git a/sdks/python/tests/test_trace_context.py b/sdks/python/tests/test_trace_context.py index 2c1d727f..445d6cbf 100644 --- a/sdks/python/tests/test_trace_context.py +++ b/sdks/python/tests/test_trace_context.py @@ -1,6 +1,6 @@ """Tests for the telemetry trace context provider interface.""" -from agent_control.telemetry.trace_context import ( +from agent_control_telemetry.trace_context import ( clear_trace_context_provider, get_trace_context_from_provider, set_trace_context_provider, diff --git a/sdks/python/tests/test_tracing.py b/sdks/python/tests/test_tracing.py index 97397b8d..3b0bb629 100644 --- a/sdks/python/tests/test_tracing.py +++ b/sdks/python/tests/test_tracing.py @@ -2,7 +2,10 @@ import pytest -from agent_control.telemetry.trace_context import clear_trace_context_provider, set_trace_context_provider +from agent_control_telemetry.trace_context import ( + clear_trace_context_provider, + set_trace_context_provider, +) from agent_control.tracing import ( _generate_span_id, _generate_trace_id, diff --git a/server/pyproject.toml b/server/pyproject.toml index 9bdec2c0..de2165e5 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -3,7 +3,8 @@ name = "agent-control-server" version = "7.3.2" description = "Server for Agent Control - manage and evaluate controls for AI agents" requires-python = ">=3.12" -# Note: agent-control-models and agent-control-engine are bundled at build time +# Note: agent-control-models, agent-control-engine, and agent-control-telemetry +# are bundled at build time # Note: agent-control-evaluators is a runtime dependency (NOT vendored) to avoid # duplicate module conflict when galileo extras are installed dependencies = [ @@ -43,6 +44,7 @@ dev = [ "types-jsonschema>=4.23.0", "agent-control-models", "agent-control-engine", + "agent-control-telemetry", "agent-control-evaluators", ] @@ -56,9 +58,15 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -# Note: agent_control_models and agent_control_engine are copied by scripts/build.py +# Note: agent_control_models, agent_control_engine, and agent_control_telemetry +# are copied by scripts/build.py # Note: agent_control_evaluators is a runtime dep, not vendored -packages = ["src/agent_control_server", "src/agent_control_models", "src/agent_control_engine"] +packages = [ + "src/agent_control_server", + "src/agent_control_models", + "src/agent_control_engine", + "src/agent_control_telemetry", +] [tool.pytest.ini_options] asyncio_mode = "auto" @@ -87,6 +95,7 @@ known-first-party = ["agent_control_server"] [tool.uv.sources] agent-control-models = { workspace = true } agent-control-engine = { workspace = true } +agent-control-telemetry = { workspace = true } agent-control-evaluators = { workspace = true } # For local dev: use local galileo package instead of PyPI agent-control-evaluator-galileo = { path = "../evaluators/contrib/galileo", editable = true } diff --git a/telemetry/pyproject.toml b/telemetry/pyproject.toml new file mode 100644 index 00000000..fb818ce6 --- /dev/null +++ b/telemetry/pyproject.toml @@ -0,0 +1,50 @@ +[project] +name = "agent-control-telemetry" +version = "7.3.2" +description = "Shared telemetry contracts for Agent Control" +requires-python = ">=3.12" +dependencies = [ + "agent-control-models>=3.0.0", +] +authors = [ + {name = "Agent Control Team"} +] +readme = "../README.md" +license = {text = "Apache-2.0"} + +[dependency-groups] +dev = [ + "pytest>=8.0.0", + "mypy>=1.8.0", + "ruff>=0.1.0", + "agent-control-models", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/agent_control_telemetry"] + +[tool.hatch.build.targets.wheel.force-include] +"src/agent_control_telemetry/py.typed" = "agent_control_telemetry/py.typed" + +[tool.ruff] +line-length = 100 + +[tool.ruff.lint] +select = [ + "E", + "W", + "F", + "I", + "UP", +] +ignore = [] + +[tool.ruff.lint.isort] +known-first-party = ["agent_control_telemetry"] + +[tool.uv.sources] +agent-control-models = { workspace = true } diff --git a/sdks/python/src/agent_control/telemetry/__init__.py b/telemetry/src/agent_control_telemetry/__init__.py similarity index 64% rename from sdks/python/src/agent_control/telemetry/__init__.py rename to telemetry/src/agent_control_telemetry/__init__.py index c488d4a2..fd9e3115 100644 --- a/sdks/python/src/agent_control/telemetry/__init__.py +++ b/telemetry/src/agent_control_telemetry/__init__.py @@ -1,4 +1,6 @@ -"""Telemetry interfaces for provider-agnostic tracing.""" +"""Shared telemetry contracts for Agent Control.""" + +from .sinks import BaseControlEventSink, ControlEventSink, SinkResult from .trace_context import ( TraceContext, TraceContextProvider, @@ -8,6 +10,9 @@ ) __all__ = [ + "BaseControlEventSink", + "ControlEventSink", + "SinkResult", "TraceContext", "TraceContextProvider", "clear_trace_context_provider", diff --git a/telemetry/src/agent_control_telemetry/py.typed b/telemetry/src/agent_control_telemetry/py.typed new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/telemetry/src/agent_control_telemetry/py.typed @@ -0,0 +1 @@ + diff --git a/telemetry/src/agent_control_telemetry/sinks.py b/telemetry/src/agent_control_telemetry/sinks.py new file mode 100644 index 00000000..f2249cce --- /dev/null +++ b/telemetry/src/agent_control_telemetry/sinks.py @@ -0,0 +1,46 @@ +"""Shared sink contracts for control execution event delivery.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Protocol, Sequence + +from agent_control_models import ControlExecutionEvent + + +@dataclass(frozen=True) +class SinkResult: + """Summarizes the outcome of a sink write attempt.""" + + accepted: int + dropped: int = 0 + + @property + def success(self) -> bool: + """Return True when at least one event was accepted.""" + return self.accepted > 0 + + +class ControlEventSink(Protocol): + """Write-side abstraction for delivering control execution events.""" + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write a batch of control execution events.""" + + +class BaseControlEventSink(ControlEventSink): + """Minimal helper base for sink implementations.""" + + def write_event(self, event: ControlExecutionEvent) -> SinkResult: + """Write a single control execution event.""" + return self.write_events([event]) + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write a batch of control execution events.""" + accepted = 0 + dropped = 0 + for event in events: + result = self.write_event(event) + accepted += result.accepted + dropped += result.dropped + return SinkResult(accepted=accepted, dropped=dropped) diff --git a/sdks/python/src/agent_control/telemetry/trace_context.py b/telemetry/src/agent_control_telemetry/trace_context.py similarity index 95% rename from sdks/python/src/agent_control/telemetry/trace_context.py rename to telemetry/src/agent_control_telemetry/trace_context.py index a871fb29..134e7ac1 100644 --- a/sdks/python/src/agent_control/telemetry/trace_context.py +++ b/telemetry/src/agent_control_telemetry/trace_context.py @@ -30,7 +30,6 @@ def get_trace_context_from_provider() -> TraceContext | None: try: trace_context = _trace_context_provider() except Exception: - # Provider failures should not break control evaluation. return None if trace_context is None: From 8b57ca7c4159c9053667ea1b525161ef9dd561e0 Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Sun, 5 Apr 2026 16:19:45 -0700 Subject: [PATCH 2/7] add telemetry to server docker --- server/Dockerfile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/Dockerfile b/server/Dockerfile index 7ca7b30d..30db201a 100644 --- a/server/Dockerfile +++ b/server/Dockerfile @@ -42,6 +42,7 @@ COPY server/alembic/ alembic/ # Copy shared dependencies (Workspaces) COPY models/ models/ COPY engine/ engine/ +COPY telemetry/ telemetry/ COPY evaluators/ evaluators/ # Copy server application @@ -50,19 +51,21 @@ COPY server/ server/ # Copy the exported UI bundle that FastAPI serves for hosted deployments COPY --from=ui-builder /ui/out ./server/ui-dist -# Install server + bundled runtime packages (engine/evaluators/models) +# Install server + bundled runtime packages (engine/evaluators/models/telemetry) # We ignore the lockfile if it's missing or out of sync to ensure build succeeds during dev RUN uv sync \ --package agent-control-server \ --package agent-control-engine \ --package agent-control-evaluators \ --package agent-control-models \ + --package agent-control-telemetry \ --no-dev \ || uv sync \ --package agent-control-server \ --package agent-control-engine \ --package agent-control-evaluators \ --package agent-control-models \ + --package agent-control-telemetry \ --no-dev \ --frozen From 92cee53ff333fda88c18aa137e9d667a8731b1a2 Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Sun, 5 Apr 2026 16:46:51 -0700 Subject: [PATCH 3/7] make defauteventingestor use ControlEventSink --- .../python/src/agent_control/observability.py | 11 +++++- .../observability/ingest/direct.py | 34 +++++++++++------ .../observability/sinks.py | 23 ++++++++++++ .../tests/test_observability_direct_ingest.py | 37 +++++++++++++++++++ .../src/agent_control_telemetry/sinks.py | 15 +++++++- 5 files changed, 105 insertions(+), 15 deletions(-) create mode 100644 server/src/agent_control_server/observability/sinks.py diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index 1568f13e..53fd00c3 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -54,7 +54,11 @@ from typing import TYPE_CHECKING, Any import httpx -from agent_control_telemetry.sinks import BaseControlEventSink, ControlEventSink, SinkResult +from agent_control_telemetry.sinks import ( + BaseControlEventSink, + ControlEventSink, + SinkResult, +) from agent_control.settings import configure_settings, get_settings @@ -888,7 +892,10 @@ def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult: """Write events through the active global sink.""" if _event_sink is None: return SinkResult(accepted=0, dropped=len(events)) - return _event_sink.write_events(events) + result = _event_sink.write_events(events) + if isinstance(result, SinkResult): + return result + raise RuntimeError("SDK observability sink must return a synchronous SinkResult.") def sync_shutdown_observability() -> None: diff --git a/server/src/agent_control_server/observability/ingest/direct.py b/server/src/agent_control_server/observability/ingest/direct.py index b2b09e6d..9d6a2faa 100644 --- a/server/src/agent_control_server/observability/ingest/direct.py +++ b/server/src/agent_control_server/observability/ingest/direct.py @@ -1,7 +1,9 @@ """Direct event ingestor implementation. This module provides the DirectEventIngestor, which processes events -immediately (synchronously) by storing them directly to the EventStore. +immediately by writing them to a control-event sink. Existing store-based +callers are preserved by wrapping EventStore instances in the default +EventStoreControlEventSink internally. For high-throughput scenarios, users can implement their own buffered ingestor (e.g., QueuedEventIngestor, RedisEventIngestor). @@ -11,7 +13,9 @@ import logging from agent_control_models.observability import ControlExecutionEvent +from agent_control_telemetry.sinks import ControlEventSink, resolve_sink_result +from ..sinks import EventStoreControlEventSink from ..store.base import EventStore from .base import EventIngestor, IngestResult @@ -19,31 +23,38 @@ class DirectEventIngestor(EventIngestor): - """Processes events immediately by storing them to the EventStore. + """Processes events immediately by writing them to a control-event sink. - This is the simplest ingestor implementation. Events are stored - directly to the database, adding ~5-20ms latency per batch. + This is the simplest ingestor implementation. Events are written + directly to the configured sink, adding ~5-20ms latency per batch. For use cases that require lower latency or higher throughput, implement a custom buffered ingestor (e.g., QueuedEventIngestor). Attributes: - store: The EventStore to write events to + sink: The ControlEventSink used to write events log_to_stdout: Whether to log events as structured JSON """ - def __init__(self, store: EventStore, log_to_stdout: bool = False): + def __init__( + self, + store: EventStore | ControlEventSink, + log_to_stdout: bool = False, + ): """Initialize the ingestor. Args: - store: The EventStore to write events to + store: Either an EventStore or a ControlEventSink implementation log_to_stdout: Whether to log events as structured JSON (default: False) """ - self.store = store + if isinstance(store, EventStore): + self.sink: ControlEventSink = EventStoreControlEventSink(store) + else: + self.sink = store self.log_to_stdout = log_to_stdout async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult: - """Ingest events by storing them directly to the EventStore. + """Ingest events by writing them directly to the configured sink. Args: events: List of control execution events to ingest @@ -59,8 +70,9 @@ async def ingest(self, events: list[ControlExecutionEvent]) -> IngestResult: dropped = 0 try: - # Store events - processed = await self.store.store(events) + sink_result = await resolve_sink_result(self.sink.write_events(events)) + processed = sink_result.accepted + dropped = sink_result.dropped # Log to stdout if enabled if self.log_to_stdout: diff --git a/server/src/agent_control_server/observability/sinks.py b/server/src/agent_control_server/observability/sinks.py new file mode 100644 index 00000000..b96f4eca --- /dev/null +++ b/server/src/agent_control_server/observability/sinks.py @@ -0,0 +1,23 @@ +"""Server-side sink implementations for observability event delivery.""" + +from __future__ import annotations + +from collections.abc import Sequence + +from agent_control_models.observability import ControlExecutionEvent +from agent_control_telemetry.sinks import SinkResult + +from .store.base import EventStore + + +class EventStoreControlEventSink: + """Write events through an EventStore-backed sink.""" + + def __init__(self, store: EventStore): + self.store = store + + async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write events to the underlying store and report accepted/dropped counts.""" + stored = await self.store.store(list(events)) + dropped = max(len(events) - stored, 0) + return SinkResult(accepted=stored, dropped=dropped) diff --git a/server/tests/test_observability_direct_ingest.py b/server/tests/test_observability_direct_ingest.py index 1e83f816..f3f6db81 100644 --- a/server/tests/test_observability_direct_ingest.py +++ b/server/tests/test_observability_direct_ingest.py @@ -7,6 +7,7 @@ from uuid import uuid4 from agent_control_models.observability import ControlExecutionEvent +from agent_control_telemetry.sinks import SinkResult from agent_control_server.observability.ingest.direct import DirectEventIngestor from agent_control_server.observability.store.base import EventStore @@ -37,6 +38,15 @@ async def query_events(self, query): # pragma: no cover - not used raise NotImplementedError +class CountingSink: + def __init__(self) -> None: + self.calls: list[list[ControlExecutionEvent]] = [] + + async def write_events(self, events: list[ControlExecutionEvent]) -> SinkResult: + self.calls.append(events) + return SinkResult(accepted=len(events), dropped=0) + + @pytest.mark.asyncio async def test_direct_ingestor_drops_on_store_error() -> None: # Given: an ingestor with a failing store @@ -117,3 +127,30 @@ async def test_direct_ingestor_flush_noop() -> None: # Then: no error is raised assert True + + +@pytest.mark.asyncio +async def test_direct_ingestor_accepts_control_event_sink() -> None: + sink = CountingSink() + ingestor = DirectEventIngestor(sink) + events = [ + ControlExecutionEvent( + trace_id="a" * 32, + span_id="b" * 16, + agent_name="agent-test-01", + control_id=1, + control_name="c", + check_stage="pre", + applies_to="llm_call", + action="observe", + matched=True, + confidence=0.9, + ) + ] + + result = await ingestor.ingest(events) + + assert result.received == 1 + assert result.processed == 1 + assert result.dropped == 0 + assert sink.calls == [events] diff --git a/telemetry/src/agent_control_telemetry/sinks.py b/telemetry/src/agent_control_telemetry/sinks.py index f2249cce..b4d57246 100644 --- a/telemetry/src/agent_control_telemetry/sinks.py +++ b/telemetry/src/agent_control_telemetry/sinks.py @@ -2,8 +2,9 @@ from __future__ import annotations +import inspect from dataclasses import dataclass -from typing import Protocol, Sequence +from typing import Awaitable, Protocol, Sequence, TypeAlias from agent_control_models import ControlExecutionEvent @@ -21,10 +22,13 @@ def success(self) -> bool: return self.accepted > 0 +SinkWriteResult: TypeAlias = SinkResult | Awaitable[SinkResult] + + class ControlEventSink(Protocol): """Write-side abstraction for delivering control execution events.""" - def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkWriteResult: """Write a batch of control execution events.""" @@ -44,3 +48,10 @@ def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: accepted += result.accepted dropped += result.dropped return SinkResult(accepted=accepted, dropped=dropped) + + +async def resolve_sink_result(result: SinkWriteResult) -> SinkResult: + """Resolve a sync or async sink result into a SinkResult.""" + if inspect.isawaitable(result): + return await result + return result From d8b10aab0f82dad4290be2c0c559535415e4104b Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Mon, 6 Apr 2026 09:30:01 -0700 Subject: [PATCH 4/7] add telemetry README --- telemetry/README.md | 7 +++++++ telemetry/pyproject.toml | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 telemetry/README.md diff --git a/telemetry/README.md b/telemetry/README.md new file mode 100644 index 00000000..c30fdcfc --- /dev/null +++ b/telemetry/README.md @@ -0,0 +1,7 @@ +# Agent Control Telemetry + +Shared telemetry contracts for Agent Control. + +This package contains reusable telemetry abstractions shared across the +Agent Control SDK and server, including sink contracts and trace-context +helpers. diff --git a/telemetry/pyproject.toml b/telemetry/pyproject.toml index fb818ce6..efdf4812 100644 --- a/telemetry/pyproject.toml +++ b/telemetry/pyproject.toml @@ -9,7 +9,7 @@ dependencies = [ authors = [ {name = "Agent Control Team"} ] -readme = "../README.md" +readme = "README.md" license = {text = "Apache-2.0"} [dependency-groups] From 2c623e5e79a78f0e19c6e91d9b3e4fdbfc1d7742 Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Tue, 7 Apr 2026 11:02:22 -0700 Subject: [PATCH 5/7] merge from main --- telemetry/src/agent_control_telemetry/sinks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/telemetry/src/agent_control_telemetry/sinks.py b/telemetry/src/agent_control_telemetry/sinks.py index 26e52e38..652d5af5 100644 --- a/telemetry/src/agent_control_telemetry/sinks.py +++ b/telemetry/src/agent_control_telemetry/sinks.py @@ -5,7 +5,7 @@ import inspect from collections.abc import Sequence from dataclasses import dataclass -from typing import Awaitable, Protocol, Sequence, TypeAlias +from typing import Awaitable, Protocol, TypeAlias from agent_control_models import ControlExecutionEvent @@ -57,6 +57,7 @@ async def resolve_sink_result(result: SinkWriteResult) -> SinkResult: return await result return result + class AsyncControlEventSink(Protocol): """Async write-side abstraction for delivering control execution events.""" From 9025e4551c603db9fcc4a451bdbeb5612606b6f1 Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Tue, 7 Apr 2026 11:52:35 -0700 Subject: [PATCH 6/7] type error --- telemetry/src/agent_control_telemetry/sinks.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/telemetry/src/agent_control_telemetry/sinks.py b/telemetry/src/agent_control_telemetry/sinks.py index 652d5af5..fca59a50 100644 --- a/telemetry/src/agent_control_telemetry/sinks.py +++ b/telemetry/src/agent_control_telemetry/sinks.py @@ -3,9 +3,9 @@ from __future__ import annotations import inspect -from collections.abc import Sequence +from collections.abc import Awaitable, Sequence from dataclasses import dataclass -from typing import Awaitable, Protocol, TypeAlias +from typing import Protocol from agent_control_models import ControlExecutionEvent @@ -23,7 +23,7 @@ def success(self) -> bool: return self.accepted > 0 -SinkWriteResult: TypeAlias = SinkResult | Awaitable[SinkResult] +type SinkWriteResult = SinkResult | Awaitable[SinkResult] class ControlEventSink(Protocol): From d93564ba8ae30989d173f43a815a957b99510905 Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Tue, 7 Apr 2026 12:13:46 -0700 Subject: [PATCH 7/7] coverage --- telemetry/tests/test_sinks.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/telemetry/tests/test_sinks.py b/telemetry/tests/test_sinks.py index cb04f179..90e6d765 100644 --- a/telemetry/tests/test_sinks.py +++ b/telemetry/tests/test_sinks.py @@ -10,6 +10,7 @@ BaseControlEventSink, SinkResult, ) +from agent_control_telemetry.sinks import resolve_sink_result def _make_event(*, control_id: int) -> ControlExecutionEvent: @@ -101,6 +102,29 @@ async def write_event(self, event: ControlExecutionEvent) -> SinkResult: assert result == SinkResult(accepted=2, dropped=1) +def test_resolve_sink_result_returns_sync_result_directly() -> None: + # Given: a plain SinkResult (not awaitable) + result = SinkResult(accepted=3, dropped=1) + + # When: resolved + resolved = asyncio.run(resolve_sink_result(result)) + + # Then: the same result is returned unchanged + assert resolved == SinkResult(accepted=3, dropped=1) + + +def test_resolve_sink_result_awaits_async_result() -> None: + # Given: a coroutine that returns a SinkResult + async def async_result() -> SinkResult: + return SinkResult(accepted=2, dropped=0) + + # When: resolved + resolved = asyncio.run(resolve_sink_result(async_result())) + + # Then: the awaited result is returned + assert resolved == SinkResult(accepted=2, dropped=0) + + def test_base_async_control_event_sink_single_event_delegates_to_batch_writer() -> None: # Given: an async sink that only implements batch writes class RecordingAsyncSink(BaseAsyncControlEventSink):