feat(restore): use Scylla restore API
This commit adds code for using the Scylla restore API.
Conveniently, pause/resume handling
is analogous to the existing Rclone API handling.

Fixes #4144
Fixes #4137
Michal-Leszczynski committed Jan 8, 2025
1 parent d5264ad commit 43684fd
Showing 5 changed files with 170 additions and 7 deletions.
5 changes: 5 additions & 0 deletions pkg/service/restore/service.go
@@ -201,6 +201,10 @@ func (s *Service) newWorker(ctx context.Context, clusterID uuid.UUID) (worker, error) {
 	if err != nil {
 		return worker{}, errors.Wrap(err, "get CQL cluster session")
 	}
+	nodeConfig, err := s.configCache.ReadAll(clusterID)
+	if err != nil {
+		return worker{}, errors.Wrap(err, "read all nodes config")
+	}
 
 	return worker{
 		run: &Run{
@@ -213,6 +217,7 @@ func (s *Service) newWorker(ctx context.Context, clusterID uuid.UUID) (worker, error) {
 		client:         client,
 		session:        s.session,
 		clusterSession: clusterSession,
+		nodeConfig:     nodeConfig,
 	}, nil
 }
 
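The service.go change snapshots the cluster's per-node configuration once, when the worker is constructed, so later per-host decisions can be answered from memory instead of querying the config cache each time. Below is a minimal, runnable sketch of that construction pattern; NodeConfig, newWorker, and readAll here are hypothetical stand-ins, not the scylla-manager API.

package main

import "fmt"

// NodeConfig stands in for configcache.NodeConfig: per-node data
// snapshotted once when the worker is created.
type NodeConfig struct {
	SupportsNativeRestore bool
}

// worker mirrors the diff: the config map is read once at construction
// and then consulted without further round-trips to the config cache.
type worker struct {
	nodeConfig map[string]NodeConfig // keyed by host address
}

func newWorker(readAll func() (map[string]NodeConfig, error)) (*worker, error) {
	cfg, err := readAll() // one ReadAll per worker, not one read per host
	if err != nil {
		return nil, fmt.Errorf("read all nodes config: %w", err)
	}
	return &worker{nodeConfig: cfg}, nil
}

func main() {
	w, _ := newWorker(func() (map[string]NodeConfig, error) {
		return map[string]NodeConfig{"192.168.100.11": {SupportsNativeRestore: true}}, nil
	})
	fmt.Println(w.nodeConfig["192.168.100.11"].SupportsNativeRestore) // true
}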
23 changes: 21 additions & 2 deletions pkg/service/restore/tables_worker.go
@@ -227,6 +227,9 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
 		if ctx.Err() != nil {
 			return ctx.Err()
 		}
+		if err := w.checkAvailableDiskSpace(ctx, hi.Host); err != nil {
+			return errors.Wrap(err, "validate free disk space")
+		}
 		// Download and stream in parallel
 		b, ok := bd.DispatchBatch(ctx, hi.Host)
 		if !ok {
@@ -235,6 +238,22 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
 		}
 		w.onBatchDispatch(ctx, b, host)
 
+		if ok, err := w.useScyllaRestoreAPI(ctx, b, host); err != nil {
+			return errors.Wrap(err, "check Scylla restore API support")
+		} else if ok {
+			w.logger.Info(ctx, "Use Scylla restore API", "host", host, "keyspace", b.Keyspace, "table", b.Table)
+			if err := w.scyllaRestore(ctx, host, b); err != nil {
+				err = multierr.Append(errors.Wrap(err, "restore batch"), bd.ReportFailure(hi.Host, b))
+				w.logger.Error(ctx, "Failed to restore batch",
+					"host", hi.Host,
+					"error", err)
+			} else {
+				bd.ReportSuccess(b)
+			}
+			continue
+		}
+
+		w.logger.Info(ctx, "Use Rclone copypaths API", "host", host, "keyspace", b.Keyspace, "table", b.Table)
 		pr, err := w.newRunProgress(ctx, hi, b)
 		if err != nil {
 			err = multierr.Append(errors.Wrap(err, "create new run progress"), bd.ReportFailure(hi.Host, b))
@@ -244,8 +263,8 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
 			continue
 		}
 		if err := w.restoreBatch(ctx, b, pr); err != nil {
-			err = multierr.Append(errors.Wrap(err, "restore batch"), bd.ReportFailure(hi.Host, b))
-			w.logger.Error(ctx, "Failed to restore batch",
+			err = multierr.Append(errors.Wrap(err, "load and stream batch"), bd.ReportFailure(hi.Host, b))
+			w.logger.Error(ctx, "Failed to load and stream batch",
 				"host", hi.Host,
 				"error", err)
 			continue
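The loop above now branches per batch: probe for native Scylla restore API support, use it when available, and otherwise fall back to the Rclone download plus load-and-stream path, reporting success or failure to the batch dispatcher either way. A stripped-down, runnable sketch of that control flow follows; every type and function in it is a hypothetical stand-in for the real worker machinery.

package main

import (
	"errors"
	"fmt"
)

type batch struct{ Keyspace, Table string }

// restoreOne mirrors the loop body in stageRestoreData: one batch, two paths.
func restoreOne(b batch, host string,
	useNative func(batch, string) (bool, error),
	native, rclone func(batch, string) error,
	reportFailure func(string, batch) error,
	reportSuccess func(batch),
) error {
	ok, err := useNative(b, host)
	if err != nil {
		return fmt.Errorf("check native restore support: %w", err)
	}
	if ok {
		if err := native(b, host); err != nil {
			// Report the failure so the dispatcher can retry the batch elsewhere.
			return errors.Join(fmt.Errorf("restore batch: %w", err), reportFailure(host, b))
		}
		reportSuccess(b)
		return nil
	}
	// Fallback: the pre-existing Rclone download + load&stream path.
	if err := rclone(b, host); err != nil {
		return errors.Join(fmt.Errorf("load and stream batch: %w", err), reportFailure(host, b))
	}
	reportSuccess(b)
	return nil
}

func main() {
	b := batch{Keyspace: "ks", Table: "t"}
	err := restoreOne(b, "h1",
		func(batch, string) (bool, error) { return true, nil },
		func(batch, string) error { return nil },
		func(batch, string) error { return nil },
		func(string, batch) error { return nil },
		func(batch) {},
	)
	fmt.Println(err) // <nil>
}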
4 changes: 0 additions & 4 deletions pkg/service/restore/tablesdir_worker.go
@@ -88,10 +88,6 @@ func (w *tablesWorker) restoreSSTables(ctx context.Context, b batch, pr *RunProgress) error {
 
 // newRunProgress creates RunProgress by starting download to host's upload dir.
 func (w *tablesWorker) newRunProgress(ctx context.Context, hi HostInfo, b batch) (*RunProgress, error) {
-	if err := w.checkAvailableDiskSpace(ctx, hi.Host); err != nil {
-		return nil, errors.Wrap(err, "validate free disk space")
-	}
-
 	uploadDir := UploadTableDir(b.Keyspace, b.Table, w.tableVersion[b.TableName])
 	if err := w.cleanUploadDir(ctx, hi.Host, uploadDir, nil); err != nil {
 		return nil, errors.Wrapf(err, "clean upload dir of host %s", hi.Host)
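This deletion pairs with the addition in stageRestoreData above: the free-disk-space check moves out of the Rclone-only newRunProgress and in front of batch dispatch, so both restore paths sit behind the same guard. A tiny runnable sketch of the hoisted-guard shape, with hypothetical names:

package main

import (
	"context"
	"errors"
	"fmt"
)

// checkDiskSpace is a hypothetical stand-in for w.checkAvailableDiskSpace.
func checkDiskSpace(ctx context.Context, host string) error {
	if host == "full-node" {
		return errors.New("not enough disk space")
	}
	return nil
}

func dispatch(ctx context.Context, host string) error {
	// Guard first, then pick a restore path; neither path repeats the check.
	if err := checkDiskSpace(ctx, host); err != nil {
		return fmt.Errorf("validate free disk space: %w", err)
	}
	fmt.Println("dispatching batch to", host) // native Scylla API or Rclone fallback
	return nil
}

func main() {
	fmt.Println(dispatch(context.Background(), "full-node")) // guard trips
	fmt.Println(dispatch(context.Background(), "node1"))     // guard passes
}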
23 changes: 22 additions & 1 deletion pkg/service/restore/worker.go
@@ -7,6 +7,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math/rand"
+	"net"
 	"path"
 	"regexp"
 	"slices"
@@ -22,14 +23,15 @@ import (
 	"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
 	"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
 	. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
+	"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
 	"github.com/scylladb/scylla-manager/v3/pkg/util/query"
 	"github.com/scylladb/scylla-manager/v3/pkg/util/retry"
 	"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
 	"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
 	"github.com/scylladb/scylla-manager/v3/pkg/util/version"
 )
 
-// restoreWorkerTools consists of utils common for both schemaWorker and tablesWorker.
+// worker consists of utils common for both schemaWorker and tablesWorker.
 type worker struct {
 	run    *Run
 	target Target
@@ -42,6 +44,7 @@ type worker struct {
 	client         *scyllaclient.Client
 	session        gocqlx.Session
 	clusterSession gocqlx.Session
+	nodeConfig     map[string]configcache.NodeConfig
 }
 
 func (w *worker) randomHostFromLocation(loc Location) string {
@@ -809,3 +812,21 @@ func (w *worker) stopJob(ctx context.Context, jobID int64, host string) {
 		)
 	}
 }
+
+// nodeInfo is a getter for worker.nodeConfig, which is a workaround for #4181.
+func (w *worker) nodeInfo(ctx context.Context, host string) (*scyllaclient.NodeInfo, error) {
+	// Try to get direct entry in config cache
+	if nc, ok := w.nodeConfig[host]; ok {
+		return nc.NodeInfo, nil
+	}
+	// Try to get resolved entry in config cache
+	if hostIP := net.ParseIP(host); hostIP != nil {
+		for h, nc := range w.nodeConfig {
+			if ip := net.ParseIP(h); ip != nil && hostIP.Equal(ip) {
+				return nc.NodeInfo, nil
+			}
+		}
+	}
+	// Last resort - query node info from scratch
+	return w.client.NodeInfo(ctx, host)
+}
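The nodeInfo getter tries three lookups in order: the exact map key, any key that parses to the same IP, and finally a live query. The IP comparison matters because the cache and the caller may spell the same address differently. A runnable, standard-library-only illustration of the first two steps (the map contents are made up):

package main

import (
	"fmt"
	"net"
)

// lookup mirrors worker.nodeInfo's first two steps with a plain string map.
func lookup(cache map[string]string, host string) (string, bool) {
	if v, ok := cache[host]; ok { // step 1: exact key
		return v, true
	}
	if hostIP := net.ParseIP(host); hostIP != nil { // step 2: same IP, different spelling
		for k, v := range cache {
			if ip := net.ParseIP(k); ip != nil && hostIP.Equal(ip) {
				return v, true
			}
		}
	}
	return "", false // step 3 in the real code: fall back to a live NodeInfo query
}

func main() {
	cache := map[string]string{"0:0:0:0:0:0:0:1": "node-a"}
	fmt.Println(lookup(cache, "::1")) // node-a true: IP.Equal matches both spellings
}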
122 changes: 122 additions & 0 deletions pkg/service/restore/worker_scylla_restore.go
@@ -0,0 +1,122 @@
+// Copyright (C) 2024 ScyllaDB
+
+package restore
+
+import (
+	"context"
+	"slices"
+	"strings"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/scylladb/scylla-manager/v3/pkg/metrics"
+	"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
+	. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
+	"github.com/scylladb/scylla-manager/v3/pkg/sstable"
+	"github.com/scylladb/scylla-manager/v3/swagger/gen/scylla/v1/models"
+)
+
+// useScyllaRestoreAPI decides whether we should use the Scylla restore API for restoring the batch.
+func (w *worker) useScyllaRestoreAPI(ctx context.Context, b batch, host string) (bool, error) {
+	// Scylla restore API does not handle restoration of versioned files
+	if b.batchType.Versioned {
+		return false, nil
+	}
+	// Scylla restore API does not handle SSTables with sstable.IntegerID
+	if b.batchType.IDType == sstable.IntegerID {
+		return false, nil
+	}
+	// List of object storage providers supported by Scylla restore API
+	scyllaSupportedProviders := []Provider{
+		S3,
+	}
+	if !slices.Contains(scyllaSupportedProviders, b.Location.Provider) {
+		return false, nil
+	}
+	// Check if node exposes Scylla restore API
+	nc, err := w.nodeInfo(ctx, host)
+	if err != nil {
+		return false, errors.Wrapf(err, "get node %s info", host)
+	}
+	return nc.SupportsScyllaBackupRestoreAPI()
+}
+
+func (w *tablesWorker) scyllaRestore(ctx context.Context, host string, b batch) error {
+	// batch.RemoteSSTableDir has "<provider>:<bucket>/<path>" format
+	prefix, ok := strings.CutPrefix(b.RemoteSSTableDir, b.Location.StringWithoutDC())
+	if !ok {
+		return errors.Errorf("remote sstable dir (%s) should contain location path prefix (%s)", b.RemoteSSTableDir, b.Location.StringWithoutDC())
+	}
+	id, err := w.client.ScyllaRestore(ctx, host, string(b.Location.Provider), b.Location.Path, prefix, b.Keyspace, b.Table, b.TOC())
+	if err != nil {
+		return errors.Wrap(err, "restore")
+	}
+
+	pr := &RunProgress{
+		ClusterID:        w.run.ClusterID,
+		TaskID:           w.run.TaskID,
+		RunID:            w.run.ID,
+		RemoteSSTableDir: b.RemoteSSTableDir,
+		Keyspace:         b.Keyspace,
+		Table:            b.Table,
+		Host:             host,
+		ShardCnt:         int64(w.hostShardCnt[host]),
+		ScyllaTaskID:     id,
+		SSTableID:        b.IDs(),
+	}
+	w.insertRunProgress(ctx, pr)
+
+	err = w.scyllaWaitTask(ctx, pr, b)
+	if err != nil {
+		w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, host, metrics.RestoreStateError)
+		w.cleanupRunProgress(context.Background(), pr)
+	}
+	return err
+}
+
+func (w *tablesWorker) scyllaWaitTask(ctx context.Context, pr *RunProgress, b batch) (err error) {
+	defer func() {
+		// On error abort task
+		if err != nil {
+			if e := w.client.ScyllaAbortTask(context.Background(), pr.Host, pr.ScyllaTaskID); e != nil {
+				w.logger.Error(ctx, "Failed to abort task",
+					"host", pr.Host,
+					"id", pr.ScyllaTaskID,
+					"error", e,
+				)
+			}
+		}
+	}()
+
+	for {
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+
+		task, err := w.client.ScyllaWaitTask(ctx, pr.Host, pr.ScyllaTaskID, int64(w.config.LongPollingTimeoutSeconds))
+		if err != nil {
+			return errors.Wrap(err, "wait for scylla task")
+		}
+		w.scyllaUpdateProgress(ctx, pr, b, task)
+		switch scyllaclient.ScyllaTaskState(task.State) {
+		case scyllaclient.ScyllaTaskStateFailed:
+			return errors.Errorf("task error (%s): %s", pr.ScyllaTaskID, task.Error)
+		case scyllaclient.ScyllaTaskStateDone:
+			return nil
+		}
+	}
+}
+
+func (w *worker) scyllaUpdateProgress(ctx context.Context, pr *RunProgress, b batch, task *models.TaskStatus) {
+	if t := time.Time(task.StartTime); !t.IsZero() {
+		pr.DownloadStartedAt = &t
+		pr.RestoreStartedAt = &t
+	}
+	if t := time.Time(task.EndTime); !t.IsZero() {
+		pr.DownloadCompletedAt = &t
+		pr.RestoreCompletedAt = &t
+	}
+	pr.Error = task.Error
+	pr.Downloaded = int64(float64(b.Size) * task.ProgressCompleted / task.ProgressTotal)
+	w.insertRunProgress(ctx, pr)
+}
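scyllaWaitTask follows a reusable long-poll shape: loop until the task reaches a terminal state, honor context cancellation, and abort the remote task from a deferred handler if the wait fails. A compact runnable sketch of that shape; the taskClient interface and fakeClient are invented for illustration, not the scyllaclient API.

package main

import (
	"context"
	"errors"
	"fmt"
)

type taskStatus struct {
	State string // "running", "done", or "failed"
	Error string
}

type taskClient interface {
	// Wait long-polls: each call returns either a state change or a timeout tick.
	Wait(ctx context.Context, id string) (taskStatus, error)
	Abort(ctx context.Context, id string) error
}

func waitTask(ctx context.Context, c taskClient, id string) (err error) {
	defer func() {
		if err != nil {
			// Abort with a fresh context: ctx may already be cancelled.
			if e := c.Abort(context.Background(), id); e != nil {
				err = errors.Join(err, fmt.Errorf("abort task %s: %w", id, e))
			}
		}
	}()
	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		st, err := c.Wait(ctx, id)
		if err != nil {
			return fmt.Errorf("wait for task: %w", err)
		}
		switch st.State {
		case "failed":
			return fmt.Errorf("task error (%s): %s", id, st.Error)
		case "done":
			return nil
		} // "running": poll again; the real code updates progress here
	}
}

type fakeClient struct{ polls int }

func (f *fakeClient) Wait(ctx context.Context, id string) (taskStatus, error) {
	f.polls++
	if f.polls < 3 {
		return taskStatus{State: "running"}, nil // long-poll timeout tick
	}
	return taskStatus{State: "done"}, nil
}

func (f *fakeClient) Abort(ctx context.Context, id string) error { return nil }

func main() {
	fmt.Println(waitTask(context.Background(), &fakeClient{}, "42")) // <nil>
}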
