Skip to content

Commit efae780

Browse files
Merge pull request #275 from j3-signalroom/271-update-the-hot_partition_egress-setting
Resolved #271.
2 parents aa0e355 + 7389b24 commit efae780

File tree

1 file changed

+17
-18
lines changed

1 file changed

+17
-18
lines changed

src/thread_safe_topic_analyzer.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
DEFAULT_RESTFUL_API_MAX_RETRIES)
1515

1616

17-
1817
__copyright__ = "Copyright (c) 2025 Jeffrey Jonathan Jennings"
1918
__credits__ = ["Jeffrey Jonathan Jennings"]
2019
__license__ = "MIT"
@@ -69,7 +68,7 @@ def analyze_topic(self,
6968
topic_metadata = topic_info['metadata']
7069
sampling_days = topic_info['sampling_days_based_on_retention_days']
7170

72-
logging.info("[Thread-%d] Analyzing topic %s with %d-day rolling window (from %s)",
71+
logging.info("[Thread-%d] Analyzing topic '%s' with %d-day rolling window (from %s)",
7372
threading.current_thread().ident, topic_name, sampling_days, iso_start_time.isoformat())
7473

7574
partitions = list(topic_metadata.partitions.keys())
@@ -159,18 +158,18 @@ def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topi
159158
if http_status_code == HttpStatus.RATE_LIMIT_EXCEEDED:
160159
retry += 1
161160
if retry == max_retries:
162-
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'RECEIVED BYTES' metric for topic %s. Max retries reached (%d). Aborting.", threading.current_thread().ident, topic_name, max_retries)
161+
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'RECEIVED BYTES' metric for topic '%s'. Max retries reached (%d). Aborting.", threading.current_thread().ident, topic_name, max_retries)
163162
result['error'] = "Rate limit exceeded when retrieving 'RECEIVED BYTES' metric."
164163
result['total_record_count'] = 0
165164
result['avg_bytes_per_record'] = 0.0
166165
result['hot_partition_ingress'] = 'no'
167166
result['hot_partition_egress'] = 'no'
168167
break
169-
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'RECEIVED BYTES' metric for topic %s. Retrying %d of %d after %d seconds...", threading.current_thread().ident, topic_name, retry, max_retries, rate_limits['reset_in_seconds'])
168+
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'RECEIVED BYTES' metric for topic '%s'. Retrying %d of %d after %d seconds...", threading.current_thread().ident, topic_name, retry, max_retries, rate_limits['reset_in_seconds'])
170169
time.sleep(rate_limits['reset_in_seconds'])
171170
continue
172171
elif http_status_code not in (HttpStatus.OK, HttpStatus.RATE_LIMIT_EXCEEDED):
173-
logging.warning("[Thread-%d] Failed retrieving 'RECEIVED BYTES' metric for topic %s because: %s", threading.current_thread().ident, topic_name, error_message)
172+
logging.warning("[Thread-%d] Failed retrieving 'RECEIVED BYTES' metric for topic '%s' because: %s", threading.current_thread().ident, topic_name, error_message)
174173
result['error'] = error_message
175174
result['total_record_count'] = 0
176175
result['avg_bytes_per_record'] = 0.0
@@ -191,18 +190,18 @@ def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topi
191190
if http_status_code == HttpStatus.RATE_LIMIT_EXCEEDED:
192191
retry += 1
193192
if retry == max_retries:
194-
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'RECEIVED RECORDS' metric for topic %s. Max retries reached (%d). Aborting.", threading.current_thread().ident, topic_name, max_retries)
193+
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'RECEIVED RECORDS' metric for topic '%s'. Max retries reached (%d). Aborting.", threading.current_thread().ident, topic_name, max_retries)
195194
result['error'] = "Rate limit exceeded when retrieving 'RECEIVED RECORDS' metric."
196195
result['total_record_count'] = 0
197196
result['avg_bytes_per_record'] = 0.0
198197
result['hot_partition_ingress'] = 'no'
199198
result['hot_partition_egress'] = 'no'
200199
break
201-
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'RECEIVED RECORDS' metric for topic %s. Retrying %d of %d after %d seconds...", threading.current_thread().ident, topic_name, retry, max_retries, rate_limits['reset_in_seconds'])
200+
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'RECEIVED RECORDS' metric for topic '%s'. Retrying %d of %d after %d seconds...", threading.current_thread().ident, topic_name, retry, max_retries, rate_limits['reset_in_seconds'])
202201
time.sleep(rate_limits['reset_in_seconds'])
203202
continue
204203
elif http_status_code not in (HttpStatus.OK, HttpStatus.RATE_LIMIT_EXCEEDED):
205-
logging.warning("[Thread-%d] Failed retrieving 'RECEIVED RECORDS' metric for topic %s because: %s", threading.current_thread().ident, topic_name, error_message)
204+
logging.warning("[Thread-%d] Failed retrieving 'RECEIVED RECORDS' metric for topic '%s' because: %s", threading.current_thread().ident, topic_name, error_message)
206205
result['error'] = error_message
207206
result['total_record_count'] = 0
208207
result['avg_bytes_per_record'] = 0.0
@@ -227,7 +226,7 @@ def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topi
227226
avg_bytes_per_record = sum(avg_bytes_daily_totals)/len(avg_bytes_daily_totals) if len(avg_bytes_daily_totals) > 0 else 0
228227
result['avg_bytes_per_record'] = avg_bytes_per_record
229228

230-
logging.info("[Thread-%d] Confluent Metrics API - For topic %s, the average bytes per record is %.2f bytes/record for a total of %.0f records.", threading.current_thread().ident, topic_name, avg_bytes_per_record, record_count)
229+
logging.info("[Thread-%d] Confluent Metrics API - For topic '%s', the average bytes per record is %.2f bytes/record for a total of %.0f records.", threading.current_thread().ident, topic_name, avg_bytes_per_record, record_count)
231230

232231
proceed = True
233232
break
@@ -253,18 +252,18 @@ def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topi
253252
if http_status_code == HttpStatus.RATE_LIMIT_EXCEEDED:
254253
retry += 1
255254
if retry == max_retries:
256-
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'HOT_PARTITION_INGRESS' metric for topic %s. Max retries reached (%d). Aborting.", threading.current_thread().ident, topic_name, max_retries)
255+
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'HOT_PARTITION_INGRESS' metric for topic '%s'. Max retries reached (%d). Aborting.", threading.current_thread().ident, topic_name, max_retries)
257256
result['error'] = "Rate limit exceeded when retrieving 'HOT_PARTITION_INGRESS' metric."
258257
result['total_record_count'] = 0
259258
result['avg_bytes_per_record'] = 0.0
260259
result['hot_partition_ingress'] = 'no'
261260
result['hot_partition_egress'] = 'no'
262261
break
263-
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'HOT_PARTITION_INGRESS' metric for topic %s. Retrying %d of %d after %d seconds...", threading.current_thread().ident, topic_name, retry, max_retries, rate_limits['reset_in_seconds'])
262+
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'HOT_PARTITION_INGRESS' metric for topic '%s'. Retrying %d of %d after %d seconds...", threading.current_thread().ident, topic_name, retry, max_retries, rate_limits['reset_in_seconds'])
264263
time.sleep(rate_limits['reset_in_seconds'])
265264
continue
266265
elif http_status_code not in (HttpStatus.OK, HttpStatus.RATE_LIMIT_EXCEEDED):
267-
logging.warning("[Thread-%d] Failed retrieving 'HOT_PARTITION_INGRESS' metric for topic %s because: %s", threading.current_thread().ident, topic_name, error_message)
266+
logging.warning("[Thread-%d] Failed retrieving 'HOT_PARTITION_INGRESS' metric for topic '%s' because: %s", threading.current_thread().ident, topic_name, error_message)
268267
result['error'] = error_message
269268
result['total_record_count'] = 0
270269
result['avg_bytes_per_record'] = 0.0
@@ -288,18 +287,18 @@ def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topi
288287
if http_status_code == HttpStatus.RATE_LIMIT_EXCEEDED:
289288
retry += 1
290289
if retry == max_retries:
291-
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'HOT_PARTITION_EGRESS' metric for topic %s. Max retries reached (%d). Aborting.", threading.current_thread().ident, topic_name, max_retries)
290+
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'HOT_PARTITION_EGRESS' metric for topic '%s'. Max retries reached (%d). Aborting.", threading.current_thread().ident, topic_name, max_retries)
292291
result['error'] = "Rate limit exceeded when retrieving 'HOT_PARTITION_EGRESS' metric."
293292
result['total_record_count'] = 0
294293
result['avg_bytes_per_record'] = 0.0
295294
result['hot_partition_ingress'] = 'no'
296295
result['hot_partition_egress'] = 'no'
297296
break
298-
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'HOT_PARTITION_EGRESS' metric for topic %s. Retrying %d of %d after %d seconds...", threading.current_thread().ident, topic_name, retry, max_retries, rate_limits['reset_in_seconds'])
297+
logging.warning("[Thread-%d] Rate limit exceeded when retrieving 'HOT_PARTITION_EGRESS' metric for topic '%s'. Retrying %d of %d after %d seconds...", threading.current_thread().ident, topic_name, retry, max_retries, rate_limits['reset_in_seconds'])
299298
time.sleep(rate_limits['reset_in_seconds'])
300299
continue
301300
elif http_status_code not in (HttpStatus.OK, HttpStatus.RATE_LIMIT_EXCEEDED):
302-
logging.warning("[Thread-%d] Failed retrieving 'HOT_PARTITION_EGRESS' metric for topic %s because: %s", threading.current_thread().ident, topic_name, error_message)
301+
logging.warning("[Thread-%d] Failed retrieving 'HOT_PARTITION_EGRESS' metric for topic '%s' because: %s", threading.current_thread().ident, topic_name, error_message)
303302
result['error'] = error_message
304303
result['total_record_count'] = 0
305304
result['avg_bytes_per_record'] = 0.0
@@ -309,9 +308,9 @@ def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topi
309308
elif http_status_code == HttpStatus.OK:
310309
result['hot_partition_egress'] = 'yes' if is_partition_hot["is_partition_hot"] else 'no'
311310
if is_partition_hot["is_partition_hot"]:
312-
logging.info("[Thread-%d] Confluent Metrics API - Topic %s is IDENTIFIED as a hot topic by egress throughput in the last %d days.", threading.current_thread().ident, topic_name, topic_info['sampling_days_based_on_retention_days'])
311+
logging.info("[Thread-%d] Confluent Metrics API - Topic '%s' is IDENTIFIED as a hot topic by egress throughput in the last %d days.", threading.current_thread().ident, topic_name, topic_info['sampling_days_based_on_retention_days'])
313312
else:
314-
logging.info("[Thread-%d] Confluent Metrics API - Topic %s is NOT identified as a hot topic by egress throughput in the last %d days.", threading.current_thread().ident, topic_name, topic_info['sampling_days_based_on_retention_days'])
313+
logging.info("[Thread-%d] Confluent Metrics API - Topic '%s' is NOT identified as a hot topic by egress throughput in the last %d days.", threading.current_thread().ident, topic_name, topic_info['sampling_days_based_on_retention_days'])
315314
break
316315

317316
return result
@@ -360,7 +359,7 @@ def __get_partition_offsets(self, topic_name: str, partitions: List[int]) -> Dic
360359
return partition_offsets
361360

362361
except Exception as e:
363-
logging.warning("[Thread-%d] Error getting partition offsets for topic %s: %s",
362+
logging.warning("[Thread-%d] Error getting partition offsets for topic '%s': %s",
364363
threading.current_thread().ident, topic_name, e)
365364
return {}
366365
finally:

0 commit comments

Comments
 (0)