Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K8SPS-317: Remove JSON usage #466

Merged
merged 5 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 12 additions & 22 deletions pkg/controller/ps/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
"github.com/percona/percona-server-mysql-operator/pkg/clientcmd"
"github.com/percona/percona-server-mysql-operator/pkg/controller/psrestore"
"github.com/percona/percona-server-mysql-operator/pkg/db"
database "github.com/percona/percona-server-mysql-operator/pkg/db"
"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
"github.com/percona/percona-server-mysql-operator/pkg/k8s"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
Expand Down Expand Up @@ -204,7 +204,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
firstPodFQDN := fmt.Sprintf("%s.%s.%s", firstPod.Name, mysql.ServiceName(cr), cr.Namespace)
firstPodUri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)

um := db.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)
um := database.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)

mysh, err := mysqlsh.NewWithExec(r.ClientCmd, &firstPod, firstPodUri)
if err != nil {
Expand All @@ -224,7 +224,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
return errors.Wrapf(err, "get member state of %s from performance_schema", pod.Name)
}

if state == db.MemberStateOffline {
if state == database.MemberStateOffline {
log.Info("Member is not part of GR or already removed", "member", pod.Name, "memberState", state)
continue
}
Expand Down Expand Up @@ -789,16 +789,10 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
return errors.Wrap(err, "get operator password")
}

uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))

mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
if err != nil {
return err
}

db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))
cond := meta.FindStatusCondition(cr.Status.Conditions, apiv1alpha1.ConditionInnoDBClusterBootstrapped)
if cond == nil || cond.Status == metav1.ConditionFalse {
if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
log.V(1).Info("InnoDB cluster is not created yet")
return nil
}
Expand All @@ -814,7 +808,7 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
return nil
}

if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
return errors.New("InnoDB cluster is already bootstrapped, but failed to check its status")
}

Expand Down Expand Up @@ -929,13 +923,9 @@ func (r *PerconaServerMySQLReconciler) reconcileMySQLRouter(ctx context.Context,
}

firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
if err != nil {
return err
}

if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)
if exist, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exist {
log.V(1).Info("Waiting for InnoDB Cluster", "cluster", cr.Name)
return nil
}
Expand Down Expand Up @@ -1005,7 +995,7 @@ func (r *PerconaServerMySQLReconciler) getPrimaryFromGR(ctx context.Context, cr
return "", err
}

um := db.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)
um := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)

return um.GetGroupReplicationPrimary(ctx)
}
Expand Down Expand Up @@ -1053,7 +1043,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
if err != nil {
return err
}
repDb := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
repDb := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)

if err := orchestrator.StopReplicationExec(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil {
return errors.Wrapf(err, "stop replica %s", hostname)
Expand All @@ -1064,7 +1054,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
return errors.Wrapf(err, "get replication status of %s", hostname)
}

for status == db.ReplicationStatusActive {
for status == database.ReplicationStatusActive {
time.Sleep(250 * time.Millisecond)
status, _, err = repDb.ReplicationStatus(ctx)
if err != nil {
Expand Down Expand Up @@ -1107,7 +1097,7 @@ func (r *PerconaServerMySQLReconciler) startAsyncReplication(ctx context.Context
if err != nil {
return err
}
um := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
um := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)

log.V(1).Info("Change replication source", "primary", primary.Key.Hostname, "replica", hostname)
if err := um.ChangeReplicationSource(ctx, primary.Key.Hostname, replicaPass, primary.Key.Port); err != nil {
Expand Down
38 changes: 19 additions & 19 deletions pkg/controller/ps/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ps
import (
"bytes"
"context"
"fmt"
"strings"

"github.com/pkg/errors"
Expand All @@ -18,11 +17,11 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
database "github.com/percona/percona-server-mysql-operator/pkg/db"
"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
"github.com/percona/percona-server-mysql-operator/pkg/innodbcluster"
"github.com/percona/percona-server-mysql-operator/pkg/k8s"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
"github.com/percona/percona-server-mysql-operator/pkg/mysqlsh"
"github.com/percona/percona-server-mysql-operator/pkg/orchestrator"
"github.com/percona/percona-server-mysql-operator/pkg/router"
)
Expand Down Expand Up @@ -185,38 +184,39 @@ func (r *PerconaServerMySQLReconciler) isGRReady(ctx context.Context, cr *apiv1a
}

firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)

dbExists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata")
if err != nil {
return false, err
}

if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
if !dbExists {
return false, nil
}

status, err := mysh.ClusterStatusWithExec(ctx, cr.InnoDBClusterName())
members, err := db.GetGroupReplicationMembers(ctx)
if err != nil {
return false, errors.Wrap(err, "get cluster status")
return false, err
}

for addr, member := range status.DefaultReplicaSet.Topology {
for _, err := range member.InstanceErrors {
log.WithName(addr).Info(err)
onlineMembers := 0
for _, member := range members {
if member.MemberState != innodbcluster.MemberStateOnline {
log.WithName(member.Address).Info("Member is not ONLINE", "state", member.MemberState)
return false, nil
}
onlineMembers++
}

log.V(1).Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)

switch status.DefaultReplicaSet.Status {
case innodbcluster.ClusterStatusOK:
return true, nil
case innodbcluster.ClusterStatusOKPartial, innodbcluster.ClusterStatusOKNoTolerance, innodbcluster.ClusterStatusOKNoTolerancePartial:
log.Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)
return true, nil
default:
if onlineMembers < int(cr.Spec.MySQL.Size) {
log.V(1).Info("Not enough ONLINE members", "onlineMembers", onlineMembers, "size", cr.Spec.MySQL.Size)
return false, nil
}

log.V(1).Info("GR is ready")

return true, nil
}

func (r *PerconaServerMySQLReconciler) allLoadBalancersReady(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (bool, error) {
Expand Down
Loading
Loading