Skip to content

Commit

Permalink
fix(restore): store view build status in SM DB instead of querying it…
Browse files Browse the repository at this point in the history
… in progress

This way there is no problem with getting sctool progress
even when the cluster is no longer available.
This makes TestRestoreTablesProgressIntegration pass.

Fixes #4191
  • Loading branch information
Michal-Leszczynski committed Jan 10, 2025
1 parent e458320 commit 97f1edd
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 39 deletions.
25 changes: 3 additions & 22 deletions pkg/service/restore/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
package restore

import (
"context"
"slices"
"time"

"github.com/pkg/errors"
"github.com/scylladb/gocqlx/v2"
"github.com/scylladb/gocqlx/v2/qb"
"github.com/scylladb/scylla-manager/v3/pkg/schema/table"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
Expand All @@ -26,7 +25,7 @@ type tableKey struct {
}

// aggregateProgress returns restore progress information classified by keyspace and tables.
func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {
func (w *worker) aggregateProgress() (Progress, error) {
var (
p = Progress{
SnapshotTag: w.run.SnapshotTag,
Expand Down Expand Up @@ -99,25 +98,7 @@ func (w *worker) aggregateProgress(ctx context.Context) (Progress, error) {

p.extremeToNil()

for _, v := range w.run.Views {
viewTableName := v.View
if v.Type == SecondaryIndex {
viewTableName += "_index"
}

status, err := w.client.ViewBuildStatus(ctx, v.Keyspace, viewTableName)
if err != nil {
w.logger.Error(ctx, "Couldn't get view build status",
"keyspace", v.Keyspace,
"view", v.View,
"error", err,
)
status = scyllaclient.StatusUnknown
}
v.BuildStatus = status
p.Views = append(p.Views, v)
}

p.Views = slices.Clone(w.run.Views)
for _, hp := range hostProgress {
p.Hosts = append(p.Hosts, hp)
}
Expand Down
18 changes: 4 additions & 14 deletions pkg/service/restore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,8 @@ func (s *Service) GetProgress(ctx context.Context, clusterID, taskID, runID uuid
return Progress{}, errors.Wrap(err, "get run")
}

w, err := s.newProgressWorker(ctx, run)
if err != nil {
return Progress{}, errors.Wrap(err, "create progress worker")
}

pr, err := w.aggregateProgress(ctx)
w := s.newProgressWorker(run)
pr, err := w.aggregateProgress()
if err != nil {
return Progress{}, err
}
Expand Down Expand Up @@ -221,20 +217,14 @@ func (w *worker) setRunInfo(taskID, runID uuid.UUID) {
w.run.ID = runID
}

func (s *Service) newProgressWorker(ctx context.Context, run *Run) (worker, error) {
client, err := s.scyllaClient(ctx, run.ClusterID)
if err != nil {
return worker{}, errors.Wrap(err, "get client")
}

func (s *Service) newProgressWorker(run *Run) worker {
return worker{
run: run,
config: s.config,
logger: s.logger,
metrics: s.metrics,
client: client,
session: s.session,
}, nil
}
}

// GetRun returns run with specified cluster, task and run ID.
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/restore/tables_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ func (w *tablesWorker) restore(ctx context.Context) error {
return nil
},
StageRecreateViews: func() error {
for _, v := range w.run.Views {
for i, v := range w.run.Views {
if err := w.CreateView(ctx, v); err != nil {
return errors.Wrapf(err, "recreate %s.%s with statement %s", v.Keyspace, v.View, v.CreateStmt)
}
if err := w.WaitForViewBuilding(ctx, v); err != nil {
if err := w.WaitForViewBuilding(ctx, &w.run.Views[i]); err != nil {
return errors.Wrapf(err, "wait for %s.%s", v.Keyspace, v.View)
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/service/restore/worker_views.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (w *worker) CreateView(ctx context.Context, view View) error {
return alterSchemaRetryWrapper(ctx, op, notify)
}

func (w *worker) WaitForViewBuilding(ctx context.Context, view View) error {
func (w *worker) WaitForViewBuilding(ctx context.Context, view *View) error {
labels := metrics.RestoreViewBuildStatusLabels{
ClusterID: w.run.ClusterID.String(),
Keyspace: view.Keyspace,
Expand All @@ -90,6 +90,8 @@ func (w *worker) WaitForViewBuilding(ctx context.Context, view View) error {
return retry.Permanent(err)
}

view.BuildStatus = status
w.insertRun(ctx)
switch status {
case scyllaclient.StatusUnknown:
w.metrics.SetViewBuildStatus(labels, metrics.BuildStatusUnknown)
Expand Down

0 comments on commit 97f1edd

Please sign in to comment.