1515import com .linkedin .xinfra .monitor .services .configs .MultiClusterTopicManagementServiceConfig ;
1616import com .linkedin .xinfra .monitor .services .configs .TopicManagementServiceConfig ;
1717import com .linkedin .xinfra .monitor .topicfactory .TopicFactory ;
18+ import java .time .Duration ;
1819import java .util .ArrayList ;
1920import java .util .Collection ;
2021import java .util .Collections ;
2627import java .util .Properties ;
2728import java .util .Random ;
2829import java .util .Set ;
30+ import java .util .concurrent .CancellationException ;
2931import java .util .concurrent .CompletableFuture ;
3032import java .util .concurrent .ExecutionException ;
3133import java .util .concurrent .Executors ;
4143import org .apache .kafka .clients .admin .AlterPartitionReassignmentsResult ;
4244import org .apache .kafka .clients .admin .Config ;
4345import org .apache .kafka .clients .admin .ConfigEntry ;
44- import org .apache .kafka .clients .admin .CreatePartitionsResult ;
4546import org .apache .kafka .clients .admin .ElectLeadersResult ;
4647import org .apache .kafka .clients .admin .NewPartitionReassignment ;
4748import org .apache .kafka .clients .admin .NewPartitions ;
@@ -248,7 +249,7 @@ static class TopicManagementHelper {
248249 private final int _minPartitionNum ;
249250 private final Properties _topicProperties ;
250251 private boolean _preferredLeaderElectionRequested ;
251- private final int _requestTimeoutMs ;
252+ private final Duration _requestTimeout ;
252253 private final List <String > _bootstrapServers ;
253254
254255 // package private for unit testing
@@ -261,6 +262,7 @@ static class TopicManagementHelper {
261262
262263 @ SuppressWarnings ("unchecked" )
263264 TopicManagementHelper (Map <String , Object > props ) throws Exception {
265+
264266 TopicManagementServiceConfig config = new TopicManagementServiceConfig (props );
265267 AdminClientConfig adminClientConfig = new AdminClientConfig (props );
266268 String topicFactoryClassName = config .getString (TopicManagementServiceConfig .TOPIC_FACTORY_CLASS_CONFIG );
@@ -273,7 +275,7 @@ static class TopicManagementHelper {
273275 _minPartitionsToBrokersRatio = config .getDouble (TopicManagementServiceConfig .PARTITIONS_TO_BROKERS_RATIO_CONFIG );
274276 _minPartitionNum = config .getInt (TopicManagementServiceConfig .MIN_PARTITION_NUM_CONFIG );
275277 _preferredLeaderElectionRequested = false ;
276- _requestTimeoutMs = adminClientConfig .getInt (AdminClientConfig .REQUEST_TIMEOUT_MS_CONFIG );
278+ _requestTimeout = Duration . ofMillis ( adminClientConfig .getInt (AdminClientConfig .REQUEST_TIMEOUT_MS_CONFIG ) );
277279 _bootstrapServers = adminClientConfig .getList (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG );
278280 _topicProperties = new Properties ();
279281 if (props .containsKey (TopicManagementServiceConfig .TOPIC_PROPS_CONFIG )) {
@@ -288,9 +290,17 @@ static class TopicManagementHelper {
288290 TopicManagementServiceConfig .TOPIC_FACTORY_PROPS_CONFIG ) : new HashMap ();
289291 _topicFactory =
290292 (TopicFactory ) Class .forName (topicFactoryClassName ).getConstructor (Map .class ).newInstance (topicFactoryConfig );
291-
292293 _adminClient = constructAdminClient (props );
293294 LOGGER .info ("{} configs: {}" , _adminClient .getClass ().getSimpleName (), props );
295+ logConfigurationValues ();
296+ }
297+
298+ private void logConfigurationValues () {
299+ LOGGER .info ("TopicManagementHelper for cluster with Zookeeper connect {} is configured with " +
300+ "[topic={}, topicCreationEnabled={}, topicAddPartitionEnabled={}, " +
301+ "topicReassignPartitionAndElectLeaderEnabled={}, minPartitionsToBrokersRatio={}, " +
302+ "minPartitionNum={}]" , _zkConnect , _topic , _topicCreationEnabled , _topicAddPartitionEnabled ,
303+ _topicReassignPartitionAndElectLeaderEnabled , _minPartitionsToBrokersRatio , _minPartitionNum );
294304 }
295305
296306 @ SuppressWarnings ("unchecked" )
@@ -317,7 +327,8 @@ int minPartitionNum() throws InterruptedException, ExecutionException {
317327 return Math .max ((int ) Math .ceil (_minPartitionsToBrokersRatio * brokerCount ), _minPartitionNum );
318328 }
319329
320- void maybeAddPartitions (int minPartitionNum ) throws ExecutionException , InterruptedException {
330+ void maybeAddPartitions (final int requiredMinPartitionNum )
331+ throws ExecutionException , InterruptedException , CancellationException , TimeoutException {
321332 if (!_topicAddPartitionEnabled ) {
322333 LOGGER .info ("Adding partition to {} topic is not enabled in a cluster with Zookeeper URL {}. " +
323334 "Refer to config: {}" , _topic , _zkConnect , TopicManagementServiceConfig .TOPIC_ADD_PARTITION_ENABLED_CONFIG );
@@ -326,32 +337,36 @@ void maybeAddPartitions(int minPartitionNum) throws ExecutionException, Interrup
326337 Map <String , KafkaFuture <TopicDescription >> kafkaFutureMap =
327338 _adminClient .describeTopics (Collections .singleton (_topic )).values ();
328339 KafkaFuture <TopicDescription > topicDescriptions = kafkaFutureMap .get (_topic );
329- List <TopicPartitionInfo > partitions = topicDescriptions .get ().partitions ();
330-
331- int partitionNum = partitions .size ();
332- if (partitionNum < minPartitionNum ) {
333- LOGGER .info ("{} will increase partition of the topic {} in the cluster from {}" + " to {}." ,
334- this .getClass ().toString (), _topic , partitionNum , minPartitionNum );
335- Set <Integer > blackListedBrokers = _topicFactory .getBlackListedBrokers (_adminClient );
336- Set <BrokerMetadata > brokers = new HashSet <>();
337- for (Node broker : _adminClient .describeCluster ().nodes ().get ()) {
338- BrokerMetadata brokerMetadata = new BrokerMetadata (broker .id (), null );
339- brokers .add (brokerMetadata );
340- }
340+ List <TopicPartitionInfo > partitions = topicDescriptions .get (_requestTimeout .toMillis (), TimeUnit .MILLISECONDS ).partitions ();
341341
342- if (!blackListedBrokers .isEmpty ()) {
343- brokers .removeIf (broker -> blackListedBrokers .contains (broker .id ()));
344- }
342+ final int currPartitionNum = partitions .size ();
343+ if (currPartitionNum >= requiredMinPartitionNum ) {
344+ LOGGER .debug ("{} will not increase partition of the topic {} in the cluster. Current partition count {} and '" +
345+ "minimum required partition count is {}." , this .getClass ().toString (), _topic , currPartitionNum , requiredMinPartitionNum );
346+ return ;
347+ }
348+ LOGGER .info ("{} will increase partition of the topic {} in the cluster from {}" + " to {}." ,
349+ this .getClass ().toString (), _topic , currPartitionNum , requiredMinPartitionNum );
350+ Set <BrokerMetadata > brokers = new HashSet <>();
351+ for (Node broker : _adminClient .describeCluster ().nodes ().get (_requestTimeout .toMillis (), TimeUnit .MILLISECONDS )) {
352+ BrokerMetadata brokerMetadata = new BrokerMetadata (broker .id (), null );
353+ brokers .add (brokerMetadata );
354+ }
355+ Set <Integer > excludedBrokers = _topicFactory .getExcludedBrokers (_adminClient );
356+ if (!excludedBrokers .isEmpty ()) {
357+ brokers .removeIf (broker -> excludedBrokers .contains (broker .id ()));
358+ }
345359
346- List <List <Integer >> newPartitionAssignments =
347- newPartitionAssignments (minPartitionNum , partitionNum , brokers , _replicationFactor );
360+ List <List <Integer >> newPartitionAssignments =
361+ newPartitionAssignments (requiredMinPartitionNum , currPartitionNum , brokers , _replicationFactor );
348362
349- NewPartitions newPartitions = NewPartitions .increaseTo (minPartitionNum , newPartitionAssignments );
363+ NewPartitions newPartitions = NewPartitions .increaseTo (requiredMinPartitionNum , newPartitionAssignments );
350364
351- Map <String , NewPartitions > newPartitionsMap = new HashMap <>();
352- newPartitionsMap .put (_topic , newPartitions );
353- CreatePartitionsResult createPartitionsResult = _adminClient .createPartitions (newPartitionsMap );
354- }
365+ Map <String , NewPartitions > newPartitionsMap = new HashMap <>();
366+ newPartitionsMap .put (_topic , newPartitions );
367+ _adminClient .createPartitions (newPartitionsMap ).all ().get (_requestTimeout .toMillis (), TimeUnit .MILLISECONDS );
368+ LOGGER .info ("{} finished increasing partition of the topic {} in the cluster from {} to {}" ,
369+ this .getClass ().toString (), _topic , currPartitionNum , requiredMinPartitionNum );
355370 }
356371
357372 static List <List <Integer >> newPartitionAssignments (int minPartitionNum , int partitionNum ,
@@ -427,8 +442,8 @@ int numPartitions() throws InterruptedException, ExecutionException {
427442
428443 private Set <Node > getAvailableBrokers () throws ExecutionException , InterruptedException {
429444 Set <Node > brokers = new HashSet <>(_adminClient .describeCluster ().nodes ().get ());
430- Set <Integer > blackListedBrokers = _topicFactory .getBlackListedBrokers (_adminClient );
431- brokers .removeIf (broker -> blackListedBrokers .contains (broker .id ()));
445+ Set <Integer > excludedBrokers = _topicFactory .getExcludedBrokers (_adminClient );
446+ brokers .removeIf (broker -> excludedBrokers .contains (broker .id ()));
432447 return brokers ;
433448 }
434449
0 commit comments