From afe9246d86e5a98a9ca830c6b92d3bcaa7946c3d Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Sun, 5 Apr 2026 16:08:58 -0700 Subject: [PATCH 1/4] Add telemetry package to support sinks --- pyproject.toml | 3 ++ scripts/build.py | 32 +++++++++--- sdks/python/pyproject.toml | 15 ++++-- sdks/python/src/agent_control/__init__.py | 14 ++++-- .../src/agent_control/evaluation_events.py | 5 +- .../python/src/agent_control/observability.py | 46 ++++++++++++++--- sdks/python/src/agent_control/tracing.py | 2 +- sdks/python/tests/test_observability.py | 44 +++++++++++++++- .../tests/test_observability_updates.py | 6 +-- sdks/python/tests/test_trace_context.py | 2 +- sdks/python/tests/test_tracing.py | 5 +- server/pyproject.toml | 15 ++++-- telemetry/pyproject.toml | 50 +++++++++++++++++++ .../src/agent_control_telemetry}/__init__.py | 7 ++- .../src/agent_control_telemetry/py.typed | 1 + .../src/agent_control_telemetry/sinks.py | 46 +++++++++++++++++ .../agent_control_telemetry}/trace_context.py | 1 - 17 files changed, 256 insertions(+), 38 deletions(-) create mode 100644 telemetry/pyproject.toml rename {sdks/python/src/agent_control/telemetry => telemetry/src/agent_control_telemetry}/__init__.py (64%) create mode 100644 telemetry/src/agent_control_telemetry/py.typed create mode 100644 telemetry/src/agent_control_telemetry/sinks.py rename {sdks/python/src/agent_control/telemetry => telemetry/src/agent_control_telemetry}/trace_context.py (95%) diff --git a/pyproject.toml b/pyproject.toml index c9da2cd2..35d26b30 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ members = [ "models", "server", "sdks/python", + "telemetry", "engine", "evaluators/builtin", # NOTE: evaluators/contrib/* excluded - install separately when needed @@ -36,6 +37,7 @@ required-environments = [ agent-control-models = { workspace = true } agent-control-engine = { workspace = true } agent-control-evaluators = { workspace = true } +agent-control-telemetry = { workspace = true } [tool.ruff] line-length = 100 @@ -68,6 +70,7 @@ version_toml = [ "models/pyproject.toml:project.version", "engine/pyproject.toml:project.version", "sdks/python/pyproject.toml:project.version", + "telemetry/pyproject.toml:project.version", "server/pyproject.toml:project.version", "evaluators/builtin/pyproject.toml:project.version", "evaluators/contrib/galileo/pyproject.toml:project.version", diff --git a/scripts/build.py b/scripts/build.py index 498239ca..43095af2 100644 --- a/scripts/build.py +++ b/scripts/build.py @@ -2,8 +2,8 @@ """Build packages for PyPI distribution. This script builds all publishable packages. For SDK and server, it copies internal -packages (models, engine) into the source directories before building, then cleans up -afterward. This allows the published wheels to be self-contained. +packages (models, engine, telemetry) into the source directories before building, +then cleans up afterward. This allows the published wheels to be self-contained. Usage: python scripts/build.py [models|evaluators|sdk|server|galileo|all] @@ -81,7 +81,7 @@ def build_sdk() -> None: print(f"Building agent-control-sdk v{version}") # Clean previous builds and vendored code - for pkg in ["agent_control_models", "agent_control_engine"]: + for pkg in ["agent_control_models", "agent_control_engine", "agent_control_telemetry"]: target = sdk_src / pkg if target.exists(): shutil.rmtree(target) @@ -99,6 +99,10 @@ def build_sdk() -> None: ROOT / "engine" / "src" / "agent_control_engine", sdk_src / "agent_control_engine", ) + shutil.copytree( + ROOT / "telemetry" / "src" / "agent_control_telemetry", + sdk_src / "agent_control_telemetry", + ) # Inject bundle metadata for conflict detection inject_bundle_metadata( @@ -111,6 +115,11 @@ def build_sdk() -> None: "agent-control-sdk", version, ) + inject_bundle_metadata( + sdk_src / "agent_control_telemetry" / "__init__.py", + "agent-control-sdk", + version, + ) # Set version set_package_version(sdk_dir / "pyproject.toml", version) @@ -120,7 +129,7 @@ def build_sdk() -> None: print(f" Built agent-control-sdk v{version}") finally: # Clean up vendored code (don't commit it) - for pkg in ["agent_control_models", "agent_control_engine"]: + for pkg in ["agent_control_models", "agent_control_engine", "agent_control_telemetry"]: target = sdk_src / pkg if target.exists(): shutil.rmtree(target) @@ -139,7 +148,7 @@ def build_server() -> None: print(f"Building agent-control-server v{version}") # Clean previous builds and vendored code - for pkg in ["agent_control_models", "agent_control_engine"]: + for pkg in ["agent_control_models", "agent_control_engine", "agent_control_telemetry"]: target = server_src / pkg if target.exists(): shutil.rmtree(target) @@ -148,7 +157,7 @@ def build_server() -> None: if dist_dir.exists(): shutil.rmtree(dist_dir) - # Copy vendored packages (models and engine only, NOT evaluators) + # Copy vendored packages (models, engine, and telemetry only, NOT evaluators) shutil.copytree( ROOT / "models" / "src" / "agent_control_models", server_src / "agent_control_models", @@ -157,6 +166,10 @@ def build_server() -> None: ROOT / "engine" / "src" / "agent_control_engine", server_src / "agent_control_engine", ) + shutil.copytree( + ROOT / "telemetry" / "src" / "agent_control_telemetry", + server_src / "agent_control_telemetry", + ) # Inject bundle metadata for conflict detection inject_bundle_metadata( @@ -169,6 +182,11 @@ def build_server() -> None: "agent-control-server", version, ) + inject_bundle_metadata( + server_src / "agent_control_telemetry" / "__init__.py", + "agent-control-server", + version, + ) # Set version set_package_version(server_dir / "pyproject.toml", version) @@ -178,7 +196,7 @@ def build_server() -> None: print(f" Built agent-control-server v{version}") finally: # Clean up vendored code (don't commit it) - for pkg in ["agent_control_models", "agent_control_engine"]: + for pkg in ["agent_control_models", "agent_control_engine", "agent_control_telemetry"]: target = server_src / pkg if target.exists(): shutil.rmtree(target) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index f5c9734e..fe4db670 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -3,7 +3,8 @@ name = "agent-control-sdk" version = "7.3.2" description = "Python SDK for Agent Control - protect your AI agents with controls" requires-python = ">=3.12" -# Note: agent-control-models and agent-control-engine are bundled at build time +# Note: agent-control-models, agent-control-engine, and agent-control-telemetry +# are bundled at build time # Note: agent-control-evaluators is a runtime dependency (NOT vendored) to avoid # duplicate module conflict when galileo extras are installed dependencies = [ @@ -48,6 +49,7 @@ dev = [ "mypy>=1.8.0", "agent-control-models", "agent-control-engine", + "agent-control-telemetry", "agent-control-evaluators", "strands-agents>=1.26.0", # For strands integration tests ] @@ -57,8 +59,14 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -# Note: agent_control_models and agent_control_engine are copied by scripts/build.py -packages = ["src/agent_control", "src/agent_control_models", "src/agent_control_engine"] +# Note: agent_control_models, agent_control_engine, and agent_control_telemetry +# are copied by scripts/build.py +packages = [ + "src/agent_control", + "src/agent_control_models", + "src/agent_control_engine", + "src/agent_control_telemetry", +] [tool.ruff] line-length = 100 @@ -80,6 +88,7 @@ known-first-party = ["agent_control"] [tool.uv.sources] agent-control-models = { workspace = true } agent-control-engine = { workspace = true } +agent-control-telemetry = { workspace = true } agent-control-evaluators = { workspace = true } # For local dev: use local galileo package instead of PyPI agent-control-evaluator-galileo = { path = "../../evaluators/contrib/galileo", editable = true } diff --git a/sdks/python/src/agent_control/__init__.py b/sdks/python/src/agent_control/__init__.py index 24353411..024eaf98 100644 --- a/sdks/python/src/agent_control/__init__.py +++ b/sdks/python/src/agent_control/__init__.py @@ -68,6 +68,11 @@ async def handle_input(user_message: str) -> str: Step, StepSchema, ) +from agent_control_telemetry.trace_context import ( + clear_trace_context_provider, + get_trace_context_from_provider, + set_trace_context_provider, +) from . import agents, controls, evaluation, evaluators, policies from ._control_registry import ( @@ -86,6 +91,7 @@ async def handle_input(user_message: str) -> str: add_event, configure_logging, get_event_batcher, + get_event_sink, get_log_config, get_logger, init_observability, @@ -93,11 +99,7 @@ async def handle_input(user_message: str) -> str: log_control_evaluation, shutdown_observability, sync_shutdown_observability, -) -from .telemetry import ( - clear_trace_context_provider, - get_trace_context_from_provider, - set_trace_context_provider, + write_events, ) from .tracing import ( get_current_span_id, @@ -1310,9 +1312,11 @@ async def main(): # Observability "init_observability", "add_event", + "write_events", "shutdown_observability", "is_observability_enabled", "get_event_batcher", + "get_event_sink", "configure_logging", "get_log_config", "log_control_evaluation", diff --git a/sdks/python/src/agent_control/evaluation_events.py b/sdks/python/src/agent_control/evaluation_events.py index 0c3f9438..9ce8a249 100644 --- a/sdks/python/src/agent_control/evaluation_events.py +++ b/sdks/python/src/agent_control/evaluation_events.py @@ -11,7 +11,7 @@ EvaluationResponse, ) -from .observability import add_event, get_logger, is_observability_enabled +from .observability import get_logger, is_observability_enabled, write_events _logger = get_logger(__name__) @@ -210,5 +210,4 @@ def enqueue_observability_events(events: list[ControlExecutionEvent]) -> None: if not is_observability_enabled(): return - for event in events: - add_event(event) + write_events(events) diff --git a/sdks/python/src/agent_control/observability.py b/sdks/python/src/agent_control/observability.py index 96d26813..1568f13e 100644 --- a/sdks/python/src/agent_control/observability.py +++ b/sdks/python/src/agent_control/observability.py @@ -49,10 +49,12 @@ import logging import threading import time +from collections.abc import Sequence from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any import httpx +from agent_control_telemetry.sinks import BaseControlEventSink, ControlEventSink, SinkResult from agent_control.settings import configure_settings, get_settings @@ -791,6 +793,24 @@ def get_stats(self) -> dict: # Global batcher instance _batcher: EventBatcher | None = None +_event_sink: ControlEventSink | None = None + + +class _BatcherControlEventSink(BaseControlEventSink): + """Default SDK sink backed by the existing queue-based EventBatcher.""" + + def __init__(self, batcher: EventBatcher): + self._batcher = batcher + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + accepted = 0 + dropped = 0 + for event in events: + if self._batcher.add_event(event): + accepted += 1 + else: + dropped += 1 + return SinkResult(accepted=accepted, dropped=dropped) def get_event_batcher() -> EventBatcher | None: @@ -803,6 +823,11 @@ def get_event_batcher() -> EventBatcher | None: return _batcher +def get_event_sink() -> ControlEventSink | None: + """Get the active global control-event sink.""" + return _event_sink + + def init_observability( server_url: str | None = None, api_key: str | None = None, @@ -821,7 +846,7 @@ def init_observability( Returns: EventBatcher instance if enabled, None otherwise """ - global _batcher + global _batcher, _event_sink # Check if enabled is_enabled = enabled if enabled is not None else get_settings().observability_enabled @@ -830,13 +855,14 @@ def init_observability( logger.debug("Observability disabled") return None - if _batcher is not None: + if _event_sink is not None: logger.debug("Observability already initialized") return _batcher # Create batcher _batcher = EventBatcher(server_url=server_url, api_key=api_key) _batcher.start() + _event_sink = _BatcherControlEventSink(_batcher) # Register shutdown handler atexit.register(_batcher.shutdown) @@ -855,17 +881,23 @@ def add_event(event: ControlExecutionEvent) -> bool: Returns: True if added, False if observability disabled or event dropped """ - if _batcher is None: - return False - return _batcher.add_event(event) + return write_events([event]).accepted == 1 + + +def write_events(events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write events through the active global sink.""" + if _event_sink is None: + return SinkResult(accepted=0, dropped=len(events)) + return _event_sink.write_events(events) def sync_shutdown_observability() -> None: """Synchronously shut down observability and flush remaining events.""" - global _batcher + global _batcher, _event_sink if _batcher is not None: _batcher.shutdown() _batcher = None + _event_sink = None async def shutdown_observability() -> None: @@ -880,7 +912,7 @@ async def shutdown_observability() -> None: def is_observability_enabled() -> bool: """Check if observability is enabled and initialized.""" - return _batcher is not None + return _event_sink is not None def log_span_start( diff --git a/sdks/python/src/agent_control/tracing.py b/sdks/python/src/agent_control/tracing.py index 47696b15..43be5b5c 100644 --- a/sdks/python/src/agent_control/tracing.py +++ b/sdks/python/src/agent_control/tracing.py @@ -31,7 +31,7 @@ from contextlib import contextmanager from contextvars import ContextVar, Token -from .telemetry.trace_context import get_trace_context_from_provider +from agent_control_telemetry.trace_context import get_trace_context_from_provider # Context variables for trace/span propagation _trace_id_var: ContextVar[str | None] = ContextVar("trace_id", default=None) diff --git a/sdks/python/tests/test_observability.py b/sdks/python/tests/test_observability.py index 04115c2b..740e22f9 100644 --- a/sdks/python/tests/test_observability.py +++ b/sdks/python/tests/test_observability.py @@ -13,6 +13,7 @@ EventBatcher, add_event, get_event_batcher, + get_event_sink, init_observability, is_observability_enabled, log_span_end, @@ -605,35 +606,58 @@ def test_get_event_batcher_not_initialized(self): # Reset global state import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: assert get_event_batcher() is None finally: obs._batcher = old_batcher + obs._event_sink = old_sink + + def test_get_event_sink_not_initialized(self): + """Test get_event_sink returns None when not initialized.""" + import agent_control.observability as obs + old_batcher = obs._batcher + old_sink = obs._event_sink + obs._batcher = None + obs._event_sink = None + + try: + assert get_event_sink() is None + finally: + obs._batcher = old_batcher + obs._event_sink = old_sink def test_is_observability_enabled_false(self): """Test is_observability_enabled returns False when not initialized.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: assert is_observability_enabled() is False finally: obs._batcher = old_batcher + obs._event_sink = old_sink def test_add_event_without_batcher(self): """Test add_event returns False when batcher not initialized.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: result = add_event(create_mock_event()) assert result is False finally: obs._batcher = old_batcher + obs._event_sink = old_sink class TestInitObservability: @@ -643,19 +667,24 @@ def test_init_disabled_when_explicitly_off(self): """Test that init_observability returns None when explicitly disabled.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: result = init_observability(enabled=False) assert result is None finally: obs._batcher = old_batcher + obs._event_sink = old_sink def test_init_enabled_creates_batcher(self): """Test that init_observability creates batcher when enabled.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: result = init_observability( @@ -666,17 +695,21 @@ def test_init_enabled_creates_batcher(self): assert result is not None assert isinstance(result, EventBatcher) assert result._running is True + assert get_event_sink() is not None # Cleanup - result.stop() + obs.sync_shutdown_observability() finally: obs._batcher = old_batcher + obs._event_sink = old_sink def test_init_idempotent(self): """Test that init_observability is idempotent.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: batcher1 = init_observability(enabled=True) @@ -684,9 +717,10 @@ def test_init_idempotent(self): assert batcher1 is batcher2 - batcher1.stop() + obs.sync_shutdown_observability() finally: obs._batcher = old_batcher + obs._event_sink = old_sink class TestShutdownObservability: @@ -697,6 +731,7 @@ async def test_shutdown_flushes_and_stops(self): """Test that shutdown flushes remaining events and stops batcher.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink try: batcher = init_observability(enabled=True) @@ -710,20 +745,25 @@ async def test_shutdown_flushes_and_stops(self): # Batcher should be stopped and cleared assert obs._batcher is None + assert obs._event_sink is None finally: obs._batcher = old_batcher + obs._event_sink = old_sink @pytest.mark.asyncio async def test_shutdown_without_batcher(self): """Test that shutdown is safe when batcher not initialized.""" import agent_control.observability as obs old_batcher = obs._batcher + old_sink = obs._event_sink obs._batcher = None + obs._event_sink = None try: await shutdown_observability() # Should not raise finally: obs._batcher = old_batcher + obs._event_sink = old_sink class TestEventBatcherShutdownConfig: diff --git a/sdks/python/tests/test_observability_updates.py b/sdks/python/tests/test_observability_updates.py index a90ea785..1e7a1025 100644 --- a/sdks/python/tests/test_observability_updates.py +++ b/sdks/python/tests/test_observability_updates.py @@ -15,7 +15,7 @@ enqueue_observability_events, map_applies_to, ) -from agent_control.telemetry.trace_context import ( +from agent_control_telemetry.trace_context import ( clear_trace_context_provider, set_trace_context_provider, ) @@ -322,10 +322,10 @@ def test_enqueue_observability_events_uses_existing_batcher(self): ] with patch("agent_control.evaluation_events.is_observability_enabled", return_value=True), \ - patch("agent_control.evaluation_events.add_event") as mock_add: + patch("agent_control.evaluation_events.write_events") as mock_write: enqueue_observability_events(events) - mock_add.assert_called_once_with(events[0]) + mock_write.assert_called_once_with(events) class TestCheckEvaluationWithLocal: diff --git a/sdks/python/tests/test_trace_context.py b/sdks/python/tests/test_trace_context.py index 2c1d727f..445d6cbf 100644 --- a/sdks/python/tests/test_trace_context.py +++ b/sdks/python/tests/test_trace_context.py @@ -1,6 +1,6 @@ """Tests for the telemetry trace context provider interface.""" -from agent_control.telemetry.trace_context import ( +from agent_control_telemetry.trace_context import ( clear_trace_context_provider, get_trace_context_from_provider, set_trace_context_provider, diff --git a/sdks/python/tests/test_tracing.py b/sdks/python/tests/test_tracing.py index 97397b8d..3b0bb629 100644 --- a/sdks/python/tests/test_tracing.py +++ b/sdks/python/tests/test_tracing.py @@ -2,7 +2,10 @@ import pytest -from agent_control.telemetry.trace_context import clear_trace_context_provider, set_trace_context_provider +from agent_control_telemetry.trace_context import ( + clear_trace_context_provider, + set_trace_context_provider, +) from agent_control.tracing import ( _generate_span_id, _generate_trace_id, diff --git a/server/pyproject.toml b/server/pyproject.toml index 9bdec2c0..de2165e5 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -3,7 +3,8 @@ name = "agent-control-server" version = "7.3.2" description = "Server for Agent Control - manage and evaluate controls for AI agents" requires-python = ">=3.12" -# Note: agent-control-models and agent-control-engine are bundled at build time +# Note: agent-control-models, agent-control-engine, and agent-control-telemetry +# are bundled at build time # Note: agent-control-evaluators is a runtime dependency (NOT vendored) to avoid # duplicate module conflict when galileo extras are installed dependencies = [ @@ -43,6 +44,7 @@ dev = [ "types-jsonschema>=4.23.0", "agent-control-models", "agent-control-engine", + "agent-control-telemetry", "agent-control-evaluators", ] @@ -56,9 +58,15 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -# Note: agent_control_models and agent_control_engine are copied by scripts/build.py +# Note: agent_control_models, agent_control_engine, and agent_control_telemetry +# are copied by scripts/build.py # Note: agent_control_evaluators is a runtime dep, not vendored -packages = ["src/agent_control_server", "src/agent_control_models", "src/agent_control_engine"] +packages = [ + "src/agent_control_server", + "src/agent_control_models", + "src/agent_control_engine", + "src/agent_control_telemetry", +] [tool.pytest.ini_options] asyncio_mode = "auto" @@ -87,6 +95,7 @@ known-first-party = ["agent_control_server"] [tool.uv.sources] agent-control-models = { workspace = true } agent-control-engine = { workspace = true } +agent-control-telemetry = { workspace = true } agent-control-evaluators = { workspace = true } # For local dev: use local galileo package instead of PyPI agent-control-evaluator-galileo = { path = "../evaluators/contrib/galileo", editable = true } diff --git a/telemetry/pyproject.toml b/telemetry/pyproject.toml new file mode 100644 index 00000000..fb818ce6 --- /dev/null +++ b/telemetry/pyproject.toml @@ -0,0 +1,50 @@ +[project] +name = "agent-control-telemetry" +version = "7.3.2" +description = "Shared telemetry contracts for Agent Control" +requires-python = ">=3.12" +dependencies = [ + "agent-control-models>=3.0.0", +] +authors = [ + {name = "Agent Control Team"} +] +readme = "../README.md" +license = {text = "Apache-2.0"} + +[dependency-groups] +dev = [ + "pytest>=8.0.0", + "mypy>=1.8.0", + "ruff>=0.1.0", + "agent-control-models", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/agent_control_telemetry"] + +[tool.hatch.build.targets.wheel.force-include] +"src/agent_control_telemetry/py.typed" = "agent_control_telemetry/py.typed" + +[tool.ruff] +line-length = 100 + +[tool.ruff.lint] +select = [ + "E", + "W", + "F", + "I", + "UP", +] +ignore = [] + +[tool.ruff.lint.isort] +known-first-party = ["agent_control_telemetry"] + +[tool.uv.sources] +agent-control-models = { workspace = true } diff --git a/sdks/python/src/agent_control/telemetry/__init__.py b/telemetry/src/agent_control_telemetry/__init__.py similarity index 64% rename from sdks/python/src/agent_control/telemetry/__init__.py rename to telemetry/src/agent_control_telemetry/__init__.py index c488d4a2..fd9e3115 100644 --- a/sdks/python/src/agent_control/telemetry/__init__.py +++ b/telemetry/src/agent_control_telemetry/__init__.py @@ -1,4 +1,6 @@ -"""Telemetry interfaces for provider-agnostic tracing.""" +"""Shared telemetry contracts for Agent Control.""" + +from .sinks import BaseControlEventSink, ControlEventSink, SinkResult from .trace_context import ( TraceContext, TraceContextProvider, @@ -8,6 +10,9 @@ ) __all__ = [ + "BaseControlEventSink", + "ControlEventSink", + "SinkResult", "TraceContext", "TraceContextProvider", "clear_trace_context_provider", diff --git a/telemetry/src/agent_control_telemetry/py.typed b/telemetry/src/agent_control_telemetry/py.typed new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/telemetry/src/agent_control_telemetry/py.typed @@ -0,0 +1 @@ + diff --git a/telemetry/src/agent_control_telemetry/sinks.py b/telemetry/src/agent_control_telemetry/sinks.py new file mode 100644 index 00000000..f2249cce --- /dev/null +++ b/telemetry/src/agent_control_telemetry/sinks.py @@ -0,0 +1,46 @@ +"""Shared sink contracts for control execution event delivery.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Protocol, Sequence + +from agent_control_models import ControlExecutionEvent + + +@dataclass(frozen=True) +class SinkResult: + """Summarizes the outcome of a sink write attempt.""" + + accepted: int + dropped: int = 0 + + @property + def success(self) -> bool: + """Return True when at least one event was accepted.""" + return self.accepted > 0 + + +class ControlEventSink(Protocol): + """Write-side abstraction for delivering control execution events.""" + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write a batch of control execution events.""" + + +class BaseControlEventSink(ControlEventSink): + """Minimal helper base for sink implementations.""" + + def write_event(self, event: ControlExecutionEvent) -> SinkResult: + """Write a single control execution event.""" + return self.write_events([event]) + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write a batch of control execution events.""" + accepted = 0 + dropped = 0 + for event in events: + result = self.write_event(event) + accepted += result.accepted + dropped += result.dropped + return SinkResult(accepted=accepted, dropped=dropped) diff --git a/sdks/python/src/agent_control/telemetry/trace_context.py b/telemetry/src/agent_control_telemetry/trace_context.py similarity index 95% rename from sdks/python/src/agent_control/telemetry/trace_context.py rename to telemetry/src/agent_control_telemetry/trace_context.py index a871fb29..134e7ac1 100644 --- a/sdks/python/src/agent_control/telemetry/trace_context.py +++ b/telemetry/src/agent_control_telemetry/trace_context.py @@ -30,7 +30,6 @@ def get_trace_context_from_provider() -> TraceContext | None: try: trace_context = _trace_context_provider() except Exception: - # Provider failures should not break control evaluation. return None if trace_context is None: From 8b57ca7c4159c9053667ea1b525161ef9dd561e0 Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Sun, 5 Apr 2026 16:19:45 -0700 Subject: [PATCH 2/4] add telemetry to server docker --- server/Dockerfile | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/Dockerfile b/server/Dockerfile index 7ca7b30d..30db201a 100644 --- a/server/Dockerfile +++ b/server/Dockerfile @@ -42,6 +42,7 @@ COPY server/alembic/ alembic/ # Copy shared dependencies (Workspaces) COPY models/ models/ COPY engine/ engine/ +COPY telemetry/ telemetry/ COPY evaluators/ evaluators/ # Copy server application @@ -50,19 +51,21 @@ COPY server/ server/ # Copy the exported UI bundle that FastAPI serves for hosted deployments COPY --from=ui-builder /ui/out ./server/ui-dist -# Install server + bundled runtime packages (engine/evaluators/models) +# Install server + bundled runtime packages (engine/evaluators/models/telemetry) # We ignore the lockfile if it's missing or out of sync to ensure build succeeds during dev RUN uv sync \ --package agent-control-server \ --package agent-control-engine \ --package agent-control-evaluators \ --package agent-control-models \ + --package agent-control-telemetry \ --no-dev \ || uv sync \ --package agent-control-server \ --package agent-control-engine \ --package agent-control-evaluators \ --package agent-control-models \ + --package agent-control-telemetry \ --no-dev \ --frozen From d8b10aab0f82dad4290be2c0c559535415e4104b Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Mon, 6 Apr 2026 09:30:01 -0700 Subject: [PATCH 3/4] add telemetry README --- telemetry/README.md | 7 +++++++ telemetry/pyproject.toml | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) create mode 100644 telemetry/README.md diff --git a/telemetry/README.md b/telemetry/README.md new file mode 100644 index 00000000..c30fdcfc --- /dev/null +++ b/telemetry/README.md @@ -0,0 +1,7 @@ +# Agent Control Telemetry + +Shared telemetry contracts for Agent Control. + +This package contains reusable telemetry abstractions shared across the +Agent Control SDK and server, including sink contracts and trace-context +helpers. diff --git a/telemetry/pyproject.toml b/telemetry/pyproject.toml index fb818ce6..efdf4812 100644 --- a/telemetry/pyproject.toml +++ b/telemetry/pyproject.toml @@ -9,7 +9,7 @@ dependencies = [ authors = [ {name = "Agent Control Team"} ] -readme = "../README.md" +readme = "README.md" license = {text = "Apache-2.0"} [dependency-groups] From 8948d97a164a72f68d3c6f158240fd52b8cf7971 Mon Sep 17 00:00:00 2001 From: "namrata.ghadi" Date: Mon, 6 Apr 2026 17:56:38 -0700 Subject: [PATCH 4/4] add tests and address comments --- .github/workflows/ci.yml | 2 +- Makefile | 36 ++++-- codecov.yml | 1 + telemetry/Makefile | 42 ++++++ telemetry/README.md | 2 +- .../src/agent_control_telemetry/__init__.py | 10 +- .../src/agent_control_telemetry/sinks.py | 28 +++- .../agent_control_telemetry/trace_context.py | 2 +- telemetry/tests/test_sinks.py | 122 ++++++++++++++++++ telemetry/tests/test_trace_context.py | 75 +++++++++++ 10 files changed, 307 insertions(+), 13 deletions(-) create mode 100644 telemetry/Makefile create mode 100644 telemetry/tests/test_sinks.py create mode 100644 telemetry/tests/test_trace_context.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 02c45e7c..ac171778 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,7 +62,7 @@ jobs: - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 with: - files: coverage-models.xml,coverage-engine.xml,coverage-server.xml,coverage-sdk.xml + files: coverage-models.xml,coverage-engine.xml,coverage-telemetry.xml,coverage-server.xml,coverage-sdk.xml fail_ci_if_error: false token: ${{ secrets.CODECOV_TOKEN }} diff --git a/Makefile b/Makefile index b858a3ed..e11ac6a0 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,11 @@ -.PHONY: help sync openapi-spec openapi-spec-check test test-extras test-all models-test test-models test-sdk lint lint-fix typecheck check build build-models build-server build-sdk publish publish-models publish-server publish-sdk hooks-install hooks-uninstall prepush evaluators-test evaluators-lint evaluators-lint-fix evaluators-typecheck evaluators-build galileo-test galileo-lint galileo-lint-fix galileo-typecheck galileo-build sdk-ts-generate sdk-ts-overlay-test sdk-ts-name-check sdk-ts-generate-check sdk-ts-build sdk-ts-test sdk-ts-lint sdk-ts-typecheck sdk-ts-release-check sdk-ts-publish-dry-run sdk-ts-publish +.PHONY: help sync openapi-spec openapi-spec-check test test-extras test-all models-test test-models test-sdk lint lint-fix typecheck check build build-models build-server build-sdk publish publish-models publish-server publish-sdk hooks-install hooks-uninstall prepush evaluators-test evaluators-lint evaluators-lint-fix evaluators-typecheck evaluators-build galileo-test galileo-lint galileo-lint-fix galileo-typecheck galileo-build sdk-ts-generate sdk-ts-overlay-test sdk-ts-name-check sdk-ts-generate-check sdk-ts-build sdk-ts-test sdk-ts-lint sdk-ts-typecheck sdk-ts-release-check sdk-ts-publish-dry-run sdk-ts-publish telemetry-test telemetry-lint telemetry-lint-fix telemetry-typecheck telemetry-build telemetry-publish # Workspace package names PACK_MODELS := agent-control-models PACK_SERVER := agent-control-server PACK_SDK := agent-control PACK_ENGINE := agent-control-engine +PACK_TELEMETRY := agent-control-telemetry PACK_EVALUATORS := agent-control-evaluators OPENAPI_SPEC_PATH := server/.generated/openapi.json @@ -14,6 +15,7 @@ SERVER_DIR := server SDK_DIR := sdks/python TS_SDK_DIR := sdks/typescript ENGINE_DIR := engine +TELEMETRY_DIR := telemetry EVALUATORS_DIR := evaluators/builtin GALILEO_DIR := evaluators/contrib/galileo UI_DIR := ui @@ -31,7 +33,7 @@ help: @echo " make openapi-spec-check - verify OpenAPI generation succeeds" @echo "" @echo "Test:" - @echo " make test - run tests for core packages (models, server, engine, sdk, evaluators)" + @echo " make test - run tests for core packages (models, telemetry, server, engine, sdk, evaluators)" @echo " make models-test - run shared model tests with coverage" @echo " make test-extras - run tests for contrib evaluators (galileo, etc.)" @echo " make test-all - run all tests (core + extras)" @@ -82,13 +84,16 @@ openapi-spec-check: openapi-spec # Test # --------------------------- -test: models-test server-test engine-test sdk-test evaluators-test +test: models-test telemetry-test server-test engine-test sdk-test evaluators-test models-test: cd $(MODELS_DIR) && uv run pytest --cov=src --cov-report=xml:../coverage-models.xml -q test-models: models-test +telemetry-test: + $(MAKE) -C $(TELEMETRY_DIR) test + # Run tests for contrib evaluators (not included in default test target) test-extras: galileo-test @@ -102,26 +107,35 @@ check: test lint typecheck # Quality # --------------------------- -lint: engine-lint evaluators-lint +lint: engine-lint telemetry-lint evaluators-lint uv run --package $(PACK_MODELS) ruff check --config pyproject.toml models/src uv run --package $(PACK_SERVER) ruff check --config pyproject.toml server/src uv run --package $(PACK_SDK) ruff check --config pyproject.toml sdks/python/src -lint-fix: engine-lint-fix evaluators-lint-fix +lint-fix: engine-lint-fix telemetry-lint-fix evaluators-lint-fix uv run --package $(PACK_MODELS) ruff check --config pyproject.toml --fix models/src uv run --package $(PACK_SERVER) ruff check --config pyproject.toml --fix server/src uv run --package $(PACK_SDK) ruff check --config pyproject.toml --fix sdks/python/src -typecheck: engine-typecheck evaluators-typecheck +typecheck: engine-typecheck telemetry-typecheck evaluators-typecheck uv run --package $(PACK_MODELS) mypy --config-file pyproject.toml models/src uv run --package $(PACK_SERVER) mypy --config-file pyproject.toml server/src uv run --package $(PACK_SDK) mypy --config-file pyproject.toml sdks/python/src +telemetry-lint: + $(MAKE) -C $(TELEMETRY_DIR) lint + +telemetry-lint-fix: + $(MAKE) -C $(TELEMETRY_DIR) lint-fix + +telemetry-typecheck: + $(MAKE) -C $(TELEMETRY_DIR) typecheck + # --------------------------- # Build / Publish # --------------------------- -build: build-models build-server build-sdk engine-build evaluators-build +build: build-models build-server build-sdk engine-build telemetry-build evaluators-build build-models: cd $(MODELS_DIR) && uv build @@ -132,7 +146,10 @@ build-server: build-sdk: cd $(SDK_DIR) && uv build -publish: publish-models publish-server publish-sdk engine-publish +telemetry-build: + cd $(TELEMETRY_DIR) && uv build + +publish: publish-models publish-server publish-sdk engine-publish telemetry-publish publish-models: cd $(MODELS_DIR) && uv publish @@ -143,6 +160,9 @@ publish-server: publish-sdk: cd $(SDK_DIR) && uv publish +telemetry-publish: + cd $(TELEMETRY_DIR) && uv publish + # --------------------------- # Git hooks # --------------------------- diff --git a/codecov.yml b/codecov.yml index 55249608..cd3c83db 100644 --- a/codecov.yml +++ b/codecov.yml @@ -3,4 +3,5 @@ fixes: - "agent_control_engine/::engine/src/agent_control_engine/" - "agent_control_models/::models/src/agent_control_models/" - "agent_control_evaluators/::evaluators/builtin/src/agent_control_evaluators/" + - "agent_control_telemetry/::telemetry/src/agent_control_telemetry/" - "agent_control/::sdks/python/src/agent_control/" diff --git a/telemetry/Makefile b/telemetry/Makefile new file mode 100644 index 00000000..cf15773e --- /dev/null +++ b/telemetry/Makefile @@ -0,0 +1,42 @@ +.PHONY: help sync test lint lint-fix typecheck build publish + +PACKAGE := agent-control-telemetry + +help: + @echo "Agent Control Telemetry - Makefile commands" + @echo "" + @echo "Setup:" + @echo " make sync - uv sync dependencies" + @echo "" + @echo "Quality:" + @echo " make lint - run ruff check (using root config)" + @echo " make lint-fix - run ruff check --fix" + @echo " make typecheck - run mypy (using root config)" + @echo "" + @echo "Test:" + @echo " make test - run pytest" + @echo "" + @echo "Build:" + @echo " make build - build package" + @echo " make publish - publish package" + +sync: + uv sync + +test: + uv run pytest --cov=src --cov-report=xml:../coverage-telemetry.xml -q + +lint: + uv run ruff check --config ../pyproject.toml src/ + +lint-fix: + uv run ruff check --config ../pyproject.toml --fix src/ + +typecheck: + uv run mypy --config-file ../pyproject.toml src/ + +build: + uv build + +publish: + uv publish diff --git a/telemetry/README.md b/telemetry/README.md index c30fdcfc..f81d5e6f 100644 --- a/telemetry/README.md +++ b/telemetry/README.md @@ -3,5 +3,5 @@ Shared telemetry contracts for Agent Control. This package contains reusable telemetry abstractions shared across the -Agent Control SDK and server, including sink contracts and trace-context +Agent Control SDK and server, including sync and async sink contracts and trace-context helpers. diff --git a/telemetry/src/agent_control_telemetry/__init__.py b/telemetry/src/agent_control_telemetry/__init__.py index fd9e3115..e0b99652 100644 --- a/telemetry/src/agent_control_telemetry/__init__.py +++ b/telemetry/src/agent_control_telemetry/__init__.py @@ -1,6 +1,12 @@ """Shared telemetry contracts for Agent Control.""" -from .sinks import BaseControlEventSink, ControlEventSink, SinkResult +from .sinks import ( + AsyncControlEventSink, + BaseAsyncControlEventSink, + BaseControlEventSink, + ControlEventSink, + SinkResult, +) from .trace_context import ( TraceContext, TraceContextProvider, @@ -10,6 +16,8 @@ ) __all__ = [ + "AsyncControlEventSink", + "BaseAsyncControlEventSink", "BaseControlEventSink", "ControlEventSink", "SinkResult", diff --git a/telemetry/src/agent_control_telemetry/sinks.py b/telemetry/src/agent_control_telemetry/sinks.py index f2249cce..4c77c640 100644 --- a/telemetry/src/agent_control_telemetry/sinks.py +++ b/telemetry/src/agent_control_telemetry/sinks.py @@ -2,8 +2,9 @@ from __future__ import annotations +from collections.abc import Sequence from dataclasses import dataclass -from typing import Protocol, Sequence +from typing import Protocol from agent_control_models import ControlExecutionEvent @@ -44,3 +45,28 @@ def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: accepted += result.accepted dropped += result.dropped return SinkResult(accepted=accepted, dropped=dropped) + + +class AsyncControlEventSink(Protocol): + """Async write-side abstraction for delivering control execution events.""" + + async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write a batch of control execution events.""" + + +class BaseAsyncControlEventSink(AsyncControlEventSink): + """Minimal async helper base for sink implementations.""" + + async def write_event(self, event: ControlExecutionEvent) -> SinkResult: + """Write a single control execution event.""" + return await self.write_events([event]) + + async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + """Write a batch of control execution events.""" + accepted = 0 + dropped = 0 + for event in events: + result = await self.write_event(event) + accepted += result.accepted + dropped += result.dropped + return SinkResult(accepted=accepted, dropped=dropped) diff --git a/telemetry/src/agent_control_telemetry/trace_context.py b/telemetry/src/agent_control_telemetry/trace_context.py index 134e7ac1..61c9d2c9 100644 --- a/telemetry/src/agent_control_telemetry/trace_context.py +++ b/telemetry/src/agent_control_telemetry/trace_context.py @@ -39,7 +39,7 @@ def get_trace_context_from_provider() -> TraceContext | None: span_id = trace_context.get("span_id") if not isinstance(trace_id, str) or not isinstance(span_id, str): return None - if not trace_id or not span_id: + if not trace_id.strip() or not span_id.strip(): return None return { diff --git a/telemetry/tests/test_sinks.py b/telemetry/tests/test_sinks.py new file mode 100644 index 00000000..cb04f179 --- /dev/null +++ b/telemetry/tests/test_sinks.py @@ -0,0 +1,122 @@ +"""Tests for telemetry sink contracts.""" + +import asyncio +from collections.abc import Sequence +from datetime import UTC, datetime + +from agent_control_models import ControlExecutionEvent +from agent_control_telemetry import ( + BaseAsyncControlEventSink, + BaseControlEventSink, + SinkResult, +) + + +def _make_event(*, control_id: int) -> ControlExecutionEvent: + return ControlExecutionEvent( + trace_id="trace-123", + span_id="span-456", + agent_name="demo-agent", + control_id=control_id, + control_name=f"control-{control_id}", + check_stage="pre", + applies_to="tool_call", + action="observe", + matched=False, + confidence=0.0, + timestamp=datetime(2026, 1, 1, tzinfo=UTC), + ) + + +def test_sink_result_success_reflects_accepted_events() -> None: + # Given: two sink results with and without accepted events + successful = SinkResult(accepted=1, dropped=0) + unsuccessful = SinkResult(accepted=0, dropped=2) + + # When/Then: success tracks whether any events were accepted + assert successful.success is True + assert unsuccessful.success is False + + +def test_base_control_event_sink_batches_single_event_writers() -> None: + # Given: a sink that only implements single-event writes + class RecordingSink(BaseControlEventSink): + def __init__(self) -> None: + self.seen_control_ids: list[int] = [] + + def write_event(self, event: ControlExecutionEvent) -> SinkResult: + self.seen_control_ids.append(event.control_id) + return SinkResult(accepted=1, dropped=event.control_id % 2) + + sink = RecordingSink() + events = [_make_event(control_id=1), _make_event(control_id=2)] + + # When: the caller uses the batch contract + result = sink.write_events(events) + + # Then: the base helper fans out to single-event writes and aggregates results + assert sink.seen_control_ids == [1, 2] + assert result == SinkResult(accepted=2, dropped=1) + + +def test_base_control_event_sink_single_event_delegates_to_batch_writer() -> None: + # Given: a sink that only implements batch writes + class RecordingSink(BaseControlEventSink): + def __init__(self) -> None: + self.batch_sizes: list[int] = [] + + def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + self.batch_sizes.append(len(events)) + return SinkResult(accepted=len(events)) + + sink = RecordingSink() + event = _make_event(control_id=7) + + # When: the caller uses the single-event contract + result = sink.write_event(event) + + # Then: the base helper delegates through the batch implementation + assert sink.batch_sizes == [1] + assert result == SinkResult(accepted=1, dropped=0) + + +def test_base_async_control_event_sink_batches_single_event_writers() -> None: + # Given: an async sink that only implements single-event writes + class RecordingAsyncSink(BaseAsyncControlEventSink): + def __init__(self) -> None: + self.seen_control_ids: list[int] = [] + + async def write_event(self, event: ControlExecutionEvent) -> SinkResult: + self.seen_control_ids.append(event.control_id) + return SinkResult(accepted=1, dropped=event.control_id % 2) + + sink = RecordingAsyncSink() + events = [_make_event(control_id=1), _make_event(control_id=2)] + + # When: the caller uses the async batch contract + result = asyncio.run(sink.write_events(events)) + + # Then: the base helper fans out to single-event writes and aggregates results + assert sink.seen_control_ids == [1, 2] + assert result == SinkResult(accepted=2, dropped=1) + + +def test_base_async_control_event_sink_single_event_delegates_to_batch_writer() -> None: + # Given: an async sink that only implements batch writes + class RecordingAsyncSink(BaseAsyncControlEventSink): + def __init__(self) -> None: + self.batch_sizes: list[int] = [] + + async def write_events(self, events: Sequence[ControlExecutionEvent]) -> SinkResult: + self.batch_sizes.append(len(events)) + return SinkResult(accepted=len(events)) + + sink = RecordingAsyncSink() + event = _make_event(control_id=7) + + # When: the caller uses the async single-event contract + result = asyncio.run(sink.write_event(event)) + + # Then: the base helper delegates through the batch implementation + assert sink.batch_sizes == [1] + assert result == SinkResult(accepted=1, dropped=0) diff --git a/telemetry/tests/test_trace_context.py b/telemetry/tests/test_trace_context.py new file mode 100644 index 00000000..e2e2c849 --- /dev/null +++ b/telemetry/tests/test_trace_context.py @@ -0,0 +1,75 @@ +"""Tests for the telemetry trace context provider contract.""" + +from typing import Any + +import pytest + +from agent_control_telemetry.trace_context import ( + clear_trace_context_provider, + get_trace_context_from_provider, + set_trace_context_provider, +) + + +def teardown_function() -> None: + clear_trace_context_provider() + + +def test_get_trace_context_from_provider_returns_registered_context() -> None: + # Given: a provider that returns valid trace and span identifiers + set_trace_context_provider( + lambda: { + "trace_id": "trace-123", + "span_id": "span-456", + } + ) + + # When: reading the current trace context + trace_context = get_trace_context_from_provider() + + # Then: the provider values are returned unchanged + assert trace_context == { + "trace_id": "trace-123", + "span_id": "span-456", + } + + +@pytest.mark.parametrize( + ("provider_result"), + [ + None, + {"trace_id": "trace-123"}, + {"span_id": "span-456"}, + {"trace_id": 123, "span_id": "span-456"}, + {"trace_id": "trace-123", "span_id": object()}, + {"trace_id": "", "span_id": "span-456"}, + {"trace_id": " ", "span_id": "span-456"}, + {"trace_id": "trace-123", "span_id": ""}, + {"trace_id": "trace-123", "span_id": " "}, + ], +) +def test_get_trace_context_from_provider_rejects_invalid_results( + provider_result: dict[str, Any] | None, +) -> None: + # Given: a provider that returns an invalid trace-context payload + set_trace_context_provider(lambda: provider_result) # type: ignore[arg-type] + + # When: reading the current trace context + trace_context = get_trace_context_from_provider() + + # Then: invalid provider output is ignored + assert trace_context is None + + +def test_get_trace_context_from_provider_swallows_provider_failures() -> None: + # Given: a provider that raises unexpectedly + def _raising_provider() -> None: + raise RuntimeError("boom") + + set_trace_context_provider(_raising_provider) + + # When: reading the current trace context + trace_context = get_trace_context_from_provider() + + # Then: provider failures do not escape the helper + assert trace_context is None