Skip to content

Commit 8269f54

Browse files
committed
Refactor BatchLogRecordProcessor
1 parent 561f347 commit 8269f54

File tree

2 files changed

+173
-277
lines changed

2 files changed

+173
-277
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 79 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222
import threading
2323
import weakref
2424
from os import environ, linesep
25-
from time import time_ns
26-
from typing import IO, Callable, Deque, List, Optional, Sequence
25+
from typing import IO, Callable, Deque, Optional, Sequence
2726

2827
from opentelemetry.context import (
2928
_SUPPRESS_INSTRUMENTATION_KEY,
@@ -56,6 +55,12 @@ class LogExportResult(enum.Enum):
5655
FAILURE = 1
5756

5857

58+
class BatchLogExportStrategy(enum.Enum):
59+
EXPORT_ALL = 0
60+
EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
61+
EXPORT_AT_LEAST_ONE_BATCH = 2
62+
63+
5964
class LogExporter(abc.ABC):
6065
"""Interface for exporting logs.
6166
@@ -152,14 +157,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
152157
return True
153158

154159

155-
class _FlushRequest:
156-
__slots__ = ["event", "num_log_records"]
157-
158-
def __init__(self):
159-
self.event = threading.Event()
160-
self.num_log_records = 0
161-
162-
163160
_BSP_RESET_ONCE = Once()
164161

165162

@@ -178,8 +175,6 @@ class BatchLogRecordProcessor(LogRecordProcessor):
178175
"""
179176

180177
_queue: Deque[LogData]
181-
_flush_request: _FlushRequest | None
182-
_log_records: List[LogData | None]
183178

184179
def __init__(
185180
self,
@@ -201,7 +196,7 @@ def __init__(
201196
max_export_batch_size = (
202197
BatchLogRecordProcessor._default_max_export_batch_size()
203198
)
204-
199+
# Not used. No way currently to pass timeout to export.
205200
if export_timeout_millis is None:
206201
export_timeout_millis = (
207202
BatchLogRecordProcessor._default_export_timeout_millis()
@@ -210,30 +205,45 @@ def __init__(
210205
BatchLogRecordProcessor._validate_arguments(
211206
max_queue_size, schedule_delay_millis, max_export_batch_size
212207
)
213-
214208
self._exporter = exporter
215209
self._max_queue_size = max_queue_size
216-
self._schedule_delay_millis = schedule_delay_millis
210+
self._schedule_delay = schedule_delay_millis / 1e3
217211
self._max_export_batch_size = max_export_batch_size
212+
# Not used. No way currently to pass timeout to export.
218213
self._export_timeout_millis = export_timeout_millis
214+
# Deque is thread safe.
219215
self._queue = collections.deque([], max_queue_size)
220216
self._worker_thread = threading.Thread(
221217
name="OtelBatchLogRecordProcessor",
222218
target=self.worker,
223219
daemon=True,
224220
)
225-
self._condition = threading.Condition(threading.Lock())
226221
self._shutdown = False
227-
self._flush_request = None
228-
self._log_records = [None] * self._max_export_batch_size
222+
self._export_lock = threading.Lock()
223+
self._worker_sleep = threading.Event()
229224
self._worker_thread.start()
230225
if hasattr(os, "register_at_fork"):
231226
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
232227
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
233228
self._pid = os.getpid()
234229

230+
def _should_export_batch(
231+
self, batch_strategy: BatchLogExportStrategy, num_iterations: int
232+
) -> bool:
233+
if not self._queue:
234+
return False
235+
# Always continue to export while queue length exceeds max batch size.
236+
if len(self._queue) >= self._max_export_batch_size:
237+
return True
238+
if batch_strategy == BatchLogExportStrategy.EXPORT_ALL:
239+
return True
240+
if batch_strategy == BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
241+
return num_iterations == 0
242+
return False
243+
235244
def _at_fork_reinit(self):
236-
self._condition = threading.Condition(threading.Lock())
245+
self._export_lock = threading.Lock()
246+
self._worker_sleep = threading.Event()
237247
self._queue.clear()
238248
self._worker_thread = threading.Thread(
239249
name="OtelBatchLogRecordProcessor",
@@ -244,152 +254,75 @@ def _at_fork_reinit(self):
244254
self._pid = os.getpid()
245255

246256
def worker(self):
247-
timeout = self._schedule_delay_millis / 1e3
248-
flush_request: Optional[_FlushRequest] = None
249257
while not self._shutdown:
250-
with self._condition:
251-
if self._shutdown:
252-
# shutdown may have been called, avoid further processing
253-
break
254-
flush_request = self._get_and_unset_flush_request()
255-
if (
256-
len(self._queue) < self._max_export_batch_size
257-
and flush_request is None
258-
):
259-
self._condition.wait(timeout)
260-
261-
flush_request = self._get_and_unset_flush_request()
262-
if not self._queue:
263-
timeout = self._schedule_delay_millis / 1e3
264-
self._notify_flush_request_finished(flush_request)
265-
flush_request = None
266-
continue
267-
if self._shutdown:
268-
break
269-
270-
start_ns = time_ns()
271-
self._export(flush_request)
272-
end_ns = time_ns()
273-
# subtract the duration of this export call to the next timeout
274-
timeout = self._schedule_delay_millis / 1e3 - (
275-
(end_ns - start_ns) / 1e9
276-
)
277-
278-
self._notify_flush_request_finished(flush_request)
279-
flush_request = None
280-
281-
# there might have been a new flush request while export was running
282-
# and before the done flag switched to true
283-
with self._condition:
284-
shutdown_flush_request = self._get_and_unset_flush_request()
285-
286-
# flush the remaining logs
287-
self._drain_queue()
288-
self._notify_flush_request_finished(flush_request)
289-
self._notify_flush_request_finished(shutdown_flush_request)
290-
291-
def _export(self, flush_request: Optional[_FlushRequest] = None):
292-
"""Exports logs considering the given flush_request.
293-
294-
If flush_request is not None then logs are exported in batches
295-
until the number of exported logs reached or exceeded the num of logs in
296-
flush_request, otherwise exports at max max_export_batch_size logs.
297-
"""
298-
if flush_request is None:
299-
self._export_batch()
300-
return
301-
302-
num_log_records = flush_request.num_log_records
303-
while self._queue:
304-
exported = self._export_batch()
305-
num_log_records -= exported
306-
307-
if num_log_records <= 0:
258+
# Lots of strategies in the spec for setting next timeout.
259+
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
260+
# Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
261+
sleep_interrupted = self._worker_sleep.wait(self._schedule_delay)
262+
if self._shutdown:
308263
break
309-
310-
def _export_batch(self) -> int:
311-
"""Exports at most max_export_batch_size logs and returns the number of
312-
exported logs.
313-
"""
314-
idx = 0
315-
while idx < self._max_export_batch_size and self._queue:
316-
record = self._queue.pop()
317-
self._log_records[idx] = record
318-
idx += 1
319-
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
320-
try:
321-
self._exporter.export(self._log_records[:idx]) # type: ignore
322-
except Exception: # pylint: disable=broad-exception-caught
323-
_logger.exception("Exception while exporting logs.")
324-
detach(token)
325-
326-
for index in range(idx):
327-
self._log_records[index] = None
328-
return idx
329-
330-
def _drain_queue(self):
331-
"""Export all elements until queue is empty.
332-
333-
Can only be called from the worker thread context because it invokes
334-
`export` that is not thread safe.
335-
"""
336-
while self._queue:
337-
self._export_batch()
338-
339-
def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]:
340-
flush_request = self._flush_request
341-
self._flush_request = None
342-
if flush_request is not None:
343-
flush_request.num_log_records = len(self._queue)
344-
return flush_request
345-
346-
@staticmethod
347-
def _notify_flush_request_finished(
348-
flush_request: Optional[_FlushRequest] = None,
349-
):
350-
if flush_request is not None:
351-
flush_request.event.set()
352-
353-
def _get_or_create_flush_request(self) -> _FlushRequest:
354-
if self._flush_request is None:
355-
self._flush_request = _FlushRequest()
356-
return self._flush_request
264+
self._export(
265+
BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
266+
if sleep_interrupted
267+
else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
268+
)
269+
self._worker_sleep.clear()
270+
self._export(BatchLogExportStrategy.EXPORT_ALL)
271+
272+
def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
273+
with self._export_lock:
274+
iteration = 0
275+
# We could see concurrent export calls from worker and force_flush. We call _should_export_batch
276+
# once the lock is obtained to see if we still need to make the requested export.
277+
while self._should_export_batch(batch_strategy, iteration):
278+
iteration += 1
279+
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
280+
try:
281+
self._exporter.export(
282+
[
283+
# Oldest records are at the back, so pop from there.
284+
self._queue.pop()
285+
for _ in range(
286+
min(
287+
self._max_export_batch_size,
288+
len(self._queue),
289+
)
290+
)
291+
]
292+
)
293+
except Exception: # pylint: disable=broad-exception-caught
294+
_logger.exception("Exception while exporting logs.")
295+
detach(token)
357296

358297
def emit(self, log_data: LogData) -> None:
359-
"""Adds the `LogData` to queue and notifies the waiting threads
360-
when size of queue reaches max_export_batch_size.
361-
"""
362298
if self._shutdown:
299+
_logger.warning("Shutdown called, ignoring log.")
363300
return
364301
if self._pid != os.getpid():
365302
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)
366303

304+
if len(self._queue) == self._max_queue_size:
305+
_logger.warning("Queue full, dropping log.")
367306
self._queue.appendleft(log_data)
368307
if len(self._queue) >= self._max_export_batch_size:
369-
with self._condition:
370-
self._condition.notify()
308+
self._worker_sleep.set()
371309

372310
def shutdown(self):
311+
if self._shutdown:
312+
return
313+
# Prevents emit and force_flush from further calling export.
373314
self._shutdown = True
374-
with self._condition:
375-
self._condition.notify_all()
315+
# Interrupts sleep in the worker, if it's sleeping.
316+
self._worker_sleep.set()
317+
# Main worker loop should exit after one final export call with flush all strategy.
376318
self._worker_thread.join()
377319
self._exporter.shutdown()
378320

379321
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
380-
if timeout_millis is None:
381-
timeout_millis = self._export_timeout_millis
382322
if self._shutdown:
383-
return True
384-
385-
with self._condition:
386-
flush_request = self._get_or_create_flush_request()
387-
self._condition.notify_all()
388-
389-
ret = flush_request.event.wait(timeout_millis / 1e3)
390-
if not ret:
391-
_logger.warning("Timeout was exceeded in force_flush().")
392-
return ret
323+
return
324+
# Blocking call to export.
325+
self._export(BatchLogExportStrategy.EXPORT_ALL)
393326

394327
@staticmethod
395328
def _default_max_queue_size():

0 commit comments

Comments
 (0)