diff --git a/README.md b/README.md index f802597..632f678 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Cain is a backup and restore tool for Cassandra on Kubernetes. It is named after Cain supports the following cloud storage services: * AWS S3 +* Minio S3 * Azure Blob Storage Cain is now an official part of the Helm [incubator/cassandra](https://github.com/helm/charts/tree/master/incubator/cassandra) chart! @@ -109,6 +110,7 @@ Flags: -k, --keyspace string keyspace to act on -n, --namespace string namespace to find cassandra cluster -p, --parallel int number of files to copy in parallel. set this flag to 0 for full parallelism (default 1) + -s, --schema string schema version to restore (optional) -l, --selector string selector to filter on --src string source to restore from. Example: s3://bucket/cassandra/namespace/cluster-name -t, --tag string tag to restore diff --git a/cmd/cain.go b/cmd/cain.go index 94865fa..b8026ef 100644 --- a/cmd/cain.go +++ b/cmd/cain.go @@ -157,7 +157,7 @@ func NewRestoreCmd(out io.Writer) *cobra.Command { f.StringVar(&r.src, "src", "", "source to restore from. Example: s3://bucket/cassandra/namespace/cluster-name") f.StringVarP(&r.keyspace, "keyspace", "k", "", "keyspace to act on") f.StringVarP(&r.tag, "tag", "t", "", "tag to restore") - f.StringVarP(&r.schema, "schema", "s", "", "schema to restore") + f.StringVarP(&r.schema, "schema", "s", "", "schema version to restore (optional)") f.StringVarP(&r.namespace, "namespace", "n", "", "namespace to find cassandra cluster") f.StringVarP(&r.selector, "selector", "l", "", "selector to filter on") f.StringVarP(&r.container, "container", "c", "cassandra", "container name to act on") diff --git a/pkg/cain/cain.go b/pkg/cain/cain.go index f6015ea..238e143 100644 --- a/pkg/cain/cain.go +++ b/pkg/cain/cain.go @@ -1,6 +1,7 @@ package cain import ( + "fmt" "log" "path/filepath" @@ -100,8 +101,21 @@ func Restore(o RestoreOptions) error { log.Println("Getting current schema") _, sum, err := DescribeKeyspaceSchema(k8sClient, o.Namespace, existingPods[0], o.Container, o.Keyspace) if err != nil { - return err + if o.Schema == "" { + return err + } + log.Println("Schema not found, restoring schema", o.Schema) + sum, err = RestoreKeyspaceSchema(srcClient, k8sClient, srcPrefix, srcBasePath, o.Namespace, existingPods[0], o.Container, o.Keyspace, o.Schema, o.Parallel, o.BufferSize) + if err != nil { + return err + } + log.Println("Restored schema:", sum) + } + + if o.Schema != "" && sum != o.Schema { + return fmt.Errorf("specified schema %s is not the same as found schema %s", o.Schema, sum) } + log.Println("Found schema:", sum) log.Println("Calculating paths. This may take a while...") diff --git a/pkg/cain/cqlsh.go b/pkg/cain/cqlsh.go index 853f9f1..71ee095 100644 --- a/pkg/cain/cqlsh.go +++ b/pkg/cain/cqlsh.go @@ -40,7 +40,7 @@ func DescribeKeyspaceSchema(iK8sClient interface{}, namespace, pod, container, k command := []string{fmt.Sprintf("DESC %s;", keyspace)} schema, err := Cqlsh(iK8sClient, namespace, pod, container, command) if err != nil { - return nil, "", fmt.Errorf("Could not describe schema. make sure a schema exists for keyspace \"%s\". %s", keyspace, err) + return nil, "", fmt.Errorf("Could not describe schema. make sure a schema exists for keyspace \"%s\" or restore it using \"--schema\". %s", keyspace, err) } h := sha256.New() h.Write(schema) @@ -49,6 +49,24 @@ func DescribeKeyspaceSchema(iK8sClient interface{}, namespace, pod, container, k return schema, sum, nil } +// RestoreKeyspaceSchema restores a keyspace schema +func RestoreKeyspaceSchema(srcClient, iK8sClient interface{}, srcPrefix, srcPath, namespace, pod, container, keyspace, schema string, parallel int, bufferSize float64) (string, error) { + schemaTmpFile := fmt.Sprintf("/tmp/%s/schema.cql", keyspace) + fromTo := skbn.FromToPair{ + FromPath: filepath.Join(srcPath, keyspace, schema, "schema.cql"), + ToPath: filepath.Join(namespace, pod, container, schemaTmpFile), + } + if err := skbn.PerformCopy(srcClient, iK8sClient, srcPrefix, "k8s", []skbn.FromToPair{fromTo}, parallel, bufferSize); err != nil { + return "", err + } + if _, err := CqlshF(iK8sClient, namespace, pod, container, schemaTmpFile); err != nil { + return "", err + } + _, sum, err := DescribeKeyspaceSchema(iK8sClient, namespace, pod, container, keyspace) + + return sum, err +} + // TruncateTables truncates the provided tables in all pods func TruncateTables(iK8sClient interface{}, namespace, container, keyspace string, pods, tables, materializedViews []string) { bwgSize := len(pods) @@ -123,6 +141,24 @@ func Cqlsh(iK8sClient interface{}, namespace, pod, container string, command []s return removeWarning(stdout.Bytes()), nil } +// CqlshF executes cqlsh -f file in a given pod +func CqlshF(iK8sClient interface{}, namespace, pod, container string, file string) ([]byte, error) { + k8sClient := iK8sClient.(*skbn.K8sClient) + + command := []string{"cqlsh", "-f", file} + stdout := new(bytes.Buffer) + stderr, err := skbn.Exec(*k8sClient, namespace, pod, container, command, nil, stdout) + + if len(stderr) != 0 { + return nil, fmt.Errorf("STDERR: " + (string)(stderr)) + } + if err != nil { + return nil, err + } + + return removeWarning(stdout.Bytes()), nil +} + func removeWarning(b []byte) []byte { const warning = "Warning: Cannot create directory at `/home/cassandra/.cassandra`. Command history will not be saved." return []byte(strings.Replace((string)(b), warning, "", 1))