Skip to content

Commit

Permalink
K8SPS-317: Remove JSON usage
Browse files Browse the repository at this point in the history
  • Loading branch information
egegunes committed Jan 17, 2024
1 parent 0e5401a commit 6dd9e71
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 127 deletions.
34 changes: 12 additions & 22 deletions pkg/controller/ps/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
"github.com/percona/percona-server-mysql-operator/pkg/clientcmd"
"github.com/percona/percona-server-mysql-operator/pkg/controller/psrestore"
"github.com/percona/percona-server-mysql-operator/pkg/db"
database "github.com/percona/percona-server-mysql-operator/pkg/db"
"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
"github.com/percona/percona-server-mysql-operator/pkg/k8s"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
Expand Down Expand Up @@ -204,7 +204,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
firstPodFQDN := fmt.Sprintf("%s.%s.%s", firstPod.Name, mysql.ServiceName(cr), cr.Namespace)
firstPodUri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)

um := db.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)
um := database.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)

mysh, err := mysqlsh.NewWithExec(r.ClientCmd, &firstPod, firstPodUri)
if err != nil {
Expand All @@ -224,7 +224,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
return errors.Wrapf(err, "get member state of %s from performance_schema", pod.Name)
}

if state == db.MemberStateOffline {
if state == database.MemberStateOffline {
log.Info("Member is not part of GR or already removed", "member", pod.Name, "memberState", state)
continue
}
Expand Down Expand Up @@ -789,16 +789,10 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
return errors.Wrap(err, "get operator password")
}

uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))

mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
if err != nil {
return err
}

db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))
cond := meta.FindStatusCondition(cr.Status.Conditions, apiv1alpha1.ConditionInnoDBClusterBootstrapped)
if cond == nil || cond.Status == metav1.ConditionFalse {
if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
log.V(1).Info("InnoDB cluster is not created yet")
return nil
}
Expand All @@ -814,7 +808,7 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
return nil
}

if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
return errors.New("InnoDB cluster is already bootstrapped, but failed to check its status")
}

Expand Down Expand Up @@ -929,13 +923,9 @@ func (r *PerconaServerMySQLReconciler) reconcileMySQLRouter(ctx context.Context,
}

firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
if err != nil {
return err
}

if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)
if exist, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exist {
log.V(1).Info("Waiting for InnoDB Cluster", "cluster", cr.Name)
return nil
}
Expand Down Expand Up @@ -1005,7 +995,7 @@ func (r *PerconaServerMySQLReconciler) getPrimaryFromGR(ctx context.Context, cr
return "", err
}

um := db.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)
um := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)

return um.GetGroupReplicationPrimary(ctx)
}
Expand Down Expand Up @@ -1053,7 +1043,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
if err != nil {
return err
}
repDb := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
repDb := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)

