K8SPS-317: Remove JSON usage #466

Merged · 5 commits · Jan 29, 2024 · showing changes from 2 commits
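This PR replaces the mysqlsh-based checks — which shelled out to mysqlsh and parsed its JSON output — with plain SQL issued through the existing ReplicationDBManager: cluster existence becomes a lookup of the mysql_innodb_cluster_metadata schema, and member state is read from performance_schema.replication_group_members directly. A minimal standalone sketch of the two queries, using database/sql against a direct connection rather than the operator's pod-exec query helper (the DSN and helper names here are illustrative assumptions, not operator API):

package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"

	_ "github.com/go-sql-driver/mysql" // driver registration, assumed for this sketch
)

// clusterMetadataExists mirrors the new CheckIfDatabaseExists: the InnoDB
// cluster counts as created once the mysql_innodb_cluster_metadata schema exists.
func clusterMetadataExists(ctx context.Context, db *sql.DB) (bool, error) {
	var name string
	err := db.QueryRowContext(ctx,
		"SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?",
		"mysql_innodb_cluster_metadata").Scan(&name)
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, nil
}

// groupMemberStates mirrors the new GetGroupReplicationMembers: member state
// comes straight from performance_schema instead of mysqlsh's JSON status.
func groupMemberStates(ctx context.Context, db *sql.DB) (map[string]string, error) {
	rows, err := db.QueryContext(ctx,
		"SELECT MEMBER_HOST, MEMBER_STATE FROM performance_schema.replication_group_members")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	states := make(map[string]string)
	for rows.Next() {
		var host, state string
		if err := rows.Scan(&host, &state); err != nil {
			return nil, err
		}
		states[host] = state
	}
	return states, rows.Err()
}

func main() {
	// Placeholder DSN; the operator execs its queries inside the mysql pod instead.
	db, err := sql.Open("mysql", "operator:secret@tcp(127.0.0.1:3306)/")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	ctx := context.Background()
	exists, err := clusterMetadataExists(ctx, db)
	fmt.Println("cluster metadata exists:", exists, "err:", err)

	states, err := groupMemberStates(ctx, db)
	fmt.Println("member states:", states, "err:", err)
}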
34 changes: 12 additions & 22 deletions pkg/controller/ps/controller.go
@@ -45,7 +45,7 @@ import (
apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
"github.com/percona/percona-server-mysql-operator/pkg/clientcmd"
"github.com/percona/percona-server-mysql-operator/pkg/controller/psrestore"
"github.com/percona/percona-server-mysql-operator/pkg/db"
database "github.com/percona/percona-server-mysql-operator/pkg/db"
"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
"github.com/percona/percona-server-mysql-operator/pkg/k8s"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
@@ -204,7 +204,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
firstPodFQDN := fmt.Sprintf("%s.%s.%s", firstPod.Name, mysql.ServiceName(cr), cr.Namespace)
firstPodUri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)

um := db.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)
um := database.NewReplicationManager(&firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodFQDN)

mysh, err := mysqlsh.NewWithExec(r.ClientCmd, &firstPod, firstPodUri)
if err != nil {
@@ -224,7 +224,7 @@ func (r *PerconaServerMySQLReconciler) deleteMySQLPods(ctx context.Context, cr *
return errors.Wrapf(err, "get member state of %s from performance_schema", pod.Name)
}

if state == db.MemberStateOffline {
if state == database.MemberStateOffline {
log.Info("Member is not part of GR or already removed", "member", pod.Name, "memberState", state)
continue
}
@@ -789,16 +789,10 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
return errors.Wrap(err, "get operator password")
}

uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))

mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
if err != nil {
return err
}

db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, mysql.ServiceName(cr))
cond := meta.FindStatusCondition(cr.Status.Conditions, apiv1alpha1.ConditionInnoDBClusterBootstrapped)
if cond == nil || cond.Status == metav1.ConditionFalse {
if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
log.V(1).Info("InnoDB cluster is not created yet")
return nil
}
@@ -814,7 +808,7 @@ func (r *PerconaServerMySQLReconciler) reconcileGroupReplication(ctx context.Con
return nil
}

if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
if exists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exists {
return errors.New("InnoDB cluster is already bootstrapped, but failed to check its status")
}

@@ -929,13 +923,9 @@ func (r *PerconaServerMySQLReconciler) reconcileMySQLRouter(ctx context.Context,
}

firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
if err != nil {
return err
}

if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)
if exist, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata"); err != nil || !exist {
log.V(1).Info("Waiting for InnoDB Cluster", "cluster", cr.Name)
return nil
}
@@ -1005,7 +995,7 @@ func (r *PerconaServerMySQLReconciler) getPrimaryFromGR(ctx context.Context, cr
return "", err
}

um := db.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)
um := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, fqdn)

return um.GetGroupReplicationPrimary(ctx)
}
@@ -1053,7 +1043,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
if err != nil {
return err
}
repDb := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
repDb := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)

if err := orchestrator.StopReplicationExec(gCtx, r.ClientCmd, orcPod, hostname, port); err != nil {
return errors.Wrapf(err, "stop replica %s", hostname)
@@ -1064,7 +1054,7 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
return errors.Wrapf(err, "get replication status of %s", hostname)
}

for status == db.ReplicationStatusActive {
for status == database.ReplicationStatusActive {
time.Sleep(250 * time.Millisecond)
status, _, err = repDb.ReplicationStatus(ctx)
if err != nil {
@@ -1107,7 +1097,7 @@ func (r *PerconaServerMySQLReconciler) startAsyncReplication(ctx context.Context
if err != nil {
return err
}
um := db.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)
um := database.NewReplicationManager(pod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, hostname)

log.V(1).Info("Change replication source", "primary", primary.Key.Hostname, "replica", hostname)
if err := um.ChangeReplicationSource(ctx, primary.Key.Hostname, replicaPass, primary.Key.Port); err != nil {
38 changes: 19 additions & 19 deletions pkg/controller/ps/status.go
@@ -3,7 +3,6 @@ package ps
import (
"bytes"
"context"
"fmt"
"strings"

"github.com/pkg/errors"
@@ -18,11 +17,11 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
database "github.com/percona/percona-server-mysql-operator/pkg/db"
"github.com/percona/percona-server-mysql-operator/pkg/haproxy"
"github.com/percona/percona-server-mysql-operator/pkg/innodbcluster"
"github.com/percona/percona-server-mysql-operator/pkg/k8s"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
"github.com/percona/percona-server-mysql-operator/pkg/mysqlsh"
"github.com/percona/percona-server-mysql-operator/pkg/orchestrator"
"github.com/percona/percona-server-mysql-operator/pkg/router"
)
@@ -185,38 +184,39 @@ func (r *PerconaServerMySQLReconciler) isGRReady(ctx context.Context, cr *apiv1a
}

firstPodUri := mysql.PodName(cr, 0) + "." + mysql.ServiceName(cr) + "." + cr.Namespace
uri := fmt.Sprintf("%s:%s@%s", apiv1alpha1.UserOperator, operatorPass, firstPodUri)
mysh, err := mysqlsh.NewWithExec(r.ClientCmd, firstPod, uri)
db := database.NewReplicationManager(firstPod, r.ClientCmd, apiv1alpha1.UserOperator, operatorPass, firstPodUri)

dbExists, err := db.CheckIfDatabaseExists(ctx, "mysql_innodb_cluster_metadata")
if err != nil {
return false, err
}

if !mysh.DoesClusterExistWithExec(ctx, cr.InnoDBClusterName()) {
if !dbExists {
return false, nil
}

status, err := mysh.ClusterStatusWithExec(ctx, cr.InnoDBClusterName())
members, err := db.GetGroupReplicationMembers(ctx)
if err != nil {
return false, errors.Wrap(err, "get cluster status")
return false, err
}

for addr, member := range status.DefaultReplicaSet.Topology {
for _, err := range member.InstanceErrors {
log.WithName(addr).Info(err)
onlineMembers := 0
for _, member := range members {
if member.MemberState != innodbcluster.MemberStateOnline {
log.WithName(member.Address).Info("Member is not ONLINE", "state", member.MemberState)
return false, nil
}
onlineMembers++
}

log.V(1).Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)

switch status.DefaultReplicaSet.Status {
case innodbcluster.ClusterStatusOK:
return true, nil
case innodbcluster.ClusterStatusOKPartial, innodbcluster.ClusterStatusOKNoTolerance, innodbcluster.ClusterStatusOKNoTolerancePartial:
log.Info("GR status", "status", status.DefaultReplicaSet.Status, "statusText", status.DefaultReplicaSet.StatusText)
return true, nil
default:
if onlineMembers < int(cr.Spec.MySQL.Size) {
log.V(1).Info("Not enough ONLINE members", "onlineMembers", onlineMembers, "size", cr.Spec.MySQL.Size)
return false, nil
}

log.V(1).Info("GR is ready")

return true, nil
}

func (r *PerconaServerMySQLReconciler) allLoadBalancersReady(ctx context.Context, cr *apiv1alpha1.PerconaServerMySQL) (bool, error) {
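Distilled, the readiness rule in the new isGRReady above is: any member reported by performance_schema as not ONLINE fails the check immediately, and otherwise the ONLINE count must reach spec.mysql.size. A small pure-function sketch of that decision (names here are illustrative, not operator API):

// grReady reports group-replication readiness from raw member states.
// Any non-ONLINE member short-circuits to false; otherwise at least
// `size` members must have been observed ONLINE.
func grReady(states []string, size int) bool {
	online := 0
	for _, s := range states {
		if s != "ONLINE" {
			return false
		}
		online++
	}
	return online >= size
}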
38 changes: 37 additions & 1 deletion pkg/db/replication.go
@@ -6,14 +6,15 @@ import (
"database/sql"
"encoding/csv"
"fmt"
"github.com/gocarina/gocsv"
"strings"

"github.com/gocarina/gocsv"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
"github.com/percona/percona-server-mysql-operator/pkg/clientcmd"
"github.com/percona/percona-server-mysql-operator/pkg/innodbcluster"
)

const defaultChannelName = ""
@@ -167,3 +168,38 @@ func (m *ReplicationDBManager) GetMemberState(ctx context.Context, host string)

return rows[0].State, nil
}

func (m *ReplicationDBManager) GetGroupReplicationMembers(ctx context.Context) ([]innodbcluster.Member, error) {
rows := []*struct {
Member string `csv:"member"`
State string `csv:"state"`
}{}

err := m.query(ctx, "SELECT MEMBER_HOST as member, MEMBER_STATE as state FROM replication_group_members", &rows)
if err != nil {
return nil, errors.Wrap(err, "query members")
}

members := make([]innodbcluster.Member, 0)
for _, row := range rows {
state := innodbcluster.MemberState(row.State)
members = append(members, innodbcluster.Member{Address: row.Member, MemberState: state})
}

return members, nil
}
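Note that this query names replication_group_members without a schema qualifier, which suggests the manager's connection defaults to performance_schema (the existing GetMemberState caller in controller.go wraps its error with "from performance_schema"). A small usage sketch over the new API, written as a same-package helper in pkg/db (a sketch, not part of this PR; the type, constructor, and imports are taken from this diff):

// onlineMemberCount counts ONLINE members via the new manager API, the same
// way the updated isGRReady does in status.go.
func onlineMemberCount(ctx context.Context, m *ReplicationDBManager) (int, error) {
	members, err := m.GetGroupReplicationMembers(ctx)
	if err != nil {
		return 0, errors.Wrap(err, "get group replication members")
	}
	online := 0
	for _, member := range members {
		if member.MemberState == innodbcluster.MemberStateOnline {
			online++
		}
	}
	return online, nil
}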

func (m *ReplicationDBManager) CheckIfDatabaseExists(ctx context.Context, name string) (bool, error) {
rows := []*struct {
DB string `csv:"db"`
}{}
q := fmt.Sprintf("SELECT SCHEMA_NAME AS db FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%s'", name)
err := m.query(ctx, q, &rows)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, err
}
return true, nil
}
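One behavioral note worth flagging: CheckIfDatabaseExists returns true whenever the query succeeds, so it relies on the m.query helper surfacing an empty result set as sql.ErrNoRows. It also interpolates name into a LIKE clause, which is fine for the fixed "mysql_innodb_cluster_metadata" argument used at every call site in this PR, but would misbehave for names containing wildcard characters. A defensive variant that also checks the row count (a sketch under those assumptions, not part of this PR):

func (m *ReplicationDBManager) checkIfDatabaseExistsStrict(ctx context.Context, name string) (bool, error) {
	rows := []*struct {
		DB string `csv:"db"`
	}{}
	// Same query as above; treat both sql.ErrNoRows and zero rows as "absent".
	q := fmt.Sprintf("SELECT SCHEMA_NAME AS db FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME LIKE '%s'", name)
	if err := m.query(ctx, q, &rows); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return false, nil
		}
		return false, err
	}
	return len(rows) > 0, nil
}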
85 changes: 0 additions & 85 deletions pkg/mysqlsh/mysqlshexec.go
@@ -39,54 +39,6 @@ func (m *mysqlshExec) runWithExec(ctx context.Context, cmd string) error {
return nil
}

func (m *mysqlshExec) ConfigureInstanceWithExec(ctx context.Context, instance string) error {
cmd := fmt.Sprintf(
"dba.configureInstance('%s', {'interactive': false, 'clearReadOnly': true})",
instance,
)

if err := m.runWithExec(ctx, cmd); err != nil {
return errors.Wrap(err, "configure instance")
}

return nil
}

func (m *mysqlshExec) AddInstanceWithExec(ctx context.Context, clusterName, instance string) error {
opts := struct {
Interactive bool `json:"interactive"`
RecoveryMethod string `json:"recoveryMethod"`
WaitRecovery int `json:"waitRecovery"`
}{
Interactive: false,
RecoveryMethod: "clone",
WaitRecovery: 0,
}

o, err := json.Marshal(opts)
if err != nil {
return errors.Wrap(err, "marshal options")
}

cmd := fmt.Sprintf("dba.getCluster('%s').addInstance('%s', %s)", clusterName, instance, string(o))

if err := m.runWithExec(ctx, cmd); err != nil {
return errors.Wrap(err, "add instance")
}

return nil
}

func (m *mysqlshExec) RejoinInstanceWithExec(ctx context.Context, clusterName, instance string) error {
cmd := fmt.Sprintf("dba.getCluster('%s').rejoinInstance('%s', {'interactive': false})", clusterName, instance)

if err := m.runWithExec(ctx, cmd); err != nil {
return errors.Wrap(err, "rejoin instance")
}

return nil
}

func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, instance string) error {
cmd := fmt.Sprintf("dba.getCluster('%s').removeInstance('%s', {'interactive': false, 'force': true})", clusterName, instance)

@@ -97,16 +49,6 @@ func (m *mysqlshExec) RemoveInstanceWithExec(ctx context.Context, clusterName, i
return nil
}

func (m *mysqlshExec) CreateClusterWithExec(ctx context.Context, clusterName string) error {
cmd := fmt.Sprintf("dba.createCluster('%s', {'adoptFromGR': true})", clusterName)

if err := m.runWithExec(ctx, cmd); err != nil {
return errors.Wrap(err, "create cluster")
}

return nil
}

func (m *mysqlshExec) DoesClusterExistWithExec(ctx context.Context, clusterName string) bool {
log := logf.FromContext(ctx)

@@ -141,33 +83,6 @@ func (m *mysqlshExec) ClusterStatusWithExec(ctx context.Context, clusterName str
return status, nil
}

func (m *mysqlshExec) MemberStateWithExec(ctx context.Context, clusterName, instance string) (innodbcluster.MemberState, error) {
log := logf.FromContext(ctx).WithName("InnoDBCluster").WithValues("cluster", clusterName)

status, err := m.ClusterStatusWithExec(ctx, clusterName)
if err != nil {
return innodbcluster.MemberStateOffline, errors.Wrap(err, "get cluster status")
}

log.V(1).Info("Cluster status", "status", status)

member, ok := status.DefaultReplicaSet.Topology[instance]
if !ok {
return innodbcluster.MemberStateOffline, innodbcluster.ErrMemberNotFound
}

return member.MemberState, nil
}

func (m *mysqlshExec) TopologyWithExec(ctx context.Context, clusterName string) (map[string]innodbcluster.Member, error) {
status, err := m.ClusterStatusWithExec(ctx, clusterName)
if err != nil {
return nil, errors.Wrap(err, "get cluster status")
}

return status.DefaultReplicaSet.Topology, nil
}

func (m *mysqlshExec) RebootClusterFromCompleteOutageWithExec(ctx context.Context, clusterName string) error {
cmd := fmt.Sprintf("dba.rebootClusterFromCompleteOutage('%s')", clusterName)
