From 8c6e48d6838f3928e79456ca189508f9d9e79297 Mon Sep 17 00:00:00 2001 From: Mani Yazdankhah Date: Thu, 16 Apr 2026 14:01:35 +0100 Subject: [PATCH 1/3] New APIs to add/remove metric readers at run-time --- .../sdk/metrics/_internal/__init__.py | 44 ++++++- .../metrics/_internal/measurement_consumer.py | 32 ++++- .../metrics/_internal/sdk_configuration.py | 1 - .../_configuration/test_meter_provider.py | 23 ++-- .../metrics/test_measurement_consumer.py | 116 ++++++++++++++++-- .../metrics/test_metric_reader_storage.py | 17 --- .../tests/metrics/test_metrics.py | 34 ++++- .../metrics/test_view_instrument_match.py | 1 - opentelemetry-sdk/tests/test_configurator.py | 6 +- 9 files changed, 213 insertions(+), 61 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index e6583d1c5ff..11899f5be4a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -501,11 +501,12 @@ def __init__( ) ), resource=resource, - metric_readers=metric_readers, views=views, ) + self._metric_readers = metric_readers self._measurement_consumer = SynchronousMeasurementConsumer( - sdk_config=self._sdk_config + sdk_config=self._sdk_config, + metric_readers=metric_readers, ) disabled = environ.get(OTEL_SDK_DISABLED, "") self._disabled = disabled.lower().strip() == "true" @@ -520,7 +521,7 @@ def __init__( _meter_configurator or _default_meter_configurator ) - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: with self._all_metric_readers_lock: if metric_reader in self._all_metric_readers: # pylint: disable=broad-exception-raised @@ -571,7 +572,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: metric_reader_error = {} - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: current_ts = time_ns() try: if current_ts >= deadline_ns: @@ -616,7 +617,7 @@ def _shutdown(): metric_reader_error = {} - for metric_reader in self._sdk_config.metric_readers: + for metric_reader in self._metric_readers: current_ts = time_ns() try: if current_ts >= deadline_ns: @@ -686,3 +687,36 @@ def get_meter( ), ) return self._meters[instrumentation_scope] + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.export.MetricReader" + ) -> None: + with self._all_metric_readers_lock: + if metric_reader in self._all_metric_readers: + _logger.warning( + "MetricReader '%s' has been registered already!", + metric_reader, + ) + return + self._measurement_consumer.add_metric_reader(metric_reader) + # pylint: disable-next=protected-access + metric_reader._set_collect_callback( + self._measurement_consumer.collect + ) + self._all_metric_readers.add(metric_reader) + + def remove_metric_reader( + self, + metric_reader: "opentelemetry.sdk.metrics.export.MetricReader", + ) -> None: + with self._all_metric_readers_lock: + if metric_reader not in self._all_metric_readers: + _logger.warning( + "MetricReader '%s' has not been registered!", metric_reader + ) + return + self._measurement_consumer.remove_metric_reader(metric_reader) + # pylint: disable-next=protected-access + metric_reader._set_collect_callback(None) + metric_reader.shutdown() + self._all_metric_readers.remove(metric_reader) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 302f82d9924..e6cb5a3a7e8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -14,15 +14,15 @@ # pylint: disable=unused-import +import weakref from abc import ABC, abstractmethod from threading import Lock from time import time_ns -from typing import List, Mapping, Optional +from typing import Iterable, List, Mapping, Optional # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics import opentelemetry.sdk.metrics._internal.instrument -import opentelemetry.sdk.metrics._internal.sdk_configuration from opentelemetry.metrics._internal.instrument import CallbackOptions from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError from opentelemetry.sdk.metrics._internal.measurement import Measurement @@ -59,10 +59,10 @@ class SynchronousMeasurementConsumer(MeasurementConsumer): def __init__( self, sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration", + metric_readers: Iterable["opentelemetry.sdk.metrics.MetricReader"], ) -> None: self._lock = Lock() self._sdk_config = sdk_config - # should never be mutated self._reader_storages: Mapping[ opentelemetry.sdk.metrics.export.MetricReader, MetricReaderStorage ] = { @@ -71,7 +71,7 @@ def __init__( reader._instrument_class_temporality, reader._instrument_class_aggregation, ) - for reader in sdk_config.metric_readers + for reader in metric_readers } self._async_instruments: List[ opentelemetry.sdk.metrics._internal.instrument._Asynchronous @@ -86,7 +86,9 @@ def consume_measurement(self, measurement: Measurement) -> None: measurement.context, ) ) - for reader_storage in self._reader_storages.values(): + with self._lock: + reader_storages = weakref.WeakSet(self._reader_storages.values()) + for reader_storage in reader_storages: reader_storage.consume_measurement( measurement, should_sample_exemplar ) @@ -143,3 +145,23 @@ def collect( result = self._reader_storages[metric_reader].collect() return result + + def add_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Registers a new metric reader.""" + with self._lock: + self._reader_storages[metric_reader] = MetricReaderStorage( + self._sdk_config, + # pylint: disable-next=protected-access + metric_reader._instrument_class_temporality, + # pylint: disable-next=protected-access + metric_reader._instrument_class_aggregation, + ) + + def remove_metric_reader( + self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" + ) -> None: + """Unregisters the given metric reader.""" + with self._lock: + self._reader_storages.pop(metric_reader) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py index f5d176d0b02..93dd2ecd3a3 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/sdk_configuration.py @@ -26,5 +26,4 @@ class SdkConfiguration: exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter" resource: "opentelemetry.sdk.resources.Resource" - metric_readers: Sequence["opentelemetry.sdk.metrics.export.MetricReader"] views: Sequence["opentelemetry.sdk.metrics.view.View"] diff --git a/opentelemetry-sdk/tests/_configuration/test_meter_provider.py b/opentelemetry-sdk/tests/_configuration/test_meter_provider.py index 04d60847f02..e64f615a029 100644 --- a/opentelemetry-sdk/tests/_configuration/test_meter_provider.py +++ b/opentelemetry-sdk/tests/_configuration/test_meter_provider.py @@ -110,7 +110,7 @@ def test_none_config_uses_supplied_resource(self): def test_none_config_no_readers(self): provider = create_meter_provider(None) - self.assertEqual(len(provider._sdk_config.metric_readers), 0) + self.assertEqual(len(provider._metric_readers), 0) def test_none_config_uses_trace_based_exemplar_filter(self): provider = create_meter_provider(None) @@ -141,7 +141,7 @@ def test_none_config_does_not_read_interval_env_var(self): ) with patch.dict(os.environ, {"OTEL_METRIC_EXPORT_INTERVAL": "999999"}): provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, PeriodicExportingMetricReader) self.assertEqual(reader._export_interval_millis, 60000.0) @@ -165,11 +165,6 @@ def test_configure_with_config_sets_global(self): arg = mock_set.call_args[0][0] self.assertIsInstance(arg, MeterProvider) - def test_empty_readers_list(self): - config = MeterProviderConfig(readers=[]) - provider = create_meter_provider(config) - self.assertEqual(len(provider._sdk_config.metric_readers), 0) - class TestCreateMetricReaders(unittest.TestCase): @staticmethod @@ -191,7 +186,7 @@ def test_console_exporter(self): PushMetricExporterConfig(console=ConsoleMetricExporterConfig()) ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, PeriodicExportingMetricReader) self.assertIsInstance(reader._exporter, ConsoleMetricExporter) @@ -200,7 +195,7 @@ def test_periodic_reader_default_interval(self): PushMetricExporterConfig(console=ConsoleMetricExporterConfig()) ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader._export_interval_millis, 60000.0) def test_periodic_reader_default_timeout(self): @@ -208,7 +203,7 @@ def test_periodic_reader_default_timeout(self): PushMetricExporterConfig(console=ConsoleMetricExporterConfig()) ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader._export_timeout_millis, 30000.0) def test_periodic_reader_explicit_interval(self): @@ -217,7 +212,7 @@ def test_periodic_reader_explicit_interval(self): interval=5000, ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader._export_interval_millis, 5000.0) def test_periodic_reader_explicit_timeout(self): @@ -226,7 +221,7 @@ def test_periodic_reader_explicit_timeout(self): timeout=10000, ) provider = create_meter_provider(config) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader._export_timeout_millis, 10000.0) def test_otlp_http_missing_package_raises(self): @@ -326,7 +321,7 @@ def test_multiple_readers(self): ] ) provider = create_meter_provider(config) - self.assertEqual(len(provider._sdk_config.metric_readers), 2) + self.assertEqual(len(provider._metric_readers), 2) class TestTemporalityAndAggregation(unittest.TestCase): @@ -350,7 +345,7 @@ def _make_console_config(temporality=None, histogram_agg=None): @staticmethod def _get_exporter(config): provider = create_meter_provider(config) - return provider._sdk_config.metric_readers[0]._exporter + return provider._metric_readers[0]._exporter def test_default_temporality_is_cumulative(self): exporter = self._get_exporter(self._make_console_config()) diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index a9c3d434fdb..8240dc857f7 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -14,6 +14,7 @@ # pylint: disable=invalid-name,no-self-use +from threading import Event, Thread from time import sleep from unittest import TestCase from unittest.mock import MagicMock, Mock, patch @@ -34,7 +35,8 @@ class TestSynchronousMeasurementConsumer(TestCase): def test_parent(self, _): self.assertIsInstance( - SynchronousMeasurementConsumer(MagicMock()), MeasurementConsumer + SynchronousMeasurementConsumer(MagicMock(), metric_readers=()), + MeasurementConsumer, ) def test_creates_metric_reader_storages(self, MockMetricReaderStorage): @@ -44,9 +46,9 @@ def test_creates_metric_reader_storages(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5) @@ -61,9 +63,9 @@ def test_measurements_passed_to_each_reader_storage( SdkConfiguration( exemplar_filter=Mock(should_sample=Mock(return_value=False)), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) measurement_mock = Mock() consumer.consume_measurement(measurement_mock) @@ -83,9 +85,9 @@ def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=reader_mocks, views=Mock(), - ) + ), + metric_readers=reader_mocks, ) for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks): rs_mock.collect.assert_not_called() @@ -102,9 +104,9 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(should_sample=Mock(return_value=False)), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) async_instrument_mocks = [MagicMock() for _ in range(5)] for i_mock in async_instrument_mocks: @@ -133,9 +135,9 @@ def test_collect_timeout(self, MockMetricReaderStorage): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) def sleep_1(*args, **kwargs): @@ -167,9 +169,9 @@ def test_collect_deadline( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=[reader_mock], views=Mock(), - ) + ), + metric_readers=[reader_mock], ) consumer.register_asynchronous_instrument( @@ -199,3 +201,91 @@ def test_collect_deadline( callback_options_time_call, 10000, ) + + +class TestSynchronousMeasurementConsumerConcurrency(TestCase): + def test_concurrent_changes_to_metric_readers(self): + timeout = 1 + failure = None + iteration_started = Event() + mutation_done = Event() + iteration_timeout_error = "Timed out waiting for iteration to start" + mutation_timeout_error = "Timed out waiting for mutation to be done" + + consumer = SynchronousMeasurementConsumer( + SdkConfiguration( + exemplar_filter=MagicMock(), + resource=MagicMock(), + views=MagicMock(), + ), + metric_readers=[MagicMock()], + ) + + def _hooked_iter(iterable): + nonlocal failure + + iterable = iter(iterable) + iteration_started.set() + if not mutation_done.wait(timeout): + failure = mutation_timeout_error + yield next(iterable, None) + yield from iterable + + class HookedDict(dict): + def __iter__(self): + return _hooked_iter(super().__iter__()) + + def keys(self): + return _hooked_iter(super().keys()) + + def values(self): + return _hooked_iter(super().values()) + + def items(self): + return _hooked_iter(super().items()) + + with patch.object( + consumer, + "_reader_storages", + # pylint: disable-next=protected-access + HookedDict(consumer._reader_storages), + ): + + def mutate(): + """Directly mutate _reader_storages after iteration starts""" + nonlocal failure + if not iteration_started.wait(timeout): + failure = iteration_timeout_error + # pylint: disable-next=protected-access + consumer._reader_storages.clear() + + # Verify that test setup works (direct mutation with no synchronization fails) + with self.assertRaises(RuntimeError) as cm: + t = Thread(target=mutate) + t.start() + try: + consumer.consume_measurement(MagicMock()) + finally: + t.join() + self.assertEqual( + "dictionary changed size during iteration", str(cm.exception) + ) + + def add_and_remove_readers(): + """Modifies _reader_storages after iteration starts""" + nonlocal failure + if not iteration_started.wait(timeout): + failure = iteration_timeout_error + reader = MagicMock() + consumer.add_metric_reader(reader) + consumer.remove_metric_reader(reader) + + # Verify the API calls do not attempt concurrent modification of reader storages + t = Thread(target=add_and_remove_readers) + t.start() + try: + consumer.add_metric_reader(MagicMock()) + consumer.consume_measurement(MagicMock()) + finally: + t.join() + self.assertEqual(mutation_timeout_error, failure) diff --git a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py index ec1456ae84c..5f23d6e7472 100644 --- a/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py +++ b/opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py @@ -79,7 +79,6 @@ def test_creates_view_instrument_matches( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1, view2), ), MagicMock( @@ -146,7 +145,6 @@ def test_forwards_calls_to_view_instrument_match( SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1, view2), ), MagicMock( @@ -256,7 +254,6 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(view1,), ), MagicMock( @@ -283,7 +280,6 @@ def test_race_collect_with_new_instruments(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(View(instrument_name="test"),), ), MagicMock( @@ -329,7 +325,6 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(), ), MagicMock( @@ -365,7 +360,6 @@ def test_drop_aggregation(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View( instrument_name="name", aggregation=DropAggregation() @@ -393,7 +387,6 @@ def test_same_collection_start(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=(View(instrument_name="name"),), ), MagicMock( @@ -440,7 +433,6 @@ def test_conflicting_view_configuration(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View( instrument_name="observable_counter", @@ -489,7 +481,6 @@ def test_view_instrument_match_conflict_0(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -547,7 +538,6 @@ def test_view_instrument_match_conflict_1(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -616,7 +606,6 @@ def test_view_instrument_match_conflict_2(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="foo"), View(instrument_name="bar"), @@ -669,7 +658,6 @@ def test_view_instrument_match_conflict_3(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -720,7 +708,6 @@ def test_view_instrument_match_conflict_4(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="bar", name="foo"), View(instrument_name="baz", name="foo"), @@ -767,7 +754,6 @@ def test_view_instrument_match_conflict_5(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -822,7 +808,6 @@ def test_view_instrument_match_conflict_6(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter", name="foo"), View(instrument_name="histogram", name="foo"), @@ -877,7 +862,6 @@ def test_view_instrument_match_conflict_7(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="observable_counter_0", name="foo"), View(instrument_name="observable_counter_1", name="foo"), @@ -932,7 +916,6 @@ def test_view_instrument_match_conflict_8(self): SdkConfiguration( exemplar_filter=Mock(), resource=Mock(), - metric_readers=(), views=( View(instrument_name="up_down_counter", name="foo"), View( diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index d2f475faa00..7eb58343c1b 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -13,10 +13,9 @@ # limitations under the License. # pylint: disable=protected-access,no-self-use,too-many-lines - import weakref from collections.abc import Callable -from logging import WARNING +from logging import DEBUG, WARNING from threading import Lock from time import sleep from typing import Any, Iterable, Sequence @@ -45,6 +44,7 @@ _RuleBasedMeterConfigurator, ) from opentelemetry.sdk.metrics.export import ( + InMemoryMetricReader, Metric, MetricExporter, MetricExportResult, @@ -499,6 +499,36 @@ def test_consume_measurement_gauge(self, mock_sync_measurement_consumer): sync_consumer_instance.consume_measurement.assert_called() + def test_addition_of_metric_reader(self): + internal_logger = "opentelemetry.sdk.metrics._internal" + export_logger = "opentelemetry.sdk.metrics._internal.export" + + reader = InMemoryMetricReader() + meter_provider = MeterProvider() + meter = meter_provider.get_meter(__name__) + counter = meter.create_counter("counter") + counter.add(1) + # Suppress warnings for calling collect on an unregistered metric reader + with self.assertLogs(export_logger, DEBUG): + self.assertIsNone(reader.get_metrics_data()) + + meter_provider.add_metric_reader(reader) + counter.add(1) + self.assertIsNotNone(reader.get_metrics_data()) + + with self.assertLogs(internal_logger, DEBUG) as cm: + meter_provider.add_metric_reader(reader) + self.assertIn("has been registered already!", cm.output[0]) + + meter_provider.remove_metric_reader(reader) + counter.add(1) + with self.assertLogs(export_logger, DEBUG): + self.assertIsNone(reader.get_metrics_data()) + + with self.assertLogs(internal_logger, DEBUG) as cm: + meter_provider.remove_metric_reader(reader) + self.assertIn("has not been registered!", cm.output[0]) + class TestMeterConcurrency(ConcurrencyTestBase, TestCase): def test_create_instrument_concurrency(self): diff --git a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py index 38d36758f39..63a5edfed83 100644 --- a/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py +++ b/opentelemetry-sdk/tests/metrics/test_view_instrument_match.py @@ -82,7 +82,6 @@ def setUpClass(cls): cls.sdk_configuration = SdkConfiguration( exemplar_filter=Mock(), resource=cls.mock_resource, - metric_readers=[], views=[], ) diff --git a/opentelemetry-sdk/tests/test_configurator.py b/opentelemetry-sdk/tests/test_configurator.py index 17a2b85bae8..c1b84cad3ec 100644 --- a/opentelemetry-sdk/tests/test_configurator.py +++ b/opentelemetry-sdk/tests/test_configurator.py @@ -1263,7 +1263,7 @@ def test_metrics_init_exporter(self): provider._sdk_config.resource.attributes.get("service.name"), "otlp-service", ) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, DummyMetricReader) self.assertIsInstance(reader.exporter, DummyOTLPMetricExporter) @@ -1276,7 +1276,7 @@ def test_metrics_init_pull_exporter(self): self.assertEqual(self.set_provider_mock.call_count, 1) provider = self.set_provider_mock.call_args[0][0] self.assertIsInstance(provider, DummyMeterProvider) - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertIsInstance(reader, DummyMetricReaderPullExporter) def test_metrics_init_exporter_uses_exporter_args_map(self): @@ -1290,7 +1290,7 @@ def test_metrics_init_exporter_uses_exporter_args_map(self): }, ) provider = self.set_provider_mock.call_args[0][0] - reader = provider._sdk_config.metric_readers[0] + reader = provider._metric_readers[0] self.assertEqual(reader.exporter.compression, "gzip") def test_metrics_init_meter_configurator_none_by_default(self): From 482f7886e779af26dd7cae1fa6ba80cf771f5589 Mon Sep 17 00:00:00 2001 From: Mani Yazdankhah Date: Thu, 21 May 2026 09:44:27 +0100 Subject: [PATCH 2/3] Updated typehints for `_collect` callback. Added Changelog entry for the public API --- .changelog/4863.added | 1 + .../opentelemetry/sdk/metrics/_internal/export/__init__.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 .changelog/4863.added diff --git a/.changelog/4863.added b/.changelog/4863.added new file mode 100644 index 00000000000..fd65e7b875e --- /dev/null +++ b/.changelog/4863.added @@ -0,0 +1 @@ +`opentelemetry-sdk`: add `add_metric_reader` / `remove_metric_reader` public APIs to register / unregister metric readers at runtime. \ No newline at end of file diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 676a356c2bc..31ae3bdfe87 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -229,7 +229,7 @@ def __init__( AggregationTemporality, ], Iterable[opentelemetry.sdk.metrics.export.Metric], - ] = None + ] | None = None self._instrument_class_temporality = { _Counter: AggregationTemporality.CUMULATIVE, @@ -373,7 +373,7 @@ def _set_collect_callback( AggregationTemporality, ], MetricsData, - ], + ] | None, ) -> None: """This function is internal to the SDK. It should not be called or overridden by users""" self._collect = func From 17e99d4b5c2e3eb8ac8dbc3483df2b224151ca8e Mon Sep 17 00:00:00 2001 From: Mani Yazdankhah Date: Fri, 22 May 2026 05:43:04 +0100 Subject: [PATCH 3/3] Updated tests and fixed lint --- .../sdk/metrics/_internal/export/__init__.py | 20 +++++++++++-------- .../metrics/_internal/measurement_consumer.py | 3 +-- .../_configuration/test_meter_provider.py | 8 ++++---- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 31ae3bdfe87..aee356f78d2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -223,13 +223,16 @@ def __init__( *, otel_component_type: OtelComponentTypeValues | None = None, ) -> None: - self._collect: Callable[ - [ - opentelemetry.sdk.metrics.export.MetricReader, - AggregationTemporality, - ], - Iterable[opentelemetry.sdk.metrics.export.Metric], - ] | None = None + self._collect: ( + Callable[ + [ + opentelemetry.sdk.metrics.export.MetricReader, + AggregationTemporality, + ], + Iterable[opentelemetry.sdk.metrics.export.Metric], + ] + | None + ) = None self._instrument_class_temporality = { _Counter: AggregationTemporality.CUMULATIVE, @@ -373,7 +376,8 @@ def _set_collect_callback( AggregationTemporality, ], MetricsData, - ] | None, + ] + | None, ) -> None: """This function is internal to the SDK. It should not be called or overridden by users""" self._collect = func diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 2ada4791242..8af2f064314 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -5,10 +5,9 @@ import weakref from abc import ABC, abstractmethod -from collections.abc import Mapping +from collections.abc import Iterable, Mapping from threading import Lock from time import time_ns -from typing import Iterable # This kind of import is needed to avoid Sphinx errors. import opentelemetry.sdk.metrics diff --git a/opentelemetry-sdk/tests/_configuration/test_meter_provider.py b/opentelemetry-sdk/tests/_configuration/test_meter_provider.py index 84af63b0491..566329cf146 100644 --- a/opentelemetry-sdk/tests/_configuration/test_meter_provider.py +++ b/opentelemetry-sdk/tests/_configuration/test_meter_provider.py @@ -342,7 +342,7 @@ def test_pull_prometheus_creates_reader(self): mock_reader_cls.assert_called_once_with(disable_target_info=True) mock_start_server.assert_called_once_with(port=9090, addr="0.0.0.0") - self.assertEqual(len(provider._sdk_config.metric_readers), 1) + self.assertEqual(len(provider._metric_readers), 1) def test_pull_prometheus_defaults(self): mock_reader_cls = MagicMock() @@ -370,7 +370,7 @@ def test_pull_prometheus_defaults(self): mock_reader_cls.assert_called_once_with(disable_target_info=False) mock_start_server.assert_called_once_with(port=9464, addr="localhost") - self.assertEqual(len(provider._sdk_config.metric_readers), 1) + self.assertEqual(len(provider._metric_readers), 1) def test_pull_prometheus_missing_package_raises(self): with patch.dict( @@ -427,7 +427,7 @@ def test_pull_plugin_loads_via_entry_point(self): ] ) provider = create_meter_provider(config) - self.assertEqual(len(provider._sdk_config.metric_readers), 1) + self.assertEqual(len(provider._metric_readers), 1) mock_class.assert_called_once_with(port=8080) mock_entry_points.assert_called_once_with( group="opentelemetry_pull_metric_exporter", @@ -542,7 +542,7 @@ def test_plugin_metric_exporter_loaded_via_entry_point(self): PushMetricExporterConfig(my_custom_exporter={}) ) provider = create_meter_provider(config) - self.assertEqual(len(provider._sdk_config.metric_readers), 1) + self.assertEqual(len(provider._metric_readers), 1) def test_unknown_metric_exporter_raises_configuration_error(self): with patch(