Skip to content

Commit

Permalink
Schedule switchovers in maintenance window
Browse files Browse the repository at this point in the history
  • Loading branch information
hughcapet committed Jan 12, 2025
1 parent 9689f25 commit 9e926d8
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 19 deletions.
Empty file.
42 changes: 39 additions & 3 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1733,12 +1733,48 @@ func (c *Cluster) GetStatus() *ClusterStatus {

// Switchover does a switchover (via Patroni) to a candidate pod
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error {

var err error
c.logger.Debugf("switching over from %q to %q", curMaster.Name, candidate)

if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping switchover, not in maintenance window")
c.logger.Infof("postponing switchover, not in maintenance window")

var possibleSwitchover, schedule time.Time

now := time.Now().UTC()
for _, window := range c.Spec.MaintenanceWindows {
if window.Everyday {
possibleSwitchover = time.Date(now.Year(), now.Month(), now.Day(), window.StartTime.Hour(), window.StartTime.Minute(), 0, 0, time.UTC)
if now.After(possibleSwitchover) {
// we are already past the time for today, try tomorrow
day := now.AddDate(0, 0, 1)
possibleSwitchover = time.Date(day.Year(), day.Month(), day.Day(), window.StartTime.Hour(), window.StartTime.Minute(), 0, 0, time.UTC)
}
} else {
timeToday := time.Date(now.Year(), now.Month(), now.Day(), window.StartTime.Hour(), window.StartTime.Minute(), 0, 0, time.UTC)
// is it still possible today?
if now.Weekday() == window.Weekday {
if now.Before(timeToday) {
possibleSwitchover = timeToday
}
} else {
c.logger.Debugf("3. switching over at %s", schedule.Format("2006-01-02T15:04+00"))
// get closest possible time for this window
date := now.AddDate(0, 0, int((7+window.Weekday-now.Weekday())%7))
possibleSwitchover = time.Date(date.Year(), date.Month(), date.Day(), window.StartTime.Hour(), window.StartTime.Minute(), 0, 0, time.UTC)
}
}

if (schedule == time.Time{}) || possibleSwitchover.Before(schedule) {
schedule = possibleSwitchover
}
c.logger.Debugf("switching over at %s", schedule.Format("2006-01-02T15:04+00"))
}

if err := c.patroni.Switchover(curMaster, candidate.Name, schedule.Format("2006-01-02T15:04+00")); err != nil {
return fmt.Errorf("could not schedule switchover: %v", err)
}
c.logger.Infof("switchover is scheduled at %s", schedule.Format("2006-01-02T15:04+00"))
return nil
}

Expand All @@ -1748,7 +1784,7 @@ func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) e
defer c.unregisterPodSubscriber(candidate)
defer close(stopCh)

if err = c.patroni.Switchover(curMaster, candidate.Name); err == nil {
if err = c.patroni.Switchover(curMaster, candidate.Name, ""); err == nil {
c.logger.Debugf("successfully switched over from %q to %q", curMaster.Name, candidate)
c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeNormal, "Switchover", "Successfully switched over from %q to %q", curMaster.Name, candidate)
_, err = c.waitForPodLabel(ch, stopCh, nil)
Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func (c *Cluster) preScaleDown(newStatefulSet *appsv1.StatefulSet) error {
return fmt.Errorf("pod %q does not belong to cluster", podName)
}

if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name); err != nil {
return fmt.Errorf("could not failover: %v", err)
if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name, ""); err != nil {
return fmt.Errorf("could not switchover: %v", err)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func (c *Cluster) syncStatefulSet() error {
// statefulset or those that got their configuration from the outdated statefulset)
if len(podsToRecreate) > 0 {
if !isInMaintenanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping pod recreation, not in maintenance window")
c.logger.Infof("postpone pod recreation - not in maintenance window")
} else if isSafeToRecreatePods {
c.logger.Info("performing rolling update")
c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Update", "Performing rolling update")
Expand Down
26 changes: 13 additions & 13 deletions pkg/util/patroni/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,19 @@ import (
)

const (
failoverPath = "/failover"
configPath = "/config"
clusterPath = "/cluster"
statusPath = "/patroni"
restartPath = "/restart"
ApiPort = 8008
timeout = 30 * time.Second
switchoverPath = "/switchover"
configPath = "/config"
clusterPath = "/cluster"
statusPath = "/patroni"
restartPath = "/restart"
ApiPort = 8008
timeout = 30 * time.Second
)

// Interface describes the Patroni methods
type Interface interface {
GetClusterMembers(master *v1.Pod) ([]ClusterMember, error)
Switchover(master *v1.Pod, candidate string) error
Switchover(master *v1.Pod, candidate string, scheduled_at string) error
SetPostgresParameters(server *v1.Pod, options map[string]string) error
SetStandbyClusterParameters(server *v1.Pod, options map[string]interface{}) error
GetMemberData(server *v1.Pod) (MemberData, error)
Expand Down Expand Up @@ -103,7 +103,7 @@ func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer)
}
}()

if resp.StatusCode != http.StatusOK {
if resp.StatusCode < http.StatusOK || resp.StatusCode >= 300 {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("could not read response: %v", err)
Expand All @@ -128,25 +128,25 @@ func (p *Patroni) httpGet(url string) (string, error) {
return "", fmt.Errorf("could not read response: %v", err)
}

if response.StatusCode != http.StatusOK {
if response.StatusCode < http.StatusOK || response.StatusCode >= 300 {
return string(bodyBytes), fmt.Errorf("patroni returned '%d'", response.StatusCode)
}

return string(bodyBytes), nil
}

// Switchover requests a switchover by calling the Patroni REST API.
func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
func (p *Patroni) Switchover(master *v1.Pod, candidate string, scheduled_at string) error {
buf := &bytes.Buffer{}
err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate})
err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate, "scheduled_at": scheduled_at})
if err != nil {
return fmt.Errorf("could not encode json: %v", err)
}
apiURLString, err := apiURL(master)
if err != nil {
return err
}
return p.httpPostOrPatch(http.MethodPost, apiURLString+failoverPath, buf)
return p.httpPostOrPatch(http.MethodPost, apiURLString+switchoverPath, buf)
}

//TODO: add an option to call /patroni to check if it is necessary to restart the server
Expand Down

0 comments on commit 9e926d8

Please sign in to comment.