Check patroni config and only update if necessary #423
base: main
Changes from all commits
(first changed file: the postgres controller)
@@ -13,12 +13,14 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"math/big"
 	"net"
 	"net/http"
 	"net/url"
 	"strconv"
 	"strings"
+	"time"

 	"github.com/go-logr/logr"
 	zalando "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"

@@ -64,13 +66,13 @@ var requeue = ctrl.Result{

 // PostgresReconciler reconciles a Postgres object
 type PostgresReconciler struct {
-	CtrlClient                        client.Client
-	SvcClient                         client.Client
-	Log                               logr.Logger
-	Scheme                            *runtime.Scheme
-	PartitionID, Tenant, StorageClass string
-	OperatorManager                   *operatormanager.OperatorManager
-	LBManager                         *lbmanager.LBManager
+	CtrlClient                        client.Client
+	SvcClient                         client.Client
+	Log                               logr.Logger
+	Scheme                            *runtime.Scheme
+	PartitionID, Tenant, StorageClass string
+	*operatormanager.OperatorManager
+	*lbmanager.LBManager
 	recorder                    record.EventRecorder
 	PgParamBlockList            map[string]bool
 	StandbyClustersSourceRanges []string

@@ -81,11 +83,23 @@ type PostgresReconciler struct {
 	PatroniTTL                          uint32
 	PatroniLoopWait                     uint32
 	PatroniRetryTimeout                 uint32
+	ReplicationChangeRequeueDuration    time.Duration
 	EnableRandomStorageEncryptionSecret bool
 	EnableWalGEncryption                bool
 	PostgresletFullname                 string
 }

+type PatroniStandbyCluster struct {
+	CreateReplicaMethods []string `json:"create_replica_methods"`
+	Host                 string   `json:"host"`
+	Port                 int      `json:"port"`
+	ApplicationName      string   `json:"application_name"`
+}
+type PatroniConfig struct {
+	StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
+	SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
+}
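
// Illustrative sketch (values made up, not part of this PR): the JSON shape
// these structs map to, i.e. what is exchanged with Patroni's /config endpoint:
//
//	cfg := PatroniConfig{
//		StandbyCluster: &PatroniStandbyCluster{
//			CreateReplicaMethods: []string{"basebackup_fast_xlog"},
//			Host:                 "10.0.0.42",
//			Port:                 5432,
//			ApplicationName:      "my-standby",
//		},
//	}
//	// json.Marshal(cfg) yields:
//	// {"standby_cluster":{"create_replica_methods":["basebackup_fast_xlog"],
//	//  "host":"10.0.0.42","port":5432,"application_name":"my-standby"},
//	//  "synchronous_nodes_additional":null}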

 // Reconcile is the entry point for postgres reconciliation.
 // +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres,verbs=get;list;watch;create;update;patch;delete
 // +kubebuilder:rbac:groups=database.fits.cloud,resources=postgres/status,verbs=get;update;patch

@@ -226,17 +240,13 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return ctrl.Result{}, fmt.Errorf("error while creating standby secrets: %w", err)
 	}

-	if instance.IsReplicationPrimary() {
-		// the field is not empty, which means we were either a regular, standalone database that was promoted to leader
-		// (meaning we are already running) or we are a standby which was promoted to leader (also meaning we are
-		// already running)
-		// That means we should be able to call the patroni api already. this is required, as updating the custom
-		// resource of a standby db seems to fail (maybe because of the users/databases?)...
-		// anyway, let's get on with it
-		if err := r.updatePatroniConfig(ctx, instance); err != nil {
-			// TODO what to do here? reschedule or ignore?
-			log.Error(err, "failed to update patroni config via REST call")
-		}
-	}
+	// check (and update if necessary) the current patroni replication config.
+	immediateRequeue, patroniConfigChangeErr := r.checkAndUpdatePatroniReplicationConfig(ctx, instance)
+	if immediateRequeue && patroniConfigChangeErr == nil {
+		// if a (successful) config change was performed that requires a while to settle in, we simply requeue.
+		// on the next reconciliation loop, the config should be correct already so we can continue with the rest.
+		log.Info("Requeueing after patroni replication config change")
+		return ctrl.Result{Requeue: true, RequeueAfter: r.ReplicationChangeRequeueDuration}, nil
+	}

 	// create standby egress rule first, so the standby can actually connect to the primary

@@ -297,9 +307,11 @@ func (r *PostgresReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		return ctrl.Result{}, fmt.Errorf("unable to create or update ingress ClusterwideNetworkPolicy: %w", err)
 	}

-	// this is the call for standbys
-	if err := r.updatePatroniConfig(ctx, instance); err != nil {
-		return requeue, fmt.Errorf("unable to update patroni config: %w", err)
+	// when an error occurred while updating the patroni config, requeue here
+	// this is done down here to make sure the rest of the resource updates were performed
+	if patroniConfigChangeErr != nil {
+		log.Info("Requeueing after modifying patroni replication config failed")
+		return ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}, patroniConfigChangeErr
 	}

 	r.recorder.Event(instance, "Normal", "Reconciled", "postgres up to date")

@@ -781,47 +793,107 @@ func (r *PostgresReconciler) ensureStandbySecrets(ctx context.Context, instance

 }

-func (r *PostgresReconciler) updatePatroniConfig(ctx context.Context, instance *pg.Postgres) error {
-	// Finally, send a POST to the database with the correct config
+func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(ctx context.Context, instance *pg.Postgres) (bool, error) {
+
+	const requeueImmediately = true
+	const continueWithReconciliation = false
+
 	// If there is no connected postgres, no need to tinker with patroni directly
 	if instance.Spec.PostgresConnection == nil {
-		return nil
+		return continueWithReconciliation, nil
 	}

-	r.Log.Info("Sending REST call to Patroni API")
-	pods := &corev1.PodList{}
+	r.Log.Info("Checking replication config from Patroni API")

-	roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
+	// Get the leader pod
+	leaderPods, err := r.findLeaderPods(ctx, instance)
 	if err != nil {
-		r.Log.Info("could not create requirements for label selector to query pods, requeuing")
-		return err
+		r.Log.Info("could not query pods, requeuing")
+		return requeueImmediately, err
 	}
-	leaderSelector := labels.NewSelector()
-	leaderSelector = leaderSelector.Add(*roleReq)

-	opts := []client.ListOption{
-		client.InNamespace(instance.ToPeripheralResourceNamespace()),
-		client.MatchingLabelsSelector{Selector: leaderSelector},
+	if len(leaderPods.Items) != 1 {
+		r.Log.Info("expected exactly one leader pod, selecting all spilo pods as a last resort (might be ok if it is still creating)")
+		// To make sure any updates to the Zalando postgresql manifest are written, we do not requeue in this case
+		return continueWithReconciliation, r.updatePatroniReplicationConfigOnAllPods(ctx, instance)
 	}
-	if err := r.SvcClient.List(ctx, pods, opts...); err != nil {
-		r.Log.Info("could not query pods, requeuing")
-		return err
+	leaderIP := leaderPods.Items[0].Status.PodIP
+
+	var resp *PatroniConfig
+	resp, err = r.httpGetPatroniConfig(ctx, leaderIP)
+	if err != nil {
+		return continueWithReconciliation, err
 	}
+	if resp == nil {
+		return continueWithReconciliation, r.httpPatchPatroni(ctx, instance, leaderIP)
+	}
-	if len(pods.Items) == 0 {
-		r.Log.Info("no leader pod found, selecting all spilo pods as a last resort (might be ok if it is still creating)")
-
-		err = r.updatePatroniConfigOnAllPods(ctx, instance)
-		if err != nil {
-			r.Log.Info("updating patroni config failed, got one or more errors")
-			return err
+
+	if instance.IsReplicationPrimary() {
+		if resp.StandbyCluster != nil {
+			r.Log.Info("standby_cluster mismatch, updating and requeueing", "response", resp)
+			// what happens, if patroni does not do what it is asked to do? what if it returns an error here?
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP)
 		}
+		if instance.Spec.PostgresConnection.SynchronousReplication {
+			if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID {
+				r.Log.Info("synchronous_nodes_additional mismatch, updating and requeueing", "response", resp)
+				return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP)
+			}
+		} else {
+			if resp.SynchronousNodesAdditional != nil {
+				r.Log.Info("synchronous_nodes_additional mismatch, updating and requeueing", "response", resp)
+				return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP)
+			}
+		}
+
+	} else {
+		if resp.StandbyCluster == nil {
+			r.Log.Info("standby_cluster mismatch, updating and requeueing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP)
+		}
+		if resp.StandbyCluster.CreateReplicaMethods == nil {
+			r.Log.Info("create_replica_methods mismatch, updating and requeueing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP)
+		}
+		if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP {
+			r.Log.Info("host mismatch, updating and requeueing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP)
+		}
+		if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) {
+			r.Log.Info("port mismatch, updating and requeueing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP)
+		}
+		if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name {
+			r.Log.Info("application_name mismatch, updating and requeueing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP)
+		}
+
+		if resp.SynchronousNodesAdditional != nil {
+			r.Log.Info("synchronous_nodes_additional mismatch, updating and requeueing", "response", resp)
+			return requeueImmediately, r.httpPatchPatroni(ctx, instance, leaderIP)
+		}
-		return nil
 	}
-	podIP := pods.Items[0].Status.PodIP

-	return r.httpPatchPatroni(ctx, instance, podIP)
+	r.Log.Info("replication config from Patroni API up to date")
+	return continueWithReconciliation, nil
 }

-func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, instance *pg.Postgres) error {
+func (r *PostgresReconciler) findLeaderPods(ctx context.Context, instance *pg.Postgres) (*corev1.PodList, error) {
+	leaderPods := &corev1.PodList{}
+	roleReq, err := labels.NewRequirement(pg.SpiloRoleLabelName, selection.In, []string{pg.SpiloRoleLabelValueMaster, pg.SpiloRoleLabelValueStandbyLeader})
+	if err != nil {
+		r.Log.Info("could not create requirements for label selector to query pods, requeuing")
+		return leaderPods, err
+	}
+	leaderSelector := labels.NewSelector().Add(*roleReq)
+	opts := []client.ListOption{
+		client.InNamespace(instance.ToPeripheralResourceNamespace()),
+		client.MatchingLabelsSelector{Selector: leaderSelector},
+	}
+	return leaderPods, r.SvcClient.List(ctx, leaderPods, opts...)
+}
+
+func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(ctx context.Context, instance *pg.Postgres) error {
 	pods := &corev1.PodList{}
 	opts := []client.ListOption{
 		client.InNamespace(instance.ToPeripheralResourceNamespace()),

@@ -835,6 +907,8 @@ func (r *PostgresReconciler) updatePatroniConfigOnAllPods(ctx context.Context, i
 	if len(pods.Items) == 0 {
 		r.Log.Info("no spilo pods found at all, requeueing")
 		return errors.New("no spilo pods found at all")
+	} else if len(pods.Items) < int(instance.Spec.NumberOfInstances) {
+		r.Log.Info("found fewer spilo pods than expected (might be ok if it is still creating)", "expected", instance.Spec.NumberOfInstances, "found", len(pods.Items))
 	}

 	// iterate all spilo pods

@@ -863,21 +937,10 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
 	podPort := "8008"
 	path := "config"

-	type PatroniStandbyCluster struct {
-		CreateReplicaMethods []string `json:"create_replica_methods"`
-		Host                 string   `json:"host"`
-		Port                 int      `json:"port"`
-		ApplicationName      string   `json:"application_name"`
-	}
-	type PatroniConfigRequest struct {
-		StandbyCluster             *PatroniStandbyCluster `json:"standby_cluster"`
-		SynchronousNodesAdditional *string                `json:"synchronous_nodes_additional"`
-	}
-
 	r.Log.Info("Preparing request")
-	var request PatroniConfigRequest
+	var request PatroniConfig
 	if instance.IsReplicationPrimary() {
-		request = PatroniConfigRequest{
+		request = PatroniConfig{
 			StandbyCluster: nil,
 		}
 		if instance.Spec.PostgresConnection.SynchronousReplication {

@@ -889,7 +952,7 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
 		}
 	} else {
 		// TODO check values first
-		request = PatroniConfigRequest{
+		request = PatroniConfig{
 			StandbyCluster: &PatroniStandbyCluster{
 				CreateReplicaMethods: []string{"basebackup_fast_xlog"},
 				Host:                 instance.Spec.PostgresConnection.ConnectionIP,

@@ -926,6 +989,49 @@ func (r *PostgresReconciler) httpPatchPatroni(ctx context.Context, instance *pg.
 	return nil
 }

+func (r *PostgresReconciler) httpGetPatroniConfig(ctx context.Context, podIP string) (*PatroniConfig, error) {
+	if podIP == "" {
+		return nil, errors.New("podIP must not be empty")
+	}
+
+	podPort := "8008"
+	path := "config"
+
+	httpClient := &http.Client{}
+	url := "http://" + podIP + ":" + podPort + "/" + path
+
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
Review comment: Can use http.MethodGet.
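That is, the bare "GET" string could become the stdlib constant:

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)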
+	if err != nil {
+		r.Log.Error(err, "could not create request")
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		r.Log.Error(err, "could not perform request")
+		return nil, err
+	}
+
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		r.Log.Info("could not read body")
Review comment: Should also use Log.Error and wrap the error.
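A sketch of that suggestion (fmt is already imported in this file; the wrapped message is illustrative):

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		r.Log.Error(err, "could not read body")
		return nil, fmt.Errorf("could not read patroni config response: %w", err)
	}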
+		return nil, err
+	}
+	var jsonResp PatroniConfig
+	err = json.Unmarshal(body, &jsonResp)
+	if err != nil {
+		r.Log.Info("could not parse config")
Review comment: Should also use Log.Error and wrap the error.
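The same pattern would apply to the parsing branch:

	if err := json.Unmarshal(body, &jsonResp); err != nil {
		r.Log.Error(err, "could not parse config")
		return nil, fmt.Errorf("could not parse patroni config: %w", err)
	}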
+		return nil, err
+	}
+
+	r.Log.Info("Got config", "response", jsonResp)
+
+	return &jsonResp, err
+}

 func (r *PostgresReconciler) getBackupConfig(ctx context.Context, ns, name string) (*pg.BackupConfig, error) {
 	// fetch secret
 	backupSecret := &corev1.Secret{}
(second changed file: the postgreslet main package)
@@ -12,6 +12,7 @@ import (
 	"net"
 	"os"
 	"strings"
+	"time"

 	"github.com/metal-stack/v"
 	coreosv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"

@@ -36,6 +37,7 @@ import (

 const (
 	// envPrefix = "pg"

 	metricsAddrSvcMgrFlg    = "metrics-addr-svc-mgr"
 	metricsAddrCtrlMgrFlg   = "metrics-addr-ctrl-mgr"
 	enableLeaderElectionFlg = "enable-leader-election"

@@ -68,6 +70,7 @@ const (
 	etcdBackupSecretNameFlg               = "etcd-backup-secret-name" // nolint
 	etcdPSPNameFlg                        = "etcd-psp-name"
 	postgresletFullnameFlg                = "postgreslet-fullname"
+	replicationChangeRequeueTimeFlg       = "replication-change-requeue-time-in-seconds"
 	enableLBSourceRangesFlg               = "enable-lb-source-ranges"
 	enableRandomStorageEncrytionSecretFlg = "enable-random-storage-encryption-secret"
 	enableWalGEncryptionFlg               = "enable-walg-encryption"

@@ -123,8 +126,9 @@ func main() {
 		enableRandomStorageEncrytionSecret bool
 		enableWalGEncryption               bool

-		portRangeStart int
-		portRangeSize  int
+		portRangeStart                        int
+		portRangeSize                         int
+		replicationChangeRequeueTimeInSeconds int

 		patroniTTL      uint32
 		patroniLoopWait uint32

@@ -246,6 +250,10 @@ func main() {
 	viper.SetDefault(postgresletFullnameFlg, partitionID) // fall back to partition id
 	postgresletFullname = viper.GetString(postgresletFullnameFlg)

+	viper.SetDefault(replicationChangeRequeueTimeFlg, 10)
+	replicationChangeRequeueTimeInSeconds = viper.GetInt(replicationChangeRequeueTimeFlg)
+	replicationChangeRequeueDuration := time.Duration(replicationChangeRequeueTimeInSeconds) * time.Second
Review comment on lines +253 to +255: There is also a Duration type for viper arguments, which simplifies handling the flag.
+
 	viper.SetDefault(enableLBSourceRangesFlg, true)
 	enableLBSourceRanges = viper.GetBool(enableLBSourceRangesFlg)

@@ -292,6 +300,7 @@ func main() {
 		enableLBSourceRangesFlg, enableLBSourceRanges,
 		enableRandomStorageEncrytionSecretFlg, enableRandomStorageEncrytionSecret,
 		postgresletFullnameFlg, postgresletFullname,
+		replicationChangeRequeueTimeFlg, replicationChangeRequeueTimeInSeconds,
 		enableWalGEncryptionFlg, enableWalGEncryption,
 	)

@@ -396,6 +405,7 @@ func main() {
 		PatroniTTL:                          patroniTTL,
 		PatroniLoopWait:                     patroniLoopWait,
 		PatroniRetryTimeout:                 patroniRetryTimeout,
+		ReplicationChangeRequeueDuration:    replicationChangeRequeueDuration,
 		EnableRandomStorageEncryptionSecret: enableRandomStorageEncrytionSecret,
 		EnableWalGEncryption:                enableWalGEncryption,
 		PostgresletFullname:                 postgresletFullname,
Review comment: Exponential backoff might be better on Patroni API unavailability.
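One way to get that without hand-rolling a backoff: controller-runtime's default workqueue rate limiter already applies per-item exponential backoff whenever Reconcile returns an error. A sketch, assuming the default rate limiter is in use, is to drop the fixed RequeueAfter and return the error directly:

	// instead of ctrl.Result{Requeue: true, RequeueAfter: 10 * time.Second}:
	if patroniConfigChangeErr != nil {
		log.Info("patroni replication config could not be applied, backing off")
		// returning the error lets the workqueue back off exponentially
		return ctrl.Result{}, patroniConfigChangeErr
	}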