Skip to content

Commit

Permalink
fix rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
pooknull committed Jul 19, 2023
1 parent e656c43 commit 39a4892
Show file tree
Hide file tree
Showing 14 changed files with 274 additions and 85 deletions.
3 changes: 2 additions & 1 deletion cmd/bootstrap/async_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func getTopology(ctx context.Context, fqdn string, peers sets.Set[string]) (stri
if err != nil {
return "", nil, errors.Wrapf(err, "get %s password", apiv1alpha1.UserOperator)
}
t, err := topology.GetAsync(ctx, operatorPass, sets.List(peers)...)
tm := topology.NewTopologyManager(apiv1alpha1.ClusterTypeAsync, operatorPass)
t, err := topology.GetAsync(ctx, tm, sets.List(peers)...)
if err != nil {
return "", nil, errors.Wrap(err, "failed to get topology")
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/haproxy-check/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func main() {
if err != nil {
log.Fatalln("Failed to get secret:", err.Error())
}
t, err := topology.GetAsync(ctx, operatorPass, fqdn)
tm := topology.NewTopologyManager(apiv1alpha1.ClusterTypeAsync, operatorPass)
t, err := topology.GetAsync(ctx, tm, fqdn)
if err != nil {
log.Fatalln("Failed to get topology:", err.Error())
}
Expand Down
12 changes: 3 additions & 9 deletions e2e-tests/functions
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,7 @@ delete_resource_with_finalizer() {
return
fi

timeout 30 kubectl patch "${resource}" "${name}" -p '{"metadata":{"finalizers":[]}}' --type=merge
set +e
timeout 30 kubectl delete "${resource}" "${name}"
# if exit status is 124 (timeout exit code), then apply patch again and delete
if [ $? -eq 124 ]; then
timeout 30 kubectl patch "${resource}" "${name}" -p '{"metadata":{"finalizers":[]}}' --type=merge
timeout 30 kubectl delete "${resource}" "${name}"
fi
set -e
kubectl delete "${resource}" "${name}" --wait=false --ignore-not-found=true
kubectl patch "${resource}" "${name}" -p '{"metadata":{"finalizers":[]}}' --type=merge || :
timeout 60 kubectl delete "${resource}" "${name}" --ignore-not-found=true
}
4 changes: 2 additions & 2 deletions e2e-tests/tests/self-healing-chaos/09-destroy-chaos-mesh.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
apiVersion: kuttl.dev/v1beta1
kind: TestStep
timeout: 10
timeout: 180
commands:
- script: |-
set -o errexit
Expand All @@ -9,4 +9,4 @@ commands:
source ../../functions
destroy_chaos_mesh
timeout: 30
timeout: 180
39 changes: 36 additions & 3 deletions pkg/clientcmd/clientcmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import (
"context"
"io"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Client struct {
Expand Down Expand Up @@ -47,18 +51,47 @@ func NewClient() (*Client, error) {

func (c *Client) Exec(
ctx context.Context,
pod *corev1.Pod,
obj client.Object,
containerName string,
command []string,
stdin io.Reader,
stdout, stderr io.Writer,
tty bool) error {

var pod *corev1.Pod
switch t := obj.(type) {
case *corev1.Pod:
pod = t
case *corev1.Service:
clientset, err := corev1client.NewForConfig(c.restconfig)
if err != nil {
return err
}
namespace := t.GetNamespace()
if t.Spec.Selector == nil || len(t.Spec.Selector) == 0 {
return errors.Errorf("invalid service '%s': Service is defined without a selector", t.Name)
}
selector := labels.SelectorFromSet(t.Spec.Selector)

podList, err := clientset.Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: selector.String(),
})
if err != nil {
return err
}
if len(podList.Items) == 0 {
return errors.Errorf("invalid service '%s': no pods found", t.Name)
}
pod = &podList.Items[0]
default:
return errors.Errorf("invalid object type '%T'", obj)
}

req := c.client.RESTClient().
Post().
Namespace(pod.Namespace).
Namespace(pod.GetNamespace()).
Resource("pods").
Name(pod.Name).
Name(pod.GetName()).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: containerName,
Expand Down
10 changes: 7 additions & 3 deletions pkg/controller/ps/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,8 @@ func (r *PerconaServerMySQLReconciler) reconcileLabels(ctx context.Context, cr *
if err != nil {
return errors.Wrap(err, "get operator password")
}
t, err := topology.Get(ctx, cr, operatorPass)
tm := topology.NewTopologyManagerExec(cr, r.Client, operatorPass)
t, err := tm.Get(ctx)
if err != nil {
return errors.Wrap(err, "get topology")
}
Expand Down Expand Up @@ -1348,7 +1349,9 @@ func (r *PerconaServerMySQLReconciler) stopAsyncReplication(ctx context.Context,
if err != nil {
return errors.Wrap(err, "get operator password")
}
topology, err := topology.Get(ctx, cr, operatorPass)

tm := topology.NewTopologyManagerExec(cr, r.Client, operatorPass)
topology, err := tm.Get(ctx)
if err != nil {
return errors.Wrap(err, "failed to get topology")
}
Expand Down Expand Up @@ -1413,7 +1416,8 @@ func (r *PerconaServerMySQLReconciler) startAsyncReplication(ctx context.Context
return errors.Wrap(err, "get operator password")
}

topology, err := topology.Get(ctx, cr, operatorPass)
tm := topology.NewTopologyManagerExec(cr, r.Client, operatorPass)
topology, err := tm.Get(ctx)
if err != nil {
return errors.Wrap(err, "get cluster topology")
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/ps/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ func (r *PerconaServerMySQLReconciler) getPrimaryHost(ctx context.Context, cl cl
if err != nil {
return "", errors.Wrap(err, "get root password")
}
t, err := topology.Get(ctx, cr, operatorPass)

tm := topology.NewTopologyManagerExec(cr, r.Client, operatorPass)
t, err := tm.Get(ctx)
if err != nil {
return "", errors.Wrap(err, "discover topology")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/psbackup/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ func (r *PerconaServerMySQLBackupReconciler) getBackupSource(ctx context.Context
return "", errors.Wrap(err, "get operator password")
}

top, err := topology.Get(ctx, cluster, operatorPass)
tm := topology.NewTopologyManagerExec(cluster, r.Client, operatorPass)
top, err := tm.Get(ctx)
if err != nil {
return "", errors.Wrap(err, "get topology")
}
Expand Down
17 changes: 6 additions & 11 deletions pkg/mysql/topology/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (

"github.com/pkg/errors"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
"github.com/percona/percona-server-mysql-operator/pkg/replicator"
)

func GetAsync(ctx context.Context, operatorPass string, hosts ...string) (Topology, error) {
func GetAsync(ctx context.Context, m Manager, hosts ...string) (Topology, error) {
t := new(Topology)
for _, host := range hosts {
if err := recursiveAsyncDiscover(ctx, host, operatorPass, replicator.ReplicationStatusNotInitiated, t); err != nil {
if err := recursiveAsyncDiscover(ctx, m, host, replicator.ReplicationStatusNotInitiated, t); err != nil {
return Topology{}, err
}
}
Expand All @@ -29,17 +27,14 @@ func GetAsync(ctx context.Context, operatorPass string, hosts ...string) (Topolo
return *t, nil
}

func recursiveAsyncDiscover(ctx context.Context, host, operatorPass string, replicaStatus replicator.ReplicationStatus, t *Topology) error {
func recursiveAsyncDiscover(ctx context.Context, m Manager, host string, replicaStatus replicator.ReplicationStatus, t *Topology) error {
var readOnly bool
var replicas []string
var source string
var status replicator.ReplicationStatus
var failedToConnect bool
err := func() error {
db, err := replicator.NewReplicator(apiv1alpha1.UserOperator,
operatorPass,
host,
mysql.DefaultAdminPort)
db, err := m.Replicator(host)
if err != nil {
// We should ignore connection errors because function can try to connect to starting pod
failedToConnect = true
Expand Down Expand Up @@ -99,14 +94,14 @@ func recursiveAsyncDiscover(ctx context.Context, host, operatorPass string, repl
if t.HasReplica(replica) {
continue
}
err := recursiveAsyncDiscover(ctx, replica, operatorPass, status, t)
err := recursiveAsyncDiscover(ctx, m, replica, status, t)
if err != nil {
return errors.Wrapf(err, "failed to discover %s", replica)
}
}

if source != "" {
err = recursiveAsyncDiscover(ctx, source, operatorPass, status, t)
err = recursiveAsyncDiscover(ctx, m, source, status, t)
return errors.Wrapf(err, "failed to discover %s", source)
}

Expand Down
25 changes: 25 additions & 0 deletions pkg/mysql/topology/gr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package topology

import "github.com/pkg/errors"

func getGRTopology(m Manager, hostname string) (Topology, error) {
db, err := m.Replicator(hostname)
if err != nil {
return Topology{}, errors.Wrapf(err, "open connection to %s", hostname)
}
defer db.Close()

replicas, err := db.GetGroupReplicationReplicas()
if err != nil {
return Topology{}, errors.Wrap(err, "get group-replication replicas")
}

primary, err := db.GetGroupReplicationPrimary()
if err != nil {
return Topology{}, errors.Wrap(err, "get group-replication primary")
}
return Topology{
Primary: primary,
Replicas: replicas,
}, nil
}
42 changes: 42 additions & 0 deletions pkg/mysql/topology/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package topology

import (
"context"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
"github.com/percona/percona-server-mysql-operator/pkg/replicator"

"github.com/pkg/errors"
)

type Manager interface {
Replicator(host string) (replicator.Replicator, error)
ClusterType() apiv1alpha1.ClusterType
Get(ctx context.Context) (Topology, error)
}

type topologyManager struct {
operatorPass string
clusterType apiv1alpha1.ClusterType
}

func NewTopologyManager(clusterType apiv1alpha1.ClusterType, operatorPass string) Manager {
return &topologyManager{
operatorPass: operatorPass,
clusterType: clusterType,
}
}

func (m *topologyManager) Replicator(hostname string) (replicator.Replicator, error) {
return replicator.NewReplicator(apiv1alpha1.UserOperator, m.operatorPass, hostname, mysql.DefaultAdminPort)
}

func (m *topologyManager) ClusterType() apiv1alpha1.ClusterType {
return m.clusterType
}

func (s *topologyManager) Get(ctx context.Context) (Topology, error) {
// TODO: implement
return Topology{}, errors.New("not implemented")
}
86 changes: 86 additions & 0 deletions pkg/mysql/topology/managerexec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package topology

import (
"context"
"strings"

apiv1alpha1 "github.com/percona/percona-server-mysql-operator/api/v1alpha1"
"github.com/percona/percona-server-mysql-operator/pkg/mysql"
"github.com/percona/percona-server-mysql-operator/pkg/replicator"

"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type topologyManagerExec struct {
operatorPass string
clusterType apiv1alpha1.ClusterType
cluster *apiv1alpha1.PerconaServerMySQL
cl client.Reader
}

func NewTopologyManagerExec(cluster *apiv1alpha1.PerconaServerMySQL, cl client.Reader, operatorPass string) Manager {
return &topologyManagerExec{
operatorPass: operatorPass,
clusterType: cluster.Spec.MySQL.ClusterType,
cluster: cluster,
cl: cl,
}
}

func (m *topologyManagerExec) Replicator(hostname string) (replicator.Replicator, error) {
if hostname == mysql.ServiceName(m.cluster) {
return replicator.NewReplicatorExec(mysql.HeadlessService(m.cluster), apiv1alpha1.UserOperator, m.operatorPass, "localhost")
}

pod, err := getPodByFQDN(context.TODO(), m.cl, hostname)
if err != nil {
return nil, errors.Wrap(err, "failed to get pod")
}

return replicator.NewReplicatorExec(pod, apiv1alpha1.UserOperator, m.operatorPass, "localhost")
}

func getPodByFQDN(ctx context.Context, cl client.Reader, fqdn string) (*corev1.Pod, error) {
fqdnSplit := strings.Split(fqdn, ".")
if len(fqdnSplit) < 3 {
return nil, errors.Errorf("invalid FQDN: %s", fqdn)
}
podName := fqdnSplit[0]
namespace := fqdnSplit[2]
pod := new(corev1.Pod)
err := cl.Get(ctx, client.ObjectKey{
Namespace: namespace,
Name: podName,
}, pod)
if err != nil {
return nil, errors.Wrapf(err, "failed to get pod %s", fqdn)
}
return pod, nil
}

func (m *topologyManagerExec) ClusterType() apiv1alpha1.ClusterType {
return m.clusterType
}

func (m *topologyManagerExec) Get(ctx context.Context) (Topology, error) {
var err error
var top Topology
switch m.ClusterType() {
case apiv1alpha1.ClusterTypeGR:
top, err = getGRTopology(m, mysql.FQDN(m.cluster, 0))
if err != nil {
return Topology{}, errors.Wrap(err, "get group-replication topology")
}
case apiv1alpha1.ClusterTypeAsync:
top, err = GetAsync(ctx, m, mysql.ServiceName(m.cluster))
if err != nil {
return Topology{}, err
}
default:
return Topology{}, errors.New("unknown cluster type")
}
return top, nil
}
Loading

0 comments on commit 39a4892

Please sign in to comment.