remove JSON usage
egegunes committed Oct 9, 2023
1 parent 426aaab commit c0db51e
Showing 5 changed files with 44 additions and 122 deletions.
14 changes: 6 additions & 8 deletions pkg/controller/ps/controller.go
@@ -793,16 +793,14 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
         return errors.Wrap(err, "get operator password")
     }

-    uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))
-
-    mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
+    db, err := replicator.NewReplicatorExec(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))
     if err != nil {
         return err
     }

     cond := meta.FindStatusCondition(cr.Status.Conditions, apiv1alpha1.ConditionInnoDBClusterBootstrapped)
     if cond == nil || cond.Status == metav1.ConditionFalse {
-        if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+        if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
             log.V(1).Info("InnoDB cluster is not created yet")
             return nil
         }
@@ -818,7 +816,7 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
         return nil
     }

-    if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+    if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
         return errors.New("InnoDB cluster is already bootstrapped, but failed to check its status")
     }
@@ -933,13 +931,13 @@ func (r *PerconaServerMySQLReconciler) reconcileMySQLRouter(ctx context.Context,
     }

     firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
-    uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
-    mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
+
+    db, err := replicator.NewReplicatorExec(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)
     if err != nil {
         return err
     }

-    if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+    if exist, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exist {
         log.V(1).Info("Waiting for InnoDB Cluster", "cluster", cr.Name)
         return nil
     }
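Note: CheckIfDatabaseExists itself lives in pkg/replicator and is not touched by this diff. As context, a minimal sketch of what such a check might look like; the dbImpl stand-in and the INFORMATION_SCHEMA query are assumptions, not the repository's actual code:

package replicator

import (
    "context"
    "database/sql"
    "errors"
)

// Hypothetical stand-in for the real dbImpl in pkg/replicator.
type dbImpl struct {
    db *sql.DB
}

// CheckIfDatabaseExists reports whether a schema with the given name exists.
// The call sites above probe for "mysql_innodb_cluster_metadata", the schema
// mysqlsh creates when it bootstraps an InnoDB cluster, which is how this
// commit replaces mysqlsh's cluster-existence check.
func (d *dbImpl) CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) {
    var schema string
    err := d.db.QueryRowContext(
        ctx,
        "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?",
        name,
    ).Scan(&schema)
    if errors.Is(err, sql.ErrNoRows) {
        return false, nil
    }
    if err != nil {
        return false, err
    }
    return true, nil
}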
41 changes: 22 additions & 19 deletions pkg/controller/ps/status.go
@@ -3,7 +3,6 @@ package ps
 import (
     "bytes"
     "context"
-    "fmt"
     "strings"

     "github.com/pkg/errors"
@@ -22,8 +21,8 @@ import (
     "github.com/percona/percona-server-mysql-operator/pkg/innodbcluster"
     "github.com/percona/percona-server-mysql-operator/pkg/k8s"
     "github.com/percona/percona-server-mysql-operator/pkg/mysql"
-    "github.com/percona/percona-server-mysql-operator/pkg/mysqlsh"
     "github.com/percona/percona-server-mysql-operator/pkg/orchestrator"
+    "github.com/percona/percona-server-mysql-operator/pkg/replicator"
     "github.com/percona/percona-server-mysql-operator/pkg/router"
 )

@@ -185,38 +184,42 @@ func (r *PerconaServerMySQLReconciler) isGRReady(ctx context.Context, cr *apiv1a
     }

     firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
-    uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
-    mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
+    db, err := replicator.NewReplicatorExec(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)
     if err != nil {
         return false, err
     }

-    if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+    dbExists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata")
+    if err != nil {
+        return false, err
+    }
+
+    if !dbExists {
         return false, nil
     }

-    status, err := mysh.ClusterStatusWithExec(ctx, cr.InnoDBClusterName())
+    members, err := db.GetGroupReplicationMembers(ctx)
     if err != nil {
-        return false, errors.Wrap(err, "get cluster status")
+        return false, err
     }

-    for addr, member := range status.DefaultReplicaSet.Topology {
-        for _, err := range member.InstanceErrors {
-            log.WithName(addr).Info(err)
+    onlineMembers := 0
+    for _, member := range members {
+        if member.MemberState != innodbcluster.MemberStateOnline {
+            log.WithName(member.Address).Info("Member is not ONLINE", "state", member.MemberState)
+            return false, nil
         }
+        onlineMembers++
     }

-    log.V(1).Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)
-
-    switch status.DefaultReplicaSet.Status {
-    case innodbcluster.ClusterStatusOK:
-        return true, nil
-    case innodbcluster.ClusterStatusOKPartial, innodbcluster.ClusterStatusOKNoTolerance, innodbcluster.ClusterStatusOKNoTolerancePartial:
-        log.Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)
-        return true, nil
-    default:
+    if onlineMembers < int(cr.Spec.MySQL.Size) {
+        log.V(1).Info("Not enough ONLINE members", "onlineMembers", onlineMembers, "size", cr.Spec.MySQL.Size)
         return false, nil
     }
+
+    log.V(1).Info("GR is ready")
+
+    return true, nil
 }

 func (r *PerconaServerMySQLReconciler) allLoadBalancersReady(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (bool, error) {
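The new readiness rule is stricter than the old mysqlsh status check: a single non-ONLINE member fails the check immediately, and an all-ONLINE group still fails if it has fewer members than cr.Spec.MySQL.Size. A self-contained sketch of that decision logic; the isReady helper and the sample states are illustrative, not part of the diff:

package ps

import "testing"

// isReady reproduces the decision logic from isGRReady above, isolated for
// illustration: every member must be ONLINE, and at least `size` of them.
func isReady(states []string, size int) bool {
    online := 0
    for _, s := range states {
        if s != "ONLINE" {
            return false
        }
        online++
    }
    return online >= size
}

func TestIsReady(t *testing.T) {
    cases := []struct {
        states []string
        size   int
        want   bool
    }{
        {[]string{"ONLINE", "ONLINE", "ONLINE"}, 3, true},
        {[]string{"ONLINE", "RECOVERING", "ONLINE"}, 3, false}, // member still joining
        {[]string{"ONLINE", "ONLINE"}, 3, false},               // not enough members yet
    }
    for _, c := range cases {
        if got := isReady(c.states, c.size); got != c.want {
            t.Errorf("isReady(%v, %d) = %v, want %v", c.states, c.size, got, c.want)
        }
    }
}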
85 changes: 0 additions & 85 deletions pkg/mysqlsh/mysqlshexec.go
@@ -39,54 +39,6 @@ func (m *mysqlshExec) runWithExec(ctx context.Context, cmd string) error {
     return nil
 }

-func (m *mysqlshExec) ConfigureInstanceWithExec(ctx context.Context, instance string) error {
-    cmd := fmt.Sprintf(
-        "dba.configureInstance('%s', {'interactive': false, 'clearReadOnly': true})",
-        instance,
-    )
-
-    if err := m.runWithExec(ctx, cmd); err != nil {
-        return errors.Wrap(err, "configure instance")
-    }
-
-    return nil
-}
-
-func (m *mysqlshExec) AddInstanceWithExec(ctx context.Context, clusterName, instance string) error {
-    opts := struct {
-        Interactive    bool   `json:"interactive"`
-        RecoveryMethod string `json:"recoveryMethod"`
-        WaitRecovery   int    `json:"waitRecovery"`
-    }{
-        Interactive:    false,
-        RecoveryMethod: "clone",
-        WaitRecovery:   0,
-    }
-
-    o, err := json.Marshal(opts)
-    if err != nil {
-        return errors.Wrap(err, "marshal options")
-    }
-
-    cmd := fmt.Sprintf("dba.getCluster('%s').addInstance('%s', %s)", clusterName, instance, string(o))
-
-    if err := m.runWithExec(ctx, cmd); err != nil {
-        return errors.Wrap(err, "add instance")
-    }
-
-    return nil
-}
-
-func (m *mysqlshExec) RejoinInstanceWithExec(ctx context.Context, clusterName, instance string) error {
-    cmd := fmt.Sprintf("dba.getCluster('%s').rejoinInstance('%s', {'interactive': false})", clusterName, instance)
-
-    if err := m.runWithExec(ctx, cmd); err != nil {
-        return errors.Wrap(err, "rejoin instance")
-    }
-
-    return nil
-}
-
 func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, instance string) error {
     cmd := fmt.Sprintf("dba.getCluster('%s').removeInstance('%s', {'interactive': false, 'force': true})", clusterName, instance)
@@ -97,16 +49,6 @@ func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, i
     return nil
 }

-func (m *mysqlshExec) CreateClusterWithExec(ctx context.Context, clusterName string) error {
-    cmd := fmt.Sprintf("dba.createCluster('%s', {'adoptFromGR': true})", clusterName)
-
-    if err := m.runWithExec(ctx, cmd); err != nil {
-        return errors.Wrap(err, "create cluster")
-    }
-
-    return nil
-}
-
 func (m *mysqlshExec) DoesClusterExistWithExec(ctx context.Context, clusterName string) bool {
     log := logf.FromContext(ctx)
@@ -141,33 +83,6 @@ func (m *mysqlshExec) ClusterStatusWithExec(ctx context.Context, clusterName str
     return status, nil
 }

-func (m *mysqlshExec) MemberStateWithExec(ctx context.Context, clusterName, instance string) (innodbcluster.MemberState, error) {
-    log := logf.FromContext(ctx).WithName("InnoDBCluster").WithValues("cluster", clusterName)
-
-    status, err := m.ClusterStatusWithExec(ctx, clusterName)
-    if err != nil {
-        return innodbcluster.MemberStateOffline, errors.Wrap(err, "get cluster status")
-    }
-
-    log.V(1).Info("Cluster status", "status", status)
-
-    member, ok := status.DefaultReplicaSet.Topology[instance]
-    if !ok {
-        return innodbcluster.MemberStateOffline, innodbcluster.ErrMemberNotFound
-    }
-
-    return member.MemberState, nil
-}
-
-func (m *mysqlshExec) TopologyWithExec(ctx context.Context, clusterName string) (map[string]innodbcluster.Member, error) {
-    status, err := m.ClusterStatusWithExec(ctx, clusterName)
-    if err != nil {
-        return nil, errors.Wrap(err, "get cluster status")
-    }
-
-    return status.DefaultReplicaSet.Topology, nil
-}
-
 func (m *mysqlshExec) RebootClusterFromCompleteOutageWithExec(ctx context.Context, clusterName string) error {
     cmd := fmt.Sprintf("dba.rebootClusterFromCompleteOutage('%s')", clusterName)
16 changes: 10 additions & 6 deletions pkg/replicator/replicator.go
@@ -59,7 +59,7 @@ type Replicator interface {
     GetGroupReplicationPrimary(ctx context.Context) (string, error)
     GetGroupReplicationReplicas(ctx context.Context) ([]string, error)
     GetMemberState(ctx context.Context, host string) (MemberState, error)
-    GetGroupReplicationMembers(ctx context.Context) ([]string, error)
+    GetGroupReplicationMembers(ctx context.Context) ([]innodbcluster.Member, error)
     CheckIfDatabaseExists(ctx context.Context, name string) (bool, error)
     CheckIfInPrimaryPartition(ctx context.Context) (bool, error)
     CheckIfPrimaryUnreachable(ctx context.Context) (bool, error)
@@ -341,22 +341,26 @@ func (d *dbImpl) GetMemberState(ctx context.Context, host string) (MemberState,
     return state, nil
 }

-func (d *dbImpl) GetGroupReplicationMembers(ctx context.Context) ([]string, error) {
-    members := make([]string, 0)
+func (d *dbImpl) GetGroupReplicationMembers(ctx context.Context) ([]innodbcluster.Member, error) {
+    members := make([]innodbcluster.Member, 0)

-    rows, err := d.db.QueryContext(ctx, "SELECT MEMBER_HOST FROM replication_group_members")
+    rows, err := d.db.QueryContext(ctx, "SELECT MEMBER_HOST as host, MEMBER_STATE as state FROM replication_group_members")
     if err != nil {
         return nil, errors.Wrap(err, "query members")
     }
     defer rows.Close()

     for rows.Next() {
         var host string
-        if err := rows.Scan(&host); err != nil {
+        var state string
+        if err := rows.Scan(&host, &state); err != nil {
             return nil, errors.Wrap(err, "scan rows")
         }

-        members = append(members, host)
+        members = append(members, innodbcluster.Member{
+            Address:     host,
+            MemberState: innodbcluster.MemberState(state),
+        })
     }

     return members, nil
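The innodbcluster.Member type is not shown in this commit. Inferred from the call sites above, it presumably carries at least an address and a member state, roughly:

package innodbcluster

// Sketch inferred from usage in this commit; the actual definitions live in
// pkg/innodbcluster and may have more fields (the removed mysqlsh code also
// read member.InstanceErrors, for example).
type MemberState string

const (
    MemberStateOnline  MemberState = "ONLINE"
    MemberStateOffline MemberState = "OFFLINE"
)

type Member struct {
    Address     string
    MemberState MemberState
}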
10 changes: 6 additions & 4 deletions pkg/replicator/replicatorexec.go
@@ -362,19 +362,21 @@ func (d *dbImplExec) GetMemberState(ctx context.Context, host string) (MemberSta
     return rows[0].State, nil
 }

-func (d *dbImplExec) GetGroupReplicationMembers(ctx context.Context) ([]string, error) {
+func (d *dbImplExec) GetGroupReplicationMembers(ctx context.Context) ([]innodbcluster.Member, error) {
     rows := []*struct {
         Member string `csv:"member"`
+        State  string `csv:"state"`
     }{}

-    err := d.query(ctx, "SELECT MEMBER_HOST as member FROM replication_group_members", &rows)
+    err := d.query(ctx, "SELECT MEMBER_HOST as member, MEMBER_STATE as state FROM replication_group_members", &rows)
     if err != nil {
         return nil, errors.Wrap(err, "query members")
     }

-    members := make([]string, 0)
+    members := make([]innodbcluster.Member, 0)
     for _, row := range rows {
-        members = append(members, row.Member)
+        state := innodbcluster.MemberState(row.State)
+        members = append(members, innodbcluster.Member{Address: row.Member, MemberState: state})
     }

     return members, nil
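Unlike dbImpl, which queries over a database/sql connection, dbImplExec runs the statement by exec'ing into the pod, so results come back as text and are unmarshalled through the csv struct tags (hence the lowercase column aliases in the query). A rough stdlib-only illustration of that output-to-struct mapping; parseMembers is hypothetical, not the repository's actual d.query plumbing:

package replicator

import (
    "encoding/csv"
    "strings"
)

// parseMembers shows the shape of the mapping: CSV query output (a header
// row "member,state", then one row per group member) into (host, state) pairs.
func parseMembers(out string) ([][2]string, error) {
    records, err := csv.NewReader(strings.NewReader(out)).ReadAll()
    if err != nil {
        return nil, err
    }
    members := make([][2]string, 0)
    for i, rec := range records {
        if i == 0 || len(rec) < 2 {
            continue // skip the header row and malformed lines
        }
        members = append(members, [2]string{rec[0], rec[1]})
    }
    return members, nil
}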
