Skip to content

Commit

Permalink
update zk client
Browse files Browse the repository at this point in the history
Signed-off-by: Rudro-25 <[email protected]>
  • Loading branch information
Rudro-25 committed Sep 12, 2024
1 parent b8c278d commit d69b5cb
Showing 1 changed file with 93 additions and 5 deletions.
98 changes: 93 additions & 5 deletions zookeeper/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ package zookeeper

import (
"context"
"errors"
"fmt"
core "k8s.io/api/core/v1"

Check failure on line 23 in zookeeper/kubedb_client_builder.go

View workflow job for this annotation

GitHub Actions / Build

File is not `goimports`-ed (goimports)
kerr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"log"
"strconv"
"strings"
"time"

"github.com/Shopify/zk"
Expand All @@ -33,10 +41,13 @@ const (
)

type KubeDBClientBuilder struct {
kc client.Client
db *dbapi.ZooKeeper
podName string
url string
kc client.Client
db *dbapi.ZooKeeper
ctx context.Context
podName string
url string
enableHTTPClient bool
disableAMQPClient bool
}

func NewKubeDBClientBuilder(kc client.Client, db *dbapi.ZooKeeper) *KubeDBClientBuilder {
Expand All @@ -46,6 +57,14 @@ func NewKubeDBClientBuilder(kc client.Client, db *dbapi.ZooKeeper) *KubeDBClient
}
}

// NewKubeDBClientBuilderForHTTP returns a KubeDB client builder only for http client
func NewKubeDBClientBuilderForHTTP(kc client.Client, db *dbapi.ZooKeeper) *KubeDBClientBuilder {
return NewKubeDBClientBuilder(kc, db).
WithContext(context.TODO()).
WithAMQPClientDisabled().
WithHTTPClientEnabled()
}

func (o *KubeDBClientBuilder) WithPod(podName string) *KubeDBClientBuilder {
o.podName = podName
return o
Expand All @@ -56,7 +75,22 @@ func (o *KubeDBClientBuilder) WithURL(url string) *KubeDBClientBuilder {
return o
}

func (o *KubeDBClientBuilder) GetZooKeeperClient(ctx context.Context) (*Client, error) {
func (o *KubeDBClientBuilder) WithContext(ctx context.Context) *KubeDBClientBuilder {
o.ctx = ctx
return o
}

func (o *KubeDBClientBuilder) WithHTTPClientEnabled() *KubeDBClientBuilder {
o.enableHTTPClient = true
return o
}

func (o *KubeDBClientBuilder) WithAMQPClientDisabled() *KubeDBClientBuilder {
o.disableAMQPClient = true
return o
}

func (o *KubeDBClientBuilder) GetZooKeeperClient() (*Client, error) {
var err error
if o.podName != "" {
o.url = o.getPodURL()
Expand All @@ -73,6 +107,41 @@ func (o *KubeDBClientBuilder) GetZooKeeperClient(ctx context.Context) (*Client,
break
}
}

if !o.db.Spec.DisableAuth {
if o.db.Spec.AuthSecret == nil {
klog.Info("Auth-secret not set")
return nil, errors.New("auth-secret is not set")
}

authSecret := core.Secret{}
err := o.kc.Get(o.ctx, types.NamespacedName{
Namespace: o.db.Namespace,
Name: o.db.Spec.AuthSecret.Name,
}, &authSecret)
if err != nil {
if kerr.IsNotFound(err) {
klog.Error(err, "Auth-secret not found")
return nil, errors.New("auth-secret is not found")
}
klog.Error(err, "Failed to get auth-secret")
return nil, err
}

//clientConfig.Net.SASL.Enable = true
username := string(authSecret.Data[core.BasicAuthUsernameKey])
password := string(authSecret.Data[core.BasicAuthPasswordKey])

// Correct the format for the username:password string
authString := fmt.Sprintf("%s:%s", username, password)

// Add authentication using the properly formatted authString
err = zkConn.AddAuth("digest", []byte(authString))
if err != nil {
log.Fatalf("Failed to add authentication: %v", err)
}
}

return &Client{
zkConn,
}, nil
Expand All @@ -81,3 +150,22 @@ func (o *KubeDBClientBuilder) GetZooKeeperClient(ctx context.Context) (*Client,
func (o *KubeDBClientBuilder) getPodURL() string {
return fmt.Sprintf("%v.%v.%v.svc:%d", o.podName, o.db.GoverningServiceName(), o.db.Namespace, kubedb.ZooKeeperClientPort)
}

func (c *Client) RemoveMember(podName string) error {
parts := strings.Split(podName, "-")
nodeID := parts[len(parts)-1]
nodeIDInt, err := strconv.Atoi(nodeID)
if err != nil {
return err
}
nodeID = strconv.Itoa(nodeIDInt + 1)

// Use the IncrementalReconfig API to remove the specific node from the Zookeeper cluster
_, err = c.Conn.IncrementalReconfig(nil, []string{nodeID}, -1)
if err != nil {
return fmt.Errorf("failed to reconfigure Zookeeper and remove node %s: %w", nodeID, err)
}

fmt.Printf("Node %s successfully removed from Zookeeper cluster.\n", nodeID)
return nil
}

0 comments on commit d69b5cb

Please sign in to comment.