diff --git a/elasticsearch/client.go b/elasticsearch/client.go index 2ffdbfa67..2e04bbb62 100644 --- a/elasticsearch/client.go +++ b/elasticsearch/client.go @@ -16,6 +16,13 @@ limitations under the License. package elasticsearch +import "github.com/go-resty/resty/v2" + type Client struct { ESClient } + +type ESRestyClient struct { + Client *resty.Client + Config *Config +} diff --git a/elasticsearch/kubedb_client_builder.go b/elasticsearch/kubedb_client_builder.go index ad86b5a37..fdc728992 100644 --- a/elasticsearch/kubedb_client_builder.go +++ b/elasticsearch/kubedb_client_builder.go @@ -20,6 +20,7 @@ import ( "context" "crypto/tls" "crypto/x509" + "encoding/json" "fmt" "io" "net" @@ -36,6 +37,7 @@ import ( esv7 "github.com/elastic/go-elasticsearch/v7" esv8 "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/go-resty/resty/v2" osv1 "github.com/opensearch-project/opensearch-go" osapiv1 "github.com/opensearch-project/opensearch-go/opensearchapi" osv2 "github.com/opensearch-project/opensearch-go/v2" @@ -412,6 +414,92 @@ func (o *KubeDBClientBuilder) GetElasticClient() (*Client, error) { return nil, fmt.Errorf("unknown database version: %s", o.db.Spec.Version) } +type Config struct { + host string + api string + transport *http.Transport +} + +func (o *KubeDBClientBuilder) GetElasticRestyClient() (*ESRestyClient, error) { + config := Config{ + host: o.ServiceURL(), + api: "/_cluster/health?pretty", + transport: &http.Transport{ + IdleConnTimeout: time.Second * 3, + DialContext: (&net.Dialer{ + Timeout: time.Second * 30, + }).DialContext, + }, + } + + var authSecret core.Secret + var username, password string + if !o.db.Spec.DisableSecurity && o.db.Spec.AuthSecret != nil { + err := o.kc.Get(o.ctx, client.ObjectKey{Namespace: o.db.Namespace, Name: o.db.Spec.AuthSecret.Name}, &authSecret) + if err != nil { + return nil, errors.Errorf("Failed to get auth secret with %s", err) + } + + if value, ok := authSecret.Data[core.BasicAuthUsernameKey]; ok { + username = string(value) + } else { + klog.Errorf("Failed for secret: %s/%s, username is missing", authSecret.Namespace, authSecret.Name) + return nil, errors.New("username is missing") + } + + if value, ok := authSecret.Data[core.BasicAuthPasswordKey]; ok { + password = string(value) + } else { + klog.Errorf("Failed for secret: %s/%s, password is missing", authSecret.Namespace, authSecret.Name) + return nil, errors.New("password is missing") + } + } + + defaultTlsConfig, err := o.getDefaultTLSConfig() + if err != nil { + klog.Errorf("Failed to get default tls config: %v", err) + } + config.transport.TLSClientConfig = defaultTlsConfig + newClient := resty.New() + newClient.SetTransport(config.transport).SetScheme(o.db.GetConnectionScheme()).SetBaseURL(config.host) + newClient.SetHeader("Accept", "application/json") + newClient.SetBasicAuth(username, password) + newClient.SetTimeout(time.Second * 30) + return &ESRestyClient{ + Client: newClient, + Config: &config, + }, nil +} + +func (client *ESRestyClient) Ping() (string, error) { + req := client.Client.R().SetDoNotParseResponse(true) + res, err := req.Get(client.Config.api) + if res != nil { + if res.StatusCode() != 200 { + klog.Error("stauscode is not 200") + return "", errors.New("statuscode is not 200") + } + } else if res == nil { + return "", errors.New("response can not be nil") + } + if err != nil { + klog.Error(err, res.StatusCode(), "Failed to send http request") + return "", err + } + body := res.RawBody() + responseBody := make(map[string]interface{}) + if err := json.NewDecoder(body).Decode(&responseBody); err != nil { + return "", fmt.Errorf("failed to deserialize the response: %v", err) + } + if val, ok := responseBody["status"]; ok { + if strValue, ok := val.(string); ok { + return strValue, nil + } + return "", errors.New("failed to convert response to string") + } + return "", errors.New("status is missing") +} + func (o *KubeDBClientBuilder) getDefaultTLSConfig() (*tls.Config, error) { var crt tls.Certificate var clientCA, rootCA *x509.CertPool