Skip to content

Commit

Permalink
Add --only-archiver flag to pause mysql-archiver
Browse files Browse the repository at this point in the history
Signed-off-by: SK Ali Arman <[email protected]>
  • Loading branch information
sheikh-arman committed Feb 8, 2024
1 parent 8ff0608 commit 5dc158c
Show file tree
Hide file tree
Showing 41 changed files with 3,938 additions and 36 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ require (
github.com/google/btree v1.0.1 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-containerregistry v0.17.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
Expand All @@ -91,6 +92,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/onsi/gomega v1.30.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opensearch-project/opensearch-go v1.1.0 // indirect
github.com/opensearch-project/opensearch-go/v2 v2.2.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-containerregistry v0.17.0 h1:5p+zYs/R4VGHkhyvgWurWrpJ2hW4Vv9fQI+GzdcwXLk=
github.com/google/go-containerregistry v0.17.0/go.mod h1:u0qB2l7mvtWVR5kNcbFIhFY1hLbf8eeGapA+vbFDCtQ=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
Expand Down Expand Up @@ -383,6 +385,8 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
github.com/onsi/gomega v1.30.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opensearch-project/opensearch-go v1.1.0 h1:eG5sh3843bbU1itPRjA9QXbxcg8LaZ+DjEzQH9aLN3M=
github.com/opensearch-project/opensearch-go v1.1.0/go.mod h1:+6/XHCuTH+fwsMJikZEWsucZ4eZMma3zNSeLrTtVGbo=
github.com/opensearch-project/opensearch-go/v2 v2.2.0 h1:6RicCBiqboSVtLMjSiKgVQIsND4I3sxELg9uwWe/TKM=
Expand Down
13 changes: 9 additions & 4 deletions pkg/cmds/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type PauseOptions struct {

genericclioptions.IOStreams

onlyDb bool
onlyBackup bool
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewCmdPause(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
Expand Down Expand Up @@ -106,6 +107,7 @@ func NewCmdPause(parent string, f cmdutil.Factory, streams genericclioptions.IOS
cmd.Flags().BoolVar(&o.AllNamespaces, "all-namespaces", o.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
cmd.Flags().BoolVar(&o.onlyDb, "only-db", false, "If provided, only the database is paused.")
cmd.Flags().BoolVar(&o.onlyBackup, "only-backupconfig", false, "If provided, only the backupconfiguration for the database is paused.")
cmd.Flags().BoolVar(&o.onlyArchiver, "only-archiver", false, "If provided, only the archiver for the database is paused.")

return cmd
}
Expand Down Expand Up @@ -165,7 +167,7 @@ func (o *PauseOptions) Run() error {

errs := sets.NewString()
for _, info := range infos {
psr, err := pauser.NewPauser(o.Factory, info.Mapping, o.onlyDb, o.onlyBackup)
psr, err := pauser.NewPauser(o.Factory, info.Mapping, o.onlyDb, o.onlyBackup, o.onlyArchiver)
if err != nil {
if errs.Has(err.Error()) {
continue
Expand All @@ -182,14 +184,17 @@ func (o *PauseOptions) Run() error {
allErrs = append(allErrs, err)
errs.Insert(err.Error())
}
pauseAll := !(o.onlyBackup || o.onlyDb)
pauseAll := !(o.onlyBackup || o.onlyDb || o.onlyArchiver)

if o.onlyDb || pauseAll {
fmt.Fprintf(o.Out, "Successfully paused %s/%s.\n", info.Namespace, info.Name)
}
if (o.onlyBackup || pauseAll) && backupConfigFound {
fmt.Fprintf(o.Out, "Successfully paused backupconfigurations of %s/%s.\n", info.Namespace, info.Name)
}
if (o.onlyArchiver || pauseAll) && backupConfigFound {
fmt.Fprintf(o.Out, "Successfully paused archiver of db %s/%s.\n", info.Namespace, info.Name)
}
}

return utilerrors.NewAggregate(allErrs)
Expand Down
13 changes: 9 additions & 4 deletions pkg/cmds/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type ResumeOptions struct {

genericclioptions.IOStreams

onlyDb bool
onlyBackup bool
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewCmdResume(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
Expand Down Expand Up @@ -105,6 +106,7 @@ func NewCmdResume(parent string, f cmdutil.Factory, streams genericclioptions.IO
cmd.Flags().BoolVar(&o.AllNamespaces, "all-namespaces", o.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
cmd.Flags().BoolVar(&o.onlyDb, "only-db", false, "If provided, only the database is resumed.")
cmd.Flags().BoolVar(&o.onlyBackup, "only-backupconfig", false, "If provided, only the backupconfiguration for the database is resumed.")
cmd.Flags().BoolVar(&o.onlyArchiver, "only-archiver", false, "If provided, only the archiver for the database is resumed.")

return cmd
}
Expand Down Expand Up @@ -164,7 +166,7 @@ func (o *ResumeOptions) Run() error {

errs := sets.NewString()
for _, info := range infos {
rsr, err := resumer.NewResumer(o.Factory, info.Mapping, o.onlyDb, o.onlyBackup)
rsr, err := resumer.NewResumer(o.Factory, info.Mapping, o.onlyDb, o.onlyBackup, o.onlyArchiver)
if err != nil {
if errs.Has(err.Error()) {
continue
Expand All @@ -182,14 +184,17 @@ func (o *ResumeOptions) Run() error {
errs.Insert(err.Error())
}

resumeAll := !(o.onlyBackup || o.onlyDb)
resumeAll := !(o.onlyBackup || o.onlyDb || o.onlyArchiver)

if o.onlyDb || resumeAll {
fmt.Fprintf(o.Out, "Successfully resumed %s/%s.\n", info.Namespace, info.Name)
}
if (o.onlyBackup || resumeAll) && backupConfigFound {
fmt.Fprintf(o.Out, "Successfully resumed backupconfigurations of %s/%s.\n", info.Namespace, info.Name)
}
if (o.onlyArchiver || resumeAll) && backupConfigFound {
fmt.Fprintf(o.Out, "Successfully resumed archiver of db %s/%s.\n", info.Namespace, info.Name)
}
}
return utilerrors.NewAggregate(allErrs)
}
32 changes: 19 additions & 13 deletions pkg/pauser/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package pauser

import (
"context"

api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
cs "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2"
dbutil "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2/util"
Expand All @@ -31,13 +30,14 @@ import (
)

type MySQLPauser struct {
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
onlyDb bool
onlyBackup bool
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewMySQLPauser(clientConfig *rest.Config, onlyDb, onlyBackup bool) (*MySQLPauser, error) {
func NewMySQLPauser(clientConfig *rest.Config, onlyDb, onlyBackup, onlyArchiver bool) (*MySQLPauser, error) {
dbClient, err := cs.NewForConfig(clientConfig)
if err != nil {
return nil, err
Expand All @@ -49,20 +49,21 @@ func NewMySQLPauser(clientConfig *rest.Config, onlyDb, onlyBackup bool) (*MySQLP
}

return &MySQLPauser{
dbClient: dbClient,
stashClient: stashClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
dbClient: dbClient,
stashClient: stashClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
onlyArchiver: onlyArchiver,
}, nil
}

func (e *MySQLPauser) Pause(name, namespace string) (bool, error) {
func (e *MySQLPauser) Pause(name string, namespace string) (bool, error) {
db, err := e.dbClient.MySQLs(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return false, nil
}

pauseAll := !(e.onlyBackup || e.onlyDb)
pauseAll := !(e.onlyBackup || e.onlyDb || e.onlyArchiver)

if e.onlyDb || pauseAll {
_, err = dbutil.UpdateMySQLStatus(context.TODO(), e.dbClient, db.ObjectMeta, func(status *api.MySQLStatus) (types.UID, *api.MySQLStatus) {
Expand All @@ -82,5 +83,10 @@ func (e *MySQLPauser) Pause(name, namespace string) (bool, error) {
return PauseBackupConfiguration(e.stashClient, db.ObjectMeta)
}

return false, nil
if e.onlyArchiver || pauseAll {
if err := SetArchiverPauseField(true, db.Spec.Archiver.Ref.Name, db.Spec.Archiver.Ref.Namespace); err != nil {
return false, err
}
}
return true, nil
}
5 changes: 3 additions & 2 deletions pkg/pauser/pauser.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type Pauser interface {
Pause(string, string) (bool, error) // returns true if backupconfiguration is paused
}

func NewPauser(restClientGetter genericclioptions.RESTClientGetter, mapping *meta.RESTMapping, onlyDb, onlyBackup bool) (Pauser, error) {
func NewPauser(restClientGetter genericclioptions.RESTClientGetter, mapping *meta.RESTMapping, onlyDb, onlyBackup, onlyArchiver bool) (Pauser, error) {

clientConfig, err := restClientGetter.ToRESTConfig()
if err != nil {
return nil, err
Expand All @@ -44,7 +45,7 @@ func NewPauser(restClientGetter genericclioptions.RESTClientGetter, mapping *met
case api.ResourceKindMongoDB:
return NewMongoDBPauser(clientConfig, onlyDb, onlyBackup)
case api.ResourceKindMySQL:
return NewMySQLPauser(clientConfig, onlyDb, onlyBackup)
return NewMySQLPauser(clientConfig, onlyDb, onlyBackup, onlyArchiver)
case api.ResourceKindMariaDB:
return NewMariaDBPauser(clientConfig, onlyDb, onlyBackup)
case api.ResourceKindPostgres:
Expand Down
58 changes: 57 additions & 1 deletion pkg/pauser/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ package pauser

import (
"context"
"fmt"
kmapi "kmodules.xyz/client-go/api/v1"
kmc "kmodules.xyz/client-go/client"
coreapi "kubedb.dev/apimachinery/apis/archiver/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -27,6 +33,19 @@ import (
scsutil "stash.appscode.dev/apimachinery/client/clientset/versioned/typed/stash/v1beta1/util"
)

var ()

func NewUncachedClient() (client.Client, error) {
cfg, err := ctrl.GetConfig()
if err != nil {
return nil, fmt.Errorf("failed to get Kubernetes config. Reason: %w", err)
}
return kmc.NewUncachedClient(
cfg,
coreapi.AddToScheme,
)
}

func PauseBackupConfiguration(stashClient scs.StashV1beta1Interface, dbMeta metav1.ObjectMeta) (bool, error) {
configs, err := stashClient.BackupConfigurations(dbMeta.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
Expand All @@ -53,6 +72,43 @@ func PauseBackupConfiguration(stashClient scs.StashV1beta1Interface, dbMeta meta
return false, err
}
}

return dbBackupConfig != nil, nil
}

func SetArchiverPauseField(value bool, name string, namespace string) error {
var klient client.Client
klient, err := NewUncachedClient()
if err != nil {
return err
}
archiver, err := getArchiverConfiguration(klient, kmapi.ObjectReference{
Name: name,
Namespace: namespace,
})
if err != nil {
return err
}
_, err = kmc.CreateOrPatch(
context.Background(),
klient,
archiver,
func(obj client.Object, createOp bool) client.Object {
in := obj.(*coreapi.MySQLArchiver)
in.Spec.Pause = value
return in
},
)
return err
}
func getArchiverConfiguration(klient client.Client, ref kmapi.ObjectReference) (*coreapi.MySQLArchiver, error) {
archiver := &coreapi.MySQLArchiver{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Name,
Namespace: ref.Namespace,
},
}
if err := klient.Get(context.Background(), client.ObjectKeyFromObject(archiver), archiver); err != nil {
return nil, err
}
return archiver, nil
}
32 changes: 22 additions & 10 deletions pkg/resumer/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,19 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
condutil "kmodules.xyz/client-go/conditions"
pautil "kubedb.dev/cli/pkg/pauser"
scs "stash.appscode.dev/apimachinery/client/clientset/versioned/typed/stash/v1beta1"
)

type MySQLResumer struct {
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
onlyDb bool
onlyBackup bool
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewMySQLResumer(clientConfig *rest.Config, onlyDb, onlyBackup bool) (*MySQLResumer, error) {
func NewMySQLResumer(clientConfig *rest.Config, onlyDb, onlyBackup, onlyArchiver bool) (*MySQLResumer, error) {
dbClient, err := cs.NewForConfig(clientConfig)
if err != nil {
return nil, err
Expand All @@ -49,10 +51,11 @@ func NewMySQLResumer(clientConfig *rest.Config, onlyDb, onlyBackup bool) (*MySQL
}

return &MySQLResumer{
dbClient: dbClient,
stashClient: stashClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
dbClient: dbClient,
stashClient: stashClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
onlyArchiver: onlyArchiver,
}, nil
}

Expand All @@ -62,7 +65,16 @@ func (e *MySQLResumer) Resume(name, namespace string) (bool, error) {
return false, err
}

resumeAll := !(e.onlyBackup || e.onlyDb)
resumeAll := !(e.onlyBackup || e.onlyDb || e.onlyArchiver)

if e.onlyArchiver || resumeAll {
if err := pautil.SetArchiverPauseField(false, db.Spec.Archiver.Ref.Name, db.Spec.Archiver.Ref.Namespace); err != nil {
return false, err
}
if e.onlyArchiver {
return true, nil
}
}

if e.onlyDb || resumeAll {
_, err = dbutil.UpdateMySQLStatus(context.TODO(), e.dbClient, db.ObjectMeta, func(status *api.MySQLStatus) (types.UID, *api.MySQLStatus) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/resumer/resumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Resumer interface {
Resume(string, string) (bool, error) // returns true if backupconfiguration is resumed
}

func NewResumer(restClientGetter genericclioptions.RESTClientGetter, mapping *meta.RESTMapping, onlyDb, onlyBackup bool) (Resumer, error) {
func NewResumer(restClientGetter genericclioptions.RESTClientGetter, mapping *meta.RESTMapping, onlyDb, onlyBackup, onlyArchiver bool) (Resumer, error) {
clientConfig, err := restClientGetter.ToRESTConfig()
if err != nil {
return nil, err
Expand All @@ -44,7 +44,7 @@ func NewResumer(restClientGetter genericclioptions.RESTClientGetter, mapping *me
case api.ResourceKindMongoDB:
return NewMongoDBResumer(clientConfig, onlyDb, onlyBackup)
case api.ResourceKindMySQL:
return NewMySQLResumer(clientConfig, onlyDb, onlyBackup)
return NewMySQLResumer(clientConfig, onlyDb, onlyBackup, onlyArchiver)
case api.ResourceKindMariaDB:
return NewMariaDBResumer(clientConfig, onlyDb, onlyBackup)
case api.ResourceKindPostgres:
Expand Down
Loading

0 comments on commit 5dc158c

Please sign in to comment.