1 change: 1 addition & 0 deletions pyproject.toml
@@ -54,6 +54,7 @@ dev = [
"toml>=0.10.2,<0.11",
"twine>=4.0.1,<5",
"maturin>=1.8.2",
"openinference-instrumentation-openai-agents>=0.1.0",
"pytest-cov>=6.1.1",
"httpx>=0.28.1",
"pytest-pretty>=1.3.0",
103 changes: 103 additions & 0 deletions temporalio/contrib/openai_agents/README.md
@@ -536,6 +536,109 @@ SQLite storage is not suited to a distributed environment.
| :--------------- | :-------: |
| OpenAI platform | Yes |

## OpenTelemetry Integration

This integration exports OpenAI agent telemetry to OpenTelemetry (OTEL) endpoints for observability and monitoring. It handles Temporal workflow replay semantics automatically, so each span is exported once, from the original workflow execution, and is not duplicated during replays.

### Quick Start

To enable OTEL telemetry export, provide one or more span exporters to the `OpenAIAgentsPlugin` or to the `AgentEnvironment` test helper:

```python
from datetime import timedelta

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

from temporalio.client import Client
from temporalio.contrib.openai_agents import ModelActivityParameters, OpenAIAgentsPlugin

# Your OTEL endpoint configuration
exporters = [
OTLPSpanExporter(endpoint="http://localhost:4317"),
# Add multiple exporters for different endpoints as needed
]

# For production applications
client = await Client.connect(
"localhost:7233",
plugins=[
OpenAIAgentsPlugin(
otel_exporters=exporters, # Enable OTEL integration
model_params=ModelActivityParameters(
start_to_close_timeout=timedelta(seconds=30)
)
),
],
)

# For testing
from temporalio.contrib.openai_agents.testing import AgentEnvironment

async with AgentEnvironment(
model=my_test_model,
otel_exporters=exporters # Enable OTEL integration for tests
) as env:
client = env.applied_on_client(base_client)
```

### Features

- **Multiple Exporters**: Send telemetry to multiple OTEL endpoints simultaneously
- **Replay-Safe**: Spans are exported once, from the original workflow execution, and are never re-exported during replays (see the sketch after this list)
- **Deterministic IDs**: Consistent span IDs across workflow replays for reliable correlation
- **Automatic Setup**: No manual instrumentation required; just provide exporters
- **Graceful Degradation**: Runs normally without OTEL dependencies as long as no exporters are provided
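
For reference, the sketch below is roughly what the plugin wires up internally when exporters are provided; it mirrors the example in `_otel.py` shown later in this diff and is illustrative only, since the plugin performs this setup automatically (the `InMemorySpanExporter` is just a stand-in for a real exporter):

```python
from openinference.instrumentation.openai_agents import OpenAIAgentsInstrumentor
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from temporalio.contrib.openai_agents._otel import TemporalSpanProcessor
from temporalio.contrib.openai_agents._temporal_trace_provider import TemporalIdGenerator

# Deterministic ID generation keeps span identities stable across workflow replays.
provider = trace_sdk.TracerProvider(id_generator=TemporalIdGenerator())

# TemporalSpanProcessor skips export while a workflow is replaying, so each span
# reaches the exporter only once, from the original execution.
provider.add_span_processor(TemporalSpanProcessor(InMemorySpanExporter()))

# Route OpenAI agent spans through the provider configured above.
OpenAIAgentsInstrumentor().instrument(tracer_provider=provider)
```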

### Dependencies

OTEL integration requires additional dependencies:

```bash
pip install openinference-instrumentation-openai-agents opentelemetry-sdk
```

Choose the appropriate OTEL exporter for your monitoring system:

```bash
# For OTLP (works with most OTEL collectors and monitoring systems)
pip install opentelemetry-exporter-otlp

# Console output (development/debugging) needs no extra package;
# ConsoleSpanExporter ships with opentelemetry-sdk

# Other exporters available for specific systems
pip install opentelemetry-exporter-<your-system>
```

### Example: Multiple Exporters

```python
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter

from temporalio.contrib.openai_agents import OpenAIAgentsPlugin

exporters = [
# Production monitoring system
OTLPSpanExporter(
endpoint="https://your-monitoring-system:4317",
headers={"api-key": "your-api-key"}
),

# Secondary monitoring endpoint
OTLPSpanExporter(endpoint="https://backup-collector:4317"),

# Development debugging
ConsoleSpanExporter(),
]

plugin = OpenAIAgentsPlugin(otel_exporters=exporters)
```
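
Internally, the plugin wraps each configured exporter in its own `TemporalSpanProcessor` (see `_otel.py` later in this diff), so every endpoint receives its own copy of each agent span.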

### Error Handling

If you provide OTEL exporters but the required dependencies are not installed, you'll receive a clear error message:

```
ImportError: OTEL dependencies not available. Install with: pip install openinference-instrumentation-openai-agents opentelemetry-sdk
```

If no OTEL exporters are provided, the integration works normally without any OTEL setup.
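
If you want OTEL export to stay optional, one approach (a sketch that assumes only the packages named in the error above) is to build the exporter list only when those dependencies import cleanly:

```python
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin

try:
    # Mirror the dependencies named in the ImportError above.
    import openinference.instrumentation.openai_agents  # noqa: F401
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

    exporters = [OTLPSpanExporter(endpoint="http://localhost:4317")]
except ImportError:
    # With no exporters, the plugin runs normally and performs no OTEL setup.
    exporters = None

plugin = OpenAIAgentsPlugin(otel_exporters=exporters)
```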

### Voice

| Mode | Supported |
43 changes: 43 additions & 0 deletions temporalio/contrib/openai_agents/_otel.py
@@ -0,0 +1,43 @@
"""OpenTelemetry integration for OpenAI Agents in Temporal workflows.

This module provides utilities for properly exporting OpenAI agent telemetry
to OpenTelemetry endpoints from within Temporal workflows, handling workflow
replay semantics correctly.
"""

from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

from temporalio import workflow


class TemporalSpanProcessor(SimpleSpanProcessor):
"""A span processor that handles Temporal workflow replay semantics.

This processor ensures that spans are only exported when workflows actually
complete, not during intermediate replays. This is crucial for maintaining
correct telemetry data when using OpenAI agents within Temporal workflows.

Example usage:
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from temporalio.contrib.openai_agents._temporal_trace_provider import TemporalIdGenerator
from temporalio.contrib.openai_agents._otel import TemporalSpanProcessor
from openinference.instrumentation.openai_agents import OpenAIAgentsInstrumentor

exporter = InMemorySpanExporter()
provider = trace_sdk.TracerProvider(id_generator=TemporalIdGenerator())
provider.add_span_processor(TemporalSpanProcessor(exporter))
OpenAIAgentsInstrumentor().instrument(tracer_provider=provider)
"""

def on_end(self, span: ReadableSpan) -> None:
"""Handle span end events, skipping export during workflow replay.

Args:
span: The span that has ended.
"""
if workflow.in_workflow() and workflow.unsafe.is_replaying():
# Skip exporting spans during workflow replay to avoid duplicate telemetry
return
super().on_end(span)
64 changes: 58 additions & 6 deletions temporalio/contrib/openai_agents/_temporal_openai_agents.py
@@ -20,7 +20,7 @@
TemporalTraceProvider,
)
from temporalio.contrib.openai_agents._trace_interceptor import (
OpenAIAgentsTracingInterceptor,
OpenAIAgentsContextPropagationInterceptor,
)
from temporalio.contrib.openai_agents.workflow import AgentsWorkflowError
from temporalio.contrib.pydantic import (
@@ -36,6 +36,8 @@
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner

if typing.TYPE_CHECKING:
from opentelemetry.sdk.trace.export import SpanExporter

from temporalio.contrib.openai_agents import (
StatefulMCPServerProvider,
StatelessMCPServerProvider,
@@ -46,6 +48,7 @@
def set_open_ai_agent_temporal_overrides(
model_params: ModelActivityParameters,
auto_close_tracing_in_workflows: bool = False,
start_spans_in_replay: bool = False,
):
"""Configure Temporal-specific overrides for OpenAI agents.

@@ -66,14 +69,16 @@ Args:
Args:
model_params: Configuration parameters for Temporal activity execution of model calls.
auto_close_tracing_in_workflows: If set to true, close tracing spans immediately.
start_spans_in_replay: If set to true, start spans even during replay. Primarily used for the OTEL integration.

Returns:
A context manager that yields the configured TemporalTraceProvider.
"""
previous_runner = get_default_agent_runner()
previous_trace_provider = get_trace_provider()
provider = TemporalTraceProvider(
auto_close_in_workflows=auto_close_tracing_in_workflows
auto_close_in_workflows=auto_close_tracing_in_workflows,
start_spans_in_replay=start_spans_in_replay,
)

try:
@@ -178,9 +183,11 @@ def __init__(
model_params: ModelActivityParameters | None = None,
model_provider: ModelProvider | None = None,
mcp_server_providers: Sequence[
"StatelessMCPServerProvider | StatefulMCPServerProvider"
StatelessMCPServerProvider | StatefulMCPServerProvider
] = (),
register_activities: bool = True,
add_temporal_spans: bool = True,
otel_exporters: Sequence[SpanExporter] | None = None,
) -> None:
"""Initialize the OpenAI agents plugin.

@@ -196,6 +203,11 @@
register_activities: Whether to register activities during the worker execution.
This can be disabled on some workers to allow a separation of workflows and activities
but should not be disabled on all workers, or agents will not be able to progress.
add_temporal_spans: Whether to add Temporal spans to traces.
otel_exporters: Optional sequence of OpenTelemetry span exporters for telemetry export.
When provided, the plugin automatically sets up OpenAI agents instrumentation
with proper Temporal workflow replay semantics. Each exporter will receive
a copy of all OpenAI agent spans. If None, no OTEL instrumentation is configured.
"""
if model_params is None:
model_params = ModelActivityParameters()
@@ -213,6 +225,9 @@
"When configuring a custom provider, the model activity must have start_to_close_timeout or schedule_to_close_timeout"
)

# Store OTEL configuration for later setup
self._otel_exporters = otel_exporters

# Delay activity construction until they are actually needed
def add_activities(
activities: Sequence[Callable] | None,
@@ -246,13 +261,50 @@ def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner:

@asynccontextmanager
async def run_context() -> AsyncIterator[None]:
with set_open_ai_agent_temporal_overrides(model_params):
yield
# Set up OTEL instrumentation if exporters are provided
otel_instrumentor = None
if self._otel_exporters is not None:
from openinference.instrumentation.openai_agents import (
OpenAIAgentsInstrumentor,
)
from opentelemetry.sdk import trace as trace_sdk

from temporalio.contrib.openai_agents._otel import TemporalSpanProcessor
from temporalio.contrib.openai_agents._temporal_trace_provider import (
TemporalIdGenerator,
)

# Create trace provider with deterministic ID generation
provider = trace_sdk.TracerProvider(id_generator=TemporalIdGenerator())

# Add all exporters with TemporalSpanProcessor wrapper
for exporter in self._otel_exporters:
processor = TemporalSpanProcessor(exporter)
provider.add_span_processor(processor)

# Set up instrumentor
otel_instrumentor = OpenAIAgentsInstrumentor()
otel_instrumentor.instrument(tracer_provider=provider)

try:
with set_open_ai_agent_temporal_overrides(
model_params, start_spans_in_replay=self._otel_exporters is not None
):
yield
finally:
# Clean up OTEL instrumentation
if otel_instrumentor is not None:
otel_instrumentor.uninstrument()

super().__init__(
name="OpenAIAgentsPlugin",
data_converter=_data_converter,
worker_interceptors=[OpenAIAgentsTracingInterceptor()],
client_interceptors=[
OpenAIAgentsContextPropagationInterceptor(
add_temporal_spans=add_temporal_spans,
start_traces=self._otel_exporters is not None,
)
],
activities=add_activities,
workflow_runner=workflow_runner,
workflow_failure_exception_types=[AgentsWorkflowError],