diff --git a/pkg/controller/ps/controller.go b/pkg/controller/ps/controller.go
index cfcd3b688..9452cce87 100644
--- a/pkg/controller/ps/controller.go
+++ b/pkg/controller/ps/controller.go
@@ -45,7 +45,7 @@ import (
 	apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
 	"github.com/percona/percona-server-mysql-operator/pkg/clientcmd"
 	"github.com/percona/percona-server-mysql-operator/pkg/controller/psrestore"
-	"github.com/percona/percona-server-mysql-operator/pkg/db"
+	database "github.com/percona/percona-server-mysql-operator/pkg/db"
 	"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
 	"github.com/percona/percona-server-mysql-operator/pkg/k8s"
 	"github.com/percona/percona-server-mysql-operator/pkg/mysql"
@@ -204,7 +204,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
 	firstPodFQDN := fmt.Sprintf("%s.%s.%s", firstPod.Name, mysql.ServiceName(cr), cr.Namespace)
 	firstPodUri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)
 
-	um := db.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)
+	um := database.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)
 
 	mysh, err := mysqlsh.NewWithExec(r.ClientCmd, &firstPod, firstPodUri)
 	if err != nil {
@@ -224,7 +224,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
 			return errors.Wrapf(err, "get member state of %s from performance_schema", pod.Name)
 		}
 
-		if state == db.MemberStateOffline {
+		if state == database.MemberStateOffline {
 			log.Info("Member is not part of GR or already removed", "member", pod.Name, "memberState", state)
 			continue
 		}
@@ -789,16 +789,10 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
 		return errors.Wrap(err, "get operator password")
 	}
 
-	uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))
-
-	mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
-	if err != nil {
-		return err
-	}
-
+	db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))
 	cond := meta.FindStatusCondition(cr.Status.Conditions, apiv1alpha1.ConditionInnoDBClusterBootstrapped)
 	if cond == nil || cond.Status == metav1.ConditionFalse {
-		if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+		if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
 			log.V(1).Info("InnoDB cluster is not created yet")
 			return nil
 		}
@@ -814,7 +808,7 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
 		return nil
 	}
 
-	if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+	if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
 		return errors.New("InnoDB cluster is already bootstrapped, but failed to check its status")
 	}
 
@@ -929,13 +923,9 @@ func (r *PerconaServerMySQLReconciler) reconcileMySQLRouter(ctx context.Context,
 	}
 
 	firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
-	uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
-	mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
-	if err != nil {
-		return err
-	}
-
-	if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+	db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)
+	if exist, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exist {
 		log.V(1).Info("Waiting for InnoDB Cluster", "cluster", cr.Name)
 		return nil
 	}
@@ -1005,7 +995,7 @@ func (r *PerconaServerMySQLReconciler) getPrimaryFromGR(ctx context.Context, cr
 		return "", err
 	}
 
-	um := db.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)
+	um := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)
 	return um.GetGroupReplicationPrimary(ctx)
 }
 
@@ -1053,7 +1043,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
 			if err != nil {
 				return err
 			}
-			repDb := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
+			repDb := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
 
 			if err := orchestrator.StopReplicationExec(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil {
 				return errors.Wrapf(err, "stop replica %s", hostname)
@@ -1064,7 +1054,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
 				return errors.Wrapf(err, "get replication status of %s", hostname)
 			}
 
-			for status == db.ReplicationStatusActive {
+			for status == database.ReplicationStatusActive {
 				time.Sleep(250 * time.Millisecond)
 				status, _, err = repDb.ReplicationStatus(ctx)
 				if err != nil {
@@ -1107,7 +1097,7 @@ func (r *PerconaServerMySQLReconciler) startAsyncReplication(ctx context.Context
 			if err != nil {
 				return err
 			}
-			um := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
+			um := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
 
 			log.V(1).Info("Change replication source", "primary", primary.Key.Hostname, "replica", hostname)
 			if err := um.ChangeReplicationSource(ctx, primary.Key.Hostname, replicaPass, primary.Key.Port); err != nil {
diff --git a/pkg/controller/ps/status.go b/pkg/controller/ps/status.go
index eddf7e194..82db138a1 100644
--- a/pkg/controller/ps/status.go
+++ b/pkg/controller/ps/status.go
@@ -3,7 +3,6 @@ package ps
 import (
 	"bytes"
 	"context"
-	"fmt"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -18,11 +17,11 @@ import (
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 
 	apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
+	database "github.com/percona/percona-server-mysql-operator/pkg/db"
 	"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
 	"github.com/percona/percona-server-mysql-operator/pkg/innodbcluster"
 	"github.com/percona/percona-server-mysql-operator/pkg/k8s"
 	"github.com/percona/percona-server-mysql-operator/pkg/mysql"
-	"github.com/percona/percona-server-mysql-operator/pkg/mysqlsh"
 	"github.com/percona/percona-server-mysql-operator/pkg/orchestrator"
 	"github.com/percona/percona-server-mysql-operator/pkg/router"
 )
@@ -185,38 +184,39 @@ func (r *PerconaServerMySQLReconciler) isGRReady(ctx context.Context, cr *apiv1a
 	}
 
 	firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
-	uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
 
-	mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
+	db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)
+
+	dbExists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata")
 	if err != nil {
 		return false, err
 	}
 
-	if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+	if !dbExists {
 		return false, nil
 	}
 
-	status, err := mysh.ClusterStatusWithExec(ctx, cr.InnoDBClusterName())
+	members, err := db.GetGroupReplicationMembers(ctx)
 	if err != nil {
-		return false, errors.Wrap(err, "get cluster status")
+		return false, err
 	}
 
-	for addr, member := range status.DefaultReplicaSet.Topology {
-		for _, err := range member.InstanceErrors {
-			log.WithName(addr).Info(err)
+	onlineMembers := 0
+	for _, member := range members {
+		if member.MemberState != innodbcluster.MemberStateOnline {
+			log.WithName(member.Address).Info("Member is not ONLINE", "state", member.MemberState)
+			return false, nil
 		}
+		onlineMembers++
 	}
 
-	log.V(1).Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)
-
-	switch status.DefaultReplicaSet.Status {
-	case innodbcluster.ClusterStatusOK:
-		return true, nil
-	case innodbcluster.ClusterStatusOKPartial, innodbcluster.ClusterStatusOKNoTolerance, innodbcluster.ClusterStatusOKNoTolerancePartial:
-		log.Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)
-		return true, nil
-	default:
+	if onlineMembers < int(cr.Spec.MySQL.Size) {
+		log.V(1).Info("Not enough ONLINE members", "onlineMembers", onlineMembers, "size", cr.Spec.MySQL.Size)
 		return false, nil
 	}
+
+	log.V(1).Info("GR is ready")
+
+	return true, nil
 }
 
 func (r *PerconaServerMySQLReconciler) allLoadBalancersReady(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (bool, error) {
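Note: with this change the readiness probe reduces to two plain SQL statements instead of a mysqlsh invocation. Below is a minimal standalone sketch of the same flow, assuming a direct database/sql connection and a hypothetical isGRReadySketch helper; the operator itself runs the mysql client through clientcmd exec inside the pod, as the hunks above show.

	package main

	import (
		"context"
		"database/sql"
		"errors"
	)

	// isGRReadySketch mirrors the new isGRReady logic: first confirm the
	// InnoDB Cluster metadata schema exists, then require every group
	// replication member to be ONLINE and the member count to reach the
	// expected cluster size.
	func isGRReadySketch(ctx context.Context, db *sql.DB, expectedSize int) (bool, error) {
		var schema string
		err := db.QueryRowContext(ctx, "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = 'mysql_innodb_cluster_metadata'").Scan(&schema)
		if errors.Is(err, sql.ErrNoRows) {
			return false, nil // cluster not bootstrapped yet
		}
		if err != nil {
			return false, err
		}

		rows, err := db.QueryContext(ctx, "SELECT MEMBER_HOST, MEMBER_STATE FROM performance_schema.replication_group_members")
		if err != nil {
			return false, err
		}
		defer rows.Close()

		online := 0
		for rows.Next() {
			var host, state string
			if err := rows.Scan(&host, &state); err != nil {
				return false, err
			}
			if state != "ONLINE" {
				return false, nil // any non-ONLINE member fails readiness
			}
			online++
		}
		if err := rows.Err(); err != nil {
			return false, err
		}
		return online >= expectedSize, nil
	}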
diff --git a/pkg/controller/ps/status_test.go b/pkg/controller/ps/status_test.go
index 7649e5f77..6ab78ab00 100644
--- a/pkg/controller/ps/status_test.go
+++ b/pkg/controller/ps/status_test.go
@@ -1,14 +1,17 @@
 package ps
 
 import (
+	"bytes"
 	"context"
-	"encoding/json"
+	"database/sql"
+	"encoding/csv"
 	"fmt"
 	"io"
 	"reflect"
 	"testing"
 
 	cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
+	"github.com/gocarina/gocsv"
 	"github.com/pkg/errors"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -330,6 +333,7 @@ func TestReconcileStatusAsync(t *testing.T) {
 		})
 	}
 }
+
 func TestReconcileStatusHAProxyGR(t *testing.T) {
 	ctx := context.Background()
 
@@ -363,12 +367,12 @@ func TestReconcileStatusHAProxyGR(t *testing.T) {
 	}
 
 	tests := []struct {
-		name          string
-		cr            *apiv1alpha1.PerconaServerMySQL
-		clusterStatus innodbcluster.ClusterStatus
-		objects       []client.Object
-		expected      apiv1alpha1.PerconaServerMySQLStatus
-		mysqlReady    bool
+		name              string
+		cr                *apiv1alpha1.PerconaServerMySQL
+		objects           []client.Object
+		expected          apiv1alpha1.PerconaServerMySQLStatus
+		mysqlMemberStates []innodbcluster.MemberState
+		noMetadataDB      bool
 	}{
 		{
 			name: "without pods",
@@ -408,8 +412,40 @@ func TestReconcileStatusHAProxyGR(t *testing.T) {
 				State: apiv1alpha1.StateReady,
 				Host:  cr.Name + "-haproxy." + cr.Namespace,
 			},
-			clusterStatus: innodbcluster.ClusterStatusOK,
-			mysqlReady:    true,
+			mysqlMemberStates: []innodbcluster.MemberState{
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+			},
 		},
+		{
+			name: "with all ready pods, ok cluster status and invalid database",
+			cr:   cr,
+			objects: appendSlices(
+				makeFakeReadyPods(cr, 3, "mysql"),
+				makeFakeReadyPods(cr, 3, "haproxy"),
+				[]client.Object{secret},
+			),
+			expected: apiv1alpha1.PerconaServerMySQLStatus{
+				MySQL: apiv1alpha1.StatefulAppStatus{
+					Size:  3,
+					Ready: 3,
+					State: apiv1alpha1.StateInitializing,
+				},
+				HAProxy: apiv1alpha1.StatefulAppStatus{
+					Size:  3,
+					Ready: 3,
+					State: apiv1alpha1.StateReady,
+				},
+				State: apiv1alpha1.StateInitializing,
+				Host:  cr.Name + "-haproxy." + cr.Namespace,
+			},
+			mysqlMemberStates: []innodbcluster.MemberState{
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+			},
+			noMetadataDB: true,
+		},
 		{
 			name: "with all ready pods and offline cluster status",
@@ -433,7 +469,11 @@ func TestReconcileStatusHAProxyGR(t *testing.T) {
 				State: apiv1alpha1.StateInitializing,
 				Host:  cr.Name + "-haproxy." + cr.Namespace,
 			},
-			clusterStatus: innodbcluster.ClusterStatusOffline,
+			mysqlMemberStates: []innodbcluster.MemberState{
+				innodbcluster.MemberStateOffline,
+				innodbcluster.MemberStateOffline,
+				innodbcluster.MemberStateOffline,
+			},
 		},
 		{
 			name: "with all ready pods and partial ok cluster status",
@@ -447,25 +487,28 @@ func TestReconcileStatusHAProxyGR(t *testing.T) {
 				MySQL: apiv1alpha1.StatefulAppStatus{
 					Size:  3,
 					Ready: 3,
-					State: apiv1alpha1.StateReady,
+					State: apiv1alpha1.StateInitializing,
 				},
 				HAProxy: apiv1alpha1.StatefulAppStatus{
 					Size:  3,
 					Ready: 3,
 					State: apiv1alpha1.StateReady,
 				},
-				State: apiv1alpha1.StateReady,
+				State: apiv1alpha1.StateInitializing,
 				Host:  cr.Name + "-haproxy." + cr.Namespace,
 			},
-			clusterStatus: innodbcluster.ClusterStatusOKPartial,
-			mysqlReady:    true,
+			mysqlMemberStates: []innodbcluster.MemberState{
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOffline,
+			},
 		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			cr := tt.cr.DeepCopy()
 			cb := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cr).WithStatusSubresource(cr).WithObjects(tt.objects...).WithStatusSubresource(tt.objects...)
-			cliCmd, err := getFakeClient(cr, operatorPass, tt.mysqlReady, tt.clusterStatus)
+			cliCmd, err := getFakeClient(cr, tt.mysqlMemberStates, tt.noMetadataDB)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -526,12 +569,12 @@ func TestReconcileStatusRouterGR(t *testing.T) {
 	}
 
 	tests := []struct {
-		name          string
-		cr            *apiv1alpha1.PerconaServerMySQL
-		clusterStatus innodbcluster.ClusterStatus
-		objects       []client.Object
-		expected      apiv1alpha1.PerconaServerMySQLStatus
-		mysqlReady    bool
+		name              string
+		cr                *apiv1alpha1.PerconaServerMySQL
+		objects           []client.Object
+		expected          apiv1alpha1.PerconaServerMySQLStatus
+		mysqlMemberStates []innodbcluster.MemberState
+		noMetadataDB      bool
 	}{
 		{
 			name: "without pods",
@@ -571,8 +614,40 @@ func TestReconcileStatusRouterGR(t *testing.T) {
 				State: apiv1alpha1.StateReady,
 				Host:  cr.Name + "-router." + cr.Namespace,
 			},
-			clusterStatus: innodbcluster.ClusterStatusOK,
-			mysqlReady:    true,
+			mysqlMemberStates: []innodbcluster.MemberState{
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+			},
 		},
+		{
+			name: "with all ready pods, ok cluster status and invalid database",
+			cr:   cr,
+			objects: appendSlices(
+				makeFakeReadyPods(cr, 3, "mysql"),
+				makeFakeReadyPods(cr, 3, "router"),
+				[]client.Object{secret},
+			),
+			expected: apiv1alpha1.PerconaServerMySQLStatus{
+				MySQL: apiv1alpha1.StatefulAppStatus{
+					Size:  3,
+					Ready: 3,
+					State: apiv1alpha1.StateInitializing,
+				},
+				Router: apiv1alpha1.StatefulAppStatus{
+					Size:  3,
+					Ready: 3,
+					State: apiv1alpha1.StateReady,
+				},
+				State: apiv1alpha1.StateInitializing,
+				Host:  cr.Name + "-router." + cr.Namespace,
+			},
+			mysqlMemberStates: []innodbcluster.MemberState{
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+			},
+			noMetadataDB: true,
+		},
 		{
 			name: "with all ready pods and offline cluster status",
@@ -596,7 +671,11 @@ func TestReconcileStatusRouterGR(t *testing.T) {
 				State: apiv1alpha1.StateInitializing,
 				Host:  cr.Name + "-router." + cr.Namespace,
 			},
-			clusterStatus: innodbcluster.ClusterStatusOffline,
+			mysqlMemberStates: []innodbcluster.MemberState{
+				innodbcluster.MemberStateOffline,
+				innodbcluster.MemberStateOffline,
+				innodbcluster.MemberStateOffline,
+			},
 		},
 		{
 			name: "with all ready pods and partial ok cluster status",
@@ -610,25 +689,28 @@ func TestReconcileStatusRouterGR(t *testing.T) {
 				MySQL: apiv1alpha1.StatefulAppStatus{
 					Size:  3,
 					Ready: 3,
-					State: apiv1alpha1.StateReady,
+					State: apiv1alpha1.StateInitializing,
 				},
 				Router: apiv1alpha1.StatefulAppStatus{
 					Size:  3,
 					Ready: 3,
 					State: apiv1alpha1.StateReady,
 				},
-				State: apiv1alpha1.StateReady,
+				State: apiv1alpha1.StateInitializing,
 				Host:  cr.Name + "-router." + cr.Namespace,
 			},
-			clusterStatus: innodbcluster.ClusterStatusOKPartial,
-			mysqlReady:    true,
+			mysqlMemberStates: []innodbcluster.MemberState{
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOnline,
+				innodbcluster.MemberStateOffline,
+			},
 		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			cr := tt.cr.DeepCopy()
 			cb := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cr).WithStatusSubresource(cr).WithObjects(tt.objects...).WithStatusSubresource(tt.objects...)
-			cliCmd, err := getFakeClient(cr, operatorPass, tt.mysqlReady, tt.clusterStatus)
+			cliCmd, err := getFakeClient(cr, tt.mysqlMemberStates, tt.noMetadataDB)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -661,7 +743,11 @@ type fakeClient struct {
 	execCount int
 }
 
-func (c *fakeClient) Exec(ctx context.Context, pod *corev1.Pod, containerName string, command []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
+// Exec increments the internal counter `execCount`.
+// It compares `scripts[execCount].cmd` and `scripts[execCount].stdin` with the `command` and `stdin` parameters.
+// It writes `scripts[execCount].stdout` and `scripts[execCount].stderr` to the `stdout` and `stderr` parameters.
+// fakeClient should have the array of fakeClientScript objects in the order they are going to be executed in the tested function.
+func (c *fakeClient) Exec(_ context.Context, _ *corev1.Pod, _ string, command []string, stdin io.Reader, stdout, stderr io.Writer, _ bool) error {
 	if c.execCount >= len(c.scripts) {
 		return errors.Errorf("unexpected exec call")
 	}
@@ -688,8 +774,8 @@ func (c *fakeClient) Exec(ctx context.Context, pod *corev1.Pod, containerName st
 		return err
 	}
 	c.execCount++
-	if c.scripts[c.execCount-1].shouldErr {
-		return errors.Errorf("fake error")
+	if c.scripts[c.execCount-1].err != nil {
+		return c.scripts[c.execCount-1].err
 	}
 	return nil
 }
@@ -698,78 +784,88 @@ func (c *fakeClient) REST() restclient.Interface {
 	return nil
 }
 
+// fakeClientScript describes a single expected command execution.
+// cmd and stdin values are compared with the corresponding values in the Exec method.
+// stdout and stderr values are written to the corresponding streams in the Exec method.
+// err is the error which should be returned by the Exec method.
 type fakeClientScript struct {
-	cmd       []string
-	stdin     []byte
-	stdout    []byte
-	stderr    []byte
-	shouldErr bool
+	cmd    []string
+	stdin  []byte
+	stdout []byte
+	stderr []byte
+	err    error
 }
 
-func getFakeClient(cr *apiv1alpha1.PerconaServerMySQL, operatorPass string, mysqlReady bool, clusterStatus innodbcluster.ClusterStatus) (clientcmd.Client, error) {
-	status, err := json.Marshal(innodbcluster.Status{
-		DefaultReplicaSet: innodbcluster.ReplicaSetStatus{
-			Status: clusterStatus,
-		},
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	var scripts []fakeClientScript
-	if mysqlReady {
+// getFakeClient returns a fake clientcmd.Client object with the array of fakeClientScript objects.
+// This array is constructed to cover every possible client call in the reconcileCRStatus function.
+func getFakeClient(cr *apiv1alpha1.PerconaServerMySQL, mysqlMemberStates []innodbcluster.MemberState, noMetadataDB bool) (clientcmd.Client, error) {
+	queryScript := func(query string, out any) fakeClientScript {
+		buf := new(bytes.Buffer)
+		w := csv.NewWriter(buf)
+		w.Comma = '\t'
+		if err := gocsv.MarshalCSV(out, w); err != nil {
+			panic(err)
+		}
 		host := fmt.Sprintf("%s.%s.%s", mysql.PodName(cr, 0), mysql.ServiceName(cr), cr.Namespace)
-		scripts = append(scripts, []fakeClientScript{
-			{
-				cmd: []string{
-					"mysqlsh",
-					"--no-wizard",
-					"--uri",
-					fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, host),
-					"-e",
-					fmt.Sprintf("dba.getCluster('%s').status()", cr.InnoDBClusterName()),
-				},
-			},
-			{
-				cmd: []string{
-					"mysqlsh",
-					"--result-format",
-					"json",
-					"--uri",
-					fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, host),
-					"--cluster",
-					"--",
-					"cluster",
-					"status",
-				},
-				stdout: status,
-			}}...)
-	}
-	scripts = append(scripts, []fakeClientScript{
-		{
+
+		return fakeClientScript{
 			cmd: []string{
-				"cat",
-				"/var/lib/mysql/full-cluster-crash",
+				"mysql",
+				"--database",
+				"performance_schema",
+				"-ptest",
+				"-u",
+				"operator",
+				"-h",
+				host,
+				"-e",
+				query,
 			},
-			stderr:    []byte("No such file or directory"),
-			shouldErr: true,
-		},
+			stdout: buf.Bytes(),
+		}
+	}
+
+	var scripts []fakeClientScript
+
+	// CheckIfDatabaseExists
+	type db struct {
+		DB string `csv:"db"`
+	}
+	dbs := []*db{
 		{
-			cmd: []string{
-				"cat",
-				"/var/lib/mysql/full-cluster-crash",
-			},
-			stderr:    []byte("No such file or directory"),
-			shouldErr: true,
+			DB: "mysql_innodb_cluster_metadata",
 		},
-		{
-			cmd: []string{
-				"cat",
-				"/var/lib/mysql/full-cluster-crash",
-			},
-			stderr:    []byte("No such file or directory"),
-			shouldErr: true,
+	}
+	s := queryScript("SELECT SCHEMA_NAME AS db FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE 'mysql_innodb_cluster_metadata'", dbs)
+	if noMetadataDB {
+		s.err = sql.ErrNoRows
+	}
+	scripts = append(scripts, s)
+
+	// GetGroupReplicationMembers
+	if !noMetadataDB {
+		type member struct {
+			Member string `csv:"member"`
+			State  string `csv:"state"`
+		}
+		var members []*member
+		for _, state := range mysqlMemberStates {
+			members = append(members, &member{
+				Member: cr.Name + "-mysql-0." + cr.Namespace,
+				State:  string(state),
+			})
+		}
+		scripts = append(scripts, queryScript("SELECT MEMBER_HOST as member, MEMBER_STATE as state FROM replication_group_members", members))
+	}
+
+	scripts = append(scripts, fakeClientScript{
+		cmd: []string{
+			"cat",
+			"/var/lib/mysql/full-cluster-crash",
 		},
-	}...)
+		stderr: []byte("No such file or directory"),
+		err:    errors.New("fake error"),
+	})
 
 	return &fakeClient{
 		scripts: scripts,
 	}, nil
diff --git a/pkg/db/replication.go b/pkg/db/replication.go
index f5a04b1f3..4c296ea9e 100644
--- a/pkg/db/replication.go
+++ b/pkg/db/replication.go
@@ -6,14 +6,15 @@ import (
 	"database/sql"
 	"encoding/csv"
 	"fmt"
-	"github.com/gocarina/gocsv"
 	"strings"
 
+	"github.com/gocarina/gocsv"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 
 	apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
 	"github.com/percona/percona-server-mysql-operator/pkg/clientcmd"
+	"github.com/percona/percona-server-mysql-operator/pkg/innodbcluster"
 )
 
 const defaultChannelName = ""
@@ -167,3 +168,38 @@ func (m *ReplicationDBManager) GetMemberState(ctx context.Context, host string)
 
 	return rows[0].State, nil
 }
+
+func (m *ReplicationDBManager) GetGroupReplicationMembers(ctx context.Context) ([]innodbcluster.Member, error) {
+	rows := []*struct {
+		Member string `csv:"member"`
+		State  string `csv:"state"`
+	}{}
+
+	err := m.query(ctx, "SELECT MEMBER_HOST as member, MEMBER_STATE as state FROM replication_group_members", &rows)
+	if err != nil {
+		return nil, errors.Wrap(err, "query members")
+	}
+
+	members := make([]innodbcluster.Member, 0)
+	for _, row := range rows {
+		state := innodbcluster.MemberState(row.State)
+		members = append(members, innodbcluster.Member{Address: row.Member, MemberState: state})
+	}
+
+	return members, nil
+}
+
+func (m *ReplicationDBManager) CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) {
+	rows := []*struct {
+		DB string `csv:"db"`
+	}{}
+
+	q := fmt.Sprintf("SELECT SCHEMA_NAME AS db FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%s'", name)
+	err := m.query(ctx, q, &rows)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return false, nil
+		}
+
+		return false, err
+	}
+
+	return true, nil
+}
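Note: the fake client above can stand in for these managers because ReplicationDBManager renders query results as tab-separated values and decodes them with gocsv; queryScript simply performs the inverse marshal. A self-contained round-trip of that encoding (struct and tags copied from the diff; the data values are made up):

	package main

	import (
		"bytes"
		"encoding/csv"
		"fmt"

		"github.com/gocarina/gocsv"
	)

	// member mirrors the anonymous row structs used in replication.go.
	type member struct {
		Member string `csv:"member"`
		State  string `csv:"state"`
	}

	func main() {
		in := []*member{{Member: "cluster1-mysql-0", State: "ONLINE"}}

		// Encode rows as tab-separated values, the shape `mysql -e` prints
		// and the shape the fake client writes to stdout.
		buf := new(bytes.Buffer)
		w := csv.NewWriter(buf)
		w.Comma = '\t'
		if err := gocsv.MarshalCSV(in, w); err != nil {
			panic(err)
		}

		// Decode them back the way ReplicationDBManager.query does.
		r := csv.NewReader(buf)
		r.Comma = '\t'
		var out []*member
		if err := gocsv.UnmarshalCSV(r, &out); err != nil {
			panic(err)
		}
		fmt.Println(out[0].Member, out[0].State) // cluster1-mysql-0 ONLINE
	}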
diff --git a/pkg/mysqlsh/mysqlshexec.go b/pkg/mysqlsh/mysqlshexec.go
index 4e57c4d82..cd3543adb 100644
--- a/pkg/mysqlsh/mysqlshexec.go
+++ b/pkg/mysqlsh/mysqlshexec.go
@@ -39,54 +39,6 @@ func (m *mysqlshExec) runWithExec(ctx context.Context, cmd string) error {
 	return nil
 }
 
-func (m *mysqlshExec) ConfigureInstanceWithExec(ctx context.Context, instance string) error {
-	cmd := fmt.Sprintf(
-		"dba.configureInstance('%s', {'interactive': false, 'clearReadOnly': true})",
-		instance,
-	)
-
-	if err := m.runWithExec(ctx, cmd); err != nil {
-		return errors.Wrap(err, "configure instance")
-	}
-
-	return nil
-}
-
-func (m *mysqlshExec) AddInstanceWithExec(ctx context.Context, clusterName, instance string) error {
-	opts := struct {
-		Interactive    bool   `json:"interactive"`
-		RecoveryMethod string `json:"recoveryMethod"`
-		WaitRecovery   int    `json:"waitRecovery"`
-	}{
-		Interactive:    false,
-		RecoveryMethod: "clone",
-		WaitRecovery:   0,
-	}
-
-	o, err := json.Marshal(opts)
-	if err != nil {
-		return errors.Wrap(err, "marshal options")
-	}
-
-	cmd := fmt.Sprintf("dba.getCluster('%s').addInstance('%s', %s)", clusterName, instance, string(o))
-
-	if err := m.runWithExec(ctx, cmd); err != nil {
-		return errors.Wrap(err, "add instance")
-	}
-
-	return nil
-}
-
-func (m *mysqlshExec) RejoinInstanceWithExec(ctx context.Context, clusterName, instance string) error {
-	cmd := fmt.Sprintf("dba.getCluster('%s').rejoinInstance('%s', {'interactive': false})", clusterName, instance)
-
-	if err := m.runWithExec(ctx, cmd); err != nil {
-		return errors.Wrap(err, "rejoin instance")
-	}
-
-	return nil
-}
-
 func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, instance string) error {
 	cmd := fmt.Sprintf("dba.getCluster('%s').removeInstance('%s', {'interactive': false, 'force': true})", clusterName, instance)
 
@@ -97,16 +49,6 @@ func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, i
 	return nil
 }
 
-func (m *mysqlshExec) CreateClusterWithExec(ctx context.Context, clusterName string) error {
-	cmd := fmt.Sprintf("dba.createCluster('%s', {'adoptFromGR': true})", clusterName)
-
-	if err := m.runWithExec(ctx, cmd); err != nil {
-		return errors.Wrap(err, "create cluster")
-	}
-
-	return nil
-}
-
 func (m *mysqlshExec) DoesClusterExistWithExec(ctx context.Context, clusterName string) bool {
 	log := logf.FromContext(ctx)
 
@@ -141,33 +83,6 @@ func (m *mysqlshExec) ClusterStatusWithExec(ctx context.Context, clusterName str
 	return status, nil
 }
 
-func (m *mysqlshExec) MemberStateWithExec(ctx context.Context, clusterName, instance string) (innodbcluster.MemberState, error) {
-	log := logf.FromContext(ctx).WithName("InnoDBCluster").WithValues("cluster", clusterName)
-
-	status, err := m.ClusterStatusWithExec(ctx, clusterName)
-	if err != nil {
-		return innodbcluster.MemberStateOffline, errors.Wrap(err, "get cluster status")
-	}
-
-	log.V(1).Info("Cluster status", "status", status)
-
-	member, ok := status.DefaultReplicaSet.Topology[instance]
-	if !ok {
-		return innodbcluster.MemberStateOffline, innodbcluster.ErrMemberNotFound
-	}
-
-	return member.MemberState, nil
-}
-
-func (m *mysqlshExec) TopologyWithExec(ctx context.Context, clusterName string) (map[string]innodbcluster.Member, error) {
-	status, err := m.ClusterStatusWithExec(ctx, clusterName)
-	if err != nil {
-		return nil, errors.Wrap(err, "get cluster status")
-	}
-
-	return status.DefaultReplicaSet.Topology, nil
-}
-
 func (m *mysqlshExec) RebootClusterFromCompleteOutageWithExec(ctx context.Context, clusterName string) error {
 	cmd := fmt.Sprintf("dba.rebootClusterFromCompleteOutage('%s')", clusterName)
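Note: the net effect for call sites is a swap of the command the operator execs in the pod. The runnable sketch below contrasts the two command lines, built from the fake client's expectations above; the host, cluster name, and password values are illustrative, not taken from a real deployment.

	package main

	import "fmt"

	func main() {
		host := "cluster1-mysql-0.cluster1-mysql.default"
		query := "SELECT SCHEMA_NAME AS db FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE 'mysql_innodb_cluster_metadata'"

		// What the operator now execs in the pod (password "test" as in the test fixtures)...
		newCmd := []string{"mysql", "--database", "performance_schema", "-ptest", "-u", "operator", "-h", host, "-e", query}

		// ...instead of the removed mysqlsh path ('cluster1' stands in for cr.InnoDBClusterName()):
		oldCmd := []string{"mysqlsh", "--no-wizard", "--uri", "operator:test@" + host, "-e", "dba.getCluster('cluster1').status()"}

		fmt.Println(newCmd)
		fmt.Println(oldCmd)
	}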