From c51244fcfabb4db2ca4a5c82eeead15334ac5c2d Mon Sep 17 00:00:00 2001 From: Mateusz Soltysik Date: Tue, 15 Oct 2024 16:26:03 +0000 Subject: [PATCH 1/8] Fix memory leak --- .../sdk/metrics/_internal/__init__.py | 4 +- .../sdk/metrics/_internal/export/__init__.py | 5 +- .../test_provider_shutdown.py | 76 +++++++++++++++++++ 3 files changed, 82 insertions(+), 3 deletions(-) create mode 100644 opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index f9ed0280325..ccd1c1faa60 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import weakref from atexit import register, unregister from logging import getLogger from os import environ @@ -393,7 +393,7 @@ class MeterProvider(APIMeterProvider): """ _all_metric_readers_lock = Lock() - _all_metric_readers = set() + _all_metric_readers = weakref.WeakSet() def __init__( self, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 4acf1cc81dd..f6da08ff07c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -14,6 +14,7 @@ import math import os +import weakref from abc import ABC, abstractmethod from enum import Enum from logging import getLogger @@ -490,8 +491,10 @@ def __init__( ) self._daemon_thread.start() if hasattr(os, "register_at_fork"): + weak_at_fork = weakref.WeakMethod(self._at_fork_reinit) + os.register_at_fork( - after_in_child=self._at_fork_reinit + after_in_child=lambda: weak_at_fork()() ) # pylint: disable=protected-access elif self._export_interval_millis <= 0: raise ValueError( diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py new file mode 100644 index 00000000000..02f1ac29241 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py @@ -0,0 +1,76 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gc +import time +import weakref +from typing import Sequence +from unittest import TestCase + +from opentelemetry.proto.metrics.v1.metrics_pb2 import Metric +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics._internal.export import MetricExporter, MetricExportResult, PeriodicExportingMetricReader + + +class FakeMetricsExporter(MetricExporter): + def __init__(self, wait=0, preferred_temporality=None, preferred_aggregation=None): + self.wait = wait + self.metrics = [] + self._shutdown = False + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) + + def export( + self, + metrics_data: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: + time.sleep(self.wait) + self.metrics.extend(metrics_data) + return True + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + self._shutdown = True + + def force_flush(self, timeout_millis: float = 10_000) -> bool: + return True + + +class TestMeterProviderShutdown(TestCase): + def test_meter_provider_shutdown_triggers_garbage_collection(self): + def create_and_shutdown(): + exporter = FakeMetricsExporter() + exporter_wr = weakref.ref(exporter) + + reader = PeriodicExportingMetricReader(exporter) + reader_wr = weakref.ref(reader) + + provider = MeterProvider(metric_readers=[reader]) + provider_wr = weakref.ref(provider) + + provider.shutdown() + + return exporter_wr, reader_wr, provider_wr + + # When: the provider is shutdown + exporter_weakref, reader_weakref, provider_weakref = create_and_shutdown() + gc.collect() + + # Then: the provider, exporter and reader should be garbage collected + self.assertIsNone(exporter_weakref()) + self.assertIsNone(reader_weakref()) + self.assertIsNone(provider_weakref()) From 83d06b08f435d832032fdf487d2b35bb93082220 Mon Sep 17 00:00:00 2001 From: Mateusz Soltysik Date: Tue, 15 Oct 2024 16:30:28 +0000 Subject: [PATCH 2/8] fix format --- .../sdk/metrics/_internal/export/__init__.py | 2 +- .../test_provider_shutdown.py | 24 ++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index f6da08ff07c..c0c101d296b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -494,7 +494,7 @@ def __init__( weak_at_fork = weakref.WeakMethod(self._at_fork_reinit) os.register_at_fork( - after_in_child=lambda: weak_at_fork()() + after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda ) # pylint: disable=protected-access elif self._export_interval_millis <= 0: raise ValueError( diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py index 02f1ac29241..54c8b3ced26 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py @@ -18,13 +18,19 @@ from typing import Sequence from unittest import TestCase -from opentelemetry.proto.metrics.v1.metrics_pb2 import Metric from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics._internal.export import MetricExporter, MetricExportResult, PeriodicExportingMetricReader +from opentelemetry.sdk.metrics.export import ( + Metric, + MetricExporter, + MetricExportResult, + PeriodicExportingMetricReader, +) class FakeMetricsExporter(MetricExporter): - def __init__(self, wait=0, preferred_temporality=None, preferred_aggregation=None): + def __init__( + self, wait=0, preferred_temporality=None, preferred_aggregation=None + ): self.wait = wait self.metrics = [] self._shutdown = False @@ -34,10 +40,10 @@ def __init__(self, wait=0, preferred_temporality=None, preferred_aggregation=Non ) def export( - self, - metrics_data: Sequence[Metric], - timeout_millis: float = 10_000, - **kwargs, + self, + metrics_data: Sequence[Metric], + timeout_millis: float = 10_000, + **kwargs, ) -> MetricExportResult: time.sleep(self.wait) self.metrics.extend(metrics_data) @@ -67,7 +73,9 @@ def create_and_shutdown(): return exporter_wr, reader_wr, provider_wr # When: the provider is shutdown - exporter_weakref, reader_weakref, provider_weakref = create_and_shutdown() + exporter_weakref, reader_weakref, provider_weakref = ( + create_and_shutdown() + ) gc.collect() # Then: the provider, exporter and reader should be garbage collected From 328085e3d113a70e9ea93e5586c5ce4c16e06400 Mon Sep 17 00:00:00 2001 From: Mateusz Soltysik Date: Tue, 15 Oct 2024 16:59:48 +0000 Subject: [PATCH 3/8] Update: CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6699a625537..4a44e6c9bc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4176](https://github.com/open-telemetry/opentelemetry-python/pull/4176)) - Update semantic conventions to version 1.28.0 ([#4218](https://github.com/open-telemetry/opentelemetry-python/pull/4218)) +- Fix memory leak in exporter and reader + ([#4224](https://github.com/open-telemetry/opentelemetry-python/pull/4224)) ## Version 1.27.0/0.48b0 (2024-08-28) From 5bde8d7db07d5944894ad6d36d42fbbcb38a2923 Mon Sep 17 00:00:00 2001 From: Mateusz Soltysik Date: Tue, 15 Oct 2024 17:06:12 +0000 Subject: [PATCH 4/8] Adjust test method name --- .../tests/metrics/integration_test/test_provider_shutdown.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py index 54c8b3ced26..12267a2410a 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py @@ -57,7 +57,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool: class TestMeterProviderShutdown(TestCase): - def test_meter_provider_shutdown_triggers_garbage_collection(self): + def test_meter_provider_shutdown_cleans_up_successfully(self): def create_and_shutdown(): exporter = FakeMetricsExporter() exporter_wr = weakref.ref(exporter) From 4b68c87aca3d71544cfd8472353db8de66807787 Mon Sep 17 00:00:00 2001 From: Mateusz Soltysik Date: Tue, 15 Oct 2024 18:19:43 +0000 Subject: [PATCH 5/8] fix tests --- opentelemetry-sdk/tests/metrics/test_metrics.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index f899b30808a..3f4f1325eb0 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import weakref # pylint: disable=protected-access,no-self-use @@ -67,7 +68,7 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: class TestMeterProvider(ConcurrencyTestBase, TestCase): def tearDown(self): - MeterProvider._all_metric_readers = set() + MeterProvider._all_metric_readers = weakref.WeakSet() @patch.object(Resource, "create") def test_init_default(self, resource_patch): From 0f940f78e1e51b7ab8d55be7b3dfe4cfcbea8f86 Mon Sep 17 00:00:00 2001 From: Mateusz Soltysik Date: Tue, 15 Oct 2024 19:56:20 +0000 Subject: [PATCH 6/8] sort imports --- opentelemetry-sdk/tests/metrics/test_metrics.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 3f4f1325eb0..4701ada9c40 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -12,10 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import weakref - -# pylint: disable=protected-access,no-self-use - - from logging import WARNING from time import sleep from typing import Iterable, Sequence @@ -48,6 +44,11 @@ from opentelemetry.test import TestCase from opentelemetry.test.concurrency_test import ConcurrencyTestBase, MockFunc +# pylint: disable=protected-access,no-self-use + + + + class DummyMetricReader(MetricReader): def __init__(self): From d84bf565969312cb860b6d08040044d9c54b41d2 Mon Sep 17 00:00:00 2001 From: Mateusz Soltysik Date: Tue, 15 Oct 2024 20:45:24 +0000 Subject: [PATCH 7/8] ruff format --- .../sdk/metrics/_internal/export/__init__.py | 48 +++++++++---------- .../test_provider_shutdown.py | 8 ++-- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 89bb8f3eb73..d1ff8be6be6 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -247,27 +247,27 @@ def __init__( if typ is Counter: self._instrument_class_temporality[_Counter] = temporality elif typ is UpDownCounter: - self._instrument_class_temporality[_UpDownCounter] = ( - temporality - ) + self._instrument_class_temporality[ + _UpDownCounter + ] = temporality elif typ is Histogram: - self._instrument_class_temporality[_Histogram] = ( - temporality - ) + self._instrument_class_temporality[ + _Histogram + ] = temporality elif typ is Gauge: self._instrument_class_temporality[_Gauge] = temporality elif typ is ObservableCounter: - self._instrument_class_temporality[_ObservableCounter] = ( - temporality - ) + self._instrument_class_temporality[ + _ObservableCounter + ] = temporality elif typ is ObservableUpDownCounter: self._instrument_class_temporality[ _ObservableUpDownCounter ] = temporality elif typ is ObservableGauge: - self._instrument_class_temporality[_ObservableGauge] = ( - temporality - ) + self._instrument_class_temporality[ + _ObservableGauge + ] = temporality else: raise Exception(f"Invalid instrument class found {typ}") @@ -287,27 +287,27 @@ def __init__( if typ is Counter: self._instrument_class_aggregation[_Counter] = aggregation elif typ is UpDownCounter: - self._instrument_class_aggregation[_UpDownCounter] = ( - aggregation - ) + self._instrument_class_aggregation[ + _UpDownCounter + ] = aggregation elif typ is Histogram: - self._instrument_class_aggregation[_Histogram] = ( - aggregation - ) + self._instrument_class_aggregation[ + _Histogram + ] = aggregation elif typ is Gauge: self._instrument_class_aggregation[_Gauge] = aggregation elif typ is ObservableCounter: - self._instrument_class_aggregation[_ObservableCounter] = ( - aggregation - ) + self._instrument_class_aggregation[ + _ObservableCounter + ] = aggregation elif typ is ObservableUpDownCounter: self._instrument_class_aggregation[ _ObservableUpDownCounter ] = aggregation elif typ is ObservableGauge: - self._instrument_class_aggregation[_ObservableGauge] = ( - aggregation - ) + self._instrument_class_aggregation[ + _ObservableGauge + ] = aggregation else: raise Exception(f"Invalid instrument class found {typ}") diff --git a/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py index 12267a2410a..1f4a16d7f69 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_provider_shutdown.py @@ -73,9 +73,11 @@ def create_and_shutdown(): return exporter_wr, reader_wr, provider_wr # When: the provider is shutdown - exporter_weakref, reader_weakref, provider_weakref = ( - create_and_shutdown() - ) + ( + exporter_weakref, + reader_weakref, + provider_weakref, + ) = create_and_shutdown() gc.collect() # Then: the provider, exporter and reader should be garbage collected From 7b20bf86b9fa42fa1ab33cda5d83eac943c8a236 Mon Sep 17 00:00:00 2001 From: Mateusz Soltysik Date: Wed, 16 Oct 2024 16:53:57 +0000 Subject: [PATCH 8/8] apply tox -e ruff --- .../sdk/metrics/_internal/export/__init__.py | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index d1ff8be6be6..89bb8f3eb73 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -247,27 +247,27 @@ def __init__( if typ is Counter: self._instrument_class_temporality[_Counter] = temporality elif typ is UpDownCounter: - self._instrument_class_temporality[ - _UpDownCounter - ] = temporality + self._instrument_class_temporality[_UpDownCounter] = ( + temporality + ) elif typ is Histogram: - self._instrument_class_temporality[ - _Histogram - ] = temporality + self._instrument_class_temporality[_Histogram] = ( + temporality + ) elif typ is Gauge: self._instrument_class_temporality[_Gauge] = temporality elif typ is ObservableCounter: - self._instrument_class_temporality[ - _ObservableCounter - ] = temporality + self._instrument_class_temporality[_ObservableCounter] = ( + temporality + ) elif typ is ObservableUpDownCounter: self._instrument_class_temporality[ _ObservableUpDownCounter ] = temporality elif typ is ObservableGauge: - self._instrument_class_temporality[ - _ObservableGauge - ] = temporality + self._instrument_class_temporality[_ObservableGauge] = ( + temporality + ) else: raise Exception(f"Invalid instrument class found {typ}") @@ -287,27 +287,27 @@ def __init__( if typ is Counter: self._instrument_class_aggregation[_Counter] = aggregation elif typ is UpDownCounter: - self._instrument_class_aggregation[ - _UpDownCounter - ] = aggregation + self._instrument_class_aggregation[_UpDownCounter] = ( + aggregation + ) elif typ is Histogram: - self._instrument_class_aggregation[ - _Histogram - ] = aggregation + self._instrument_class_aggregation[_Histogram] = ( + aggregation + ) elif typ is Gauge: self._instrument_class_aggregation[_Gauge] = aggregation elif typ is ObservableCounter: - self._instrument_class_aggregation[ - _ObservableCounter - ] = aggregation + self._instrument_class_aggregation[_ObservableCounter] = ( + aggregation + ) elif typ is ObservableUpDownCounter: self._instrument_class_aggregation[ _ObservableUpDownCounter ] = aggregation elif typ is ObservableGauge: - self._instrument_class_aggregation[ - _ObservableGauge - ] = aggregation + self._instrument_class_aggregation[_ObservableGauge] = ( + aggregation + ) else: raise Exception(f"Invalid instrument class found {typ}")