diff --git a/.changelog/4863.added b/.changelog/4863.added new file mode 100644 index 00000000000..fd65e7b875e --- /dev/null +++ b/.changelog/4863.added @@ -0,0 +1 @@ +`opentelemetry-sdk`: add `add_metric_reader` / `remove_metric_reader` public APIs to register / unregister metric readers at runtime. \ No newline at end of file diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index a514be735d1..5b8c2600c21 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -490,11 +490,12 @@ def __init__( ) ), resource=resource, - metric_readers=metric_readers, views=views, ) + self._metric_readers = metric_readers self._measurement_consumer = SynchronousMeasurementConsumer( - sdk_config=self._sdk_config + sdk_config=self._sdk_config, + metric_readers=metric_readers, ) disabled = environ.get(OTEL_SDK_DISABLED, "") self._disabled = disabled.lower().strip() == "true" @@ -509,7 +510,7 @@ def __init__( _meter_configurator or _default_meter_configurator ) - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: with self._all_metric_readers_lock: if metric_reader in self._all_metric_readers: # pylint: disable=broad-exception-raised @@ -560,7 +561,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: metric_reader_error = {} - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: current_ts = time_ns() try: if current_ts >= deadline_ns: @@ -605,7 +606,7 @@ def _shutdown(): metric_reader_error = {} - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: current_ts = time_ns() try: if current_ts >= deadline_ns: @@ -675,3 +676,36 @@ def get_meter( ), ) return self._meters[instrumentation_scope] + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.export.MetricReader" + ) -> None: + with self._all_metric_readers_lock: + if metric_reader in self._all_metric_readers: + _logger.warning( + "MetricReader '%s' has been registered already!", + metric_reader, + ) + return + self._measurement_consumer.add_metric_reader(metric_reader) + # pylint: disable-next=protected-access + metric_reader._set_collect_callback( + self._measurement_consumer.collect + ) + self._all_metric_readers.add(metric_reader) + + def remove_metric_reader( + self, + metric_reader: "opentelemetry.sdk.metrics.export.MetricReader", + ) -> None: + with self._all_metric_readers_lock: + if metric_reader not in self._all_metric_readers: + _logger.warning( + "MetricReader '%s' has not been registered!", metric_reader + ) + return + self._measurement_consumer.remove_metric_reader(metric_reader) + # pylint: disable-next=protected-access + metric_reader._set_collect_callback(None) + metric_reader.shutdown() + self._all_metric_readers.remove(metric_reader) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 676a356c2bc..aee356f78d2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -223,13 +223,16 @@ def __init__( *, otel_component_type: OtelComponentTypeValues | None = None, ) -> None: - self._collect: Callable[ - [ - opentelemetry.sdk.metrics.export.MetricReader, - AggregationTemporality, - ], - Iterable[opentelemetry.sdk.metrics.export.Metric], - ] = None + self._collect: ( + Callable[ + [ + opentelemetry.sdk.metrics.export.MetricReader, + AggregationTemporality, + ], + Iterable[opentelemetry.sdk.metrics.export.Metric], + ] + | None + ) = None self._instrument_class_temporality = { _Counter: AggregationTemporality.CUMULATIVE, @@ -373,7 +376,8 @@ def _set_collect_callback( AggregationTemporality, ], MetricsData, - ], + ] + | None, ) -> None: """This function is internal to the SDK. It should not be called or overridden by users""" self._collect = func diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 64d996787c2..8af2f064314 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -3,15 +3,15 @@ # pylint: disable=unused-import +import weakref from abc import ABC, abstractmethod -from collections.abc import Mapping +from collections.abc import Iterable, Mapping from threading import Lock from time import time_ns # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics import opentelemetry.sdk.metrics._internal.instrument -import opentelemetry.sdk.metrics._internal.sdk_configuration from opentelemetry.metrics._internal.instrument import CallbackOptions from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError from opentelemetry.sdk.metrics._internal.measurement import Measurement @@ -48,10 +48,10 @@ class SynchronousMeasurementConsumer(MeasurementConsumer): def __init__( self, sdk_config: "opentelemetry.sdk.metrics._internal.sdk_configuration.SdkConfiguration", + metric_readers: Iterable["opentelemetry.sdk.metrics.MetricReader"], ) -> None: self._lock = Lock() self._sdk_config = sdk_config - # should never be mutated self._reader_storages: Mapping[ opentelemetry.sdk.metrics.export.MetricReader, MetricReaderStorage ] = { @@ -60,7 +60,7 @@ def __init__( reader._instrument_class_temporality, reader._instrument_class_aggregation, ) - for reader in sdk_config.metric_readers + for reader in metric_readers } self._async_instruments: list[ opentelemetry.sdk.metrics._internal.instrument._Asynchronous @@ -75,7 +75,9 @@ def consume_measurement(self, measurement: Measurement) -> None: measurement.context, ) ) - for reader_storage in self._reader_storages.values(): + with self._lock: + reader_storages = weakref.WeakSet(self._reader_storages.values()) + for reader_storage in reader_storages: reader_storage.consume_measurement( measurement, should_sample_exemplar ) @@ -132,3 +134,23 @@ def collect( result = self._reader_storages[metric_reader].collect() return result + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Registers a new metric reader.""" + with self._lock: + self._reader_storages[metric_reader] = MetricReaderStorage( + self._sdk_config, + # pylint: disable-next=protected-access + metric_reader._instrument_class_temporality, + # pylint: disable-next=protected-access + metric_reader._instrument_class_aggregation, + ) + + def remove_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Unregisters the given metric reader.""" + with self._lock: + self._reader_storages.pop(metric_reader) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py index 6c2b8de25c6..a1430261cb5 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py @@ -15,5 +15,4 @@ class SdkConfiguration: exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter" resource: "opentelemetry.sdk.resources.Resource" - metric_readers: Sequence["opentelemetry.sdk.metrics.export.MetricReader"] views: Sequence["opentelemetry.sdk.metrics.view.View"] diff --git a/opentelemetry-sdk/tests/_configuration/test_meter_provider.py b/opentelemetry-sdk/tests/_configuration/test_meter_provider.py index 0cbb53cbc74..566329cf146 100644 --- a/opentelemetry-sdk/tests/_configuration/test_meter_provider.py +++ b/opentelemetry-sdk/tests/_configuration/test_meter_provider.py @@ -108,7 +108,7 @@ def test_none_config_uses_supplied_resource(self): def test_none_config_no_readers(self): provider = create_meter_provider(None) - self.assertEqual(len(provider._sdk_config.metric_readers), 0) + self.assertEqual(len(provider._metric_readers), 0) def test_none_config_uses_trace_based_exemplar_filter(self): provider = create_meter_provider(None) @@ -139,7 +139,7 @@ def test_none_config_does_not_read_interval_env_var(self): ) with patch.dict(os.environ, {"OTEL_METRIC_EXPORT_INTERVAL": "999999"}): provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, PeriodicExportingMetricReader) self.assertEqual(reader._export_interval_millis, 60000.0) @@ -163,11 +163,6 @@ def test_configure_with_config_sets_global(self): arg = mock_set.call_args[0][0] self.assertIsInstance(arg, MeterProvider) - def test_empty_readers_list(self): - config = MeterProviderConfig(readers=[]) - provider = create_meter_provider(config) - self.assertEqual(len(provider._sdk_config.metric_readers), 0) - class TestCreateMetricReaders(unittest.TestCase): @staticmethod @@ -189,7 +184,7 @@ def test_console_exporter(self): PushMetricExporterConfig(console=ConsoleMetricExporterConfig()) ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, PeriodicExportingMetricReader) self.assertIsInstance(reader._exporter, ConsoleMetricExporter) @@ -198,7 +193,7 @@ def test_periodic_reader_default_interval(self): PushMetricExporterConfig(console=ConsoleMetricExporterConfig()) ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader._export_interval_millis, 60000.0) def test_periodic_reader_default_timeout(self): @@ -206,7 +201,7 @@ def test_periodic_reader_default_timeout(self): PushMetricExporterConfig(console=ConsoleMetricExporterConfig()) ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader._export_timeout_millis, 30000.0) def test_periodic_reader_explicit_interval(self): @@ -215,7 +210,7 @@ def test_periodic_reader_explicit_interval(self): interval=5000, ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader._export_interval_millis, 5000.0) def test_periodic_reader_explicit_timeout(self): @@ -224,7 +219,7 @@ def test_periodic_reader_explicit_timeout(self): timeout=10000, ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader._export_timeout_millis, 10000.0) def test_otlp_http_missing_package_raises(self): @@ -347,7 +342,7 @@ def test_pull_prometheus_creates_reader(self): mock_reader_cls.assert_called_once_with(disable_target_info=True) mock_start_server.assert_called_once_with(port=9090, addr="0.0.0.0") - self.assertEqual(len(provider._sdk_config.metric_readers), 1) + self.assertEqual(len(provider._metric_readers), 1) def test_pull_prometheus_defaults(self): mock_reader_cls = MagicMock() @@ -375,7 +370,7 @@ def test_pull_prometheus_defaults(self): mock_reader_cls.assert_called_once_with(disable_target_info=False) mock_start_server.assert_called_once_with(port=9464, addr="localhost") - self.assertEqual(len(provider._sdk_config.metric_readers), 1) + self.assertEqual(len(provider._metric_readers), 1) def test_pull_prometheus_missing_package_raises(self): with patch.dict( @@ -432,7 +427,7 @@ def test_pull_plugin_loads_via_entry_point(self): ] ) provider = create_meter_provider(config) - self.assertEqual(len(provider._sdk_config.metric_readers), 1) + self.assertEqual(len(provider._metric_readers), 1) mock_class.assert_called_once_with(port=8080) mock_entry_points.assert_called_once_with( group="opentelemetry_pull_metric_exporter", @@ -547,7 +542,7 @@ def test_plugin_metric_exporter_loaded_via_entry_point(self): PushMetricExporterConfig(my_custom_exporter={}) ) provider = create_meter_provider(config) - self.assertEqual(len(provider._sdk_config.metric_readers), 1) + self.assertEqual(len(provider._metric_readers), 1) def test_unknown_metric_exporter_raises_configuration_error(self): with patch( @@ -581,7 +576,7 @@ def test_multiple_readers(self): ] ) provider = create_meter_provider(config) - self.assertEqual(len(provider._sdk_config.metric_readers), 2) + self.assertEqual(len(provider._metric_readers), 2) class TestTemporalityAndAggregation(unittest.TestCase): @@ -605,7 +600,7 @@ def _make_console_config(temporality=None, histogram_agg=None): @staticmethod def _get_exporter(config): provider = create_meter_provider(config) - return provider._sdk_config.metric_readers[0]._exporter + return provider._metric_readers[0]._exporter def test_default_temporality_is_cumulative(self): exporter = self._get_exporter(self._make_console_config()) diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index 9f67271a086..b4ce731cc1e 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -3,6 +3,7 @@ # pylint: disable=invalid-name,no-self-use +from threading import Event, Thread from time import sleep from unittest import TestCase from unittest.mock import MagicMock, Mock, patch @@ -23,7 +24,8 @@ class TestSynchronousMeasurementConsumer(TestCase): def test_parent(self, _): self.assertIsInstance( - SynchronousMeasurementConsumer(MagicMock()), MeasurementConsumer + SynchronousMeasurementConsumer(MagicMock(), metric_readers=()), + MeasurementConsumer, ) def test_creates_metric_reader_storages(self, MockMetricReaderStorage): @@ -33,9 +35,9 @@ def test_creates_metric_reader_storages(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5) @@ -50,9 +52,9 @@ def test_measurements_passed_to_each_reader_storage( SdkConfiguration( exemplar_filter=Mock(should_sample=Mock(return_value=False)), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) measurement_mock = Mock() consumer.consume_measurement(measurement_mock) @@ -72,9 +74,9 @@ def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks): rs_mock.collect.assert_not_called() @@ -91,9 +93,9 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(should_sample=Mock(return_value=False)), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) async_instrument_mocks = [MagicMock() for _ in range(5)] for i_mock in async_instrument_mocks: @@ -122,9 +124,9 @@ def test_collect_timeout(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) def sleep_1(*args, **kwargs): @@ -156,9 +158,9 @@ def test_collect_deadline( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) consumer.register_asynchronous_instrument( @@ -188,3 +190,91 @@ def test_collect_deadline( callback_options_time_call, 10000, ) + + +class TestSynchronousMeasurementConsumerConcurrency(TestCase): + def test_concurrent_changes_to_metric_readers(self): + timeout = 1 + failure = None + iteration_started = Event() + mutation_done = Event() + iteration_timeout_error = "Timed out waiting for iteration to start" + mutation_timeout_error = "Timed out waiting for mutation to be done" + + consumer = SynchronousMeasurementConsumer( + SdkConfiguration( + exemplar_filter=MagicMock(), + resource=MagicMock(), + views=MagicMock(), + ), + metric_readers=[MagicMock()], + ) + + def _hooked_iter(iterable): + nonlocal failure + + iterable = iter(iterable) + iteration_started.set() + if not mutation_done.wait(timeout): + failure = mutation_timeout_error + yield next(iterable, None) + yield from iterable + + class HookedDict(dict): + def __iter__(self): + return _hooked_iter(super().__iter__()) + + def keys(self): + return _hooked_iter(super().keys()) + + def values(self): + return _hooked_iter(super().values()) + + def items(self): + return _hooked_iter(super().items()) + + with patch.object( + consumer, + "_reader_storages", + # pylint: disable-next=protected-access + HookedDict(consumer._reader_storages), + ): + + def mutate(): + """Directly mutate _reader_storages after iteration starts""" + nonlocal failure + if not iteration_started.wait(timeout): + failure = iteration_timeout_error + # pylint: disable-next=protected-access + consumer._reader_storages.clear() + + # Verify that test setup works (direct mutation with no synchronization fails) + with self.assertRaises(RuntimeError) as cm: + t = Thread(target=mutate) + t.start() + try: + consumer.consume_measurement(MagicMock()) + finally: + t.join() + self.assertEqual( + "dictionary changed size during iteration", str(cm.exception) + ) + + def add_and_remove_readers(): + """Modifies _reader_storages after iteration starts""" + nonlocal failure + if not iteration_started.wait(timeout): + failure = iteration_timeout_error + reader = MagicMock() + consumer.add_metric_reader(reader) + consumer.remove_metric_reader(reader) + + # Verify the API calls do not attempt concurrent modification of reader storages + t = Thread(target=add_and_remove_readers) + t.start() + try: + consumer.add_metric_reader(MagicMock()) + consumer.consume_measurement(MagicMock()) + finally: + t.join() + self.assertEqual(mutation_timeout_error, failure) diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index ec70ff11455..281aae92158 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -68,7 +68,6 @@ def test_creates_view_instrument_matches( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1, view2), ), MagicMock( @@ -135,7 +134,6 @@ def test_forwards_calls_to_view_instrument_match( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1, view2), ), MagicMock( @@ -245,7 +243,6 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1,), ), MagicMock( @@ -276,7 +273,6 @@ def test_race_collect_with_new_instruments(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(View(instrument_name="test"),), ), MagicMock( @@ -322,7 +318,6 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(), ), MagicMock( @@ -358,7 +353,6 @@ def test_drop_aggregation(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View( instrument_name="name", aggregation=DropAggregation() @@ -386,7 +380,6 @@ def test_same_collection_start(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(View(instrument_name="name"),), ), MagicMock( @@ -433,7 +426,6 @@ def test_conflicting_view_configuration(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View( instrument_name="observable_counter", @@ -482,7 +474,6 @@ def test_view_instrument_match_conflict_0(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -540,7 +531,6 @@ def test_view_instrument_match_conflict_1(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -609,7 +599,6 @@ def test_view_instrument_match_conflict_2(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="foo"), View(instrument_name="bar"), @@ -662,7 +651,6 @@ def test_view_instrument_match_conflict_3(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -713,7 +701,6 @@ def test_view_instrument_match_conflict_4(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -760,7 +747,6 @@ def test_view_instrument_match_conflict_5(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -815,7 +801,6 @@ def test_view_instrument_match_conflict_6(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter", name="foo"), View(instrument_name="histogram", name="foo"), @@ -870,7 +855,6 @@ def test_view_instrument_match_conflict_7(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -925,7 +909,6 @@ def test_view_instrument_match_conflict_8(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="up_down_counter", name="foo"), View( diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index beb8dbb2f23..7f32da99a60 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -2,10 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 # pylint: disable=protected-access,no-self-use,too-many-lines - import weakref from collections.abc import Callable, Iterable, Sequence -from logging import WARNING +from logging import DEBUG, WARNING from threading import Lock from time import sleep from typing import Any @@ -34,6 +33,7 @@ _RuleBasedMeterConfigurator, ) from opentelemetry.sdk.metrics.export import ( + InMemoryMetricReader, Metric, MetricExporter, MetricExportResult, @@ -488,6 +488,36 @@ def test_consume_measurement_gauge(self, mock_sync_measurement_consumer): sync_consumer_instance.consume_measurement.assert_called() + def test_addition_of_metric_reader(self): + internal_logger = "opentelemetry.sdk.metrics._internal" + export_logger = "opentelemetry.sdk.metrics._internal.export" + + reader = InMemoryMetricReader() + meter_provider = MeterProvider() + meter = meter_provider.get_meter(__name__) + counter = meter.create_counter("counter") + counter.add(1) + # Suppress warnings for calling collect on an unregistered metric reader + with self.assertLogs(export_logger, DEBUG): + self.assertIsNone(reader.get_metrics_data()) + + meter_provider.add_metric_reader(reader) + counter.add(1) + self.assertIsNotNone(reader.get_metrics_data()) + + with self.assertLogs(internal_logger, DEBUG) as cm: + meter_provider.add_metric_reader(reader) + self.assertIn("has been registered already!", cm.output[0]) + + meter_provider.remove_metric_reader(reader) + counter.add(1) + with self.assertLogs(export_logger, DEBUG): + self.assertIsNone(reader.get_metrics_data()) + + with self.assertLogs(internal_logger, DEBUG) as cm: + meter_provider.remove_metric_reader(reader) + self.assertIn("has not been registered!", cm.output[0]) + class TestMeterConcurrency(ConcurrencyTestBase, TestCase): def test_create_instrument_concurrency(self): diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 1de1af319fe..73b0e6e24a5 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -71,7 +71,6 @@ def setUpClass(cls): cls.sdk_configuration = SdkConfiguration( exemplar_filter=Mock(), resource=cls.mock_resource, - metric_readers=[], views=[], ) diff --git a/opentelemetry-sdk/tests/test_configurator.py b/opentelemetry-sdk/tests/test_configurator.py index fbf4bd3b3c5..c66deeb2d81 100644 --- a/opentelemetry-sdk/tests/test_configurator.py +++ b/opentelemetry-sdk/tests/test_configurator.py @@ -1255,7 +1255,7 @@ def test_metrics_init_exporter(self): provider._sdk_config.resource.attributes.get("service.name"), "otlp-service", ) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, DummyMetricReader) self.assertIsInstance(reader.exporter, DummyOTLPMetricExporter) @@ -1268,7 +1268,7 @@ def test_metrics_init_pull_exporter(self): self.assertEqual(self.set_provider_mock.call_count, 1) provider = self.set_provider_mock.call_args[0][0] self.assertIsInstance(provider, DummyMeterProvider) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, DummyMetricReaderPullExporter) def test_metrics_init_exporter_uses_exporter_args_map(self): @@ -1282,7 +1282,7 @@ def test_metrics_init_exporter_uses_exporter_args_map(self): }, ) provider = self.set_provider_mock.call_args[0][0] - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader.exporter.compression, "gzip") def test_metrics_init_meter_configurator_none_by_default(self):