@@ -47,25 +47,31 @@
 logger = setup_logging()
 
 
-def _analyze_kafka_cluster(kafka_credential: Dict, sr_credential: Dict, config: Dict) -> bool:
+def _analyze_kafka_cluster(kafka_credential: Dict, config: Dict, sr_credential: Dict | None = None) -> bool:
5151 """Analyze a single Kafka cluster.
5252
5353 Args:
5454 kafka_credential (Dict): Kafka cluster credentials
55- sr_credential (Dict): Schema Registry credentials
55+ sr_credential (Dict | None ): Schema Registry credentials
5656 config (Dict): Configuration parameters
5757
5858 Return(s):
5959 bool: True if analysis succeeded, False otherwise
6060 """
     try:
         # Instantiate the Kafka Topics Analyzer
-        analyzer = ThreadSafeKafkaTopicsAnalyzer(kafka_cluster_id=kafka_credential.get("kafka_cluster_id"),
-                                                 bootstrap_server_uri=kafka_credential.get("bootstrap.servers"),
-                                                 kafka_api_key=kafka_credential.get("sasl.username"),
-                                                 kafka_api_secret=kafka_credential.get("sasl.password"),
-                                                 sr_url=sr_credential["url"],
-                                                 sr_api_key_secret=sr_credential["basic.auth.user.info"])
+        if sr_credential is not None:
+            analyzer = ThreadSafeKafkaTopicsAnalyzer(kafka_cluster_id=kafka_credential.get("kafka_cluster_id"),
+                                                     bootstrap_server_uri=kafka_credential.get("bootstrap.servers"),
+                                                     kafka_api_key=kafka_credential.get("sasl.username"),
+                                                     kafka_api_secret=kafka_credential.get("sasl.password"),
+                                                     sr_url=sr_credential["url"],
+                                                     sr_api_key_secret=sr_credential["basic.auth.user.info"])
+        else:
+            analyzer = ThreadSafeKafkaTopicsAnalyzer(kafka_cluster_id=kafka_credential.get("kafka_cluster_id"),
+                                                     bootstrap_server_uri=kafka_credential.get("bootstrap.servers"),
+                                                     kafka_api_key=kafka_credential.get("sasl.username"),
+                                                     kafka_api_secret=kafka_credential.get("sasl.password"))
 
         # Multithread the analysis of all topics in the Kafka cluster
         success = analyzer.analyze_all_topics(**config)
@@ -157,17 +163,18 @@ def main():
         return
 
     # Fetch Schema Registry credentials
