Skip to content

Commit

Permalink
Add --only-archiver flag to pause/resume DB archiver (#756)
Browse files Browse the repository at this point in the history
Signed-off-by: SK Ali Arman <[email protected]>
  • Loading branch information
sheikh-arman authored May 8, 2024
1 parent f93894a commit c12d92b
Show file tree
Hide file tree
Showing 49 changed files with 4,269 additions and 108 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ require (
github.com/google/btree v1.1.2 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-containerregistry v0.19.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.6.0 // indirect
Expand All @@ -95,6 +96,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/onsi/gomega v1.31.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opensearch-project/opensearch-go v1.1.0 // indirect
github.com/opensearch-project/opensearch-go/v2 v2.3.0 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-containerregistry v0.19.0 h1:uIsMRBV7m/HDkDxE/nXMnv1q+lOOSPlQ/ywc5JbB8Ic=
github.com/google/go-containerregistry v0.19.0/go.mod h1:u0qB2l7mvtWVR5kNcbFIhFY1hLbf8eeGapA+vbFDCtQ=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -324,6 +326,8 @@ github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY
github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM=
github.com/onsi/gomega v1.31.1 h1:KYppCUK+bUgAZwHOu7EXVBKyQA6ILvOESHkn/tgoqvo=
github.com/onsi/gomega v1.31.1/go.mod h1:y40C95dwAD1Nz36SsEnxvfFe8FFfNxzI5eJ0EYGyAy0=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opensearch-project/opensearch-go v1.1.0 h1:eG5sh3843bbU1itPRjA9QXbxcg8LaZ+DjEzQH9aLN3M=
github.com/opensearch-project/opensearch-go v1.1.0/go.mod h1:+6/XHCuTH+fwsMJikZEWsucZ4eZMma3zNSeLrTtVGbo=
github.com/opensearch-project/opensearch-go/v2 v2.3.0 h1:nQIEMr+A92CkhHrZgUhcfsrZjibvB3APXf2a1VwCmMQ=
Expand Down
14 changes: 10 additions & 4 deletions pkg/cmds/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type PauseOptions struct {

genericclioptions.IOStreams

onlyDb bool
onlyBackup bool
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewCmdPause(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
Expand Down Expand Up @@ -106,6 +107,7 @@ func NewCmdPause(parent string, f cmdutil.Factory, streams genericclioptions.IOS
cmd.Flags().BoolVar(&o.AllNamespaces, "all-namespaces", o.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
cmd.Flags().BoolVar(&o.onlyDb, "only-db", false, "If provided, only the database is paused.")
cmd.Flags().BoolVar(&o.onlyBackup, "only-backupconfig", false, "If provided, only the backupconfiguration for the database is paused.")
cmd.Flags().BoolVar(&o.onlyArchiver, "only-archiver", false, "If provided, only the archiver for the database is paused.")

return cmd
}
Expand Down Expand Up @@ -165,7 +167,7 @@ func (o *PauseOptions) Run() error {

errs := sets.NewString()
for _, info := range infos {
psr, err := pauser.NewPauser(o.Factory, info.Mapping, o.onlyDb, o.onlyBackup)
psr, err := pauser.NewPauser(o.Factory, info.Mapping, o.onlyDb, o.onlyBackup, o.onlyArchiver)
if err != nil {
if errs.Has(err.Error()) {
continue
Expand All @@ -181,15 +183,19 @@ func (o *PauseOptions) Run() error {
}
allErrs = append(allErrs, err)
errs.Insert(err.Error())
continue
}
pauseAll := !(o.onlyBackup || o.onlyDb)
pauseAll := !(o.onlyBackup || o.onlyDb || o.onlyArchiver)

if o.onlyDb || pauseAll {
fmt.Fprintf(o.Out, "Successfully paused %s/%s.\n", info.Namespace, info.Name)
}
if (o.onlyBackup || pauseAll) && backupConfigFound {
fmt.Fprintf(o.Out, "Successfully paused backupconfigurations of %s/%s.\n", info.Namespace, info.Name)
}
if o.onlyArchiver || pauseAll {
fmt.Fprintf(o.Out, "Successfully paused archiver of db %s/%s.\n", info.Namespace, info.Name)
}
}

return utilerrors.NewAggregate(allErrs)
Expand Down
13 changes: 9 additions & 4 deletions pkg/cmds/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type ResumeOptions struct {

genericclioptions.IOStreams

onlyDb bool
onlyBackup bool
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewCmdResume(parent string, f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.Command {
Expand Down Expand Up @@ -105,6 +106,7 @@ func NewCmdResume(parent string, f cmdutil.Factory, streams genericclioptions.IO
cmd.Flags().BoolVar(&o.AllNamespaces, "all-namespaces", o.AllNamespaces, "If present, list the requested object(s) across all namespaces. Namespace in current context is ignored even if specified with --namespace.")
cmd.Flags().BoolVar(&o.onlyDb, "only-db", false, "If provided, only the database is resumed.")
cmd.Flags().BoolVar(&o.onlyBackup, "only-backupconfig", false, "If provided, only the backupconfiguration for the database is resumed.")
cmd.Flags().BoolVar(&o.onlyArchiver, "only-archiver", false, "If provided, only the archiver for the database is resumed.")

return cmd
}
Expand Down Expand Up @@ -164,7 +166,7 @@ func (o *ResumeOptions) Run() error {

errs := sets.NewString()
for _, info := range infos {
rsr, err := resumer.NewResumer(o.Factory, info.Mapping, o.onlyDb, o.onlyBackup)
rsr, err := resumer.NewResumer(o.Factory, info.Mapping, o.onlyDb, o.onlyBackup, o.onlyArchiver)
if err != nil {
if errs.Has(err.Error()) {
continue
Expand All @@ -182,14 +184,17 @@ func (o *ResumeOptions) Run() error {
errs.Insert(err.Error())
}

resumeAll := !(o.onlyBackup || o.onlyDb)
resumeAll := !(o.onlyBackup || o.onlyDb || o.onlyArchiver)

if o.onlyDb || resumeAll {
fmt.Fprintf(o.Out, "Successfully resumed %s/%s.\n", info.Namespace, info.Name)
}
if (o.onlyBackup || resumeAll) && backupConfigFound {
fmt.Fprintf(o.Out, "Successfully resumed backupconfigurations of %s/%s.\n", info.Namespace, info.Name)
}
if o.onlyArchiver || resumeAll {
fmt.Fprintf(o.Out, "Successfully resumed archiver of db %s/%s.\n", info.Namespace, info.Name)
}
}
return utilerrors.NewAggregate(allErrs)
}
4 changes: 2 additions & 2 deletions pkg/data/mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ func (opts *mariadbOpts) getShellCommand(command string) (string, error) {
containerName := "mariadb"

if db.Spec.TLS != nil {
cmd = fmt.Sprintf("kubectl exec -n %s svc/%s -c %s -- mysql -u%s -p'%s' --host=%s --port=%s --ssl-ca='%v' --ssl-cert='%v' --ssl-key='%v' %s -e \"%s\"", db.Namespace, db.OffshootName(), containerName, user, password, "127.0.0.1", "3306", mdCaFile, mdCertFile, mdKeyFile, api.ResourceSingularMySQL, command)
cmd = fmt.Sprintf("kubectl exec -n %s svc/%s -c %s -- mariadb -u%s -p'%s' --host=%s --port=%s --ssl-ca='%v' --ssl-cert='%v' --ssl-key='%v' %s -e \"%s\"", db.Namespace, db.OffshootName(), containerName, user, password, "127.0.0.1", "3306", mdCaFile, mdCertFile, mdKeyFile, api.ResourceSingularMySQL, command)
} else {
cmd = fmt.Sprintf("kubectl exec -n %s svc/%s -c %s -- mysql -u%s -p'%s' %s -e \"%s\"", db.Namespace, db.OffshootName(), containerName, user, password, api.ResourceSingularMySQL, command)
cmd = fmt.Sprintf("kubectl exec -n %s svc/%s -c %s -- mariadb -u%s -p'%s' %s -e \"%s\"", db.Namespace, db.OffshootName(), containerName, user, password, api.ResourceSingularMySQL, command)
}

return cmd, err
Expand Down
173 changes: 173 additions & 0 deletions pkg/pauser/archiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
Copyright AppsCode Inc. and Contributors
Licensed under the AppsCode Community License 1.0.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://github.com/appscode/licenses/raw/1.0.0/AppsCode-Community-1.0.0.md
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pauser

import (
"context"

coreapi "kubedb.dev/apimachinery/apis/archiver/v1alpha1"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kmapi "kmodules.xyz/client-go/api/v1"
kmc "kmodules.xyz/client-go/client"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func PauseOrResumeMySQLArchiver(klient client.Client, value bool, reference kmapi.ObjectReference) error {
name := reference.Name
namespace := reference.Namespace
archiver, err := getMysqlArchiver(klient, kmapi.ObjectReference{
Name: name,
Namespace: namespace,
})
if err != nil {
return err
}
_, err = kmc.CreateOrPatch(
context.Background(),
klient,
archiver,
func(obj client.Object, createOp bool) client.Object {
in := obj.(*coreapi.MySQLArchiver)
in.Spec.Pause = value
return in
},
)
return err
}

func getMysqlArchiver(klient client.Client, ref kmapi.ObjectReference) (*coreapi.MySQLArchiver, error) {
archiver := &coreapi.MySQLArchiver{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Name,
Namespace: ref.Namespace,
},
}
if err := klient.Get(context.Background(), client.ObjectKeyFromObject(archiver), archiver); err != nil {
return nil, err
}
return archiver, nil
}

func PauseOrResumeMariaDBArchiver(klient client.Client, value bool, reference kmapi.ObjectReference) error {
name := reference.Name
namespace := reference.Namespace
archiver, err := getMariaDBArchiver(klient, kmapi.ObjectReference{
Name: name,
Namespace: namespace,
})
if err != nil {
return err
}
_, err = kmc.CreateOrPatch(
context.Background(),
klient,
archiver,
func(obj client.Object, createOp bool) client.Object {
in := obj.(*coreapi.MariaDBArchiver)
in.Spec.Pause = value
return in
},
)
return err
}

func getMariaDBArchiver(klient client.Client, ref kmapi.ObjectReference) (*coreapi.MariaDBArchiver, error) {
archiver := &coreapi.MariaDBArchiver{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Name,
Namespace: ref.Namespace,
},
}
if err := klient.Get(context.Background(), client.ObjectKeyFromObject(archiver), archiver); err != nil {
return nil, err
}

return archiver, nil
}

func PauseOrResumePostgresArchiver(klient client.Client, value bool, reference kmapi.ObjectReference) error {
name := reference.Name
namespace := reference.Namespace
archiver, err := getPostgresArchiver(klient, kmapi.ObjectReference{
Name: name,
Namespace: namespace,
})
if err != nil {
return err
}
_, err = kmc.CreateOrPatch(
context.Background(),
klient,
archiver,
func(obj client.Object, createOp bool) client.Object {
in := obj.(*coreapi.PostgresArchiver)
in.Spec.Pause = value
return in
},
)
return err
}

func getPostgresArchiver(klient client.Client, ref kmapi.ObjectReference) (*coreapi.PostgresArchiver, error) {
archiver := &coreapi.PostgresArchiver{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Name,
Namespace: ref.Namespace,
},
}
if err := klient.Get(context.Background(), client.ObjectKeyFromObject(archiver), archiver); err != nil {
return nil, err
}
return archiver, nil
}

func PauseOrResumeMongoDBArchiver(klient client.Client, value bool, reference kmapi.ObjectReference) error {
name := reference.Name
namespace := reference.Namespace
archiver, err := getMongoDBArchiver(klient, kmapi.ObjectReference{
Name: name,
Namespace: namespace,
})
if err != nil {
return err
}
_, err = kmc.CreateOrPatch(
context.Background(),
klient,
archiver,
func(obj client.Object, createOp bool) client.Object {
in := obj.(*coreapi.MongoDBArchiver)
in.Spec.Pause = value
return in
},
)
return err
}

func getMongoDBArchiver(klient client.Client, ref kmapi.ObjectReference) (*coreapi.MongoDBArchiver, error) {
archiver := &coreapi.MongoDBArchiver{
ObjectMeta: metav1.ObjectMeta{
Name: ref.Name,
Namespace: ref.Namespace,
},
}
if err := klient.Get(context.Background(), client.ObjectKeyFromObject(archiver), archiver); err != nil {
return nil, err
}
return archiver, nil
}
40 changes: 30 additions & 10 deletions pkg/pauser/mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,30 @@ package pauser
import (
"context"

coreapi "kubedb.dev/apimachinery/apis/archiver/v1alpha1"
api "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
cs "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2"
dbutil "kubedb.dev/apimachinery/client/clientset/versioned/typed/kubedb/v1alpha2/util"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
kmc "kmodules.xyz/client-go/client"
condutil "kmodules.xyz/client-go/conditions"
"sigs.k8s.io/controller-runtime/pkg/client"
scs "stash.appscode.dev/apimachinery/client/clientset/versioned/typed/stash/v1beta1"
)

type MariaDBPauser struct {
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
onlyDb bool
onlyBackup bool
dbClient cs.KubedbV1alpha2Interface
stashClient scs.StashV1beta1Interface
kc client.Client
onlyDb bool
onlyBackup bool
onlyArchiver bool
}

func NewMariaDBPauser(clientConfig *rest.Config, onlyDb, onlyBackup bool) (*MariaDBPauser, error) {
func NewMariaDBPauser(clientConfig *rest.Config, onlyDb, onlyBackup, onlyArchiver bool) (*MariaDBPauser, error) {
dbClient, err := cs.NewForConfig(clientConfig)
if err != nil {
return nil, err
Expand All @@ -48,11 +53,18 @@ func NewMariaDBPauser(clientConfig *rest.Config, onlyDb, onlyBackup bool) (*Mari
return nil, err
}

kc, err := kmc.NewUncachedClient(clientConfig, coreapi.AddToScheme)
if err != nil {
return nil, err
}

return &MariaDBPauser{
dbClient: dbClient,
stashClient: stashClient,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
dbClient: dbClient,
stashClient: stashClient,
kc: kc,
onlyDb: onlyDb,
onlyBackup: onlyBackup,
onlyArchiver: onlyArchiver,
}, nil
}

Expand All @@ -62,7 +74,15 @@ func (e *MariaDBPauser) Pause(name, namespace string) (bool, error) {
return false, nil
}

pauseAll := !(e.onlyBackup || e.onlyDb)
pauseAll := !(e.onlyBackup || e.onlyDb || e.onlyArchiver)
if e.onlyArchiver || pauseAll {
if err := PauseOrResumeMariaDBArchiver(e.kc, true, db.Spec.Archiver.Ref); err != nil {
return false, err
}
if e.onlyArchiver {
return false, nil
}
}

if e.onlyDb || pauseAll {
_, err = dbutil.UpdateMariaDBStatus(context.TODO(), e.dbClient, db.ObjectMeta, func(status *api.MariaDBStatus) (types.UID, *api.MariaDBStatus) {
Expand Down
Loading

0 comments on commit c12d92b

Please sign in to comment.