From d97ba7ae2540476d4d28b25a98feda983e3ade14 Mon Sep 17 00:00:00 2001
From: Towseef Altaf
Date: Sun, 30 Nov 2025 16:41:18 +0530
Subject: [PATCH 1/3] Add retry/backoff support to Prometheus Remote Write exporter

---
 .../README.rst                                     |  10 ++
 .../prometheus_remote_write/__init__.py            | 170 ++++++++++++++++--
 .../test_prometheus_remote_write_exporter.py       |  37 ++++
 3 files changed, 199 insertions(+), 18 deletions(-)

diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst b/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst
index 6ed5c5ebcf..cd4916226f 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst
@@ -9,6 +9,11 @@ OpenTelemetry Prometheus Remote Write Exporter
 This package contains an exporter to send metrics from the OpenTelemetry
 Python SDK directly to a Prometheus Remote Write integrated backend (such as
 Cortex or Thanos) without having to run an instance of the Prometheus server.
 
+Key features
+------------
+
+* Optional bounded retries with exponential backoff and jitter for retryable HTTP/network failures.
+
 Installation
 ------------
@@ -21,6 +26,11 @@ Installation
 
 .. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
 .. _Prometheus Remote Write integrated backend: https://prometheus.io/docs/operating/integrations/
 
+Configuration highlights
+------------------------
+
+* ``max_retries`` (default ``3``), ``retry_backoff_factor`` (default ``0.5``), ``retry_backoff_max`` (default ``5.0``), and ``retry_jitter_ratio`` (default ``0.1``) tune the retry policy for retryable statuses (429/408/5xx) and connection/timeout errors.
+
 References
 ----------
diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
index cfb1d9ea75..578541b448 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
@@ -13,10 +13,12 @@
 # limitations under the License.
 
 import logging
+import random
 import re
+import time
 from collections import defaultdict
 from itertools import chain
-from typing import Dict, Mapping, Sequence
+from typing import Dict, Mapping, Optional, Sequence
 
 import requests
 import snappy
@@ -79,6 +81,11 @@ def __init__(
         resources_as_labels: bool = True,
         preferred_temporality: Dict[type, AggregationTemporality] = None,
         preferred_aggregation: Dict = None,
+        max_retries: int = 3,
+        retry_backoff_factor: float = 0.5,
+        retry_backoff_max: float = 5.0,
+        retry_jitter_ratio: float = 0.1,
+        retry_status_codes: Optional[Sequence[int]] = None,
     ):
         self.endpoint = endpoint
         self.basic_auth = basic_auth
@@ -87,6 +94,11 @@ def __init__(
         self.tls_config = tls_config
         self.proxies = proxies
         self.resources_as_labels = resources_as_labels
+        self.max_retries = max_retries
+        self.retry_backoff_factor = retry_backoff_factor
+        self.retry_backoff_max = retry_backoff_max
+        self.retry_jitter_ratio = retry_jitter_ratio
+        self.retry_status_codes = retry_status_codes
 
         if not preferred_temporality:
             preferred_temporality = {
@@ -181,6 +193,56 @@ def headers(self):
     def headers(self, headers: Dict):
         self._headers = headers
 
+    @property
+    def max_retries(self) -> int:
+        return self._max_retries
+
+    @max_retries.setter
+    def max_retries(self, max_retries: int):
+        if max_retries < 0:
+            raise ValueError("max_retries must be greater than or equal to 0")
+        self._max_retries = max_retries
+
+    @property
+    def retry_backoff_factor(self) -> float:
+        return self._retry_backoff_factor
+
+    @retry_backoff_factor.setter
+    def retry_backoff_factor(self, retry_backoff_factor: float):
+        if retry_backoff_factor <= 0:
+            raise ValueError("retry_backoff_factor must be greater than 0")
+        self._retry_backoff_factor = retry_backoff_factor
+
+    @property
+    def retry_backoff_max(self) -> float:
+        return self._retry_backoff_max
+
+    @retry_backoff_max.setter
+    def retry_backoff_max(self, retry_backoff_max: float):
+        if retry_backoff_max <= 0:
+            raise ValueError("retry_backoff_max must be greater than 0")
+        self._retry_backoff_max = retry_backoff_max
+
+    @property
+    def retry_jitter_ratio(self) -> float:
+        return self._retry_jitter_ratio
+
+    @retry_jitter_ratio.setter
+    def retry_jitter_ratio(self, retry_jitter_ratio: float):
+        if retry_jitter_ratio < 0:
+            raise ValueError("retry_jitter_ratio must be greater than or equal to 0")
+        self._retry_jitter_ratio = retry_jitter_ratio
+
+    @property
+    def retry_status_codes(self) -> Sequence[int]:
+        return self._retry_status_codes
+
+    @retry_status_codes.setter
+    def retry_status_codes(self, retry_status_codes: Optional[Sequence[int]]):
+        if retry_status_codes is None:
+            retry_status_codes = chain([429, 408], range(500, 600))
+        self._retry_status_codes = tuple(retry_status_codes)
+
     def export(
         self,
         metrics_data: MetricsData,
@@ -253,7 +315,9 @@ def _parse_metric(
         return self._convert_to_timeseries(sample_sets, resource_labels)
 
     def _convert_to_timeseries(
-        self, sample_sets: Mapping[tuple, Sequence], resource_labels: Sequence
+        self,
+        sample_sets: Mapping[tuple, Sequence],
+        resource_labels: Sequence,
     ) -> Sequence[TimeSeries]:
         timeseries = []
         for labels, samples in sample_sets.items():
@@ -339,6 +403,7 @@ def handle_bucket(value, bound=None, name_override=None):
         sample_attr_pairs.append(
             handle_bucket(data_point.count, name_override=f"{name}_count")
         )
+
         return sample_attr_pairs
 
     def _parse_data_point(self, data_point, name=None):
@@ -366,6 +431,35 @@ def _build_headers(self) -> Dict:
             headers[header_name] = header_value
         return headers
 
+    def _should_retry_status(self, status_code: Optional[int]) -> bool:
+        if status_code is None:
+            return False
+        return status_code in self.retry_status_codes
+
+    @staticmethod
+    def _is_retryable_error(err: requests.exceptions.RequestException) -> bool:
+        return isinstance(
+            err,
+            (
+                requests.exceptions.ConnectionError,
+                requests.exceptions.Timeout,
+            ),
+        )
+
+    def _calculate_backoff(self, attempt: int) -> float:
+        delay = min(
+            self.retry_backoff_max,
+            self.retry_backoff_factor * (2 ** (attempt - 1)),
+        )
+        if self.retry_jitter_ratio:
+            jitter = delay * self.retry_jitter_ratio
+            delay += random.uniform(-jitter, jitter)
+        return max(delay, 0.0)
+
+    @staticmethod
+    def _sleep(duration: float) -> None:
+        time.sleep(duration)
+
     def _send_message(
         self, message: bytes, headers: Dict
     ) -> MetricExportResult:
@@ -389,23 +483,63 @@ def _send_message(
                 self.tls_config["cert_file"],
                 self.tls_config["key_file"],
             )
-        try:
-            response = requests.post(
-                self.endpoint,
-                data=message,
-                headers=headers,
-                auth=auth,
-                timeout=self.timeout,
-                proxies=self.proxies,
-                cert=cert,
-                verify=verify,
-            )
-            if not response.ok:
-                response.raise_for_status()
-        except requests.exceptions.RequestException as err:
-            logger.error("Export POST request failed with reason: %s", err)
-            return MetricExportResult.FAILURE
-        return MetricExportResult.SUCCESS
+        total_attempts = self.max_retries + 1
+
+        for attempt in range(1, total_attempts + 1):
+            try:
+                response = requests.post(
+                    self.endpoint,
+                    data=message,
+                    headers=headers,
+                    auth=auth,
+                    timeout=self.timeout,
+                    proxies=self.proxies,
+                    cert=cert,
+                    verify=verify,
+                )
+                if response.ok:
+                    return MetricExportResult.SUCCESS
+
+                if attempt < total_attempts and self._should_retry_status(
+                    response.status_code
+                ):
+                    delay = self._calculate_backoff(attempt)
+                    logger.warning(
+                        "Remote Write request failed with status %s, retrying in %.2fs (attempt %s/%s)",
+                        response.status_code,
+                        delay,
+                        attempt,
+                        total_attempts,
+                    )
+                    self._sleep(delay)
+                    continue
+
+                response.raise_for_status()
+            except requests.exceptions.RequestException as err:
+                status_code = getattr(err.response, "status_code", None)
+                retryable = self._should_retry_status(status_code) or self._is_retryable_error(err)
+                if attempt < total_attempts and retryable:
+                    delay = self._calculate_backoff(attempt)
+                    logger.warning(
+                        "Remote Write request failed with reason: %s; retrying in %.2fs (attempt %s/%s)",
+                        err,
+                        delay,
+                        attempt,
+                        total_attempts,
+                    )
+                    self._sleep(delay)
+                    continue
+
+                logger.error(
+                    "Export POST request failed with reason: %s", err
+                )
+                return MetricExportResult.FAILURE
+
+        logger.error(
+            "Export POST request failed after %s attempts",
+            total_attempts,
+        )
+        return MetricExportResult.FAILURE
 
     def force_flush(self, timeout_millis: float = 10_000) -> bool:
         return True
diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
index 814de75be4..cd396c6081 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
@@ -15,6 +15,8 @@
 import unittest
 from unittest.mock import patch
 
+import requests
+
 import pytest
 
 # pylint: disable=no-name-in-module
@@ -341,3 +343,38 @@ def test_build_headers(prom_rw):
     assert headers["Content-Type"] == "application/x-protobuf"
     assert headers["X-Prometheus-Remote-Write-Version"] == "0.1.0"
     assert headers["Custom Header"] == "test_header"
+
+
+@patch("opentelemetry.exporter.prometheus_remote_write.time.sleep")
+@patch("requests.post")
+def test_send_message_retries_then_succeeds(mock_post, mock_sleep, prom_rw):
+    prom_rw.max_retries = 2
+    prom_rw.retry_jitter_ratio = 0
+
+    first_response = unittest.mock.Mock()
+    first_response.ok = False
+    first_response.status_code = 500
+    second_response = unittest.mock.Mock()
+    second_response.ok = True
+    mock_post.side_effect = [first_response, second_response]
+
+    result = prom_rw._send_message(bytes(), {})
+    assert result == MetricExportResult.SUCCESS
+    assert mock_post.call_count == 2
+    mock_sleep.assert_called_once()
+
+
+@patch("requests.post")
+def test_send_message_non_retryable_status(mock_post, prom_rw):
+    prom_rw.max_retries = 2
+    response = unittest.mock.Mock()
+    response.ok = False
+    response.status_code = 400
+    response.raise_for_status.side_effect = requests.exceptions.HTTPError(
+        response=response
+    )
+    mock_post.return_value = response
+
+    result = prom_rw._send_message(bytes(), {})
+    assert result == MetricExportResult.FAILURE
+    assert mock_post.call_count == 1

From fcebc4aea05d7a69ce0d5b51eac6d6edb8753c3e Mon Sep 17 00:00:00 2001
From: Towseef Altaf
Date: Tue, 2 Dec 2025 00:44:12 +0530
Subject: [PATCH 2/3] Use urllib3 Retry for Prometheus Remote Write exporter

---
 .../README.rst                                     |  18 ++-
 .../prometheus_remote_write/__init__.py            | 151 +++++++++---------
 .../test_prometheus_remote_write_exporter.py       |  48 ++++---
 3 files changed, 103 insertions(+), 114 deletions(-)

diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst b/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst
index cd4916226f..a654038467 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/README.rst
@@ -29,7 +29,23 @@ Installation
 Configuration highlights
 ------------------------
 
-* ``max_retries`` (default ``3``), ``retry_backoff_factor`` (default ``0.5``), ``retry_backoff_max`` (default ``5.0``), and ``retry_jitter_ratio`` (default ``0.1``) tune the retry policy for retryable statuses (429/408/5xx) and connection/timeout errors.
+* ``max_retries`` (default ``3``), ``retry_backoff_factor`` (default ``0.5``), ``retry_backoff_max`` (default ``5.0``), and ``retry_jitter_ratio`` (default ``0.1``) tune the retry policy for retryable statuses (429/408/5xx) and connection/timeout errors. The retry adapter is built when the exporter is instantiated, so set these values at construction time.
+* Total request time can grow to roughly ``(max_retries + 1) * timeout`` plus backoff delays; server ``Retry-After`` hints are ignored (``respect_retry_after_header=False``).
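+
+A minimal construction sketch (the endpoint below is a placeholder; point it at
+your own Remote Write backend)::
+
+    from opentelemetry.exporter.prometheus_remote_write import (
+        PrometheusRemoteWriteMetricsExporter,
+    )
+
+    exporter = PrometheusRemoteWriteMetricsExporter(
+        endpoint="http://localhost:9090/api/v1/write",
+        max_retries=3,             # up to 4 attempts in total
+        retry_backoff_factor=0.5,  # base of the exponential backoff delay
+        retry_backoff_max=5.0,     # cap on any single delay, in seconds
+        retry_jitter_ratio=0.1,    # +/-10% randomization of each delay
+    )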
 
 References
 ----------
diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
index 578541b448..4b6b3f1ad4 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
@@ -15,13 +15,14 @@
 import logging
 import random
 import re
-import time
 from collections import defaultdict
 from itertools import chain
 from typing import Dict, Mapping, Optional, Sequence
 
 import requests
 import snappy
+from requests.adapters import HTTPAdapter
+from urllib3.util import Retry
 
 from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (  # pylint: disable=no-name-in-module
     WriteRequest,
@@ -57,6 +58,35 @@
 UNDERSCORE_REGEX = re.compile(r"_+")
 
 
+class _JitterRetry(Retry):
+    def __init__(
+        self,
+        *args,
+        backoff_max: float = 5.0,
+        jitter_ratio: float = 0.1,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        self.backoff_max = backoff_max
+        self.jitter_ratio = jitter_ratio
+
+    def new(self, **kwargs):
+        # urllib3 rebuilds the Retry instance on every increment using only
+        # its standard parameters; carry the custom ones forward so they do
+        # not silently reset to their defaults after the first retry.
+        kwargs.setdefault("backoff_max", self.backoff_max)
+        kwargs.setdefault("jitter_ratio", self.jitter_ratio)
+        return super().new(**kwargs)
+
+    def get_backoff_time(self) -> float:
+        backoff = super().get_backoff_time()
+        backoff = min(backoff, self.backoff_max)
+        if self.jitter_ratio:
+            jitter = backoff * self.jitter_ratio
+            backoff += random.uniform(-jitter, jitter)
+        return max(backoff, 0.0)
+
+
 class PrometheusRemoteWriteMetricsExporter(MetricExporter):
     """
     Prometheus remote write metric exporter for OpenTelemetry.
@@ -110,6 +140,7 @@ def __init__(
             ObservableGauge: AggregationTemporality.CUMULATIVE,
         }
 
+        self._session = self._create_session()
         super().__init__(preferred_temporality, preferred_aggregation)
 
     @property
@@ -431,34 +462,26 @@ def _build_headers(self) -> Dict:
             headers[header_name] = header_value
         return headers
 
-    def _should_retry_status(self, status_code: Optional[int]) -> bool:
-        if status_code is None:
-            return False
-        return status_code in self.retry_status_codes
-
-    @staticmethod
-    def _is_retryable_error(err: requests.exceptions.RequestException) -> bool:
-        return isinstance(
-            err,
-            (
-                requests.exceptions.ConnectionError,
-                requests.exceptions.Timeout,
-            ),
-        )
-
-    def _calculate_backoff(self, attempt: int) -> float:
-        delay = min(
-            self.retry_backoff_max,
-            self.retry_backoff_factor * (2 ** (attempt - 1)),
+    def _create_session(self) -> requests.Session:
+        session = requests.Session()
+        retry = _JitterRetry(
+            total=self.max_retries,
+            connect=self.max_retries,
+            read=self.max_retries,
+            status=self.max_retries,
+            allowed_methods=frozenset(["POST"]),
+            status_forcelist=self.retry_status_codes,
+            backoff_factor=self.retry_backoff_factor,
+            raise_on_status=False,
+            raise_on_redirect=False,
+            respect_retry_after_header=False,
+            backoff_max=self.retry_backoff_max,
+            jitter_ratio=self.retry_jitter_ratio,
         )
-        if self.retry_jitter_ratio:
-            jitter = delay * self.retry_jitter_ratio
-            delay += random.uniform(-jitter, jitter)
-        return max(delay, 0.0)
-
-    @staticmethod
-    def _sleep(duration: float) -> None:
-        time.sleep(duration)
+        adapter = HTTPAdapter(max_retries=retry)
+        session.mount("https://", adapter)
+        session.mount("http://", adapter)
+        return session
 
     def _send_message(
         self, message: bytes, headers: Dict
     ) -> MetricExportResult:
@@ -483,63 +506,22 @@ def _send_message(
                 self.tls_config["cert_file"],
                 self.tls_config["key_file"],
             )
-        total_attempts = self.max_retries + 1
-
-        for attempt in range(1, total_attempts + 1):
-            try:
-                response = requests.post(
-                    self.endpoint,
-                    data=message,
-                    headers=headers,
-                    auth=auth,
-                    timeout=self.timeout,
-                    proxies=self.proxies,
-                    cert=cert,
-                    verify=verify,
-                )
-                if response.ok:
-                    return MetricExportResult.SUCCESS
-
-                if attempt < total_attempts and self._should_retry_status(
-                    response.status_code
-                ):
-                    delay = self._calculate_backoff(attempt)
-                    logger.warning(
-                        "Remote Write request failed with status %s, retrying in %.2fs (attempt %s/%s)",
-                        response.status_code,
-                        delay,
-                        attempt,
-                        total_attempts,
-                    )
-                    self._sleep(delay)
-                    continue
-
-                response.raise_for_status()
-            except requests.exceptions.RequestException as err:
-                status_code = getattr(err.response, "status_code", None)
-                retryable = self._should_retry_status(status_code) or self._is_retryable_error(err)
-                if attempt < total_attempts and retryable:
-                    delay = self._calculate_backoff(attempt)
-                    logger.warning(
-                        "Remote Write request failed with reason: %s; retrying in %.2fs (attempt %s/%s)",
-                        err,
-                        delay,
-                        attempt,
-                        total_attempts,
-                    )
-                    self._sleep(delay)
-                    continue
-
-                logger.error(
-                    "Export POST request failed with reason: %s", err
-                )
-                return MetricExportResult.FAILURE
-
-        logger.error(
-            "Export POST request failed after %s attempts",
-            total_attempts,
-        )
-        return MetricExportResult.FAILURE
+        try:
+            response = self._session.post(
+                self.endpoint,
+                data=message,
+                headers=headers,
+                auth=auth,
+                timeout=self.timeout,
+                proxies=self.proxies,
+                cert=cert,
+                verify=verify,
+            )
+            response.raise_for_status()
+        except requests.exceptions.RequestException as err:
+            logger.error("Export POST request failed with reason: %s", err)
+            return MetricExportResult.FAILURE
+        return MetricExportResult.SUCCESS
 
     def force_flush(self, timeout_millis: float = 10_000) -> bool:
         return True
diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
index cd396c6081..116cbbd409 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py
@@ -285,9 +285,10 @@ def test_invalid_tls_config_key_only_param(self):
 
 
 # Ensures export is successful with valid export_records and config
-@patch("requests.post")
-def test_valid_export(mock_post, prom_rw, metric):
-    mock_post.return_value.configure_mock(**{"status_code": 200})
+def test_valid_export(prom_rw, metric):
+    mock_post = unittest.mock.Mock()
+    mock_post.return_value.configure_mock(ok=True, status_code=200)
+    prom_rw._session.post = mock_post
 
     # Assumed a "None" for Scope or Resource aren't valid, so build them here
     scope = ScopeMetrics(
@@ -316,12 +317,16 @@ def test_invalid_export(prom_rw):
 
-@patch("requests.post")
-def test_valid_send_message(mock_post, prom_rw):
-    mock_post.return_value.configure_mock(**{"ok": True})
+def test_valid_send_message(prom_rw):
+    mock_post = unittest.mock.Mock()
+    mock_post.return_value.configure_mock(**{"ok": True})
+    prom_rw._session.post = mock_post  # _send_message now posts via the session
     result = prom_rw._send_message(bytes(), {})
     assert mock_post.call_count == 1
     assert result == MetricExportResult.SUCCESS
 
 
 def test_invalid_send_message(prom_rw):
+    prom_rw._session.post = unittest.mock.Mock(
+        side_effect=requests.exceptions.RequestException("boom")
+    )
     result = prom_rw._send_message(bytes(), {})
     assert result == MetricExportResult.FAILURE
@@ -345,36 +350,23 @@ def test_build_headers(prom_rw):
     assert headers["Custom Header"] == "test_header"
 
 
-@patch("opentelemetry.exporter.prometheus_remote_write.time.sleep")
-@patch("requests.post")
-def test_send_message_retries_then_succeeds(mock_post, mock_sleep, prom_rw):
-    prom_rw.max_retries = 2
-    prom_rw.retry_jitter_ratio = 0
+def test_session_retry_configuration(prom_rw):
+    adapter = prom_rw._session.adapters["https://"]
+    retry = adapter.max_retries
+    assert retry.total == prom_rw.max_retries
+    assert "POST" in retry.allowed_methods
+    assert 500 in retry.status_forcelist
 
-    first_response = unittest.mock.Mock()
-    first_response.ok = False
-    first_response.status_code = 500
-    second_response = unittest.mock.Mock()
-    second_response.ok = True
-    mock_post.side_effect = [first_response, second_response]
 
-    result = prom_rw._send_message(bytes(), {})
-    assert result == MetricExportResult.SUCCESS
-    assert mock_post.call_count == 2
-    mock_sleep.assert_called_once()
-
-
-@patch("requests.post")
-def test_send_message_non_retryable_status(mock_post, prom_rw):
-    prom_rw.max_retries = 2
+def test_non_retryable_status(prom_rw):
     response = unittest.mock.Mock()
     response.ok = False
     response.status_code = 400
     response.raise_for_status.side_effect = requests.exceptions.HTTPError(
         response=response
     )
-    mock_post.return_value = response
+    prom_rw._session.post = unittest.mock.Mock(return_value=response)
 
     result = prom_rw._send_message(bytes(), {})
     assert result == MetricExportResult.FAILURE
-    assert mock_post.call_count == 1
+    assert prom_rw._session.post.call_count == 1

From 2f36d70f69cd83aa875a84b8b8aa0b3aae2b2c02 Mon Sep 17 00:00:00 2001
From: Towseef Altaf <31309254+towseef41@users.noreply.github.com>
Date: Tue, 2 Dec 2025 08:30:33 +0530
Subject: [PATCH 3/3] Update exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py

Co-authored-by: Lukas Hering <40302054+herin049@users.noreply.github.com>
---
 .../src/opentelemetry/exporter/prometheus_remote_write/__init__.py | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
index 4b6b3f1ad4..bf5e22ff9d 100644
--- a/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
+++ b/exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py
@@ -80,11 +80,9 @@ def new(self, **kwargs):
 
     def get_backoff_time(self) -> float:
         backoff = super().get_backoff_time()
-        backoff = min(backoff, self.backoff_max)
         if self.jitter_ratio:
-            jitter = backoff * self.jitter_ratio
-            backoff += random.uniform(-jitter, jitter)
-        return max(backoff, 0.0)
+            backoff += backoff * self.jitter_ratio * random.uniform(-1, 1)
+        return min(max(backoff, 0.0), self.backoff_max)
 
 
 class PrometheusRemoteWriteMetricsExporter(MetricExporter):
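
For intuition, here is a standalone sketch of the delay schedule the final
``get_backoff_time`` produces. It inlines urllib3's nominal
``backoff_factor * 2 ** (n - 1)`` growth for the n-th consecutive retry; real
urllib3 returns 0 for the first retry, so treat the first value as
illustrative only::

    import random

    def jittered_backoff(
        retry_number: int,
        backoff_factor: float = 0.5,
        backoff_max: float = 5.0,
        jitter_ratio: float = 0.1,
    ) -> float:
        # Nominal exponential delay, mirroring urllib3's schedule.
        backoff = backoff_factor * (2 ** (retry_number - 1))
        # Same jitter-then-clamp order as the patched get_backoff_time().
        if jitter_ratio:
            backoff += backoff * jitter_ratio * random.uniform(-1, 1)
        return min(max(backoff, 0.0), backoff_max)

    for n in range(1, 6):
        print(f"retry {n}: ~{jittered_backoff(n):.2f}s")
    # Nominal delays 0.5, 1.0, 2.0, 4.0, 8.0 are jittered by up to +/-10% and
    # then clamped, so the fifth delay always prints as 5.00s (7.2-8.8 > 5.0).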