From a8fb565922a69b031c554f2b89c9ec19456e3030 Mon Sep 17 00:00:00 2001 From: prafull01 Date: Fri, 25 Oct 2024 16:49:04 +0530 Subject: [PATCH] Add httpAddr, sqlAddr, listenAddr in CrdbCluster API and migrated ports to addresses --- apis/v1alpha1/cluster_types.go | 16 +++++++ apis/v1alpha1/webhook.go | 36 ++++++++++------ apis/v1alpha1/webhook_test.go | 6 +-- apis/v1alpha1/zz_generated.deepcopy.go | 15 +++++++ .../crdb.cockroachlabs.com_crdbclusters.yaml | 22 ++++++++-- pkg/actor/decommission.go | 5 ++- pkg/actor/initialize.go | 5 +-- pkg/actor/partitioned_update.go | 3 +- pkg/healthchecker/healthchecker.go | 2 +- pkg/resource/cluster.go | 43 ++++++++++++++++++- pkg/resource/discovery_service.go | 8 ++-- pkg/resource/public_service.go | 6 +-- pkg/resource/resource_test.go | 2 +- pkg/resource/statefulset.go | 24 ++++++++--- pkg/scale/drainer.go | 16 +++---- pkg/scale/scale.go | 4 +- pkg/testutil/builder.go | 4 +- pkg/testutil/require.go | 7 +-- 18 files changed, 168 insertions(+), 56 deletions(-) diff --git a/apis/v1alpha1/cluster_types.go b/apis/v1alpha1/cluster_types.go index 55f8bf969..fa35bf4ec 100644 --- a/apis/v1alpha1/cluster_types.go +++ b/apis/v1alpha1/cluster_types.go @@ -43,15 +43,31 @@ type CrdbClusterSpec struct { // (Optional) The database port (`--listen-addr` CLI parameter when starting the service) // Default: 26258 // +optional + // Deprecated: Use ListenAddr instead of GRPCPort GRPCPort *int32 `json:"grpcPort,omitempty"` + // (Optional) The database port (`--listen-addr` CLI parameter when starting the service) + // Default: ":26258" + // +optional + ListenAddr *string `json:"listenAddr,omitempty"` // (Optional) The web UI port (`--http-addr` CLI parameter when starting the service) // Default: 8080 // +optional + // Deprecated: Use HTTPAddr instead of HTTPPort HTTPPort *int32 `json:"httpPort,omitempty"` + // (Optional) The IP address/hostname and port on which to listen for DB Console HTTP requests. + // (`--http-addr` CLI parameter when starting the service) + // Default: ":8080" + // +optional + HTTPAddr *string `json:"httpAddr,omitempty"` // (Optional) The SQL Port number // Default: 26257 // +optional + // Deprecated: Use SQLAddr instead of SQLPort SQLPort *int32 `json:"sqlPort,omitempty"` + // (Optional) The IP address/hostname and port on which to listen for SQL connections from clients. + // Default: ":26257" + // +optional + SQLAddr *string `json:"sqlAddr,omitempty"` // (Optional) TLSEnabled determines if TLS is enabled for your CockroachDB Cluster // +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="TLS Enabled",xDescriptors="urn:alm:descriptor:com.tectonic.ui:booleanSwitch" // +optional diff --git a/apis/v1alpha1/webhook.go b/apis/v1alpha1/webhook.go index 81e00b227..f9e664832 100644 --- a/apis/v1alpha1/webhook.go +++ b/apis/v1alpha1/webhook.go @@ -29,12 +29,12 @@ import ( ) var ( - // DefaultGRPCPort is the default port used for GRPC communication - DefaultGRPCPort int32 = 26258 - // DefaultSQLPort is the default port used for SQL connections - DefaultSQLPort int32 = 26257 - // DefaultHTTPPort is the default port for the Web UI - DefaultHTTPPort int32 = 8080 + // DefaultGRPCAddr is the default grpc address used for GRPC communication + DefaultGRPCAddr string = ":26258" + // DefaultSQLAddr is the default sql address used for SQL connections + DefaultSQLAddr string = ":26257" + // DefaultHTTPAddr is the default http address for the Web UI + DefaultHTTPAddr string = ":8080" // DefaultMaxUnavailable is the default max unavailable nodes during a rollout DefaultMaxUnavailable int32 = 1 ) @@ -59,16 +59,28 @@ func (r *CrdbCluster) SetupWebhookWithManager(mgr ctrl.Manager) error { func (r *CrdbCluster) Default() { webhookLog.Info("default", "name", r.Name) - if r.Spec.GRPCPort == nil { - r.Spec.GRPCPort = &DefaultGRPCPort + if r.Spec.GRPCPort == nil && r.Spec.ListenAddr == nil { + r.Spec.ListenAddr = &DefaultGRPCAddr + } else if r.Spec.GRPCPort != nil && r.Spec.ListenAddr == nil { + listenAddr := fmt.Sprintf(":%d", *r.Spec.GRPCPort) + r.Spec.ListenAddr = &listenAddr + r.Spec.GRPCPort = nil } - if r.Spec.SQLPort == nil { - r.Spec.SQLPort = &DefaultSQLPort + if r.Spec.SQLPort == nil && r.Spec.SQLAddr == nil { + r.Spec.SQLAddr = &DefaultSQLAddr + } else if r.Spec.SQLPort != nil && r.Spec.SQLAddr == nil { + sqlAddr := fmt.Sprintf(":%d", *r.Spec.SQLPort) + r.Spec.SQLAddr = &sqlAddr + r.Spec.SQLPort = nil } - if r.Spec.HTTPPort == nil { - r.Spec.HTTPPort = &DefaultHTTPPort + if r.Spec.HTTPPort == nil && r.Spec.HTTPAddr == nil { + r.Spec.HTTPAddr = &DefaultHTTPAddr + } else if r.Spec.HTTPPort != nil && r.Spec.HTTPAddr == nil { + httpAddr := fmt.Sprintf(":%d", *r.Spec.HTTPPort) + r.Spec.HTTPAddr = &httpAddr + r.Spec.HTTPPort = nil } if r.Spec.MaxUnavailable == nil && r.Spec.MinAvailable == nil { diff --git a/apis/v1alpha1/webhook_test.go b/apis/v1alpha1/webhook_test.go index 49f3ab189..40d7b3ba9 100644 --- a/apis/v1alpha1/webhook_test.go +++ b/apis/v1alpha1/webhook_test.go @@ -36,9 +36,9 @@ func TestCrdbClusterDefault(t *testing.T) { maxUnavailable := int32(1) policy := v1.PullIfNotPresent expected := CrdbClusterSpec{ - GRPCPort: &DefaultGRPCPort, - HTTPPort: &DefaultHTTPPort, - SQLPort: &DefaultSQLPort, + ListenAddr: &DefaultGRPCAddr, + SQLAddr: &DefaultSQLAddr, + HTTPAddr: &DefaultHTTPAddr, MaxUnavailable: &maxUnavailable, Image: &PodImage{PullPolicyName: &policy}, } diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 14840fe6d..2a61b3bbe 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -135,16 +135,31 @@ func (in *CrdbClusterSpec) DeepCopyInto(out *CrdbClusterSpec) { *out = new(int32) **out = **in } + if in.ListenAddr != nil { + in, out := &in.ListenAddr, &out.ListenAddr + *out = new(string) + **out = **in + } if in.HTTPPort != nil { in, out := &in.HTTPPort, &out.HTTPPort *out = new(int32) **out = **in } + if in.HTTPAddr != nil { + in, out := &in.HTTPAddr, &out.HTTPAddr + *out = new(string) + **out = **in + } if in.SQLPort != nil { in, out := &in.SQLPort, &out.SQLPort *out = new(int32) **out = **in } + if in.SQLAddr != nil { + in, out := &in.SQLAddr, &out.SQLAddr + *out = new(string) + **out = **in + } if in.MaxUnavailable != nil { in, out := &in.MaxUnavailable, &out.MaxUnavailable *out = new(int32) diff --git a/config/crd/bases/crdb.cockroachlabs.com_crdbclusters.yaml b/config/crd/bases/crdb.cockroachlabs.com_crdbclusters.yaml index e662ed482..088ff575f 100644 --- a/config/crd/bases/crdb.cockroachlabs.com_crdbclusters.yaml +++ b/config/crd/bases/crdb.cockroachlabs.com_crdbclusters.yaml @@ -1087,12 +1087,19 @@ spec: type: object grpcPort: description: '(Optional) The database port (`--listen-addr` CLI parameter - when starting the service) Default: 26258' + when starting the service) Default: 26258 Deprecated: Use ListenAddr + instead of GRPCPort' format: int32 type: integer + httpAddr: + description: '(Optional) The IP address/hostname and port on which + to listen for DB Console HTTP requests. (`--http-addr` CLI parameter + when starting the service) Default: ":8080"' + type: string httpPort: description: '(Optional) The web UI port (`--http-addr` CLI parameter - when starting the service) Default: 8080' + when starting the service) Default: 8080 Deprecated: Use HTTPAddr + instead of HTTPPort' format: int32 type: integer image: @@ -1214,6 +1221,10 @@ spec: - host type: object type: object + listenAddr: + description: '(Optional) The database port (`--listen-addr` CLI parameter + when starting the service) Default: ":26258"' + type: string logConfigMap: description: '(Optional) LogConfigMap define the config map which contains log configuration used to send the logs through the proper @@ -1389,8 +1400,13 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + sqlAddr: + description: '(Optional) The IP address/hostname and port on which + to listen for SQL connections from clients. Default: ":26257"' + type: string sqlPort: - description: '(Optional) The SQL Port number Default: 26257' + description: '(Optional) The SQL Port number Default: 26257 Deprecated: + Use SQLAddr instead of SQLPort' format: int32 type: integer tlsEnabled: diff --git a/pkg/actor/decommission.go b/pkg/actor/decommission.go index aa871dcc6..d583c5426 100644 --- a/pkg/actor/decommission.go +++ b/pkg/actor/decommission.go @@ -93,6 +93,7 @@ func (d decommission) Act(ctx context.Context, cluster *resource.Cluster, log lo // The connection needs to use the discovery service name because of the // hostnames in the SSL certificates + sqlPort := cluster.GetSQLPort() conn := &database.DBConnection{ Ctx: ctx, Client: d.client, @@ -100,7 +101,7 @@ func (d decommission) Act(ctx context.Context, cluster *resource.Cluster, log lo ServiceName: serviceName, Namespace: cluster.Namespace(), DatabaseName: "system", // TODO we need to use variable instead of string - Port: cluster.Spec().SQLPort, + Port: &sqlPort, RunningInsideK8s: runningInsideK8s, } @@ -140,7 +141,7 @@ func (d decommission) Act(ctx context.Context, cluster *resource.Cluster, log lo Drainer: drainer, PVCPruner: &pvcPruner, } - if err := scaler.EnsureScale(ctx, nodes, *cluster.Spec().GRPCPort, utilfeature.DefaultMutableFeatureGate.Enabled(features.AutoPrunePVC)); err != nil { + if err := scaler.EnsureScale(ctx, nodes, *cluster.Spec().ListenAddr, utilfeature.DefaultMutableFeatureGate.Enabled(features.AutoPrunePVC)); err != nil { /// now check if the decommissionStaleErr and update status log.Error(err, "decommission failed") cluster.SetFalse(api.DecommissionCondition) diff --git a/pkg/actor/initialize.go b/pkg/actor/initialize.go index a3e367409..cfb01db73 100644 --- a/pkg/actor/initialize.go +++ b/pkg/actor/initialize.go @@ -21,7 +21,6 @@ import ( "fmt" "github.com/go-logr/logr" "k8s.io/client-go/util/retry" - "strconv" "strings" api "github.com/cockroachdb/cockroach-operator/apis/v1alpha1" @@ -92,12 +91,12 @@ func (init initialize) Act(ctx context.Context, cluster *resource.Cluster, log l log.V(DEBUGLEVEL).Info("Pod is ready") - port := strconv.FormatInt(int64(*cluster.Spec().GRPCPort), 10) + listenAddr := cluster.GetListenAddr() cmd := []string{ "/cockroach/cockroach.sh", "init", cluster.SecureMode(), - "--host=localhost:" + port, + "--host=" + listenAddr, } log.V(DEBUGLEVEL).Info(fmt.Sprintf("Executing init in pod %s with phase %s", podName, phase)) diff --git a/pkg/actor/partitioned_update.go b/pkg/actor/partitioned_update.go index 4eddb98d3..4d0a32507 100644 --- a/pkg/actor/partitioned_update.go +++ b/pkg/actor/partitioned_update.go @@ -140,6 +140,7 @@ func (up *partitionedUpdate) Act(ctx context.Context, cluster *resource.Cluster, // The connection needs to use the discovery service name because of the // hostnames in the SSL certificates + sqlPort := cluster.GetSQLPort() conn := &database.DBConnection{ Ctx: ctx, Client: up.client, @@ -147,7 +148,7 @@ func (up *partitionedUpdate) Act(ctx context.Context, cluster *resource.Cluster, ServiceName: serviceName, Namespace: cluster.Namespace(), DatabaseName: "system", // TODO we need to use variable instead of string - Port: cluster.Spec().SQLPort, + Port: &sqlPort, RunningInsideK8s: runningInsideK8s, } diff --git a/pkg/healthchecker/healthchecker.go b/pkg/healthchecker/healthchecker.go index e7a0a1c80..bc4198adb 100644 --- a/pkg/healthchecker/healthchecker.go +++ b/pkg/healthchecker/healthchecker.go @@ -119,7 +119,7 @@ func (hc *HealthCheckerImpl) waitUntilUnderReplicatedMetricIsZero(ctx context.Co // ranges_underreplicated{store="1"} 0 func (hc *HealthCheckerImpl) checkUnderReplicatedMetric(ctx context.Context, l logr.Logger, logSuffix, podname, stsname, stsnamespace string, partition int32) error { l.V(int(zapcore.DebugLevel)).Info("checkUnderReplicatedMetric", "label", logSuffix, "podname", podname, "partition", partition) - port := strconv.FormatInt(int64(*hc.cluster.Spec().HTTPPort), 10) + port := strconv.FormatInt(int64(hc.cluster.GetHTTPPort()), 10) url := fmt.Sprintf("https://%s.%s.%s:%s/_status/vars", podname, stsname, stsnamespace, port) runningInsideK8s := inK8s("/var/run/secrets/kubernetes.io/serviceaccount/token") diff --git a/pkg/resource/cluster.go b/pkg/resource/cluster.go index 86401f77d..c350ff123 100644 --- a/pkg/resource/cluster.go +++ b/pkg/resource/cluster.go @@ -19,6 +19,7 @@ package resource import ( "fmt" "os" + "strconv" "strings" "time" @@ -48,7 +49,6 @@ const ( func NewCluster(original *api.CrdbCluster) Cluster { cr := original.DeepCopy() - cr.Default() timeNow := metav1.Now() condition.InitConditionsIfNeeded(&cr.Status, timeNow) @@ -415,3 +415,44 @@ func (cluster Cluster) IsUIIngressEnabled() bool { func (cluster Cluster) IsSQLIngressEnabled() bool { return cluster.Spec().Ingress != nil && cluster.Spec().Ingress.SQL != nil } + +func (cluster Cluster) GetListenAddr() string { + if cluster.Spec().ListenAddr != nil { + return *cluster.Spec().ListenAddr + } + return fmt.Sprintf(":%d", cluster.GetGRPCPort()) +} + +func (cluster Cluster) GetSQLAddr() string { + if cluster.Spec().SQLAddr != nil { + return *cluster.Spec().SQLAddr + } + return fmt.Sprintf(":%d", cluster.GetSQLPort()) +} + +func (cluster Cluster) GetGRPCPort() int32 { + if cluster.Spec().GRPCPort != nil { + return *cluster.Spec().GRPCPort + } + addr := strings.Split(*cluster.Spec().ListenAddr, ":") + i, _ := strconv.ParseInt(addr[1], 10, 32) + return int32(i) +} + +func (cluster Cluster) GetSQLPort() int32 { + if cluster.Spec().SQLPort != nil { + return *cluster.Spec().SQLPort + } + addr := strings.Split(*cluster.Spec().SQLAddr, ":") + i, _ := strconv.ParseInt(addr[1], 10, 32) + return int32(i) +} + +func (cluster Cluster) GetHTTPPort() int32 { + if cluster.Spec().HTTPPort != nil { + return *cluster.Spec().HTTPPort + } + addr := strings.Split(*cluster.Spec().HTTPAddr, ":") + i, _ := strconv.ParseInt(addr[1], 10, 32) + return int32(i) +} diff --git a/pkg/resource/discovery_service.go b/pkg/resource/discovery_service.go index ea2a20e82..429157547 100644 --- a/pkg/resource/discovery_service.go +++ b/pkg/resource/discovery_service.go @@ -67,9 +67,9 @@ func (b DiscoveryServiceBuilder) Build(obj client.Object) error { ClusterIP: "None", PublishNotReadyAddresses: true, Ports: []corev1.ServicePort{ - {Name: "grpc", Port: *b.Cluster.Spec().GRPCPort}, - {Name: "http", Port: *b.Cluster.Spec().HTTPPort}, - {Name: "sql", Port: *b.Cluster.Spec().SQLPort}, + {Name: "grpc", Port: b.Cluster.GetGRPCPort()}, + {Name: "http", Port: b.Cluster.GetHTTPPort()}, + {Name: "sql", Port: b.Cluster.GetSQLPort()}, }, Selector: b.Selector, } @@ -89,6 +89,6 @@ func (b *DiscoveryServiceBuilder) monitoringAnnotations() map[string]string { return map[string]string{ "prometheus.io/scrape": "true", "prometheus.io/path": "_status/vars", - "prometheus.io/port": fmt.Sprint(*(b.Cluster.Spec().HTTPPort)), + "prometheus.io/port": fmt.Sprint(b.Cluster.GetHTTPPort()), } } diff --git a/pkg/resource/public_service.go b/pkg/resource/public_service.go index 36768a278..be9d5bf6a 100644 --- a/pkg/resource/public_service.go +++ b/pkg/resource/public_service.go @@ -54,9 +54,9 @@ func (b PublicServiceBuilder) Build(obj client.Object) error { service.Spec = corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, Ports: []corev1.ServicePort{ - {Name: "grpc", Port: *b.Cluster.Spec().GRPCPort}, - {Name: "http", Port: *b.Cluster.Spec().HTTPPort}, - {Name: "sql", Port: *b.Cluster.Spec().SQLPort}, + {Name: "grpc", Port: b.Cluster.GetGRPCPort()}, + {Name: "http", Port: b.Cluster.GetHTTPPort()}, + {Name: "sql", Port: b.Cluster.GetSQLPort()}, }, } } diff --git a/pkg/resource/resource_test.go b/pkg/resource/resource_test.go index 16a7849be..ba75f5c44 100644 --- a/pkg/resource/resource_test.go +++ b/pkg/resource/resource_test.go @@ -57,7 +57,7 @@ func TestReconcile(t *testing.T) { { name: "updates object when its spec is different", cluster: testutil.NewBuilder("test-cluster").Namespaced("default"). - WithUID("test-cluster-uid").WithHTTPPort(8443).Cluster(), + WithUID("test-cluster-uid").WithHTTPAddr(":8443").Cluster(), existingObjs: []runtime.Object{makeTestService()}, wantUpserted: true, expected: modifyHTTPPort(8443, makeTestService()), diff --git a/pkg/resource/statefulset.go b/pkg/resource/statefulset.go index fb3114c2e..456badac4 100644 --- a/pkg/resource/statefulset.go +++ b/pkg/resource/statefulset.go @@ -292,17 +292,17 @@ func (b StatefulSetBuilder) MakeContainers() []corev1.Container { Ports: []corev1.ContainerPort{ { Name: grpcPortName, - ContainerPort: *b.Spec().GRPCPort, + ContainerPort: b.GetGRPCPort(), Protocol: corev1.ProtocolTCP, }, { Name: httpPortName, - ContainerPort: *b.Spec().HTTPPort, + ContainerPort: b.GetHTTPPort(), Protocol: corev1.ProtocolTCP, }, { Name: sqlPortName, - ContainerPort: *b.Spec().SQLPort, + ContainerPort: b.GetSQLPort(), Protocol: corev1.ProtocolTCP, }, }, @@ -362,11 +362,21 @@ func (b StatefulSetBuilder) dbArgs() []string { fmt.Sprintf("--advertise-host=$(POD_NAME).%s.%s", b.Cluster.DiscoveryServiceName(), b.Cluster.Namespace()), b.Cluster.SecureMode(), - "--http-addr=:" + fmt.Sprint(*b.Spec().HTTPPort), - "--sql-addr=:" + fmt.Sprint(*b.Spec().SQLPort), - "--listen-addr=:" + fmt.Sprint(*b.Spec().GRPCPort), } + // In order to not trigger the rolling update of the statefulset, we need to keep the order + // of the arguments same as the previous version of the operator which is + // --http-port, --sql-addr and --listen-addr. + if b.Spec().HTTPPort != nil { + fmt.Printf("HTTP PORT: %d\n", *b.Spec().HTTPPort) + aa = append(aa, "--http-port="+fmt.Sprint(*b.Spec().HTTPPort)) + } else if b.Spec().HTTPAddr != nil { + fmt.Printf("HTTP ADDR: %s", *b.Spec().HTTPAddr) + aa = append(aa, "--http-addr="+fmt.Sprint(*b.Spec().HTTPAddr)) + } + + aa = append(aa, "--sql-addr="+fmt.Sprint(b.GetSQLAddr()), "--listen-addr="+fmt.Sprint(b.GetListenAddr())) + if b.Cluster.IsLoggingAPIEnabled() { logConfig, _ := b.Cluster.LoggingConfiguration(b.Cluster.Fetcher) aa = append(aa, fmt.Sprintf("--log=%s", logConfig)) @@ -407,7 +417,7 @@ func (b StatefulSetBuilder) joinStr() string { for i := 0; i < int(b.Spec().Nodes) && i < 3; i++ { seeds = append(seeds, fmt.Sprintf("%s-%d.%s.%s:%d", b.Cluster.StatefulSetName(), i, - b.Cluster.DiscoveryServiceName(), b.Cluster.Namespace(), *b.Cluster.Spec().GRPCPort)) + b.Cluster.DiscoveryServiceName(), b.Cluster.Namespace(), b.Cluster.GetGRPCPort())) } return strings.Join(seeds, ",") diff --git a/pkg/scale/drainer.go b/pkg/scale/drainer.go index f64a7be08..66963099f 100644 --- a/pkg/scale/drainer.go +++ b/pkg/scale/drainer.go @@ -44,7 +44,7 @@ var ( // Drainer interface type Drainer interface { - Decommission(ctx context.Context, replica uint, gRPCPort int32) error + Decommission(ctx context.Context, replica uint, listenAddr string) error } // CockroachNodeDrainer does decommissioning of nodes in the CockroachDB cluster @@ -75,7 +75,7 @@ func NewCockroachNodeDrainer(logger logr.Logger, namespace, ssname string, confi } // Decommission commands the node to start training process and watches for it to complete or fail after timeout -func (d *CockroachNodeDrainer) Decommission(ctx context.Context, replica uint, gRPCPort int32) error { +func (d *CockroachNodeDrainer) Decommission(ctx context.Context, replica uint, listenAddr string) error { lastNodeID, err := d.findNodeID(ctx, replica, d.Executor.StatefulSet) if err != nil { return err @@ -83,7 +83,7 @@ func (d *CockroachNodeDrainer) Decommission(ctx context.Context, replica uint, g d.Logger.V(int(zapcore.InfoLevel)).Info("draining node", "NodeID", lastNodeID) - if err := d.executeDrainCmd(ctx, lastNodeID, gRPCPort); err != nil { + if err := d.executeDrainCmd(ctx, lastNodeID, listenAddr); err != nil { return err } @@ -103,7 +103,7 @@ func (d *CockroachNodeDrainer) Decommission(ctx context.Context, replica uint, g // Node has finished draining successfully if replicas == 0 { - return d.markNodeAsDecommissioned(ctx, lastNodeID, gRPCPort) + return d.markNodeAsDecommissioned(ctx, lastNodeID, listenAddr) } // If no replicas have been moved within our timeout, assume that the KV allocator @@ -193,9 +193,9 @@ func (d *CockroachNodeDrainer) makeDrainStatusChecker(id uint) func(ctx context. } } -func (d *CockroachNodeDrainer) executeDrainCmd(ctx context.Context, id uint, gRPCPort int32) error { +func (d *CockroachNodeDrainer) executeDrainCmd(ctx context.Context, id uint, listenAddr string) error { cmd := []string{ - "./cockroach", "node", "decommission", fmt.Sprintf("%d", id), "--wait=none", fmt.Sprintf("--host=:%d", gRPCPort), + "./cockroach", "node", "decommission", fmt.Sprintf("%d", id), "--wait=none", fmt.Sprintf("--host=%s", listenAddr), } if d.Secure { @@ -214,8 +214,8 @@ func (d *CockroachNodeDrainer) executeDrainCmd(ctx context.Context, id uint, gRP // markNodeAsDecommissioned sets a node as `decommissioned`. This is the final step in decommissioning // a node which will transition it from `decommissioning` to `decommissioned`. This should be executed // after it's confirmed that there are 0 replicas on the node. -func (d *CockroachNodeDrainer) markNodeAsDecommissioned(ctx context.Context, id uint, gRPCPort int32) error { - cmd := []string{"./cockroach", "node", "decommission", fmt.Sprintf("%d", id), fmt.Sprintf("--host=:%d", gRPCPort)} +func (d *CockroachNodeDrainer) markNodeAsDecommissioned(ctx context.Context, id uint, listenAddr string) error { + cmd := []string{"./cockroach", "node", "decommission", fmt.Sprintf("%d", id), fmt.Sprintf("--host=%s", listenAddr)} if d.Secure { cmd = append(cmd, "--certs-dir=cockroach-certs") diff --git a/pkg/scale/scale.go b/pkg/scale/scale.go index cdc315167..b4ace418c 100644 --- a/pkg/scale/scale.go +++ b/pkg/scale/scale.go @@ -44,7 +44,7 @@ type Scaler struct { // In some cases, it may not be possible to full drain a node. In such cases a // ErrDecommissioningStalled will be returned and the node will be left in a // decommissioning state. -func (s *Scaler) EnsureScale(ctx context.Context, scale uint, gRPCPort int32, prunePVC bool) error { +func (s *Scaler) EnsureScale(ctx context.Context, scale uint, listenAddr string, prunePVC bool) error { // Before doing any scaling, prune any PVCs that are not currently in use. // This only needs to be done when scaling up but the operation is a noop // if there are no PVCs not currently in use. @@ -88,7 +88,7 @@ func (s *Scaler) EnsureScale(ctx context.Context, scale uint, gRPCPort int32, pr // TODO (chrisseto): If decommissioning fails due to a timeout // recommission that node before failing this job. // Making use of the on finish hook is likely ideal? - if err := s.Drainer.Decommission(ctx, oneOff, gRPCPort); err != nil { + if err := s.Drainer.Decommission(ctx, oneOff, listenAddr); err != nil { return err } diff --git a/pkg/testutil/builder.go b/pkg/testutil/builder.go index 1ca0f8925..5cb286051 100644 --- a/pkg/testutil/builder.go +++ b/pkg/testutil/builder.go @@ -87,8 +87,8 @@ func (b ClusterBuilder) WithPVDataStore(size string) ClusterBuilder { return b } -func (b ClusterBuilder) WithHTTPPort(port int32) ClusterBuilder { - b.cluster.Spec.HTTPPort = &port +func (b ClusterBuilder) WithHTTPAddr(httpAddr string) ClusterBuilder { + b.cluster.Spec.HTTPAddr = &httpAddr return b } diff --git a/pkg/testutil/require.go b/pkg/testutil/require.go index 65c5c9aeb..9fab89d73 100644 --- a/pkg/testutil/require.go +++ b/pkg/testutil/require.go @@ -218,10 +218,11 @@ func statefulSetIsReady(ss *appsv1.StatefulSet) bool { func RequireDownGradeOptionSet(t *testing.T, sb testenv.DiffingSandbox, b ClusterBuilder, version string) { sb.Mgr.GetConfig() podName := fmt.Sprintf("%s-0.%s", b.Cluster().Name(), b.Cluster().Name()) + sqlPort := b.Cluster().GetSQLPort() conn := &database.DBConnection{ Ctx: context.TODO(), Client: sb.Mgr.GetClient(), - Port: b.Cluster().Spec().SQLPort, + Port: &sqlPort, UseSSL: true, RestConfig: sb.Mgr.GetConfig(), @@ -374,11 +375,11 @@ func requireDatabaseToFunction(t *testing.T, sb testenv.DiffingSandbox, b Cluste t.Log("Testing database function") sb.Mgr.GetConfig() podName := fmt.Sprintf("%s-0.%s", b.Cluster().Name(), b.Cluster().Name()) - + sqlPort := b.Cluster().GetSQLPort() conn := &database.DBConnection{ Ctx: context.TODO(), Client: sb.Mgr.GetClient(), - Port: b.Cluster().Spec().SQLPort, + Port: &sqlPort, UseSSL: useSSL, RestConfig: sb.Mgr.GetConfig(),