diff --git a/ReadMe.md b/ReadMe.md index 2b80bc28..648338e6 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -366,7 +366,9 @@ general: # The format for this env variable is "src_db1:target_db1,src_db2:target_db2". For YAML please continue using map syntax restore_database_mapping: {} retries_on_failure: 3 # RETRIES_ON_FAILURE, how many times to retry after a failure during upload or download - retries_pause: 30s # RETRIES_PAUSE, duration time to pause after each download or upload failure + retries_pause: 30s # RETRIES_PAUSE, duration time to pause after each download or upload failure + + sharded_operation: false # SHARDED_OPERATION, backups to replicas will save the schema but will only save one replica copy of the data with tables sharded among replicas. clickhouse: username: default # CLICKHOUSE_USERNAME password: "" # CLICKHOUSE_PASSWORD diff --git a/pkg/backup/backup_shard.go b/pkg/backup/backup_shard.go new file mode 100644 index 00000000..c6985aa2 --- /dev/null +++ b/pkg/backup/backup_shard.go @@ -0,0 +1,129 @@ +package backup + +import ( + "context" + "errors" + "fmt" + "hash/fnv" + "sort" +) + +var ( + // errUnknownBackupShard is returned when sharding assignment is requested for a table for which + // active replication state is not known. + errUnknownBackupShard = errors.New("unknown backup shard") + + // errNoActiveReplicas is returned when a table is has no current active replicas + errNoActiveReplicas = errors.New("no active replicas") +) + +// shardDetermination is an object holding information on whether or not a table is within the +// backup shard +type shardDetermination map[string]bool + +// inShard returns whether or not a given table is within a backup shard +func (d shardDetermination) inShard(database, table string) (bool, error) { + fullName := fmt.Sprintf("%s.%s", database, table) + presentInShard, ok := d[fullName] + if !ok { + return false, fmt.Errorf("error determining backup shard state for %q: %w", fullName, + errUnknownBackupShard) + } + return presentInShard, nil +} + +// backupSharder is an interface which can obtain a shard determination at a given point in time +type backupSharder interface { + determineShards(ctx context.Context) (shardDetermination, error) +} + +// tableReplicaMetadata is data derived from `system.replicas` +type tableReplicaMetadata struct { + Database string `db:"database" json:"database"` + Table string `db:"table" json:"table"` + ReplicaName string `db:"replica_name" json:"replica_name"` + // TODO: Change type to use replica_is_active directly after upgrade to clickhouse-go v2 + ActiveReplicas []string `db:"active_replicas" json:"replica_is_active"` +} + +// fullName returns the table name in the form of `database.table` +func (md *tableReplicaMetadata) fullName() string { + return fmt.Sprintf("%s.%s", md.Database, md.Table) +} + +// querier is an interface that can query Clickhouse +type querier interface { + StructSelectContext(ctx context.Context, dest any, query string) error +} + +// shardFunc is a function that is used to determine whether or not a given database/table should be +// handled by a given replica +type shardFunc func(md *tableReplicaMetadata) (bool, error) + +// fnvHashModShardFunc performs a FNV hash of a table name in the form of `database.table` and then +// performs a mod N operation (where N is the number of active replicas) in order to find an index +// in an alphabetically sorted list of active replicas which corresponds to the replica that will +// handle the backup of the table +func fnvHashModShardFunc(md *tableReplicaMetadata) (bool, error) { + if len(md.ActiveReplicas) == 0 { + return false, fmt.Errorf("could not determine in-shard state for %s: %w", md.fullName(), + errNoActiveReplicas) + } + sort.Strings(md.ActiveReplicas) + + h := fnv.New32a() + h.Write([]byte(md.fullName())) + i := h.Sum32() % uint32(len(md.ActiveReplicas)) + return md.ActiveReplicas[i] == md.ReplicaName, nil +} + +// replicaDeterminer is a concrete struct that will query clickhouse to obtain a shard determination +// by examining replica information +type replicaDeterminer struct { + q querier + sf shardFunc +} + +// newReplicaDeterminer returns a new shardDeterminer +func newReplicaDeterminer(q querier, sf shardFunc) *replicaDeterminer { + sd := &replicaDeterminer{ + q: q, + sf: sf, + } + return sd +} + +// getReplicaState obtains the local replication state through a query to `system.replicas` +func (rd *replicaDeterminer) getReplicaState(ctx context.Context) ([]tableReplicaMetadata, error) { + md := []tableReplicaMetadata{} + // TODO: Change query to pull replica_is_active after upgrading to clickhouse-go v2 + query := "select t.database, t.name as table, r.replica_name, mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active)) as active_replicas from system.tables t left join system.replicas r on t.database = r.database and t.name = r.table" + if err := rd.q.StructSelectContext(ctx, &md, query); err != nil { + return nil, fmt.Errorf("could not determine replication state: %w", err) + } + + // Handle views and memory tables by putting in stand-in replication metadata + for i, entry := range md { + if entry.ReplicaName == "" && len(entry.ActiveReplicas) == 0 { + md[i].ReplicaName = "no-replicas" + md[i].ActiveReplicas = []string{"no-replicas"} + } + } + return md, nil +} + +func (rd *replicaDeterminer) determineShards(ctx context.Context) (shardDetermination, error) { + md, err := rd.getReplicaState(ctx) + if err != nil { + return nil, err + } + sd := shardDetermination{} + for _, entry := range md { + assigned, err := rd.sf(&entry) + if err != nil { + return nil, err + } + sd[entry.fullName()] = assigned + } + return sd, nil +} diff --git a/pkg/backup/backup_shard_test.go b/pkg/backup/backup_shard_test.go new file mode 100644 index 00000000..4df6967a --- /dev/null +++ b/pkg/backup/backup_shard_test.go @@ -0,0 +1,305 @@ +package backup + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "reflect" + "testing" +) + +func TestInShard(t *testing.T) { + d := shardDetermination{ + "present_db.present_table": true, + "present_db.absent_table": false, + "absent_db.present_table": false, + "absent_db.absent_table": false, + } + testcases := []struct { + name string + database string + table string + expectPresent bool + expectErr error + }{ + { + name: "Test nonexistent database", + database: "nonexistent", + table: "present_table", + expectPresent: false, + expectErr: errUnknownBackupShard, + }, + { + name: "Test nonexistent table", + database: "present_db", + table: "nonexistent", + expectPresent: false, + expectErr: errUnknownBackupShard, + }, + { + name: "Test in-shard table", + database: "present_db", + table: "present_table", + expectPresent: true, + }, + { + name: "Test out-shard table", + database: "present_db", + table: "absent_table", + expectPresent: false, + }, + { + name: "Test out-shard database with in-shard table name for other database", + database: "absent_db", + table: "present_table", + expectPresent: false, + }, + { + name: "Test out-shard database", + database: "absent_db", + table: "absent_table", + expectPresent: false, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + present, err := d.inShard(tc.database, tc.table) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected err %q, got %q", tc.expectErr, err) + } + if present != tc.expectPresent { + t.Fatalf("expected in-shard status %v, got %v", tc.expectPresent, present) + } + } +} + +func TestFullName(t *testing.T) { + testcases := []struct { + md tableReplicaMetadata + expect string + }{ + { + md: tableReplicaMetadata{ + Database: "a", + Table: "b", + }, + expect: "a.b", + }, + { + md: tableReplicaMetadata{ + Database: "db", + }, + expect: "db.", + }, + { + md: tableReplicaMetadata{ + Table: "t", + }, + expect: ".t", + }, + } + for _, tc := range testcases { + if tc.md.fullName() != tc.expect { + t.Fatalf("expected %q, got %q", tc.expect, tc.md.fullName()) + } + } +} + +func TestFNVHashModShardFunc(t *testing.T) { + testcases := []struct { + name string + md *tableReplicaMetadata + expect bool + expectErr error + }{ + { + name: "Test no active replicas", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{}, + }, + expectErr: errNoActiveReplicas, + }, + { + name: "Test single active replica", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{"replica"}, + }, + expect: true, + }, + { + name: "Test not assigned replica", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{"different"}, + }, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + got, err := fnvHashModShardFunc(tc.md) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if got != tc.expect { + t.Fatalf("expected shard membership %v, got %v", tc.expect, got) + } + } +} + +type testQuerier struct { + data any + returnErr error +} + +func (tq *testQuerier) StructSelectContext(_ context.Context, dest any, _ string) error { + if tq.returnErr != nil { + return tq.returnErr + } + jsonData, err := json.Marshal(tq.data) + if err != nil { + return fmt.Errorf("error encoding data: %w", err) + } + return json.NewDecoder(bytes.NewReader(jsonData)).Decode(dest) +} + +func TestGetReplicaState(t *testing.T) { + data := []tableReplicaMetadata{ + { + Database: "db", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{}, + }, + { + Database: "db2", + Table: "table2", + ReplicaName: "replica2", + ActiveReplicas: []string{"replica2"}, + }, + } + expectedErr := errors.New("expected error") + testcases := []struct { + name string + q querier + expect []tableReplicaMetadata + expectErr error + }{ + { + name: "Test error on obtaining replica state", + q: &testQuerier{ + returnErr: expectedErr, + }, + expectErr: expectedErr, + }, + { + name: "Test pulling data", + q: &testQuerier{ + data: data, + }, + expect: data, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + rd := newReplicaDeterminer(tc.q, nil) + got, err := rd.getReplicaState(context.Background()) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if err != nil { + continue + } + if !reflect.DeepEqual(got, tc.expect) { + t.Fatalf("expected data %v, got %v", tc.expect, got) + } + } +} + +var errNameSharder = errors.New("expected error") + +func nameSharder(md *tableReplicaMetadata) (bool, error) { + if md.Table == "error" { + return false, errNameSharder + } + if md.Table == "present" { + return true, nil + } + return false, nil +} + +func TestDetermineShards(t *testing.T) { + expectErr := errors.New("expected error") + testcases := []struct { + name string + q querier + expect shardDetermination + expectErr error + }{ + { + name: "Test query error", + q: &testQuerier{ + returnErr: expectErr, + }, + expectErr: expectErr, + }, + { + name: "Test shard func error", + q: &testQuerier{ + data: []tableReplicaMetadata{ + { + Table: "error", + }, + }, + }, + expectErr: errNameSharder, + }, + { + name: "Test normal operation", + q: &testQuerier{ + data: []tableReplicaMetadata{ + { + Database: "a", + Table: "present", + }, + { + Database: "a", + Table: "absent", + }, + { + Database: "b", + Table: "present", + }, + }, + }, + expect: shardDetermination{ + "a.present": true, + "a.absent": false, + "b.present": true, + }, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + rd := newReplicaDeterminer(tc.q, nameSharder) + got, err := rd.determineShards(context.Background()) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected %v, got %v", tc.expectErr, err) + } + if err != nil { + continue + } + if !reflect.DeepEqual(got, tc.expect) { + t.Fatalf("expected data %v, got %v", tc.expect, got) + } + } +} diff --git a/pkg/backup/backuper.go b/pkg/backup/backuper.go index bd5957e0..e2992b64 100644 --- a/pkg/backup/backuper.go +++ b/pkg/backup/backuper.go @@ -3,17 +3,22 @@ package backup import ( "context" "fmt" + "path" + "github.com/AlexAkulov/clickhouse-backup/pkg/clickhouse" "github.com/AlexAkulov/clickhouse-backup/pkg/config" "github.com/AlexAkulov/clickhouse-backup/pkg/resumable" "github.com/AlexAkulov/clickhouse-backup/pkg/storage" apexLog "github.com/apex/log" - "path" ) +type BackuperOpt func(*Backuper) + type Backuper struct { cfg *config.Config ch *clickhouse.ClickHouse + vers versioner + bs backupSharder dst *storage.BackupDestination log *apexLog.Entry Version string @@ -25,15 +30,33 @@ type Backuper struct { resumableState *resumable.State } -func NewBackuper(cfg *config.Config) *Backuper { +func NewBackuper(cfg *config.Config, opts ...BackuperOpt) *Backuper { ch := &clickhouse.ClickHouse{ Config: &cfg.ClickHouse, Log: apexLog.WithField("logger", "clickhouse"), } - return &Backuper{ - cfg: cfg, - ch: ch, - log: apexLog.WithField("logger", "backuper"), + b := &Backuper{ + cfg: cfg, + ch: ch, + vers: ch, + bs: newReplicaDeterminer(ch, fnvHashModShardFunc), + log: apexLog.WithField("logger", "backuper"), + } + for _, opt := range opts { + opt(b) + } + return b +} + +func WithVersioner(v versioner) BackuperOpt { + return func(b *Backuper) { + b.vers = v + } +} + +func WithBackupSharder(s backupSharder) BackuperOpt { + return func(b *Backuper) { + b.bs = s } } @@ -76,3 +99,37 @@ func (b *Backuper) getLocalBackupDataPathForTable(backupName string, disk string } return backupPath } + +// populateBackupShardField populates the BackupShard field for a slice of Table structs +func (b *Backuper) populateBackupShardField(ctx context.Context, tables []clickhouse.Table) error { + // By default, have all fields populated to full backup unless the table is to be skipped + for i := range tables { + tables[i].BackupType = clickhouse.ShardBackupFull + if tables[i].Skip { + tables[i].BackupType = clickhouse.ShardBackupNone + } + } + if !b.cfg.General.ShardedOperation { + return nil + } + if err := canShardOperation(ctx, b.vers); err != nil { + return err + } + assignment, err := b.bs.determineShards(ctx) + if err != nil { + return err + } + for i, t := range tables { + if t.Skip { + continue + } + fullBackup, err := assignment.inShard(t.Database, t.Name) + if err != nil { + return err + } + if !fullBackup { + tables[i].BackupType = clickhouse.ShardBackupSchema + } + } + return nil +} diff --git a/pkg/backup/backuper_test.go b/pkg/backup/backuper_test.go new file mode 100644 index 00000000..2b862a07 --- /dev/null +++ b/pkg/backup/backuper_test.go @@ -0,0 +1,170 @@ +package backup + +import ( + "context" + "errors" + "reflect" + "testing" + + "github.com/AlexAkulov/clickhouse-backup/pkg/clickhouse" + "github.com/AlexAkulov/clickhouse-backup/pkg/config" +) + +type testBackupSharder struct { + data shardDetermination + err error +} + +func (bs *testBackupSharder) determineShards(_ context.Context) (shardDetermination, error) { + if bs.err != nil { + return nil, bs.err + } + return bs.data, nil +} + +func TestPopulateBackupShardField(t *testing.T) { + errVersion := errors.New("versioner error") + errVersioner := newTestVersioner(withVersionErr(errVersion)) + goodVersioner := newTestVersioner(withVersion(minVersShardOp)) + oldVersioner := newTestVersioner(withVersion(-1)) + + // Create tables to reset field state + tableData := func() []clickhouse.Table { + return []clickhouse.Table{ + { + Database: "a", + Name: "present", + }, + { + Database: "a", + Name: "absent", + }, + { + Database: "b", + Name: "present", + Skip: true, + }, + } + } + + errShard := errors.New("backup sharder error") + errSharder := &testBackupSharder{err: errShard} + staticSharder := &testBackupSharder{ + data: shardDetermination{ + "a.present": true, + "a.absent": false, + }, + } + emptySharder := &testBackupSharder{ + data: shardDetermination{}, + } + + testcases := []struct { + name string + shardConfig bool + v versioner + bs backupSharder + expect []clickhouse.Table + expectErr error + }{ + { + name: "Test versioner error", + shardConfig: true, + v: errVersioner, + bs: staticSharder, + expectErr: errVersion, + }, + { + name: "Test incompatible version", + shardConfig: true, + v: oldVersioner, + bs: staticSharder, + expectErr: errShardOperationVers, + }, + { + name: "Test incompatible version without sharding config", + shardConfig: false, + v: oldVersioner, + bs: staticSharder, + expect: []clickhouse.Table{ + { + Database: "a", + Name: "present", + BackupType: clickhouse.ShardBackupFull, + }, + { + Database: "a", + Name: "absent", + BackupType: clickhouse.ShardBackupFull, + }, + { + Database: "b", + Name: "present", + Skip: true, + BackupType: clickhouse.ShardBackupNone, + }, + }, + }, + { + name: "Test sharder error", + shardConfig: true, + v: goodVersioner, + bs: errSharder, + expectErr: errShard, + }, + { + name: "Test incomplete replica data", + shardConfig: true, + v: goodVersioner, + bs: emptySharder, + expectErr: errUnknownBackupShard, + }, + { + name: "Test normal sharding", + shardConfig: true, + v: goodVersioner, + bs: staticSharder, + expect: []clickhouse.Table{ + { + Database: "a", + Name: "present", + BackupType: clickhouse.ShardBackupFull, + }, + { + Database: "a", + Name: "absent", + BackupType: clickhouse.ShardBackupSchema, + }, + { + Database: "b", + Name: "present", + Skip: true, + BackupType: clickhouse.ShardBackupNone, + }, + }, + }, + } + for _, tc := range testcases { + t.Log(tc.name) + cfg := &config.Config{ + General: config.GeneralConfig{ + ShardedOperation: tc.shardConfig, + }, + } + b := NewBackuper(cfg, + WithVersioner(tc.v), + WithBackupSharder(tc.bs), + ) + tables := tableData() + err := b.populateBackupShardField(context.Background(), tables) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if err != nil { + continue + } + if !reflect.DeepEqual(tables, tc.expect) { + t.Fatalf("expected %+v, got %+v", tc.expect, tables) + } + } +} diff --git a/pkg/backup/create.go b/pkg/backup/create.go index c65d6cc7..c309fe60 100644 --- a/pkg/backup/create.go +++ b/pkg/backup/create.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/AlexAkulov/clickhouse-backup/pkg/status" "os" "path" "path/filepath" @@ -16,7 +15,9 @@ import ( "github.com/AlexAkulov/clickhouse-backup/pkg/common" "github.com/AlexAkulov/clickhouse-backup/pkg/filesystemhelper" "github.com/AlexAkulov/clickhouse-backup/pkg/metadata" + "github.com/AlexAkulov/clickhouse-backup/pkg/status" "github.com/AlexAkulov/clickhouse-backup/pkg/utils" + apexLog "github.com/apex/log" "github.com/google/uuid" recursiveCopy "github.com/otiai10/copy" @@ -84,7 +85,6 @@ func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st defer cancel() startBackup := time.Now() - doBackupData := !schemaOnly if backupName == "" { backupName = NewBackupName() } @@ -102,9 +102,9 @@ func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st if err != nil { return fmt.Errorf("can't get database engines from clickhouse: %v", err) } - allTables, err := b.ch.GetTables(ctx, tablePattern) + allTables, err := b.GetTables(ctx, tablePattern) if err != nil { - return fmt.Errorf("can't get tables from clickhouse: %v", err) + return err } tables := filterTablesByPattern(allTables, tablePattern) i := 0 @@ -137,7 +137,7 @@ func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st if b.cfg.ClickHouse.UseEmbeddedBackupRestore { err = b.createBackupEmbedded(ctx, backupName, tablePattern, partitions, partitionsToBackupMap, schemaOnly, rbacOnly, configsOnly, tables, allDatabases, allFunctions, disks, diskMap, log, startBackup, version) } else { - err = b.createBackupLocal(ctx, backupName, partitionsToBackupMap, tables, doBackupData, schemaOnly, rbacOnly, configsOnly, version, disks, diskMap, allDatabases, allFunctions, log, startBackup) + err = b.createBackupLocal(ctx, backupName, partitionsToBackupMap, tables, schemaOnly, rbacOnly, configsOnly, version, disks, diskMap, allDatabases, allFunctions, log, startBackup) } if err != nil { return err @@ -150,7 +150,7 @@ func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st return nil } -func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, partitionsToBackupMap common.EmptyMap, tables []clickhouse.Table, doBackupData bool, schemaOnly bool, rbacOnly bool, configsOnly bool, version string, disks []clickhouse.Disk, diskMap map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry, startBackup time.Time) error { +func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, partitionsToBackupMap common.EmptyMap, tables []clickhouse.Table, schemaOnly bool, rbacOnly bool, configsOnly bool, version string, disks []clickhouse.Disk, diskMap map[string]string, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, log *apexLog.Entry, startBackup time.Time) error { // Create backup dir on all clickhouse disks for _, disk := range disks { if err := filesystemhelper.Mkdir(path.Join(disk.Path, "backup"), b.ch, disks); err != nil { @@ -185,7 +185,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par } var realSize map[string]int64 var disksToPartsMap map[string][]metadata.Part - if doBackupData { + if !schemaOnly && table.BackupType == clickhouse.ShardBackupFull { log.Debug("create data") shadowBackupUUID := strings.ReplaceAll(uuid.New().String(), "-", "") disksToPartsMap, realSize, err = b.AddTableToBackup(ctx, backupName, shadowBackupUUID, disks, &table, partitionsToBackupMap) @@ -213,7 +213,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par TotalBytes: table.TotalBytes, Size: realSize, Parts: disksToPartsMap, - MetadataOnly: schemaOnly, + MetadataOnly: schemaOnly || table.BackupType == clickhouse.ShardBackupSchema, }, disks) if err != nil { if removeBackupErr := b.RemoveBackupLocal(ctx, backupName, disks); removeBackupErr != nil { @@ -255,6 +255,10 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par } func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, tablePattern string, partitions []string, partitionsToBackupMap common.EmptyMap, schemaOnly, rbacOnly, configsOnly bool, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap map[string]string, log *apexLog.Entry, startBackup time.Time, backupVersion string) error { + // TODO: Implement sharded backup operations for embedded backups + if b.cfg.General.ShardedOperation { + return fmt.Errorf("cannot perform embedded backup: %w", errShardOperationUnsupported) + } if _, isBackupDiskExists := diskMap[b.cfg.ClickHouse.EmbeddedBackupDisk]; !isBackupDiskExists { return fmt.Errorf("backup disk `%s` not exists in system.disks", b.cfg.ClickHouse.EmbeddedBackupDisk) } diff --git a/pkg/backup/list.go b/pkg/backup/list.go index 657deda4..53d3c8a1 100644 --- a/pkg/backup/list.go +++ b/pkg/backup/list.go @@ -4,9 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/AlexAkulov/clickhouse-backup/pkg/custom" - "github.com/AlexAkulov/clickhouse-backup/pkg/status" - apexLog "github.com/apex/log" "io" "os" "path" @@ -15,9 +12,13 @@ import ( "text/tabwriter" "github.com/AlexAkulov/clickhouse-backup/pkg/clickhouse" + "github.com/AlexAkulov/clickhouse-backup/pkg/custom" "github.com/AlexAkulov/clickhouse-backup/pkg/metadata" + "github.com/AlexAkulov/clickhouse-backup/pkg/status" "github.com/AlexAkulov/clickhouse-backup/pkg/storage" "github.com/AlexAkulov/clickhouse-backup/pkg/utils" + + apexLog "github.com/apex/log" ) // List - list backups to stdout from command line @@ -368,7 +369,7 @@ func (b *Backuper) GetRemoteBackups(ctx context.Context, parseMetadata bool) ([] return backupList, err } -// GetTables - get all tables for use by PrintTables and API +// GetTables - get all tables for use by CreateBackup, PrintTables, and API func (b *Backuper) GetTables(ctx context.Context, tablePattern string) ([]clickhouse.Table, error) { if !b.ch.IsOpen { if err := b.ch.Connect(); err != nil { @@ -381,6 +382,9 @@ func (b *Backuper) GetTables(ctx context.Context, tablePattern string) ([]clickh if err != nil { return []clickhouse.Table{}, fmt.Errorf("can't get tables: %v", err) } + if err := b.populateBackupShardField(ctx, allTables); err != nil { + return nil, err + } return allTables, nil } @@ -416,7 +420,7 @@ func (b *Backuper) PrintTables(printAll bool, tablePattern string) error { } continue } - if bytes, err := fmt.Fprintf(w, "%s.%s\t%s\t%v\t\n", table.Database, table.Name, utils.FormatBytes(table.TotalBytes), strings.Join(tableDisks, ",")); err != nil { + if bytes, err := fmt.Fprintf(w, "%s.%s\t%s\t%v\t%v\n", table.Database, table.Name, utils.FormatBytes(table.TotalBytes), strings.Join(tableDisks, ","), table.BackupType); err != nil { log.Errorf("fmt.Fprintf write %d bytes return error: %v", bytes, err) } } diff --git a/pkg/backup/version.go b/pkg/backup/version.go new file mode 100644 index 00000000..cb0a3129 --- /dev/null +++ b/pkg/backup/version.go @@ -0,0 +1,33 @@ +package backup + +import ( + "context" + "errors" +) + +const ( + minVersShardOp = 21000000 +) + +var ( + errShardOperationVers = errors.New("sharded operations are only supported for " + + "clickhouse-server >= v21.x") + errShardOperationUnsupported = errors.New("sharded operations are not supported") +) + +// versioner is an interface for determining the version of Clickhouse +type versioner interface { + GetVersion(context.Context) (int, error) +} + +// canShardOperation returns whether or not sharded backup creation is supported +func canShardOperation(ctx context.Context, v versioner) error { + version, err := v.GetVersion(ctx) + if err != nil { + return err + } + if version < minVersShardOp { + return errShardOperationVers + } + return nil +} diff --git a/pkg/backup/version_test.go b/pkg/backup/version_test.go new file mode 100644 index 00000000..2259aba3 --- /dev/null +++ b/pkg/backup/version_test.go @@ -0,0 +1,67 @@ +package backup + +import ( + "context" + "errors" + "testing" +) + +type testVersionerOpt func(sd *testVersioner) + +type testVersioner struct { + version int + versionErr error +} + +func newTestVersioner(opts ...testVersionerOpt) *testVersioner { + v := &testVersioner{} + for _, opt := range opts { + opt(v) + } + return v +} + +func withVersion(version int) testVersionerOpt { + return func(v *testVersioner) { + v.version = version + } +} + +func withVersionErr(err error) testVersionerOpt { + return func(v *testVersioner) { + v.versionErr = err + } +} + +func (v *testVersioner) GetVersion(_ context.Context) (int, error) { + if v.versionErr != nil { + return -1, v.versionErr + } + return v.version, nil +} + +func TestCanShardOperation(t *testing.T) { + ctx := context.Background() + + t.Log("test error on version retrieval") + v := newTestVersioner(withVersionErr(errors.New("error"))) + if err := canShardOperation(ctx, v); err == nil { + t.Fatal("expected error when getting shard determiner error on version retrieval") + } + + t.Log("test version too low") + v = newTestVersioner(withVersion(-1)) + err := canShardOperation(ctx, v) + if err == nil { + t.Fatal("expected error when version number is too low") + } + if !errors.Is(err, errShardOperationVers) { + t.Fatalf("expected errShardOperationUnsupported, got %v", err) + } + + t.Log("test version should be OK") + v = newTestVersioner(withVersion(minVersShardOp)) + if err = canShardOperation(ctx, v); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/pkg/clickhouse/structs.go b/pkg/clickhouse/structs.go index 9cb6d60c..bb03c00c 100644 --- a/pkg/clickhouse/structs.go +++ b/pkg/clickhouse/structs.go @@ -4,6 +4,14 @@ import ( "time" ) +type ShardBackupType string + +const ( + ShardBackupFull = "full" + ShardBackupNone = "none" + ShardBackupSchema = "schema-only" +) + // Table - ClickHouse table struct type Table struct { // common fields for all `clickhouse-server` versions @@ -17,6 +25,7 @@ type Table struct { CreateTableQuery string `db:"create_table_query,omitempty"` TotalBytes uint64 `db:"total_bytes,omitempty"` Skip bool + BackupType ShardBackupType } // IsSystemTablesFieldPresent - ClickHouse `system.tables` varius field flags diff --git a/pkg/config/config.go b/pkg/config/config.go index e1fa0b3b..78b1ba57 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -3,7 +3,6 @@ package config import ( "crypto/tls" "fmt" - s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "math" "os" "runtime" @@ -11,6 +10,7 @@ import ( "time" "github.com/apex/log" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/kelseyhightower/envconfig" "github.com/urfave/cli" "gopkg.in/yaml.v3" @@ -55,6 +55,7 @@ type GeneralConfig struct { WatchInterval string `yaml:"watch_interval" envconfig:"WATCH_INTERVAL"` FullInterval string `yaml:"full_interval" envconfig:"FULL_INTERVAL"` WatchBackupNameTemplate string `yaml:"watch_backup_name_template" envconfig:"WATCH_BACKUP_NAME_TEMPLATE"` + ShardedOperation bool `yaml:"sharded_operation" envconfig:"SHARDED_OPERATION"` RetriesDuration time.Duration WatchDuration time.Duration FullDuration time.Duration diff --git a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot index 994195b3..b6bfc526 100644 --- a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot +++ b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot @@ -1,4 +1,4 @@ -default_config = r"""'[\'general:\', \' remote_storage: none\', \' disable_progress_bar: true\', \' backups_to_keep_local: 0\', \' backups_to_keep_remote: 0\', \' log_level: info\', \' allow_empty_backups: false\', \' use_resumable_state: true\', \' restore_schema_on_cluster: ""\', \' upload_by_part: true\', \' download_by_part: true\', \' restore_database_mapping: {}\', \' retries_on_failure: 3\', \' upload_retries_pause: 30s\', \' watch_interval: 1h\', \' full_interval: 24h\', \' watch_backup_name_template: shard{shard}-{type}-{time:20060102150405}\', \' retriesduration: 100ms\', \' watchduration: 1h0m0s\', \' fullduration: 24h0m0s\', \'clickhouse:\', \' username: default\', \' password: ""\', \' host: localhost\', \' port: 9000\', \' disk_mapping: {}\', \' skip_tables:\', \' - system.*\', \' - INFORMATION_SCHEMA.*\', \' - information_schema.*\', \' - _temporary_and_external_tables.*\', \' timeout: 5m\', \' freeze_by_part: false\', \' freeze_by_part_where: ""\', \' use_embedded_backup_restore: false\', \' embedded_backup_disk: ""\', \' secure: false\', \' skip_verify: false\', \' sync_replicated_tables: false\', \' log_sql_queries: true\', \' config_dir: /etc/clickhouse-server/\', \' restart_command: systemctl restart clickhouse-server\', \' ignore_not_exists_error_during_freeze: true\', \' check_replicas_before_attach: true\', \' tls_key: ""\', \' tls_cert: ""\', \' tls_ca: ""\', \' debug: false\', \'s3:\', \' access_key: ""\', \' secret_key: ""\', \' bucket: ""\', \' endpoint: ""\', \' region: us-east-1\', \' acl: private\', \' assume_role_arn: ""\', \' force_path_style: false\', \' path: ""\', \' disable_ssl: false\', \' compression_level: 1\', \' compression_format: tar\', \' sse: ""\', \' sse_kms_key_id: ""\', \' sse_customer_algorithm: ""\', \' sse_customer_key: ""\', \' sse_customer_key_md5: ""\', \' sse_kms_encryption_context: ""\', \' disable_cert_verification: false\', \' use_custom_storage_class: false\', \' storage_class: STANDARD\', \' custom_storage_class_map: {}\', \' part_size: 0\', \' allow_multipart_download: false\', \' object_labels: {}\', \' debug: false\', \'gcs:\', \' credentials_file: ""\', \' credentials_json: ""\', \' credentials_json_encoded: ""\', \' bucket: ""\', \' path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' debug: false\', \' endpoint: ""\', \' storage_class: STANDARD\', \' object_labels: {}\', \' custom_storage_class_map: {}\', \'cos:\', \' url: ""\', \' timeout: 2m\', \' secret_id: ""\', \' secret_key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'api:\', \' listen: localhost:7171\', \' enable_metrics: true\', \' enable_pprof: false\', \' username: ""\', \' password: ""\', \' secure: false\', \' certificate_file: ""\', \' private_key_file: ""\', \' create_integration_tables: false\', \' integration_tables_host: ""\', \' allow_parallel: false\', \' complete_resumable_after_restart: true\', \'ftp:\', \' address: ""\', \' timeout: 2m\', \' username: ""\', \' password: ""\', \' tls: false\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'sftp:\', \' address: ""\', \' port: 22\', \' username: ""\', \' password: ""\', \' key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'azblob:\', \' endpoint_schema: https\', \' endpoint_suffix: core.windows.net\', \' account_name: ""\', \' account_key: ""\', \' sas: ""\', \' use_managed_identity: false\', \' container: ""\', \' path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' sse_key: ""\', \' buffer_size: 0\', \' buffer_count: 3\', \' timeout: 15m\', \'custom:\', \' upload_command: ""\', \' download_command: ""\', \' list_command: ""\', \' delete_command: ""\', \' command_timeout: 4h\', \' commandtimeoutduration: 4h0m0s\']'""" +default_config = r"""'[\'general:\', \' remote_storage: none\', \' disable_progress_bar: true\', \' backups_to_keep_local: 0\', \' backups_to_keep_remote: 0\', \' log_level: info\', \' allow_empty_backups: false\', \' use_resumable_state: true\', \' restore_schema_on_cluster: ""\', \' upload_by_part: true\', \' download_by_part: true\', \' restore_database_mapping: {}\', \' retries_on_failure: 3\', \' upload_retries_pause: 30s\', \' watch_interval: 1h\', \' full_interval: 24h\', \' watch_backup_name_template: shard{shard}-{type}-{time:20060102150405}\', \' sharded_operation: false\', \' retriesduration: 100ms\', \' watchduration: 1h0m0s\', \' fullduration: 24h0m0s\', \'clickhouse:\', \' username: default\', \' password: ""\', \' host: localhost\', \' port: 9000\', \' disk_mapping: {}\', \' skip_tables:\', \' - system.*\', \' - INFORMATION_SCHEMA.*\', \' - information_schema.*\', \' - _temporary_and_external_tables.*\', \' timeout: 5m\', \' freeze_by_part: false\', \' freeze_by_part_where: ""\', \' use_embedded_backup_restore: false\', \' embedded_backup_disk: ""\', \' secure: false\', \' skip_verify: false\', \' sync_replicated_tables: false\', \' log_sql_queries: true\', \' config_dir: /etc/clickhouse-server/\', \' restart_command: systemctl restart clickhouse-server\', \' ignore_not_exists_error_during_freeze: true\', \' check_replicas_before_attach: true\', \' tls_key: ""\', \' tls_cert: ""\', \' tls_ca: ""\', \' debug: false\', \'s3:\', \' access_key: ""\', \' secret_key: ""\', \' bucket: ""\', \' endpoint: ""\', \' region: us-east-1\', \' acl: private\', \' assume_role_arn: ""\', \' force_path_style: false\', \' path: ""\', \' disable_ssl: false\', \' compression_level: 1\', \' compression_format: tar\', \' sse: ""\', \' sse_kms_key_id: ""\', \' sse_customer_algorithm: ""\', \' sse_customer_key: ""\', \' sse_customer_key_md5: ""\', \' sse_kms_encryption_context: ""\', \' disable_cert_verification: false\', \' use_custom_storage_class: false\', \' storage_class: STANDARD\', \' custom_storage_class_map: {}\', \' part_size: 0\', \' allow_multipart_download: false\', \' object_labels: {}\', \' debug: false\', \'gcs:\', \' credentials_file: ""\', \' credentials_json: ""\', \' credentials_json_encoded: ""\', \' bucket: ""\', \' path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' debug: false\', \' endpoint: ""\', \' storage_class: STANDARD\', \' object_labels: {}\', \' custom_storage_class_map: {}\', \'cos:\', \' url: ""\', \' timeout: 2m\', \' secret_id: ""\', \' secret_key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'api:\', \' listen: localhost:7171\', \' enable_metrics: true\', \' enable_pprof: false\', \' username: ""\', \' password: ""\', \' secure: false\', \' certificate_file: ""\', \' private_key_file: ""\', \' create_integration_tables: false\', \' integration_tables_host: ""\', \' allow_parallel: false\', \' complete_resumable_after_restart: true\', \'ftp:\', \' address: ""\', \' timeout: 2m\', \' username: ""\', \' password: ""\', \' tls: false\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'sftp:\', \' address: ""\', \' port: 22\', \' username: ""\', \' password: ""\', \' key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'azblob:\', \' endpoint_schema: https\', \' endpoint_suffix: core.windows.net\', \' account_name: ""\', \' account_key: ""\', \' sas: ""\', \' use_managed_identity: false\', \' container: ""\', \' path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' sse_key: ""\', \' buffer_size: 0\', \' buffer_count: 3\', \' timeout: 15m\', \'custom:\', \' upload_command: ""\', \' download_command: ""\', \' list_command: ""\', \' delete_command: ""\', \' command_timeout: 4h\', \' commandtimeoutduration: 4h0m0s\']'""" help_flag = r"""'NAME:\n clickhouse-backup - Tool for easy backup of ClickHouse with cloud supportUSAGE:\n clickhouse-backup [-t, --tables=.] DESCRIPTION:\n Run as \'root\' or \'clickhouse\' userCOMMANDS:\n tables List of tables, exclude skip_tables\n create Create new backup\n create_remote Create and upload new backup\n upload Upload backup to remote storage\n list List of backups\n download Download backup from remote storage\n restore Create schema and restore data from backup\n restore_remote Download and restore\n delete Delete specific backup\n default-config Print default config\n print-config Print current config merged with environment variables\n clean Remove data in \'shadow\' folder from all \'path\' folders available from \'system.disks\'\n clean_remote_broken Remove all broken remote backups\n watch Run infinite loop which create full + incremental backup sequence to allow efficient backup sequences\n server Run API server\n help, h Shows a list of commands or help for one commandGLOBAL OPTIONS:\n --config value, -c value Config \'FILE\' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG]\n --help, -h show help\n --version, -v print the version'"""