-    if use_confluent_cloud_api_key_to_fetch_resource_credentials:
-        # Read the Schema Registry credentials using Confluent Cloud API key
-        sr_credentials = fetch_schema_registry_via_confluent_cloud_api_key(principal_id,
-                                                                           metrics_config,
-                                                                           use_private_schema_registry,
-                                                                           environment_filter)
-    else:
-        # Read the Schema Registry credentials from the environment variable or AWS Secrets Manager
-        sr_credentials = fetch_schema_registry_credentials_via_env_file(use_aws_secrets_manager)
-        if not sr_credentials:
-            return
+    if use_kafka_writer:
+        if use_confluent_cloud_api_key_to_fetch_resource_credentials:
+            # Read the Schema Registry credentials using Confluent Cloud API key
+            sr_credentials = fetch_schema_registry_via_confluent_cloud_api_key(principal_id,
+                                                                               metrics_config,
+                                                                               use_private_schema_registry,
+                                                                               environment_filter)
+        else:
+            # Read the Schema Registry credentials from the environment variable or AWS Secrets Manager
+            sr_credentials = fetch_schema_registry_credentials_via_env_file(use_aws_secrets_manager)
+            if not sr_credentials:
+                return
 
     # Prepare configuration object
     config = {
@@ -212,7 +219,11 @@ def main():
     # Analyze Kafka clusters concurrently if more than one cluster
     if len(kafka_credentials) == 1:
         # Single Kafka cluster. No need for cluster-level threading
-        success = _analyze_kafka_cluster(kafka_credentials[0], sr_credentials[0], config)
+        if use_kafka_writer:
+            success = _analyze_kafka_cluster(kafka_credentials[0], config, sr_credential=sr_credentials[0])
+        else:
+            success = _analyze_kafka_cluster(kafka_credentials[0], config)
+
         if success:
             logging.info("SINGLE KAFKA CLUSTER ANALYSIS COMPLETED SUCCESSFULLY.")
         else:
@@ -224,10 +235,16 @@ def main():
 
         with ThreadPoolExecutor(max_workers=max_cluster_workers) as executor:
             # Submit Kafka cluster analysis tasks
-            future_to_cluster = {
-                executor.submit(_analyze_kafka_cluster, kafka_credential, sr_credentials[kafka_credential["environment_id"]], config): kafka_credential.get("kafka_cluster_id", "unknown")
-                for kafka_credential in kafka_credentials
-            }
+            if use_kafka_writer:
+                future_to_cluster = {
+                    executor.submit(_analyze_kafka_cluster, kafka_credential, config, sr_credential=sr_credentials[kafka_credential["environment_id"]]): kafka_credential.get("kafka_cluster_id", "unknown")
+                    for kafka_credential in kafka_credentials
+                }
+            else:
+                future_to_cluster = {
+                    executor.submit(_analyze_kafka_cluster, kafka_credential, config): kafka_credential.get("kafka_cluster_id", "unknown")
+                    for kafka_credential in kafka_credentials
+                }
 
             # Process completed Kafka cluster analyses
             for future in as_completed(future_to_cluster):
@@ -244,16 +261,18 @@ def main():
                     failed_clusters += 1
                     logging.error("KAFKA CLUSTER %s: ANALYSIS FAILED WITH EXCEPTION: %s", kafka_cluster_id, e)
 
-    # Instantiate the IamClient class.
-    iam_client = IamClient(iam_config=config['metrics_config'])
+    # Clean up the created Schema Registry API key(s) if they were created using Confluent Cloud API key
+    if use_kafka_writer and use_confluent_cloud_api_key_to_fetch_resource_credentials:
+        # Instantiate the IamClient class.
+        iam_client = IamClient(iam_config=config['metrics_config'])
 
-    # Delete all the Schema Registry API keys created for each Schema Registry instance
-    for sr_credential in sr_credentials.values():
-        http_status_code, error_message = iam_client.delete_api_key(api_key=sr_credential["basic.auth.user.info"].split(":")[0])
-        if http_status_code != HttpStatus.NO_CONTENT:
-            logging.warning("FAILED TO DELETE SCHEMA REGISTRY API KEY %s FOR SCHEMA REGISTRY %s BECAUSE THE FOLLOWING ERROR OCCURRED: %s.", sr_credential["basic.auth.user.info"].split(":")[0], sr_credential['schema_registry_cluster_id'], error_message)
-        else:
-            logging.info("Schema Registry API key %s for Schema Registry %s deleted successfully.", sr_credential["basic.auth.user.info"].split(":")[0], sr_credential['schema_registry_cluster_id'])
+        # Delete all the Schema Registry API keys created for each Schema Registry instance
+        for sr_credential in sr_credentials.values():
+            http_status_code, error_message = iam_client.delete_api_key(api_key=sr_credential["basic.auth.user.info"].split(":")[0])
+            if http_status_code != HttpStatus.NO_CONTENT:
+                logging.warning("FAILED TO DELETE SCHEMA REGISTRY API KEY %s FOR SCHEMA REGISTRY %s BECAUSE THE FOLLOWING ERROR OCCURRED: %s.", sr_credential["basic.auth.user.info"].split(":")[0], sr_credential['schema_registry_cluster_id'], error_message)
+            else:
+                logging.info("Schema Registry API key %s for Schema Registry %s deleted successfully.", sr_credential["basic.auth.user.info"].split(":")[0], sr_credential['schema_registry_cluster_id'])
 
     # Log final summary
     logging.info("=" * DEFAULT_CHARACTER_REPEAT)