Skip to content

Commit 0253db2

Browse files
authored
[EH] Partition Last Received Event Updating Fix (Azure#31352)
* partition last event received updated * add test * sync test * delete container at the end * add dev req * start trying to move to consumer_async * start aligning uamqp/pyamqp * update conftest * update changes * attempt at one thread * revert to 2 thread * update test * update test 2 * remove commented code * remove dev_req * remove comment * update test * update to kind * remove import * remove import * don't pin websocket
1 parent 8d94f35 commit 0253db2

File tree

6 files changed

+165
-6
lines changed

6 files changed

+165
-6
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,12 @@ def _create_handler(
177177
async def _open_with_retry(self) -> None:
178178
await self._do_retryable_operation(self._open, operation_need_param=False)
179179

180-
# only used by _uamqp_transport_async
181180
def _next_message_in_buffer(self):
182181
# pylint:disable=protected-access
183182
message = self._message_buffer.popleft()
184183
event_data = EventData._from_message(message)
185-
event_data._uamqp_message = message
184+
if self._amqp_transport.KIND == "uamqp":
185+
event_data._uamqp_message = message
186186
self._last_received_event = event_data
187187
return event_data
188188

sdk/eventhub/azure-eventhub/azure/eventhub/aio/_transport/_pyamqp_transport_async.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,10 @@ def create_receive_client(*, config, **kwargs): # pylint:disable=unused-argumen
156156
async def _callback_task(consumer, batch, max_batch_size, max_wait_time):
157157
while consumer._callback_task_run: # pylint: disable=protected-access
158158
async with consumer._message_buffer_lock: # pylint: disable=protected-access
159-
messages = [
160-
consumer._message_buffer.popleft() # pylint: disable=protected-access
159+
events = [
160+
consumer._next_message_in_buffer() # pylint: disable=protected-access
161161
for _ in range(min(max_batch_size, len(consumer._message_buffer))) # pylint: disable=protected-access
162162
]
163-
events = [EventData._from_message(message) for message in messages] # pylint: disable=protected-access
164163
now_time = time.time()
165164
if len(events) > 0:
166165
await consumer._on_event_received(events if batch else events[0]) # pylint: disable=protected-access
@@ -228,6 +227,7 @@ async def receive_messages_async(consumer, batch, max_batch_size, max_wait_time)
228227
# pylint:disable=protected-access
229228
consumer._callback_task_run = True
230229
consumer._last_callback_called_time = time.time()
230+
231231
callback_task = asyncio.create_task(
232232
PyamqpTransportAsync._callback_task(consumer, batch, max_batch_size, max_wait_time)
233233
)

sdk/eventhub/azure-eventhub/dev_requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,7 @@
44
azure-mgmt-eventhub<=10.1.0
55
azure-mgmt-resource==20.0.0
66
aiohttp>=3.0,<4.0
7-
websocket-client==1.4.2
7+
websocket-client
88
-e ../../../tools/azure-devtools
9+
azure-eventhub-checkpointstoreblob-aio
10+
azure-eventhub-checkpointstoreblob

sdk/eventhub/azure-eventhub/tests/conftest.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
from azure.eventhub import EventHubProducerClient
1919
from azure.eventhub._pyamqp import ReceiveClient
2020
from azure.eventhub._pyamqp.authentication import SASTokenAuth
21+
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
22+
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore as BlobCheckpointStoreAsync
2123
try:
2224
import uamqp
2325
uamqp_transport_params = [True, False]
@@ -54,6 +56,24 @@ def sleep(request):
5456
def uamqp_transport(request):
5557
return request.param
5658

59+
@pytest.fixture(scope="session")
60+
def storage_connection_str():
61+
try:
62+
return os.environ['AZURE_STORAGE_CONN_STR']
63+
except KeyError:
64+
pytest.skip('AZURE_STORAGE_CONN_STR undefined')
65+
return
66+
67+
@pytest.fixture()
68+
def checkpoint_store(storage_connection_str):
69+
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_connection_str, "blobcontainer" + str(uuid.uuid4()))
70+
return checkpoint_store
71+
72+
@pytest.fixture()
73+
def checkpoint_store_aio(storage_connection_str):
74+
checkpoint_store = BlobCheckpointStoreAsync.from_connection_string(storage_connection_str, "blobcontainer" + str(uuid.uuid4()))
75+
return checkpoint_store
76+
5777
def get_logger(filename, level=logging.INFO):
5878
azure_logger = logging.getLogger("azure.eventhub")
5979
azure_logger.setLevel(level)
@@ -179,6 +199,17 @@ def live_eventhub(resource_group, eventhub_namespace): # pylint: disable=redefi
179199
except:
180200
warnings.warn(UserWarning("eventhub teardown failed"))
181201

202+
@pytest.fixture()
203+
def resource_mgmt_client():
204+
try:
205+
SUBSCRIPTION_ID = os.environ["AZURE_SUBSCRIPTION_ID"]
206+
except KeyError:
207+
pytest.skip('AZURE_SUBSCRIPTION_ID defined')
208+
return
209+
base_url = os.environ.get("EVENTHUB_RESOURCE_MANAGER_URL", "https://management.azure.com/")
210+
credential_scopes = ["{}.default".format(base_url)]
211+
resource_client = EventHubManagementClient(EnvironmentCredential(), SUBSCRIPTION_ID, base_url=base_url, credential_scopes=credential_scopes)
212+
yield resource_client
182213

183214
@pytest.fixture()
184215
def connection_str(live_eventhub):

sdk/eventhub/azure-eventhub/tests/livetest/asynctests/test_consumer_client_async.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,68 @@
99
from azure.eventhub._constants import ALL_PARTITIONS
1010

1111

12+
@pytest.mark.liveTest
13+
@pytest.mark.asyncio
14+
async def test_receive_storage_checkpoint_async(connstr_senders, uamqp_transport, checkpoint_store_aio, live_eventhub, resource_mgmt_client):
15+
connection_str, senders = connstr_senders
16+
17+
for i in range(10):
18+
senders[0].send(EventData("Test EventData"))
19+
senders[1].send(EventData("Test EventData"))
20+
21+
try:
22+
await checkpoint_store_aio._container_client.create_container()
23+
except:
24+
pass
25+
26+
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group='$default', checkpoint_store=checkpoint_store_aio, uamqp_transport=uamqp_transport)
27+
28+
sequence_numbers_0 = []
29+
sequence_numbers_1 = []
30+
async def on_event(partition_context, event):
31+
await partition_context.update_checkpoint(event)
32+
sequence_num = event.sequence_number
33+
if partition_context.partition_id == "0":
34+
if sequence_num in sequence_numbers_0:
35+
assert False
36+
sequence_numbers_0.append(sequence_num)
37+
else:
38+
if sequence_num in sequence_numbers_1:
39+
assert False
40+
sequence_numbers_1.append(sequence_num)
41+
42+
async with client:
43+
task = asyncio.ensure_future(
44+
client.receive(on_event, starting_position="-1"))
45+
# Update the eventhub
46+
eventhub = resource_mgmt_client.event_hubs.get(
47+
live_eventhub["resource_group"],
48+
live_eventhub["namespace"],
49+
live_eventhub["event_hub"]
50+
)
51+
properties = eventhub.as_dict()
52+
if properties["message_retention_in_days"] == 1:
53+
properties["message_retention_in_days"] = 2
54+
else:
55+
properties["message_retention_in_days"] = 1
56+
resource_mgmt_client.event_hubs.create_or_update(
57+
live_eventhub["resource_group"],
58+
live_eventhub["namespace"],
59+
live_eventhub["event_hub"],
60+
properties
61+
)
62+
await asyncio.sleep(10)
63+
64+
65+
await task
66+
assert len(sequence_numbers_0) == 10
67+
assert len(sequence_numbers_1) == 10
68+
69+
try:
70+
await checkpoint_store_aio._container_client.delete_container()
71+
except:
72+
pass
73+
1274
@pytest.mark.liveTest
1375
@pytest.mark.asyncio
1476
async def test_receive_no_partition_async(connstr_senders, uamqp_transport):

sdk/eventhub/azure-eventhub/tests/livetest/synctests/test_consumer_client.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,70 @@
1010
from azure.eventhub._eventprocessor.in_memory_checkpoint_store import InMemoryCheckpointStore
1111
from azure.eventhub._constants import ALL_PARTITIONS
1212

13+
@pytest.mark.liveTest
14+
@pytest.mark.asyncio
15+
async def test_receive_storage_checkpoint(connstr_senders, uamqp_transport, checkpoint_store, live_eventhub, resource_mgmt_client):
16+
connection_str, senders = connstr_senders
17+
18+
for i in range(10):
19+
senders[0].send(EventData("Test EventData"))
20+
senders[1].send(EventData("Test EventData"))
21+
22+
try:
23+
checkpoint_store._container_client.create_container()
24+
except:
25+
pass
26+
27+
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group='$default', checkpoint_store=checkpoint_store, uamqp_transport=uamqp_transport)
28+
29+
sequence_numbers_0 = []
30+
sequence_numbers_1 = []
31+
def on_event(partition_context, event):
32+
partition_context.update_checkpoint(event)
33+
sequence_num = event.sequence_number
34+
if partition_context.partition_id == "0":
35+
if sequence_num in sequence_numbers_0:
36+
assert False
37+
sequence_numbers_0.append(sequence_num)
38+
else:
39+
if sequence_num in sequence_numbers_1:
40+
assert False
41+
sequence_numbers_1.append(sequence_num)
42+
43+
with client:
44+
worker = threading.Thread(target=client.receive,
45+
args=(on_event,),
46+
kwargs={"starting_position": "-1"})
47+
worker.start()
48+
49+
# Update the eventhub
50+
eventhub = resource_mgmt_client.event_hubs.get(
51+
live_eventhub["resource_group"],
52+
live_eventhub["namespace"],
53+
live_eventhub["event_hub"]
54+
)
55+
properties = eventhub.as_dict()
56+
if properties["message_retention_in_days"] == 1:
57+
properties["message_retention_in_days"] = 2
58+
else:
59+
properties["message_retention_in_days"] = 1
60+
resource_mgmt_client.event_hubs.create_or_update(
61+
live_eventhub["resource_group"],
62+
live_eventhub["namespace"],
63+
live_eventhub["event_hub"],
64+
properties
65+
)
66+
67+
time.sleep(20)
68+
69+
70+
assert len(sequence_numbers_0) == 10
71+
assert len(sequence_numbers_1) == 10
72+
73+
try:
74+
checkpoint_store._container_client.delete_container()
75+
except:
76+
pass
1377

1478
@pytest.mark.liveTest
1579
def test_receive_no_partition(connstr_senders, uamqp_transport):

0 commit comments

Comments
 (0)