From a1c9c7ff35b49cf30b5f05c802737ab8bf2edb5d Mon Sep 17 00:00:00 2001 From: lixiang365 Date: Fri, 25 Oct 2024 18:00:49 +0800 Subject: [PATCH] fix: Fixed MessageQueues not filtering slave broker --- golang/client.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/golang/client.go b/golang/client.go index a51c4cc6..70098922 100644 --- a/golang/client.go +++ b/golang/client.go @@ -38,6 +38,10 @@ import ( "google.golang.org/grpc/metadata" ) +const ( + MASTER_BROKER_ID = 0 +) + type Client interface { GetClientID() string Sign(ctx context.Context) context.Context @@ -335,9 +339,16 @@ func (cli *defaultClient) getMessageQueues(ctx context.Context, topic string) ([ return nil, err } + masterRoute := []*v2.MessageQueue{} + for _, messageQueue := range route { + if messageQueue.GetId() == MASTER_BROKER_ID { + masterRoute = append(masterRoute, messageQueue) + } + } + // telemeter to all messageQueues endpointsSet := make(map[string]bool) - for _, messageQueue := range route { + for _, messageQueue := range masterRoute { for _, address := range messageQueue.GetBroker().GetEndpoints().GetAddresses() { target := utils.ParseAddress(address) if _, ok := endpointsSet[target]; ok { @@ -350,8 +361,8 @@ func (cli *defaultClient) getMessageQueues(ctx context.Context, topic string) ([ } } - cli.router.Store(topic, route) - return route, nil + cli.router.Store(topic, masterRoute) + return masterRoute, nil } func (cli *defaultClient) queryRoute(ctx context.Context, topic string, duration time.Duration) ([]*v2.MessageQueue, error) {