Skip to content

Commit 6af2913

Browse files
committed
Closed #278.
1 parent 6cae4d0 commit 6af2913

File tree

5 files changed

+17
-11
lines changed

5 files changed

+17
-11
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
44
The format is base on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
55

66

7+
## [0.12.03.000] - 2025-10-07
8+
### Changed
9+
- Issue [#278](https://github.com/j3-signalroom/kafka_cluster-topics-partition_count_recommender-tool/issues/278)
10+
711
## [0.12.02.000] - 2025-10-06
812
### Fixed
913
- Issue [#276](https://github.com/j3-signalroom/kafka_cluster-topics-partition_count_recommender-tool/issues/276)

CHANGELOG.pdf

850 Bytes
Binary file not shown.

src/thread_safe_kafka_topics_analyzer.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import csv
2-
from datetime import datetime, timedelta, timezone
2+
from datetime import datetime, timedelta
33
import time
44
from typing import Dict, List
55
from confluent_kafka.admin import AdminClient, ConfigResource
@@ -105,7 +105,8 @@ def analyze_all_topics(self,
105105
kafka_writer_topic_name: str,
106106
kafka_writer_topic_partition_count: int,
107107
kafka_writer_topic_replication_factor: int,
108-
kafka_writer_topic_data_retention_in_days: int) -> bool:
108+
kafka_writer_topic_data_retention_in_days: int,
109+
utc_now: datetime) -> bool:
109110
"""Analyze all topics in the Kafka cluster.
110111
111112
Args:
@@ -129,6 +130,7 @@ def analyze_all_topics(self,
129130
kafka_writer_topic_partition_count (int): The number of partitions for the Kafka writer topic.
130131
kafka_writer_topic_replication_factor (int): The replication factor for the Kafka writer topic.
131132
kafka_writer_topic_data_retention_in_days (int): The data retention period for the Kafka writer topic in days.
133+
utc_now (datetime): Current UTC datetime for consistent time references.
132134
133135
Returns:
134136
bool: True if analysis was successful, False otherwise.
@@ -139,7 +141,7 @@ def analyze_all_topics(self,
139141
return []
140142

141143
# Start analysis
142-
analysis_start_time_epoch = time.time()
144+
analysis_start_time_epoch = int(utc_now.timestamp())
143145
self.total_topics = len(topics_to_analyze)
144146

145147
# Log initial analysis parameters
@@ -237,8 +239,7 @@ def analyze_topic_worker(topic_name: str, topic_info: Dict) -> Dict:
237239
# Use sample records approach
238240

239241
# Calculate the ISO 8601 formatted start timestamp of the rolling window
240-
utc_now = datetime.now(timezone.utc)
241-
rolling_start = utc_now - timedelta(days=topic_info['sampling_days_based_on_retention_days'])
242+
rolling_start = topic_info['utc_now'] - timedelta(days=topic_info['sampling_days_based_on_retention_days'])
242243
iso_start_time = datetime.fromisoformat(rolling_start.strftime('%Y-%m-%dT%H:%M:%S+00:00'))
243244
start_time_epoch_ms = int(rolling_start.timestamp() * 1000)
244245

src/thread_safe_tool.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime, timezone
12
import logging
23
import os
34
from typing import Dict
@@ -199,7 +200,8 @@ def main():
199200
'kafka_writer_topic_name': kafka_writer_topic_name,
200201
'kafka_writer_topic_partition_count': kafka_writer_topic_partition_count,
201202
'kafka_writer_topic_replication_factor': kafka_writer_topic_replication_factor,
202-
'kafka_writer_topic_data_retention_in_days': kafka_writer_topic_data_retention_in_days
203+
'kafka_writer_topic_data_retention_in_days': kafka_writer_topic_data_retention_in_days,
204+
'utc_now': datetime.now(timezone.utc)
203205
}
204206

205207
logging.info("=" * DEFAULT_CHARACTER_REPEAT)

src/thread_safe_topic_analyzer.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from typing import Dict, List, Tuple
2-
from datetime import datetime, timedelta, timezone
2+
from datetime import datetime, timedelta
33
import time
44
import logging
55
import threading
@@ -241,10 +241,9 @@ def analyze_topic_with_metrics(self, metrics_config: Dict, topic_name: str, topi
241241

242242
while retry < max_retries:
243243
# Calculate the ISO 8601 formatted start and end times within a rolling window for the last 1 day
244-
utc_now = datetime.now(timezone.utc)
245-
one_day_ago = utc_now - timedelta(days=topic_info['sampling_days_based_on_retention_days'])
246-
iso_start_time = one_day_ago.strftime('%Y-%m-%dT%H:%M:%S')
247-
iso_end_time = utc_now.strftime('%Y-%m-%dT%H:%M:%S')
244+
rolling_days_start = topic_info['utc_now'] - timedelta(days=topic_info['sampling_days_based_on_retention_days'])
245+
iso_start_time = rolling_days_start.strftime('%Y-%m-%dT%H:%M:%S')
246+
iso_end_time = topic_info['utc_now'].strftime('%Y-%m-%dT%H:%M:%S')
248247
query_start_time = datetime.fromisoformat(iso_start_time.replace('Z', '+00:00'))
249248
query_end_time = datetime.fromisoformat(iso_end_time.replace('Z', '+00:00'))
250249

0 commit comments

Comments
 (0)