Skip to content

Commit 1a410cd

Browse files
dateutliDavid Teutlipetedannemann
authored
feat: rebalance of topics with no config files (#245)
* feat(tube-3309): add initial groundwork for rebalancing of topics with no config files * feat(tube-3309): use topic name instead of filename to keep track of existing generated files; wrap topic file and config traversal in reusable function * feat(tube-3309): make use of wrapper processTopicFiles function for main process * feat(tube-3309): update logic to skip existing config files * Update cmd/topicctl/subcmd/rebalance.go Co-authored-by: Peter Dannemann <28637185+petedannemann@users.noreply.github.com> * update readme, name of flag, and logic to skip existing files * Update README.md Co-authored-by: Peter Dannemann <28637185+petedannemann@users.noreply.github.com> --------- Co-authored-by: David Teutli <dapalacio@segment.com> Co-authored-by: Peter Dannemann <28637185+petedannemann@users.noreply.github.com>
1 parent 9796c45 commit 1a410cd

File tree

2 files changed

+109
-42
lines changed

2 files changed

+109
-42
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,8 @@ To rebalance **all** topics in a cluster, use the `rebalance` subcommand, which
457457
function on all qualifying topics. It will inventory all topic configs found at `--path-prefix` for a cluster
458458
specified by `--cluster-config`.
459459

460+
To rebalance topics in a cluster that exist without topic configuration files, use the `rebalance` subcommand with the `--bootstrap-missing-configs` flag. This will temporarily bootstrap any missing topic configs at `--path-prefix`. This can also be used to use topicctl as a topic rebalancing tool, without using its topic configuration management features
461+
460462
This subcommand will not rebalance a topic if:
461463

462464
1. the topic config is inconsistent with the cluster config (name, region, environment etc...)

cmd/topicctl/subcmd/rebalance.go

Lines changed: 107 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type rebalanceCmdConfig struct {
3232
brokersToRemove []int
3333
brokerThrottleMBsOverride int
3434
dryRun bool
35+
bootstrapMissingConfigs bool
3536
partitionBatchSizeOverride int
3637
pathPrefix string
3738
sleepLoopDuration time.Duration
@@ -85,6 +86,12 @@ func init() {
8586
0*time.Second,
8687
"Interval of time to show progress during rebalance",
8788
)
89+
rebalanceCmd.Flags().BoolVar(
90+
&rebalanceConfig.bootstrapMissingConfigs,
91+
"bootstrap-missing-configs",
92+
false,
93+
"Bootstrap temporary topic config(s) for the rebalance of configless topic(s)",
94+
)
8895

8996
addSharedConfigOnlyFlags(rebalanceCmd, &rebalanceConfig.shared)
9097
RootCmd.AddCommand(rebalanceCmd)
@@ -151,55 +158,80 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
151158
return err
152159
}
153160

154-
// iterate through each topic config and initiate rebalance
155-
topicConfigs := []config.TopicConfig{}
156-
topicErrorDict := make(map[string]error)
157-
for _, topicFile := range topicFiles {
158-
// do not consider invalid topic yaml files for rebalance
159-
topicConfigs, err = config.LoadTopicsFile(topicFile)
161+
existingConfigFiles := make(map[string]struct{})
162+
if rebalanceConfig.bootstrapMissingConfigs {
163+
// make set of existing files
164+
err := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error {
165+
_, topicFilename := filepath.Split(topicFile)
166+
existingConfigFiles[topicFilename] = struct{}{}
167+
return nil
168+
})
160169
if err != nil {
161-
log.Errorf("Invalid topic yaml file: %s", topicFile)
162-
continue
170+
return err
163171
}
164172

165-
for _, topicConfig := range topicConfigs {
166-
// topic config should be consistent with the cluster config
167-
if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil {
168-
log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath)
169-
continue
170-
}
173+
// bootstrap missing config files
174+
cliRunner := cli.NewCLIRunner(adminClient, log.Infof, false)
175+
cliRunner.BootstrapTopics(
176+
ctx,
177+
[]string{},
178+
clusterConfig,
179+
".*",
180+
".^",
181+
rebalanceConfig.pathPrefix,
182+
false,
183+
false,
184+
)
171185

172-
log.Infof(
173-
"Rebalancing topic %s from config file %s with cluster config %s",
174-
topicConfig.Meta.Name,
175-
topicFile,
176-
clusterConfigPath,
177-
)
178-
179-
topicErrorDict[topicConfig.Meta.Name] = nil
180-
rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{
181-
TopicName: topicConfig.Meta.Name,
182-
ClusterName: clusterConfig.Meta.Name,
183-
ClusterEnvironment: clusterConfig.Meta.Environment,
184-
ToRemove: rebalanceConfig.brokersToRemove,
185-
RebalanceError: false,
186-
}
187-
if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil {
188-
topicErrorDict[topicConfig.Meta.Name] = err
189-
rebalanceTopicProgressConfig.RebalanceError = true
190-
log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err)
191-
}
186+
// re-inventory topic configs to take into account bootstrapped ones
187+
topicFiles, err = getAllFiles(topicConfigDir)
188+
if err != nil {
189+
return err
190+
}
191+
}
192+
193+
// iterate through each topic config and initiate rebalance
194+
topicErrorDict := make(map[string]error)
195+
processed := processTopicFiles(topicFiles, func(topicConfig config.TopicConfig, topicFile string) error {
196+
// topic config should be consistent with the cluster config
197+
if err := config.CheckConsistency(topicConfig.Meta, clusterConfig); err != nil {
198+
log.Errorf("topic file: %s inconsistent with cluster: %s", topicFile, clusterConfigPath)
199+
return nil
200+
}
201+
202+
log.Infof(
203+
"Rebalancing topic %s from config file %s with cluster config %s",
204+
topicConfig.Meta.Name,
205+
topicFile,
206+
clusterConfigPath,
207+
)
208+
topicErrorDict[topicConfig.Meta.Name] = nil
209+
rebalanceTopicProgressConfig := util.RebalanceTopicProgressConfig{
210+
TopicName: topicConfig.Meta.Name,
211+
ClusterName: clusterConfig.Meta.Name,
212+
ClusterEnvironment: clusterConfig.Meta.Environment,
213+
ToRemove: rebalanceConfig.brokersToRemove,
214+
RebalanceError: false,
215+
}
216+
if err := rebalanceApplyTopic(ctx, topicConfig, clusterConfig, adminClient); err != nil {
217+
topicErrorDict[topicConfig.Meta.Name] = err
218+
rebalanceTopicProgressConfig.RebalanceError = true
219+
log.Errorf("topic: %s rebalance failed with error: %v", topicConfig.Meta.Name, err)
220+
}
192221

193-
// show topic final progress
194-
if rebalanceCtxStruct.Enabled {
195-
progressStr, err := util.StructToStr(rebalanceTopicProgressConfig)
196-
if err != nil {
197-
log.Errorf("progress struct to string error: %+v", err)
198-
} else {
199-
log.Infof("Rebalance Progress: %s", progressStr)
200-
}
222+
// show topic final progress
223+
if rebalanceCtxStruct.Enabled {
224+
progressStr, err := util.StructToStr(rebalanceTopicProgressConfig)
225+
if err != nil {
226+
log.Errorf("progress struct to string error: %+v", err)
227+
} else {
228+
log.Infof("Rebalance Progress: %s", progressStr)
201229
}
202230
}
231+
return nil
232+
})
233+
if processed != nil {
234+
return processed
203235
}
204236

205237
// audit at the end of all topic rebalances
@@ -231,6 +263,20 @@ func rebalanceRun(cmd *cobra.Command, args []string) error {
231263
}
232264
}
233265

