Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 54 additions & 22 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,42 @@ func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
}

cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, request, nil)
_, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second)
if err != nil {
rlog.Error("create topic error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
rlog.LogKeyUnderlayError: err,
})
if cfg.BrokerAddr == "" {
a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.ClusterName)
brokerAddresses := a.cli.GetNameSrv().FindAllBrokerAddressByCluster(cfg.ClusterName)
for brokerAddr, _ := range brokerAddresses {
_, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second)
if err != nil {
rlog.Error("create topic error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: brokerAddr,
rlog.LogKeyUnderlayError: err,
})
return err
} else {
rlog.Info("create topic success", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: brokerAddr,
})
}
}
} else {
rlog.Info("create topic success", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
})
_, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second)
if err != nil {
rlog.Error("create topic error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
rlog.LogKeyUnderlayError: err,
})
} else {
rlog.Info("create topic success", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
})
}
return err
}
return err
return nil
}

// DeleteTopicInBroker delete topic in broker.
Expand Down Expand Up @@ -228,16 +250,26 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
//delete topic in broker
if cfg.BrokerAddr == "" {
a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.Topic)
}

if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
rlog.Error("delete topic in broker error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
rlog.LogKeyUnderlayError: err,
})
return err
brokerAddresses := a.cli.GetNameSrv().FindAllBrokerAddressByCluster(cfg.ClusterName)
for brokerAddr, _ := range brokerAddresses {
if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, brokerAddr); err != nil {
rlog.Error("delete topic in broker error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: brokerAddr,
rlog.LogKeyUnderlayError: err,
})
return err
}
}
} else {
if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
rlog.Error("delete topic in broker error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
rlog.LogKeyUnderlayError: err,
})
return err
}
}

//delete topic in nameserver
Expand Down
7 changes: 7 additions & 0 deletions admin/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func defaultTopicConfigCreate() TopicConfigCreate {
type TopicConfigCreate struct {
Topic string
BrokerAddr string
ClusterName string
DefaultTopic string
ReadQueueNums int
WriteQueueNums int
Expand All @@ -56,6 +57,12 @@ func WithBrokerAddrCreate(BrokerAddr string) OptionCreate {
}
}

func WithClusterNameCreate(ClusterName string) OptionCreate {
return func(opts *TopicConfigCreate) {
opts.ClusterName = ClusterName
}
}

func WithReadQueueNums(ReadQueueNums int) OptionCreate {
return func(opts *TopicConfigCreate) {
opts.ReadQueueNums = ReadQueueNums
Expand Down
14 changes: 14 additions & 0 deletions internal/mock_namesrv.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions internal/namesrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Namesrvs interface {

FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error)

FindAllBrokerAddressByCluster(clusterName string) map[string]struct{}

AddrList() []string
}

Expand Down
18 changes: 18 additions & 0 deletions internal/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,24 @@ func (s *namesrvs) FindBrokerAddrByTopic(topic string) string {
return addr
}

func (s *namesrvs) FindAllBrokerAddressByCluster(clusterName string) map[string]struct{} {
addrs := make(map[string]struct{})
v, exist := s.routeDataMap.Load(clusterName)
if !exist {
return addrs
}
routeData := v.(*TopicRouteData)

for _, bd := range routeData.BrokerDataList {
for _, addr := range bd.BrokerAddresses {
if addr != "" {
addrs[addr] = struct{}{}
}
}
}
return addrs
}

func (s *namesrvs) FindBrokerAddrByName(brokerName string) string {
bd, exist := s.brokerAddressesMap.Load(brokerName)

Expand Down