Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #3369][FEAT]create topic with broker addr or name or none #1087

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
153 changes: 136 additions & 17 deletions admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package admin

import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -36,7 +38,8 @@ type Admin interface {

GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error)
FetchAllTopicList(ctx context.Context) (*TopicList, error)
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
FindBrokerAddrByName(ctx context.Context, BrokerName string) ([]string, error)
GetBrokerClusterInfo(ctx context.Context) (*ClusterInfo, error)
FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
Close() error
}
Expand Down Expand Up @@ -102,7 +105,7 @@ func NewAdmin(opts ...AdminOption) (*admin, error) {
return nil, fmt.Errorf("GetOrNewRocketMQClient faild")
}
defaultOpts.Namesrv = cli.GetNameSrv()
//log.Printf("Client: %#v", namesrv.srvs)
// log.Printf("Client: %#v", namesrv.srvs)
return &admin{
cli: cli,
opts: defaultOpts,
Expand Down Expand Up @@ -154,13 +157,110 @@ func (a *admin) FetchAllTopicList(ctx context.Context) (*TopicList, error) {
return &topicList, nil
}

// Decode overrides decode method avoid the problem of server-side returned JSON **not** conforming to
// the JSON specification(JSON key should always is string, don't use int as key).
// Related Issue: https://github.com/apache/rocketmq/issues/3369
func (a *ClusterInfo) Decode(data []byte, classOfT interface{}) (interface{}, error) {
jsonStr := utils.RectifyJsonIntKeysByChar(string(data))
return a.FromJson(jsonStr, classOfT)
}

// GetBrokerClusterInfo Get Broker's Cluster Info, Address Table, and so on
func (a *admin) GetBrokerClusterInfo(ctx context.Context) (*ClusterInfo, error) {
cmd := remote.NewRemotingCommand(internal.ReqGetBrokerClusterInfo, nil, nil)
response, err := a.cli.InvokeSync(ctx, a.cli.GetNameSrv().AddrList()[0], cmd, 3*time.Second)
if err != nil {
rlog.Error("Fetch cluster info error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, err
}
rlog.Info("Fetch cluster info success", map[string]interface{}{})

var clusterInfo ClusterInfo
_, err = clusterInfo.Decode(response.Body, &clusterInfo)
if err != nil {
rlog.Error("Fetch cluster info decode error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return nil, err
}
return &clusterInfo, nil
}

func (a *admin) checkIsTopicDuplicated(ctx context.Context, topic string) (bool, error) {
topicList, err := a.FetchAllTopicList(ctx)
if err != nil {
return false, err
}
for _, t := range topicList.TopicList {
if t == topic {
return true, nil
}
}
return false, nil
}

func (a *admin) FindBrokerAddrByName(ctx context.Context, brokerName string) ([]string, error) {
var brokersAddrList []string
clusterInfo, err := a.GetBrokerClusterInfo(ctx)
if err != nil {
rlog.Error("call GetBrokerClusterInfo error", map[string]interface{}{
rlog.LogKeyBroker: brokerName,
rlog.LogKeyUnderlayError: err,
})
return nil, err
}
// fetch broker addr via broker name
if brokerName != "" {
if val, exist := clusterInfo.BrokerAddrTable[brokerName]; exist {
// only add master broker address
brokersAddrList = append(brokersAddrList, val.BrokerAddresses[internal.MasterId])
} else {
rlog.Error("create topic error", map[string]interface{}{
rlog.LogKeyBroker: brokerName,
rlog.LogKeyUnderlayError: "Broker Name not found",
})
return nil, errors.New("create topic error due to broker name not found")
}
} else {
// not given broker addr and name, then create topic on all broker of default cluster
for _, nestedBrokerAddrData := range clusterInfo.BrokerAddrTable {
// only add master broker address
brokersAddrList = append(brokersAddrList, nestedBrokerAddrData.BrokerAddresses[internal.MasterId])
}
}
return brokersAddrList, nil
}

// CreateTopic create topic.
// TODO: another implementation like sarama, without brokerAddr as input
// Done: another implementation like sarama, without brokerAddr as input
func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
cfg := defaultTopicConfigCreate()
for _, apply := range opts {
apply(&cfg)
}
if cfg.Topic == "" {
rlog.Error("empty topic", map[string]interface{}{})
return errors.New("topic is empty string")
}

if cfg.OptNotOverride {
isExist, err := a.checkIsTopicDuplicated(ctx, cfg.Topic)
if err != nil {
rlog.Error("failed to FetchAllTopicList", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return errors.New("failed to FetchAllTopicList")
}
if isExist {
rlog.Error("same name topic is exist", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyUnderlayError: "topic is duplicated",
})
return errors.New("topic is duplicated")
}
}

request := &internal.CreateTopicRequestHeader{
Topic: cfg.Topic,
Expand All @@ -174,20 +274,39 @@ func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
}

cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, request, nil)
_, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second)
if err != nil {
rlog.Error("create topic error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
rlog.LogKeyUnderlayError: err,
})
var brokersAddrList []string
if cfg.BrokerAddr == "" {
// we need get broker addr table from RocketMQ server
foundBrokers, err := a.FindBrokerAddrByName(ctx, cfg.BrokerName)
if err != nil {
return err
}
if foundBrokers == nil || len(foundBrokers) == 0 {
return errors.New("broker name not found")
}
brokersAddrList = append(brokersAddrList, foundBrokers...)
} else {
rlog.Info("create topic success", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: cfg.BrokerAddr,
})
brokersAddrList = append(brokersAddrList, cfg.BrokerAddr)
}
var invokeErrorStrings []string
for _, brokerAddr := range brokersAddrList {
_, invokeErr := a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second)
if invokeErr != nil {
invokeErrorStrings = append(invokeErrorStrings, invokeErr.Error())
rlog.Error("create topic error", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: brokerAddr,
rlog.LogKeyUnderlayError: invokeErr,
})
} else {
rlog.Info("create topic success", map[string]interface{}{
rlog.LogKeyTopic: cfg.Topic,
rlog.LogKeyBroker: brokerAddr,
})
}
}
return err
// go 1.13 not support errors.Join(err, nil, err2, err3), so we join with string repr of error
return fmt.Errorf(strings.Join(invokeErrorStrings, "\n"))
}

