Skip to content

Commit

Permalink
Modify some functions
Browse files Browse the repository at this point in the history
Signed-off-by: obaydullahmhs <[email protected]>
  • Loading branch information
obaydullahmhs committed Dec 22, 2023
1 parent dd488a2 commit cd0d824
Show file tree
Hide file tree
Showing 27 changed files with 24,134 additions and 5,057 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/elastic/go-elasticsearch/v6 v6.8.10
github.com/elastic/go-elasticsearch/v7 v7.15.1
github.com/elastic/go-elasticsearch/v8 v8.4.0
github.com/go-logr/logr v1.2.4
github.com/go-resty/resty/v2 v2.10.0
github.com/go-sql-driver/mysql v1.6.0
github.com/lib/pq v1.10.4
Expand All @@ -22,7 +21,7 @@ require (
k8s.io/apimachinery v0.25.4
k8s.io/klog/v2 v2.80.1
kmodules.xyz/client-go v0.25.43
kubedb.dev/apimachinery v0.38.1-0.20231219060351-8b884cb230c8
kubedb.dev/apimachinery v0.39.1-0.20231222063121-fbbd57ae9c72
sigs.k8s.io/controller-runtime v0.13.1
xorm.io/xorm v1.3.2
)
Expand All @@ -42,6 +41,7 @@ require (
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1554,8 +1554,8 @@ kmodules.xyz/monitoring-agent-api v0.25.6 h1:NxPkk2MRUWusI8L5u1U7w8GBUagnKqK49C2
kmodules.xyz/monitoring-agent-api v0.25.6/go.mod h1:vvgbaYZkVb+KsxeiLRM7xe1M2BkBZVCYcM32ak5bin8=
kmodules.xyz/offshoot-api v0.25.5 h1:erUtTDj9iljikd9CvrCz0E32P5mgEqq1NYxy06lxrNo=
kmodules.xyz/offshoot-api v0.25.5/go.mod h1:wotLtcXWHw6KrWX6Ry2EsHn2I2QTvyLX7gXAuwBjkFc=
kubedb.dev/apimachinery v0.38.1-0.20231219060351-8b884cb230c8 h1:In1sm0OuGkOpY5zynQXPIUyeq+pSlwgE0gCesJOg3/k=
kubedb.dev/apimachinery v0.38.1-0.20231219060351-8b884cb230c8/go.mod h1:hMFtRnxRCZR39lvguqtqq6PZ5dDRDHFjupQZQ37QIo8=
kubedb.dev/apimachinery v0.39.1-0.20231222063121-fbbd57ae9c72 h1:hI9Zsn+QnTMVpANE8P6WGaTq7kObeErvdM1Gs/ZpS7o=
kubedb.dev/apimachinery v0.39.1-0.20231222063121-fbbd57ae9c72/go.mod h1:hMFtRnxRCZR39lvguqtqq6PZ5dDRDHFjupQZQ37QIo8=
lukechampine.com/uint128 v1.1.1 h1:pnxCASz787iMf+02ssImqk6OLt+Z5QHMoZyUXR4z6JU=
lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
modernc.org/cc/v3 v3.33.6/go.mod h1:iPJg1pkwXqAV16SNgFBVYmggfMg6xhs+2oiO0vclK3g=
Expand Down
11 changes: 5 additions & 6 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type MessageMetadata struct {
}

func (c *Client) IsDBConnected() (bool, error) {
// TODO: try using refreshcontroller
controller, err := c.RefreshController()
if err != nil || controller == nil {
klog.Error(err, "Failed to Get kafka controller")
Expand All @@ -61,9 +60,9 @@ func (c *Client) IsDBConnected() (bool, error) {
return false, err
}
if connected {
klog.Info(fmt.Sprintf("Successfully connected broker: %s", controller.Addr()))
klog.V(5).Info(fmt.Sprintf("Successfully connected broker: %s", controller.Addr()))
} else {
klog.Info(fmt.Sprintf("Failed to connect broker: %s", controller.Addr()))
klog.Error(fmt.Sprintf("Failed to connect broker: %s", controller.Addr()))
}

return connected, nil
Expand Down Expand Up @@ -121,7 +120,7 @@ func (a *AdminClient) EnsureKafkaTopic(topic string, topicConfig map[string]*str
}
return nil
}
func (c *Client) DeleteTopic(topics ...string) {
func (c *Client) DeleteTopics(topics ...string) {
broker, err := c.RefreshController()
if err != nil {
klog.Error(err, "Failed to refresh controller for kafka-health topic")
Expand Down Expand Up @@ -164,7 +163,7 @@ func (p *ProducerClient) SendMessageWithProducer(partition int32, topic, key, me
return &msgMetadata, nil
}

func (c ConsumerClient) ConsumeMessages(partition int32, topic string, offset int64, signal *chan bool, message *chan MessageMetadata) error {
func (c *ConsumerClient) ConsumeMessages(partition int32, topic string, offset int64, signal *chan bool, message *chan MessageMetadata) error {
var err error
var partitionConsumer kafkago.PartitionConsumer
partitionConsumer, err = c.ConsumePartition(topic, partition, offset)
Expand All @@ -179,7 +178,7 @@ func (c ConsumerClient) ConsumeMessages(partition int32, topic string, offset in
case <-*signal:
return nil
case err := <-partitionConsumer.Errors():
klog.Info(fmt.Sprintf("could not process message, err: %s", err.Error()))
klog.Error(fmt.Sprintf("could not process message, err: %s", err.Error()))
return err
case msg := <-partitionConsumer.Messages():
msgMetadata := MessageMetadata{
Expand Down
2 changes: 1 addition & 1 deletion kafka/connect/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type ResponseBody struct {
KafkaClusterId string `json:"kafka_cluster_id"`
}

func (cc *Client) GetKafkaConnectClusterStatus() (*Response, error) {
func (cc *Client) GetConnectClusterStatus() (*Response, error) {
req := cc.Client.R().SetDoNotParseResponse(true)
res, err := req.Get(cc.Config.api)
if err != nil {
Expand Down
37 changes: 15 additions & 22 deletions kafka/connect/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ import (
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
kapi "kubedb.dev/apimachinery/apis/kafka/v1alpha1"
api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type KubeDBClientBuilder struct {
kc client.Client
dbConnect *api.KafkaConnectCluster
dbConnect *kapi.ConnectCluster
url string
path string
podName string
ctx context.Context
}

func NewKubeDBClientBuilder(kc client.Client, dbConnect *api.KafkaConnectCluster) *KubeDBClientBuilder {
func NewKubeDBClientBuilder(kc client.Client, dbConnect *kapi.ConnectCluster) *KubeDBClientBuilder {
return &KubeDBClientBuilder{
kc: kc,
dbConnect: dbConnect,
Expand All @@ -59,22 +61,27 @@ func (o *KubeDBClientBuilder) WithURL(url string) *KubeDBClientBuilder {
return o
}

func (o *KubeDBClientBuilder) WithPath(path string) *KubeDBClientBuilder {
o.path = path
return o
}

func (o *KubeDBClientBuilder) WithContext(ctx context.Context) *KubeDBClientBuilder {
o.ctx = ctx
return o
}

func (o *KubeDBClientBuilder) GetKafkaConnectClusterClient() (*Client, error) {
func (o *KubeDBClientBuilder) GetConnectClusterClient() (*Client, error) {
config := Config{
host: o.getHostPath(),
api: "/",
host: o.url,
api: o.path,
transport: &http.Transport{
IdleConnTimeout: time.Second * 3,
DialContext: (&net.Dialer{
Timeout: time.Second * 30,
}).DialContext,
},
connectionScheme: o.getConnectionScheme(),
connectionScheme: o.dbConnect.GetConnectionScheme(),
}

// If EnableSSL is true set tls config,
Expand All @@ -83,10 +90,10 @@ func (o *KubeDBClientBuilder) GetKafkaConnectClusterClient() (*Client, error) {
var certSecret core.Secret
err := o.kc.Get(o.ctx, types.NamespacedName{
Namespace: o.dbConnect.Namespace,
Name: o.dbConnect.GetCertSecretName(api.KafkaConnectClusterClientCert),
Name: o.dbConnect.GetCertSecretName(kapi.ConnectClusterClientCert),
}, &certSecret)
if err != nil {
klog.Error(err, "failed to get kafka connect cluster client secret")
klog.Error(err, "failed to get connect cluster client secret")
return nil, err
}

Expand Down Expand Up @@ -155,17 +162,3 @@ func (o *KubeDBClientBuilder) GetKafkaConnectClusterClient() (*Client, error) {
Config: &config,
}, nil
}

// return host path in
// format https://svc_name.namespace.svc:8083/
func (o *KubeDBClientBuilder) getHostPath() string {
return fmt.Sprintf("%v://%s.%s.svc:%d", o.getConnectionScheme(), o.dbConnect.ServiceName(), o.dbConnect.GetNamespace(), 8083)
}

// will remove later
func (o *KubeDBClientBuilder) getConnectionScheme() string {
if o.dbConnect.Spec.EnableSSL && o.dbConnect.Spec.TLS != nil {
return "https"
}
return "http"
}
10 changes: 3 additions & 7 deletions kafka/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (o *KubeDBClientBuilder) GetConfig() (*kafkago.Config, error) {
clientConfig := kafkago.NewConfig()
if !o.db.Spec.DisableSecurity {
if o.db.Spec.AuthSecret == nil {
klog.Info("Authsecret not set")
klog.Info("Auth-secret not set")
return nil, errors.New("auth-secret is not set")
}

Expand All @@ -77,10 +77,10 @@ func (o *KubeDBClientBuilder) GetConfig() (*kafkago.Config, error) {
}, authSecret)
if err != nil {
if kerr.IsNotFound(err) {
klog.Error(err, "Authsecret not found")
klog.Error(err, "Auth-secret not found")
return nil, errors.New("auth-secret is not found")
}
klog.Error(err, "Failed to get authsecret")
klog.Error(err, "Failed to get auth-secret")
return nil, err
}

Expand Down Expand Up @@ -136,7 +136,6 @@ func (o *KubeDBClientBuilder) GetConfig() (*kafkago.Config, error) {
}

func (o *KubeDBClientBuilder) GetKafkaClient() (*Client, error) {

clientConfig, err := o.GetConfig()
if err != nil {
return nil, err
Expand All @@ -155,7 +154,6 @@ func (o *KubeDBClientBuilder) GetKafkaClient() (*Client, error) {
}

func (o *KubeDBClientBuilder) GetKafkaProducerClient() (*ProducerClient, error) {

clientConfig, err := o.GetConfig()
if err != nil {
return nil, err
Expand All @@ -174,7 +172,6 @@ func (o *KubeDBClientBuilder) GetKafkaProducerClient() (*ProducerClient, error)
}

func (o *KubeDBClientBuilder) GetKafkaAdminClient() (*AdminClient, error) {

clientConfig, err := o.GetConfig()
if err != nil {
return nil, err
Expand All @@ -193,7 +190,6 @@ func (o *KubeDBClientBuilder) GetKafkaAdminClient() (*AdminClient, error) {
}

func (o *KubeDBClientBuilder) GetKafkaConsumerClient() (*ConsumerClient, error) {

clientConfig, err := o.GetConfig()
if err != nil {
return nil, err
Expand Down
26 changes: 26 additions & 0 deletions vendor/kubedb.dev/apimachinery/apis/kafka/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
Copyright AppsCode Inc. and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kafka

const (
// GroupName is the group name use in this package
GroupName = "kafka.kubedb.com"
// MutatorGroupName is the group name used to implement mutating webhooks for types in this package
MutatorGroupName = "mutators." + GroupName
// ValidatorGroupName is the group name used to implement validating webhooks for types in this package
ValidatorGroupName = "validators." + GroupName
)
Loading

0 comments on commit cd0d824

Please sign in to comment.