diff --git a/ReadMe.md b/ReadMe.md index ef43008d..72fd3a05 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -378,7 +378,8 @@ general: watch_interval: 1h # WATCH_INTERVAL, use only for `watch` command, backup will create every 1h full_interval: 24h # FULL_INTERVAL, use only for `watch` command, full backup will create every 24h watch_backup_name_template: "shard{shard}-{type}-{time:20060102150405}" # WATCH_BACKUP_NAME_TEMPLATE, used only for `watch` command, macros values will apply from `system.macros` for time:XXX, look format in https://go.dev/src/time/format.go - + + sharded_operation: false # SHARDED_OPERATION, when enabled each replica backs up the schema for every table but backs up data only for the tables assigned to it, so table data is sharded across the replicas clickhouse: username: default # CLICKHOUSE_USERNAME password: "" # CLICKHOUSE_PASSWORD diff --git a/pkg/backup/backup_shard.go b/pkg/backup/backup_shard.go new file mode 100644 index 00000000..867e1971 --- /dev/null +++ b/pkg/backup/backup_shard.go @@ -0,0 +1,129 @@ +package backup + +import ( + "context" + "errors" + "fmt" + "hash/fnv" + "sort" +) + +var ( + // errUnknownBackupShard is returned when sharding assignment is requested for a table for which + // active replication state is not known. + errUnknownBackupShard = errors.New("unknown backup shard") + + // errNoActiveReplicas is returned when a table has no currently active replicas + errNoActiveReplicas = errors.New("no active replicas") +) + +// shardDetermination is an object holding information on whether or not a table is within the +// backup shard +type shardDetermination map[string]bool + +// inShard returns whether or not a given table is within a backup shard +func (d shardDetermination) inShard(database, table string) (bool, error) { + fullName := fmt.Sprintf("%s.%s", database, table) + presentInShard, ok := d[fullName] + if !ok { + return false, fmt.Errorf("error determining backup shard state for %q: %w", fullName, + errUnknownBackupShard) + } + return presentInShard, nil +} + +// backupSharder is an interface which can obtain a shard determination at a given point in time +type backupSharder interface { + determineShards(ctx context.Context) (shardDetermination, error) +} + +// tableReplicaMetadata is data derived from `system.replicas` +type tableReplicaMetadata struct { + Database string `ch:"database" json:"database"` + Table string `ch:"table" json:"table"` + ReplicaName string `ch:"replica_name" json:"replica_name"` + // TODO: Change type to use replica_is_active directly after upgrade to clickhouse-go v2 + ActiveReplicas []string `ch:"active_replicas" json:"replica_is_active"` +} + +// fullName returns the table name in the form of `database.table` +func (md *tableReplicaMetadata) fullName() string { + return fmt.Sprintf("%s.%s", md.Database, md.Table) +} + +// querier is an interface that can query Clickhouse +type querier interface { + SelectContext(context.Context, interface{}, string, ...interface{}) error } + +// shardFunc is a function that is used to determine whether or not a given database/table should be +// handled by a given replica +type shardFunc func(md *tableReplicaMetadata) (bool, error) + +// fnvHashModShardFunc performs a FNV hash of a table name in the form of `database.table` and then +// performs a mod N operation (where N is the number of active replicas) in order to find an index +// in an alphabetically sorted list of active replicas which corresponds to the replica that will +// handle the backup of the table +func fnvHashModShardFunc(md 
*tableReplicaMetadata) (bool, error) { + if len(md.ActiveReplicas) == 0 { + return false, fmt.Errorf("could not determine in-shard state for %s: %w", md.fullName(), + errNoActiveReplicas) + } + sort.Strings(md.ActiveReplicas) + + h := fnv.New32a() + h.Write([]byte(md.fullName())) + i := h.Sum32() % uint32(len(md.ActiveReplicas)) + return md.ActiveReplicas[i] == md.ReplicaName, nil +} + +// replicaDeterminer is a concrete struct that will query clickhouse to obtain a shard determination +// by examining replica information +type replicaDeterminer struct { + q querier + sf shardFunc +} + +// newReplicaDeterminer returns a new shardDeterminer +func newReplicaDeterminer(q querier, sf shardFunc) *replicaDeterminer { + sd := &replicaDeterminer{ + q: q, + sf: sf, + } + return sd +} + +// getReplicaState obtains the local replication state through a query to `system.replicas` +func (rd *replicaDeterminer) getReplicaState(ctx context.Context) ([]tableReplicaMetadata, error) { + md := []tableReplicaMetadata{} + // TODO: Change query to pull replica_is_active after upgrading to clickhouse-go v2 + query := "select t.database, t.name as table, r.replica_name, mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active)) as active_replicas from system.tables t left join system.replicas r on t.database = r.database and t.name = r.table" + if err := rd.q.SelectContext(ctx, &md, query); err != nil { + return nil, fmt.Errorf("could not determine replication state: %w", err) + } + + // Handle views and memory tables by putting in stand-in replication metadata + for i, entry := range md { + if entry.ReplicaName == "" && len(entry.ActiveReplicas) == 0 { + md[i].ReplicaName = "no-replicas" + md[i].ActiveReplicas = []string{"no-replicas"} + } + } + return md, nil +} + +func (rd *replicaDeterminer) determineShards(ctx context.Context) (shardDetermination, error) { + md, err := rd.getReplicaState(ctx) + if err != nil { + return nil, err + } + sd := shardDetermination{} + for _, entry := range md { + assigned, err := rd.sf(&entry) + if err != nil { + return nil, err + } + sd[entry.fullName()] = assigned + } + return sd, nil +} diff --git a/pkg/backup/backup_shard_test.go b/pkg/backup/backup_shard_test.go new file mode 100644 index 00000000..f75a1a65 --- /dev/null +++ b/pkg/backup/backup_shard_test.go @@ -0,0 +1,306 @@ +package backup + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "reflect" + "testing" +) + +func TestInShard(t *testing.T) { + d := shardDetermination{ + "present_db.present_table": true, + "present_db.absent_table": false, + "absent_db.present_table": false, + "absent_db.absent_table": false, + } + testcases := []struct { + name string + database string + table string + expectPresent bool + expectErr error + }{ + { + name: "Test nonexistent database", + database: "nonexistent", + table: "present_table", + expectPresent: false, + expectErr: errUnknownBackupShard, + }, + { + name: "Test nonexistent table", + database: "present_db", + table: "nonexistent", + expectPresent: false, + expectErr: errUnknownBackupShard, + }, + { + name: "Test in-shard table", + database: "present_db", + table: "present_table", + expectPresent: true, + }, + { + name: "Test out-shard table", + database: "present_db", + table: "absent_table", + expectPresent: false, + }, + { + name: "Test out-shard database with in-shard table name for other database", + database: "absent_db", + table: "present_table", + expectPresent: false, + }, + { + name: "Test out-shard database", + database: 
"absent_db", + table: "absent_table", + expectPresent: false, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + present, err := d.inShard(tc.database, tc.table) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected err %q, got %q", tc.expectErr, err) + } + if present != tc.expectPresent { + t.Fatalf("expected in-shard status %v, got %v", tc.expectPresent, present) + } + } +} + +func TestFullName(t *testing.T) { + testcases := []struct { + md tableReplicaMetadata + expect string + }{ + { + md: tableReplicaMetadata{ + Database: "a", + Table: "b", + }, + expect: "a.b", + }, + { + md: tableReplicaMetadata{ + Database: "db", + }, + expect: "db.", + }, + { + md: tableReplicaMetadata{ + Table: "t", + }, + expect: ".t", + }, + } + for _, tc := range testcases { + if tc.md.fullName() != tc.expect { + t.Fatalf("expected %q, got %q", tc.expect, tc.md.fullName()) + } + } +} + +func TestFNVHashModShardFunc(t *testing.T) { + testcases := []struct { + name string + md *tableReplicaMetadata + expect bool + expectErr error + }{ + { + name: "Test no active replicas", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{}, + }, + expectErr: errNoActiveReplicas, + }, + { + name: "Test single active replica", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{"replica"}, + }, + expect: true, + }, + { + name: "Test not assigned replica", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{"different"}, + }, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + got, err := fnvHashModShardFunc(tc.md) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if got != tc.expect { + t.Fatalf("expected shard membership %v, got %v", tc.expect, got) + } + } +} + +type testQuerier struct { + data any + returnErr error +} + +func (tq *testQuerier) SelectContext(_ context.Context, dest interface{}, _ string, + args ...interface{}) error { + if tq.returnErr != nil { + return tq.returnErr + } + jsonData, err := json.Marshal(tq.data) + if err != nil { + return fmt.Errorf("error encoding data: %w", err) + } + return json.NewDecoder(bytes.NewReader(jsonData)).Decode(dest) +} + +func TestGetReplicaState(t *testing.T) { + data := []tableReplicaMetadata{ + { + Database: "db", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{}, + }, + { + Database: "db2", + Table: "table2", + ReplicaName: "replica2", + ActiveReplicas: []string{"replica2"}, + }, + } + expectedErr := errors.New("expected error") + testcases := []struct { + name string + q querier + expect []tableReplicaMetadata + expectErr error + }{ + { + name: "Test error on obtaining replica state", + q: &testQuerier{ + returnErr: expectedErr, + }, + expectErr: expectedErr, + }, + { + name: "Test pulling data", + q: &testQuerier{ + data: data, + }, + expect: data, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + rd := newReplicaDeterminer(tc.q, nil) + got, err := rd.getReplicaState(context.Background()) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if err != nil { + continue + } + if !reflect.DeepEqual(got, tc.expect) { + t.Fatalf("expected data %v, got %v", tc.expect, got) + } + } +} + +var errNameSharder = errors.New("expected error") + +func nameSharder(md *tableReplicaMetadata) (bool, error) { + if md.Table == 
"error" { + return false, errNameSharder + } + if md.Table == "present" { + return true, nil + } + return false, nil +} + +func TestDetermineShards(t *testing.T) { + expectErr := errors.New("expected error") + testcases := []struct { + name string + q querier + expect shardDetermination + expectErr error + }{ + { + name: "Test query error", + q: &testQuerier{ + returnErr: expectErr, + }, + expectErr: expectErr, + }, + { + name: "Test shard func error", + q: &testQuerier{ + data: []tableReplicaMetadata{ + { + Table: "error", + }, + }, + }, + expectErr: errNameSharder, + }, + { + name: "Test normal operation", + q: &testQuerier{ + data: []tableReplicaMetadata{ + { + Database: "a", + Table: "present", + }, + { + Database: "a", + Table: "absent", + }, + { + Database: "b", + Table: "present", + }, + }, + }, + expect: shardDetermination{ + "a.present": true, + "a.absent": false, + "b.present": true, + }, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + rd := newReplicaDeterminer(tc.q, nameSharder) + got, err := rd.determineShards(context.Background()) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected %v, got %v", tc.expectErr, err) + } + if err != nil { + continue + } + if !reflect.DeepEqual(got, tc.expect) { + t.Fatalf("expected data %v, got %v", tc.expect, got) + } + } +} diff --git a/pkg/backup/backuper.go b/pkg/backup/backuper.go index a88ee657..f0d68646 100644 --- a/pkg/backup/backuper.go +++ b/pkg/backup/backuper.go @@ -3,17 +3,23 @@ package backup import ( "context" "fmt" + "path" + "github.com/Altinity/clickhouse-backup/pkg/clickhouse" "github.com/Altinity/clickhouse-backup/pkg/config" "github.com/Altinity/clickhouse-backup/pkg/resumable" "github.com/Altinity/clickhouse-backup/pkg/storage" + apexLog "github.com/apex/log" - "path" ) +type BackuperOpt func(*Backuper) + type Backuper struct { cfg *config.Config ch *clickhouse.ClickHouse + vers versioner + bs backupSharder dst *storage.BackupDestination log *apexLog.Entry DiskToPathMap map[string]string @@ -24,15 +30,33 @@ type Backuper struct { resumableState *resumable.State } -func NewBackuper(cfg *config.Config) *Backuper { +func NewBackuper(cfg *config.Config, opts ...BackuperOpt) *Backuper { ch := &clickhouse.ClickHouse{ Config: &cfg.ClickHouse, Log: apexLog.WithField("logger", "clickhouse"), } - return &Backuper{ - cfg: cfg, - ch: ch, - log: apexLog.WithField("logger", "backuper"), + b := &Backuper{ + cfg: cfg, + ch: ch, + vers: ch, + bs: newReplicaDeterminer(ch, fnvHashModShardFunc), + log: apexLog.WithField("logger", "backuper"), + } + for _, opt := range opts { + opt(b) + } + return b +} + +func WithVersioner(v versioner) BackuperOpt { + return func(b *Backuper) { + b.vers = v + } +} + +func WithBackupSharder(s backupSharder) BackuperOpt { + return func(b *Backuper) { + b.bs = s } } @@ -75,3 +99,37 @@ func (b *Backuper) getLocalBackupDataPathForTable(backupName string, disk string } return backupPath } + +// populateBackupShardField populates the BackupShard field for a slice of Table structs +func (b *Backuper) populateBackupShardField(ctx context.Context, tables []clickhouse.Table) error { + // By default, have all fields populated to full backup unless the table is to be skipped + for i := range tables { + tables[i].BackupType = clickhouse.ShardBackupFull + if tables[i].Skip { + tables[i].BackupType = clickhouse.ShardBackupNone + } + } + if !b.cfg.General.ShardedOperation { + return nil + } + if err := canShardOperation(ctx, b.vers); err != nil { + return err + } + assignment, err := b.bs.determineShards(ctx) 
+ if err != nil { + return err + } + for i, t := range tables { + if t.Skip { + continue + } + fullBackup, err := assignment.inShard(t.Database, t.Name) + if err != nil { + return err + } + if !fullBackup { + tables[i].BackupType = clickhouse.ShardBackupSchema + } + } + return nil +} diff --git a/pkg/backup/backuper_test.go b/pkg/backup/backuper_test.go new file mode 100644 index 00000000..cdeb8de3 --- /dev/null +++ b/pkg/backup/backuper_test.go @@ -0,0 +1,170 @@ +package backup + +import ( + "context" + "errors" + "reflect" + "testing" + + "github.com/Altinity/clickhouse-backup/pkg/clickhouse" + "github.com/Altinity/clickhouse-backup/pkg/config" +) + +type testBackupSharder struct { + data shardDetermination + err error +} + +func (bs *testBackupSharder) determineShards(_ context.Context) (shardDetermination, error) { + if bs.err != nil { + return nil, bs.err + } + return bs.data, nil +} + +func TestPopulateBackupShardField(t *testing.T) { + errVersion := errors.New("versioner error") + errVersioner := newTestVersioner(withVersionErr(errVersion)) + goodVersioner := newTestVersioner(withVersion(minVersShardOp)) + oldVersioner := newTestVersioner(withVersion(-1)) + + // Create tables to reset field state + tableData := func() []clickhouse.Table { + return []clickhouse.Table{ + { + Database: "a", + Name: "present", + }, + { + Database: "a", + Name: "absent", + }, + { + Database: "b", + Name: "present", + Skip: true, + }, + } + } + + errShard := errors.New("backup sharder error") + errSharder := &testBackupSharder{err: errShard} + staticSharder := &testBackupSharder{ + data: shardDetermination{ + "a.present": true, + "a.absent": false, + }, + } + emptySharder := &testBackupSharder{ + data: shardDetermination{}, + } + + testcases := []struct { + name string + shardConfig bool + v versioner + bs backupSharder + expect []clickhouse.Table + expectErr error + }{ + { + name: "Test versioner error", + shardConfig: true, + v: errVersioner, + bs: staticSharder, + expectErr: errVersion, + }, + { + name: "Test incompatible version", + shardConfig: true, + v: oldVersioner, + bs: staticSharder, + expectErr: errShardOperationVers, + }, + { + name: "Test incompatible version without sharding config", + shardConfig: false, + v: oldVersioner, + bs: staticSharder, + expect: []clickhouse.Table{ + { + Database: "a", + Name: "present", + BackupType: clickhouse.ShardBackupFull, + }, + { + Database: "a", + Name: "absent", + BackupType: clickhouse.ShardBackupFull, + }, + { + Database: "b", + Name: "present", + Skip: true, + BackupType: clickhouse.ShardBackupNone, + }, + }, + }, + { + name: "Test sharder error", + shardConfig: true, + v: goodVersioner, + bs: errSharder, + expectErr: errShard, + }, + { + name: "Test incomplete replica data", + shardConfig: true, + v: goodVersioner, + bs: emptySharder, + expectErr: errUnknownBackupShard, + }, + { + name: "Test normal sharding", + shardConfig: true, + v: goodVersioner, + bs: staticSharder, + expect: []clickhouse.Table{ + { + Database: "a", + Name: "present", + BackupType: clickhouse.ShardBackupFull, + }, + { + Database: "a", + Name: "absent", + BackupType: clickhouse.ShardBackupSchema, + }, + { + Database: "b", + Name: "present", + Skip: true, + BackupType: clickhouse.ShardBackupNone, + }, + }, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + cfg := &config.Config{ + General: config.GeneralConfig{ + ShardedOperation: tc.shardConfig, + }, + } + b := NewBackuper(cfg, + WithVersioner(tc.v), + WithBackupSharder(tc.bs), + ) + tables := tableData() + err := 
b.populateBackupShardField(context.Background(), tables) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if err != nil { + continue + } + if !reflect.DeepEqual(tables, tc.expect) { + t.Fatalf("expected %+v, got %+v", tc.expect, tables) + } + } +} diff --git a/pkg/backup/create.go b/pkg/backup/create.go index ea4f1c74..ca266d5d 100644 --- a/pkg/backup/create.go +++ b/pkg/backup/create.go @@ -5,11 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/Altinity/clickhouse-backup/pkg/config" - "github.com/Altinity/clickhouse-backup/pkg/partition" - "github.com/Altinity/clickhouse-backup/pkg/status" - "github.com/Altinity/clickhouse-backup/pkg/storage" - "github.com/Altinity/clickhouse-backup/pkg/storage/object_disk" "os" "path" "path/filepath" @@ -18,9 +13,15 @@ import ( "github.com/Altinity/clickhouse-backup/pkg/clickhouse" "github.com/Altinity/clickhouse-backup/pkg/common" + "github.com/Altinity/clickhouse-backup/pkg/config" "github.com/Altinity/clickhouse-backup/pkg/filesystemhelper" "github.com/Altinity/clickhouse-backup/pkg/metadata" + "github.com/Altinity/clickhouse-backup/pkg/partition" + "github.com/Altinity/clickhouse-backup/pkg/status" + "github.com/Altinity/clickhouse-backup/pkg/storage" + "github.com/Altinity/clickhouse-backup/pkg/storage/object_disk" "github.com/Altinity/clickhouse-backup/pkg/utils" + apexLog "github.com/apex/log" "github.com/google/uuid" recursiveCopy "github.com/otiai10/copy" @@ -59,7 +60,6 @@ func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st defer cancel() startBackup := time.Now() - doBackupData := !schemaOnly if backupName == "" { backupName = NewBackupName() } @@ -81,7 +81,7 @@ func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st if err != nil { return fmt.Errorf("can't get database engines from clickhouse: %v", err) } - tables, err := b.ch.GetTables(ctx, tablePattern) + tables, err := b.GetTables(ctx, tablePattern) if err != nil { return fmt.Errorf("can't get tables from clickhouse: %v", err) } @@ -117,7 +117,7 @@ func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st if b.cfg.ClickHouse.UseEmbeddedBackupRestore { err = b.createBackupEmbedded(ctx, backupName, tablePattern, partitionsNameList, partitionsIdMap, schemaOnly, rbacOnly, configsOnly, tables, allDatabases, allFunctions, disks, diskMap, diskTypes, log, startBackup, version) } else { - err = b.createBackupLocal(ctx, backupName, partitionsIdMap, tables, doBackupData, schemaOnly, rbacOnly, configsOnly, version, disks, diskMap, diskTypes, allDatabases, allFunctions, log, startBackup) + err = b.createBackupLocal(ctx, backupName, partitionsIdMap, tables, schemaOnly, rbacOnly, configsOnly, version, disks, diskMap, diskTypes, allDatabases, allFunctions, log, startBackup) } if err != nil { return err @@ -130,7 +130,7 @@ func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st return nil } -func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, doBackupData bool, schemaOnly bool, rbacOnly bool, configsOnly bool, version string, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry, startBackup time.Time) error { +func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, partitionsIdMap 
map[metadata.TableTitle]common.EmptyMap, tables []clickhouse.Table, schemaOnly bool, rbacOnly bool, configsOnly bool, version string, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry, startBackup time.Time) error { // Create backup dir on all clickhouse disks for _, disk := range disks { if err := filesystemhelper.Mkdir(path.Join(disk.Path, "backup"), b.ch, disks); err != nil { @@ -165,7 +165,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par } var realSize map[string]int64 var disksToPartsMap map[string][]metadata.Part - if doBackupData { + if !schemaOnly && table.BackupType == clickhouse.ShardBackupFull { log.Debug("create data") shadowBackupUUID := strings.ReplaceAll(uuid.New().String(), "-", "") disksToPartsMap, realSize, err = b.AddTableToBackup(ctx, backupName, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}]) @@ -208,7 +208,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par Size: realSize, Parts: disksToPartsMap, Mutations: inProgressMutations, - MetadataOnly: schemaOnly, + MetadataOnly: schemaOnly || table.BackupType == clickhouse.ShardBackupSchema, }, disks) if err != nil { if removeBackupErr := b.RemoveBackupLocal(ctx, backupName, disks); removeBackupErr != nil { @@ -250,6 +250,10 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par } func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, tablePattern string, partitionsNameList map[metadata.TableTitle][]string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, schemaOnly, rbacOnly, configsOnly bool, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, log *apexLog.Entry, startBackup time.Time, backupVersion string) error { + // TODO: Implement sharded backup operations for embedded backups + if b.cfg.General.ShardedOperation { + return fmt.Errorf("cannot perform embedded backup: %w", errShardOperationUnsupported) + } if _, isBackupDiskExists := diskMap[b.cfg.ClickHouse.EmbeddedBackupDisk]; !isBackupDiskExists { return fmt.Errorf("backup disk `%s` not exists in system.disks", b.cfg.ClickHouse.EmbeddedBackupDisk) } diff --git a/pkg/backup/list.go b/pkg/backup/list.go index 31c575b2..25952416 100644 --- a/pkg/backup/list.go +++ b/pkg/backup/list.go @@ -4,9 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/Altinity/clickhouse-backup/pkg/custom" - "github.com/Altinity/clickhouse-backup/pkg/status" - apexLog "github.com/apex/log" "io" "os" "path" @@ -15,9 +12,13 @@ import ( "text/tabwriter" "github.com/Altinity/clickhouse-backup/pkg/clickhouse" + "github.com/Altinity/clickhouse-backup/pkg/custom" "github.com/Altinity/clickhouse-backup/pkg/metadata" + "github.com/Altinity/clickhouse-backup/pkg/status" "github.com/Altinity/clickhouse-backup/pkg/storage" "github.com/Altinity/clickhouse-backup/pkg/utils" + + apexLog "github.com/apex/log" ) // List - list backups to stdout from command line @@ -368,7 +369,7 @@ func (b *Backuper) GetRemoteBackups(ctx context.Context, parseMetadata bool) ([] return backupList, err } -// GetTables - get all tables for use by PrintTables and API +// GetTables - get all tables for use by CreateBackup, PrintTables, and API func (b *Backuper) GetTables(ctx context.Context, tablePattern string) 
([]clickhouse.Table, error) { if !b.ch.IsOpen { if err := b.ch.Connect(); err != nil { @@ -381,6 +382,9 @@ func (b *Backuper) GetTables(ctx context.Context, tablePattern string) ([]clickh if err != nil { return []clickhouse.Table{}, fmt.Errorf("can't get tables: %v", err) } + if err := b.populateBackupShardField(ctx, allTables); err != nil { + return nil, err + } return allTables, nil } @@ -416,7 +420,7 @@ func (b *Backuper) PrintTables(printAll bool, tablePattern string) error { } continue } - if bytes, err := fmt.Fprintf(w, "%s.%s\t%s\t%v\t\n", table.Database, table.Name, utils.FormatBytes(table.TotalBytes), strings.Join(tableDisks, ",")); err != nil { + if bytes, err := fmt.Fprintf(w, "%s.%s\t%s\t%v\t%v\n", table.Database, table.Name, utils.FormatBytes(table.TotalBytes), strings.Join(tableDisks, ","), table.BackupType); err != nil { log.Errorf("fmt.Fprintf write %d bytes return error: %v", bytes, err) } } diff --git a/pkg/backup/version.go b/pkg/backup/version.go new file mode 100644 index 00000000..cb0a3129 --- /dev/null +++ b/pkg/backup/version.go @@ -0,0 +1,33 @@ +package backup + +import ( + "context" + "errors" +) + +const ( + minVersShardOp = 21000000 +) + +var ( + errShardOperationVers = errors.New("sharded operations are only supported for " + + "clickhouse-server >= v21.x") + errShardOperationUnsupported = errors.New("sharded operations are not supported") +) + +// versioner is an interface for determining the version of Clickhouse +type versioner interface { + GetVersion(context.Context) (int, error) +} + +// canShardOperation returns an error if the server version does not support sharded backup operations +func canShardOperation(ctx context.Context, v versioner) error { + version, err := v.GetVersion(ctx) + if err != nil { + return err + } + if version < minVersShardOp { + return errShardOperationVers + } + return nil +} diff --git a/pkg/backup/version_test.go b/pkg/backup/version_test.go new file mode 100644 index 00000000..2259aba3 --- /dev/null +++ b/pkg/backup/version_test.go @@ -0,0 +1,67 @@ +package backup + +import ( + "context" + "errors" + "testing" +) + +type testVersionerOpt func(sd *testVersioner) + +type testVersioner struct { + version int + versionErr error +} + +func newTestVersioner(opts ...testVersionerOpt) *testVersioner { + v := &testVersioner{} + for _, opt := range opts { + opt(v) + } + return v +} + +func withVersion(version int) testVersionerOpt { + return func(v *testVersioner) { + v.version = version + } +} + +func withVersionErr(err error) testVersionerOpt { + return func(v *testVersioner) { + v.versionErr = err + } +} + +func (v *testVersioner) GetVersion(_ context.Context) (int, error) { + if v.versionErr != nil { + return -1, v.versionErr + } + return v.version, nil +} + +func TestCanShardOperation(t *testing.T) { + ctx := context.Background() + + t.Log("test error on version retrieval") + v := newTestVersioner(withVersionErr(errors.New("error"))) + if err := canShardOperation(ctx, v); err == nil { + t.Fatal("expected error when version retrieval fails") + } + + t.Log("test version too low") + v = newTestVersioner(withVersion(-1)) + err := canShardOperation(ctx, v) + if err == nil { + t.Fatal("expected error when version number is too low") + } + if !errors.Is(err, errShardOperationVers) { + t.Fatalf("expected errShardOperationVers, got %v", err) + } + + t.Log("test version should be OK") + v = newTestVersioner(withVersion(minVersShardOp)) + if err = canShardOperation(ctx, v); err != nil { + t.Fatalf("unexpected error: %v", err) + } 
+} diff --git a/pkg/clickhouse/structs.go b/pkg/clickhouse/structs.go index 31f134aa..dcbc7904 100644 --- a/pkg/clickhouse/structs.go +++ b/pkg/clickhouse/structs.go @@ -4,6 +4,14 @@ import ( "time" ) +type ShardBackupType string + +const ( + ShardBackupFull = "full" + ShardBackupNone = "none" + ShardBackupSchema = "schema-only" +) + // Table - ClickHouse table struct type Table struct { // common fields for all `clickhouse-server` versions @@ -17,6 +25,7 @@ type Table struct { CreateTableQuery string `ch:"create_table_query"` TotalBytes uint64 `ch:"total_bytes"` Skip bool + BackupType ShardBackupType } // IsSystemTablesFieldPresent - ClickHouse `system.tables` varius field flags diff --git a/pkg/config/config.go b/pkg/config/config.go index 0a16c084..a1510207 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -55,6 +55,7 @@ type GeneralConfig struct { WatchInterval string `yaml:"watch_interval" envconfig:"WATCH_INTERVAL"` FullInterval string `yaml:"full_interval" envconfig:"FULL_INTERVAL"` WatchBackupNameTemplate string `yaml:"watch_backup_name_template" envconfig:"WATCH_BACKUP_NAME_TEMPLATE"` + ShardedOperation bool `yaml:"sharded_operation" envconfig:"SHARDED_OPERATION"` RetriesDuration time.Duration WatchDuration time.Duration FullDuration time.Duration diff --git a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot index 7940675f..4e9d8e49 100644 --- a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot +++ b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot @@ -1,4 +1,4 @@ -default_config = r"""'[\'general:\', \' remote_storage: none\', \' disable_progress_bar: true\', \' backups_to_keep_local: 0\', \' backups_to_keep_remote: 0\', \' log_level: info\', \' allow_empty_backups: false\', \' use_resumable_state: true\', \' restore_schema_on_cluster: ""\', \' upload_by_part: true\', \' download_by_part: true\', \' restore_database_mapping: {}\', \' retries_on_failure: 3\', \' retries_pause: 30s\', \' watch_interval: 1h\', \' full_interval: 24h\', \' watch_backup_name_template: shard{shard}-{type}-{time:20060102150405}\', \' retriesduration: 100ms\', \' watchduration: 1h0m0s\', \' fullduration: 24h0m0s\', \'clickhouse:\', \' username: default\', \' password: ""\', \' host: localhost\', \' port: 9000\', \' disk_mapping: {}\', \' skip_tables:\', \' - system.*\', \' - INFORMATION_SCHEMA.*\', \' - information_schema.*\', \' - _temporary_and_external_tables.*\', \' timeout: 5m\', \' freeze_by_part: false\', \' freeze_by_part_where: ""\', \' use_embedded_backup_restore: false\', \' embedded_backup_disk: ""\', \' backup_mutations: true\', \' restore_as_attach: false\', \' check_parts_columns: true\', \' secure: false\', \' skip_verify: false\', \' sync_replicated_tables: false\', \' log_sql_queries: true\', \' config_dir: /etc/clickhouse-server/\', \' restart_command: systemctl restart clickhouse-server\', \' ignore_not_exists_error_during_freeze: true\', \' check_replicas_before_attach: true\', \' tls_key: ""\', \' tls_cert: ""\', \' tls_ca: ""\', \' debug: false\', \'s3:\', \' access_key: ""\', \' secret_key: ""\', \' bucket: ""\', \' endpoint: ""\', \' region: us-east-1\', \' acl: private\', \' assume_role_arn: ""\', \' force_path_style: false\', \' path: ""\', \' object_disk_path: ""\', \' disable_ssl: false\', \' compression_level: 1\', \' compression_format: tar\', \' sse: ""\', \' sse_kms_key_id: ""\', \' sse_customer_algorithm: ""\', \' 
sse_customer_key: ""\', \' sse_customer_key_md5: ""\', \' sse_kms_encryption_context: ""\', \' disable_cert_verification: false\', \' use_custom_storage_class: false\', \' storage_class: STANDARD\', \' custom_storage_class_map: {}\', \' part_size: 0\', \' allow_multipart_download: false\', \' object_labels: {}\', \' debug: false\', \'gcs:\', \' credentials_file: ""\', \' credentials_json: ""\', \' credentials_json_encoded: ""\', \' bucket: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' debug: false\', \' endpoint: ""\', \' storage_class: STANDARD\', \' object_labels: {}\', \' custom_storage_class_map: {}\', \'cos:\', \' url: ""\', \' timeout: 2m\', \' secret_id: ""\', \' secret_key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'api:\', \' listen: localhost:7171\', \' enable_metrics: true\', \' enable_pprof: false\', \' username: ""\', \' password: ""\', \' secure: false\', \' certificate_file: ""\', \' private_key_file: ""\', \' ca_cert_file: ""\', \' ca_key_file: ""\', \' create_integration_tables: false\', \' integration_tables_host: ""\', \' allow_parallel: false\', \' complete_resumable_after_restart: true\', \'ftp:\', \' address: ""\', \' timeout: 2m\', \' username: ""\', \' password: ""\', \' tls: false\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'sftp:\', \' address: ""\', \' port: 22\', \' username: ""\', \' password: ""\', \' key: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'azblob:\', \' endpoint_schema: https\', \' endpoint_suffix: core.windows.net\', \' account_name: ""\', \' account_key: ""\', \' sas: ""\', \' use_managed_identity: false\', \' container: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' sse_key: ""\', \' buffer_size: 0\', \' buffer_count: 3\', \' timeout: 15m\', \'custom:\', \' upload_command: ""\', \' download_command: ""\', \' list_command: ""\', \' delete_command: ""\', \' command_timeout: 4h\', \' commandtimeoutduration: 4h0m0s\']'""" +default_config = r"""'[\'general:\', \' remote_storage: none\', \' disable_progress_bar: true\', \' backups_to_keep_local: 0\', \' backups_to_keep_remote: 0\', \' log_level: info\', \' allow_empty_backups: false\', \' use_resumable_state: true\', \' restore_schema_on_cluster: ""\', \' upload_by_part: true\', \' download_by_part: true\', \' restore_database_mapping: {}\', \' retries_on_failure: 3\', \' retries_pause: 30s\', \' watch_interval: 1h\', \' full_interval: 24h\', \' watch_backup_name_template: shard{shard}-{type}-{time:20060102150405}\', \' sharded_operation: false\', \' retriesduration: 100ms\', \' watchduration: 1h0m0s\', \' fullduration: 24h0m0s\', \'clickhouse:\', \' username: default\', \' password: ""\', \' host: localhost\', \' port: 9000\', \' disk_mapping: {}\', \' skip_tables:\', \' - system.*\', \' - INFORMATION_SCHEMA.*\', \' - information_schema.*\', \' - _temporary_and_external_tables.*\', \' timeout: 5m\', \' freeze_by_part: false\', \' freeze_by_part_where: ""\', \' use_embedded_backup_restore: false\', \' embedded_backup_disk: ""\', \' backup_mutations: true\', \' restore_as_attach: false\', \' check_parts_columns: true\', \' secure: false\', \' skip_verify: false\', \' sync_replicated_tables: false\', \' log_sql_queries: true\', \' config_dir: /etc/clickhouse-server/\', \' 
restart_command: systemctl restart clickhouse-server\', \' ignore_not_exists_error_during_freeze: true\', \' check_replicas_before_attach: true\', \' tls_key: ""\', \' tls_cert: ""\', \' tls_ca: ""\', \' debug: false\', \'s3:\', \' access_key: ""\', \' secret_key: ""\', \' bucket: ""\', \' endpoint: ""\', \' region: us-east-1\', \' acl: private\', \' assume_role_arn: ""\', \' force_path_style: false\', \' path: ""\', \' object_disk_path: ""\', \' disable_ssl: false\', \' compression_level: 1\', \' compression_format: tar\', \' sse: ""\', \' sse_kms_key_id: ""\', \' sse_customer_algorithm: ""\', \' sse_customer_key: ""\', \' sse_customer_key_md5: ""\', \' sse_kms_encryption_context: ""\', \' disable_cert_verification: false\', \' use_custom_storage_class: false\', \' storage_class: STANDARD\', \' custom_storage_class_map: {}\', \' part_size: 0\', \' allow_multipart_download: false\', \' object_labels: {}\', \' debug: false\', \'gcs:\', \' credentials_file: ""\', \' credentials_json: ""\', \' credentials_json_encoded: ""\', \' bucket: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' debug: false\', \' endpoint: ""\', \' storage_class: STANDARD\', \' object_labels: {}\', \' custom_storage_class_map: {}\', \'cos:\', \' url: ""\', \' timeout: 2m\', \' secret_id: ""\', \' secret_key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'api:\', \' listen: localhost:7171\', \' enable_metrics: true\', \' enable_pprof: false\', \' username: ""\', \' password: ""\', \' secure: false\', \' certificate_file: ""\', \' private_key_file: ""\', \' ca_cert_file: ""\', \' ca_key_file: ""\', \' create_integration_tables: false\', \' integration_tables_host: ""\', \' allow_parallel: false\', \' complete_resumable_after_restart: true\', \'ftp:\', \' address: ""\', \' timeout: 2m\', \' username: ""\', \' password: ""\', \' tls: false\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'sftp:\', \' address: ""\', \' port: 22\', \' username: ""\', \' password: ""\', \' key: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'azblob:\', \' endpoint_schema: https\', \' endpoint_suffix: core.windows.net\', \' account_name: ""\', \' account_key: ""\', \' sas: ""\', \' use_managed_identity: false\', \' container: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' sse_key: ""\', \' buffer_size: 0\', \' buffer_count: 3\', \' timeout: 15m\', \'custom:\', \' upload_command: ""\', \' download_command: ""\', \' list_command: ""\', \' delete_command: ""\', \' command_timeout: 4h\', \' commandtimeoutduration: 4h0m0s\']'""" help_flag = r"""'NAME:\n clickhouse-backup - Tool for easy backup of ClickHouse with cloud supportUSAGE:\n clickhouse-backup [-t, --tables=.] 
DESCRIPTION:\n Run as \'root\' or \'clickhouse\' userCOMMANDS:\n tables List of tables, exclude skip_tables\n create Create new backup\n create_remote Create and upload new backup\n upload Upload backup to remote storage\n list List of backups\n download Download backup from remote storage\n restore Create schema and restore data from backup\n restore_remote Download and restore\n delete Delete specific backup\n default-config Print default config\n print-config Print current config merged with environment variables\n clean Remove data in \'shadow\' folder from all \'path\' folders available from \'system.disks\'\n clean_remote_broken Remove all broken remote backups\n watch Run infinite loop which create full + incremental backup sequence to allow efficient backup sequences\n server Run API server\n help, h Shows a list of commands or help for one commandGLOBAL OPTIONS:\n --config value, -c value Config \'FILE\' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG]\n --help, -h show help\n --version, -v print the version'"""
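
The sharding policy added in pkg/backup/backup_shard.go assigns each replicated table's data backup to exactly one active replica: the `database.table` name is hashed with FNV-1a and taken modulo the alphabetically sorted list of active replicas. The standalone sketch below mirrors that logic outside the package; the replica and table names are invented for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// owningReplica mirrors fnvHashModShardFunc from pkg/backup/backup_shard.go:
// hash the "database.table" name with 32-bit FNV-1a and take it modulo the number
// of active replicas (sorted alphabetically) to pick the replica that backs up the data.
func owningReplica(fullName string, activeReplicas []string) string {
	sort.Strings(activeReplicas)
	h := fnv.New32a()
	h.Write([]byte(fullName))
	return activeReplicas[h.Sum32()%uint32(len(activeReplicas))]
}

func main() {
	// Hypothetical three-replica shard; names are illustrative only.
	replicas := []string{"replica-a", "replica-b", "replica-c"}
	for _, table := range []string{"analytics.events", "analytics.sessions", "billing.invoices"} {
		fmt.Printf("%s -> data backed up by %s\n", table, owningReplica(table, replicas))
	}
}
```

Because getReplicaState substitutes the stand-in replica name `no-replicas` for tables without replication metadata (views, Memory tables), such tables compare equal to their own stand-in on every node, so each replica treats them as in-shard and backs up their data.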
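GetTables now calls populateBackupShardField, which stamps every clickhouse.Table with a ShardBackupType before createBackupLocal decides whether to copy data or write metadata only. The sketch below is a condensed restatement of that decision flow, using plain strings in place of the ShardBackupFull/ShardBackupSchema/ShardBackupNone constants and a hypothetical helper name.

```go
package main

import "fmt"

// backupType condenses the decision flow of Backuper.populateBackupShardField:
// skipped tables get no backup, tables outside this replica's shard keep schema only,
// and everything else gets a full (schema + data) backup.
func backupType(skip, shardedOperation, inShard bool) string {
	switch {
	case skip:
		return "none"
	case !shardedOperation || inShard:
		return "full"
	default:
		return "schema-only"
	}
}

func main() {
	fmt.Println(backupType(false, true, true))   // "full": this replica owns the table's data
	fmt.Println(backupType(false, true, false))  // "schema-only": another replica owns the data
	fmt.Println(backupType(false, false, false)) // "full": sharded_operation disabled
	fmt.Println(backupType(true, true, true))    // "none": table matches skip_tables
}
```

In create.go the table metadata is then written with MetadataOnly set whenever the whole backup is schema-only or the table's BackupType is ShardBackupSchema.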
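The sharder is pluggable: newReplicaDeterminer accepts any shardFunc, and NewBackuper exposes WithVersioner/WithBackupSharder options, which is how the tests swap in nameSharder and testBackupSharder. As a sketch only, a hypothetical policy that always gives a table's data to the alphabetically first active replica could look like this, with the package's unexported types mirrored locally.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Trimmed-down mirrors of the tableReplicaMetadata and shardFunc types from
// pkg/backup/backup_shard.go, used only to show that the assignment policy is swappable.
type tableReplicaMetadata struct {
	Database, Table, ReplicaName string
	ActiveReplicas               []string
}

type shardFunc func(md *tableReplicaMetadata) (bool, error)

var errNoActiveReplicas = errors.New("no active replicas")

// firstReplicaShardFunc is a hypothetical policy: the alphabetically first active
// replica owns the data of every table.
func firstReplicaShardFunc(md *tableReplicaMetadata) (bool, error) {
	if len(md.ActiveReplicas) == 0 {
		return false, errNoActiveReplicas
	}
	sort.Strings(md.ActiveReplicas)
	return md.ActiveReplicas[0] == md.ReplicaName, nil
}

func main() {
	md := &tableReplicaMetadata{
		Database:       "analytics",
		Table:          "events",
		ReplicaName:    "replica-b",
		ActiveReplicas: []string{"replica-b", "replica-a"},
	}
	owns, err := firstReplicaShardFunc(md)
	fmt.Println(owns, err) // false <nil>: replica-a sorts first, so it owns the data
}
```

Inside the backup package such a policy would be wired in via newReplicaDeterminer(q, firstReplicaShardFunc) or a custom backupSharder passed through WithBackupSharder.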
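Sharded operation is gated by canShardOperation on minVersShardOp = 21000000, where versioner.GetVersion returns an integer server version. Assuming ClickHouse's usual VERSION_INTEGER encoding (major*1_000_000 + minor*1_000 + patch — an assumption, not stated in the diff), the threshold corresponds to 21.0.0, as the sketch below illustrates.

```go
package main

import "fmt"

// versionInteger encodes a release as major*1_000_000 + minor*1_000 + patch, which is
// the encoding minVersShardOp (21000000) appears to assume; only the >= comparison
// itself comes from pkg/backup/version.go.
func versionInteger(major, minor, patch int) int {
	return major*1_000_000 + minor*1_000 + patch
}

func main() {
	const minVersShardOp = 21000000
	for _, v := range [][3]int{{20, 8, 2}, {21, 3, 4}, {23, 3, 1}} {
		n := versionInteger(v[0], v[1], v[2])
		fmt.Printf("%d.%d.%d -> %d, sharded backups allowed: %v\n", v[0], v[1], v[2], n, n >= minVersShardOp)
	}
}
```

Independently of the version check, createBackupEmbedded rejects sharded_operation outright with errShardOperationUnsupported, since sharded embedded backups are not yet implemented.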