266+
// clean up any bootstrapped topic configs
267+
if rebalanceConfig.bootstrapMissingConfigs {
268+
for _, topicFile := range topicFiles {
269+
_, topicFilename := filepath.Split(topicFile)
270+
if _, found := existingConfigFiles[topicFilename]; found {
271+
continue
272+
}
273+
err := os.Remove(topicFile)
274+
if err != nil {
275+
log.Errorf("error deleting temporary file %s: %v", topicFile, err)
276+
}
277+
}
278+
}
279+
234280
log.Infof("Rebalance complete! %d topics rebalanced successfully, %d topics had errors", successTopics, errorTopics)
235281
return nil
236282
}
@@ -366,3 +412,22 @@ func getAllFiles(dir string) ([]string, error) {
366412

367413
return files, err
368414
}
415+
416+
func processTopicFiles(topicFiles []string, operation func(topicConfig config.TopicConfig, topicFile string) error) error {
417+
for _, topicFile := range topicFiles {
418+
// do not consider invalid topic yaml files for rebalance
419+
topicConfigs, err := config.LoadTopicsFile(topicFile)
420+
if err != nil {
421+
log.Errorf("Invalid topic yaml file: %s", topicFile)
422+
continue
423+
}
424+
425+
for _, topicConfig := range topicConfigs {
426+
err := operation(topicConfig, topicFile)
427+
if err != nil {
428+
return fmt.Errorf("error during operation on config %d (%s): %w", 0, topicConfig.Meta.Name, err)
429+
}
430+
}
431+
}
432+
return nil
433+
}

0 commit comments

Comments
 (0)