K8SPS-317: Remove JSON usage (#466)
* K8SPS-317: Remove JSON usage

* fix `status_test.go`

---------

Co-authored-by: Andrii Dema <[email protected]>
Co-authored-by: Viacheslav Sarzhan <[email protected]>
3 people authored Jan 29, 2024
1 parent a69828b commit 22c1a35
Showing 5 changed files with 261 additions and 222 deletions.
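The substance of the change: instead of shelling out to mysqlsh and parsing its JSON output (`DoesClusterExistWithExec`, `ClusterStatusWithExec`), the controllers now ask the server directly through the `db` package's `ReplicationManager`. The InnoDB Cluster existence check becomes a lookup of the `mysql_innodb_cluster_metadata` schema, which mysqlsh creates when it bootstraps a cluster. A minimal sketch of such a check, assuming a plain `database/sql` handle (the operator really executes SQL inside the pod via `ClientCmd`, and the actual `pkg/db` signature differs):

```go
package db

import (
	"context"
	"database/sql"
	"errors"
)

// CheckIfDatabaseExists reports whether a schema with the given name exists.
// Probing INFORMATION_SCHEMA replaces parsing mysqlsh's JSON cluster status:
// mysql_innodb_cluster_metadata exists only after dba.createCluster() ran.
func CheckIfDatabaseExists(ctx context.Context, db *sql.DB, name string) (bool, error) {
	var schema string
	err := db.QueryRowContext(ctx,
		"SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?",
		name,
	).Scan(&schema)
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, nil
}
```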
34 changes: 12 additions & 22 deletions pkg/controller/ps/controller.go
@@ -45,7 +45,7 @@ import (
 	apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
 	"github.com/percona/percona-server-mysql-operator/pkg/clientcmd"
 	"github.com/percona/percona-server-mysql-operator/pkg/controller/psrestore"
-	"github.com/percona/percona-server-mysql-operator/pkg/db"
+	database "github.com/percona/percona-server-mysql-operator/pkg/db"
 	"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
 	"github.com/percona/percona-server-mysql-operator/pkg/k8s"
 	"github.com/percona/percona-server-mysql-operator/pkg/mysql"
@@ -204,7 +204,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
 	firstPodFQDN := fmt.Sprintf("%s.%s.%s", firstPod.Name, mysql.ServiceName(cr), cr.Namespace)
 	firstPodUri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)
 
-	um := db.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)
+	um := database.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)
 
 	mysh, err := mysqlsh.NewWithExec(r.ClientCmd, &firstPod, firstPodUri)
 	if err != nil {
@@ -224,7 +224,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
 			return errors.Wrapf(err, "get member state of %s from performance_schema", pod.Name)
 		}
 
-		if state == db.MemberStateOffline {
+		if state == database.MemberStateOffline {
 			log.Info("Member is not part of GR or already removed", "member", pod.Name, "memberState", state)
 			continue
 		}
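`database.MemberStateOffline` is backed by the same SQL-first approach: member state lives in `performance_schema.replication_group_members`. A hedged sketch of such a lookup (names assumed for illustration, not the exact `pkg/db` code):

```go
package db

import (
	"context"
	"database/sql"
	"errors"
)

type MemberState string

const (
	MemberStateOnline  MemberState = "ONLINE"
	MemberStateOffline MemberState = "OFFLINE"
)

// GetMemberState reads one member's Group Replication state from
// performance_schema. A member with no row has left the group, which is
// exactly the case the controller above skips while deleting pods.
func GetMemberState(ctx context.Context, db *sql.DB, host string) (MemberState, error) {
	var state MemberState
	err := db.QueryRowContext(ctx,
		"SELECT MEMBER_STATE FROM performance_schema.replication_group_members WHERE MEMBER_HOST = ?",
		host,
	).Scan(&state)
	if errors.Is(err, sql.ErrNoRows) {
		return MemberStateOffline, nil
	}
	if err != nil {
		return MemberStateOffline, err
	}
	return state, nil
}
```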
@@ -789,16 +789,10 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
 		return errors.Wrap(err, "get operator password")
 	}
 
-	uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))
-
-	mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
-	if err != nil {
-		return err
-	}
-
+	db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))
 	cond := meta.FindStatusCondition(cr.Status.Conditions, apiv1alpha1.ConditionInnoDBClusterBootstrapped)
 	if cond == nil || cond.Status == metav1.ConditionFalse {
-		if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+		if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
 			log.V(1).Info("InnoDB cluster is not created yet")
 			return nil
 		}
@@ -814,7 +808,7 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
 		return nil
 	}
 
-	if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+	if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
 		return errors.New("InnoDB cluster is already bootstrapped, but failed to check its status")
 	}
 
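Both call sites in `reconcileGroupReplication` use the same `err != nil || !exists` shape: a failed lookup and a missing schema are deliberately folded into one branch, and only the surrounding context decides whether that means "wait for bootstrap" or "error out". If the pattern keeps spreading, it could be factored into a helper along these lines (a hypothetical sketch, not code from this commit):

```go
package ps

import "context"

// databaseChecker captures the one ReplicationManager method the helper
// needs, so the sketch does not have to assume pkg/db's concrete type.
type databaseChecker interface {
	CheckIfDatabaseExists(ctx context.Context, name string) (bool, error)
}

// innoDBClusterBootstrapped mirrors the inlined `err != nil || !exists`
// checks above: a failed query and a missing schema both report false,
// and the caller decides whether false means "wait" or "error".
func innoDBClusterBootstrapped(ctx context.Context, db databaseChecker) bool {
	exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata")
	return err == nil && exists
}
```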
@@ -929,13 +923,9 @@ func (r *PerconaServerMySQLReconciler) reconcileMySQLRouter(ctx context.Context,
 	}
 
 	firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
-	uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
-	mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
-	if err != nil {
-		return err
-	}
+	db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)
 
-	if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+	if exist, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exist {
 		log.V(1).Info("Waiting for InnoDB Cluster", "cluster", cr.Name)
 		return nil
 	}
@@ -1005,7 +995,7 @@ func (r *PerconaServerMySQLReconciler) getPrimaryFromGR(ctx context.Context, cr
 		return "", err
 	}
 
-	um := db.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)
+	um := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)
 
 	return um.GetGroupReplicationPrimary(ctx)
 }
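`GetGroupReplicationPrimary` can likewise be answered from `performance_schema` alone: since MySQL 8.0, `replication_group_members` carries a `MEMBER_ROLE` column. A sketch under the same plain-`database/sql` assumption as above:

```go
package db

import (
	"context"
	"database/sql"
)

// GetGroupReplicationPrimary returns the host of the current Group
// Replication primary, read straight from performance_schema instead of
// mysqlsh's JSON cluster description.
func GetGroupReplicationPrimary(ctx context.Context, db *sql.DB) (string, error) {
	var host string
	err := db.QueryRowContext(ctx,
		"SELECT MEMBER_HOST FROM performance_schema.replication_group_members WHERE MEMBER_ROLE = 'PRIMARY' AND MEMBER_STATE = 'ONLINE'",
	).Scan(&host)
	if err != nil {
		return "", err
	}
	return host, nil
}
```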
@@ -1053,7 +1043,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
 		if err != nil {
 			return err
 		}
-		repDb := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
+		repDb := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
 
 		if err := orchestrator.StopReplicationExec(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil {
 			return errors.Wrapf(err, "stop replica %s", hostname)
@@ -1064,7 +1054,7 @@
 			return errors.Wrapf(err, "get replication status of %s", hostname)
 		}
 
-		for status == db.ReplicationStatusActive {
+		for status == database.ReplicationStatusActive {
 			time.Sleep(250 * time.Millisecond)
 			status, _, err = repDb.ReplicationStatus(ctx)
 			if err != nil {
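`database.ReplicationStatusActive` feeds the busy-wait in this hunk, which polls every 250 ms until the replica has actually stopped. Channel state is also available without JSON, from the replication tables in `performance_schema`; a sketch of how such a probe might look (constant values and names assumed, not the exact `pkg/db` query):

```go
package db

import (
	"context"
	"database/sql"
	"errors"
)

type ReplicationStatus string

const (
	ReplicationStatusActive    ReplicationStatus = "ACTIVE"
	ReplicationStatusNotActive ReplicationStatus = "NOT_ACTIVE"
)

// GetReplicationStatus reports whether the async replication channel is
// running, based on the IO and applier thread states in performance_schema.
func GetReplicationStatus(ctx context.Context, db *sql.DB) (ReplicationStatus, error) {
	var ioState, applierState string
	err := db.QueryRowContext(ctx, `
		SELECT conn.SERVICE_STATE, applier.SERVICE_STATE
		FROM performance_schema.replication_connection_status conn
		JOIN performance_schema.replication_applier_status applier
		  ON conn.CHANNEL_NAME = applier.CHANNEL_NAME`,
	).Scan(&ioState, &applierState)
	if errors.Is(err, sql.ErrNoRows) {
		return ReplicationStatusNotActive, nil
	}
	if err != nil {
		return ReplicationStatusNotActive, err
	}
	if ioState == "ON" && applierState == "ON" {
		return ReplicationStatusActive, nil
	}
	return ReplicationStatusNotActive, nil
}
```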
@@ -1107,7 +1097,7 @@ func (r *PerconaServerMySQLReconciler) startAsyncReplication(ctx context.Context
 		if err != nil {
 			return err
 		}
-		um := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
+		um := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
 
 		log.V(1).Info("Change replication source", "primary", primary.Key.Hostname, "replica", hostname)
 		if err := um.ChangeReplicationSource(ctx, primary.Key.Hostname, replicaPass, primary.Key.Port); err != nil {
38 changes: 19 additions & 19 deletions pkg/controller/ps/status.go
@@ -3,7 +3,6 @@ package ps
 import (
 	"bytes"
 	"context"
-	"fmt"
 	"strings"
 
 	"github.com/pkg/errors"
@@ -18,11 +17,11 @@ import (
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
 
 	apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
+	database "github.com/percona/percona-server-mysql-operator/pkg/db"
 	"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
 	"github.com/percona/percona-server-mysql-operator/pkg/innodbcluster"
 	"github.com/percona/percona-server-mysql-operator/pkg/k8s"
 	"github.com/percona/percona-server-mysql-operator/pkg/mysql"
-	"github.com/percona/percona-server-mysql-operator/pkg/mysqlsh"
 	"github.com/percona/percona-server-mysql-operator/pkg/orchestrator"
 	"github.com/percona/percona-server-mysql-operator/pkg/router"
 )
@@ -185,38 +184,39 @@ func (r *PerconaServerMySQLReconciler) isGRReady(ctx context.Context, cr *apiv1a
 	}
 
 	firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
-	uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
-	mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
+	db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)
+
+	dbExists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata")
 	if err != nil {
 		return false, err
 	}
 
-	if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
+	if !dbExists {
 		return false, nil
 	}
 
-	status, err := mysh.ClusterStatusWithExec(ctx, cr.InnoDBClusterName())
+	members, err := db.GetGroupReplicationMembers(ctx)
 	if err != nil {
-		return false, errors.Wrap(err, "get cluster status")
+		return false, err
 	}
 
-	for addr, member := range status.DefaultReplicaSet.Topology {
-		for _, err := range member.InstanceErrors {
-			log.WithName(addr).Info(err)
+	onlineMembers := 0
+	for _, member := range members {
+		if member.MemberState != innodbcluster.MemberStateOnline {
+			log.WithName(member.Address).Info("Member is not ONLINE", "state", member.MemberState)
+			return false, nil
 		}
+		onlineMembers++
 	}
 
-	log.V(1).Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)
-
-	switch status.DefaultReplicaSet.Status {
-	case innodbcluster.ClusterStatusOK:
-		return true, nil
-	case innodbcluster.ClusterStatusOKPartial, innodbcluster.ClusterStatusOKNoTolerance, innodbcluster.ClusterStatusOKNoTolerancePartial:
-		log.Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)
-		return true, nil
-	default:
+	if onlineMembers < int(cr.Spec.MySQL.Size) {
+		log.V(1).Info("Not enough ONLINE members", "onlineMembers", onlineMembers, "size", cr.Spec.MySQL.Size)
 		return false, nil
 	}
+
+	log.V(1).Info("GR is ready")
+
+	return true, nil
 }
 
 func (r *PerconaServerMySQLReconciler) allLoadBalancersReady(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (bool, error) {
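The rewritten `isGRReady` derives readiness from the raw member list: every member must be ONLINE, and there must be at least `cr.Spec.MySQL.Size` of them. A sketch of a member query that could back `GetGroupReplicationMembers` (struct fields inferred from the diff's `member.Address` and `member.MemberState` usage; the real types live in `pkg/innodbcluster`):

```go
package db

import (
	"context"
	"database/sql"
)

// Member mirrors one row of performance_schema.replication_group_members.
type Member struct {
	Address     string
	MemberState string
}

// GetGroupReplicationMembers lists Group Replication members straight from
// performance_schema, replacing mysqlsh's ClusterStatusWithExec JSON.
func GetGroupReplicationMembers(ctx context.Context, db *sql.DB) ([]Member, error) {
	rows, err := db.QueryContext(ctx,
		"SELECT CONCAT(MEMBER_HOST, ':', MEMBER_PORT), MEMBER_STATE FROM performance_schema.replication_group_members")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var members []Member
	for rows.Next() {
		var m Member
		if err := rows.Scan(&m.Address, &m.MemberState); err != nil {
			return nil, err
		}
		members = append(members, m)
	}
	return members, rows.Err()
}
```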
