Skip to content

Commit

Permalink
Add virtual host setup
Browse files Browse the repository at this point in the history
Signed-off-by: raihankhan <[email protected]>
  • Loading branch information
raihankhan committed Aug 28, 2024
1 parent bb62d05 commit ef95b88
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions rabbitmq/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package rabbitmq
import (
"context"
"errors"

Check failure on line 21 in rabbitmq/kubedb_client_builder.go

View workflow job for this annotation

GitHub Actions / Build

File is not `goimports`-ed (goimports)
amqp "github.com/rabbitmq/amqp091-go"

"fmt"
"strings"

rmqhttp "github.com/michaelklishin/rabbit-hole/v2"
amqp "github.com/rabbitmq/amqp091-go"
core "k8s.io/api/core/v1"
kerr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -131,14 +131,34 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) {
}

rmqClient := &Client{}
defaultVhost := "/"

if !o.disableAMQPClient {
if o.amqpURL == "" {
o.amqpURL = o.GetAMQPconnURL(username, password)
if o.enableHTTPClient {
if o.httpURL == "" {
o.httpURL = o.GetHTTPconnURL()
}
httpClient, err := rmqhttp.NewClient(o.httpURL, username, password)
if err != nil {
klog.Error(err, "Failed to get http client for rabbitmq")
return nil, err
}

if o.vhost == "" {
o.vhost = o.GetVirtualHostFromURL(o.amqpURL)
vhosts, err := httpClient.ListVhosts()

Check failure on line 146 in rabbitmq/kubedb_client_builder.go

View workflow job for this annotation

GitHub Actions / Build

ineffectual assignment to err (ineffassign)
for _, vhost := range vhosts {
if vhost.Description == "Default virtual host" {
defaultVhost = vhost.Name
break
}
}
rmqClient.HTTPClient = HTTPClient{httpClient}
}

if !o.disableAMQPClient {
if o.amqpURL == "" {
if o.vhost == "" {
o.vhost = defaultVhost
}
o.amqpURL = o.GetAMQPconnURL(username, password, o.vhost)
}

rabbitConnection, err := amqp.DialConfig(o.amqpURL, amqp.Config{
Expand All @@ -153,23 +173,11 @@ func (o *KubeDBClientBuilder) GetRabbitMQClient() (*Client, error) {
rmqClient.AMQPClient = AMQPClient{rabbitConnection}
}

if o.enableHTTPClient {
if o.httpURL == "" {
o.httpURL = o.GetHTTPconnURL()
}
httpClient, err := rmqhttp.NewClient(o.httpURL, username, password)
if err != nil {
klog.Error(err, "Failed to get http client for rabbitmq")
return nil, err
}
rmqClient.HTTPClient = HTTPClient{httpClient}
}

return rmqClient, nil
}

func (o *KubeDBClientBuilder) GetAMQPconnURL(username string, password string) string {
return fmt.Sprintf("amqp://%s:%s@%s.%s.svc.cluster.local:%d/", username, password, o.db.ServiceName(), o.db.Namespace, kubedb.RabbitMQAMQPPort)
func (o *KubeDBClientBuilder) GetAMQPconnURL(username string, password string, vhost string) string {
return fmt.Sprintf("amqp://%s:%s@%s.%s.svc.cluster.local:%d%s", username, password, o.db.ServiceName(), o.db.Namespace, kubedb.RabbitMQAMQPPort, vhost)
}

func (o *KubeDBClientBuilder) GetHTTPconnURL() string {
Expand Down

0 comments on commit ef95b88

Please sign in to comment.