if err := orchestrator.StopReplicationExec(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil {
return errors.Wrapf(err, "stop replica %s", hostname)
Expand All @@ -1064,7 +1054,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
return errors.Wrapf(err, "get replication status of %s", hostname)
}

for status == db.ReplicationStatusActive {
for status == database.ReplicationStatusActive {
time.Sleep(250 * time.Millisecond)
status, _, err = repDb.ReplicationStatus(ctx)
if err != nil {
Expand Down Expand Up @@ -1107,7 +1097,7 @@ func (r *PerconaServerMySQLReconciler) startAsyncReplication(ctx context.Context
if err != nil {
return err
}
um := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
um := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)

log.V(1).Info("Change replication source", "primary", primary.Key.Hostname, "replica", hostname)
if err := um.ChangeReplicationSource(ctx, primary.Key.Hostname, replicaPass, primary.Key.Port); err != nil {
Expand Down
38 changes: 19 additions & 19 deletions pkg/controller/ps/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ps
import (
"bytes"
"context"
"fmt"
"strings"

"github.com/pkg/errors"
Expand All @@ -18,11 +17,11 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
database "github.com/percona/percona-server-mysql-operator/pkg/db"
"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
"github.com/percona/percona-server-mysql-operator/pkg/innodbcluster"
"github.com/percona/percona-server-mysql-operator/pkg/k8s"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
"github.com/percona/percona-server-mysql-operator/pkg/mysqlsh"
"github.com/percona/percona-server-mysql-operator/pkg/orchestrator"
"github.com/percona/percona-server-mysql-operator/pkg/router"
)
Expand Down Expand Up @@ -185,38 +184,39 @@ func (r *PerconaServerMySQLReconciler) isGRReady(ctx context.Context, cr *apiv1a
}

firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)

dbExists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata")
if err != nil {
return false, err
}

if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
if !dbExists {
return false, nil
}

status, err := mysh.ClusterStatusWithExec(ctx, cr.InnoDBClusterName())
members, err := db.GetGroupReplicationMembers(ctx)
if err != nil {
return false, errors.Wrap(err, "get cluster status")
return false, err
}

for addr, member := range status.DefaultReplicaSet.Topology {
for _, err := range member.InstanceErrors {
log.WithName(addr).Info(err)
onlineMembers := 0
for _, member := range members {
if member.MemberState != innodbcluster.MemberStateOnline {
log.WithName(member.Address).Info("Member is not ONLINE", "state", member.MemberState)
return false, nil
}
onlineMembers++
}

log.V(1).Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)

switch status.DefaultReplicaSet.Status {
case innodbcluster.ClusterStatusOK:
return true, nil
case innodbcluster.ClusterStatusOKPartial, innodbcluster.ClusterStatusOKNoTolerance, innodbcluster.ClusterStatusOKNoTolerancePartial:
log.Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)
return true, nil
default:
if onlineMembers < int(cr.Spec.MySQL.Size) {
log.V(1).Info("Not enough ONLINE members", "onlineMembers", onlineMembers, "size", cr.Spec.MySQL.Size)
return false, nil
}

log.V(1).Info("GR is ready")

return true, nil
}

func (r *PerconaServerMySQLReconciler) allLoadBalancersReady(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (bool, error) {
Expand Down
38 changes: 37 additions & 1 deletion pkg/db/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"database/sql"
"encoding/csv"
"fmt"
"github.com/gocarina/gocsv"
"strings"

"github.com/gocarina/gocsv"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
"github.com/percona/percona-server-mysql-operator/pkg/clientcmd"
"github.com/percona/percona-server-mysql-operator/pkg/innodbcluster"
)

const defaultChannelName = ""
Expand Down Expand Up @@ -167,3 +168,38 @@ func (m *ReplicationDBManager) GetMemberState(ctx context.Context, host string)

return rows[0].State, nil
}

func (m *ReplicationDBManager) GetGroupReplicationMembers(ctx context.Context) ([]innodbcluster.Member, error) {
rows := []*struct {
Member string `csv:"member"`
State string `csv:"state"`
}{}

err := m.query(ctx, "SELECT MEMBER_HOST as member, MEMBER_STATE as state FROM replication_group_members", &rows)
if err != nil {
return nil, errors.Wrap(err, "query members")
}

members := make([]innodbcluster.Member, 0)
for _, row := range rows {
state := innodbcluster.MemberState(row.State)
members = append(members, innodbcluster.Member{Address: row.Member, MemberState: state})
}

return members, nil
}

func (m *ReplicationDBManager) CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) {
rows := []*struct {
DB string `csv:"db"`
}{}
q := fmt.Sprintf("SELECT SCHEMA_NAME AS db FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%s'", name)
err := m.query(ctx, q, &rows)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, err
}
return true, nil
}
85 changes: 0 additions & 85 deletions pkg/mysqlsh/mysqlshexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,54 +39,6 @@ func (m *mysqlshExec) runWithExec(ctx context.Context, cmd string) error {
return nil
}

func (m *mysqlshExec) ConfigureInstanceWithExec(ctx context.Context, instance string) error {
cmd := fmt.Sprintf(
"dba.configureInstance('%s', {'interactive': false, 'clearReadOnly': true})",
instance,
)

if err := m.runWithExec(ctx, cmd); err != nil {
return errors.Wrap(err, "configure instance")
}

return nil
}

func (m *mysqlshExec) AddInstanceWithExec(ctx context.Context, clusterName, instance string) error {
opts := struct {
Interactive bool `json:"interactive"`
RecoveryMethod string `json:"recoveryMethod"`
WaitRecovery int `json:"waitRecovery"`
}{
Interactive: false,
RecoveryMethod: "clone",
WaitRecovery: 0,
}

o, err := json.Marshal(opts)
if err != nil {
return errors.Wrap(err, "marshal options")
}

cmd := fmt.Sprintf("dba.getCluster('%s').addInstance('%s', %s)", clusterName, instance, string(o))

if err := m.runWithExec(ctx, cmd); err != nil {
return errors.Wrap(err, "add instance")
}

return nil
}

func (m *mysqlshExec) RejoinInstanceWithExec(ctx context.Context, clusterName, instance string) error {
cmd := fmt.Sprintf("dba.getCluster('%s').rejoinInstance('%s', {'interactive': false})", clusterName, instance)

if err := m.runWithExec(ctx, cmd); err != nil {
return errors.Wrap(err, "rejoin instance")
}

return nil
}

func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, instance string) error {
cmd := fmt.Sprintf("dba.getCluster('%s').removeInstance('%s', {'interactive': false, 'force': true})", clusterName, instance)

Expand All @@ -97,16 +49,6 @@ func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, i
return nil
}

func (m *mysqlshExec) CreateClusterWithExec(ctx context.Context, clusterName string) error {
cmd := fmt.Sprintf("dba.createCluster('%s', {'adoptFromGR': true})", clusterName)

if err := m.runWithExec(ctx, cmd); err != nil {
return errors.Wrap(err, "create cluster")
}

return nil
}

func (m *mysqlshExec) DoesClusterExistWithExec(ctx context.Context, clusterName string) bool {
log := logf.FromContext(ctx)

Expand Down Expand Up @@ -141,33 +83,6 @@ func (m *mysqlshExec) ClusterStatusWithExec(ctx context.Context, clusterName str
return status, nil
}

func (m *mysqlshExec) MemberStateWithExec(ctx context.Context, clusterName, instance string) (innodbcluster.MemberState, error) {
log := logf.FromContext(ctx).WithName("InnoDBCluster").WithValues("cluster", clusterName)

status, err := m.ClusterStatusWithExec(ctx, clusterName)
if err != nil {
return innodbcluster.MemberStateOffline, errors.Wrap(err, "get cluster status")
}

log.V(1).Info("Cluster status", "status", status)

member, ok := status.DefaultReplicaSet.Topology[instance]
if !ok {
return innodbcluster.MemberStateOffline, innodbcluster.ErrMemberNotFound
}

return member.MemberState, nil
}

func (m *mysqlshExec) TopologyWithExec(ctx context.Context, clusterName string) (map[string]innodbcluster.Member, error) {
status, err := m.ClusterStatusWithExec(ctx, clusterName)
if err != nil {
return nil, errors.Wrap(err, "get cluster status")
}

return status.DefaultReplicaSet.Topology, nil
}

func (m *mysqlshExec) RebootClusterFromCompleteOutageWithExec(ctx context.Context, clusterName string) error {
cmd := fmt.Sprintf("dba.rebootClusterFromCompleteOutage('%s')", clusterName)

Expand Down

0 comments on commit 6dd9e71

Please sign in to comment.