
Commit a13a526

[Eventhub] Fix Blocking Behavior of Buffered Producer Flush (Azure#25406)
* changes to make get non blocking async
* make emptying a queue non blocking
* test
* reset sample async
* test
* new line
* remove second lock
* sum over all the partitions
* fix async tests
* remove debug print
* lock around queue
* fix pylint
* changelog
* remove print
* reset cur_buffer_length outside while loop
* increase waiting time to receive events
* remove sleep on add
1 parent e279e15 commit a13a526
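
The core of the fix: the flush loop used to spin on the separately tracked `self._cur_buffered_len` counter while draining with a blocking `Queue.get()`, so whenever the counter and the actual queue size disagreed, `get()` waited forever and the client froze. The fixed loop is driven by the queue itself and drains with non-blocking gets. A minimal standalone sketch of the queue idiom (not the SDK class, just the pattern):

import queue

q = queue.Queue()
for i in range(3):
    q.put(f"batch-{i}")

# Old shape: `while tracked_len:` plus a blocking q.get(); a stale
# counter leaves get() parked forever on an empty queue.
# New shape: loop on the queue itself and never block.
while q.qsize() > 0:
    try:
        batch = q.get(block=False)  # raises queue.Empty instead of waiting
    except queue.Empty:
        break  # drained between the qsize() check and the get()
    q.task_done()
    print("sending", batch)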

File tree

5 files changed (+125 −18 lines)


sdk/eventhub/azure-eventhub/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -6,6 +6,8 @@
 
 ### Breaking Changes
 
+- Fixed a bug in `BufferedProducer` that would block when flushing the queue causing the client to freeze up (issue #23510).
+
 ### Bugs Fixed
 
 ### Other Changes
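
For context on the changelog entry: the freeze surfaced through ordinary buffered-mode usage. A sketch of the affected call pattern, mirroring the tests added below (the connection string and callback bodies are placeholders):

from azure.eventhub import EventHubProducerClient, EventData

def on_success(events, pid):
    pass  # invoked per batch successfully sent

def on_error(events, pid, err):
    pass  # invoked per batch that failed to send

producer = EventHubProducerClient.from_connection_string(
    "<connection-string>",  # placeholder
    buffered_mode=True,
    on_success=on_success,
    on_error=on_error,
)
with producer:
    for _ in range(100):
        producer.send_event(EventData("test"))
# Leaving the context manager flushes the buffer; before this fix,
# that flush could block indefinitely (issue #23510).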

sdk/eventhub/azure-eventhub/azure/eventhub/_buffered_producer/_buffered_producer.py

Lines changed: 14 additions & 8 deletions
@@ -96,8 +96,7 @@ def put_events(self, events, timeout_time=None):
                 new_events_len,
             )
             # flush the buffer
-            with self._lock:
-                self.flush(timeout_time=timeout_time)
+            self.flush(timeout_time=timeout_time)
         if timeout_time and time.time() > timeout_time:
             raise OperationTimeoutError(
                 "Failed to enqueue events into buffer due to timeout."
@@ -107,14 +106,16 @@ def put_events(self, events, timeout_time=None):
             self._cur_batch.add(events)
         except AttributeError:  # if the input events is a EventDataBatch, put the whole into the buffer
             # if there are events in cur_batch, enqueue cur_batch to the buffer
-            if self._cur_batch:
-                self._buffered_queue.put(self._cur_batch)
-            self._buffered_queue.put(events)
+            with self._lock:
+                if self._cur_batch:
+                    self._buffered_queue.put(self._cur_batch)
+                self._buffered_queue.put(events)
             # create a new batch for incoming events
             self._cur_batch = EventDataBatch(self._max_message_size_on_link)
         except ValueError:
             # add single event exceeds the cur batch size, create new batch
-            self._buffered_queue.put(self._cur_batch)
+            with self._lock:
+                self._buffered_queue.put(self._cur_batch)
             self._cur_batch = EventDataBatch(self._max_message_size_on_link)
             self._cur_batch.add(events)
         self._cur_buffered_len += new_events_len
@@ -140,10 +141,13 @@ def flush(self, timeout_time=None, raise_error=True):
         _LOGGER.info("Partition: %r started flushing.", self.partition_id)
         if self._cur_batch:  # if there is batch, enqueue it to the buffer first
             self._buffered_queue.put(self._cur_batch)
-        while self._cur_buffered_len:
+        while self._buffered_queue.qsize() > 0:
             remaining_time = timeout_time - time.time() if timeout_time else None
             if (remaining_time and remaining_time > 0) or remaining_time is None:
-                batch = self._buffered_queue.get()
+                try:
+                    batch = self._buffered_queue.get(block=False)
+                except queue.Empty:
+                    break
                 self._buffered_queue.task_done()
                 try:
                     _LOGGER.info("Partition %r is sending.", self.partition_id)
@@ -182,6 +186,8 @@ def flush(self, timeout_time=None, raise_error=True):
                 break
         # after finishing flushing, reset cur batch and put it into the buffer
         self._last_send_time = time.time()
+        # reset buffered count
+        self._cur_buffered_len = 0
         self._cur_batch = EventDataBatch(self._max_message_size_on_link)
         _LOGGER.info("Partition %r finished flushing.", self.partition_id)
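
The second half of the sync change is lock scope: `put_events` no longer holds `self._lock` across the whole `flush()` call (which includes network sends); the lock now guards only the queue writes. A hedged sketch of the narrowed critical section, using hypothetical stand-in names:

import threading
import queue

lock = threading.Lock()
buffered_queue = queue.Queue()

def enqueue_batch(batch):
    # After the fix: only the shared-queue mutation is a critical section.
    with lock:
        buffered_queue.put(batch)

def put_events_old_shape(flush):
    # Before the fix: the entire flush -- slow network I/O included --
    # ran under the lock, stalling every other writer meanwhile.
    with lock:
        flush()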

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_buffered_producer/_buffered_producer_async.py

Lines changed: 14 additions & 10 deletions
@@ -89,17 +89,14 @@ async def put_events(self, events, timeout_time=None):
             new_events_len = len(events)
         except TypeError:
             new_events_len = 1
-
         if self._max_buffer_len - self._cur_buffered_len < new_events_len:
             _LOGGER.info(
                 "The buffer for partition %r is full. Attempting to flush before adding %r events.",
                 self.partition_id,
                 new_events_len,
             )
             # flush the buffer
-            async with self._lock:
-                await self._flush(timeout_time=timeout_time)
-
+            await self.flush(timeout_time=timeout_time)
         if timeout_time and time.time() > timeout_time:
             raise OperationTimeoutError(
                 "Failed to enqueue events into buffer due to timeout."
@@ -109,14 +106,16 @@ async def put_events(self, events, timeout_time=None):
             self._cur_batch.add(events)
         except AttributeError:  # if the input events is a EventDataBatch, put the whole into the buffer
             # if there are events in cur_batch, enqueue cur_batch to the buffer
-            if self._cur_batch:
-                self._buffered_queue.put(self._cur_batch)
-            self._buffered_queue.put(events)
+            async with self._lock:
+                if self._cur_batch:
+                    self._buffered_queue.put(self._cur_batch)
+                self._buffered_queue.put(events)
             # create a new batch for incoming events
             self._cur_batch = EventDataBatch(self._max_message_size_on_link)
         except ValueError:
             # add single event exceeds the cur batch size, create new batch
-            self._buffered_queue.put(self._cur_batch)
+            async with self._lock:
+                self._buffered_queue.put(self._cur_batch)
             self._cur_batch = EventDataBatch(self._max_message_size_on_link)
             self._cur_batch.add(events)
         self._cur_buffered_len += new_events_len
@@ -146,10 +145,13 @@ async def _flush(self, timeout_time=None, raise_error=True):
         if self._cur_batch:  # if there is batch, enqueue it to the buffer first
             self._buffered_queue.put(self._cur_batch)
             self._cur_batch = EventDataBatch(self._max_message_size_on_link)
-        while self._cur_buffered_len:
+        while self._buffered_queue.qsize() > 0:
             remaining_time = timeout_time - time.time() if timeout_time else None
             if (remaining_time and remaining_time > 0) or remaining_time is None:
-                batch = self._buffered_queue.get()
+                try:
+                    batch = self._buffered_queue.get(block=False)
+                except queue.Empty:
+                    break
                 self._buffered_queue.task_done()
                 try:
                     _LOGGER.info("Partition %r is sending.", self.partition_id)
@@ -187,6 +189,8 @@ async def _flush(self, timeout_time=None, raise_error=True):
                 break
         # after finishing flushing, reset cur batch and put it into the buffer
         self._last_send_time = time.time()
+        # reset curr_buffered
+        self._cur_buffered_len = 0
         self._cur_batch = EventDataBatch(self._max_message_size_on_link)
         _LOGGER.info("Partition %r finished flushing.", self.partition_id)
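
The async producer receives the mirror-image change with `async with self._lock`, and it keeps the thread-safe `queue.Queue` rather than switching to `asyncio.Queue`; that is safe precisely because `get(block=False)` either returns immediately or raises `queue.Empty`, so the event loop is never stalled. A minimal runnable sketch of the drain coroutine, with `send` as a hypothetical stand-in for the real per-batch send:

import asyncio
import queue

async def send(batch):
    await asyncio.sleep(0)  # hypothetical stand-in for the AMQP send

async def drain(buffered_queue: queue.Queue):
    while buffered_queue.qsize() > 0:
        try:
            batch = buffered_queue.get(block=False)  # never blocks the loop
        except queue.Empty:
            break
        buffered_queue.task_done()
        await send(batch)

q = queue.Queue()
for i in range(3):
    q.put(f"batch-{i}")
asyncio.run(drain(q))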

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_buffered_producer_async.py

Lines changed: 47 additions & 0 deletions
@@ -490,3 +490,50 @@ async def on_error(events, pid, err):
 
     await consumer.close()
     await receive_thread
+
+@pytest.mark.liveTest
+@pytest.mark.asyncio
+async def test_long_wait_small_buffer(connection_str):
+    received_events = defaultdict(list)
+
+    async def on_event(partition_context, event):
+        received_events[partition_context.partition_id].append(event)
+
+    consumer = EventHubConsumerClient.from_connection_string(connection_str, consumer_group="$default")
+
+    receive_thread = asyncio.ensure_future(consumer.receive(on_event=on_event))
+
+    sent_events = defaultdict(list)
+
+    async def on_success(events, pid):
+        sent_events[pid].extend(events)
+
+    async def on_error(events, pid, err):
+        on_error.err = err
+
+    on_error.err = None  # ensure no error
+    producer = EventHubProducerClient.from_connection_string(
+        connection_str,
+        buffered_mode=True,
+        on_success=on_success,
+        on_error=on_error,
+        auth_timeout=3,
+        retry_total=3,
+        retry_mode='fixed',
+        retry_backoff_factor=0.01,
+        max_wait_time=10,
+        max_buffer_length=100
+    )
+
+    async with producer:
+        for i in range(100):
+            await producer.send_event(EventData("test"))
+
+    await asyncio.sleep(60)
+
+    assert not on_error.err
+    assert sum([len(sent_events[key]) for key in sent_events]) == 100
+    assert sum([len(received_events[key]) for key in received_events]) == 100
+
+    await consumer.close()
+    await receive_thread

sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_buffered_producer.py

Lines changed: 48 additions & 0 deletions
@@ -496,3 +496,51 @@ def on_error(events, pid, err):
 
     consumer.close()
     receive_thread.join()
+
+@pytest.mark.liveTest
+def test_long_wait_small_buffer(connection_str):
+    received_events = defaultdict(list)
+
+    def on_event(partition_context, event):
+        received_events[partition_context.partition_id].append(event)
+
+    consumer = EventHubConsumerClient.from_connection_string(connection_str, consumer_group="$default")
+    receive_thread = Thread(target=consumer.receive, args=(on_event,))
+    receive_thread.daemon = True
+    receive_thread.start()
+
+    sent_events = defaultdict(list)
+
+    def on_success(events, pid):
+        sent_events[pid].extend(events)
+
+    def on_error(events, pid, err):
+        on_error.err = err
+
+    on_error.err = None  # ensure no error
+    producer = EventHubProducerClient.from_connection_string(
+        connection_str,
+        buffered_mode=True,
+        on_success=on_success,
+        on_error=on_error,
+        auth_timeout=3,
+        retry_total=3,
+        retry_mode='fixed',
+        retry_backoff_factor=0.01,
+        max_wait_time=10,
+        max_buffer_length=100
+    )
+
+    with producer:
+        for i in range(100):
+            producer.send_event(EventData("test"))
+            time.sleep(.1)
+
+    time.sleep(60)
+
+    assert not on_error.err
+    assert sum([len(sent_events[key]) for key in sent_events]) == 100
+    assert sum([len(received_events[key]) for key in received_events]) == 100
+
+    consumer.close()
+    receive_thread.join()
