 import os
 import logging
 from collections import defaultdict
+from functools import partial
 
 from azure.identity.aio import ClientSecretCredential
 from azure.eventhub.aio import EventHubConsumerClient
 
 from logger import get_logger
 from process_monitor import ProcessMonitor
+from app_insights_metric import AzureMonitorMetric
 
 
 def parse_starting_position(args):
@@ -65,7 +67,7 @@ def parse_starting_position(args):
     type=int,
     default=0
 )
-parser.add_argument("--parallel_recv_cnt", help="Number of receive clients doing parallel receiving", type=int)
+parser.add_argument("--parallel_recv_cnt", help="Number of receive clients doing parallel receiving", type=int, default=1)
 parser.add_argument("--proxy_hostname", type=str)
 parser.add_argument("--proxy_port", type=str)
 parser.add_argument("--proxy_username", type=str)
@@ -89,6 +91,8 @@ def parse_starting_position(args):
 recv_cnt_iteration_map = defaultdict(int)
 recv_time_map = dict()
 
+azure_metric_monitor = AzureMonitorMetric("Async EventHubConsumerClient")
+
 
 class EventHubConsumerClientTest(EventHubConsumerClient):
     async def get_partition_ids(self):
@@ -98,7 +102,7 @@ async def get_partition_ids(self):
         return await super(EventHubConsumerClientTest, self).get_partition_ids()
 
 
-async def on_event_received(partition_context, event):
+async def on_event_received(process_monitor, partition_context, event):
     recv_cnt_map[partition_context.partition_id] += 1 if event else 0
     if recv_cnt_map[partition_context.partition_id] % LOG_PER_COUNT == 0:
         total_time_elapsed = time.perf_counter() - start_time
@@ -113,10 +117,15 @@ async def on_event_received(partition_context, event):
             recv_cnt_map[partition_context.partition_id] / total_time_elapsed,
             LOG_PER_COUNT / (partition_current_time - partition_previous_time) if partition_previous_time else None
         )
+        azure_metric_monitor.record_events_cpu_memory(
+            LOG_PER_COUNT,
+            process_monitor.cpu_usage_percent,
+            process_monitor.memory_usage_percent
+        )
         await partition_context.update_checkpoint(event)
 
 
-async def on_event_batch_received(partition_context, event_batch):
+async def on_event_batch_received(process_monitor, partition_context, event_batch):
     recv_cnt_map[partition_context.partition_id] += len(event_batch)
     recv_cnt_iteration_map[partition_context.partition_id] += len(event_batch)
     if recv_cnt_iteration_map[partition_context.partition_id] > LOG_PER_COUNT:
@@ -133,9 +142,18 @@ async def on_event_batch_received(partition_context, event_batch):
             recv_cnt_iteration_map[partition_context.partition_id] / (partition_current_time - partition_previous_time) if partition_previous_time else None
         )
         recv_cnt_iteration_map[partition_context.partition_id] = 0
+        azure_metric_monitor.record_events_cpu_memory(
+            LOG_PER_COUNT,
+            process_monitor.cpu_usage_percent,
+            process_monitor.memory_usage_percent
+        )
     await partition_context.update_checkpoint()
 
 
+async def on_error(partition_context, exception):
+    azure_metric_monitor.record_error(exception, extra="partition: {}".format(partition_context.partition_id))
+
+
 def create_client(args):
 
     if args.storage_conn_str:
@@ -198,26 +216,31 @@ def create_client(args):
 
 async def run(args):
 
-    with ProcessMonitor("monitor_{}".format(args.log_filename), "consumer_stress_async", print_console=args.print_console):
+    with ProcessMonitor("monitor_{}".format(args.log_filename), "consumer_stress_async", print_console=args.print_console) as process_monitor:
         kwargs_dict = {
             "prefetch": args.link_credit,
             "partition_id": str(args.recv_partition_id) if args.recv_partition_id else None,
             "track_last_enqueued_event_properties": args.track_last_enqueued_event_properties,
-            "starting_position": starting_position
+            "starting_position": starting_position,
+            "on_error": on_error
         }
         if args.max_batch_size:
             kwargs_dict["max_batch_size"] = args.max_batch_size
         if args.max_wait_time:
             kwargs_dict["max_wait_time"] = args.max_wait_time
+
+        on_event_received_with_process_monitor = partial(on_event_received, process_monitor)
+        on_event_batch_received_with_process_monitor = partial(on_event_batch_received, process_monitor)
+
         if args.parallel_recv_cnt and args.parallel_recv_cnt > 1:
             clients = [create_client(args) for _ in range(args.parallel_recv_cnt)]
             tasks = [
                 asyncio.ensure_future(
                     clients[i].receive_batch(
-                        on_event_batch_received,
+                        on_event_batch_received_with_process_monitor,
                         **kwargs_dict
                     ) if args.max_batch_size else clients[i].receive(
-                        on_event_received,
+                        on_event_received_with_process_monitor,
                         **kwargs_dict
                     )
                 ) for i in range(args.parallel_recv_cnt)
@@ -226,10 +249,10 @@ async def run(args):
             clients = [create_client(args)]
             tasks = [asyncio.ensure_future(
                 clients[0].receive_batch(
-                    on_event_batch_received_with_process_monitor,
+                    on_event_batch_received_with_process_monitor,
                     **kwargs_dict
                 ) if args.max_batch_size else clients[0].receive(
-                    on_event_received,
+                    on_event_received_with_process_monitor,
                     **kwargs_dict
                 )
             )]
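
For context on the callback change above: receive() and receive_batch() invoke the handler with only (partition_context, event) or (partition_context, event_batch), so the diff uses functools.partial to pre-bind the extra process_monitor argument. The standalone sketch below illustrates that binding pattern; the connection string, event hub name, and the dict standing in for ProcessMonitor are placeholder assumptions, not values from this PR.

# Minimal sketch of the partial-binding pattern used in the diff above.
# "<CONNECTION STRING>" and "<EVENT HUB NAME>" are placeholders.
import asyncio
from functools import partial

from azure.eventhub.aio import EventHubConsumerClient


async def on_event(monitor, partition_context, event):
    # partial() supplies `monitor`; the SDK supplies the last two arguments.
    print(partition_context.partition_id, monitor, event)


async def main():
    client = EventHubConsumerClient.from_connection_string(
        "<CONNECTION STRING>",
        consumer_group="$Default",
        eventhub_name="<EVENT HUB NAME>",
    )
    monitor = {"name": "stand-in for ProcessMonitor"}  # illustrative stand-in
    async with client:
        # receive() only passes (partition_context, event) to its callback,
        # so any extra argument has to be bound up front.
        await client.receive(
            partial(on_event, monitor),
            starting_position="-1",  # read from the beginning of each partition
        )


asyncio.run(main())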
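AzureMonitorMetric comes from app_insights_metric, a helper module local to the stress-test scripts that is not shown in this diff. The stub below only mirrors the interface the new code calls (record_events_cpu_memory and record_error); its bodies are hypothetical placeholders, not the real implementation, which presumably forwards the values to Application Insights.

# Hypothetical stand-in for app_insights_metric.AzureMonitorMetric, showing
# only the interface exercised by the diff; this stub just logs locally.
import logging


class AzureMonitorMetricStub:
    def __init__(self, test_name):
        self.test_name = test_name
        self._logger = logging.getLogger(test_name)

    def record_events_cpu_memory(self, events_received, cpu_usage, memory_usage):
        self._logger.info(
            "events received: %s, cpu: %s%%, memory: %s%%",
            events_received, cpu_usage, memory_usage,
        )

    def record_error(self, error, extra=None):
        self._logger.error("error: %r (%s)", error, extra)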