diff --git a/admin/admin.go b/admin/admin.go index 487c8b44..7a9585aa 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -36,6 +36,7 @@ type Admin interface { GetAllSubscriptionGroup(ctx context.Context, brokerAddr string, timeoutMillis time.Duration) (*SubscriptionGroupWrapper, error) FetchAllTopicList(ctx context.Context) (*TopicList, error) + ExamineBrokerClusterAclConfig(ctx context.Context, brokerAddr string) (*AclConfig, error) //GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) Close() error @@ -263,6 +264,29 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error { return nil } +func (a *admin) ExamineBrokerClusterAclConfig(ctx context.Context, brokerAddr string) (*AclConfig, error) { + cmd := remote.NewRemotingCommand(internal.ReqGetBrokerClusterAclConfig, nil, nil) + a.cli.RegisterACL() + response, err := a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second) + if err != nil { + rlog.Error("Get broker acl config error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return nil, err + } else { + rlog.Info("Get broker acl config success", map[string]interface{}{}) + } + var aclConfig AclConfig + _, err = aclConfig.Decode(response.Body, &aclConfig) + if err != nil { + rlog.Error("Get broker acl config decode error", map[string]interface{}{ + rlog.LogKeyUnderlayError: err, + }) + return nil, err + } + return &aclConfig, nil +} + func (a *admin) FetchPublishMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) { return a.cli.GetNameSrv().FetchPublishMessageQueues(utils.WrapNamespace(a.opts.Namespace, topic)) } diff --git a/admin/response.go b/admin/response.go index 7a97236f..4c45c83b 100644 --- a/admin/response.go +++ b/admin/response.go @@ -45,6 +45,7 @@ func (r *RemotingSerializable) ToJson(obj interface{}, prettyFormat bool) string return string(jsonBytes) } } + func (r *RemotingSerializable) Decode(data []byte, classOfT interface{}) (interface{}, error) { jsonStr := string(data) return r.FromJson(jsonStr, classOfT) @@ -86,3 +87,20 @@ type SubscriptionGroupConfig struct { WhichBrokerWhenConsumeSlowly int NotifyConsumerIdsChangedEnable bool } + +type AclConfig struct { + GlobalWhiteAddrs []string `json:"globalWhiteAddrs"` + PlainAccessConfigs []PlainAccessConfig `json:"plainAccessConfigs"` + RemotingSerializable +} + +type PlainAccessConfig struct { + AccessKey string `json:"accessKey"` + SecretKey string `json:"secretKey"` + WhiteRemoteAddress string `json:"whiteRemoteAddress"` + Admin bool `json:"admin"` + DefaultTopicPerm string `json:"defaultTopicPerm"` + DefaultGroupPerm string `json:"defaultGroupPerm"` + TopicPerms []string `json:"topicPerms"` + GroupPerms []string `json:"groupPerms"` +} diff --git a/examples/admin/acl/main.go b/examples/admin/acl/main.go new file mode 100644 index 00000000..6e249f30 --- /dev/null +++ b/examples/admin/acl/main.go @@ -0,0 +1,50 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "fmt" + "github.com/apache/rocketmq-client-go/v2/admin" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +func main() { + //clusterName := "DefaultCluster" + nameSrvAddr := []string{"127.0.0.1:9876"} + brokerAddr := "127.0.0.1:10911" + + testAdmin, err := admin.NewAdmin( + admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)), + admin.WithCredentials(primitive.Credentials{ + AccessKey: "rocketmq2", + SecretKey: "12345678", + }), + ) + + result, err := testAdmin.ExamineBrokerClusterAclConfig(context.Background(), brokerAddr) + if err != nil { + fmt.Println("ExamineBrokerClusterAclConfig error:", err.Error()) + } + fmt.Println(result) + + err = testAdmin.Close() + if err != nil { + fmt.Printf("Shutdown admin error: %s", err.Error()) + } +} diff --git a/internal/request.go b/internal/request.go index 9a590b8f..fe6da016 100644 --- a/internal/request.go +++ b/internal/request.go @@ -40,6 +40,7 @@ const ( ReqGetConsumerListByGroup = int16(38) ReqLockBatchMQ = int16(41) ReqUnlockBatchMQ = int16(42) + ReqGetBrokerClusterAclConfig = int16(54) ReqGetRouteInfoByTopic = int16(105) ReqGetBrokerClusterInfo = int16(106) ReqSendBatchMessage = int16(320)