@@ -61,53 +61,28 @@ def test_send_with_partition_key(connstr_receivers, live_eventhub, uamqp_transpo
6161 batch_cnt = 0
6262 single_cnt = 0
6363 found_partition_keys = {}
64- reconnect_receivers = []
6564 for index , partition in enumerate (receivers ):
6665 retry_total = 0
6766 while retry_total < 3 :
6867 timeout = (5 + retry_total ) * timeout_factor
69- try :
70- received = partition .receive_message_batch (timeout = timeout )
71- for message in received :
72- try :
73- event_data = EventData ._from_message (message )
74- if event_data .properties and event_data .properties [b'is_single' ]:
75- single_cnt += 1
76- else :
77- batch_cnt += 1
78- existing = found_partition_keys [event_data .partition_key ]
79- assert existing == index
80- except KeyError :
81- found_partition_keys [event_data .partition_key ] = index
82- if received :
83- break
84- retry_total += 1
85- except AMQPConnectionError :
86- for r in reconnect_receivers :
87- r .close ()
88- uri = "sb://{}/{}" .format (live_eventhub ['hostname' ], live_eventhub ['event_hub' ])
89- source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}" .format (
90- live_eventhub ['hostname' ],
91- live_eventhub ['event_hub' ],
92- live_eventhub ['consumer_group' ],
93- index )
94- if uamqp_transport :
95- sas_auth = uamqp .authentication .SASTokenAuth .from_shared_access_key (
96- uri , live_eventhub ['key_name' ], live_eventhub ['access_key' ])
97- partition = uamqp .ReceiveClient (source , auth = sas_auth , debug = False , timeout = 0 , prefetch = 500 )
98- else :
99- sas_auth = SASTokenAuth (
100- uri , uri , live_eventhub ['key_name' ], live_eventhub ['access_key' ])
101- partition = ReceiveClient (live_eventhub ['hostname' ], source , auth = sas_auth , network_trace = False , timeout = 0 , link_credit = 500 )
102- partition .open ()
103- reconnect_receivers .append (partition )
104- retry_total += 1
105- if retry_total == 3 :
68+ received = partition .receive_message_batch (timeout = timeout )
69+ for message in received :
70+ try :
71+ event_data = EventData ._from_message (message )
72+ if event_data .properties and event_data .properties [b'is_single' ]:
73+ single_cnt += 1
74+ else :
75+ batch_cnt += 1
76+ existing = found_partition_keys [event_data .partition_key ]
77+ assert existing == index
78+ except KeyError :
79+ found_partition_keys [event_data .partition_key ] = index
80+ if received :
81+ break
82+ retry_total += 1
83+ if retry_total >= 3 :
10684 raise OperationTimeoutError (f"Exhausted retries for receiving from { live_eventhub ['hostname' ]} ." )
10785
108- for r in reconnect_receivers :
109- r .close ()
110-
11186 assert single_cnt == 60
11287 assert batch_cnt == 60
11388 assert len (found_partition_keys ) == 6
0 commit comments