Skip to content

Commit

Permalink
Add httpAddr, sqlAddr, listenAddr in CrdbCluster API and migrated por…
Browse files Browse the repository at this point in the history
…ts to addresses
  • Loading branch information
prafull01 committed Oct 25, 2024
1 parent a3078de commit a8fb565
Show file tree
Hide file tree
Showing 18 changed files with 168 additions and 56 deletions.
16 changes: 16 additions & 0 deletions apis/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,31 @@ type CrdbClusterSpec struct {
// (Optional) The database port (`--listen-addr` CLI parameter when starting the service)
// Default: 26258
// +optional
// Deprecated: Use ListenAddr instead of GRPCPort
GRPCPort *int32 `json:"grpcPort,omitempty"`
// (Optional) The database port (`--listen-addr` CLI parameter when starting the service)
// Default: ":26258"
// +optional
ListenAddr *string `json:"listenAddr,omitempty"`
// (Optional) The web UI port (`--http-addr` CLI parameter when starting the service)
// Default: 8080
// +optional
// Deprecated: Use HTTPAddr instead of HTTPPort
HTTPPort *int32 `json:"httpPort,omitempty"`
// (Optional) The IP address/hostname and port on which to listen for DB Console HTTP requests.
// (`--http-addr` CLI parameter when starting the service)
// Default: ":8080"
// +optional
HTTPAddr *string `json:"httpAddr,omitempty"`
// (Optional) The SQL Port number
// Default: 26257
// +optional
// Deprecated: Use SQLAddr instead of SQLPort
SQLPort *int32 `json:"sqlPort,omitempty"`
// (Optional) The IP address/hostname and port on which to listen for SQL connections from clients.
// Default: ":26257"
// +optional
SQLAddr *string `json:"sqlAddr,omitempty"`
// (Optional) TLSEnabled determines if TLS is enabled for your CockroachDB Cluster
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="TLS Enabled",xDescriptors="urn:alm:descriptor:com.tectonic.ui:booleanSwitch"
// +optional
Expand Down
36 changes: 24 additions & 12 deletions apis/v1alpha1/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
)

