diff --git a/.changelog/5220.added b/.changelog/5220.added new file mode 100644 index 00000000000..d2830931d3f --- /dev/null +++ b/.changelog/5220.added @@ -0,0 +1 @@ +`opentelemetry-docker-tests`: Refactor Docker tests to properly validate contents of exported telemetry diff --git a/tests/opentelemetry-docker-tests/tests/collector-config.yaml b/tests/opentelemetry-docker-tests/tests/collector-config.yaml new file mode 100644 index 00000000000..dbd52364e17 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/collector-config.yaml @@ -0,0 +1,23 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +exporters: + otlphttp: + endpoint: http://host.docker.internal:4319 + +service: + pipelines: + traces: + receivers: [otlp] + exporters: [otlphttp] + metrics: + receivers: [otlp] + exporters: [otlphttp] + logs: + receivers: [otlp] + exporters: [otlphttp] diff --git a/tests/opentelemetry-docker-tests/tests/docker-compose.yml b/tests/opentelemetry-docker-tests/tests/docker-compose.yml index 914bab1daee..52cc7074d4a 100644 --- a/tests/opentelemetry-docker-tests/tests/docker-compose.yml +++ b/tests/opentelemetry-docker-tests/tests/docker-compose.yml @@ -7,6 +7,11 @@ services: - "55678:55678" otcollector: image: otel/opentelemetry-collector:0.149.0 + command: ["--config=/etc/otelcol/collector-config.yaml"] + volumes: + - ./collector-config.yaml:/etc/otelcol/collector-config.yaml ports: - - "4317:4317" - - "4318:4318" + - "4317:4317" + - "4318:4318" + extra_hosts: + - "host.docker.internal:host-gateway" diff --git a/tests/opentelemetry-docker-tests/tests/otlpexporter/__init__.py b/tests/opentelemetry-docker-tests/tests/otlpexporter/__init__.py index cd9a8fd266e..786af331239 100644 --- a/tests/opentelemetry-docker-tests/tests/otlpexporter/__init__.py +++ b/tests/opentelemetry-docker-tests/tests/otlpexporter/__init__.py @@ -1,166 +1,441 @@ # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 -import time +from __future__ import annotations + +import math +import unittest from abc import ABC, abstractmethod -from opentelemetry.context import ( - _SUPPRESS_INSTRUMENTATION_KEY, - attach, - detach, - set_value, +from opentelemetry._logs import SeverityNumber +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + LogRecordExporter, + SimpleLogRecordProcessor, ) +from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics._internal.export import ( - MetricExportResult, PeriodicExportingMetricReader, ) -from opentelemetry.sdk.metrics._internal.point import ( - Metric, - NumberDataPoint, - Sum, -) -from opentelemetry.sdk.metrics.export import ( - MetricsData, - ResourceMetrics, - ScopeMetrics, +from opentelemetry.sdk.metrics.export import MetricExporter +from opentelemetry.sdk.metrics.view import ( + ExponentialBucketHistogramAggregation, + View, ) from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace.export import SimpleSpanProcessor -from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter +from opentelemetry.test.otlp_test_server import OtlpProtoTestServer +from opentelemetry.trace import Link, SpanContext, StatusCode, TraceFlags -class ExportStatusSpanProcessor(SimpleSpanProcessor): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.export_status = [] +def _attrs_to_dict(attributes) -> dict: + result = {} + for kv in attributes: + which = kv.value.WhichOneof("value") + if which == "string_value": + result[kv.key] = kv.value.string_value + elif which == "int_value": + result[kv.key] = kv.value.int_value + elif which == "double_value": + result[kv.key] = kv.value.double_value + elif which == "bool_value": + result[kv.key] = kv.value.bool_value + return result - def on_end(self, span): - token = attach(set_value("suppress_instrumentation", True)) - self.export_status.append(self.span_exporter.export((span,))) - detach(token) +class TracesExporterTestsBase(ABC, unittest.TestCase): + __test__ = False -class ExportStatusMetricReader(PeriodicExportingMetricReader): - def __init__(self, exporter, **kwargs): - # Very short export interval for testing - super().__init__(exporter, export_interval_millis=1, **kwargs) - self.export_status = [] + _server: OtlpProtoTestServer - def _receive_metrics(self, metrics_data, timeout_millis=10_000, **kwargs): - token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) - try: - export_result = self._exporter.export( - metrics_data, timeout_millis=timeout_millis - ) - self.export_status.append(export_result) - except Exception: - self.export_status.append(MetricExportResult.FAILURE) - finally: - detach(token) + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._server = OtlpProtoTestServer(host="0.0.0.0", port=4319).start() + @classmethod + def tearDownClass(cls): + cls._server.stop() + super().tearDownClass() -class BaseTestOTLPExporter(ABC): - @abstractmethod - def get_span_processor(self): - pass + def setUp(self): + super().setUp() + self._tracer_provider = TracerProvider() + self._tracer_provider.add_span_processor( + SimpleSpanProcessor(self.build_exporter()) + ) + self._tracer = self._tracer_provider.get_tracer(__name__) + self._server.clear() + + def tearDown(self): + self._tracer_provider.shutdown() + super().tearDown() @abstractmethod - def get_metric_reader(self): - pass - - # pylint: disable=no-member - def test_export(self): - """Test span export""" - with self.tracer.start_as_current_span("foo"): - with self.tracer.start_as_current_span("bar"): - with self.tracer.start_as_current_span("baz"): + def build_exporter(self) -> SpanExporter: ... + + def test_simple_span_name(self): + with self._tracer.start_as_current_span("my-span"): + pass + + recorded = self._server.get_span(timeout=5.0) + self.assertEqual(recorded.span.name, "my-span") + + def test_span_attributes(self): + with self._tracer.start_as_current_span( + "attrs-span", + attributes={ + "str_key": "hello", + "int_key": 42, + "float_key": 3.14, + "bool_key": True, + }, + ): + pass + + recorded = self._server.get_span(timeout=5.0) + attrs = _attrs_to_dict(recorded.span.attributes) + self.assertEqual(attrs["str_key"], "hello") + self.assertEqual(attrs["int_key"], 42) + self.assertAlmostEqual(attrs["float_key"], 3.14, places=5) + self.assertEqual(attrs["bool_key"], True) + + def test_nested_spans_parent_child(self): + with self._tracer.start_as_current_span("foo"): + with self._tracer.start_as_current_span("bar"): + with self._tracer.start_as_current_span("baz"): pass - self.assertTrue(len(self.span_processor.export_status), 3) + spans = { + r.span.name: r.span + for r in self._server.get_spans(count=3, timeout=10.0) + } + self.assertIn("foo", spans) + self.assertIn("bar", spans) + self.assertIn("baz", spans) + self.assertEqual(spans["baz"].parent_span_id, spans["bar"].span_id) + self.assertEqual(spans["bar"].parent_span_id, spans["foo"].span_id) + self.assertEqual(spans["foo"].parent_span_id, b"") - for export_status in self.span_processor.export_status: - self.assertEqual(export_status.name, "SUCCESS") - self.assertEqual(export_status.value, 0) + def test_span_with_event(self): + with self._tracer.start_as_current_span("event-span") as span: + span.add_event("my-event", {"event_key": "event_val"}) - def test_metrics_export(self): - """Test metrics export from full metrics SDK pipeline""" - counter = self.meter.create_counter("test_counter") - histogram = self.meter.create_histogram("test_histogram") - up_down_counter = self.meter.create_up_down_counter( - "test_up_down_counter" + recorded = self._server.get_span(timeout=5.0) + self.assertEqual(len(recorded.span.events), 1) + event = recorded.span.events[0] + self.assertEqual(event.name, "my-event") + self.assertEqual( + _attrs_to_dict(event.attributes), {"event_key": "event_val"} + ) + + def test_span_with_link(self): + link_trace_id = 0x000000000000000000000000DEADBEEF + link_span_id = 0x00000000DEADBEF0 + link_context = SpanContext( + trace_id=link_trace_id, + span_id=link_span_id, + is_remote=True, + trace_flags=TraceFlags(0x01), ) + with self._tracer.start_as_current_span( + "linked-span", links=[Link(link_context)] + ): + pass + + recorded = self._server.get_span(timeout=5.0) + self.assertEqual(len(recorded.span.links), 1) + link = recorded.span.links[0] + self.assertEqual(link.trace_id, link_trace_id.to_bytes(16, "big")) + self.assertEqual(link.span_id, link_span_id.to_bytes(8, "big")) + + def test_span_status_ok(self): + with self._tracer.start_as_current_span("ok-span") as span: + span.set_status(StatusCode.OK) + + recorded = self._server.get_span(timeout=5.0) + self.assertEqual(recorded.span.status.code, 1) + + def test_span_status_error(self): + with self._tracer.start_as_current_span("error-span") as span: + span.set_status(StatusCode.ERROR, "something went wrong") + + recorded = self._server.get_span(timeout=5.0) + self.assertEqual(recorded.span.status.code, 2) + self.assertEqual(recorded.span.status.message, "something went wrong") + + +class MetricsExporterTestsBase(ABC, unittest.TestCase): + __test__ = False - counter.add(1, {"key1": "value1"}) - counter.add(2, {"key2": "value2"}) - histogram.record(1.5, {"key3": "value3"}) - histogram.record(2.5, {"key4": "value4"}) - up_down_counter.add(3, {"key5": "value5"}) - up_down_counter.add(-1, {"key6": "value6"}) - self.metric_reader.force_flush(timeout_millis=5000) - time.sleep(0.1) + _server: OtlpProtoTestServer - # Verify at least one export happened - self.assertTrue(len(self.metric_reader.export_status) >= 1) - # Verify all exports succeeded - for export_status in self.metric_reader.export_status: - self.assertEqual(export_status.name, "SUCCESS") - self.assertEqual(export_status.value, 0) + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._server = OtlpProtoTestServer(host="0.0.0.0", port=4319).start() + + @classmethod + def tearDownClass(cls): + cls._server.stop() + super().tearDownClass() + + def setUp(self): + super().setUp() + self._reader = PeriodicExportingMetricReader( + self.build_exporter(), + export_interval_millis=math.inf, + ) + self._meter_provider = MeterProvider( + metric_readers=[self._reader], + resource=Resource.create({"service.name": "test-service"}), + ) + self._meter = self._meter_provider.get_meter(__name__) + self._server.clear() + + def tearDown(self): + self._meter_provider.shutdown() + super().tearDown() @abstractmethod - def test_metrics_export_batch_size_two(self): - """Test metrics max_export_batch_size=2 directly through exporter""" - - def _create_test_metrics_data(self, num_data_points=6): - """Create test metrics data with specified number of data points.""" - data_points = [ - NumberDataPoint( - attributes={"key": f"value{i}"}, - start_time_unix_nano=1000000 + i, - time_unix_nano=2000000 + i, - value=i + 1.0, - ) - for i in range(num_data_points) - ] - metric = Metric( - name="otel_test_counter_foobar", - description="Test counter metric for batch verification", - unit="1", - data=Sum( - data_points=data_points, - aggregation_temporality=1, # CUMULATIVE - is_monotonic=True, - ), - ) - scope_metrics = ScopeMetrics( - scope=InstrumentationScope(name="test_scope"), - metrics=[metric], - schema_url=None, - ) - resource_metrics = ResourceMetrics( + def build_exporter(self) -> MetricExporter: ... + + def test_sum_counter(self): + counter = self._meter.create_counter("test.counter", unit="requests") + counter.add(3, {"status": "ok"}) + counter.add(7, {"status": "ok"}) + counter.add(5, {"status": "error"}) + self._reader.force_flush(timeout_millis=5000) + + recorded = self._server.wait_for_metric( + name="test.counter", timeout=5.0 + ) + self.assertEqual(recorded.metric.name, "test.counter") + self.assertEqual(recorded.metric.unit, "requests") + self.assertTrue(recorded.metric.HasField("sum")) + self.assertTrue(recorded.metric.sum.is_monotonic) + dps = { + tuple(sorted(_attrs_to_dict(dp.attributes).items())): dp + for dp in recorded.metric.sum.data_points + } + self.assertEqual(dps[(("status", "ok"),)].as_int, 10) + self.assertEqual(dps[(("status", "error"),)].as_int, 5) + + def test_sum_up_down_counter(self): + counter = self._meter.create_up_down_counter("test.up_down_counter") + counter.add(10) + counter.add(-3) + self._reader.force_flush(timeout_millis=5000) + + recorded = self._server.wait_for_metric( + name="test.up_down_counter", timeout=5.0 + ) + self.assertTrue(recorded.metric.HasField("sum")) + self.assertFalse(recorded.metric.sum.is_monotonic) + self.assertEqual(recorded.metric.sum.data_points[0].as_int, 7) + + def test_gauge(self): + gauge = self._meter.create_gauge("test.gauge") + gauge.set(42, {"status": "active"}) + self._reader.force_flush(timeout_millis=5000) + + recorded = self._server.wait_for_metric(name="test.gauge", timeout=5.0) + self.assertTrue(recorded.metric.HasField("gauge")) + self.assertEqual(recorded.metric.gauge.data_points[0].as_int, 42) + + def test_explicit_bucket_histogram(self): + histogram = self._meter.create_histogram("test.histogram") + histogram.record(5) + histogram.record(15) + histogram.record(150) + self._reader.force_flush(timeout_millis=5000) + + recorded = self._server.wait_for_metric( + name="test.histogram", timeout=5.0 + ) + self.assertTrue(recorded.metric.HasField("histogram")) + dp = recorded.metric.histogram.data_points[0] + self.assertEqual(dp.count, 3) + self.assertAlmostEqual(dp.sum, 170.0) + self.assertGreater(len(dp.bucket_counts), 0) + self.assertGreater(len(dp.explicit_bounds), 0) + + def test_exponential_histogram(self): + reader = PeriodicExportingMetricReader( + self.build_exporter(), + export_interval_millis=math.inf, + ) + meter_provider = MeterProvider( + metric_readers=[reader], resource=Resource.create({"service.name": "test-service"}), - scope_metrics=[scope_metrics], - schema_url=None, + views=[ + View( + instrument_name="test.exp.histogram", + aggregation=ExponentialBucketHistogramAggregation(), + ) + ], ) + meter = meter_provider.get_meter(__name__) + histogram = meter.create_histogram("test.exp.histogram") + for v in [1.0, 2.0, 4.0]: + histogram.record(v) + reader.force_flush(timeout_millis=5000) - return MetricsData(resource_metrics=[resource_metrics]), data_points + recorded = self._server.wait_for_metric( + name="test.exp.histogram", timeout=5.0 + ) + self.assertTrue(recorded.metric.HasField("exponential_histogram")) + dp = recorded.metric.exponential_histogram.data_points[0] + self.assertEqual(dp.count, 3) + self.assertAlmostEqual(dp.sum, 7.0) + self.assertIsNotNone(dp.scale) - def _verify_batch_export_result( - self, result, data_points, batch_counter, max_batch_size=2 - ): - """Verify export result and batch count for export batching tests.""" + meter_provider.shutdown() + + def test_metric_data_point_attributes(self): + counter = self._meter.create_counter("test.attrs.counter") + counter.add(1, {"str_key": "hello", "int_key": 42}) + self._reader.force_flush(timeout_millis=5000) + + recorded = self._server.wait_for_metric( + name="test.attrs.counter", timeout=5.0 + ) + attrs = _attrs_to_dict(recorded.metric.sum.data_points[0].attributes) + self.assertEqual(attrs["str_key"], "hello") + self.assertEqual(attrs["int_key"], 42) + + def test_scope_attributes(self): + meter = self._meter_provider.get_meter( + "test.scope", + version="1.0.0", + attributes={"scope.key": "scope.val"}, + ) + counter = meter.create_counter("scope.counter") + counter.add(1) + self._reader.force_flush(timeout_millis=5000) + + recorded = self._server.wait_for_metric( + name="scope.counter", timeout=5.0 + ) + self.assertEqual(recorded.scope.name, "test.scope") + self.assertEqual(recorded.scope.version, "1.0.0") self.assertEqual( - result.name, "SUCCESS", f"Expected SUCCESS, got: {result}" + _attrs_to_dict(recorded.scope.attributes)["scope.key"], "scope.val" + ) + + def test_resource_attributes(self): + counter = self._meter.create_counter("resource.counter") + counter.add(1) + self._reader.force_flush(timeout_millis=5000) + + recorded = self._server.wait_for_metric( + name="resource.counter", timeout=5.0 ) + resource_attrs = _attrs_to_dict(recorded.resource.attributes) + self.assertEqual(resource_attrs["service.name"], "test-service") + + +class LogsExporterTestsBase(ABC, unittest.TestCase): + __test__ = False + + _server: OtlpProtoTestServer + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._server = OtlpProtoTestServer(host="0.0.0.0", port=4319).start() + + @classmethod + def tearDownClass(cls): + cls._server.stop() + super().tearDownClass() + + def setUp(self): + super().setUp() + self._logger_provider = LoggerProvider( + resource=Resource.create({"service.name": "test-service"}), + ) + self._logger_provider.add_log_record_processor( + SimpleLogRecordProcessor(self.build_exporter()) + ) + self._logger = self._logger_provider.get_logger(__name__) + self._server.clear() + + def tearDown(self): + self._logger_provider.shutdown() + super().tearDown() + + @abstractmethod + def build_exporter(self) -> LogRecordExporter: ... + + def test_log_body(self): + self._logger.emit( + body="hello world", severity_number=SeverityNumber.INFO + ) + + recorded = self._server.get_log_record(timeout=5.0) + self.assertEqual(recorded.log_record.body.string_value, "hello world") + + def test_log_severity_number(self): + self._logger.emit( + severity_number=SeverityNumber.ERROR, body="error occurred" + ) + + recorded = self._server.get_log_record(timeout=5.0) self.assertEqual( - result.value, 0, f"Expected result code 0, got: {result.value}" + recorded.log_record.severity_number, SeverityNumber.ERROR.value + ) + + def test_log_severity_text(self): + self._logger.emit( + severity_number=SeverityNumber.WARN, + severity_text="WARN", + body="warning", ) - expected_batches = ( - len(data_points) + max_batch_size - 1 - ) // max_batch_size + recorded = self._server.get_log_record(timeout=5.0) + self.assertEqual(recorded.log_record.severity_text, "WARN") + + def test_log_attributes(self): + self._logger.emit( + body="attrs test", + severity_number=SeverityNumber.INFO, + attributes={ + "str_key": "hello", + "int_key": 42, + "float_key": 3.14, + "bool_key": True, + }, + ) + + recorded = self._server.get_log_record(timeout=5.0) + attrs = _attrs_to_dict(recorded.log_record.attributes) + self.assertEqual(attrs["str_key"], "hello") + self.assertEqual(attrs["int_key"], 42) + self.assertAlmostEqual(attrs["float_key"], 3.14, places=5) + self.assertEqual(attrs["bool_key"], True) + + def test_scope_attributes(self): + logger = self._logger_provider.get_logger( + "test.scope", + version="1.0.0", + attributes={"scope.key": "scope.val"}, + ) + logger.emit(body="scope test", severity_number=SeverityNumber.INFO) + + recorded = self._server.get_log_record(timeout=5.0) + self.assertEqual(recorded.scope.name, "test.scope") + self.assertEqual(recorded.scope.version, "1.0.0") self.assertEqual( - batch_counter.export_call_count, - expected_batches, - f"Expected {expected_batches} export calls with max_export_batch_size={max_batch_size} and {len(data_points)} data points, " - f"but got {batch_counter.export_call_count} calls", + _attrs_to_dict(recorded.scope.attributes)["scope.key"], "scope.val" + ) + + def test_resource_attributes(self): + self._logger.emit( + body="resource test", severity_number=SeverityNumber.INFO ) + + recorded = self._server.get_log_record(timeout=5.0) + resource_attrs = _attrs_to_dict(recorded.resource.attributes) + self.assertEqual(resource_attrs["service.name"], "test-service") diff --git a/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_grpc_exporter_functional.py b/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_grpc_exporter_functional.py deleted file mode 100644 index 4d6ba4e6dae..00000000000 --- a/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_grpc_exporter_functional.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright The OpenTelemetry Authors -# SPDX-License-Identifier: Apache-2.0 - -from opentelemetry import metrics, trace -from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( - OTLPMetricExporter, -) -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( - OTLPSpanExporter, -) -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.test.globals_test import ( - reset_metrics_globals, - reset_trace_globals, -) -from opentelemetry.test.test_base import TestBase - -from . import ( - BaseTestOTLPExporter, - ExportStatusMetricReader, - ExportStatusSpanProcessor, -) - - -class BatchCountingGRPCExporter(OTLPMetricExporter): - """gRPC exporter that counts actual batch export calls for testing.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.export_call_count = 0 - - def _export(self, *args, **kwargs): - self.export_call_count += 1 - return super()._export(*args, **kwargs) - - -class TestOTLPGRPCExporter(BaseTestOTLPExporter, TestBase): - # pylint: disable=no-self-use - def get_span_processor(self): - return ExportStatusSpanProcessor( - OTLPSpanExporter(insecure=True, timeout=1) - ) - - def get_metric_reader(self): - return ExportStatusMetricReader( - OTLPMetricExporter( - insecure=True, timeout=1, max_export_batch_size=2 - ) - ) - - def setUp(self): - super().setUp() - - reset_trace_globals() - trace.set_tracer_provider(TracerProvider()) - self.tracer = trace.get_tracer(__name__) - self.span_processor = self.get_span_processor() - trace.get_tracer_provider().add_span_processor(self.span_processor) - - reset_metrics_globals() - self.metric_reader = self.get_metric_reader() - meter_provider = MeterProvider(metric_readers=[self.metric_reader]) - metrics.set_meter_provider(meter_provider) - self.meter = metrics.get_meter(__name__) - - def test_metrics_export_batch_size_two(self): - """Test metrics max_export_batch_size=2 directly through gRPC exporter""" - batch_counter = BatchCountingGRPCExporter( - endpoint="localhost:4317", insecure=True, max_export_batch_size=2 - ) - metrics_data, data_points = self._create_test_metrics_data( - num_data_points=6 - ) - result = batch_counter.export(metrics_data) - self._verify_batch_export_result( - result, data_points, batch_counter, max_batch_size=2 - ) diff --git a/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_http_exporter_functional.py b/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_http_exporter_functional.py deleted file mode 100644 index 95aa772716b..00000000000 --- a/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_http_exporter_functional.py +++ /dev/null @@ -1,75 +0,0 @@ -# Copyright The OpenTelemetry Authors -# SPDX-License-Identifier: Apache-2.0 - -from opentelemetry import metrics, trace -from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( - OTLPMetricExporter, -) -from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( - OTLPSpanExporter, -) -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.test.globals_test import ( - reset_metrics_globals, - reset_trace_globals, -) -from opentelemetry.test.test_base import TestBase - -from . import ( - BaseTestOTLPExporter, - ExportStatusMetricReader, - ExportStatusSpanProcessor, -) - - -class BatchCountingHTTPExporter(OTLPMetricExporter): - """HTTP exporter that counts actual batch export calls for testing.""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.export_call_count = 0 - - def _export(self, *args, **kwargs): - self.export_call_count += 1 - return super()._export(*args, **kwargs) - - -class TestOTLPHTTPExporter(BaseTestOTLPExporter, TestBase): - # pylint: disable=no-self-use - def get_span_processor(self): - return ExportStatusSpanProcessor(OTLPSpanExporter()) - - def get_metric_reader(self): - return ExportStatusMetricReader( - OTLPMetricExporter(max_export_batch_size=2) - ) - - def setUp(self): - super().setUp() - - reset_trace_globals() - trace.set_tracer_provider(TracerProvider()) - self.tracer = trace.get_tracer(__name__) - self.span_processor = self.get_span_processor() - trace.get_tracer_provider().add_span_processor(self.span_processor) - - reset_metrics_globals() - self.metric_reader = self.get_metric_reader() - meter_provider = MeterProvider(metric_readers=[self.metric_reader]) - metrics.set_meter_provider(meter_provider) - self.meter = metrics.get_meter(__name__) - - def test_metrics_export_batch_size_two(self): - """Test metrics max_export_batch_size=2 directly through HTTP exporter""" - batch_counter = BatchCountingHTTPExporter( - endpoint="http://localhost:4318/v1/metrics", - max_export_batch_size=2, - ) - metrics_data, data_points = self._create_test_metrics_data( - num_data_points=6 - ) - result = batch_counter.export(metrics_data) - self._verify_batch_export_result( - result, data_points, batch_counter, max_batch_size=2 - ) diff --git a/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_logs_functional.py b/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_logs_functional.py new file mode 100644 index 00000000000..e8bfe9856a0 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_logs_functional.py @@ -0,0 +1,28 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter as GRPCLogExporter, +) +from opentelemetry.exporter.otlp.proto.http._log_exporter import ( + OTLPLogExporter as HTTPLogExporter, +) +from opentelemetry.sdk._logs.export import LogRecordExporter + +from . import LogsExporterTestsBase + + +class HTTPProtobufLogsExporterTests(LogsExporterTestsBase): + __test__ = True + + def build_exporter(self) -> LogRecordExporter: + return HTTPLogExporter(endpoint="http://localhost:4318/v1/logs") + + +class GrpcLogsExporterTests(LogsExporterTestsBase): + __test__ = True + + def build_exporter(self) -> LogRecordExporter: + return GRPCLogExporter(insecure=True) diff --git a/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_metrics_functional.py b/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_metrics_functional.py new file mode 100644 index 00000000000..c9ff37d31cf --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_metrics_functional.py @@ -0,0 +1,28 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( + OTLPMetricExporter as GRPCMetricExporter, +) +from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( + OTLPMetricExporter as HTTPMetricExporter, +) +from opentelemetry.sdk.metrics.export import MetricExporter + +from . import MetricsExporterTestsBase + + +class HTTPProtobufMetricsExporterTests(MetricsExporterTestsBase): + __test__ = True + + def build_exporter(self) -> MetricExporter: + return HTTPMetricExporter(endpoint="http://localhost:4318/v1/metrics") + + +class GrpcMetricsExporterTests(MetricsExporterTestsBase): + __test__ = True + + def build_exporter(self) -> MetricExporter: + return GRPCMetricExporter(insecure=True) diff --git a/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_traces_functional.py b/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_traces_functional.py new file mode 100644 index 00000000000..45dc0550644 --- /dev/null +++ b/tests/opentelemetry-docker-tests/tests/otlpexporter/test_otlp_traces_functional.py @@ -0,0 +1,28 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter as GRPCSpanExporter, +) +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter as HTTPSpanExporter, +) +from opentelemetry.sdk.trace.export import SpanExporter + +from . import TracesExporterTestsBase + + +class HTTPProtobufTracesExporterTests(TracesExporterTestsBase): + __test__ = True + + def build_exporter(self) -> SpanExporter: + return HTTPSpanExporter(endpoint="http://localhost:4318/v1/traces") + + +class GrpcTracesExporterTests(TracesExporterTestsBase): + __test__ = True + + def build_exporter(self) -> SpanExporter: + return GRPCSpanExporter(insecure=True) diff --git a/tests/opentelemetry-test-utils/src/opentelemetry/test/otlp_test_server.py b/tests/opentelemetry-test-utils/src/opentelemetry/test/otlp_test_server.py new file mode 100644 index 00000000000..146dd322be7 --- /dev/null +++ b/tests/opentelemetry-test-utils/src/opentelemetry/test/otlp_test_server.py @@ -0,0 +1,364 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import gzip +import time +import zlib +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, HTTPServer +from queue import Empty, Queue +from threading import Thread +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +if TYPE_CHECKING: + from opentelemetry.proto.common.v1.common_pb2 import InstrumentationScope + from opentelemetry.proto.logs.v1.logs_pb2 import LogRecord + from opentelemetry.proto.metrics.v1.metrics_pb2 import Metric + from opentelemetry.proto.resource.v1.resource_pb2 import Resource + from opentelemetry.proto.trace.v1.trace_pb2 import Span + + +@dataclass +class RecordedSpan: + span: Span + resource: Resource + scope: InstrumentationScope + + +@dataclass +class RecordedMetric: + metric: Metric + resource: Resource + scope: InstrumentationScope + + +@dataclass +class RecordedLogRecord: + log_record: LogRecord + resource: Resource + scope: InstrumentationScope + + +def _make_handler( + spans_queue: Queue[RecordedSpan], + metrics_queue: Queue[RecordedMetric], + logs_queue: Queue[RecordedLogRecord], + traces_path: str, + metrics_path: str, + logs_path: str, +) -> type[BaseHTTPRequestHandler]: + # pylint: disable=import-outside-toplevel,no-name-in-module + from opentelemetry.proto.collector.logs.v1.logs_service_pb2 import ( # noqa: PLC0415 + ExportLogsServiceRequest, + ExportLogsServiceResponse, + ) + from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import ( # noqa: PLC0415 + ExportMetricsServiceRequest, + ExportMetricsServiceResponse, + ) + from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( # noqa: PLC0415 + ExportTraceServiceRequest, + ExportTraceServiceResponse, + ) + + class _Handler(BaseHTTPRequestHandler): + def do_POST(self): # pylint: disable=invalid-name + content_length = int(self.headers.get("Content-Length", 0)) + body = self._decompress(self.rfile.read(content_length)) + path = urlparse(self.path).path + + if path == traces_path: + response_body = self._handle_traces(body) + elif path == metrics_path: + response_body = self._handle_metrics(body) + elif path == logs_path: + response_body = self._handle_logs(body) + else: + self.send_error(404) + return + + self.send_response(200) + self.send_header("Content-Type", "application/x-protobuf") + self.send_header("Content-Length", str(len(response_body))) + self.end_headers() + self.wfile.write(response_body) + + def _decompress(self, body: bytes) -> bytes: + encoding = self.headers.get("Content-Encoding", "") + if encoding == "gzip": + return gzip.decompress(body) + if encoding == "deflate": + return zlib.decompress(body) + return body + + @staticmethod + def _handle_traces(body: bytes) -> bytes: + request = ExportTraceServiceRequest() + request.ParseFromString(body) + for rs in request.resource_spans: + for ss in rs.scope_spans: + for span in ss.spans: + spans_queue.put( + RecordedSpan( + span=span, resource=rs.resource, scope=ss.scope + ) + ) + return ExportTraceServiceResponse().SerializeToString() + + @staticmethod + def _handle_metrics(body: bytes) -> bytes: + request = ExportMetricsServiceRequest() + request.ParseFromString(body) + for rm in request.resource_metrics: + for sm in rm.scope_metrics: + for metric in sm.metrics: + metrics_queue.put( + RecordedMetric( + metric=metric, + resource=rm.resource, + scope=sm.scope, + ) + ) + return ExportMetricsServiceResponse().SerializeToString() + + @staticmethod + def _handle_logs(body: bytes) -> bytes: + request = ExportLogsServiceRequest() + request.ParseFromString(body) + for rl in request.resource_logs: + for sl in rl.scope_logs: + for log_record in sl.log_records: + logs_queue.put( + RecordedLogRecord( + log_record=log_record, + resource=rl.resource, + scope=sl.scope, + ) + ) + return ExportLogsServiceResponse().SerializeToString() + + def log_message(self, format, *args): # pylint: disable=redefined-builtin + pass + + return _Handler + + +class OtlpProtoTestServer: + def __init__( + self, host: str = "127.0.0.1", port: int = 0, base_path: str = "" + ) -> None: + try: + # pylint: disable-next=import-outside-toplevel,unused-import + import opentelemetry.proto # noqa: F401, PLC0415 + except ImportError: + raise ImportError( + "opentelemetry-proto is required to use OtlpProtoTestServer. " + "Install it with: pip install opentelemetry-proto" + ) from None + self._host = host + self._port = port + self._base_path = base_path.rstrip("/") + self._spans_queue: Queue[RecordedSpan] = Queue() + self._metrics_queue: Queue[RecordedMetric] = Queue() + self._logs_queue: Queue[RecordedLogRecord] = Queue() + self._server: HTTPServer | None = None + self._thread: Thread | None = None + + def start(self) -> OtlpProtoTestServer: + handler = _make_handler( + self._spans_queue, + self._metrics_queue, + self._logs_queue, + f"{self._base_path}/v1/traces", + f"{self._base_path}/v1/metrics", + f"{self._base_path}/v1/logs", + ) + self._server = HTTPServer((self._host, self._port), handler) + self._port = self._server.server_address[1] + self._thread = Thread( + target=self._server.serve_forever, + daemon=True, + name="OtlpProtoTestServer", + ) + self._thread.start() + return self + + def stop(self) -> None: + if self._server is not None: + self._server.shutdown() + if self._thread is not None: + self._thread.join() + self._server.server_close() + + def __enter__(self) -> OtlpProtoTestServer: + return self.start() + + def __exit__(self, *exc: object) -> None: + self.stop() + + @property + def port(self) -> int: + return self._port + + @property + def traces_endpoint(self) -> str: + return f"http://{self._host}:{self._port}{self._base_path}/v1/traces" + + @property + def metrics_endpoint(self) -> str: + return f"http://{self._host}:{self._port}{self._base_path}/v1/metrics" + + @property + def logs_endpoint(self) -> str: + return f"http://{self._host}:{self._port}{self._base_path}/v1/logs" + + def get_span(self, timeout: float = 5.0) -> RecordedSpan: + try: + return self._spans_queue.get(timeout=timeout) + except Empty: + raise TimeoutError(f"No span received within {timeout}s") from None + + def get_spans( + self, count: int = 1, timeout: float = 5.0 + ) -> list[RecordedSpan]: + deadline = time.monotonic() + timeout + spans = [] + for _ in range(count): + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError( + f"Timed out after receiving {len(spans)}/{count} spans" + ) + spans.append(self.get_span(timeout=remaining)) + return spans + + def wait_for_span( + self, *, name: str | None = None, timeout: float = 5.0 + ) -> RecordedSpan: + deadline = time.monotonic() + timeout + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError( + f"No span with name={name!r} received within {timeout}s" + ) + try: + recorded = self._spans_queue.get(timeout=remaining) + except Empty: + raise TimeoutError( + f"No span with name={name!r} received within {timeout}s" + ) from None + if name is None or recorded.span.name == name: + return recorded + + def drain_spans(self) -> list[RecordedSpan]: + result = [] + while True: + try: + result.append(self._spans_queue.get_nowait()) + except Empty: + return result + + def get_metric(self, timeout: float = 5.0) -> RecordedMetric: + try: + return self._metrics_queue.get(timeout=timeout) + except Empty: + raise TimeoutError( + f"No metric received within {timeout}s" + ) from None + + def get_metrics( + self, count: int = 1, timeout: float = 5.0 + ) -> list[RecordedMetric]: + deadline = time.monotonic() + timeout + metrics = [] + for _ in range(count): + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError( + f"Timed out after receiving {len(metrics)}/{count} metrics" + ) + metrics.append(self.get_metric(timeout=remaining)) + return metrics + + def wait_for_metric( + self, *, name: str | None = None, timeout: float = 5.0 + ) -> RecordedMetric: + deadline = time.monotonic() + timeout + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError( + f"No metric with name={name!r} received within {timeout}s" + ) + try: + recorded = self._metrics_queue.get(timeout=remaining) + except Empty: + raise TimeoutError( + f"No metric with name={name!r} received within {timeout}s" + ) from None + if name is None or recorded.metric.name == name: + return recorded + + def drain_metrics(self) -> list[RecordedMetric]: + result = [] + while True: + try: + result.append(self._metrics_queue.get_nowait()) + except Empty: + return result + + def get_log_record(self, timeout: float = 5.0) -> RecordedLogRecord: + try: + return self._logs_queue.get(timeout=timeout) + except Empty: + raise TimeoutError( + f"No log record received within {timeout}s" + ) from None + + def get_log_records( + self, count: int = 1, timeout: float = 5.0 + ) -> list[RecordedLogRecord]: + deadline = time.monotonic() + timeout + log_records = [] + for _ in range(count): + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError( + f"Timed out after receiving {len(log_records)}/{count} log records" + ) + log_records.append(self.get_log_record(timeout=remaining)) + return log_records + + def wait_for_log_record( + self, *, timeout: float = 5.0 + ) -> RecordedLogRecord: + try: + return self._logs_queue.get(timeout=timeout) + except Empty: + raise TimeoutError( + f"No log record received within {timeout}s" + ) from None + + def drain_log_records(self) -> list[RecordedLogRecord]: + result = [] + while True: + try: + result.append(self._logs_queue.get_nowait()) + except Empty: + return result + + def clear(self) -> None: + for queue in ( + self._spans_queue, + self._metrics_queue, + self._logs_queue, + ): + while True: + try: + queue.get_nowait() + except Empty: + break diff --git a/tests/opentelemetry-test-utils/test-requirements.txt b/tests/opentelemetry-test-utils/test-requirements.txt index 7cb9414f47d..0d2d3bccc57 100644 --- a/tests/opentelemetry-test-utils/test-requirements.txt +++ b/tests/opentelemetry-test-utils/test-requirements.txt @@ -16,3 +16,4 @@ wrapt==1.16.0 ./opentelemetry-proto ; platform_python_implementation != 'PyPy' ./exporter/opentelemetry-exporter-otlp-proto-common ; platform_python_implementation != 'PyPy' ./exporter/opentelemetry-exporter-otlp-proto-grpc ; platform_python_implementation != 'PyPy' +./exporter/opentelemetry-exporter-otlp-proto-http ; platform_python_implementation != 'PyPy' diff --git a/tests/opentelemetry-test-utils/tests/test_otlp_test_server.py b/tests/opentelemetry-test-utils/tests/test_otlp_test_server.py new file mode 100644 index 00000000000..1840b98147d --- /dev/null +++ b/tests/opentelemetry-test-utils/tests/test_otlp_test_server.py @@ -0,0 +1,368 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import unittest +import unittest.mock + +import requests + +from opentelemetry._logs import SeverityNumber +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.test.otlp_test_server import ( + OtlpProtoTestServer, + RecordedLogRecord, + RecordedMetric, + RecordedSpan, +) + +try: + # pylint: disable-next=unused-import + import opentelemetry.proto # noqa: F401 + from opentelemetry.exporter.otlp.proto.http import Compression + from opentelemetry.exporter.otlp.proto.http._log_exporter import ( + OTLPLogExporter, + ) + from opentelemetry.exporter.otlp.proto.http.metric_exporter import ( + OTLPMetricExporter, + ) + from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter, + ) + + _HAS_PROTO_DEPS = True +except ImportError: + _HAS_PROTO_DEPS = False + + +@unittest.skipUnless( + _HAS_PROTO_DEPS, + "opentelemetry-exporter-otlp-proto-http or opentelemetry-proto are not installed", +) +class TestOtlpProtoTestServer(unittest.TestCase): + def setUp(self): + self.server = OtlpProtoTestServer() + self.server.start() + + def tearDown(self): + self.server.stop() + + def _make_trace_provider( + self, service: str = "trace-svc", **exporter_kwargs + ) -> TracerProvider: + provider = TracerProvider( + resource=Resource.create({"service.name": service}) + ) + provider.add_span_processor( + SimpleSpanProcessor( + OTLPSpanExporter( + endpoint=self.server.traces_endpoint, + timeout=1, + **exporter_kwargs, + ) + ) + ) + return provider + + def _make_metrics_provider( + self, service: str = "metrics-svc", **exporter_kwargs + ) -> tuple[PeriodicExportingMetricReader, MeterProvider]: + reader = PeriodicExportingMetricReader( + OTLPMetricExporter( + endpoint=self.server.metrics_endpoint, + timeout=1, + **exporter_kwargs, + ), + export_interval_millis=100, + ) + provider = MeterProvider( + resource=Resource.create({"service.name": service}), + metric_readers=[reader], + ) + return reader, provider + + def _make_log_provider( + self, service: str = "log-svc", **exporter_kwargs + ) -> LoggerProvider: + provider = LoggerProvider( + resource=Resource.create({"service.name": service}) + ) + provider.add_log_record_processor( + SimpleLogRecordProcessor( + OTLPLogExporter( + endpoint=self.server.logs_endpoint, + timeout=1, + **exporter_kwargs, + ) + ) + ) + return provider + + def test_span_export(self): + provider = self._make_trace_provider() + tracer = provider.get_tracer("trace-scope") + with tracer.start_as_current_span("foo"): + with tracer.start_as_current_span("bar"): + pass + tracer.start_span("baz").end() + + spans = self.server.get_spans(count=3, timeout=5.0) + self.assertEqual( + {recorded.span.name for recorded in spans}, {"foo", "bar", "baz"} + ) + for recorded in spans: + self.assertIsInstance(recorded, RecordedSpan) + self.assertEqual(recorded.scope.name, "trace-scope") + svc = next( + a.value.string_value + for a in recorded.resource.attributes + if a.key == "service.name" + ) + self.assertEqual(svc, "trace-svc") + provider.shutdown() + + def test_wait_for_span(self): + provider = self._make_trace_provider() + tracer = provider.get_tracer("trace-scope") + tracer.start_span("alpha").end() + tracer.start_span("beta").end() + + found = self.server.wait_for_span(name="beta", timeout=5.0) + self.assertEqual(found.span.name, "beta") + with self.assertRaises(TimeoutError): + self.server.get_span(timeout=0.1) + provider.shutdown() + + def test_span_drain_and_clear(self): + provider = self._make_trace_provider() + tracer = provider.get_tracer("trace-scope") + + tracer.start_span("d1").end() + tracer.start_span("d2").end() + drained = self.server.drain_spans() + self.assertEqual({r.span.name for r in drained}, {"d1", "d2"}) + + tracer.start_span("c1").end() + tracer.start_span("c2").end() + self.server.clear() + with self.assertRaises(TimeoutError): + self.server.get_span(timeout=0.1) + provider.shutdown() + + def test_metric_export(self): + reader, provider = self._make_metrics_provider() + meter = provider.get_meter("metrics-scope") + meter.create_counter("test.requests").add(5, {"env": "test"}) + meter.create_histogram("test.latency").record(1.5, {"route": "/api"}) + reader.force_flush(timeout_millis=3000) + + metrics = self.server.get_metrics(count=2, timeout=5.0) + self.assertEqual( + {r.metric.name for r in metrics}, {"test.requests", "test.latency"} + ) + for recorded in metrics: + self.assertIsInstance(recorded, RecordedMetric) + self.assertEqual(recorded.scope.name, "metrics-scope") + svc = next( + a.value.string_value + for a in recorded.resource.attributes + if a.key == "service.name" + ) + self.assertEqual(svc, "metrics-svc") + provider.shutdown() + + def test_wait_for_metric(self): + reader, provider = self._make_metrics_provider() + meter = provider.get_meter("metrics-scope") + meter.create_counter("other.counter").add(1) + meter.create_counter("target.counter").add(1) + reader.force_flush(timeout_millis=3000) + + found = self.server.wait_for_metric(name="target.counter", timeout=5.0) + self.assertEqual(found.metric.name, "target.counter") + provider.shutdown() + + def test_metric_drain_and_timeout(self): + reader, provider = self._make_metrics_provider() + meter = provider.get_meter("metrics-scope") + meter.create_counter("drain.counter").add(1) + reader.force_flush(timeout_millis=3000) + self.server.get_metric(timeout=5.0) + + self.assertEqual(self.server.drain_metrics(), []) + provider.shutdown() + self.server.drain_metrics() + with self.assertRaises(TimeoutError): + self.server.get_metric(timeout=0.1) + + def test_log_export(self): + provider = self._make_log_provider() + logger = provider.get_logger("log-scope") + logger.emit(body="first message", severity_number=SeverityNumber.WARN) + logger.emit( + body="second message", severity_number=SeverityNumber.ERROR + ) + + log_records = self.server.get_log_records(count=2, timeout=5.0) + self.assertEqual( + {r.log_record.body.string_value for r in log_records}, + {"first message", "second message"}, + ) + for recorded in log_records: + self.assertIsInstance(recorded, RecordedLogRecord) + self.assertEqual(recorded.scope.name, "log-scope") + svc = next( + a.value.string_value + for a in recorded.resource.attributes + if a.key == "service.name" + ) + self.assertEqual(svc, "log-svc") + provider.shutdown() + + def test_wait_for_log_record(self): + provider = self._make_log_provider() + logger = provider.get_logger("log-scope") + logger.emit(body="wait-target", severity_number=SeverityNumber.INFO) + + found = self.server.wait_for_log_record(timeout=5.0) + self.assertIsInstance(found, RecordedLogRecord) + self.assertEqual(found.log_record.body.string_value, "wait-target") + provider.shutdown() + + def test_log_drain_and_timeout(self): + provider = self._make_log_provider() + logger = provider.get_logger("log-scope") + logger.emit(body="drain-me", severity_number=SeverityNumber.DEBUG) + self.server.get_log_record(timeout=5.0) + + self.assertEqual(self.server.drain_log_records(), []) + with self.assertRaises(TimeoutError): + self.server.get_log_record(timeout=0.1) + provider.shutdown() + + def test_compression(self): + trace_provider = self._make_trace_provider( + compression=Compression.Gzip + ) + metrics_reader, metrics_provider = self._make_metrics_provider( + compression=Compression.Gzip + ) + log_provider = self._make_log_provider(compression=Compression.Gzip) + + trace_provider.get_tracer("s").start_span("gzip-span").end() + self.assertEqual( + self.server.get_span(timeout=5.0).span.name, "gzip-span" + ) + + metrics_provider.get_meter("s").create_counter("gzip.counter").add(1) + metrics_reader.force_flush(timeout_millis=3000) + self.assertEqual( + self.server.get_metric(timeout=5.0).metric.name, "gzip.counter" + ) + + log_provider.get_logger("s").emit( + body="gzip log", severity_number=SeverityNumber.INFO + ) + self.assertEqual( + self.server.get_log_record( + timeout=5.0 + ).log_record.body.string_value, + "gzip log", + ) + + trace_provider.shutdown() + metrics_provider.shutdown() + log_provider.shutdown() + + def test_endpoint_urls(self): + port = self.server.port + self.assertGreater(port, 0) + self.assertEqual( + self.server.traces_endpoint, f"http://127.0.0.1:{port}/v1/traces" + ) + self.assertEqual( + self.server.metrics_endpoint, f"http://127.0.0.1:{port}/v1/metrics" + ) + self.assertEqual( + self.server.logs_endpoint, f"http://127.0.0.1:{port}/v1/logs" + ) + + def test_context_manager(self): + with OtlpProtoTestServer() as srv: + provider = TracerProvider() + provider.add_span_processor( + SimpleSpanProcessor( + OTLPSpanExporter(endpoint=srv.traces_endpoint, timeout=1) + ) + ) + provider.get_tracer("s").start_span("ctx-span").end() + self.assertEqual(srv.get_span(timeout=5.0).span.name, "ctx-span") + provider.shutdown() + + def test_base_path(self): + with OtlpProtoTestServer(base_path="/custom") as srv: + self.assertTrue(srv.traces_endpoint.endswith("/custom/v1/traces")) + provider = TracerProvider() + provider.add_span_processor( + SimpleSpanProcessor( + OTLPSpanExporter(endpoint=srv.traces_endpoint, timeout=1) + ) + ) + provider.get_tracer("s").start_span("prefixed-span").end() + self.assertEqual( + srv.get_span(timeout=5.0).span.name, "prefixed-span" + ) + provider.shutdown() + + def test_signal_routing(self): + trace_provider = self._make_trace_provider() + metrics_reader, metrics_provider = self._make_metrics_provider() + log_provider = self._make_log_provider() + + trace_provider.get_tracer("s").start_span("routed-span").end() + metrics_provider.get_meter("s").create_counter("routed.counter").add(1) + metrics_reader.force_flush(timeout_millis=3000) + log_provider.get_logger("s").emit( + body="routed log", severity_number=SeverityNumber.INFO + ) + + self.assertEqual( + self.server.get_span(timeout=5.0).span.name, "routed-span" + ) + self.assertEqual( + self.server.get_metric(timeout=5.0).metric.name, "routed.counter" + ) + self.assertEqual( + self.server.get_log_record( + timeout=5.0 + ).log_record.body.string_value, + "routed log", + ) + + trace_provider.shutdown() + metrics_provider.shutdown() + log_provider.shutdown() + + def test_unknown_path_returns_404(self): + resp = requests.post( + f"http://127.0.0.1:{self.server.port}/unknown", + data=b"", + headers={"Content-Type": "application/x-protobuf"}, + timeout=2, + ) + self.assertEqual(resp.status_code, 404) + + def test_missing_proto_raises_import_error(self): + with unittest.mock.patch.dict( + "sys.modules", {"opentelemetry.proto": None} + ): + with self.assertRaises(ImportError) as cm: + OtlpProtoTestServer() + self.assertIn("opentelemetry-proto", str(cm.exception)) diff --git a/tox.ini b/tox.ini index 5ebf34c3f37..a16b65b03eb 100644 --- a/tox.ini +++ b/tox.ini @@ -270,7 +270,6 @@ commands = lint-opentelemetry-propagator-jaeger: sh -c "cd propagator && pylint --rcfile ../.pylintrc {toxinidir}/propagator/opentelemetry-propagator-jaeger" test-opentelemetry-test-utils: pytest {toxinidir}/tests/opentelemetry-test-utils/tests {posargs} - lint-opentelemetry-test-utils: sh -c "cd tests && pylint --rcfile ../.pylintrc {toxinidir}/tests/opentelemetry-test-utils" coverage: {toxinidir}/scripts/coverage.sh @@ -321,13 +320,7 @@ commands = [testenv:docker-tests-{otlpexporter,opencensus}] deps = - pytest==7.1.3 - # Pinning PyYAML for issue: https://github.com/yaml/pyyaml/issues/724 - PyYAML==5.3.1 - # Pinning docker for issue: https://github.com/docker/compose/issues/11309 - docker<7 - docker-compose==1.29.2 - requests==2.28.2 + pytest==7.4.4 ; core packages -e {toxinidir}/opentelemetry-api -e {toxinidir}/opentelemetry-semantic-conventions @@ -342,18 +335,30 @@ deps = opencensus: -e {toxinidir}/exporter/opentelemetry-exporter-opencensus +allowlist_externals = + docker + changedir = tests/opentelemetry-docker-tests/tests commands_pre = - pip freeze - docker-compose up -d + docker --version || echo 'Docker with compose subcommand is required' && /bin/false + docker compose up -d commands = otlpexporter: pytest otlpexporter {posargs} opencensus: pytest opencensus {posargs} commands_post = - docker-compose down -v + docker compose down -v + +[testenv:lint-opentelemetry-test-utils] +; This environment must be defined explicitly because of how +; tox parses and matches environment names. +deps = + -r dev-requirements.txt + -r {toxinidir}/tests/opentelemetry-test-utils/test-requirements.txt +allowlist_externals = sh +commands = sh -c "cd tests && pylint --rcfile ../.pylintrc {toxinidir}/tests/opentelemetry-test-utils" [testenv:lint-license-header-check] commands =