diff --git a/Makefile b/Makefile index 6084a30..94d36ce 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,6 @@ # Image URL to use all building/pushing image targets IMG ?= controller:latest # ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary. -ENVTEST_K8S_VERSION = 1.31.1 # Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set) ifeq (,$(shell go env GOBIN)) @@ -28,8 +27,9 @@ CONTAINER_TOOL ?= docker SHELL = /usr/bin/env bash -o pipefail .SHELLFLAGS = -ec -K8S_VERSION ?= 1.31.2 -CILIUM_VERSION ?= 1.16.3 +K8S_VERSION ?= 1.31.3 +ENVTEST_K8S_VERSION = $(K8S_VERSION) +CILIUM_VERSION ?= 1.16.4 V ?= 0 ifeq ($(V), 1) diff --git a/api/v1/valkey_types.go b/api/v1/valkey_types.go index d857cf2..8b07af1 100644 --- a/api/v1/valkey_types.go +++ b/api/v1/valkey_types.go @@ -35,9 +35,9 @@ type ValkeySpec struct { // Exporter Image to use ExporterImage string `json:"exporterImage,omitempty"` - // Number of nodes + // Number of shards // +kubebuilder:default:=3 - Nodes int32 `json:"nodes,omitempty"` + Shards int32 `json:"nodes,omitempty"` // Number of replicas // +kubebuilder:default:=0 @@ -72,6 +72,58 @@ type ValkeySpec struct { // Resources requirements and limits for the Valkey Server container Resources *corev1.ResourceRequirements `json:"resources,omitempty"` + + // External access configuration + ExternalAccess *ExternalAccess `json:"externalAccess,omitempty"` +} + +// ExternalAccess defines the external access configuration +type ExternalAccess struct { + // Enable external access + // +kubebuilder:default:=false + Enabled bool `json:"enabled,omitempty"` + + // External access type + // LoadBalancer or Proxy, the LoadBalancer type will create a LoadBalancer service for each Valkey Shard (master node) + // The Proxy type will create a single LoadBalancer service and use an envoy proxy to route traffic to the Valkey Shards + // +kubebuilder:default:=Proxy + // +kubebuilder:validation:Enum=LoadBalancer;Proxy + Type string `json:"type,omitempty"` + + // Proxy Settings + Proxy *ProxySettings `json:"proxy,omitempty"` + + // LoadBalancer Settings + LoadBalancer *LoadBalancerSettings `json:"loadBalancer,omitempty"` + + // Support External DNS + // +kubebuilder:default:=false + ExternalDNS bool `json:"externalDNS,omitempty"` +} + +// ProxySettings defines the proxy settings +type ProxySettings struct { + // Image to use for the proxy + // +kubebuilder:default:="envoyproxy/envoy:v1.32.1" + Image string `json:"image,omitempty"` + // Resources requirements and limits for the proxy container + Resources *corev1.ResourceRequirements `json:"resources,omitempty"` + + // Extra Envoy configuration + ExtraConfig string `json:"extraConfig,omitempty"` + + // Annotations for the proxy service + Annotations map[string]string `json:"annotations,omitempty"` + + // Replicas for the proxy + // +kubebuilder:default:=1 + Replicas *int32 `json:"replicas,omitempty"` +} + +// LoadBalancerSettings defines the load balancer settings +type LoadBalancerSettings struct { + // Annotations for the load balancer service + Annotations map[string]string `json:"annotations,omitempty"` } // ValkeyStatus defines the observed state of Valkey diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 29b543a..29143da 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -26,6 +26,85 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExternalAccess) DeepCopyInto(out *ExternalAccess) { + *out = *in + if in.Proxy != nil { + in, out := &in.Proxy, &out.Proxy + *out = new(ProxySettings) + (*in).DeepCopyInto(*out) + } + if in.LoadBalancer != nil { + in, out := &in.LoadBalancer, &out.LoadBalancer + *out = new(LoadBalancerSettings) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExternalAccess. +func (in *ExternalAccess) DeepCopy() *ExternalAccess { + if in == nil { + return nil + } + out := new(ExternalAccess) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LoadBalancerSettings) DeepCopyInto(out *LoadBalancerSettings) { + *out = *in + if in.Annotations != nil { + in, out := &in.Annotations, &out.Annotations + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LoadBalancerSettings. +func (in *LoadBalancerSettings) DeepCopy() *LoadBalancerSettings { + if in == nil { + return nil + } + out := new(LoadBalancerSettings) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ProxySettings) DeepCopyInto(out *ProxySettings) { + *out = *in + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = new(corev1.ResourceRequirements) + (*in).DeepCopyInto(*out) + } + if in.Annotations != nil { + in, out := &in.Annotations, &out.Annotations + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Replicas != nil { + in, out := &in.Replicas, &out.Replicas + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProxySettings. +func (in *ProxySettings) DeepCopy() *ProxySettings { + if in == nil { + return nil + } + out := new(ProxySettings) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Valkey) DeepCopyInto(out *Valkey) { *out = *in @@ -105,6 +184,11 @@ func (in *ValkeySpec) DeepCopyInto(out *ValkeySpec) { *out = new(corev1.ResourceRequirements) (*in).DeepCopyInto(*out) } + if in.ExternalAccess != nil { + in, out := &in.ExternalAccess, &out.ExternalAccess + *out = new(ExternalAccess) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ValkeySpec. diff --git a/config/crd/bases/hyperspike.io_valkeys.yaml b/config/crd/bases/hyperspike.io_valkeys.yaml index e201e87..c32f03d 100644 --- a/config/crd/bases/hyperspike.io_valkeys.yaml +++ b/config/crd/bases/hyperspike.io_valkeys.yaml @@ -79,12 +79,124 @@ spec: exporterImage: description: Exporter Image to use type: string + externalAccess: + description: External access configuration + properties: + enabled: + default: false + description: Enable external access + type: boolean + externalDNS: + default: false + description: Support External DNS + type: boolean + loadBalancer: + description: LoadBalancer Settings + properties: + annotations: + additionalProperties: + type: string + description: Annotations for the load balancer service + type: object + type: object + proxy: + description: Proxy Settings + properties: + annotations: + additionalProperties: + type: string + description: Annotations for the proxy service + type: object + extraConfig: + description: Extra Envoy configuration + type: string + image: + default: envoyproxy/envoy:v1.32.1 + description: Image to use for the proxy + type: string + replicas: + default: 1 + description: Replicas for the proxy + format: int32 + type: integer + resources: + description: Resources requirements and limits for the proxy + container + properties: + claims: + description: |- + Claims lists the names of resources, defined in spec.resourceClaims, + that are used by this container. + + This is an alpha field and requires enabling the + DynamicResourceAllocation feature gate. + + This field is immutable. It can only be set for containers. + items: + description: ResourceClaim references one entry in PodSpec.ResourceClaims. + properties: + name: + description: |- + Name must match the name of one entry in pod.spec.resourceClaims of + the Pod where this field is used. It makes that resource available + inside a container. + type: string + request: + description: |- + Request is the name chosen for a request in the referenced claim. + If empty, everything from the claim is made available, otherwise + only the result of this request. + type: string + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map + limits: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Limits describes the maximum amount of compute resources allowed. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + requests: + additionalProperties: + anyOf: + - type: integer + - type: string + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + description: |- + Requests describes the minimum amount of compute resources required. + If Requests is omitted for a container, it defaults to Limits if that is explicitly specified, + otherwise to an implementation-defined value. Requests cannot exceed Limits. + More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + type: object + type: object + type: object + type: + default: Proxy + description: |- + External access type + LoadBalancer or Proxy, the LoadBalancer type will create a LoadBalancer service for each Valkey Shard (master node) + The Proxy type will create a single LoadBalancer service and use an envoy proxy to route traffic to the Valkey Shards + enum: + - LoadBalancer + - Proxy + type: string + type: object image: description: Image to use type: string nodes: default: 3 - description: Number of nodes + description: Number of shards format: int32 type: integer prometheus: diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 1e909ad..ca6d171 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -5,4 +5,4 @@ kind: Kustomization images: - name: controller newName: localhost:5000/controller - newTag: "1" + newTag: "24" diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml index 89e53cd..be75eb8 100644 --- a/config/manager/manager.yaml +++ b/config/manager/manager.yaml @@ -75,7 +75,7 @@ spec: - command: - /manager args: - - --leader-elect + - --leader-elect=false - --health-probe-bind-address=:8081 image: controller:latest name: manager diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 779d478..f0fa8c7 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -30,6 +30,7 @@ rules: - apiGroups: - apps resources: + - deployments - statefulsets verbs: - create diff --git a/internal/controller/valkey_controller.go b/internal/controller/valkey_controller.go index ee61d5a..f5bb875 100644 --- a/internal/controller/valkey_controller.go +++ b/internal/controller/valkey_controller.go @@ -55,7 +55,9 @@ import ( ) const ( - Metrics = "metrics" + Metrics = "metrics" + LoadBalancer = "LoadBalancer" + ValkeyProxy = "valkey-proxy" ) func init() { @@ -100,7 +102,7 @@ var scripts embed.FS // +kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch -// +kubebuilder:rbac:groups="apps",resources=statefulsets,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups="apps",resources=statefulsets;deployments,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch // +kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors,verbs=get;list;watch;create;update;patch;delete @@ -114,7 +116,7 @@ var scripts embed.FS // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.18.2/pkg/reconcile -func (r *ValkeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +func (r *ValkeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // nolint:gocyclo _ = log.FromContext(ctx) valkey := &hyperv1.Valkey{} @@ -143,6 +145,31 @@ func (r *ValkeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } } + externalAccess := false + externalType := "" + if valkey.Spec.ExternalAccess != nil && valkey.Spec.ExternalAccess.Enabled { + externalAccess = true + } + if externalAccess { + externalType = valkey.Spec.ExternalAccess.Type + } + if externalAccess && externalType == LoadBalancer { + if err := r.upsertExternalAccessLBSvc(ctx, valkey); err != nil { + return ctrl.Result{}, err + } + } + if externalAccess && externalType == "Proxy" { + if err := r.upsertExternalAccessProxySecret(ctx, valkey); err != nil { + return ctrl.Result{}, err + } + if err := r.upsertExternalAccessProxySvc(ctx, valkey); err != nil { + return ctrl.Result{}, err + } + if err := r.upsertExternalAccessProxyDeployment(ctx, valkey); err != nil { + return ctrl.Result{}, err + } + } + if valkey.Spec.TLS { if err := r.upsertCertificate(ctx, valkey); err != nil { return ctrl.Result{}, err @@ -162,9 +189,19 @@ func (r *ValkeyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr if err := r.checkState(ctx, valkey, password); err != nil { return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 3}, nil } - if err := r.balanceNodes(ctx, valkey); err != nil { + if externalType != LoadBalancer { + if err := r.balanceNodes(ctx, valkey); err != nil { + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil + } + } + if !valkey.Status.Ready { return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } + if externalAccess && externalType == LoadBalancer { + if err := r.setClusterAnnounceIp(ctx, valkey); err != nil { + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil + } + } return ctrl.Result{}, nil } @@ -363,6 +400,444 @@ func (r *ValkeyReconciler) upsertConfigMap(ctx context.Context, valkey *hyperv1. } return nil } + +func (r *ValkeyReconciler) GetPassword(ctx context.Context, valkey *hyperv1.Valkey) (string, error) { + logger := log.FromContext(ctx) + + secret := &corev1.Secret{} + if err := r.Get(ctx, types.NamespacedName{Namespace: valkey.Namespace, Name: valkey.Name}, secret); err != nil { + logger.Error(err, "failed to get secret") + return "", err + } + return string(secret.Data["password"]), nil +} + +func (r *ValkeyReconciler) setClusterAnnounceIp(ctx context.Context, valkey *hyperv1.Valkey) error { + logger := log.FromContext(ctx) + + logger.Info("setting cluster announce ip", "valkey", valkey.Name, "namespace", valkey.Namespace) + + ips, err := r.fetchExternalIPs(ctx, valkey) + if err != nil { + return err + } + if len(ips) == 0 { + return errors.NewBadRequest("external ip is empty") + } + password, err := r.GetPassword(ctx, valkey) + if err != nil { + logger.Error(err, "failed to get password") + return err + } + clients := map[string]valkeyClient.Client{} + for podName, ip := range ips { + address := podName + "." + valkey.Name + "-headless." + valkey.Namespace + ":6379" + logger.Info("working on node", "ip", ip, "pod", podName, "address", address) + opt := valkeyClient.ClientOption{ + InitAddress: []string{address}, + Password: password, + ForceSingleClient: true, + } + if valkey.Spec.TLS { + ca, err := r.getCACertificate(ctx, valkey) + if err != nil { + logger.Error(err, "failed to get ca certificate") + return err + } + if ca == "" { + return fmt.Errorf("ca certificate not ready") + } + certpool, err := x509.SystemCertPool() + if err != nil { + logger.Error(err, "failed to get system cert pool") + return err + } + certpool.AppendCertsFromPEM([]byte(ca)) + opt.TLSConfig = &tls.Config{ + MinVersion: tls.VersionTLS12, + RootCAs: certpool, + } + } + clients[podName], err = valkeyClient.NewClient(opt) + if err != nil { + logger.Error(err, "failed to create valkey client", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + defer clients[podName].Close() + logger.Info("setting cluster announce ip", "valkey", valkey.Name, "namespace", valkey.Namespace, "ip", ip, "pod", podName) + r.Recorder.Event(valkey, "Normal", "Setting", + fmt.Sprintf("Setting cluster announce ip %s on pod %s for %s/%s", ip, podName, valkey.Namespace, valkey.Name)) + + out, err := clients[podName].Do(ctx, clients[podName].B().ConfigSet().ParameterValue().ParameterValue("cluster-announce-ip", ip).Build()).ToString() + if err != nil { + logger.Error(err, "failed to set cluster announce ip "+out, "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + cfgs, err := clients[podName].Do(ctx, clients[podName].B().ConfigGet().Parameter("cluster-announce-ip").Build()).ToMap() + if err != nil { + logger.Error(err, "failed to get cluster announce ip") + } + for k, v := range cfgs { + str, _ := v.ToString() + if str != ip { + logger.Error(err, "failed to set cluster announce ip ", k, str) + } + } + time.Sleep(time.Second * 1) + } + /* + for podName, _ := range ips { + for shard, shardIp := range ips { + if shard == podName { + continue + } + time.Sleep(time.Second * 1) + logger.Info("node meeting peer", "valkey", valkey.Name, "namespace", valkey.Namespace, "peer", shardIp, "pod", podName) + r.Recorder.Event(valkey, "Normal", "Setting", + fmt.Sprintf("Node meeting peer %s on pod %s for %s/%s", shardIp, podName, valkey.Namespace, valkey.Name)) + if err := clients[podName].Do(ctx, clients[podName].B().ClusterMeet().Ip(shardIp).Port(6379).Build()).Error(); err != nil { + logger.Error(err, "failed to cluster meet", "valkey", valkey.Name, "namespace", valkey.Namespace, "shard", shard, "ip", shardIp, "pod", podName) + } + } + } + */ + return nil +} + +func (r *ValkeyReconciler) fetchExternalIPs(ctx context.Context, valkey *hyperv1.Valkey) (map[string]string, error) { + logger := log.FromContext(ctx) + + ips := map[string]string{} + svcs := &corev1.ServiceList{} + if err := r.List(ctx, svcs, client.InNamespace(valkey.Namespace)); err != nil { + logger.Error(err, "failed to list services", "valkey", valkey.Name, "namespace", valkey.Namespace) + return nil, err + } + for _, svc := range svcs.Items { + if svc.Labels["app.kubernetes.io/component"] == "valkey-external" && svc.Labels["app.kubernetes.io/instance"] == valkey.Name { + podName := strings.Replace(svc.Name, "-external", "", -1) + if svc.Status.LoadBalancer.Ingress == nil || len(svc.Status.LoadBalancer.Ingress) == 0 { // nolint:gosimple + logger.Info("external ip is empty", "valkey", valkey.Name, "namespace", valkey.Namespace) + return nil, nil + } + ip := svc.Status.LoadBalancer.Ingress[0].IP + if ip == "" { + logger.Info("external ip is empty", "valkey") + return nil, nil + } + logger.Info("external ip", "pod", podName, "ip", ip) + ips[podName] = ip + } + } + return ips, nil +} + +func (r *ValkeyReconciler) upsertExternalAccessLBSvc(ctx context.Context, valkey *hyperv1.Valkey) error { + logger := log.FromContext(ctx) + + logger.Info("upserting external access (NodePort/LoadBalancer)", "valkey", valkey.Name, "namespace", valkey.Namespace) + + for i := 0; i < int(valkey.Spec.Shards); i++ { + selectorLabels := labels(valkey) + selectorLabels["apps.kubernetes.io/pod-index"] = fmt.Sprintf("%d", i) + svcLabels := labels(valkey) + svcLabels["app.kubernetes.io/component"] = "valkey-external" + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-external-%d", valkey.Name, i), + Namespace: valkey.Namespace, + Labels: svcLabels, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceType(valkey.Spec.ExternalAccess.Type), + Ports: []corev1.ServicePort{ + { + Name: "tcp-valkey", + Port: 6379, + TargetPort: intstr.FromString("tcp-valkey"), + Protocol: corev1.ProtocolTCP, + }, + { + Name: "tcp-valkey-bus", + Port: 16379, + TargetPort: intstr.FromString("tcp-valkey-bus"), + Protocol: corev1.ProtocolTCP, + }, + }, + Selector: selectorLabels, + }, + } + if valkey.Spec.ExternalAccess.LoadBalancer != nil && len(valkey.Spec.ExternalAccess.LoadBalancer.Annotations) > 0 { + svc.Annotations = valkey.Spec.ExternalAccess.LoadBalancer.Annotations + } + + if err := controllerutil.SetControllerReference(valkey, svc, r.Scheme); err != nil { + return err + } + if err := r.Create(ctx, svc); err != nil { + if errors.IsAlreadyExists(err) { + if err := r.Update(ctx, svc); err != nil { + logger.Error(err, "failed to update external access svc", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + } else { + logger.Error(err, "failed to create external access svc", "valkey", valkey.Name, "namespace", valkey.Namespace) + return err + } + } else { + r.Recorder.Event(valkey, "Normal", "Created", + fmt.Sprintf("Service %s/%s is created", valkey.Namespace, valkey.Name+"-external")) + } + } + return nil +} + +func (r *ValkeyReconciler) upsertExternalAccessProxySvc(ctx context.Context, valkey *hyperv1.Valkey) error { + logger := log.FromContext(ctx) + + logger.Info("upserting external proxy load balancer service") + + proxyLabels := labels(valkey) + proxyLabels["app.kubernetes.io/component"] = ValkeyProxy + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: valkey.Name + "-proxy", + Namespace: valkey.Namespace, + Labels: proxyLabels, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + Ports: []corev1.ServicePort{ + { + Name: "tcp-valkey", + Port: 6379, + TargetPort: intstr.FromString("tcp-valkey"), + Protocol: corev1.ProtocolTCP, + }, + }, + Selector: proxyLabels, + }, + } + if valkey.Spec.ExternalAccess.Proxy != nil && valkey.Spec.ExternalAccess.Proxy.Annotations != nil && len(valkey.Spec.ExternalAccess.Proxy.Annotations) > 0 { + svc.Annotations = valkey.Spec.ExternalAccess.Proxy.Annotations + } + if err := controllerutil.SetControllerReference(valkey, svc, r.Scheme); err != nil { + return err + } + if err := r.Create(ctx, svc); err != nil { + if errors.IsAlreadyExists(err) { + if err := r.Update(ctx, svc); err != nil { + logger.Error(err, "failed to update external proxy svc") + return err + } + } else { + logger.Error(err, "failed to create external proxy svc") + return err + } + } else { + r.Recorder.Event(valkey, "Normal", "Created", + fmt.Sprintf("Service %s/%s is created", valkey.Namespace, valkey.Name+"-proxy")) + } + return nil +} + +func (r *ValkeyReconciler) upsertExternalAccessProxySecret(ctx context.Context, valkey *hyperv1.Valkey) error { + logger := log.FromContext(ctx) + + logger.Info("upserting external proxy configmap") + + endpoints := []string{} + for i := 0; i < int(valkey.Spec.Shards); i++ { + host := fmt.Sprintf("%s-%d.%s-headless.%s", valkey.Name, i, valkey.Name, valkey.Namespace) + endpoints = append(endpoints, ` - endpoint: + address: + socket_address: + address: `+host+` + port_value: 6379`) + } + + password, err := r.GetPassword(ctx, valkey) + if err != nil { + logger.Error(err, "failed to get password") + return err + } + proxyLabels := labels(valkey) + proxyLabels["app.kubernetes.io/component"] = ValkeyProxy + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: valkey.Name + "-proxy", + Namespace: valkey.Namespace, + Labels: proxyLabels, + }, + Data: map[string][]byte{ + "envoy.yaml": []byte(` +static_resources: + listeners: + - name: redis_listener + address: + socket_address: + address: 0.0.0.0 + port_value: 6379 + filter_chains: + - filters: + - name: envoy.filters.network.redis_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.redis_proxy.v3.RedisProxy + stat_prefix: egress_redis + settings: + op_timeout: 5s + prefix_routes: + catch_all_route: + cluster: redis_cluster + downstream_auth_password: + inline_string: "` + password + `" + clusters: + - name: redis_cluster + # type: STRICT_DNS # static + lb_policy: CLUSTER_PROVIDED + load_assignment: + cluster_name: redis_cluster + endpoints: + - lb_endpoints: +` + strings.Join(endpoints, "\n") + ` + cluster_type: + name: envoy.clusters.redis + typed_config: + "@type": type.googleapis.com/google.protobuf.Struct + value: + cluster_refresh_rate: 1s + cluster_refresh_timeout: 4s + typed_extension_protocol_options: + envoy.filters.network.redis_proxy: + "@type": type.googleapis.com/google.protobuf.Struct + value: + auth_password: + inline_string: "` + password + `" +admin: + address: + socket_address: + address: 0.0.0.0 + port_value: 8001 +`), + }, + } + + if err := controllerutil.SetControllerReference(valkey, secret, r.Scheme); err != nil { + return err + } + if err := r.Create(ctx, secret); err != nil { + if errors.IsAlreadyExists(err) { + if err := r.Update(ctx, secret); err != nil { + logger.Error(err, "failed to update external proxy secret") + return err + } + } else { + logger.Error(err, "failed to create external proxy configmap") + } + } else { + r.Recorder.Event(valkey, "Normal", "Created", + fmt.Sprintf("ConfigMap %s/%s is created", valkey.Namespace, valkey.Name+"-proxy")) + } + return nil +} + +const ( + // DefaultProxyImage is the default image for the proxy + DefaultProxyImage = "envoyproxy/envoy:v1.32.1" +) + +func (r *ValkeyReconciler) upsertExternalAccessProxyDeployment(ctx context.Context, valkey *hyperv1.Valkey) error { + logger := log.FromContext(ctx) + + logger.Info("upserting external proxy deployment") + + proxyLabels := labels(valkey) + proxyLabels["app.kubernetes.io/component"] = ValkeyProxy + + proxyEnvoyConfigMap := valkey.Name + "-proxy" + + replicas := int32(1) + if valkey.Spec.ExternalAccess.Proxy != nil && valkey.Spec.ExternalAccess.Proxy.Replicas != nil { + replicas = *valkey.Spec.ExternalAccess.Proxy.Replicas + } + image := DefaultProxyImage + if valkey.Spec.ExternalAccess.Proxy != nil && valkey.Spec.ExternalAccess.Proxy.Image != "" { + image = valkey.Spec.ExternalAccess.Proxy.Image + } + proxyDeployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: valkey.Name + "-proxy", + Namespace: valkey.Namespace, + Labels: proxyLabels, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: proxyLabels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: proxyLabels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "envoy", + Image: image, + Args: []string{ + "-c", "/etc/envoy.yaml", + }, + Ports: []corev1.ContainerPort{ + { + Name: "tcp-valkey", + ContainerPort: 6379, + Protocol: corev1.ProtocolTCP, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "envoy-config", + MountPath: "/etc/envoy.yaml", + SubPath: "envoy.yaml", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "envoy-config", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: proxyEnvoyConfigMap, + }, + }, + }, + }, + }, + }, + }, + } + + if err := controllerutil.SetControllerReference(valkey, proxyDeployment, r.Scheme); err != nil { + return err + } + if err := r.Create(ctx, proxyDeployment); err != nil { + if errors.IsAlreadyExists(err) { + if err := r.Update(ctx, proxyDeployment); err != nil { + logger.Error(err, "failed to update external proxy deployment") + return err + } + } else { + logger.Error(err, "failed to create external proxy deployment") + return err + } + } else { + r.Recorder.Event(valkey, "Normal", "Created", + fmt.Sprintf("Deployment %s/%s is created", valkey.Namespace, valkey.Name+"-proxy")) + } + return nil +} + func (r *ValkeyReconciler) upsertServiceHeadless(ctx context.Context, valkey *hyperv1.Valkey) error { logger := log.FromContext(ctx) @@ -735,7 +1210,7 @@ func (r *ValkeyReconciler) balanceNodes(ctx context.Context, valkey *hyperv1.Val pods := map[string]string{} var tries int for { - if len(pods) != int(valkey.Spec.Nodes) { + if len(pods) != int(valkey.Spec.Shards) { pods, err = r.getPodIps(ctx, valkey) if err != nil { logger.Error(err, "failed to get pod ips", "valkey", valkey.Name, "namespace", valkey.Namespace) @@ -948,7 +1423,7 @@ func (r *ValkeyReconciler) waitForPod(ctx context.Context, name, namespace strin func getNodeNames(valkey *hyperv1.Valkey) string { var nodes []string - for i := 0; i < int(valkey.Spec.Nodes)*(int(valkey.Spec.Replicas)+1); i++ { + for i := 0; i < int(valkey.Spec.Shards)*(int(valkey.Spec.Replicas)+1); i++ { nodes = append(nodes, valkey.Name+"-"+fmt.Sprint(i)+"."+valkey.Name+"-headless") } return strings.Join(nodes, " ") @@ -1184,7 +1659,7 @@ func getInitContainerResourceRequirements() corev1.ResourceRequirements { } func createCluster(valkey *hyperv1.Valkey) string { create := "no" - if valkey.Spec.Nodes > 1 { + if valkey.Spec.Shards > 1 { create = "yes" } return create @@ -1210,7 +1685,7 @@ func (r *ValkeyReconciler) upsertStatefulSet(ctx context.Context, valkey *hyperv Labels: labels(valkey), }, Spec: appsv1.StatefulSetSpec{ - Replicas: func(i int32) *int32 { return &i }(valkey.Spec.Nodes * (valkey.Spec.Replicas + 1)), + Replicas: func(i int32) *int32 { return &i }(valkey.Spec.Shards * (valkey.Spec.Replicas + 1)), Selector: &metav1.LabelSelector{ MatchLabels: labels(valkey), }, @@ -1522,6 +1997,12 @@ export VALKEY_CLUSTER_ANNOUNCE_HOSTNAME="${POD_NAME}.%s" }, }) } + if valkey.Spec.ExternalAccess != nil && valkey.Spec.ExternalAccess.Enabled { + sts.Spec.Template.Spec.Containers[0].Env = append(sts.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{ + Name: "VALKEY_EXTERNAL_ACCESS", + Value: "yes", + }) + } if valkey.Spec.Prometheus { sts.Spec.Template.Spec.Containers = append(sts.Spec.Template.Spec.Containers, r.exporter(valkey)) } @@ -1542,8 +2023,8 @@ export VALKEY_CLUSTER_ANNOUNCE_HOSTNAME="${POD_NAME}.%s" return err } - if *sts.Spec.Replicas != valkey.Spec.Nodes { - replicas := valkey.Spec.Nodes * (valkey.Spec.Replicas + 1) + if *sts.Spec.Replicas != valkey.Spec.Shards { + replicas := valkey.Spec.Shards * (valkey.Spec.Replicas + 1) sts.Spec.Replicas = &replicas sts.Spec.Template.Spec.Containers[0].Env[1].Value = getNodeNames(valkey) if err := r.Update(ctx, sts); err != nil {