From c0db51e304d7b3ae6881ec6038031c9cf901a846 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Mon, 9 Oct 2023 11:26:35 +0300 Subject: [PATCH] remove JSON usage --- pkg/controller/ps/controller.go | 14 +++--- pkg/controller/ps/status.go | 41 ++++++++------- pkg/mysqlsh/mysqlshexec.go | 85 -------------------------------- pkg/replicator/replicator.go | 16 +++--- pkg/replicator/replicatorexec.go | 10 ++-- 5 files changed, 44 insertions(+), 122 deletions(-) diff --git a/pkg/controller/ps/controller.go b/pkg/controller/ps/controller.go index 1305ab333..67df0bd9d 100644 --- a/pkg/controller/ps/controller.go +++ b/pkg/controller/ps/controller.go @@ -793,16 +793,14 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con return errors.Wrap(err, "get operator password") } - uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr)) - - mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri) + db, err := replicator.NewReplicatorExec(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr)) if err != nil { return err } cond := meta.FindStatusCondition(cr.Status.Conditions, apiv1alpha1.ConditionInnoDBClusterBootstrapped) if cond == nil || cond.Status == metav1.ConditionFalse { - if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) { + if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists { log.V(1).Info("InnoDB cluster is not created yet") return nil } @@ -818,7 +816,7 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con return nil } - if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) { + if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists { return errors.New("InnoDB cluster is already bootstrapped, but failed to check its status") } @@ -933,13 +931,13 @@ func (r *PerconaServerMySQLReconciler) reconcileMySQLRouter(ctx context.Context, } firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace - uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri) - mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri) + + db, err := replicator.NewReplicatorExec(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri) if err != nil { return err } - if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) { + if exist, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exist { log.V(1).Info("Waiting for InnoDB Cluster", "cluster", cr.Name) return nil } diff --git a/pkg/controller/ps/status.go b/pkg/controller/ps/status.go index eddf7e194..3745e22ed 100644 --- a/pkg/controller/ps/status.go +++ b/pkg/controller/ps/status.go @@ -3,7 +3,6 @@ package ps import ( "bytes" "context" - "fmt" "strings" "github.com/pkg/errors" @@ -22,8 +21,8 @@ import ( "github.com/percona/percona-server-mysql-operator/pkg/innodbcluster" "github.com/percona/percona-server-mysql-operator/pkg/k8s" "github.com/percona/percona-server-mysql-operator/pkg/mysql" - "github.com/percona/percona-server-mysql-operator/pkg/mysqlsh" "github.com/percona/percona-server-mysql-operator/pkg/orchestrator" + "github.com/percona/percona-server-mysql-operator/pkg/replicator" "github.com/percona/percona-server-mysql-operator/pkg/router" ) @@ -185,38 +184,42 @@ func (r *PerconaServerMySQLReconciler) isGRReady(ctx context.Context, cr *apiv1a } firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace - uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri) - mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri) + db, err := replicator.NewReplicatorExec(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri) if err != nil { return false, err } - if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) { + dbExists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata") + if err != nil { + return false, err + } + + if !dbExists { return false, nil } - status, err := mysh.ClusterStatusWithExec(ctx, cr.InnoDBClusterName()) + members, err := db.GetGroupReplicationMembers(ctx) if err != nil { - return false, errors.Wrap(err, "get cluster status") + return false, err } - for addr, member := range status.DefaultReplicaSet.Topology { - for _, err := range member.InstanceErrors { - log.WithName(addr).Info(err) + onlineMembers := 0 + for _, member := range members { + if member.MemberState != innodbcluster.MemberStateOnline { + log.WithName(member.Address).Info("Member is not ONLINE", "state", member.MemberState) + return false, nil } + onlineMembers++ } - log.V(1).Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText) - - switch status.DefaultReplicaSet.Status { - case innodbcluster.ClusterStatusOK: - return true, nil - case innodbcluster.ClusterStatusOKPartial, innodbcluster.ClusterStatusOKNoTolerance, innodbcluster.ClusterStatusOKNoTolerancePartial: - log.Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText) - return true, nil - default: + if onlineMembers < int(cr.Spec.MySQL.Size) { + log.V(1).Info("Not enough ONLINE members", "onlineMembers", onlineMembers, "size", cr.Spec.MySQL.Size) return false, nil } + + log.V(1).Info("GR is ready") + + return true, nil } func (r *PerconaServerMySQLReconciler) allLoadBalancersReady(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (bool, error) { diff --git a/pkg/mysqlsh/mysqlshexec.go b/pkg/mysqlsh/mysqlshexec.go index 4e57c4d82..cd3543adb 100644 --- a/pkg/mysqlsh/mysqlshexec.go +++ b/pkg/mysqlsh/mysqlshexec.go @@ -39,54 +39,6 @@ func (m *mysqlshExec) runWithExec(ctx context.Context, cmd string) error { return nil } -func (m *mysqlshExec) ConfigureInstanceWithExec(ctx context.Context, instance string) error { - cmd := fmt.Sprintf( - "dba.configureInstance('%s', {'interactive': false, 'clearReadOnly': true})", - instance, - ) - - if err := m.runWithExec(ctx, cmd); err != nil { - return errors.Wrap(err, "configure instance") - } - - return nil -} - -func (m *mysqlshExec) AddInstanceWithExec(ctx context.Context, clusterName, instance string) error { - opts := struct { - Interactive bool `json:"interactive"` - RecoveryMethod string `json:"recoveryMethod"` - WaitRecovery int `json:"waitRecovery"` - }{ - Interactive: false, - RecoveryMethod: "clone", - WaitRecovery: 0, - } - - o, err := json.Marshal(opts) - if err != nil { - return errors.Wrap(err, "marshal options") - } - - cmd := fmt.Sprintf("dba.getCluster('%s').addInstance('%s', %s)", clusterName, instance, string(o)) - - if err := m.runWithExec(ctx, cmd); err != nil { - return errors.Wrap(err, "add instance") - } - - return nil -} - -func (m *mysqlshExec) RejoinInstanceWithExec(ctx context.Context, clusterName, instance string) error { - cmd := fmt.Sprintf("dba.getCluster('%s').rejoinInstance('%s', {'interactive': false})", clusterName, instance) - - if err := m.runWithExec(ctx, cmd); err != nil { - return errors.Wrap(err, "rejoin instance") - } - - return nil -} - func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, instance string) error { cmd := fmt.Sprintf("dba.getCluster('%s').removeInstance('%s', {'interactive': false, 'force': true})", clusterName, instance) @@ -97,16 +49,6 @@ func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, i return nil } -func (m *mysqlshExec) CreateClusterWithExec(ctx context.Context, clusterName string) error { - cmd := fmt.Sprintf("dba.createCluster('%s', {'adoptFromGR': true})", clusterName) - - if err := m.runWithExec(ctx, cmd); err != nil { - return errors.Wrap(err, "create cluster") - } - - return nil -} - func (m *mysqlshExec) DoesClusterExistWithExec(ctx context.Context, clusterName string) bool { log := logf.FromContext(ctx) @@ -141,33 +83,6 @@ func (m *mysqlshExec) ClusterStatusWithExec(ctx context.Context, clusterName str return status, nil } -func (m *mysqlshExec) MemberStateWithExec(ctx context.Context, clusterName, instance string) (innodbcluster.MemberState, error) { - log := logf.FromContext(ctx).WithName("InnoDBCluster").WithValues("cluster", clusterName) - - status, err := m.ClusterStatusWithExec(ctx, clusterName) - if err != nil { - return innodbcluster.MemberStateOffline, errors.Wrap(err, "get cluster status") - } - - log.V(1).Info("Cluster status", "status", status) - - member, ok := status.DefaultReplicaSet.Topology[instance] - if !ok { - return innodbcluster.MemberStateOffline, innodbcluster.ErrMemberNotFound - } - - return member.MemberState, nil -} - -func (m *mysqlshExec) TopologyWithExec(ctx context.Context, clusterName string) (map[string]innodbcluster.Member, error) { - status, err := m.ClusterStatusWithExec(ctx, clusterName) - if err != nil { - return nil, errors.Wrap(err, "get cluster status") - } - - return status.DefaultReplicaSet.Topology, nil -} - func (m *mysqlshExec) RebootClusterFromCompleteOutageWithExec(ctx context.Context, clusterName string) error { cmd := fmt.Sprintf("dba.rebootClusterFromCompleteOutage('%s')", clusterName) diff --git a/pkg/replicator/replicator.go b/pkg/replicator/replicator.go index 435d3527c..480124848 100644 --- a/pkg/replicator/replicator.go +++ b/pkg/replicator/replicator.go @@ -59,7 +59,7 @@ type Replicator interface { GetGroupReplicationPrimary(ctx context.Context) (string, error) GetGroupReplicationReplicas(ctx context.Context) ([]string, error) GetMemberState(ctx context.Context, host string) (MemberState, error) - GetGroupReplicationMembers(ctx context.Context) ([]string, error) + GetGroupReplicationMembers(ctx context.Context) ([]innodbcluster.Member, error) CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) CheckIfInPrimaryPartition(ctx context.Context) (bool, error) CheckIfPrimaryUnreachable(ctx context.Context) (bool, error) @@ -341,10 +341,10 @@ func (d *dbImpl) GetMemberState(ctx context.Context, host string) (MemberState, return state, nil } -func (d *dbImpl) GetGroupReplicationMembers(ctx context.Context) ([]string, error) { - members := make([]string, 0) +func (d *dbImpl) GetGroupReplicationMembers(ctx context.Context) ([]innodbcluster.Member, error) { + members := make([]innodbcluster.Member, 0) - rows, err := d.db.QueryContext(ctx, "SELECT MEMBER_HOST FROM replication_group_members") + rows, err := d.db.QueryContext(ctx, "SELECT MEMBER_HOST as host, MEMBER_STATE as state FROM replication_group_members") if err != nil { return nil, errors.Wrap(err, "query members") } @@ -352,11 +352,15 @@ func (d *dbImpl) GetGroupReplicationMembers(ctx context.Context) ([]string, erro for rows.Next() { var host string - if err := rows.Scan(&host); err != nil { + var state string + if err := rows.Scan(&host, &state); err != nil { return nil, errors.Wrap(err, "scan rows") } - members = append(members, host) + members = append(members, innodbcluster.Member{ + Address: host, + MemberState: innodbcluster.MemberState(state), + }) } return members, nil diff --git a/pkg/replicator/replicatorexec.go b/pkg/replicator/replicatorexec.go index 46cb2ee03..6a8dae00e 100644 --- a/pkg/replicator/replicatorexec.go +++ b/pkg/replicator/replicatorexec.go @@ -362,19 +362,21 @@ func (d *dbImplExec) GetMemberState(ctx context.Context, host string) (MemberSta return rows[0].State, nil } -func (d *dbImplExec) GetGroupReplicationMembers(ctx context.Context) ([]string, error) { +func (d *dbImplExec) GetGroupReplicationMembers(ctx context.Context) ([]innodbcluster.Member, error) { rows := []*struct { Member string `csv:"member"` + State string `csv:"state"` }{} - err := d.query(ctx, "SELECT MEMBER_HOST as member FROM replication_group_members", &rows) + err := d.query(ctx, "SELECT MEMBER_HOST as member, MEMBER_STATE as state FROM replication_group_members", &rows) if err != nil { return nil, errors.Wrap(err, "query members") } - members := make([]string, 0) + members := make([]innodbcluster.Member, 0) for _, row := range rows { - members = append(members, row.Member) + state := innodbcluster.MemberState(row.State) + members = append(members, innodbcluster.Member{Address: row.Member, MemberState: state}) } return members, nil