// DeleteTopicInBroker delete topic in broker.
Expand Down Expand Up @@ -216,7 +335,7 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
for _, apply := range opts {
apply(&cfg)
}
//delete topic in broker
// delete topic in broker
if cfg.BrokerAddr == "" {
a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
cfg.BrokerAddr = a.cli.GetNameSrv().FindBrokerAddrByTopic(cfg.Topic)
Expand All @@ -231,7 +350,7 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
return err
}

//delete topic in nameserver
// delete topic in nameserver
if len(cfg.NameSrvAddr) == 0 {
a.cli.GetNameSrv().UpdateTopicRouteInfo(cfg.Topic)
cfg.NameSrvAddr = a.cli.GetNameSrv().AddrList()
Expand Down
15 changes: 15 additions & 0 deletions admin/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ func defaultTopicConfigCreate() TopicConfigCreate {
TopicFilterType: "SINGLE_TAG",
TopicSysFlag: 0,
Order: false,
OptNotOverride: false,
}
return opts
}

type TopicConfigCreate struct {
Topic string
BrokerName string
BrokerAddr string
DefaultTopic string
ReadQueueNums int
Expand All @@ -40,6 +42,7 @@ type TopicConfigCreate struct {
TopicFilterType string
TopicSysFlag int
Order bool
OptNotOverride bool // optional flag for avoid override same name topic
}

type OptionCreate func(*TopicConfigCreate)
Expand All @@ -50,6 +53,18 @@ func WithTopicCreate(Topic string) OptionCreate {
}
}

func WithOptNotOverrideCreate(NotOverride bool) OptionCreate {
return func(opts *TopicConfigCreate) {
opts.OptNotOverride = NotOverride
}
}

func WithBrokerNameCreate(BrokerName string) OptionCreate {
return func(opts *TopicConfigCreate) {
opts.BrokerName = BrokerName
}
}

func WithBrokerAddrCreate(BrokerAddr string) OptionCreate {
return func(opts *TopicConfigCreate) {
opts.BrokerAddr = BrokerAddr
Expand Down
13 changes: 13 additions & 0 deletions admin/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ type TopicList struct {
RemotingSerializable
}

type NestedBrokerAddrData struct {
BrokerAddresses map[int64]string `json:"brokerAddrs"` // may have multi addresses, master index is 0
Cluster string `json:"cluster"`
BrokerName string `json:"brokerName"`
EnableActingMaster bool `json:"enableActingMaster"`
}

type ClusterInfo struct {
BrokerAddrTable map[string]NestedBrokerAddrData `json:"brokerAddrTable"` // k: broker name
ClusterAddrTable map[string][]string `json:"clusterAddrTable"` // k: cluster name, v: multi broker names
RemotingSerializable
}

type SubscriptionGroupWrapper struct {
SubscriptionGroupTable map[string]SubscriptionGroupConfig
DataVersion DataVersion
Expand Down
32 changes: 27 additions & 5 deletions examples/admin/topic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func main() {
topic := "newOne"
//clusterName := "DefaultCluster"
// clusterName := "DefaultCluster"
nameSrvAddr := []string{"127.0.0.1:9876"}
brokerAddr := "127.0.0.1:10911"

Expand All @@ -46,22 +46,44 @@ func main() {
}
fmt.Println(result.TopicList)

//create topic
// get broker cluster info
clusterInfo, err := testAdmin.GetBrokerClusterInfo(
context.Background(),
)
fmt.Printf("Broker Cluster Info:\n%v\n", clusterInfo)
if err != nil {
fmt.Println("GetBrokerClusterInfo error:", err.Error())
}

// create topic
err = testAdmin.CreateTopic(
context.Background(),
admin.WithTopicCreate(topic),
// admin will resolve broker name to broker address
admin.WithBrokerNameCreate("broker-abc"),
)
if err != nil {
fmt.Println("Create topic error:", err.Error())
}

// try to create same-name topic again
err = testAdmin.CreateTopic(
context.Background(),
admin.WithTopicCreate(topic),
admin.WithBrokerAddrCreate(brokerAddr),
admin.WithOptNotOverrideCreate(true),
)
// it should raise `topic is exist` error if we have set the option `OptNotOverride`
if err != nil {
fmt.Println("Create topic error:", err.Error())
}

//deletetopic
// delete topic
err = testAdmin.DeleteTopic(
context.Background(),
admin.WithTopicDelete(topic),
//admin.WithBrokerAddrDelete(brokerAddr),
//admin.WithNameSrvAddr(nameSrvAddr),
// admin.WithBrokerAddrDelete(brokerAddr),
// admin.WithNameSrvAddr(nameSrvAddr),
)
if err != nil {
fmt.Println("Delete topic error:", err.Error())
Expand Down
Loading