
Commit 1a09777

Resolved #278.
1 parent 6af2913 commit 1a09777

3 files changed: 15 additions & 13 deletions


src/thread_safe_kafka_topics_analyzer.py

Lines changed: 5 additions & 5 deletions

@@ -214,7 +214,7 @@ def update_progress() -> None:
     progress = (self.completed_topics / self.total_topics) * 100
     logging.info("Progress: %d of %d (%.1f%%) topics completed", self.completed_topics, self.total_topics, progress)

-def analyze_topic_worker(topic_name: str, topic_info: Dict) -> Dict:
+def analyze_topic_worker(topic_name: str, topic_info: Dict, start_time_epoch: int) -> Dict:
     """Worker function to analyze a single topic.

     Args:

@@ -239,7 +239,7 @@ def analyze_topic_worker(topic_name: str, topic_info: Dict) -> Dict:
     # Use sample records approach

     # Calculate the ISO 8601 formatted start timestamp of the rolling window
-    rolling_start = topic_info['utc_now'] - timedelta(days=topic_info['sampling_days_based_on_retention_days'])
+    rolling_start = start_time_epoch - timedelta(days=topic_info['sampling_days_based_on_retention_days'])
     iso_start_time = datetime.fromisoformat(rolling_start.strftime('%Y-%m-%dT%H:%M:%S+00:00'))
     start_time_epoch_ms = int(rolling_start.timestamp() * 1000)

@@ -260,7 +260,7 @@ def analyze_topic_worker(topic_name: str, topic_info: Dict) -> Dict:

     else:
         # Use Metrics API approach
-        result = thread_analyzer.analyze_topic_with_metrics(metrics_config, topic_name, topic_info)
+        result = thread_analyzer.analyze_topic_with_metrics(metrics_config, topic_name, topic_info, start_time_epoch)

     return result

@@ -282,7 +282,7 @@ def analyze_topic_worker(topic_name: str, topic_info: Dict) -> Dict:
     with ThreadPoolExecutor(max_workers=max_workers_per_cluster) as executor:
         # Submit all tasks
         future_to_topic = {
-            executor.submit(analyze_topic_worker, topic_name, topic_info): topic_name
+            executor.submit(analyze_topic_worker, topic_name, topic_info, analysis_start_time_epoch): topic_name
             for topic_name, topic_info in topics_to_analyze.items()
         }

@@ -308,7 +308,7 @@ def analyze_topic_worker(topic_name: str, topic_info: Dict) -> Dict:
         update_progress()

     except Exception as e:
-        logging.error("Error processing topic %s: %s", topic_name, e)
+        logging.warning("Failed processing topic %s, because of %s", topic_name, e)
         update_progress()

     # Calculate summary statistics
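
For readers skimming the diff: the thrust of this file's change is that the rolling-window anchor is computed once by the caller and handed to every worker, rather than each worker reading topic_info['utc_now'] on its own. The following is a minimal, self-contained sketch of that pattern; the helper names (analyze_one_topic, sampling_days) and the surrounding structure are illustrative assumptions, not code from this repository.

from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
import logging


def analyze_one_topic(topic_name: str, sampling_days: int, window_end: datetime) -> dict:
    # Every worker derives its window from the same shared end timestamp,
    # so concurrently analyzed topics are measured over one rolling window.
    window_start = window_end - timedelta(days=sampling_days)
    return {
        "topic": topic_name,
        "window_start_epoch_ms": int(window_start.timestamp() * 1000),
        "window_end_epoch_ms": int(window_end.timestamp() * 1000),
    }


def analyze_all(topics: dict, max_workers: int = 4) -> list:
    # Capture the shared window anchor once, before any work is submitted.
    analysis_start = datetime.now(timezone.utc)
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_topic = {
            executor.submit(analyze_one_topic, name, info["sampling_days"], analysis_start): name
            for name, info in topics.items()
        }
        for future in as_completed(future_to_topic):
            topic_name = future_to_topic[future]
            try:
                results.append(future.result())
            except Exception as e:
                logging.warning("Failed processing topic %s, because of %s", topic_name, e)
    return results


if __name__ == "__main__":
    print(analyze_all({"orders": {"sampling_days": 7}, "payments": {"sampling_days": 3}}))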

src/thread_safe_topic_analyzer.py

Lines changed: 6 additions & 4 deletions

@@ -121,13 +121,14 @@ def analyze_topic(self,
         'is_internal': topic_name.startswith('_')
     }

-def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topic_info: Dict) -> Dict:
+def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topic_info: Dict, start_time_epoch: int) -> Dict:
     """Analyze a single topic using Metrics API.

     Args:
         metrics_config (Dict): Configuration dictionary for Metrics API client.
         topic_name (str): The name of the topic to analyze.
         topic_info (Dict): Metadata and retention info about the topic.
+        start_time_epoch (int): The start time in epoch milliseconds for sampling.

     Returns:
         Dict: Analysis results including partition count, compaction status,

@@ -240,10 +241,11 @@ def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topi
     retry = 0

     while retry < max_retries:
-        # Calculate the ISO 8601 formatted start and end times within a rolling window for the last 1 day
-        rolling_days_start = topic_info['utc_now'] - timedelta(days=topic_info['sampling_days_based_on_retention_days'])
+        # Calculate the ISO 8601 formatted start timestamp of the rolling window
+        utc_now = datetime.fromtimestamp(start_time_epoch)
+        rolling_days_start = utc_now - timedelta(days=topic_info['sampling_days_based_on_retention_days'])
         iso_start_time = rolling_days_start.strftime('%Y-%m-%dT%H:%M:%S')
-        iso_end_time = topic_info['utc_now'].strftime('%Y-%m-%dT%H:%M:%S')
+        iso_end_time = utc_now.strftime('%Y-%m-%dT%H:%M:%S')
         query_start_time = datetime.fromisoformat(iso_start_time.replace('Z', '+00:00'))
         query_end_time = datetime.fromisoformat(iso_end_time.replace('Z', '+00:00'))

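
As a rough standalone illustration of the window calculation this hunk performs (not the repository's code): converting a shared epoch timestamp into the ISO 8601 start and end strings used for a Metrics API query. Note that datetime.fromtimestamp interprets its argument as seconds since the Unix epoch, so the sketch assumes a seconds-based value; an epoch in milliseconds would first need dividing by 1000.

from datetime import datetime, timedelta, timezone


def rolling_window_iso(start_time_epoch: float, sampling_days: int) -> tuple:
    # datetime.fromtimestamp() treats its argument as seconds since the Unix
    # epoch; a value in milliseconds must be divided by 1000 before this call.
    window_end = datetime.fromtimestamp(start_time_epoch, tz=timezone.utc)
    window_start = window_end - timedelta(days=sampling_days)

    # ISO 8601 strings of the form 2024-01-01T00:00:00.
    iso_start_time = window_start.strftime('%Y-%m-%dT%H:%M:%S')
    iso_end_time = window_end.strftime('%Y-%m-%dT%H:%M:%S')
    return iso_start_time, iso_end_time


if __name__ == "__main__":
    now_epoch = datetime.now(timezone.utc).timestamp()
    print(rolling_window_iso(now_epoch, sampling_days=7))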

tests/test_metrics_client.py

Lines changed: 4 additions & 4 deletions

@@ -47,7 +47,7 @@ class TestMetricsClient:
 def test_get_topic_received_total_bytes(self, metrics_client, kafka_cluster_id, kafka_topic_name):
     """Test the get_topic_total() function for getting the total bytes."""

-    # Calculate the ISO 8601 formatted start and end times within a rolling window for the last 1 day
+    # Calculate the ISO 8601 formatted start timestamp of the rolling window
     utc_now = datetime.now(timezone.utc)
     seven_days_ago = utc_now - timedelta(days=7)
     iso_start_time = seven_days_ago.strftime('%Y-%m-%dT%H:%M:%S')

@@ -71,7 +71,7 @@ def test_get_topic_received_total_bytes(self, metrics_client, kafka_cluster_id,
 def test_get_topic_received_total_records(self, metrics_client, kafka_cluster_id, kafka_topic_name):
     """Test the get_topic_total() function for getting the total records."""

-    # Calculate the ISO 8601 formatted start and end times within a rolling window for the last 1 day
+    # Calculate the ISO 8601 formatted start timestamp of the rolling window
     utc_now = datetime.now(timezone.utc)
     seven_days_ago = utc_now - timedelta(days=7)
     iso_start_time = seven_days_ago.strftime('%Y-%m-%dT%H:%M:%S')

@@ -174,7 +174,7 @@ def test_is_topic_partition_hot_by_ingress_throughput(self, metrics_client, kafk
     """Test the is_topic_partition_hot() function for checking if a topic partition is hot
     by ingress throughput."""

-    # Calculate the ISO 8601 formatted start and end times within a rolling window for the last 1 day
+    # Calculate the ISO 8601 formatted start timestamp of the rolling window
     utc_now = datetime.now(timezone.utc)
     seven_days_ago = utc_now - timedelta(days=7)
     iso_start_time = seven_days_ago.strftime('%Y-%m-%dT%H:%M:%S')

@@ -199,7 +199,7 @@ def test_is_topic_partition_hot_by_egress_throughput(self, metrics_client, kafka
     """Test the is_topic_partition_hot() function for checking if a topic partition is hot
     by egress throughput."""

-    # Calculate the ISO 8601 formatted start and end times within a rolling window for the last 1 day
+    # Calculate the ISO 8601 formatted start timestamp of the rolling window
     utc_now = datetime.now(timezone.utc)
     seven_days_ago = utc_now - timedelta(days=7)
     iso_start_time = seven_days_ago.strftime('%Y-%m-%dT%H:%M:%S')
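
The four test hunks only reword the same explanatory comment; the seven-day window setup beneath it is unchanged. A small standalone pytest-style check of that setup, assuming nothing from the repository's fixtures, might look like this:

from datetime import datetime, timedelta, timezone


def test_rolling_window_start_is_seven_days_before_now():
    # Mirrors the window setup repeated in the tests above: take "now" in UTC
    # and format the start of a seven-day rolling window as ISO 8601.
    utc_now = datetime.now(timezone.utc)
    seven_days_ago = utc_now - timedelta(days=7)
    iso_start_time = seven_days_ago.strftime('%Y-%m-%dT%H:%M:%S')

    # The formatted string should parse back to a moment roughly seven days earlier
    # (strftime drops sub-second precision, hence the one-second tolerance).
    parsed = datetime.fromisoformat(iso_start_time).replace(tzinfo=timezone.utc)
    assert abs((utc_now - parsed) - timedelta(days=7)) < timedelta(seconds=1)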

0 commit comments
