From 28b37c7d48fdca50cd65593ee52b57b944e06872 Mon Sep 17 00:00:00 2001 From: weilin Date: Tue, 12 Aug 2025 15:34:36 +0800 Subject: [PATCH 1/2] [ISSUE #1209] Create a topic using clusterName --- admin/admin.go | 5 +++++ admin/option.go | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/admin/admin.go b/admin/admin.go index a92a25c7..296296ab 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -183,6 +183,11 @@ func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error { } cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, request, nil) + if cfg.BrokerAddr == "" { + a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.ClusterName) + cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.ClusterName) + } + _, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second) if err != nil { rlog.Error("create topic error", map[string]interface{}{ diff --git a/admin/option.go b/admin/option.go index d5a648ef..3e29c14b 100644 --- a/admin/option.go +++ b/admin/option.go @@ -33,6 +33,7 @@ func defaultTopicConfigCreate() TopicConfigCreate { type TopicConfigCreate struct { Topic string BrokerAddr string + ClusterName string DefaultTopic string ReadQueueNums int WriteQueueNums int @@ -56,6 +57,12 @@ func WithBrokerAddrCreate(BrokerAddr string) OptionCreate { } } +func WithClusterNameCreate(ClusterName string) OptionCreate { + return func(opts *TopicConfigCreate) { + opts.ClusterName = ClusterName + } +} + func WithReadQueueNums(ReadQueueNums int) OptionCreate { return func(opts *TopicConfigCreate) { opts.ReadQueueNums = ReadQueueNums From 74f1a2c2c5a29953472aa8fff8bbb014a9bbf9a1 Mon Sep 17 00:00:00 2001 From: weilin Date: Sat, 16 Aug 2025 11:55:13 +0800 Subject: [PATCH 2/2] [ISSUE #1209] Create a topic using clusterName --- admin/admin.go | 77 +++++++++++++++++++++++++++------------- internal/mock_namesrv.go | 14 ++++++++ internal/namesrv.go | 2 ++ internal/route.go | 18 ++++++++++ 4 files changed, 86 insertions(+), 25 deletions(-) diff --git a/admin/admin.go b/admin/admin.go index 296296ab..5fa0b813 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -185,23 +185,40 @@ func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error { cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, request, nil) if cfg.BrokerAddr == "" { a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.ClusterName) - cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.ClusterName) - } - - _, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second) - if err != nil { - rlog.Error("create topic error", map[string]interface{}{ - rlog.LogKeyTopic: cfg.Topic, - rlog.LogKeyBroker: cfg.BrokerAddr, - rlog.LogKeyUnderlayError: err, - }) + brokerAddresses := a.cli.GetNameSrv().FindAllBrokerAddressByCluster(cfg.ClusterName) + for brokerAddr, _ := range brokerAddresses { + _, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second) + if err != nil { + rlog.Error("create topic error", map[string]interface{}{ + rlog.LogKeyTopic: cfg.Topic, + rlog.LogKeyBroker: brokerAddr, + rlog.LogKeyUnderlayError: err, + }) + return err + } else { + rlog.Info("create topic success", map[string]interface{}{ + rlog.LogKeyTopic: cfg.Topic, + rlog.LogKeyBroker: brokerAddr, + }) + } + } } else { - rlog.Info("create topic success", map[string]interface{}{ - rlog.LogKeyTopic: cfg.Topic, - rlog.LogKeyBroker: cfg.BrokerAddr, - }) + _, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second) + if err != nil { + rlog.Error("create topic error", map[string]interface{}{ + rlog.LogKeyTopic: cfg.Topic, + rlog.LogKeyBroker: cfg.BrokerAddr, + rlog.LogKeyUnderlayError: err, + }) + } else { + rlog.Info("create topic success", map[string]interface{}{ + rlog.LogKeyTopic: cfg.Topic, + rlog.LogKeyBroker: cfg.BrokerAddr, + }) + } + return err } - return err + return nil } // DeleteTopicInBroker delete topic in broker. @@ -233,16 +250,26 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error { //delete topic in broker if cfg.BrokerAddr == "" { a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic) - cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.Topic) - } - - if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil { - rlog.Error("delete topic in broker error", map[string]interface{}{ - rlog.LogKeyTopic: cfg.Topic, - rlog.LogKeyBroker: cfg.BrokerAddr, - rlog.LogKeyUnderlayError: err, - }) - return err + brokerAddresses := a.cli.GetNameSrv().FindAllBrokerAddressByCluster(cfg.ClusterName) + for brokerAddr, _ := range brokerAddresses { + if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, brokerAddr); err != nil { + rlog.Error("delete topic in broker error", map[string]interface{}{ + rlog.LogKeyTopic: cfg.Topic, + rlog.LogKeyBroker: brokerAddr, + rlog.LogKeyUnderlayError: err, + }) + return err + } + } + } else { + if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil { + rlog.Error("delete topic in broker error", map[string]interface{}{ + rlog.LogKeyTopic: cfg.Topic, + rlog.LogKeyBroker: cfg.BrokerAddr, + rlog.LogKeyUnderlayError: err, + }) + return err + } } //delete topic in nameserver diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go index 7ce6f971..ca2b6de1 100644 --- a/internal/mock_namesrv.go +++ b/internal/mock_namesrv.go @@ -178,6 +178,20 @@ func (mr *MockNamesrvsMockRecorder) FetchSubscribeMessageQueues(topic interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSubscribeMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchSubscribeMessageQueues), topic) } +// FindAllBrokerAddressByCluster mocks base method +func (m *MockNamesrvs) FindAllBrokerAddressByCluster(clusterName string) map[string]struct{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindAllBrokerAddressByCluster", clusterName) + ret0, _ := ret[0].(map[string]struct{}) + return ret0 +} + +// FindAllBrokerAddressByCluster indicates an expected call of FindAllBrokerAddressByCluster +func (mr *MockNamesrvsMockRecorder) FindAllBrokerAddressByCluster(clusterName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindAllBrokerAddressByCluster", reflect.TypeOf((*MockNamesrvs)(nil).FindAllBrokerAddressByCluster), clusterName) +} + // AddrList mocks base method func (m *MockNamesrvs) AddrList() []string { m.ctrl.T.Helper() diff --git a/internal/namesrv.go b/internal/namesrv.go index c3479985..f3bc2d51 100644 --- a/internal/namesrv.go +++ b/internal/namesrv.go @@ -62,6 +62,8 @@ type Namesrvs interface { FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) + FindAllBrokerAddressByCluster(clusterName string) map[string]struct{} + AddrList() []string } diff --git a/internal/route.go b/internal/route.go index 13ce05c1..253a9e74 100644 --- a/internal/route.go +++ b/internal/route.go @@ -239,6 +239,24 @@ func (s *namesrvs) FindBrokerAddrByTopic(topic string) string { return addr } +func (s *namesrvs) FindAllBrokerAddressByCluster(clusterName string) map[string]struct{} { + addrs := make(map[string]struct{}) + v, exist := s.routeDataMap.Load(clusterName) + if !exist { + return addrs + } + routeData := v.(*TopicRouteData) + + for _, bd := range routeData.BrokerDataList { + for _, addr := range bd.BrokerAddresses { + if addr != "" { + addrs[addr] = struct{}{} + } + } + } + return addrs +} + func (s *namesrvs) FindBrokerAddrByName(brokerName string) string { bd, exist := s.brokerAddressesMap.Load(brokerName)