feat(restore): extend batch with type and TOC
This commit extends the SSTable structure with
its TOC component, which is needed when using
the Scylla restore API.
Moreover, it introduces batch types, which are
also needed for deciding whether a given batch
can be restored with the Scylla restore API or
the Rclone API.
It also makes sure that all SSTables within
the same batch belong to the same batch type.
Michal-Leszczynski committed Jan 8, 2025
1 parent 0aaf966 commit d5264ad
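
Below is a minimal sketch (not part of this commit) of the routing rule the message describes. It assumes the batchType struct and the sstable package introduced in the diff; the canUseNativeRestore helper itself is hypothetical.

func canUseNativeRestore(bt batchType) bool {
	// UUID-typed, non-versioned batches can go through the native Scylla
	// restore API; integer-ID or versioned batches fall back to the Rclone
	// API, where versioned files are downloaded one by one and renamed.
	return bt.IDType == sstable.UUID && !bt.Versioned
}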
Showing 3 changed files with 214 additions and 34 deletions.
92 changes: 75 additions & 17 deletions pkg/service/restore/batch.go
@@ -9,12 +9,15 @@ import (

"github.com/pkg/errors"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
)

// batchDispatcher is a tool for batching SSTables from
// Workload across different hosts during restore.
// It follows a few rules:
//
+ // - all SSTables within a batch have the same batchType
//
// - it dispatches batches from the RemoteDirWorkload with the biggest
// initial size first
//
@@ -103,7 +106,7 @@ type workloadProgress struct {
// that are yet to be batched.
type remoteSSTableDirProgress struct {
RemainingSize int64
- RemainingSSTables []RemoteSSTable
+ RemainingSSTables map[batchType][]RemoteSSTable
}

func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress {
@@ -115,7 +118,7 @@ func newWorkloadProgress(workload Workload, locationHosts map[Location][]string) workloadProgress {
locationDC[rdw.Location.StringWithoutDC()] = append(locationDC[rdw.Location.StringWithoutDC()], rdw.DC)
p[i] = remoteSSTableDirProgress{
RemainingSize: rdw.Size,
- RemainingSSTables: rdw.SSTables,
+ RemainingSSTables: groupSSTablesByBatchType(rdw.SSTables),
}
}
hostDCAccess := make(map[string][]string)
@@ -150,11 +153,24 @@ type batch struct {
TableName
*ManifestInfo

+ batchType batchType
RemoteSSTableDir string
Size int64
SSTables []RemoteSSTable
}

+ // Dividing batches by batchType simplifies the restore procedure:
+ // - Files from versioned batches need to be downloaded one by one
+ // in order to rename them on the fly with Rclone API.
+ // - Batches with sstable.UUID type can be restored with native Scylla restore API.
+ type batchType struct {
+ // All SSTables within a batch have the same ID type
+ IDType sstable.IDType
+ // All SSTables within a batch are either versioned or not.
+ Versioned bool
+ // In theory, batchType{IDType: sstable.UUID, Versioned: true} shouldn't exist
+ }

func (b batch) NotVersionedSSTables() []RemoteSSTable {
var ssts []RemoteSSTable
for _, sst := range b.SSTables {
@@ -188,21 +204,36 @@ func (b batch) VersionedSize() int64 {
func (b batch) IDs() []string {
var ids []string
for _, sst := range b.SSTables {
- ids = append(ids, sst.ID)
+ ids = append(ids, sst.ID.ID)
}
return ids
}

+ // TOC returns a list of batch's sstable.ComponentTOC.
+ func (b batch) TOC() []string {
+ out := make([]string, 0, len(b.SSTables))
+ for _, sst := range b.SSTables {
+ out = append(out, sst.TOC)
+ }
+ return out
+ }

// ValidateAllDispatched returns error if not all SSTables were dispatched.
func (bd *batchDispatcher) ValidateAllDispatched() error {
bd.mu.Lock()
defer bd.mu.Unlock()

for i, rdp := range bd.workloadProgress.remoteDir {
- if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 {
+ failed := rdp.RemainingSize != 0
+ for _, ssts := range rdp.RemainingSSTables {
+ if len(ssts) != 0 {
+ failed = true
+ }
+ }
+ if failed {
rdw := bd.workload.RemoteDir[i]
return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info",
- rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size)
+ rdw.Location, rdw.Keyspace, rdw.Table, rdp.RemainingSize)
}
}
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored {
@@ -277,6 +308,21 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) {
if shardCnt == 0 {
shardCnt = 1
}

+ // Choose batch type and candidate sstables
+ var batchT batchType
+ var sstables []RemoteSSTable
+ for bt, ssts := range rdp.RemainingSSTables {
+ if len(ssts) > 0 {
+ batchT = bt
+ sstables = ssts
+ break
+ }
+ }
+ if len(sstables) == 0 {
+ return batch{}, false
+ }

var i int
var size int64
if bd.batchSize == maxBatchSize {
@@ -286,13 +332,13 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) {
sizeLimit := expectedNodeWorkload / 20
for {
for range shardCnt {
- if i >= len(rdp.RemainingSSTables) {
+ if i >= len(sstables) {
break
}
- size += rdp.RemainingSSTables[i].Size
+ size += sstables[i].Size
i++
}
- if i >= len(rdp.RemainingSSTables) {
+ if i >= len(sstables) {
break
}
if size > sizeLimit {
@@ -301,9 +347,9 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) {
}
} else {
// Create batch containing node_shard_count*batch_size sstables.
- i = min(bd.batchSize*int(shardCnt), len(rdp.RemainingSSTables))
+ i = min(bd.batchSize*int(shardCnt), len(sstables))
for j := range i {
- size += rdp.RemainingSSTables[j].Size
+ size += sstables[j].Size
}
}

@@ -312,23 +358,23 @@ func (bd *batchDispatcher) createBatch(dirIdx int, host string) (batch, bool) {
}
// Extend batch if it was to leave less than
// 1 sstable per shard for the next one.
- if len(rdp.RemainingSSTables)-i < int(shardCnt) {
- for ; i < len(rdp.RemainingSSTables); i++ {
- size += rdp.RemainingSSTables[i].Size
+ if len(sstables)-i < int(shardCnt) {
+ for ; i < len(sstables); i++ {
+ size += sstables[i].Size
}
}

- sstables := rdp.RemainingSSTables[:i]
- rdp.RemainingSSTables = rdp.RemainingSSTables[i:]
+ rdp.RemainingSSTables[batchT] = sstables[i:]
rdw := bd.workload.RemoteDir[dirIdx]

rdp.RemainingSize -= size
return batch{
TableName: rdw.TableName,
ManifestInfo: rdw.ManifestInfo,
+ batchType: batchT,
RemoteSSTableDir: rdw.RemoteSSTableDir,
Size: size,
- SSTables: sstables,
+ SSTables: sstables[:i],
}, true
}

@@ -365,7 +411,7 @@ func (bd *batchDispatcher) ReportFailure(host string, b batch) error {
}

rdp := &bd.workloadProgress.remoteDir[dirIdx]
- rdp.RemainingSSTables = append(b.SSTables, rdp.RemainingSSTables...)
+ rdp.RemainingSSTables[b.batchType] = append(b.SSTables, rdp.RemainingSSTables[b.batchType]...)
rdp.RemainingSize += b.Size

bd.wakeUpWaiting()
@@ -394,3 +440,15 @@ func sortWorkload(workload Workload) {
})
}
}

+ func groupSSTablesByBatchType(sstables []RemoteSSTable) map[batchType][]RemoteSSTable {
+ out := make(map[batchType][]RemoteSSTable)
+ for _, sst := range sstables {
+ bt := batchType{
+ IDType: sst.ID.Type,
+ Versioned: sst.Versioned,
+ }
+ out[bt] = append(out[bt], sst)
+ }
+ return out
+ }
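
A quick usage sketch for the new helper (illustrative, not part of the commit; it assumes this package's RemoteSSTable and SSTable types as used in the test below):

ssts := []RemoteSSTable{
	{SSTable: SSTable{ID: sstable.ID{Type: sstable.IntegerID}}, Versioned: false},
	{SSTable: SSTable{ID: sstable.ID{Type: sstable.IntegerID}}, Versioned: true},
	{SSTable: SSTable{ID: sstable.ID{Type: sstable.UUID}}, Versioned: false},
}
groups := groupSSTablesByBatchType(ssts)
// groups now holds three buckets, one per (IDType, Versioned) pair.
// createBatch drains a single bucket at a time, so no batch ever mixes
// ID types or versioned with non-versioned files.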
118 changes: 118 additions & 0 deletions pkg/service/restore/batch_test.go
@@ -3,9 +3,11 @@
package restore

import (
"maps"
"testing"

"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
)

func TestBatchDispatcher(t *testing.T) {
@@ -166,3 +168,119 @@ func TestBatchDispatcher(t *testing.T) {
t.Fatalf("Expected sstables to be batched: %s", err)
}
}

func TestBatchDispatcherType(t *testing.T) {
l := backupspec.Location{
Provider: "s3",
Path: "l",
}

rawWorkload := []RemoteDirWorkload{
{
ManifestInfo: &backupspec.ManifestInfo{
Location: l,
DC: "dc1",
},
TableName: TableName{
Keyspace: "ks1",
Table: "t1",
},
RemoteSSTableDir: "a",
Size: 7,
SSTables: []RemoteSSTable{
// Integer ID, not versioned
{
SSTable: SSTable{ID: sstable.ID{Type: sstable.IntegerID}},
Versioned: false,
Size: 1,
},
{
SSTable: SSTable{ID: sstable.ID{Type: sstable.IntegerID}},
Versioned: false,
Size: 1,
},
{
SSTable: SSTable{ID: sstable.ID{Type: sstable.IntegerID}},
Versioned: false,
Size: 1,
},
{
SSTable: SSTable{ID: sstable.ID{Type: sstable.IntegerID}},
Versioned: false,
Size: 1,
},
// Integer ID, versioned
{
SSTable: SSTable{ID: sstable.ID{Type: sstable.IntegerID}},
Versioned: true,
Size: 1,
},
// UUID, not versioned
{
SSTable: SSTable{ID: sstable.ID{Type: sstable.UUID}},
Size: 1,
},
{
SSTable: SSTable{ID: sstable.ID{Type: sstable.UUID}},
Size: 1,
},
},
},
}

workload := aggregateWorkload(rawWorkload)
batchSize := 3
locationHosts := map[backupspec.Location][]string{
l: {"h1"},
}
hostToShard := map[string]uint{
"h1": 1,
}

bd := newBatchDispatcher(workload, batchSize, hostToShard, locationHosts)

type batchTypeWithSSTableCnt struct {
bt batchType
SSTableCnt int
}

// Describes how many batches of each batchTypeWithSSTableCnt we expect to encounter
expected := map[batchTypeWithSSTableCnt]int{
batchTypeWithSSTableCnt{
bt: batchType{IDType: sstable.IntegerID, Versioned: false},
SSTableCnt: 3,
}: 1,
batchTypeWithSSTableCnt{
bt: batchType{IDType: sstable.IntegerID, Versioned: false},
SSTableCnt: 1,
}: 1,
batchTypeWithSSTableCnt{
bt: batchType{IDType: sstable.IntegerID, Versioned: true},
SSTableCnt: 1,
}: 1,
batchTypeWithSSTableCnt{
bt: batchType{IDType: sstable.UUID},
SSTableCnt: 2,
}: 1,
}

result := make(map[batchTypeWithSSTableCnt]int)
for {
b, ok := bd.dispatchBatch("h1")
if !ok {
break
}
result[batchTypeWithSSTableCnt{
bt: b.batchType,
SSTableCnt: len(b.SSTables),
}]++
bd.ReportSuccess(b)
}

if !maps.Equal(expected, result) {
t.Fatalf("Expected batches %v, got %v", expected, result)
}
if err := bd.ValidateAllDispatched(); err != nil {
t.Fatalf("Expected all sstables to be batched: %s", err)
}
}
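
Finally, a sketch of the new TOC accessor in use. The component names are illustrative; only the TOC method and the TOC field added to SSTable come from this commit.

b := batch{SSTables: []RemoteSSTable{
	{SSTable: SSTable{TOC: "me-1-big-TOC.txt"}},
	{SSTable: SSTable{TOC: "me-2-big-TOC.txt"}},
}}
// b.TOC() returns ["me-1-big-TOC.txt", "me-2-big-TOC.txt"]: the TOC
// component of every SSTable in the batch, which the Scylla restore API
// uses to identify each SSTable's full set of files.
_ = b.TOC()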
