From a2734f07f689b76a4d63ad71a5ebafac4a7b21d3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?=
Date: Mon, 24 Jul 2023 10:51:40 +0300
Subject: [PATCH] K8SPS-280: Improve full cluster crash recovery

Before these changes, we rebooted the cluster from complete outage from
pod-0 without checking which member had the latest transactions, so our
full cluster recovery implementation was prone to data loss.

Now we use mysql-shell's built-in checks to detect the member to reboot
from. For this, mysql-shell requires every member to be reachable, so it
can connect to each one and check its GTIDs. That means in case of a full
cluster crash we need to start every pod and ensure they're all reachable.

We're bringing back the `/var/lib/mysql/full-cluster-crash` file to
address this requirement. A pod creates this file if it detects a full
cluster crash and then restarts itself. After the restart, it starts the
mysqld process but ensures the server comes up read-only. Once all pods
are up and running (ready), the operator runs
`dba.rebootClusterFromCompleteOutage()` in one of the MySQL pods. Which
pod we run it in is not important, since mysql-shell connects to each pod
and selects the suitable one to reboot from.

*Events*

This commit also introduces the event recorder and two events:

1. FullClusterCrashDetected
2. FullClusterCrashRecovered

Users will be able to see these events on the `PerconaServerMySQL`
object. For example:

```
$ kubectl describe ps cluster1
...
Events:
  Type     Reason                     Age                 From           Message
  ----     ------                     ----                ----           -------
  Warning  FullClusterCrashDetected   19m (x10 over 20m)  ps-controller  Full cluster crash detected
  Normal   FullClusterCrashRecovered  17m                 ps-controller  Cluster recovered from full cluster crash
```

*Probe timeouts*

Kubernetes had some problems with timeouts in exec probes, which were
fixed in recent releases, but we still see problematic behavior. For
example, even though Kubernetes successfully detects a probe timeout, it
doesn't count the timeout as a failure, so the container is not restarted
even if its liveness probe times out a million times. With this commit we
handle timeouts ourselves using contexts.
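For reference, the probe-side pattern is roughly the sketch below. This is
an illustrative, minimal example only, not the code from this patch: the
real logic lives in cmd/healthcheck/main.go and pkg/replicator, and the
DSN, credentials, and port used here are placeholders.

```go
// Minimal sketch of the self-imposed probe timeout pattern (assumed
// placeholder DSN and credentials). The probe binary bounds all database
// calls with a context, so a hung server makes the probe exit non-zero
// within ~10s instead of depending on kubelet's exec timeout accounting.
package main

import (
	"context"
	"database/sql"
	"log"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Every call issued by the probe inherits this deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	db, err := sql.Open("mysql", "monitor:monitorpass@tcp(127.0.0.1:33062)/performance_schema")
	if err != nil {
		log.Fatalf("open connection: %v", err)
	}
	defer db.Close()

	// PingContext (and the *Context query variants) return an error once the
	// deadline fires, so the process exits non-zero and the failure is counted
	// toward the probe's failureThreshold.
	if err := db.PingContext(ctx); err != nil {
		log.Fatalf("liveness check failed: %v", err)
	}
}
```

Because the deadline is enforced inside the probe process itself, a stuck
server turns into a failed probe exit code regardless of how the kubelet
accounts for exec probe timeouts.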
--- api/v1alpha1/perconaservermysql_types.go | 2 +- build/ps-entrypoint.sh | 52 +++---- cmd/bootstrap/async_replication.go | 41 +++--- cmd/bootstrap/group_replication.go | 102 ++++++++----- cmd/bootstrap/main.go | 19 ++- cmd/bootstrap/utils.go | 14 ++ cmd/healthcheck/main.go | 59 +++++--- cmd/manager/main.go | 1 + config/rbac/role.yaml | 7 + deploy/bundle.yaml | 7 + deploy/rbac.yaml | 7 + pkg/controller/ps/controller.go | 66 +++++++-- pkg/controller/ps/crash_recovery.go | 112 ++++++++++++++ pkg/mysql/topology/topology.go | 10 +- pkg/mysqlsh/mysqlshexec.go | 9 +- pkg/replicator/replicator.go | 179 +++++++++++++---------- pkg/replicator/replicatorexec.go | 135 +++++++++-------- 17 files changed, 550 insertions(+), 272 deletions(-) create mode 100644 pkg/controller/ps/crash_recovery.go diff --git a/api/v1alpha1/perconaservermysql_types.go b/api/v1alpha1/perconaservermysql_types.go index 86c3446e4..6d52eed03 100644 --- a/api/v1alpha1/perconaservermysql_types.go +++ b/api/v1alpha1/perconaservermysql_types.go @@ -480,7 +480,7 @@ func (cr *PerconaServerMySQL) CheckNSetDefaults(ctx context.Context, serverVersi cr.Spec.MySQL.LivenessProbe.SuccessThreshold = 1 } if cr.Spec.MySQL.LivenessProbe.TimeoutSeconds == 0 { - cr.Spec.MySQL.LivenessProbe.TimeoutSeconds = 30 + cr.Spec.MySQL.LivenessProbe.TimeoutSeconds = 10 } if cr.Spec.MySQL.ReadinessProbe.InitialDelaySeconds == 0 { diff --git a/build/ps-entrypoint.sh b/build/ps-entrypoint.sh index c97baece7..85c6dad7c 100755 --- a/build/ps-entrypoint.sh +++ b/build/ps-entrypoint.sh @@ -182,6 +182,12 @@ load_group_replication_plugin() { POD_IP=$(hostname -I | awk '{print $1}') sed -i "/\[mysqld\]/a plugin_load_add=group_replication.so" $CFG + sed -i "/\[mysqld\]/a group_replication_exit_state_action=ABORT_SERVER" $CFG +} + +ensure_read_only() { + sed -i "/\[mysqld\]/a read_only=ON" $CFG + sed -i "/\[mysqld\]/a super_read_only=ON" $CFG } MYSQL_VERSION=$(mysqld -V | awk '{print $3}' | awk -F'.' 
'{print $1"."$2}') @@ -399,34 +405,24 @@ if [[ -f /var/lib/mysql/full-cluster-crash ]]; then namespace=$( 0 { + return "", errors.New(result.Error) + } + + return result.Rows[0]["@@GTID_EXECUTED"], nil } func (m *mysqlsh) configureLocalInstance(ctx context.Context) error { @@ -112,18 +140,6 @@ func (m *mysqlsh) createCluster(ctx context.Context) error { return nil } -func (m *mysqlsh) rebootClusterFromCompleteOutage(ctx context.Context, force bool) error { - _, stderr, err := m.run(ctx, fmt.Sprintf("dba.rebootClusterFromCompleteOutage('%s', {'force': %t})", m.clusterName, force)) - if err != nil { - if strings.Contains(stderr.String(), "Could not determine if Cluster is completely OFFLINE") { - return errCouldNotDetermineIfClusterIsOffline - } - return errors.Wrap(err, "reboot cluster from complete outage") - } - - return nil -} - func (m *mysqlsh) addInstance(ctx context.Context, instanceDef string) error { _, _, err := m.run(ctx, fmt.Sprintf("dba.getCluster('%s').addInstance('%s', {'recoveryMethod': 'clone', 'waitRecovery': 3})", m.clusterName, instanceDef)) if err != nil { @@ -190,6 +206,27 @@ func connectToCluster(ctx context.Context, peers sets.Set[string]) (*mysqlsh, er return nil, errors.New("failed to open connection to cluster") } +func handleFullClusterCrash(ctx context.Context) error { + localShell, err := connectToLocal(ctx) + if err != nil { + return errors.Wrap(err, "connect to local") + } + + result, err := localShell.runSQL(ctx, "SELECT @@GTID_EXECUTED") + if err != nil { + return errors.Wrap(err, "get GTID_EXECUTED") + } + + gtidExecuted := strings.ReplaceAll(result, "\n", "") + log.Printf("GTID_EXECUTED: %s", gtidExecuted) + + if err := createFile(fullClusterCrashFile, gtidExecuted); err != nil { + return err + } + + return nil +} + func bootstrapGroupReplication(ctx context.Context) error { timer := stopwatch.NewNamedStopwatch() err := timer.Add("total") @@ -227,11 +264,6 @@ func bootstrapGroupReplication(ctx context.Context) error { } log.Printf("Instance (%s) configured to join to the InnoDB cluster", localShell.host) - podName, err := os.Hostname() - if err != nil { - return errors.Wrap(err, "get pod hostname") - } - peers, err := lookup(os.Getenv("SERVICE_NAME")) if err != nil { return errors.Wrap(err, "lookup") @@ -240,6 +272,7 @@ func bootstrapGroupReplication(ctx context.Context) error { shell, err := connectToCluster(ctx, peers) if err != nil { + log.Printf("Failed to connect to the cluster: %v", err) if peers.Len() == 1 { log.Printf("Creating InnoDB cluster: %s", localShell.clusterName) @@ -247,10 +280,12 @@ func bootstrapGroupReplication(ctx context.Context) error { if err != nil { if errors.Is(err, errRebootClusterFromCompleteOutage) { log.Printf("Cluster already exists, we need to reboot") - err := localShell.rebootClusterFromCompleteOutage(ctx, peers.Len() == 1) - if err != nil { - return err + if err := handleFullClusterCrash(ctx); err != nil { + return errors.Wrap(err, "handle full cluster crash") } + + // force restart container + os.Exit(1) } else { return err } @@ -260,21 +295,14 @@ func bootstrapGroupReplication(ctx context.Context) error { if err != nil { return errors.Wrap(err, "connect to the cluster") } - } else if peers.Len() > 1 && strings.HasSuffix(podName, "-0") { + } else { log.Printf("Can't connect to any of the peers, we need to reboot") - err := localShell.rebootClusterFromCompleteOutage(ctx, false) - if err != nil { - if errors.Is(err, errCouldNotDetermineIfClusterIsOffline) { - err := localShell.rebootClusterFromCompleteOutage(ctx, true) 
- if err != nil { - return err - } - } else { - return err - } + if err := handleFullClusterCrash(ctx); err != nil { + return errors.Wrap(err, "handle full cluster crash") } - } else { - return errors.Wrap(err, "connect to the cluster") + + // force restart container + os.Exit(1) } } diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 26500a520..19b4a9f45 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -9,6 +9,11 @@ import ( "github.com/percona/percona-server-mysql-operator/pkg/mysql" ) +const ( + fullClusterCrashFile = "/var/lib/mysql/full-cluster-crash" + manualRecoveryFile = "/var/lib/mysql/sleep-forever" +) + func main() { f, err := os.OpenFile(filepath.Join(mysql.DataMountPath, "bootstrap.log"), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { @@ -17,6 +22,18 @@ func main() { defer f.Close() log.SetOutput(f) + fullClusterCrash, err := fileExists(fullClusterCrashFile) + if err == nil && fullClusterCrash { + log.Printf("%s exists. exiting...", fullClusterCrashFile) + os.Exit(0) + } + + manualRecovery, err := fileExists(manualRecoveryFile) + if err == nil && manualRecovery { + log.Printf("%s exists. exiting...", manualRecoveryFile) + os.Exit(0) + } + clusterType := os.Getenv("CLUSTER_TYPE") switch clusterType { case "group-replication": @@ -24,7 +41,7 @@ func main() { log.Fatalf("bootstrap failed: %v", err) } case "async": - if err := bootstrapAsyncReplication(); err != nil { + if err := bootstrapAsyncReplication(context.Background()); err != nil { log.Fatalf("bootstrap failed: %v", err) } default: diff --git a/cmd/bootstrap/utils.go b/cmd/bootstrap/utils.go index fcdfa80b1..2306df25e 100644 --- a/cmd/bootstrap/utils.go +++ b/cmd/bootstrap/utils.go @@ -95,3 +95,17 @@ func waitLockRemoval() error { } } } + +func createFile(name, content string) error { + f, err := os.Create(name) + if err != nil { + return errors.Wrapf(err, "create %s", name) + } + + _, err = f.WriteString(content) + if err != nil { + return errors.Wrapf(err, "write to %s", name) + } + + return nil +} diff --git a/cmd/healthcheck/main.go b/cmd/healthcheck/main.go index 17c69cc38..5dd982f2d 100644 --- a/cmd/healthcheck/main.go +++ b/cmd/healthcheck/main.go @@ -1,12 +1,14 @@ package main import ( + "context" "fmt" "log" "net" "os" "path/filepath" "strings" + "time" "github.com/pkg/errors" @@ -25,31 +27,42 @@ func main() { os.Exit(0) } + manualRecovery, err := fileExists("/var/lib/mysql/sleep-forever") + if err != nil { + log.Fatalf("check /var/lib/mysql/sleep-forever: %s", err) + } + if manualRecovery { + os.Exit(0) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + switch os.Args[1] { case "readiness": switch os.Getenv("CLUSTER_TYPE") { case "async": - if err := checkReadinessAsync(); err != nil { + if err := checkReadinessAsync(ctx); err != nil { log.Fatalf("readiness check failed: %v", err) } case "group-replication": - if err := checkReadinessGR(); err != nil { + if err := checkReadinessGR(ctx); err != nil { log.Fatalf("readiness check failed: %v", err) } } case "liveness": switch os.Getenv("CLUSTER_TYPE") { case "async": - if err := checkLivenessAsync(); err != nil { - log.Fatalf("readiness check failed: %v", err) + if err := checkLivenessAsync(ctx); err != nil { + log.Fatalf("liveness check failed: %v", err) } case "group-replication": - if err := checkLivenessGR(); err != nil { - log.Fatalf("readiness check failed: %v", err) + if err := checkLivenessGR(ctx); err != nil { + log.Fatalf("liveness check failed: %v", err) } } case 
"replication": - if err := checkReplication(); err != nil { + if err := checkReplication(ctx); err != nil { log.Fatalf("replication check failed: %v", err) } default: @@ -57,7 +70,7 @@ func main() { } } -func checkReadinessAsync() error { +func checkReadinessAsync(ctx context.Context) error { podIP, err := getPodIP() if err != nil { return errors.Wrap(err, "get pod IP") @@ -68,19 +81,19 @@ func checkReadinessAsync() error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } defer db.Close() - readOnly, err := db.IsReadonly() + readOnly, err := db.IsReadonly(ctx) if err != nil { return errors.Wrap(err, "check read only status") } // if isReplica is true, replication is active - isReplica, err := db.IsReplica() + isReplica, err := db.IsReplica(ctx) if err != nil { return errors.Wrap(err, "check replica status") } @@ -92,7 +105,7 @@ func checkReadinessAsync() error { return nil } -func checkReadinessGR() error { +func checkReadinessGR(ctx context.Context) error { podIP, err := getPodIP() if err != nil { return errors.Wrap(err, "get pod IP") @@ -103,7 +116,7 @@ func checkReadinessGR() error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } @@ -114,7 +127,7 @@ func checkReadinessGR() error { return errors.Wrap(err, "get pod hostname") } - state, err := db.GetMemberState(fqdn) + state, err := db.GetMemberState(ctx, fqdn) if err != nil { return errors.Wrap(err, "get member state") } @@ -126,7 +139,7 @@ func checkReadinessGR() error { return nil } -func checkLivenessAsync() error { +func checkLivenessAsync(ctx context.Context) error { podIP, err := getPodIP() if err != nil { return errors.Wrap(err, "get pod IP") @@ -137,16 +150,16 @@ func checkLivenessAsync() error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } defer db.Close() - return db.DumbQuery() + return db.DumbQuery(ctx) } -func checkLivenessGR() error { +func checkLivenessGR(ctx context.Context) error { podIP, err := getPodIP() if err != nil { return errors.Wrap(err, "get pod IP") @@ -157,13 +170,13 @@ func checkLivenessGR() error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } defer db.Close() - in, err := db.CheckIfInPrimaryPartition() + in, err := db.CheckIfInPrimaryPartition(ctx) if err != nil { return errors.Wrap(err, "check if member in primary partition") } @@ -177,7 +190,7 @@ func checkLivenessGR() error { return nil } -func checkReplication() error 
{ +func checkReplication(ctx context.Context) error { podIP, err := getPodIP() if err != nil { return errors.Wrap(err, "get pod IP") @@ -188,14 +201,14 @@ func checkReplication() error { return errors.Wrapf(err, "get %s password", apiv1alpha1.UserMonitor) } - db, err := replicator.NewReplicator(apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) + db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserMonitor, monitorPass, podIP, mysql.DefaultAdminPort) if err != nil { return errors.Wrap(err, "connect to db") } defer db.Close() // if isReplica is true, replication is active - isReplica, err := db.IsReplica() + isReplica, err := db.IsReplica(ctx) if err != nil { return errors.Wrap(err, "check replica status") } diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 08e62e76a..b9f76b1f0 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -117,6 +117,7 @@ func main() { Client: nsClient, Scheme: mgr.GetScheme(), ServerVersion: serverVersion, + Recorder: mgr.GetEventRecorderFor("ps-controller"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ps-controller") os.Exit(1) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index b429cf985..8c9e21f94 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -21,6 +21,13 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - "" resources: diff --git a/deploy/bundle.yaml b/deploy/bundle.yaml index fdaa117de..95611f24a 100644 --- a/deploy/bundle.yaml +++ b/deploy/bundle.yaml @@ -8488,6 +8488,13 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - "" resources: diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index 6701f519b..f45d300c6 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -67,6 +67,13 @@ rules: - patch - update - watch +- apiGroups: + - "" + resources: + - events + verbs: + - create + - patch - apiGroups: - "" resources: diff --git a/pkg/controller/ps/controller.go b/pkg/controller/ps/controller.go index 354bed1ac..7adec1eeb 100644 --- a/pkg/controller/ps/controller.go +++ b/pkg/controller/ps/controller.go @@ -17,6 +17,7 @@ limitations under the License. 
package ps import ( + "bytes" "context" "crypto/md5" "encoding/json" @@ -37,6 +38,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" k8sretry "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -44,6 +46,7 @@ import ( "github.com/percona/percona-server-mysql-operator/api/v1alpha1" apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" "github.com/percona/percona-server-mysql-operator/pkg/controller/psrestore" "github.com/percona/percona-server-mysql-operator/pkg/haproxy" "github.com/percona/percona-server-mysql-operator/pkg/innodbcluster" @@ -62,10 +65,12 @@ type PerconaServerMySQLReconciler struct { client.Client Scheme *runtime.Scheme ServerVersion *platform.ServerVersion + Recorder record.EventRecorder } //+kubebuilder:rbac:groups=ps.percona.com,resources=perconaservermysqls;perconaservermysqls/status;perconaservermysqls/finalizers,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups="",resources=pods;pods/exec;configmaps;services;secrets,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch //+kubebuilder:rbac:groups=apps,resources=statefulsets;deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=certmanager.k8s.io;cert-manager.io,resources=issuers;certificates,verbs=get;list;watch;create;update;patch;delete;deletecollection @@ -221,7 +226,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr * podFQDN := fmt.Sprintf("%s.%s.%s", pod.Name, mysql.ServiceName(cr), cr.Namespace) - state, err := db.GetMemberState(podFQDN) + state, err := db.GetMemberState(ctx, podFQDN) if err != nil { return errors.Wrapf(err, "get member state of %s from performance_schema", pod.Name) } @@ -335,6 +340,9 @@ func (r *PerconaServerMySQLReconciler) doReconcile( ) error { log := logf.FromContext(ctx).WithName("doReconcile") + if err := r.reconcileFullClusterCrash(ctx, cr); err != nil { + return errors.Wrap(err, "failed to check full cluster crash") + } if err := r.reconcileVersions(ctx, cr); err != nil { log.Error(err, "failed to reconcile versions") } @@ -1182,6 +1190,38 @@ func (r *PerconaServerMySQLReconciler) reconcileCRStatus(ctx context.Context, cr cr.Status.State = cr.Status.MySQL.State } + if cr.Spec.MySQL.IsGR() { + cli, err := clientcmd.NewClient() + if err != nil { + return err + } + + pods, err := k8s.PodsByLabels(ctx, r.Client, mysql.MatchLabels(cr)) + if err != nil { + return errors.Wrap(err, "get pods") + } + + var outb, errb bytes.Buffer + cmd := []string{"/bin/bash", "-c", "cat /var/lib/mysql/full-cluster-crash"} + fullClusterCrash := false + for _, pod := range pods { + err = cli.Exec(ctx, &pod, "mysql", cmd, nil, &outb, &errb, false) + if err != nil { + if strings.Contains(errb.String(), "No such file or directory") { + continue + } + return errors.Wrapf(err, "run %s, stdout: %s, stderr: %s", cmd, outb.String(), errb.String()) + } + + fullClusterCrash = true + } + + if fullClusterCrash { + cr.Status.State = apiv1alpha1.StateError + r.Recorder.Event(cr, "Warning", "FullClusterCrashDetected", "Full cluster crash detected") + } + } + cr.Status.Host, err = appHost(ctx, r.Client, cr) if err != nil { return errors.Wrap(err, "get app host") @@ -1352,7 +1392,7 @@ func (r *PerconaServerMySQLReconciler) 
getPrimaryFromGR(ctx context.Context, cr return "", errors.Wrapf(err, "open connection to %s", fqdn) } - return db.GetGroupReplicationPrimary() + return db.GetGroupReplicationPrimary(ctx) } func (r *PerconaServerMySQLReconciler) getPrimaryHost(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (string, error) { @@ -1407,14 +1447,14 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context, return errors.Wrapf(err, "stop replica %s", hostname) } - status, _, err := repDb.ReplicationStatus() + status, _, err := repDb.ReplicationStatus(ctx) if err != nil { return errors.Wrapf(err, "get replication status of %s", hostname) } for status == replicator.ReplicationStatusActive { time.Sleep(250 * time.Millisecond) - status, _, err = repDb.ReplicationStatus() + status, _, err = repDb.ReplicationStatus(ctx) if err != nil { return errors.Wrapf(err, "get replication status of %s", hostname) } @@ -1462,7 +1502,7 @@ func (r *PerconaServerMySQLReconciler) startAsyncReplication(ctx context.Context defer db.Close() log.V(1).Info("Change replication source", "primary", primary.Key.Hostname, "replica", hostname) - if err := db.ChangeReplicationSource(primary.Key.Hostname, replicaPass, primary.Key.Port); err != nil { + if err := db.ChangeReplicationSource(ctx, primary.Key.Hostname, replicaPass, primary.Key.Port); err != nil { return errors.Wrapf(err, "change replication source on %s", hostname) } @@ -1498,7 +1538,7 @@ func (r *PerconaServerMySQLReconciler) restartGroupReplication(ctx context.Conte } defer db.Close() - replicas, err := db.GetGroupReplicationReplicas() + replicas, err := db.GetGroupReplicationReplicas(ctx) if err != nil { return errors.Wrap(err, "get replicas") } @@ -1519,23 +1559,23 @@ func (r *PerconaServerMySQLReconciler) restartGroupReplication(ctx context.Conte } defer db.Close() - if err := db.StopGroupReplication(); err != nil { + if err := db.StopGroupReplication(ctx); err != nil { return errors.Wrapf(err, "stop group replication on %s", host) } log.V(1).Info("Stopped group replication", "hostname", host) - if err := db.ChangeGroupReplicationPassword(replicaPass); err != nil { + if err := db.ChangeGroupReplicationPassword(ctx, replicaPass); err != nil { return errors.Wrapf(err, "change group replication password on %s", host) } log.V(1).Info("Changed group replication password", "hostname", host) - if err := db.StartGroupReplication(replicaPass); err != nil { + if err := db.StartGroupReplication(ctx, replicaPass); err != nil { return errors.Wrapf(err, "start group replication on %s", host) } log.V(1).Info("Started group replication", "hostname", host) } - primary, err := db.GetGroupReplicationPrimary() + primary, err := db.GetGroupReplicationPrimary(ctx) if err != nil { return errors.Wrap(err, "get primary member") } @@ -1555,17 +1595,17 @@ func (r *PerconaServerMySQLReconciler) restartGroupReplication(ctx context.Conte } defer db.Close() - if err := db.StopGroupReplication(); err != nil { + if err := db.StopGroupReplication(ctx); err != nil { return errors.Wrapf(err, "stop group replication on %s", primary) } log.V(1).Info("Stopped group replication", "hostname", primary) - if err := db.ChangeGroupReplicationPassword(replicaPass); err != nil { + if err := db.ChangeGroupReplicationPassword(ctx, replicaPass); err != nil { return errors.Wrapf(err, "change group replication password on %s", primary) } log.V(1).Info("Changed group replication password", "hostname", primary) - if err := db.StartGroupReplication(replicaPass); err != nil { + if err := 
db.StartGroupReplication(ctx, replicaPass); err != nil { return errors.Wrapf(err, "start group replication on %s", primary) } log.V(1).Info("Started group replication", "hostname", primary) diff --git a/pkg/controller/ps/crash_recovery.go b/pkg/controller/ps/crash_recovery.go new file mode 100644 index 000000000..9d73da08a --- /dev/null +++ b/pkg/controller/ps/crash_recovery.go @@ -0,0 +1,112 @@ +package ps + +import ( + "bytes" + "context" + "fmt" + "strings" + + "github.com/pkg/errors" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" + "github.com/percona/percona-server-mysql-operator/pkg/k8s" + "github.com/percona/percona-server-mysql-operator/pkg/mysql" + "github.com/percona/percona-server-mysql-operator/pkg/mysqlsh" +) + +func (r *PerconaServerMySQLReconciler) reconcileFullClusterCrash(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) error { + log := logf.FromContext(ctx).WithName("Crash recovery") + + cli, err := clientcmd.NewClient() + if err != nil { + return err + } + + pods, err := k8s.PodsByLabels(ctx, r.Client, mysql.MatchLabels(cr)) + if err != nil { + return errors.Wrap(err, "get pods") + } + + if len(pods) < int(cr.MySQLSpec().Size) { + return nil + } + + // we need every pod to be ready to reboot + for _, pod := range pods { + if !k8s.IsPodReady(pod) { + return nil + } + } + + operatorPass, err := k8s.UserPassword(ctx, r.Client, cr, apiv1alpha1.UserOperator) + if err != nil { + return errors.Wrap(err, "get operator password") + } + + var outb, errb bytes.Buffer + cmd := []string{"/bin/bash", "-c", "cat /var/lib/mysql/full-cluster-crash"} + + for _, pod := range pods { + err = cli.Exec(ctx, &pod, "mysql", cmd, nil, &outb, &errb, false) + if err != nil { + if strings.Contains(errb.String(), "No such file or directory") { + continue + } + return errors.Wrapf(err, "run %s, stdout: %s, stderr: %s", cmd, outb.String(), errb.String()) + } + + log.Info("Pod is waiting for recovery", "pod", pod.Name, "gtidExecuted", outb.String()) + + podFQDN := fmt.Sprintf("%s.%s.%s", pod.Name, mysql.ServiceName(cr), cr.Namespace) + podUri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, podFQDN) + + mysh, err := mysqlsh.NewWithExec(&pod, podUri) + if err != nil { + return err + } + + err = mysh.RebootClusterFromCompleteOutageWithExec(ctx, cr.InnoDBClusterName()) + if err == nil { + log.Info("Cluster was successfully rebooted") + r.Recorder.Event(cr, "Normal", "FullClusterCrashRecovered", "Cluster recovered from full cluster crash") + err := r.cleanupFullClusterCrashFile(ctx, cr) + if err != nil { + log.Error(err, "failed to remove /var/lib/mysql/full-cluster-crash") + } + break + } + } + + return nil +} + +func (r *PerconaServerMySQLReconciler) cleanupFullClusterCrashFile(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) error { + log := logf.FromContext(ctx) + + cli, err := clientcmd.NewClient() + if err != nil { + return err + } + + pods, err := k8s.PodsByLabels(ctx, r.Client, mysql.MatchLabels(cr)) + if err != nil { + return errors.Wrap(err, "get pods") + } + + var outb, errb bytes.Buffer + cmd := []string{"/bin/bash", "-c", "rm /var/lib/mysql/full-cluster-crash"} + for _, pod := range pods { + err = cli.Exec(ctx, &pod, "mysql", cmd, nil, &outb, &errb, false) + if err != nil { + if strings.Contains(errb.String(), "No such file or directory") { + continue + } + return errors.Wrapf(err, "run %s, stdout: %s, stderr: %s", 
cmd, outb.String(), errb.String()) + } + log.V(1).Info("Removed /var/lib/mysql/full-cluster-crash", "pod", pod.Name) + } + + return nil +} diff --git a/pkg/mysql/topology/topology.go b/pkg/mysql/topology/topology.go index f6e0ca928..32a5b143a 100644 --- a/pkg/mysql/topology/topology.go +++ b/pkg/mysql/topology/topology.go @@ -21,7 +21,7 @@ func Get(ctx context.Context, cluster *apiv1alpha1.PerconaServerMySQL, operatorP var top Topology switch cluster.Spec.MySQL.ClusterType { case apiv1alpha1.ClusterTypeGR: - top, err = getGRTopology(cluster, operatorPass) + top, err = getGRTopology(ctx, cluster, operatorPass) if err != nil { return Topology{}, errors.Wrap(err, "get group-replication topology") } @@ -36,20 +36,20 @@ func Get(ctx context.Context, cluster *apiv1alpha1.PerconaServerMySQL, operatorP return top, nil } -func getGRTopology(cluster *apiv1alpha1.PerconaServerMySQL, operatorPass string) (Topology, error) { +func getGRTopology(ctx context.Context, cluster *apiv1alpha1.PerconaServerMySQL, operatorPass string) (Topology, error) { fqdn := mysql.FQDN(cluster, 0) - db, err := replicator.NewReplicator(apiv1alpha1.UserOperator, operatorPass, fqdn, mysql.DefaultAdminPort) + db, err := replicator.NewReplicator(ctx, apiv1alpha1.UserOperator, operatorPass, fqdn, mysql.DefaultAdminPort) if err != nil { return Topology{}, errors.Wrapf(err, "open connection to %s", fqdn) } defer db.Close() - replicas, err := db.GetGroupReplicationReplicas() + replicas, err := db.GetGroupReplicationReplicas(ctx) if err != nil { return Topology{}, errors.Wrap(err, "get group-replication replicas") } - primary, err := db.GetGroupReplicationPrimary() + primary, err := db.GetGroupReplicationPrimary(ctx) if err != nil { return Topology{}, errors.Wrap(err, "get group-replication primary") } diff --git a/pkg/mysqlsh/mysqlshexec.go b/pkg/mysqlsh/mysqlshexec.go index 6e798c6b0..ff30ece5a 100644 --- a/pkg/mysqlsh/mysqlshexec.go +++ b/pkg/mysqlsh/mysqlshexec.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "strings" corev1 "k8s.io/api/core/v1" @@ -173,12 +172,8 @@ func (m *mysqlshExec) TopologyWithExec(ctx context.Context, clusterName string) return status.DefaultReplicaSet.Topology, nil } -func (m *mysqlshExec) RebootClusterFromCompleteOutageWithExec(ctx context.Context, clusterName string, rejoinInstances []string) error { - cmd := fmt.Sprintf( - "dba.rebootClusterFromCompleteOutage('%s', {rejoinInstances: ['%s'], removeInstances: []})", - clusterName, - strings.Join(rejoinInstances, ","), - ) +func (m *mysqlshExec) RebootClusterFromCompleteOutageWithExec(ctx context.Context, clusterName string) error { + cmd := fmt.Sprintf("dba.rebootClusterFromCompleteOutage('%s')", clusterName) if err := m.runWithExec(ctx, cmd); err != nil { return errors.Wrap(err, "reboot cluster from complete outage") diff --git a/pkg/replicator/replicator.go b/pkg/replicator/replicator.go index 55af2d3e1..5fba5adee 100644 --- a/pkg/replicator/replicator.go +++ b/pkg/replicator/replicator.go @@ -1,6 +1,7 @@ package replicator import ( + "context" "database/sql" "fmt" @@ -8,6 +9,7 @@ import ( "github.com/pkg/errors" apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" + "github.com/percona/percona-server-mysql-operator/pkg/innodbcluster" ) const DefaultChannelName = "" @@ -35,36 +37,37 @@ const ( var ErrGroupReplicationNotReady = errors.New("Error 3092: The server is not configured properly to be an active member of the group.") type Replicator interface { - ChangeReplicationSource(host, replicaPass string, port int32) 
error - StartReplication(host, replicaPass string, port int32) error - StopReplication() error - ResetReplication() error - ReplicationStatus() (ReplicationStatus, string, error) - EnableSuperReadonly() error - IsReadonly() (bool, error) - ReportHost() (string, error) + ChangeReplicationSource(ctx context.Context, host, replicaPass string, port int32) error + StartReplication(ctx context.Context, host, replicaPass string, port int32) error + StopReplication(ctx context.Context) error + ResetReplication(ctx context.Context) error + ReplicationStatus(ctx context.Context) (ReplicationStatus, string, error) + EnableSuperReadonly(ctx context.Context) error + IsReadonly(ctx context.Context) (bool, error) + ReportHost(ctx context.Context) (string, error) Close() error - CloneInProgress() (bool, error) - NeedsClone(donor string, port int32) (bool, error) - Clone(donor, user, pass string, port int32) error - IsReplica() (bool, error) - DumbQuery() error - GetGlobal(variable string) (interface{}, error) - SetGlobal(variable, value interface{}) error - ChangeGroupReplicationPassword(replicaPass string) error - StartGroupReplication(password string) error - StopGroupReplication() error - GetGroupReplicationPrimary() (string, error) - GetGroupReplicationReplicas() ([]string, error) - GetMemberState(host string) (MemberState, error) - GetGroupReplicationMembers() ([]string, error) - CheckIfDatabaseExists(name string) (bool, error) - CheckIfInPrimaryPartition() (bool, error) + CloneInProgress(ctx context.Context) (bool, error) + NeedsClone(ctx context.Context, donor string, port int32) (bool, error) + Clone(ctx context.Context, donor, user, pass string, port int32) error + IsReplica(ctx context.Context) (bool, error) + DumbQuery(ctx context.Context) error + GetGlobal(ctx context.Context, variable string) (interface{}, error) + SetGlobal(ctx context.Context, variable, value interface{}) error + ChangeGroupReplicationPassword(ctx context.Context, replicaPass string) error + StartGroupReplication(ctx context.Context, password string) error + StopGroupReplication(ctx context.Context) error + GetGroupReplicationPrimary(ctx context.Context) (string, error) + GetGroupReplicationReplicas(ctx context.Context) ([]string, error) + GetMemberState(ctx context.Context, host string) (MemberState, error) + GetGroupReplicationMembers(ctx context.Context) ([]string, error) + CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) + CheckIfInPrimaryPartition(ctx context.Context) (bool, error) + CheckIfPrimaryUnreachable(ctx context.Context) (bool, error) } type dbImpl struct{ db *sql.DB } -func NewReplicator(user apiv1alpha1.SystemUser, pass, host string, port int32) (Replicator, error) { +func NewReplicator(ctx context.Context, user apiv1alpha1.SystemUser, pass, host string, port int32) (Replicator, error) { config := mysql.NewConfig() config.User = string(user) @@ -74,9 +77,9 @@ func NewReplicator(user apiv1alpha1.SystemUser, pass, host string, port int32) ( config.DBName = "performance_schema" config.Params = map[string]string{ "interpolateParams": "true", - "timeout": "20s", - "readTimeout": "20s", - "writeTimeout": "20s", + "timeout": "10s", + "readTimeout": "10s", + "writeTimeout": "10s", "tls": "preferred", } @@ -85,16 +88,16 @@ func NewReplicator(user apiv1alpha1.SystemUser, pass, host string, port int32) ( return nil, errors.Wrap(err, "connect to MySQL") } - if err := db.Ping(); err != nil { + if err := db.PingContext(ctx); err != nil { return nil, errors.Wrap(err, "ping database") } return 
&dbImpl{db}, nil } -func (d *dbImpl) ChangeReplicationSource(host, replicaPass string, port int32) error { +func (d *dbImpl) ChangeReplicationSource(ctx context.Context, host, replicaPass string, port int32) error { // TODO: Make retries configurable - _, err := d.db.Exec(` + _, err := d.db.ExecContext(ctx, ` CHANGE REPLICATION SOURCE TO SOURCE_USER=?, SOURCE_PASSWORD=?, @@ -113,27 +116,27 @@ func (d *dbImpl) ChangeReplicationSource(host, replicaPass string, port int32) e return nil } -func (d *dbImpl) StartReplication(host, replicaPass string, port int32) error { - if err := d.ChangeReplicationSource(host, replicaPass, port); err != nil { +func (d *dbImpl) StartReplication(ctx context.Context, host, replicaPass string, port int32) error { + if err := d.ChangeReplicationSource(ctx, host, replicaPass, port); err != nil { return errors.Wrap(err, "change replication source") } - _, err := d.db.Exec("START REPLICA") + _, err := d.db.ExecContext(ctx, "START REPLICA") return errors.Wrap(err, "start replication") } -func (d *dbImpl) StopReplication() error { - _, err := d.db.Exec("STOP REPLICA") +func (d *dbImpl) StopReplication(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "STOP REPLICA") return errors.Wrap(err, "stop replication") } -func (d *dbImpl) ResetReplication() error { - _, err := d.db.Exec("RESET REPLICA ALL") +func (d *dbImpl) ResetReplication(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "RESET REPLICA ALL") return errors.Wrap(err, "reset replication") } -func (d *dbImpl) ReplicationStatus() (ReplicationStatus, string, error) { - row := d.db.QueryRow(` +func (d *dbImpl) ReplicationStatus(ctx context.Context) (ReplicationStatus, string, error) { + row := d.db.QueryRowContext(ctx, ` SELECT connection_status.SERVICE_STATE, applier_status.SERVICE_STATE, @@ -161,25 +164,25 @@ func (d *dbImpl) ReplicationStatus() (ReplicationStatus, string, error) { return ReplicationStatusNotInitiated, "", nil } -func (d *dbImpl) IsReplica() (bool, error) { - status, _, err := d.ReplicationStatus() +func (d *dbImpl) IsReplica(ctx context.Context) (bool, error) { + status, _, err := d.ReplicationStatus(ctx) return status == ReplicationStatusActive, errors.Wrap(err, "get replication status") } -func (d *dbImpl) EnableSuperReadonly() error { - _, err := d.db.Exec("SET GLOBAL SUPER_READ_ONLY=1") +func (d *dbImpl) EnableSuperReadonly(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "SET GLOBAL SUPER_READ_ONLY=1") return errors.Wrap(err, "set global super_read_only param to 1") } -func (d *dbImpl) IsReadonly() (bool, error) { +func (d *dbImpl) IsReadonly(ctx context.Context) (bool, error) { var readonly int - err := d.db.QueryRow("select @@read_only and @@super_read_only").Scan(&readonly) + err := d.db.QueryRowContext(ctx, "select @@read_only and @@super_read_only").Scan(&readonly) return readonly == 1, errors.Wrap(err, "select global read_only param") } -func (d *dbImpl) ReportHost() (string, error) { +func (d *dbImpl) ReportHost(ctx context.Context) (string, error) { var reportHost string - err := d.db.QueryRow("select @@report_host").Scan(&reportHost) + err := d.db.QueryRowContext(ctx, "select @@report_host").Scan(&reportHost) return reportHost, errors.Wrap(err, "select report_host param") } @@ -187,8 +190,8 @@ func (d *dbImpl) Close() error { return d.db.Close() } -func (d *dbImpl) CloneInProgress() (bool, error) { - rows, err := d.db.Query("SELECT STATE FROM clone_status") +func (d *dbImpl) CloneInProgress(ctx context.Context) (bool, error) { + rows, err := 
d.db.QueryContext(ctx, "SELECT STATE FROM clone_status") if err != nil { return false, errors.Wrap(err, "fetch clone status") } @@ -208,8 +211,8 @@ func (d *dbImpl) CloneInProgress() (bool, error) { return false, nil } -func (d *dbImpl) NeedsClone(donor string, port int32) (bool, error) { - rows, err := d.db.Query("SELECT SOURCE, STATE FROM clone_status") +func (d *dbImpl) NeedsClone(ctx context.Context, donor string, port int32) (bool, error) { + rows, err := d.db.QueryContext(ctx, "SELECT SOURCE, STATE FROM clone_status") if err != nil { return false, errors.Wrap(err, "fetch clone status") } @@ -228,13 +231,13 @@ func (d *dbImpl) NeedsClone(donor string, port int32) (bool, error) { return true, nil } -func (d *dbImpl) Clone(donor, user, pass string, port int32) error { - _, err := d.db.Exec("SET GLOBAL clone_valid_donor_list=?", fmt.Sprintf("%s:%d", donor, port)) +func (d *dbImpl) Clone(ctx context.Context, donor, user, pass string, port int32) error { + _, err := d.db.ExecContext(ctx, "SET GLOBAL clone_valid_donor_list=?", fmt.Sprintf("%s:%d", donor, port)) if err != nil { return errors.Wrap(err, "set clone_valid_donor_list") } - _, err = d.db.Exec("CLONE INSTANCE FROM ?@?:? IDENTIFIED BY ?", user, donor, port, pass) + _, err = d.db.ExecContext(ctx, "CLONE INSTANCE FROM ?@?:? IDENTIFIED BY ?", user, donor, port, pass) mErr, ok := err.(*mysql.MySQLError) if !ok { @@ -249,25 +252,25 @@ func (d *dbImpl) Clone(donor, user, pass string, port int32) error { return nil } -func (d *dbImpl) DumbQuery() error { - _, err := d.db.Exec("SELECT 1") +func (d *dbImpl) DumbQuery(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "SELECT 1") return errors.Wrap(err, "SELECT 1") } -func (d *dbImpl) GetGlobal(variable string) (interface{}, error) { +func (d *dbImpl) GetGlobal(ctx context.Context, variable string) (interface{}, error) { // TODO: check how to do this without being vulnerable to injection var value interface{} - err := d.db.QueryRow(fmt.Sprintf("SELECT @@%s", variable)).Scan(&value) + err := d.db.QueryRowContext(ctx, fmt.Sprintf("SELECT @@%s", variable)).Scan(&value) return value, errors.Wrapf(err, "SELECT @@%s", variable) } -func (d *dbImpl) SetGlobal(variable, value interface{}) error { - _, err := d.db.Exec(fmt.Sprintf("SET GLOBAL %s=?", variable), value) +func (d *dbImpl) SetGlobal(ctx context.Context, variable, value interface{}) error { + _, err := d.db.ExecContext(ctx, fmt.Sprintf("SET GLOBAL %s=?", variable), value) return errors.Wrapf(err, "SET GLOBAL %s=%s", variable, value) } -func (d *dbImpl) StartGroupReplication(password string) error { - _, err := d.db.Exec("START GROUP_REPLICATION USER=?, PASSWORD=?", apiv1alpha1.UserReplication, password) +func (d *dbImpl) StartGroupReplication(ctx context.Context, password string) error { + _, err := d.db.ExecContext(ctx, "START GROUP_REPLICATION USER=?, PASSWORD=?", apiv1alpha1.UserReplication, password) mErr, ok := err.(*mysql.MySQLError) if !ok { @@ -282,13 +285,13 @@ func (d *dbImpl) StartGroupReplication(password string) error { return errors.Wrap(err, "start group replication") } -func (d *dbImpl) StopGroupReplication() error { - _, err := d.db.Exec("STOP GROUP_REPLICATION") +func (d *dbImpl) StopGroupReplication(ctx context.Context) error { + _, err := d.db.ExecContext(ctx, "STOP GROUP_REPLICATION") return errors.Wrap(err, "stop group replication") } -func (d *dbImpl) ChangeGroupReplicationPassword(replicaPass string) error { - _, err := d.db.Exec(` +func (d *dbImpl) ChangeGroupReplicationPassword(ctx context.Context, 
replicaPass string) error { + _, err := d.db.ExecContext(ctx, ` CHANGE REPLICATION SOURCE TO SOURCE_USER=?, SOURCE_PASSWORD=? @@ -301,10 +304,10 @@ func (d *dbImpl) ChangeGroupReplicationPassword(replicaPass string) error { return nil } -func (d *dbImpl) GetGroupReplicationPrimary() (string, error) { +func (d *dbImpl) GetGroupReplicationPrimary(ctx context.Context) (string, error) { var host string - err := d.db.QueryRow("SELECT MEMBER_HOST FROM replication_group_members WHERE MEMBER_ROLE='PRIMARY' AND MEMBER_STATE='ONLINE'").Scan(&host) + err := d.db.QueryRowContext(ctx, "SELECT MEMBER_HOST FROM replication_group_members WHERE MEMBER_ROLE='PRIMARY' AND MEMBER_STATE='ONLINE'").Scan(&host) if err != nil { return "", errors.Wrap(err, "query primary member") } @@ -312,10 +315,10 @@ func (d *dbImpl) GetGroupReplicationPrimary() (string, error) { return host, nil } -func (d *dbImpl) GetGroupReplicationReplicas() ([]string, error) { +func (d *dbImpl) GetGroupReplicationReplicas(ctx context.Context) ([]string, error) { replicas := make([]string, 0) - rows, err := d.db.Query("SELECT MEMBER_HOST FROM replication_group_members WHERE MEMBER_ROLE='SECONDARY' AND MEMBER_STATE='ONLINE'") + rows, err := d.db.QueryContext(ctx, "SELECT MEMBER_HOST FROM replication_group_members WHERE MEMBER_ROLE='SECONDARY' AND MEMBER_STATE='ONLINE'") if err != nil { return nil, errors.Wrap(err, "query replicas") } @@ -333,10 +336,10 @@ func (d *dbImpl) GetGroupReplicationReplicas() ([]string, error) { return replicas, nil } -func (d *dbImpl) GetMemberState(host string) (MemberState, error) { +func (d *dbImpl) GetMemberState(ctx context.Context, host string) (MemberState, error) { var state MemberState - err := d.db.QueryRow("SELECT MEMBER_STATE FROM replication_group_members WHERE MEMBER_HOST=?", host).Scan(&state) + err := d.db.QueryRowContext(ctx, "SELECT MEMBER_STATE FROM replication_group_members WHERE MEMBER_HOST=?", host).Scan(&state) if err != nil { if errors.Is(err, sql.ErrNoRows) { return MemberStateOffline, nil @@ -347,10 +350,10 @@ func (d *dbImpl) GetMemberState(host string) (MemberState, error) { return state, nil } -func (d *dbImpl) GetGroupReplicationMembers() ([]string, error) { +func (d *dbImpl) GetGroupReplicationMembers(ctx context.Context) ([]string, error) { members := make([]string, 0) - rows, err := d.db.Query("SELECT MEMBER_HOST FROM replication_group_members") + rows, err := d.db.QueryContext(ctx, "SELECT MEMBER_HOST FROM replication_group_members") if err != nil { return nil, errors.Wrap(err, "query members") } @@ -368,10 +371,10 @@ func (d *dbImpl) GetGroupReplicationMembers() ([]string, error) { return members, nil } -func (d *dbImpl) CheckIfDatabaseExists(name string) (bool, error) { +func (d *dbImpl) CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) { var db string - err := d.db.QueryRow("SHOW DATABASES LIKE ?", name).Scan(&db) + err := d.db.QueryRowContext(ctx, "SHOW DATABASES LIKE ?", name).Scan(&db) if err != nil { if errors.Is(err, sql.ErrNoRows) { return false, nil @@ -382,10 +385,10 @@ func (d *dbImpl) CheckIfDatabaseExists(name string) (bool, error) { return true, nil } -func (d *dbImpl) CheckIfInPrimaryPartition() (bool, error) { +func (d *dbImpl) CheckIfInPrimaryPartition(ctx context.Context) (bool, error) { var in bool - err := d.db.QueryRow(` + err := d.db.QueryRowContext(ctx, ` SELECT MEMBER_STATE = 'ONLINE' AND ( @@ -417,3 +420,21 @@ func (d *dbImpl) CheckIfInPrimaryPartition() (bool, error) { return in, nil } + +func (d *dbImpl) CheckIfPrimaryUnreachable(ctx 
context.Context) (bool, error) { + var state string + + err := d.db.QueryRowContext(ctx, ` + SELECT + MEMBER_STATE + FROM + performance_schema.replication_group_members + WHERE + MEMBER_ROLE = 'PRIMARY' + `).Scan(&state) + if err != nil { + return false, err + } + + return state == string(innodbcluster.MemberStateUnreachable), nil +} diff --git a/pkg/replicator/replicatorexec.go b/pkg/replicator/replicatorexec.go index 478dc58fb..03d6bd93e 100644 --- a/pkg/replicator/replicatorexec.go +++ b/pkg/replicator/replicatorexec.go @@ -16,6 +16,7 @@ import ( apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1" "github.com/percona/percona-server-mysql-operator/pkg/clientcmd" + "github.com/percona/percona-server-mysql-operator/pkg/innodbcluster" ) var sensitiveRegexp = regexp.MustCompile(":.*@") @@ -37,10 +38,10 @@ func NewReplicatorExec(pod *corev1.Pod, user apiv1alpha1.SystemUser, pass, host return &dbImplExec{client: c, pod: pod, user: user, pass: pass, host: host}, nil } -func (d *dbImplExec) exec(stm string, stdout, stderr *bytes.Buffer) error { +func (d *dbImplExec) exec(ctx context.Context, stm string, stdout, stderr *bytes.Buffer) error { cmd := []string{"mysql", "--database", "performance_schema", fmt.Sprintf("-p%s", d.pass), "-u", string(d.user), "-h", d.host, "-e", stm} - err := d.client.Exec(context.TODO(), d.pod, "mysql", cmd, nil, stdout, stderr, false) + err := d.client.Exec(ctx, d.pod, "mysql", cmd, nil, stdout, stderr, false) if err != nil { sout := sensitiveRegexp.ReplaceAllString(stdout.String(), ":*****@") serr := sensitiveRegexp.ReplaceAllString(stderr.String(), ":*****@") @@ -54,9 +55,9 @@ func (d *dbImplExec) exec(stm string, stdout, stderr *bytes.Buffer) error { return nil } -func (d *dbImplExec) query(query string, out interface{}) error { +func (d *dbImplExec) query(ctx context.Context, query string, out interface{}) error { var errb, outb bytes.Buffer - err := d.exec(query, &outb, &errb) + err := d.exec(ctx, query, &outb, &errb) if err != nil { return err } @@ -75,7 +76,7 @@ func (d *dbImplExec) query(query string, out interface{}) error { return nil } -func (d *dbImplExec) ChangeReplicationSource(host, replicaPass string, port int32) error { +func (d *dbImplExec) ChangeReplicationSource(ctx context.Context, host, replicaPass string, port int32) error { var errb, outb bytes.Buffer q := fmt.Sprintf(` CHANGE REPLICATION SOURCE TO @@ -89,7 +90,7 @@ func (d *dbImplExec) ChangeReplicationSource(host, replicaPass string, port int3 SOURCE_RETRY_COUNT=3, SOURCE_CONNECT_RETRY=60 `, apiv1alpha1.UserReplication, replicaPass, host, port) - err := d.exec(q, &outb, &errb) + err := d.exec(ctx, q, &outb, &errb) if err != nil { return errors.Wrap(err, "exec CHANGE REPLICATION SOURCE TO") @@ -98,30 +99,30 @@ func (d *dbImplExec) ChangeReplicationSource(host, replicaPass string, port int3 return nil } -func (d *dbImplExec) StartReplication(host, replicaPass string, port int32) error { - if err := d.ChangeReplicationSource(host, replicaPass, port); err != nil { +func (d *dbImplExec) StartReplication(ctx context.Context, host, replicaPass string, port int32) error { + if err := d.ChangeReplicationSource(ctx, host, replicaPass, port); err != nil { return errors.Wrap(err, "change replication source") } var errb, outb bytes.Buffer - err := d.exec("START REPLICA", &outb, &errb) + err := d.exec(ctx, "START REPLICA", &outb, &errb) return errors.Wrap(err, "start replication") } -func (d *dbImplExec) StopReplication() error { +func (d *dbImplExec) StopReplication(ctx 
context.Context) error { var errb, outb bytes.Buffer - err := d.exec("STOP REPLICA", &outb, &errb) + err := d.exec(ctx, "STOP REPLICA", &outb, &errb) return errors.Wrap(err, "stop replication") } -func (d *dbImplExec) ResetReplication() error { +func (d *dbImplExec) ResetReplication(ctx context.Context) error { var errb, outb bytes.Buffer - err := d.exec("RESET REPLICA ALL", &outb, &errb) + err := d.exec(ctx, "RESET REPLICA ALL", &outb, &errb) return errors.Wrap(err, "reset replication") } -func (d *dbImplExec) ReplicationStatus() (ReplicationStatus, string, error) { +func (d *dbImplExec) ReplicationStatus(ctx context.Context) (ReplicationStatus, string, error) { rows := []*struct { IoState string `csv:"conn_state"` SqlState string `csv:"applier_state"` @@ -140,7 +141,7 @@ func (d *dbImplExec) ReplicationStatus() (ReplicationStatus, string, error) { ON connection_status.channel_name = applier_status.channel_name WHERE connection_status.channel_name = '%s' `, DefaultChannelName) - err := d.query(q, &rows) + err := d.query(ctx, q, &rows) if err != nil { if errors.Is(err, sql.ErrNoRows) { return ReplicationStatusNotInitiated, "", nil @@ -155,23 +156,23 @@ func (d *dbImplExec) ReplicationStatus() (ReplicationStatus, string, error) { return ReplicationStatusNotInitiated, "", err } -func (d *dbImplExec) IsReplica() (bool, error) { - status, _, err := d.ReplicationStatus() +func (d *dbImplExec) IsReplica(ctx context.Context) (bool, error) { + status, _, err := d.ReplicationStatus(ctx) return status == ReplicationStatusActive, errors.Wrap(err, "get replication status") } -func (d *dbImplExec) EnableSuperReadonly() error { +func (d *dbImplExec) EnableSuperReadonly(ctx context.Context) error { var errb, outb bytes.Buffer - err := d.exec("SET GLOBAL SUPER_READ_ONLY=1", &outb, &errb) + err := d.exec(ctx, "SET GLOBAL SUPER_READ_ONLY=1", &outb, &errb) return errors.Wrap(err, "set global super_read_only param to 1") } -func (d *dbImplExec) IsReadonly() (bool, error) { +func (d *dbImplExec) IsReadonly(ctx context.Context) (bool, error) { rows := []*struct { Readonly int `csv:"readonly"` }{} - err := d.query("select @@read_only and @@super_read_only as readonly", &rows) + err := d.query(ctx, "select @@read_only and @@super_read_only as readonly", &rows) if err != nil { return false, err } @@ -179,12 +180,12 @@ func (d *dbImplExec) IsReadonly() (bool, error) { return rows[0].Readonly == 1, nil } -func (d *dbImplExec) ReportHost() (string, error) { +func (d *dbImplExec) ReportHost(ctx context.Context) (string, error) { rows := []*struct { Host string `csv:"host"` }{} - err := d.query("select @@report_host as host", &rows) + err := d.query(ctx, "select @@report_host as host", &rows) if err != nil { return "", err } @@ -196,11 +197,11 @@ func (d *dbImplExec) Close() error { return nil } -func (d *dbImplExec) CloneInProgress() (bool, error) { +func (d *dbImplExec) CloneInProgress(ctx context.Context) (bool, error) { rows := []*struct { State string `csv:"state"` }{} - err := d.query("SELECT STATE FROM clone_status as state", &rows) + err := d.query(ctx, "SELECT STATE FROM clone_status as state", &rows) if err != nil { return false, errors.Wrap(err, "fetch clone status") } @@ -214,12 +215,12 @@ func (d *dbImplExec) CloneInProgress() (bool, error) { return false, nil } -func (d *dbImplExec) NeedsClone(donor string, port int32) (bool, error) { +func (d *dbImplExec) NeedsClone(ctx context.Context, donor string, port int32) (bool, error) { rows := []*struct { Source string `csv:"source"` State string `csv:"state"` 
}{} - err := d.query("SELECT SOURCE as source, STATE as state FROM clone_status", &rows) + err := d.query(ctx, "SELECT SOURCE as source, STATE as state FROM clone_status", &rows) if err != nil { return false, errors.Wrap(err, "fetch clone status") } @@ -233,16 +234,16 @@ func (d *dbImplExec) NeedsClone(donor string, port int32) (bool, error) { return true, nil } -func (d *dbImplExec) Clone(donor, user, pass string, port int32) error { +func (d *dbImplExec) Clone(ctx context.Context, donor, user, pass string, port int32) error { var errb, outb bytes.Buffer q := fmt.Sprintf("SET GLOBAL clone_valid_donor_list='%s'", fmt.Sprintf("%s:%d", donor, port)) - err := d.exec(q, &outb, &errb) + err := d.exec(ctx, q, &outb, &errb) if err != nil { return errors.Wrap(err, "set clone_valid_donor_list") } q = fmt.Sprintf("CLONE INSTANCE FROM %s@%s:%d IDENTIFIED BY %s", user, donor, port, pass) - err = d.exec(q, &outb, &errb) + err = d.exec(ctx, q, &outb, &errb) if strings.Contains(errb.String(), "ERROR") { return errors.Wrap(err, "clone instance") @@ -256,34 +257,34 @@ func (d *dbImplExec) Clone(donor, user, pass string, port int32) error { return nil } -func (d *dbImplExec) DumbQuery() error { +func (d *dbImplExec) DumbQuery(ctx context.Context) error { var errb, outb bytes.Buffer - err := d.exec("SELECT 1", &outb, &errb) + err := d.exec(ctx, "SELECT 1", &outb, &errb) return errors.Wrap(err, "SELECT 1") } -func (d *dbImplExec) SetSemiSyncSource(enabled bool) error { +func (d *dbImplExec) SetSemiSyncSource(ctx context.Context, enabled bool) error { var errb, outb bytes.Buffer q := fmt.Sprintf("SET GLOBAL rpl_semi_sync_master_enabled=%t", enabled) - err := d.exec(q, &outb, &errb) + err := d.exec(ctx, q, &outb, &errb) return errors.Wrap(err, "set rpl_semi_sync_master_enabled") } -func (d *dbImplExec) SetSemiSyncSize(size int) error { +func (d *dbImplExec) SetSemiSyncSize(ctx context.Context, size int) error { var errb, outb bytes.Buffer q := fmt.Sprintf("SET GLOBAL rpl_semi_sync_master_wait_for_slave_count=%d", size) - err := d.exec(q, &outb, &errb) + err := d.exec(ctx, q, &outb, &errb) return errors.Wrap(err, "set rpl_semi_sync_master_wait_for_slave_count") } -func (d *dbImplExec) GetGlobal(variable string) (interface{}, error) { +func (d *dbImplExec) GetGlobal(ctx context.Context, variable string) (interface{}, error) { rows := []*struct { Val interface{} `csv:"val"` }{} // TODO: check how to do this without being vulnerable to injection - err := d.query(fmt.Sprintf("SELECT @@%s as val", variable), &rows) + err := d.query(ctx, fmt.Sprintf("SELECT @@%s as val", variable), &rows) if err != nil { return nil, errors.Wrapf(err, "SELECT @@%s", variable) } @@ -291,10 +292,10 @@ func (d *dbImplExec) GetGlobal(variable string) (interface{}, error) { return rows[0].Val, nil } -func (d *dbImplExec) SetGlobal(variable, value interface{}) error { +func (d *dbImplExec) SetGlobal(ctx context.Context, variable, value interface{}) error { var errb, outb bytes.Buffer q := fmt.Sprintf("SET GLOBAL %s=%s", variable, value) - err := d.exec(q, &outb, &errb) + err := d.exec(ctx, q, &outb, &errb) if err != nil { return errors.Wrapf(err, "SET GLOBAL %s=%s", variable, value) @@ -302,10 +303,10 @@ func (d *dbImplExec) SetGlobal(variable, value interface{}) error { return nil } -func (d *dbImplExec) StartGroupReplication(password string) error { +func (d *dbImplExec) StartGroupReplication(ctx context.Context, password string) error { var errb, outb bytes.Buffer q := fmt.Sprintf("START GROUP_REPLICATION USER='%s', PASSWORD='%s'", 
apiv1alpha1.UserReplication, password) - err := d.exec(q, &outb, &errb) + err := d.exec(ctx, q, &outb, &errb) mErr, ok := err.(*mysql.MySQLError) if !ok { @@ -320,13 +321,13 @@ func (d *dbImplExec) StartGroupReplication(password string) error { return errors.Wrap(err, "start group replication") } -func (d *dbImplExec) StopGroupReplication() error { +func (d *dbImplExec) StopGroupReplication(ctx context.Context) error { var errb, outb bytes.Buffer - err := d.exec("STOP GROUP_REPLICATION", &outb, &errb) + err := d.exec(ctx, "STOP GROUP_REPLICATION", &outb, &errb) return errors.Wrap(err, "stop group replication") } -func (d *dbImplExec) ChangeGroupReplicationPassword(replicaPass string) error { +func (d *dbImplExec) ChangeGroupReplicationPassword(ctx context.Context, replicaPass string) error { var errb, outb bytes.Buffer q := fmt.Sprintf(` CHANGE REPLICATION SOURCE TO @@ -335,7 +336,7 @@ func (d *dbImplExec) ChangeGroupReplicationPassword(replicaPass string) error { FOR CHANNEL 'group_replication_recovery' `, apiv1alpha1.UserReplication, replicaPass) - err := d.exec(q, &outb, &errb) + err := d.exec(ctx, q, &outb, &errb) if err != nil { return errors.Wrap(err, "exec CHANGE REPLICATION SOURCE TO") } @@ -343,12 +344,12 @@ func (d *dbImplExec) ChangeGroupReplicationPassword(replicaPass string) error { return nil } -func (d *dbImplExec) GetGroupReplicationPrimary() (string, error) { +func (d *dbImplExec) GetGroupReplicationPrimary(ctx context.Context) (string, error) { rows := []*struct { Host string `csv:"host"` }{} - err := d.query("SELECT MEMBER_HOST as host FROM replication_group_members WHERE MEMBER_ROLE='PRIMARY' AND MEMBER_STATE='ONLINE'", &rows) + err := d.query(ctx, "SELECT MEMBER_HOST as host FROM replication_group_members WHERE MEMBER_ROLE='PRIMARY' AND MEMBER_STATE='ONLINE'", &rows) if err != nil { return "", errors.Wrap(err, "query primary member") } @@ -357,12 +358,12 @@ func (d *dbImplExec) GetGroupReplicationPrimary() (string, error) { } // TODO: finish implementation -func (d *dbImplExec) GetGroupReplicationReplicas() ([]string, error) { +func (d *dbImplExec) GetGroupReplicationReplicas(ctx context.Context) ([]string, error) { rows := []*struct { Host string `csv:"host"` }{} - err := d.query("SELECT MEMBER_HOST as host FROM replication_group_members WHERE MEMBER_ROLE='SECONDARY' AND MEMBER_STATE='ONLINE'", &rows) + err := d.query(ctx, "SELECT MEMBER_HOST as host FROM replication_group_members WHERE MEMBER_ROLE='SECONDARY' AND MEMBER_STATE='ONLINE'", &rows) if err != nil { return nil, errors.Wrap(err, "query replicas") } @@ -375,12 +376,12 @@ func (d *dbImplExec) GetGroupReplicationReplicas() ([]string, error) { return replicas, nil } -func (d *dbImplExec) GetMemberState(host string) (MemberState, error) { +func (d *dbImplExec) GetMemberState(ctx context.Context, host string) (MemberState, error) { rows := []*struct { State MemberState `csv:"state"` }{} q := fmt.Sprintf(`SELECT MEMBER_STATE as state FROM replication_group_members WHERE MEMBER_HOST='%s'`, host) - err := d.query(q, &rows) + err := d.query(ctx, q, &rows) if err != nil { if errors.Is(err, sql.ErrNoRows) { return MemberStateOffline, nil @@ -391,12 +392,12 @@ func (d *dbImplExec) GetMemberState(host string) (MemberState, error) { return rows[0].State, nil } -func (d *dbImplExec) GetGroupReplicationMembers() ([]string, error) { +func (d *dbImplExec) GetGroupReplicationMembers(ctx context.Context) ([]string, error) { rows := []*struct { Member string `csv:"member"` }{} - err := d.query("SELECT MEMBER_HOST as member FROM 
replication_group_members", &rows) + err := d.query(ctx, "SELECT MEMBER_HOST as member FROM replication_group_members", &rows) if err != nil { return nil, errors.Wrap(err, "query members") } @@ -409,13 +410,13 @@ func (d *dbImplExec) GetGroupReplicationMembers() ([]string, error) { return members, nil } -func (d *dbImplExec) CheckIfDatabaseExists(name string) (bool, error) { +func (d *dbImplExec) CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) { rows := []*struct { DB string `csv:"db"` }{} q := fmt.Sprintf("SELECT SCHEMA_NAME AS db FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%s'", name) - err := d.query(q, &rows) + err := d.query(ctx, q, &rows) if err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -428,12 +429,12 @@ func (d *dbImplExec) CheckIfDatabaseExists(name string) (bool, error) { } // TODO: finish implementation -func (d *dbImplExec) CheckIfInPrimaryPartition() (bool, error) { +func (d *dbImplExec) CheckIfInPrimaryPartition(ctx context.Context) (bool, error) { rows := []*struct { In bool `csv:"in"` }{} - err := d.query(` + err := d.query(ctx, ` SELECT MEMBER_STATE = 'ONLINE' AND ( @@ -466,3 +467,21 @@ func (d *dbImplExec) CheckIfInPrimaryPartition() (bool, error) { return rows[0].In, nil } + +func (d *dbImplExec) CheckIfPrimaryUnreachable(ctx context.Context) (bool, error) { + var state string + + err := d.query(ctx, ` + SELECT + MEMBER_STATE + FROM + performance_schema.replication_group_members + WHERE + MEMBER_ROLE = 'PRIMARY' + `, &state) + if err != nil { + return false, err + } + + return state == string(innodbcluster.MemberStateUnreachable), nil +}