var (
// DefaultGRPCPort is the default port used for GRPC communication
DefaultGRPCPort int32 = 26258
// DefaultSQLPort is the default port used for SQL connections
DefaultSQLPort int32 = 26257
// DefaultHTTPPort is the default port for the Web UI
DefaultHTTPPort int32 = 8080
// DefaultGRPCAddr is the default grpc address used for GRPC communication
DefaultGRPCAddr string = ":26258"
// DefaultSQLAddr is the default sql address used for SQL connections
DefaultSQLAddr string = ":26257"
// DefaultHTTPAddr is the default http address for the Web UI
DefaultHTTPAddr string = ":8080"
// DefaultMaxUnavailable is the default max unavailable nodes during a rollout
DefaultMaxUnavailable int32 = 1
)
Expand All @@ -59,16 +59,28 @@ func (r *CrdbCluster) SetupWebhookWithManager(mgr ctrl.Manager) error {
func (r *CrdbCluster) Default() {
webhookLog.Info("default", "name", r.Name)

if r.Spec.GRPCPort == nil {
r.Spec.GRPCPort = &DefaultGRPCPort
if r.Spec.GRPCPort == nil && r.Spec.ListenAddr == nil {
r.Spec.ListenAddr = &DefaultGRPCAddr
} else if r.Spec.GRPCPort != nil && r.Spec.ListenAddr == nil {
listenAddr := fmt.Sprintf(":%d", *r.Spec.GRPCPort)
r.Spec.ListenAddr = &listenAddr
r.Spec.GRPCPort = nil
}

if r.Spec.SQLPort == nil {
r.Spec.SQLPort = &DefaultSQLPort
if r.Spec.SQLPort == nil && r.Spec.SQLAddr == nil {
r.Spec.SQLAddr = &DefaultSQLAddr
} else if r.Spec.SQLPort != nil && r.Spec.SQLAddr == nil {
sqlAddr := fmt.Sprintf(":%d", *r.Spec.SQLPort)
r.Spec.SQLAddr = &sqlAddr
r.Spec.SQLPort = nil
}

if r.Spec.HTTPPort == nil {
r.Spec.HTTPPort = &DefaultHTTPPort
if r.Spec.HTTPPort == nil && r.Spec.HTTPAddr == nil {
r.Spec.HTTPAddr = &DefaultHTTPAddr
} else if r.Spec.HTTPPort != nil && r.Spec.HTTPAddr == nil {
httpAddr := fmt.Sprintf(":%d", *r.Spec.HTTPPort)
r.Spec.HTTPAddr = &httpAddr
r.Spec.HTTPPort = nil
}

if r.Spec.MaxUnavailable == nil && r.Spec.MinAvailable == nil {
Expand Down
6 changes: 3 additions & 3 deletions apis/v1alpha1/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ func TestCrdbClusterDefault(t *testing.T) {
maxUnavailable := int32(1)
policy := v1.PullIfNotPresent
expected := CrdbClusterSpec{
GRPCPort: &DefaultGRPCPort,
HTTPPort: &DefaultHTTPPort,
SQLPort: &DefaultSQLPort,
ListenAddr: &DefaultGRPCAddr,
SQLAddr: &DefaultSQLAddr,
HTTPAddr: &DefaultHTTPAddr,
MaxUnavailable: &maxUnavailable,
Image: &PodImage{PullPolicyName: &policy},
}
Expand Down
15 changes: 15 additions & 0 deletions apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 19 additions & 3 deletions config/crd/bases/crdb.cockroachlabs.com_crdbclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1087,12 +1087,19 @@ spec:
type: object
grpcPort:
description: '(Optional) The database port (`--listen-addr` CLI parameter
when starting the service) Default: 26258'
when starting the service) Default: 26258 Deprecated: Use ListenAddr
instead of GRPCPort'
format: int32
type: integer
httpAddr:
description: '(Optional) The IP address/hostname and port on which
to listen for DB Console HTTP requests. (`--http-addr` CLI parameter
when starting the service) Default: ":8080"'
type: string
httpPort:
description: '(Optional) The web UI port (`--http-addr` CLI parameter
when starting the service) Default: 8080'
when starting the service) Default: 8080 Deprecated: Use HTTPAddr
instead of HTTPPort'
format: int32
type: integer
image:
Expand Down Expand Up @@ -1214,6 +1221,10 @@ spec:
- host
type: object
type: object
listenAddr:
description: '(Optional) The database port (`--listen-addr` CLI parameter
when starting the service) Default: ":26258"'
type: string
logConfigMap:
description: '(Optional) LogConfigMap define the config map which
contains log configuration used to send the logs through the proper
Expand Down Expand Up @@ -1389,8 +1400,13 @@ spec:
to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
type: object
type: object
sqlAddr:
description: '(Optional) The IP address/hostname and port on which
to listen for SQL connections from clients. Default: ":26257"'
type: string
sqlPort:
description: '(Optional) The SQL Port number Default: 26257'
description: '(Optional) The SQL Port number Default: 26257 Deprecated:
Use SQLAddr instead of SQLPort'
format: int32
type: integer
tlsEnabled:
Expand Down
5 changes: 3 additions & 2 deletions pkg/actor/decommission.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,15 @@ func (d decommission) Act(ctx context.Context, cluster *resource.Cluster, log lo

// The connection needs to use the discovery service name because of the
// hostnames in the SSL certificates
sqlPort := cluster.GetSQLPort()
conn := &database.DBConnection{
Ctx: ctx,
Client: d.client,
RestConfig: d.config,
ServiceName: serviceName,
Namespace: cluster.Namespace(),
DatabaseName: "system", // TODO we need to use variable instead of string
Port: cluster.Spec().SQLPort,
Port: &sqlPort,
RunningInsideK8s: runningInsideK8s,
}

Expand Down Expand Up @@ -140,7 +141,7 @@ func (d decommission) Act(ctx context.Context, cluster *resource.Cluster, log lo
Drainer: drainer,
PVCPruner: &pvcPruner,
}
if err := scaler.EnsureScale(ctx, nodes, *cluster.Spec().GRPCPort, utilfeature.DefaultMutableFeatureGate.Enabled(features.AutoPrunePVC)); err != nil {
if err := scaler.EnsureScale(ctx, nodes, *cluster.Spec().ListenAddr, utilfeature.DefaultMutableFeatureGate.Enabled(features.AutoPrunePVC)); err != nil {
/// now check if the decommissionStaleErr and update status
log.Error(err, "decommission failed")
cluster.SetFalse(api.DecommissionCondition)
Expand Down
5 changes: 2 additions & 3 deletions pkg/actor/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"github.com/go-logr/logr"
"k8s.io/client-go/util/retry"
"strconv"
"strings"

api "github.com/cockroachdb/cockroach-operator/apis/v1alpha1"
Expand Down Expand Up @@ -92,12 +91,12 @@ func (init initialize) Act(ctx context.Context, cluster *resource.Cluster, log l

log.V(DEBUGLEVEL).Info("Pod is ready")

port := strconv.FormatInt(int64(*cluster.Spec().GRPCPort), 10)
listenAddr := cluster.GetListenAddr()
cmd := []string{
"/cockroach/cockroach.sh",
"init",
cluster.SecureMode(),
"--host=localhost:" + port,
"--host=" + listenAddr,
}

log.V(DEBUGLEVEL).Info(fmt.Sprintf("Executing init in pod %s with phase %s", podName, phase))
Expand Down
3 changes: 2 additions & 1 deletion pkg/actor/partitioned_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,15 @@ func (up *partitionedUpdate) Act(ctx context.Context, cluster *resource.Cluster,

// The connection needs to use the discovery service name because of the
// hostnames in the SSL certificates
sqlPort := cluster.GetSQLPort()
conn := &database.DBConnection{
Ctx: ctx,
Client: up.client,
RestConfig: up.config,
ServiceName: serviceName,
Namespace: cluster.Namespace(),
DatabaseName: "system", // TODO we need to use variable instead of string
Port: cluster.Spec().SQLPort,
Port: &sqlPort,
RunningInsideK8s: runningInsideK8s,
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/healthchecker/healthchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (hc *HealthCheckerImpl) waitUntilUnderReplicatedMetricIsZero(ctx context.Co
// ranges_underreplicated{store="1"} 0
func (hc *HealthCheckerImpl) checkUnderReplicatedMetric(ctx context.Context, l logr.Logger, logSuffix, podname, stsname, stsnamespace string, partition int32) error {
l.V(int(zapcore.DebugLevel)).Info("checkUnderReplicatedMetric", "label", logSuffix, "podname", podname, "partition", partition)
port := strconv.FormatInt(int64(*hc.cluster.Spec().HTTPPort), 10)
port := strconv.FormatInt(int64(hc.cluster.GetHTTPPort()), 10)
url := fmt.Sprintf("https://%s.%s.%s:%s/_status/vars", podname, stsname, stsnamespace, port)

runningInsideK8s := inK8s("/var/run/secrets/kubernetes.io/serviceaccount/token")
Expand Down
43 changes: 42 additions & 1 deletion pkg/resource/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package resource
import (
"fmt"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -48,7 +49,6 @@ const (

func NewCluster(original *api.CrdbCluster) Cluster {
cr := original.DeepCopy()
cr.Default()

timeNow := metav1.Now()
condition.InitConditionsIfNeeded(&cr.Status, timeNow)
Expand Down Expand Up @@ -415,3 +415,44 @@ func (cluster Cluster) IsUIIngressEnabled() bool {
func (cluster Cluster) IsSQLIngressEnabled() bool {
return cluster.Spec().Ingress != nil && cluster.Spec().Ingress.SQL != nil
}

func (cluster Cluster) GetListenAddr() string {
if cluster.Spec().ListenAddr != nil {
return *cluster.Spec().ListenAddr
}
return fmt.Sprintf(":%d", cluster.GetGRPCPort())
}

func (cluster Cluster) GetSQLAddr() string {
if cluster.Spec().SQLAddr != nil {
return *cluster.Spec().SQLAddr
}
return fmt.Sprintf(":%d", cluster.GetSQLPort())
}

func (cluster Cluster) GetGRPCPort() int32 {
if cluster.Spec().GRPCPort != nil {
return *cluster.Spec().GRPCPort
}
addr := strings.Split(*cluster.Spec().ListenAddr, ":")
i, _ := strconv.ParseInt(addr[1], 10, 32)
return int32(i)
}

func (cluster Cluster) GetSQLPort() int32 {
if cluster.Spec().SQLPort != nil {
return *cluster.Spec().SQLPort
}
addr := strings.Split(*cluster.Spec().SQLAddr, ":")
i, _ := strconv.ParseInt(addr[1], 10, 32)
return int32(i)
}

func (cluster Cluster) GetHTTPPort() int32 {
if cluster.Spec().HTTPPort != nil {
return *cluster.Spec().HTTPPort
}
addr := strings.Split(*cluster.Spec().HTTPAddr, ":")
i, _ := strconv.ParseInt(addr[1], 10, 32)
return int32(i)
}
8 changes: 4 additions & 4 deletions pkg/resource/discovery_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func (b DiscoveryServiceBuilder) Build(obj client.Object) error {
ClusterIP: "None",
PublishNotReadyAddresses: true,
Ports: []corev1.ServicePort{
{Name: "grpc", Port: *b.Cluster.Spec().GRPCPort},
{Name: "http", Port: *b.Cluster.Spec().HTTPPort},
{Name: "sql", Port: *b.Cluster.Spec().SQLPort},
{Name: "grpc", Port: b.Cluster.GetGRPCPort()},
{Name: "http", Port: b.Cluster.GetHTTPPort()},
{Name: "sql", Port: b.Cluster.GetSQLPort()},
},
Selector: b.Selector,
}
Expand All @@ -89,6 +89,6 @@ func (b *DiscoveryServiceBuilder) monitoringAnnotations() map[string]string {
return map[string]string{
"prometheus.io/scrape": "true",
"prometheus.io/path": "_status/vars",
"prometheus.io/port": fmt.Sprint(*(b.Cluster.Spec().HTTPPort)),
"prometheus.io/port": fmt.Sprint(b.Cluster.GetHTTPPort()),
}
}
6 changes: 3 additions & 3 deletions pkg/resource/public_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func (b PublicServiceBuilder) Build(obj client.Object) error {
service.Spec = corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{Name: "grpc", Port: *b.Cluster.Spec().GRPCPort},
{Name: "http", Port: *b.Cluster.Spec().HTTPPort},
{Name: "sql", Port: *b.Cluster.Spec().SQLPort},
{Name: "grpc", Port: b.Cluster.GetGRPCPort()},
{Name: "http", Port: b.Cluster.GetHTTPPort()},
{Name: "sql", Port: b.Cluster.GetSQLPort()},
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/resource/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestReconcile(t *testing.T) {
{
name: "updates object when its spec is different",
cluster: testutil.NewBuilder("test-cluster").Namespaced("default").
WithUID("test-cluster-uid").WithHTTPPort(8443).Cluster(),
WithUID("test-cluster-uid").WithHTTPAddr(":8443").Cluster(),
existingObjs: []runtime.Object{makeTestService()},
wantUpserted: true,
expected: modifyHTTPPort(8443, makeTestService()),
Expand Down
Loading

0 comments on commit a8fb565

Please sign in to comment.