Skip to content

Commit b15f8c8

Browse files
committed
Add a timeout to export calls
1 parent ffc23bc commit b15f8c8

File tree

15 files changed

+407
-323
lines changed

15 files changed

+407
-323
lines changed

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_log_exporter/__init__.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def __init__(
5858
headers: Optional[
5959
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
6060
] = None,
61-
timeout: Optional[int] = None,
61+
timeout: Optional[float] = None,
6262
compression: Optional[Compression] = None,
6363
):
6464
if insecure is None:
@@ -79,7 +79,7 @@ def __init__(
7979

8080
environ_timeout = environ.get(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)
8181
environ_timeout = (
82-
int(environ_timeout) if environ_timeout is not None else None
82+
float(environ_timeout) if environ_timeout is not None else None
8383
)
8484

8585
compression = (
@@ -107,8 +107,12 @@ def _translate_data(
107107
) -> ExportLogsServiceRequest:
108108
return encode_logs(data)
109109

110-
def export(self, batch: Sequence[LogData]) -> LogExportResult:
111-
return self._export(batch)
110+
def export(
111+
self, batch: Sequence[LogData], timeout_millis: Optional[float] = None
112+
) -> LogExportResult:
113+
return self._export(
114+
batch, timeout_millis / 1e3 if timeout_millis else None
115+
)
112116

113117
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
114118
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 56 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414

1515
"""OTLP Exporter"""
1616

17+
import json
1718
import threading
1819
from abc import ABC, abstractmethod
1920
from collections.abc import Sequence # noqa: F401
2021
from logging import getLogger
2122
from os import environ
22-
from time import sleep
2323
from typing import ( # noqa: F401
2424
Any,
2525
Callable,
@@ -35,7 +35,6 @@
3535
from urllib.parse import urlparse
3636

3737
from deprecated import deprecated
38-
from google.rpc.error_details_pb2 import RetryInfo
3938

4039
from grpc import (
4140
ChannelCredentials,
@@ -47,7 +46,6 @@
4746
ssl_channel_credentials,
4847
)
4948
from opentelemetry.exporter.otlp.proto.common._internal import (
50-
_create_exp_backoff_generator,
5149
_get_resource_data,
5250
)
5351
from opentelemetry.exporter.otlp.proto.grpc import (
@@ -74,6 +72,29 @@
7472
from opentelemetry.sdk.trace import ReadableSpan
7573
from opentelemetry.util.re import parse_env_headers
7674

75+
json_config = json.dumps(
76+
{
77+
"methodConfig": [
78+
{
79+
"name": [dict()],
80+
"retryPolicy": {
81+
"maxAttempts": 5,
82+
"initialBackoff": "1s",
83+
"maxBackoff": "64s",
84+
"backoffMultiplier": 2,
85+
"retryableStatusCodes": [
86+
"UNAVAILABLE",
87+
"CANCELLED",
88+
"RESOURCE_EXHAUSTED",
89+
"ABORTED",
90+
"OUT_OF_RANGE",
91+
"DATA_LOSS",
92+
],
93+
},
94+
}
95+
]
96+
}
97+
)
7798
logger = getLogger(__name__)
7899
SDKDataT = TypeVar("SDKDataT")
79100
ResourceDataT = TypeVar("ResourceDataT")
@@ -195,7 +216,7 @@ def __init__(
195216
headers: Optional[
196217
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
197218
] = None,
198-
timeout: Optional[int] = None,
219+
timeout: Optional[float] = None,
199220
compression: Optional[Compression] = None,
200221
):
201222
super().__init__()
@@ -232,7 +253,7 @@ def __init__(
232253
else:
233254
self._headers = tuple(self._headers) + tuple(_OTLP_GRPC_HEADERS)
234255

235-
self._timeout = timeout or int(
256+
self._timeout = timeout or float(
236257
environ.get(OTEL_EXPORTER_OTLP_TIMEOUT, 10)
237258
)
238259
self._collector_kwargs = None
@@ -245,7 +266,11 @@ def __init__(
245266

246267
if insecure:
247268
self._channel = insecure_channel(
248-
self._endpoint, compression=compression
269+
self._endpoint,
270+
compression=compression,
271+
options=[
272+
("grpc.service_config", json_config),
273+
],
249274
)
250275
else:
251276
credentials = _get_credentials(
@@ -255,7 +280,12 @@ def __init__(
255280
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
256281
)
257282
self._channel = secure_channel(
258-
self._endpoint, credentials, compression=compression
283+
self._endpoint,
284+
credentials,
285+
compression=compression,
286+
options=[
287+
("grpc.service_config", json_config),
288+
],
259289
)
260290
self._client = self._stub(self._channel)
261291

@@ -269,90 +299,35 @@ def _translate_data(
269299
pass
270300

271301
def _export(
272-
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
302+
self,
303+
data: Union[TypingSequence[ReadableSpan], MetricsData],
304+
timeout_sec: Optional[float] = None,
273305
) -> ExportResultT:
274-
# After the call to shutdown, subsequent calls to Export are
275-
# not allowed and should return a Failure result.
276306
if self._shutdown:
277307
logger.warning("Exporter already shutdown, ignoring batch")
278308
return self._result.FAILURE
279309

280310
# FIXME remove this check if the export type for traces
281311
# gets updated to a class that represents the proto
282312
# TracesData and use the code below instead.
283-
# logger.warning(
284-
# "Transient error %s encountered while exporting %s, retrying in %ss.",
285-
# error.code(),
286-
# data.__class__.__name__,
287-
# delay,
288-
# )
289-
max_value = 64
290-
# expo returns a generator that yields delay values which grow
291-
# exponentially. Once delay is greater than max_value, the yielded
292-
# value will remain constant.
293-
for delay in _create_exp_backoff_generator(max_value=max_value):
294-
if delay == max_value or self._shutdown:
313+
with self._export_lock:
314+
try:
315+
self._client.Export(
316+
request=self._translate_data(data),
317+
metadata=self._headers,
318+
timeout=(timeout_sec or self._timeout),
319+
)
320+
return self._result.SUCCESS
321+
except RpcError as error:
322+
logger.error(
323+
"Failed to export %s to %s, error code: %s",
324+
self._exporting,
325+
self._endpoint,
326+
error.code(),
327+
exc_info=error.code() == StatusCode.UNKNOWN,
328+
)
295329
return self._result.FAILURE
296330

297-
with self._export_lock:
298-
try:
299-
self._client.Export(
300-
request=self._translate_data(data),
301-
metadata=self._headers,
302-
timeout=self._timeout,
303-
)
304-
305-
return self._result.SUCCESS
306-
307-
except RpcError as error:
308-
if error.code() in [
309-
StatusCode.CANCELLED,
310-
StatusCode.DEADLINE_EXCEEDED,
311-
StatusCode.RESOURCE_EXHAUSTED,
312-
StatusCode.ABORTED,
313-
StatusCode.OUT_OF_RANGE,
314-
StatusCode.UNAVAILABLE,
315-
StatusCode.DATA_LOSS,
316-
]:
317-
retry_info_bin = dict(error.trailing_metadata()).get(
318-
"google.rpc.retryinfo-bin"
319-
)
320-
if retry_info_bin is not None:
321-
retry_info = RetryInfo()
322-
retry_info.ParseFromString(retry_info_bin)
323-
delay = (
324-
retry_info.retry_delay.seconds
325-
+ retry_info.retry_delay.nanos / 1.0e9
326-
)
327-
328-
logger.warning(
329-
(
330-
"Transient error %s encountered while exporting "
331-
"%s to %s, retrying in %ss."
332-
),
333-
error.code(),
334-
self._exporting,
335-
self._endpoint,
336-
delay,
337-
)
338-
sleep(delay)
339-
continue
340-
else:
341-
logger.error(
342-
"Failed to export %s to %s, error code: %s",
343-
self._exporting,
344-
self._endpoint,
345-
error.code(),
346-
exc_info=error.code() == StatusCode.UNKNOWN,
347-
)
348-
349-
if error.code() == StatusCode.OK:
350-
return self._result.SUCCESS
351-
352-
return self._result.FAILURE
353-
354-
return self._result.FAILURE
355-
356331
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
357332
if self._shutdown:
358333
logger.warning("Exporter already shutdown, ignoring call")

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313

1414
from __future__ import annotations
1515

16+
import time
1617
from dataclasses import replace
1718
from logging import getLogger
1819
from os import environ
19-
from typing import Iterable, List, Tuple, Union
20+
from typing import Iterable, List, Optional, Tuple, Union
2021
from typing import Sequence as TypingSequence
2122

2223
from grpc import ChannelCredentials, Compression
@@ -99,7 +100,7 @@ def __init__(
99100
credentials: ChannelCredentials | None = None,
100101
headers: Union[TypingSequence[Tuple[str, str]], dict[str, str], str]
101102
| None = None,
102-
timeout: int | None = None,
103+
timeout: float | None = None,
103104
compression: Compression | None = None,
104105
preferred_temporality: dict[type, AggregationTemporality]
105106
| None = None,
@@ -124,7 +125,7 @@ def __init__(
124125

125126
environ_timeout = environ.get(OTEL_EXPORTER_OTLP_METRICS_TIMEOUT)
126127
environ_timeout = (
127-
int(environ_timeout) if environ_timeout is not None else None
128+
float(environ_timeout) if environ_timeout is not None else None
128129
)
129130

130131
compression = (
@@ -158,17 +159,22 @@ def _translate_data(
158159
def export(
159160
self,
160161
metrics_data: MetricsData,
161-
timeout_millis: float = 10_000,
162+
timeout_millis: Optional[float] = None,
162163
**kwargs,
163164
) -> MetricExportResult:
164-
# TODO(#2663): OTLPExporterMixin should pass timeout to gRPC
165+
timeout_sec = (
166+
timeout_millis / 1e3 if timeout_millis else self._timeout # pylint: disable=protected-access
167+
)
165168
if self._max_export_batch_size is None:
166-
return self._export(data=metrics_data)
169+
return self._export(metrics_data, timeout_sec)
167170

168171
export_result = MetricExportResult.SUCCESS
169-
172+
deadline_sec = time.time() + timeout_sec
170173
for split_metrics_data in self._split_metrics_data(metrics_data):
171-
split_export_result = self._export(data=split_metrics_data)
174+
time_remaining_sec = deadline_sec - time.time()
175+
split_export_result = self._export(
176+
split_metrics_data, time_remaining_sec
177+
)
172178

173179
if split_export_result is MetricExportResult.FAILURE:
174180
export_result = MetricExportResult.FAILURE

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def __init__(
9191
headers: Optional[
9292
Union[TypingSequence[Tuple[str, str]], Dict[str, str], str]
9393
] = None,
94-
timeout: Optional[int] = None,
94+
timeout: Optional[float] = None,
9595
compression: Optional[Compression] = None,
9696
):
9797
if insecure is None:
@@ -112,7 +112,7 @@ def __init__(
112112

113113
environ_timeout = environ.get(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)
114114
environ_timeout = (
115-
int(environ_timeout) if environ_timeout is not None else None
115+
float(environ_timeout) if environ_timeout is not None else None
116116
)
117117

118118
compression = (
@@ -139,8 +139,14 @@ def _translate_data(
139139
) -> ExportTraceServiceRequest:
140140
return encode_spans(data)
141141

142-
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
143-
return self._export(spans)
142+
def export(
143+
self,
144+
spans: Sequence[ReadableSpan],
145+
timeout_millis: Optional[float] = None,
146+
) -> SpanExportResult:
147+
return self._export(
148+
spans, timeout_millis / 1e3 if timeout_millis else None
149+
)
144150

145151
def shutdown(self) -> None:
146152
OTLPExporterMixin.shutdown(self)

exporter/opentelemetry-exporter-otlp-proto-grpc/test-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ asgiref==3.7.2
22
Deprecated==1.2.14
33
googleapis-common-protos==1.63.2
44
grpcio==1.66.2
5+
grpcio-status==1.66.0
56
importlib-metadata==6.11.0
67
iniconfig==2.0.0
78
packaging==24.0

0 commit comments

Comments
 (0)