diff --git a/controllers/postgres_controller.go b/controllers/postgres_controller.go
index d9122f28..579914cb 100644
--- a/controllers/postgres_controller.go
+++ b/controllers/postgres_controller.go
@@ -13,12 +13,14 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"math/big"
 	"net"
 	"net/http"
 	"net/url"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/go-logr/logr"
 	zalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
@@ -64,13 +66,13 @@ var requeue = ctrl.Result{
 
 // PostgresReconciler reconciles a Postgres object
 type PostgresReconciler struct {
-	CtrlClient                        client.Client
-	SvcClient                         client.Client
-	Log                               logr.Logger
-	Scheme                            *runtime.Scheme
-	PartitionID, Tenant, StorageClass string
-	OperatorManager                   *operatormanager.OperatorManager
-	LBManager                         *lbmanager.LBManager
+	CtrlClient                        client.Client
+	SvcClient                         client.Client
+	Log                               logr.Logger
+	Scheme                            *runtime.Scheme
+	PartitionID, Tenant, StorageClass string
+	*operatormanager.OperatorManager
+	*lbmanager.LBManager
 	recorder                    record.EventRecorder
 	PgParamBlockList            map[string]bool
 	StandbyClustersSourceRanges []string
@@ -81,11 +83,23 @@ type PostgresReconciler struct {
 	PatroniTTL                          uint32
 	PatroniLoopWait                     uint32
 	PatroniRetryTimeout                 uint32
+	ReplicationChangeRequeueDuration    time.Duration
 	EnableRandomStorageEncryptionSecret bool
 	EnableWalGEncryption                bool
 	PostgresletFullname                 string
 }
 
+type PatroniStandbyCluster struct {
+	CreateReplicaMethods []string `json:"create_replica_methods"`
+	Host                 string   `json:"host"`
+	Port                 int      `json:"port"`
+	ApplicationName      string   `json:"application_name"`
+}
+type PatroniConfig struct {
+	StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
+	SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
+}
+
 // Reconcile is the entry point for postgres reconciliation.
 // +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres/status,verbs=get;update;patch
@@ -226,17 +240,13 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return ctrl.Result{}, fmt.Errorf("error while creating standby secrets: %w", err)
 	}
 
-	if instance.IsReplicationPrimary() {
-		// the field is not empty, which means we were either a regular, standalone database that was promoted to leader
-		// (meaning we are already running) or we are a standby which was promoted to leader (also meaning we are
-		// already running)
-		// That means we should be able to call the patroni api already. this is required, as updating the custom
-		// ressource of a standby db seems to fail (maybe because of the users/databases?)...
-		// anyway, let's get on with it
-		if err := r.updatePatroniConfig(ctx, instance); err != nil {
-			// TODO what to do here? reschedule or ignore?
-			log.Error(err, "failed to update patroni config via REST call")
-		}
+	// check (and update if necessary) the current patroni replication config.
+	immediateRequeue, patroniConfigChangeErr := r.checkAndUpdatePatroniReplicationConfig(ctx, instance)
+	if immediateRequeue && patroniConfigChangeErr == nil {
+		// if a (successful) config change was performed that requires a while to settle in, we simply requeue.
+		// on the next reconciliation loop, the config should be correct already so we can continue with the rest.
+ log.Info("Requeueing after patroni replication config change") + return ctrl.Result{Requeue: true, RequeueAfter: r.ReplicationChangeRequeueDuration}, nil } // create standby egress rule first, so the standby can actually connect to the primary @@ -297,9 +307,11 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, fmt.Errorf("unable to create or update ingress ClusterwideNetworkPolicy: %w", err) } - // this is the call for standbys - if err := r.updatePatroniConfig(ctx, instance); err != nil { - return requeue, fmt.Errorf("unable to update patroni config: %w", err) + // when an error occurred while updating the patroni config, requeue here + // this is done down here to make sure the rest of the resource updates were performed + if patroniConfigChangeErr != nil { + log.Info("Requeueing after modifying patroni replication config failed") + return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, patroniConfigChangeErr } r.recorder.Event(instance, "Normal", "Reconciled", "postgres up to date") @@ -781,47 +793,107 @@ func (r *PostgresReconciler) ensureStandbySecrets(ctx context.Context, instance } -func (r *PostgresReconciler) updatePatroniConfig(ctx context.Context, instance *pg.Postgres) error { - // Finally, send a POST to to the database with the correct config +func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(ctx context.Context, instance *pg.Postgres) (bool, error) { + + const requeueImmediately = true + const continueWithReconciliation = false + + // If there is no connected postgres, no need to tinker with patroni directly if instance.Spec.PostgresConnection == nil { - return nil + return continueWithReconciliation, nil } - r.Log.Info("Sending REST call to Patroni API") - pods := &corev1.PodList{} + r.Log.Info("Checking replication config from Patroni API") - roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader}) + // Get the leader pod + leaderPods, err := r.findLeaderPods(ctx, instance) if err != nil { - r.Log.Info("could not create requirements for label selector to query pods, requeuing") - return err + r.Log.Info("could not query pods, requeuing") + return requeueImmediately, err } - leaderSelector := labels.NewSelector() - leaderSelector = leaderSelector.Add(*roleReq) - opts := []client.ListOption{ - client.InNamespace(instance.ToPeripheralResourceNamespace()), - client.MatchingLabelsSelector{Selector: leaderSelector}, + if len(leaderPods.Items) != 1 { + r.Log.Info("expected exactly one leader pod, selecting all spilo pods as a last resort (might be ok if it is still creating)") + // To make sure any updates to the Zalando postgresql manifest are written, we do not requeue in this case + return continueWithReconciliation, r.updatePatroniReplicationConfigOnAllPods(ctx, instance) } - if err := r.SvcClient.List(ctx, pods, opts...); err != nil { - r.Log.Info("could not query pods, requeuing") - return err + leaderIP := leaderPods.Items[0].Status.PodIP + + var resp *PatroniConfig + resp, err = r.httpGetPatroniConfig(ctx, leaderIP) + if err != nil { + return continueWithReconciliation, err + } + if resp == nil { + return continueWithReconciliation, r.httpPatchPatroni(ctx, instance, leaderIP) } - if len(pods.Items) == 0 { - r.Log.Info("no leader pod found, selecting all spilo pods as a last resort (might be ok if it is still creating)") - err = r.updatePatroniConfigOnAllPods(ctx, instance) - if err != nil { - 
r.Log.Info("updating patroni config failed, got one or more errors") - return err + if instance.IsReplicationPrimary() { + if resp.StandbyCluster != nil { + r.Log.Info("standby_cluster mistmatch, updating and requeing", "response", resp) + // what happens, if patroni does not do what it is asked to do? what if it returns an error here? + return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP) + } + if instance.Spec.PostgresConnection.SynchronousReplication { + if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID { + r.Log.Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp) + return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP) + } + } else { + if resp.SynchronousNodesAdditional != nil { + r.Log.Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp) + return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP) + } + } + + } else { + if resp.StandbyCluster == nil { + r.Log.Info("standby_cluster mismatch, updating and requeing", "response", resp) + return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP) + } + if resp.StandbyCluster.CreateReplicaMethods == nil { + r.Log.Info("create_replica_methods mismatch, updating and requeing", "response", resp) + return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP) + } + if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP { + r.Log.Info("host mismatch, updating and requeing", "response", resp) + return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP) + } + if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) { + r.Log.Info("port mismatch, updating and requeing", "response", resp) + return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP) + } + if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name { + r.Log.Info("application_name mismatch, updating and requeing", "response", resp) + return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP) + } + + if resp.SynchronousNodesAdditional != nil { + r.Log.Info("synchronous_nodes_additional mistmatch, updating and requeing", "response", resp) + return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP) } - return nil } - podIP := pods.Items[0].Status.PodIP - return r.httpPatchPatroni(ctx, instance, podIP) + r.Log.Info("replication config from Patroni API up to date") + return continueWithReconciliation, nil } -func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, instance *pg.Postgres) error { +func (r *PostgresReconciler) findLeaderPods(ctx context.Context, instance *pg.Postgres) (*corev1.PodList, error) { + leaderPods := &corev1.PodList{} + roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader}) + if err != nil { + r.Log.Info("could not create requirements for label selector to query pods, requeuing") + return leaderPods, err + } + leaderSelector := labels.NewSelector().Add(*roleReq) + opts := []client.ListOption{ + client.InNamespace(instance.ToPeripheralResourceNamespace()), + client.MatchingLabelsSelector{Selector: leaderSelector}, + } + return leaderPods, r.SvcClient.List(ctx, leaderPods, opts...) 
+}
+
+func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(ctx context.Context, instance *pg.Postgres) error {
 	pods := &corev1.PodList{}
 	opts := []client.ListOption{
 		client.InNamespace(instance.ToPeripheralResourceNamespace()),
@@ -835,6 +907,8 @@ func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, i
 	if len(pods.Items) == 0 {
 		r.Log.Info("no spilo pods found at all, requeueing")
 		return errors.New("no spilo pods found at all")
+	} else if len(pods.Items) < int(instance.Spec.NumberOfInstances) {
+		r.Log.Info("found fewer spilo pods than expected (might be ok if it is still creating)", "expected", instance.Spec.NumberOfInstances, "found", len(pods.Items))
 	}
 
 	// iterate all spilo pods
@@ -863,21 +937,10 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
 	podPort := "8008"
 	path := "config"
 
-	type PatroniStandbyCluster struct {
-		CreateReplicaMethods []string `json:"create_replica_methods"`
-		Host                 string   `json:"host"`
-		Port                 int      `json:"port"`
-		ApplicationName      string   `json:"application_name"`
-	}
-	type PatroniConfigRequest struct {
-		StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
-		SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
-	}
-
 	r.Log.Info("Preparing request")
-	var request PatroniConfigRequest
+	var request PatroniConfig
 	if instance.IsReplicationPrimary() {
-		request = PatroniConfigRequest{
+		request = PatroniConfig{
 			StandbyCluster: nil,
 		}
 		if instance.Spec.PostgresConnection.SynchronousReplication {
@@ -889,7 +952,7 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
 		}
 	} else {
 		// TODO check values first
-		request = PatroniConfigRequest{
+		request = PatroniConfig{
 			StandbyCluster: &PatroniStandbyCluster{
 				CreateReplicaMethods: []string{"basebackup_fast_xlog"},
 				Host:                 instance.Spec.PostgresConnection.ConnectionIP,
@@ -926,6 +989,49 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
 	return nil
 }
 
+func (r *PostgresReconciler) httpGetPatroniConfig(ctx context.Context, podIP string) (*PatroniConfig, error) {
+	if podIP == "" {
+		return nil, errors.New("podIP must not be empty")
+	}
+
+	podPort := "8008"
+	path := "config"
+
+	httpClient := &http.Client{}
+	url := "http://" + podIP + ":" + podPort + "/" + path
+
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+	if err != nil {
+		r.Log.Error(err, "could not create request")
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		r.Log.Error(err, "could not perform request")
+		return nil, err
+	}
+
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		r.Log.Info("could not read body")
+		return nil, err
+	}
+	var jsonResp PatroniConfig
+	err = json.Unmarshal(body, &jsonResp)
+	if err != nil {
+		r.Log.Info("could not parse config")
+		return nil, err
+	}
+
+	r.Log.Info("Got config", "response", jsonResp)
+
+	return &jsonResp, nil
+}
+
 func (r *PostgresReconciler) getBackupConfig(ctx context.Context, ns, name string) (*pg.BackupConfig, error) {
 	// fetch secret
 	backupSecret := &corev1.Secret{}
diff --git a/main.go b/main.go
index 4d7eaeff..44edd9c1 100644
--- a/main.go
+++ b/main.go
@@ -12,6 +12,7 @@ import (
 	"net"
 	"os"
 	"strings"
+	"time"
 
 	"github.com/metal-stack/v"
 	coreosv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
@@ -36,6 +37,7 @@ import (
 
 const (
 	// envPrefix = "pg"
+
 	metricsAddrSvcMgrFlg    = "metrics-addr-svc-mgr"
 	metricsAddrCtrlMgrFlg   = "metrics-addr-ctrl-mgr"
 	enableLeaderElectionFlg = "enable-leader-election"
@@ -68,6 +70,7 @@ const (
 	etcdBackupSecretNameFlg               = "etcd-backup-secret-name" // nolint
 	etcdPSPNameFlg                        = "etcd-psp-name"
 	postgresletFullnameFlg                = "postgreslet-fullname"
+	replicationChangeRequeueTimeFlg       = "replication-change-requeue-time-in-seconds"
 	enableLBSourceRangesFlg               = "enable-lb-source-ranges"
 	enableRandomStorageEncrytionSecretFlg = "enable-random-storage-encryption-secret"
 	enableWalGEncryptionFlg               = "enable-walg-encryption"
@@ -123,8 +126,9 @@ func main() {
 		enableRandomStorageEncrytionSecret bool
 		enableWalGEncryption               bool
 
-		portRangeStart int
-		portRangeSize  int
+		portRangeStart                        int
+		portRangeSize                         int
+		replicationChangeRequeueTimeInSeconds int
 
 		patroniTTL        uint32
 		patroniLoopWait   uint32
@@ -246,6 +250,10 @@ func main() {
 	viper.SetDefault(postgresletFullnameFlg, partitionID) // fall back to partition id
 	postgresletFullname = viper.GetString(postgresletFullnameFlg)
 
+	viper.SetDefault(replicationChangeRequeueTimeFlg, 10)
+	replicationChangeRequeueTimeInSeconds = viper.GetInt(replicationChangeRequeueTimeFlg)
+	replicationChangeRequeueDuration := time.Duration(replicationChangeRequeueTimeInSeconds) * time.Second
+
 	viper.SetDefault(enableLBSourceRangesFlg, true)
 	enableLBSourceRanges = viper.GetBool(enableLBSourceRangesFlg)
@@ -292,6 +300,7 @@ func main() {
 		enableLBSourceRangesFlg, enableLBSourceRanges,
 		enableRandomStorageEncrytionSecretFlg, enableRandomStorageEncrytionSecret,
 		postgresletFullnameFlg, postgresletFullname,
+		replicationChangeRequeueTimeFlg, replicationChangeRequeueTimeInSeconds,
 		enableWalGEncryptionFlg, enableWalGEncryption,
 	)
@@ -396,6 +405,7 @@ func main() {
 		PatroniTTL:                          patroniTTL,
 		PatroniLoopWait:                     patroniLoopWait,
 		PatroniRetryTimeout:                 patroniRetryTimeout,
+		ReplicationChangeRequeueDuration:    replicationChangeRequeueDuration,
 		EnableRandomStorageEncryptionSecret: enableRandomStorageEncrytionSecret,
 		EnableWalGEncryption:                enableWalGEncryption,
 		PostgresletFullname:                 postgresletFullname,
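
Reviewer sketch (illustration only, not part of the diff): checkAndUpdatePatroniReplicationConfig compares the JSON returned by Patroni's GET /config endpoint against the desired state and, on any mismatch, sends the same PatroniConfig shape back via httpPatchPatroni. The self-contained Go program below shows what those payloads look like on the wire; the host, port, and application name are hypothetical values, not taken from the diff.

package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the types introduced in controllers/postgres_controller.go.
type PatroniStandbyCluster struct {
	CreateReplicaMethods []string `json:"create_replica_methods"`
	Host                 string   `json:"host"`
	Port                 int      `json:"port"`
	ApplicationName      string   `json:"application_name"`
}

type PatroniConfig struct {
	StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
	SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
}

func main() {
	// Standby case: point this cluster at its primary (hypothetical values).
	standby := PatroniConfig{
		StandbyCluster: &PatroniStandbyCluster{
			CreateReplicaMethods: []string{"basebackup_fast_xlog"},
			Host:                 "10.0.0.42",
			Port:                 5432,
			ApplicationName:      "my-postgres",
		},
	}
	b, _ := json.Marshal(standby)
	fmt.Println(string(b))
	// {"standby_cluster":{"create_replica_methods":["basebackup_fast_xlog"],"host":"10.0.0.42",
	//  "port":5432,"application_name":"my-postgres"},"synchronous_nodes_additional":null}

	// Primary case: a nil pointer serializes to "standby_cluster":null, which
	// Patroni's PATCH /config treats as a request to remove the setting.
	primary := PatroniConfig{StandbyCluster: nil}
	b, _ = json.Marshal(primary)
	fmt.Println(string(b))
	// {"standby_cluster":null,"synchronous_nodes_additional":null}
}

Because a null value deletes a key under Patroni's PATCH /config semantics, the primary payload clears both standby_cluster and synchronous_nodes_additional in one request unless the latter is explicitly set.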