From d69b5cb0595c63a6e543528027304dfb04b42d2d Mon Sep 17 00:00:00 2001 From: Rudro-25 Date: Fri, 9 Aug 2024 16:50:10 +0600 Subject: [PATCH] update zk client Signed-off-by: Rudro-25 --- zookeeper/kubedb_client_builder.go | 98 ++++++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 5 deletions(-) diff --git a/zookeeper/kubedb_client_builder.go b/zookeeper/kubedb_client_builder.go index 1b529aae..85291908 100644 --- a/zookeeper/kubedb_client_builder.go +++ b/zookeeper/kubedb_client_builder.go @@ -18,7 +18,15 @@ package zookeeper import ( "context" + "errors" "fmt" + core "k8s.io/api/core/v1" + kerr "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + "log" + "strconv" + "strings" "time" "github.com/Shopify/zk" @@ -33,10 +41,13 @@ const ( ) type KubeDBClientBuilder struct { - kc client.Client - db *dbapi.ZooKeeper - podName string - url string + kc client.Client + db *dbapi.ZooKeeper + ctx context.Context + podName string + url string + enableHTTPClient bool + disableAMQPClient bool } func NewKubeDBClientBuilder(kc client.Client, db *dbapi.ZooKeeper) *KubeDBClientBuilder { @@ -46,6 +57,14 @@ func NewKubeDBClientBuilder(kc client.Client, db *dbapi.ZooKeeper) *KubeDBClient } } +// NewKubeDBClientBuilderForHTTP returns a KubeDB client builder only for http client +func NewKubeDBClientBuilderForHTTP(kc client.Client, db *dbapi.ZooKeeper) *KubeDBClientBuilder { + return NewKubeDBClientBuilder(kc, db). + WithContext(context.TODO()). + WithAMQPClientDisabled(). + WithHTTPClientEnabled() +} + func (o *KubeDBClientBuilder) WithPod(podName string) *KubeDBClientBuilder { o.podName = podName return o @@ -56,7 +75,22 @@ func (o *KubeDBClientBuilder) WithURL(url string) *KubeDBClientBuilder { return o } -func (o *KubeDBClientBuilder) GetZooKeeperClient(ctx context.Context) (*Client, error) { +func (o *KubeDBClientBuilder) WithContext(ctx context.Context) *KubeDBClientBuilder { + o.ctx = ctx + return o +} + +func (o *KubeDBClientBuilder) WithHTTPClientEnabled() *KubeDBClientBuilder { + o.enableHTTPClient = true + return o +} + +func (o *KubeDBClientBuilder) WithAMQPClientDisabled() *KubeDBClientBuilder { + o.disableAMQPClient = true + return o +} + +func (o *KubeDBClientBuilder) GetZooKeeperClient() (*Client, error) { var err error if o.podName != "" { o.url = o.getPodURL() @@ -73,6 +107,41 @@ func (o *KubeDBClientBuilder) GetZooKeeperClient(ctx context.Context) (*Client, break } } + + if !o.db.Spec.DisableAuth { + if o.db.Spec.AuthSecret == nil { + klog.Info("Auth-secret not set") + return nil, errors.New("auth-secret is not set") + } + + authSecret := core.Secret{} + err := o.kc.Get(o.ctx, types.NamespacedName{ + Namespace: o.db.Namespace, + Name: o.db.Spec.AuthSecret.Name, + }, &authSecret) + if err != nil { + if kerr.IsNotFound(err) { + klog.Error(err, "Auth-secret not found") + return nil, errors.New("auth-secret is not found") + } + klog.Error(err, "Failed to get auth-secret") + return nil, err + } + + //clientConfig.Net.SASL.Enable = true + username := string(authSecret.Data[core.BasicAuthUsernameKey]) + password := string(authSecret.Data[core.BasicAuthPasswordKey]) + + // Correct the format for the username:password string + authString := fmt.Sprintf("%s:%s", username, password) + + // Add authentication using the properly formatted authString + err = zkConn.AddAuth("digest", []byte(authString)) + if err != nil { + log.Fatalf("Failed to add authentication: %v", err) + } + } + return &Client{ zkConn, }, nil @@ -81,3 +150,22 @@ func (o *KubeDBClientBuilder) GetZooKeeperClient(ctx context.Context) (*Client, func (o *KubeDBClientBuilder) getPodURL() string { return fmt.Sprintf("%v.%v.%v.svc:%d", o.podName, o.db.GoverningServiceName(), o.db.Namespace, kubedb.ZooKeeperClientPort) } + +func (c *Client) RemoveMember(podName string) error { + parts := strings.Split(podName, "-") + nodeID := parts[len(parts)-1] + nodeIDInt, err := strconv.Atoi(nodeID) + if err != nil { + return err + } + nodeID = strconv.Itoa(nodeIDInt + 1) + + // Use the IncrementalReconfig API to remove the specific node from the Zookeeper cluster + _, err = c.Conn.IncrementalReconfig(nil, []string{nodeID}, -1) + if err != nil { + return fmt.Errorf("failed to reconfigure Zookeeper and remove node %s: %w", nodeID, err) + } + + fmt.Printf("Node %s successfully removed from Zookeeper cluster.\n", nodeID) + return nil +}