From cfbffc5f27fc6789e0702851746dc1b7b686f881 Mon Sep 17 00:00:00 2001 From: Michael Kwon Date: Wed, 26 Apr 2023 10:26:42 -0700 Subject: [PATCH] clickhouse-backup: Add support for sharded backup This change adds a new configuration 'general.sharded_operation_mode' which shards tables for backup across replicas, allowing for a uniform backup and restore call to the server without consideration for table replication state. Fixes #639 clickhouse-backup/backup_shard: Use Array for active replicas clickhouse-go v1 does not support clickhouse Map types. Force the Map(String, UInt8) column replica_is_active to a string array for now. clickhouse-backup/backuper: Skip shard assignment for skipped tables Skip shard assignment for skipped tables. Also add the new ShardBackupType "ShardBackupNone", which is assigned to skipped tables. clickhouse-backup/backuper: Use b.GetTables for CreateBackup Use b.GetTables for CreateBackup instead of b.ch.GetTables and move b.populateBackupShardField to b.GetTables so as to populate the field for the server API.
backup: Addressing changes for adding sharding support Add in different sharded operation modes to give users the ability to specify granularity of sharding --- ReadMe.md | 1 + pkg/backup/backup_shard.go | 197 ++++++++ pkg/backup/backup_shard_test.go | 419 ++++++++++++++++++ pkg/backup/backuper.go | 87 +++- pkg/backup/backuper_test.go | 185 ++++++++ pkg/backup/create.go | 23 +- pkg/backup/list.go | 14 +- pkg/clickhouse/structs.go | 9 + pkg/clickhouse/version.go | 34 ++ pkg/clickhouse/version_test.go | 76 ++++ pkg/config/config.go | 1 + .../tests/snapshots/cli.py.cli.snapshot | 2 +- 12 files changed, 1027 insertions(+), 21 deletions(-) create mode 100644 pkg/backup/backup_shard.go create mode 100644 pkg/backup/backup_shard_test.go create mode 100644 pkg/backup/backuper_test.go create mode 100644 pkg/clickhouse/version.go create mode 100644 pkg/clickhouse/version_test.go diff --git a/ReadMe.md b/ReadMe.md index 28cde51c..fd4ea015 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -380,6 +380,7 @@ general: full_interval: 24h # FULL_INTERVAL, use only for `watch` command, full backup will create every 24h watch_backup_name_template: "shard{shard}-{type}-{time:20060102150405}" # WATCH_BACKUP_NAME_TEMPLATE, used only for `watch` command, macros values will apply from `system.macros` for time:XXX, look format in https://go.dev/src/time/format.go + sharded_operation_mode: none # SHARDED_OPERATION_MODE, how different replicas will shard backing up data for tables. Options are: none (no sharding), table (table granularity), database (database granularity), first-replica (on the lexicographically sorted first active replica). If left empty, then the "none" option will be set as default. 
clickhouse: username: default # CLICKHOUSE_USERNAME password: "" # CLICKHOUSE_PASSWORD diff --git a/pkg/backup/backup_shard.go b/pkg/backup/backup_shard.go new file mode 100644 index 00000000..6c6f1ca8 --- /dev/null +++ b/pkg/backup/backup_shard.go @@ -0,0 +1,197 @@ +package backup + +import ( + "context" + "errors" + "fmt" + "hash/fnv" +) + +var ( + // errUnknownBackupShard is returned when sharding assignment is requested for a table for which + // active replication state is not known. + errUnknownBackupShard = errors.New("unknown backup shard") + + // errNoActiveReplicas is returned when a table has no current active replicas + errNoActiveReplicas = errors.New("no active replicas") + + shardFuncRegistry = map[string]shardFunc{ + "table": fnvHashModTableShardFunc, + "database": fnvHashModDatabaseShardFunc, + "first-replica": firstReplicaShardFunc, + "none": noneShardFunc, + "": noneShardFunc, + } +) + +// shardDetermination is an object holding information on whether or not a table is within the +// backup shard +type shardDetermination map[string]bool + +// inShard returns whether or not a given table is within a backup shard +func (d shardDetermination) inShard(database, table string) (bool, error) { + fullName := fmt.Sprintf("`%s`.`%s`", database, table) + presentInShard, ok := d[fullName] + if !ok { + return false, fmt.Errorf("error determining backup shard state for %q: %w", fullName, + errUnknownBackupShard) + } + return presentInShard, nil +} + +// backupSharder is an interface which can obtain a shard determination at a given point in time +type backupSharder interface { + determineShards(ctx context.Context) (shardDetermination, error) +} + +// tableReplicaMetadata is per-table data derived from `system.tables` joined with `system.replicas` +type tableReplicaMetadata struct { + Database string `ch:"database" json:"database"` + Table string `ch:"table" json:"table"` + ReplicaName string `ch:"replica_name" json:"replica_name"` + // TODO: Change type to use replica_is_active directly after
upgrade to clickhouse-go v2 + ActiveReplicas []string `ch:"active_replicas" json:"replica_is_active"` +} + +// fullName returns the table name in the form of `database`.`table` +func (md *tableReplicaMetadata) fullName() string { + return fmt.Sprintf("`%s`.`%s`", md.Database, md.Table) +} + +// querier is an interface that can query Clickhouse +type querier interface { + SelectContext(context.Context, interface{}, string, ...interface{}) error +} + +// shardFunc is a function that determines whether or not a given database/table should have its +// data backed up by the replica calling this function +type shardFunc func(md *tableReplicaMetadata) (bool, error) + +func shardFuncByName(name string) (shardFunc, error) { + chosen, ok := shardFuncRegistry[name] + if !ok { + validOptions := make([]string, 0, len(shardFuncRegistry)) // length 0 so append does not leave empty leading entries + for k := range shardFuncRegistry { + if k == "" { + continue + } + validOptions = append(validOptions, k) + } + return nil, fmt.Errorf("unknown backup sharding option %q, valid options: %v", name, + validOptions) + } + return chosen, nil +} + +// fnvShardReplicaFromString returns a replica assignment from a slice of active replicas by taking +// an arbitrary string, performing a FNV hash on it (mod NumActiveReplicas), and using the resulting +// number as an index of the sorted slice of active replicas. It is assumed that the active replicas +// slice is provided pre-sorted. +func fnvShardReplicaFromString(str string, activeReplicas []string) (string, error) { + if len(activeReplicas) == 0 { + return "", fmt.Errorf("could not determine in-shard state for %s: %w", str, + errNoActiveReplicas) + } + + h := fnv.New32a() + h.Write([]byte(str)) + i := h.Sum32() % uint32(len(activeReplicas)) + return activeReplicas[i], nil +} + +// fnvHashModTableShardFunc determines whether a replica should handle backing up data based on the +// table name in the form of `database`.`table`. It is assumed that the active replicas slice is +// provided pre-sorted.
+func fnvHashModTableShardFunc(md *tableReplicaMetadata) (bool, error) { + assignedReplica, err := fnvShardReplicaFromString(md.fullName(), md.ActiveReplicas) + if err != nil { + return false, err + } + return assignedReplica == md.ReplicaName, nil +} + +// fnvHashModDatabaseShardFunc determines whether a replica should handle backing up data based on +// database name. It is assumed that the active replicas slice is provided pre-sorted. +func fnvHashModDatabaseShardFunc(md *tableReplicaMetadata) (bool, error) { + assignedReplica, err := fnvShardReplicaFromString(md.Database, md.ActiveReplicas) + if err != nil { + return false, err + } + return assignedReplica == md.ReplicaName, nil +} + +// firstReplicaShardFunc determines whether a replica should handle backing up data based on whether +// or not it is the lexicographically first active replica. It is assumed that the active replicas +// slice is provided pre-sorted. +func firstReplicaShardFunc(md *tableReplicaMetadata) (bool, error) { + if len(md.ActiveReplicas) == 0 { + return false, fmt.Errorf("could not determine in-shard state for %s: %w", md.fullName(), + errNoActiveReplicas) + } + return md.ReplicaName == md.ActiveReplicas[0], nil +} + +// noneShardFunc always returns true, so every table is reported as in-shard (no sharding) +func noneShardFunc(md *tableReplicaMetadata) (bool, error) { + return true, nil +} + +// doesShard returns whether a ShardedOperationMode configuration performs sharding; unknown modes report false +func doesShard(mode string) bool { + _, ok := shardFuncRegistry[mode] + if !ok { + return false + } + return mode != "" && mode != "none" +} + +// replicaDeterminer is a concrete struct that will query clickhouse to obtain a shard determination +// by examining replica information +type replicaDeterminer struct { + q querier + sf shardFunc +} + +// newReplicaDeterminer returns a new replicaDeterminer +func newReplicaDeterminer(q querier, sf shardFunc) *replicaDeterminer { + sd := &replicaDeterminer{ + q: q, + sf: sf, + } + return sd +} + +// getReplicaState
obtains the local replication state through a query to `system.replicas` +func (rd *replicaDeterminer) getReplicaState(ctx context.Context) ([]tableReplicaMetadata, error) { + md := []tableReplicaMetadata{} + // TODO: Change query to pull replica_is_active after upgrading to clickhouse-go v2 + query := "SELECT t.database, t.name AS table, r.replica_name, arraySort(mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active))) AS active_replicas FROM system.tables t LEFT JOIN system.replicas r ON t.database = r.database AND t.name = r.table" + if err := rd.q.SelectContext(ctx, &md, query); err != nil { + return nil, fmt.Errorf("could not determine replication state: %w", err) + } + + // Handle views and memory tables by putting in stand-in replication metadata + for i, entry := range md { + if entry.ReplicaName == "" && len(entry.ActiveReplicas) == 0 { + md[i].ReplicaName = "no-replicas" + md[i].ActiveReplicas = []string{"no-replicas"} + } + } + return md, nil +} + +func (rd *replicaDeterminer) determineShards(ctx context.Context) (shardDetermination, error) { + md, err := rd.getReplicaState(ctx) + if err != nil { + return nil, err + } + sd := shardDetermination{} + for _, entry := range md { + assigned, err := rd.sf(&entry) + if err != nil { + return nil, err + } + sd[entry.fullName()] = assigned + } + return sd, nil +} diff --git a/pkg/backup/backup_shard_test.go b/pkg/backup/backup_shard_test.go new file mode 100644 index 00000000..fda5a1e2 --- /dev/null +++ b/pkg/backup/backup_shard_test.go @@ -0,0 +1,419 @@ +package backup + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "reflect" + "testing" +) + +func TestInShard(t *testing.T) { + d := shardDetermination{ + "`present_db`.`present_table`": true, + "`present_db`.`absent_table`": false, + "`absent_db`.`present_table`": false, + "`absent_db`.`absent_table`": false, + "`a.b`.`c`": true, + "`a`.`b.c`": false, + } + testcases := []struct { + name string + database string + table 
string + expectPresent bool + expectErr error + }{ + { + name: "Test nonexistent database", + database: "nonexistent", + table: "present_table", + expectPresent: false, + expectErr: errUnknownBackupShard, + }, + { + name: "Test nonexistent table", + database: "present_db", + table: "nonexistent", + expectPresent: false, + expectErr: errUnknownBackupShard, + }, + { + name: "Test in-shard table", + database: "present_db", + table: "present_table", + expectPresent: true, + }, + { + name: "Test out-shard table", + database: "present_db", + table: "absent_table", + expectPresent: false, + }, + { + name: "Test out-shard database with in-shard table name for other database", + database: "absent_db", + table: "present_table", + expectPresent: false, + }, + { + name: "Test out-shard database", + database: "absent_db", + table: "absent_table", + expectPresent: false, + }, + { + name: "Test ambiguous database/table name combination", + database: "a.b", + table: "c", + expectPresent: true, + }, + } + for _, tc := range testcases { + t.Run(tc.name, + func(t *testing.T) { + present, err := d.inShard(tc.database, tc.table) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected err %q, got %q", tc.expectErr, err) + } + if present != tc.expectPresent { + t.Fatalf("expected in-shard status %v, got %v", tc.expectPresent, present) + } + }, + ) + } +} + +func TestFullName(t *testing.T) { + testcases := []struct { + md tableReplicaMetadata + expect string + }{ + { + md: tableReplicaMetadata{ + Database: "a.b", + Table: "c", + }, + expect: "`a.b`.`c`", + }, + { + md: tableReplicaMetadata{ + Database: "a", + Table: "b.c", + }, + expect: "`a`.`b.c`", + }, + { + md: tableReplicaMetadata{ + Database: "db", + }, + expect: "`db`.``", + }, + { + md: tableReplicaMetadata{ + Table: "t", + }, + expect: "``.`t`", + }, + } + for _, tc := range testcases { + if tc.md.fullName() != tc.expect { + t.Fatalf("expected %q, got %q", tc.expect, tc.md.fullName()) + } + } +} + +func TestDoesShard(t 
*testing.T) { + testcases := []struct { + name string + shardName string + expect bool + }{ + { + name: "Test present and sharding function name string", + shardName: "table", + expect: true, + }, + { + name: "Test present and non-sharding function name string", + shardName: "none", + expect: false, + }, + { + name: "Test empty function name string", + shardName: "", + expect: false, + }, + { + name: "Test absent name string", + shardName: "nonexistent", + expect: false, + }, + } + for _, tc := range testcases { + t.Run(tc.name, + func(t *testing.T) { + got := doesShard(tc.shardName) + if got != tc.expect { + t.Fatalf("expected %v, got %v", tc.expect, got) + } + }, + ) + } +} + +func TestShardFunc(t *testing.T) { + t.Run("Test obtaining nonexistent shard func", + func(t *testing.T) { + _, err := shardFuncByName("non-existent") + if err == nil { + t.Fatalf("expected error when trying to get a nonexistent shard function") + } + }, + ) + + testcases := []struct { + name string + md *tableReplicaMetadata + expect map[string]bool + expectErr error + }{ + { + name: "Test no active replicas", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{}, + }, + expect: map[string]bool{ + "table": false, + "database": false, + "first-replica": false, + }, + expectErr: errNoActiveReplicas, + }, + { + name: "Test no active replicas for no sharding", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{}, + }, + expect: map[string]bool{ + "none": true, + }, + }, + { + name: "Test single active replica", + md: &tableReplicaMetadata{ + Database: "database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{"replica"}, + }, + expect: map[string]bool{ + "table": true, + "database": true, + "first-replica": true, + "none": true, + }, + }, + { + name: "Test not assigned replica", + md: &tableReplicaMetadata{ + Database:
"database", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{"different"}, + }, + expect: map[string]bool{ + "table": false, + "database": false, + "first-replica": false, + "none": true, + }, + }, + } + for _, tc := range testcases { + for name, expect := range tc.expect { + t.Run(fmt.Sprintf("%s - shard mode: %s", tc.name, name), + func(t *testing.T) { + shardFunc, err := shardFuncByName(name) + if err != nil { + t.Fatalf("unable to get shard function: %v", err) + } + got, err := shardFunc(tc.md) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if tc.expectErr != nil { + return // an expected error leaves no membership result to compare + } + if got != expect { + t.Fatalf("expected shard membership %v, got %v", expect, got) + } + }, + ) + } + } +} + +type testQuerier struct { + data any + returnErr error +} + +func (tq *testQuerier) SelectContext(_ context.Context, dest interface{}, _ string, + args ...interface{}) error { + if tq.returnErr != nil { + return tq.returnErr + } + jsonData, err := json.Marshal(tq.data) + if err != nil { + return fmt.Errorf("error encoding data: %w", err) + } + return json.NewDecoder(bytes.NewReader(jsonData)).Decode(dest) +} + +func TestGetReplicaState(t *testing.T) { + data := []tableReplicaMetadata{ + { + Database: "db", + Table: "table", + ReplicaName: "replica", + ActiveReplicas: []string{}, + }, + { + Database: "db2", + Table: "table2", + ReplicaName: "replica2", + ActiveReplicas: []string{"replica2"}, + }, + } + expectedErr := errors.New("expected error") + testcases := []struct { + name string + q querier + expect []tableReplicaMetadata + expectErr error + }{ + { + name: "Test error on obtaining replica state", + q: &testQuerier{ + returnErr: expectedErr, + }, + expectErr: expectedErr, + }, + { + name: "Test pulling data", + q: &testQuerier{ + data: data, + }, + expect: data, + }, + } + for _, tc := range testcases { + t.Run(tc.name, + func(t *testing.T) { + rd := newReplicaDeterminer(tc.q, nil) + got, err :=
rd.getReplicaState(context.Background()) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if err != nil { + return + } + if !reflect.DeepEqual(got, tc.expect) { + t.Fatalf("expected data %v, got %v", tc.expect, got) + } + }, + ) + + } +} + +var errNameSharder = errors.New("expected error") + +func nameSharder(md *tableReplicaMetadata) (bool, error) { + if md.Table == "error" { + return false, errNameSharder + } + if md.Table == "present" { + return true, nil + } + return false, nil +} + +func TestDetermineShards(t *testing.T) { + expectErr := errors.New("expected error") + testcases := []struct { + name string + q querier + expect shardDetermination + expectErr error + }{ + { + name: "Test query error", + q: &testQuerier{ + returnErr: expectErr, + }, + expectErr: expectErr, + }, + { + name: "Test shard func error", + q: &testQuerier{ + data: []tableReplicaMetadata{ + { + Table: "error", + }, + }, + }, + expectErr: errNameSharder, + }, + { + name: "Test normal operation", + q: &testQuerier{ + data: []tableReplicaMetadata{ + { + Database: "a", + Table: "present", + }, + { + Database: "a", + Table: "absent", + }, + { + Database: "b", + Table: "present", + }, + }, + }, + expect: shardDetermination{ + "`a`.`present`": true, + "`a`.`absent`": false, + "`b`.`present`": true, + }, + }, + } + for _, tc := range testcases { + t.Run(tc.name, + func(t *testing.T) { + rd := newReplicaDeterminer(tc.q, nameSharder) + got, err := rd.determineShards(context.Background()) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected %v, got %v", tc.expectErr, err) + } + if err != nil { + return + } + if !reflect.DeepEqual(got, tc.expect) { + t.Fatalf("expected data %v, got %v", tc.expect, got) + } + }, + ) + } +} diff --git a/pkg/backup/backuper.go b/pkg/backup/backuper.go index a88ee657..56e34f69 100644 --- a/pkg/backup/backuper.go +++ b/pkg/backup/backuper.go @@ -2,18 +2,32 @@ package backup import ( "context" + "errors" "fmt" + 
"path" + "github.com/Altinity/clickhouse-backup/pkg/clickhouse" "github.com/Altinity/clickhouse-backup/pkg/config" "github.com/Altinity/clickhouse-backup/pkg/resumable" "github.com/Altinity/clickhouse-backup/pkg/storage" + apexLog "github.com/apex/log" - "path" ) +var errShardOperationUnsupported = errors.New("sharded operations are not supported") + +// versioner is an interface for determining the version of Clickhouse +type versioner interface { + CanShardOperation(ctx context.Context) error +} + +type BackuperOpt func(*Backuper) + type Backuper struct { cfg *config.Config ch *clickhouse.ClickHouse + vers versioner + bs backupSharder dst *storage.BackupDestination log *apexLog.Entry DiskToPathMap map[string]string @@ -24,15 +38,33 @@ type Backuper struct { resumableState *resumable.State } -func NewBackuper(cfg *config.Config) *Backuper { +func NewBackuper(cfg *config.Config, opts ...BackuperOpt) *Backuper { ch := &clickhouse.ClickHouse{ Config: &cfg.ClickHouse, Log: apexLog.WithField("logger", "clickhouse"), } - return &Backuper{ - cfg: cfg, - ch: ch, - log: apexLog.WithField("logger", "backuper"), + b := &Backuper{ + cfg: cfg, + ch: ch, + vers: ch, + bs: nil, + log: apexLog.WithField("logger", "backuper"), + } + for _, opt := range opts { + opt(b) + } + return b +} + +func WithVersioner(v versioner) BackuperOpt { + return func(b *Backuper) { + b.vers = v + } +} + +func WithBackupSharder(s backupSharder) BackuperOpt { + return func(b *Backuper) { + b.bs = s } } @@ -75,3 +107,46 @@ func (b *Backuper) getLocalBackupDataPathForTable(backupName string, disk string } return backupPath } + +// populateBackupShardField populates the BackupShard field for a slice of Table structs +func (b *Backuper) populateBackupShardField(ctx context.Context, tables []clickhouse.Table) error { + // By default, have all fields populated to full backup unless the table is to be skipped + for i := range tables { + tables[i].BackupType = clickhouse.ShardBackupFull + if tables[i].Skip { + 
tables[i].BackupType = clickhouse.ShardBackupNone + } + } + if !doesShard(b.cfg.General.ShardedOperationMode) { + return nil + } + if err := b.vers.CanShardOperation(ctx); err != nil { + return err + } + + if b.bs == nil { + // Parse shard config here to avoid error return in NewBackuper + shardFunc, err := shardFuncByName(b.cfg.General.ShardedOperationMode) + if err != nil { + return fmt.Errorf("could not determine shards for tables: %w", err) + } + b.bs = newReplicaDeterminer(b.ch, shardFunc) + } + assignment, err := b.bs.determineShards(ctx) + if err != nil { + return err + } + for i, t := range tables { + if t.Skip { + continue + } + fullBackup, err := assignment.inShard(t.Database, t.Name) + if err != nil { + return err + } + if !fullBackup { + tables[i].BackupType = clickhouse.ShardBackupSchema + } + } + return nil +} diff --git a/pkg/backup/backuper_test.go b/pkg/backup/backuper_test.go new file mode 100644 index 00000000..c067ef79 --- /dev/null +++ b/pkg/backup/backuper_test.go @@ -0,0 +1,185 @@ +package backup + +import ( + "context" + "errors" + "reflect" + "testing" + + "github.com/Altinity/clickhouse-backup/pkg/clickhouse" + "github.com/Altinity/clickhouse-backup/pkg/config" +) + +type testVersioner struct { + err error +} + +func newTestVersioner(err error) *testVersioner { + return &testVersioner{err: err} +} + +func (v *testVersioner) CanShardOperation(ctx context.Context) error { + return v.err +} + +type testBackupSharder struct { + data shardDetermination + err error +} + +func (bs *testBackupSharder) determineShards(_ context.Context) (shardDetermination, error) { + if bs.err != nil { + return nil, bs.err + } + return bs.data, nil +} + +func TestPopulateBackupShardField(t *testing.T) { + errVersion := errors.New("versioner error") + errVersioner := newTestVersioner(errVersion) + goodVersioner := newTestVersioner(nil) + oldVersioner := newTestVersioner(clickhouse.ErrShardOperationVers) + + // Create tables to reset field state + tableData := 
func() []clickhouse.Table { + return []clickhouse.Table{ + { + Database: "a", + Name: "present", + }, + { + Database: "a", + Name: "absent", + }, + { + Database: "b", + Name: "present", + Skip: true, + }, + } + } + + errShard := errors.New("backup sharder error") + errSharder := &testBackupSharder{err: errShard} + staticSharder := &testBackupSharder{ + data: shardDetermination{ + "`a`.`present`": true, + "`a`.`absent`": false, + }, + } + emptySharder := &testBackupSharder{ + data: shardDetermination{}, + } + + testcases := []struct { + name string + shardOpMode string + v versioner + bs backupSharder + expect []clickhouse.Table + expectErr error + }{ + { + name: "Test versioner error", + shardOpMode: "table", + v: errVersioner, + bs: staticSharder, + expectErr: errVersion, + }, + { + name: "Test incompatible version", + shardOpMode: "table", + v: oldVersioner, + bs: staticSharder, + expectErr: clickhouse.ErrShardOperationVers, + }, + { + name: "Test incompatible version without sharding config", + shardOpMode: "none", + v: oldVersioner, + bs: staticSharder, + expect: []clickhouse.Table{ + { + Database: "a", + Name: "present", + BackupType: clickhouse.ShardBackupFull, + }, + { + Database: "a", + Name: "absent", + BackupType: clickhouse.ShardBackupFull, + }, + { + Database: "b", + Name: "present", + Skip: true, + BackupType: clickhouse.ShardBackupNone, + }, + }, + }, + { + name: "Test sharder error", + shardOpMode: "table", + v: goodVersioner, + bs: errSharder, + expectErr: errShard, + }, + { + name: "Test incomplete replica data", + shardOpMode: "table", + v: goodVersioner, + bs: emptySharder, + expectErr: errUnknownBackupShard, + }, + { + name: "Test normal sharding", + shardOpMode: "table", + v: goodVersioner, + bs: staticSharder, + expect: []clickhouse.Table{ + { + Database: "a", + Name: "present", + BackupType: clickhouse.ShardBackupFull, + }, + { + Database: "a", + Name: "absent", + BackupType: clickhouse.ShardBackupSchema, + }, + { + Database: "b", + Name: 
"present", + Skip: true, + BackupType: clickhouse.ShardBackupNone, + }, + }, + }, + } + for _, tc := range testcases { + t.Run(tc.name, + func(t *testing.T) { + cfg := &config.Config{ + General: config.GeneralConfig{ + ShardedOperationMode: tc.shardOpMode, + }, + } + b := NewBackuper(cfg, + WithVersioner(tc.v), + WithBackupSharder(tc.bs), + ) + tables := tableData() + err := b.populateBackupShardField(context.Background(), tables) + if !errors.Is(err, tc.expectErr) { + t.Fatalf("expected error %v, got %v", tc.expectErr, err) + } + if err != nil { + return + } + if !reflect.DeepEqual(tables, tc.expect) { + t.Fatalf("expected %+v, got %+v", tc.expect, tables) + } + }, + ) + } +} diff --git a/pkg/backup/create.go b/pkg/backup/create.go index 46f96633..9aa6fc7b 100644 --- a/pkg/backup/create.go +++ b/pkg/backup/create.go @@ -5,12 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/Altinity/clickhouse-backup/pkg/config" - "github.com/Altinity/clickhouse-backup/pkg/keeper" - "github.com/Altinity/clickhouse-backup/pkg/partition" - "github.com/Altinity/clickhouse-backup/pkg/status" - "github.com/Altinity/clickhouse-backup/pkg/storage" - "github.com/Altinity/clickhouse-backup/pkg/storage/object_disk" "os" "path" "path/filepath" @@ -19,9 +13,16 @@ import ( "github.com/Altinity/clickhouse-backup/pkg/clickhouse" "github.com/Altinity/clickhouse-backup/pkg/common" + "github.com/Altinity/clickhouse-backup/pkg/config" "github.com/Altinity/clickhouse-backup/pkg/filesystemhelper" + "github.com/Altinity/clickhouse-backup/pkg/keeper" "github.com/Altinity/clickhouse-backup/pkg/metadata" + "github.com/Altinity/clickhouse-backup/pkg/partition" + "github.com/Altinity/clickhouse-backup/pkg/status" + "github.com/Altinity/clickhouse-backup/pkg/storage" + "github.com/Altinity/clickhouse-backup/pkg/storage/object_disk" "github.com/Altinity/clickhouse-backup/pkg/utils" + apexLog "github.com/apex/log" "github.com/google/uuid" recursiveCopy "github.com/otiai10/copy" @@ -82,7 +83,7 @@ 
func (b *Backuper) CreateBackup(backupName, tablePattern string, partitions []st if err != nil { return fmt.Errorf("can't get database engines from clickhouse: %v", err) } - tables, err := b.ch.GetTables(ctx, tablePattern) + tables, err := b.GetTables(ctx, tablePattern) if err != nil { return fmt.Errorf("can't get tables from clickhouse: %v", err) } @@ -166,7 +167,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par } var realSize map[string]int64 var disksToPartsMap map[string][]metadata.Part - if doBackupData { + if doBackupData && table.BackupType == clickhouse.ShardBackupFull { log.Debug("create data") shadowBackupUUID := strings.ReplaceAll(uuid.New().String(), "-", "") disksToPartsMap, realSize, err = b.AddTableToBackup(ctx, backupName, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}]) @@ -209,7 +210,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par Size: realSize, Parts: disksToPartsMap, Mutations: inProgressMutations, - MetadataOnly: schemaOnly, + MetadataOnly: schemaOnly || table.BackupType == clickhouse.ShardBackupSchema, }, disks) if err != nil { if removeBackupErr := b.RemoveBackupLocal(ctx, backupName, disks); removeBackupErr != nil { @@ -252,6 +253,10 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par } func (b *Backuper) createBackupEmbedded(ctx context.Context, backupName, tablePattern string, partitionsNameList map[metadata.TableTitle][]string, partitionsIdMap map[metadata.TableTitle]common.EmptyMap, schemaOnly, createRBAC, createConfigs bool, tables []clickhouse.Table, allDatabases []clickhouse.Database, allFunctions []clickhouse.Function, disks []clickhouse.Disk, diskMap, diskTypes map[string]string, log *apexLog.Entry, startBackup time.Time, backupVersion string) error { + // TODO: Implement sharded backup operations for embedded backups + if 
doesShard(b.cfg.General.ShardedOperationMode) { + return fmt.Errorf("cannot perform embedded backup: %w", errShardOperationUnsupported) + } if _, isBackupDiskExists := diskMap[b.cfg.ClickHouse.EmbeddedBackupDisk]; !isBackupDiskExists { return fmt.Errorf("backup disk `%s` not exists in system.disks", b.cfg.ClickHouse.EmbeddedBackupDisk) } diff --git a/pkg/backup/list.go b/pkg/backup/list.go index 31c575b2..25952416 100644 --- a/pkg/backup/list.go +++ b/pkg/backup/list.go @@ -4,9 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/Altinity/clickhouse-backup/pkg/custom" - "github.com/Altinity/clickhouse-backup/pkg/status" - apexLog "github.com/apex/log" "io" "os" "path" @@ -15,9 +12,13 @@ import ( "text/tabwriter" "github.com/Altinity/clickhouse-backup/pkg/clickhouse" + "github.com/Altinity/clickhouse-backup/pkg/custom" "github.com/Altinity/clickhouse-backup/pkg/metadata" + "github.com/Altinity/clickhouse-backup/pkg/status" "github.com/Altinity/clickhouse-backup/pkg/storage" "github.com/Altinity/clickhouse-backup/pkg/utils" + + apexLog "github.com/apex/log" ) // List - list backups to stdout from command line @@ -368,7 +369,7 @@ func (b *Backuper) GetRemoteBackups(ctx context.Context, parseMetadata bool) ([] return backupList, err } -// GetTables - get all tables for use by PrintTables and API +// GetTables - get all tables for use by CreateBackup, PrintTables, and API func (b *Backuper) GetTables(ctx context.Context, tablePattern string) ([]clickhouse.Table, error) { if !b.ch.IsOpen { if err := b.ch.Connect(); err != nil { @@ -381,6 +382,9 @@ func (b *Backuper) GetTables(ctx context.Context, tablePattern string) ([]clickh if err != nil { return []clickhouse.Table{}, fmt.Errorf("can't get tables: %v", err) } + if err := b.populateBackupShardField(ctx, allTables); err != nil { + return nil, err + } return allTables, nil } @@ -416,7 +420,7 @@ func (b *Backuper) PrintTables(printAll bool, tablePattern string) error { } continue } - if bytes, err := 
fmt.Fprintf(w, "%s.%s\t%s\t%v\t\n", table.Database, table.Name, utils.FormatBytes(table.TotalBytes), strings.Join(tableDisks, ",")); err != nil {
+		if bytes, err := fmt.Fprintf(w, "%s.%s\t%s\t%v\t%v\n", table.Database, table.Name, utils.FormatBytes(table.TotalBytes), strings.Join(tableDisks, ","), table.BackupType); err != nil { // extra trailing column prints the table's BackupType
 			log.Errorf("fmt.Fprintf write %d bytes return error: %v", bytes, err)
 		}
 	}
diff --git a/pkg/clickhouse/structs.go b/pkg/clickhouse/structs.go
index 31f134aa..dcbc7904 100644
--- a/pkg/clickhouse/structs.go
+++ b/pkg/clickhouse/structs.go
@@ -4,6 +4,14 @@ import (
 	"time"
 )
 
+type ShardBackupType string // per-table backup mode stored in Table.BackupType
+
+const ( // NOTE(review): untyped string constants; they convert implicitly wherever a ShardBackupType is expected
+	ShardBackupFull   = "full"        // createBackupLocal only backs up table data when BackupType equals this value
+	ShardBackupNone   = "none"        // value the populateBackupShardField tests expect on tables with Skip set
+	ShardBackupSchema = "schema-only" // createBackupLocal records such tables as MetadataOnly
+)
+
 // Table - ClickHouse table struct
 type Table struct {
 	// common fields for all `clickhouse-server` versions
@@ -17,6 +25,7 @@ type Table struct {
 	CreateTableQuery string `ch:"create_table_query"`
 	TotalBytes       uint64 `ch:"total_bytes"`
 	Skip             bool
+	BackupType       ShardBackupType // filled in by populateBackupShardField, which Backuper.GetTables calls
 }
 
 // IsSystemTablesFieldPresent - ClickHouse `system.tables` varius field flags
diff --git a/pkg/clickhouse/version.go b/pkg/clickhouse/version.go
new file mode 100644
index 00000000..9f98f8d8
--- /dev/null
+++ b/pkg/clickhouse/version.go
@@ -0,0 +1,34 @@
+package clickhouse
+
+import (
+	"context"
+	"errors"
+)
+
+const (
+	minVersShardOp = 21000000 // lowest GetVersion value for which canShardOperation allows sharded operations
+)
+
+var (
+	ErrShardOperationVers = errors.New("sharded operations are only supported for " +
+		"clickhouse-server >= v21.x") // returned by canShardOperation when the reported version is below minVersShardOp
+)
+
+type versionGetter interface { // narrow seam over the version lookup; version_test.go substitutes a stub for *ClickHouse
+	GetVersion(ctx context.Context) (int, error)
+}
+
+func canShardOperation(ctx context.Context, v versionGetter) error { // nil when sharding is allowed, else the lookup error or ErrShardOperationVers
+	version, err := v.GetVersion(ctx)
+	if err != nil {
+		return err // lookup failure is passed through unwrapped
+	}
+	if version < minVersShardOp {
+		return ErrShardOperationVers
+	}
+	return nil
+}
+
+func (ch *ClickHouse) CanShardOperation(ctx context.Context) error { // exported entry point: runs the check with ch as the versionGetter
+	return canShardOperation(ctx, ch)
+}
diff --git a/pkg/clickhouse/version_test.go b/pkg/clickhouse/version_test.go
new
file mode 100644
index 00000000..a1d68f32
--- /dev/null
+++ b/pkg/clickhouse/version_test.go
@@ -0,0 +1,76 @@
+package clickhouse
+
+import (
+	"context"
+	"errors"
+	"testing"
+)
+
+type testVersionGetterOpt func(v *testVersionGetter) // functional option applied by newTestVersionGetter
+
+type testVersionGetter struct { // stub versionGetter returning a fixed version or a fixed error
+	version    int
+	versionErr error
+}
+
+func newTestVersionGetter(opts ...testVersionGetterOpt) *testVersionGetter {
+	v := &testVersionGetter{}
+	for _, opt := range opts {
+		opt(v)
+	}
+	return v
+}
+
+func withVersion(version int) testVersionGetterOpt {
+	return func(v *testVersionGetter) {
+		v.version = version
+	}
+}
+
+func withVersionErr(err error) testVersionGetterOpt {
+	return func(v *testVersionGetter) {
+		v.versionErr = err
+	}
+}
+
+func (v *testVersionGetter) GetVersion(_ context.Context) (int, error) {
+	if v.versionErr != nil {
+		return -1, v.versionErr // a configured error takes precedence over the configured version
+	}
+	return v.version, nil
+}
+
+func TestCanShardOperation(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("test error on version retrieval",
+		func(t *testing.T) {
+			v := newTestVersionGetter(withVersionErr(errors.New("error")))
+			if err := canShardOperation(ctx, v); err == nil {
+				t.Fatal("expected error when version retrieval fails")
+			}
+		},
+	)
+
+	t.Run("test version too low",
+		func(t *testing.T) {
+			v := newTestVersionGetter(withVersion(-1)) // any value below minVersShardOp must be rejected
+			err := canShardOperation(ctx, v)
+			if err == nil {
+				t.Fatal("expected error when version number is too low")
+			}
+			if !errors.Is(err, ErrShardOperationVers) {
+				t.Fatalf("expected ErrShardOperationVers, got %v", err) // message names the sentinel that is actually asserted
+			}
+		},
+	)
+
+	t.Run("test version should be OK",
+		func(t *testing.T) {
+			v := newTestVersionGetter(withVersion(minVersShardOp)) // boundary: the minimum itself is accepted
+			if err := canShardOperation(ctx, v); err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+		},
+	)
+}
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 13a04763..ae600df5 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -55,6 +55,7 @@ type GeneralConfig struct {
WatchInterval string `yaml:"watch_interval" envconfig:"WATCH_INTERVAL"` FullInterval string `yaml:"full_interval" envconfig:"FULL_INTERVAL"` WatchBackupNameTemplate string `yaml:"watch_backup_name_template" envconfig:"WATCH_BACKUP_NAME_TEMPLATE"` + ShardedOperationMode string `yaml:"sharded_operation_mode" envconfig:"SHARDED_OPERATION_MODE"` RetriesDuration time.Duration WatchDuration time.Duration FullDuration time.Duration diff --git a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot index 2086efa2..d932e1c7 100644 --- a/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot +++ b/test/testflows/clickhouse_backup/tests/snapshots/cli.py.cli.snapshot @@ -1,4 +1,4 @@ -default_config = r"""'[\'general:\', \' remote_storage: none\', \' disable_progress_bar: true\', \' backups_to_keep_local: 0\', \' backups_to_keep_remote: 0\', \' log_level: info\', \' allow_empty_backups: false\', \' use_resumable_state: true\', \' restore_schema_on_cluster: ""\', \' upload_by_part: true\', \' download_by_part: true\', \' restore_database_mapping: {}\', \' retries_on_failure: 3\', \' retries_pause: 30s\', \' watch_interval: 1h\', \' full_interval: 24h\', \' watch_backup_name_template: shard{shard}-{type}-{time:20060102150405}\', \' retriesduration: 100ms\', \' watchduration: 1h0m0s\', \' fullduration: 24h0m0s\', \'clickhouse:\', \' username: default\', \' password: ""\', \' host: localhost\', \' port: 9000\', \' disk_mapping: {}\', \' skip_tables:\', \' - system.*\', \' - INFORMATION_SCHEMA.*\', \' - information_schema.*\', \' - _temporary_and_external_tables.*\', \' skip_table_engines: []\', \' timeout: 5m\', \' freeze_by_part: false\', \' freeze_by_part_where: ""\', \' use_embedded_backup_restore: false\', \' embedded_backup_disk: ""\', \' backup_mutations: true\', \' restore_as_attach: false\', \' check_parts_columns: true\', \' secure: false\', \' skip_verify: false\', \' 
sync_replicated_tables: false\', \' log_sql_queries: true\', \' config_dir: /etc/clickhouse-server/\', \' restart_command: exec:systemctl restart clickhouse-server\', \' ignore_not_exists_error_during_freeze: true\', \' check_replicas_before_attach: true\', \' tls_key: ""\', \' tls_cert: ""\', \' tls_ca: ""\', \' debug: false\', \'s3:\', \' access_key: ""\', \' secret_key: ""\', \' bucket: ""\', \' endpoint: ""\', \' region: us-east-1\', \' acl: private\', \' assume_role_arn: ""\', \' force_path_style: false\', \' path: ""\', \' object_disk_path: ""\', \' disable_ssl: false\', \' compression_level: 1\', \' compression_format: tar\', \' sse: ""\', \' sse_kms_key_id: ""\', \' sse_customer_algorithm: ""\', \' sse_customer_key: ""\', \' sse_customer_key_md5: ""\', \' sse_kms_encryption_context: ""\', \' disable_cert_verification: false\', \' use_custom_storage_class: false\', \' storage_class: STANDARD\', \' custom_storage_class_map: {}\', \' part_size: 0\', \' allow_multipart_download: false\', \' object_labels: {}\', \' debug: false\', \'gcs:\', \' credentials_file: ""\', \' credentials_json: ""\', \' credentials_json_encoded: ""\', \' bucket: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' debug: false\', \' endpoint: ""\', \' storage_class: STANDARD\', \' object_labels: {}\', \' custom_storage_class_map: {}\', \'cos:\', \' url: ""\', \' timeout: 2m\', \' secret_id: ""\', \' secret_key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'api:\', \' listen: localhost:7171\', \' enable_metrics: true\', \' enable_pprof: false\', \' username: ""\', \' password: ""\', \' secure: false\', \' certificate_file: ""\', \' private_key_file: ""\', \' ca_cert_file: ""\', \' ca_key_file: ""\', \' create_integration_tables: false\', \' integration_tables_host: ""\', \' allow_parallel: false\', \' complete_resumable_after_restart: true\', \'ftp:\', \' address: ""\', \' 
timeout: 2m\', \' username: ""\', \' password: ""\', \' tls: false\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'sftp:\', \' address: ""\', \' port: 22\', \' username: ""\', \' password: ""\', \' key: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'azblob:\', \' endpoint_schema: https\', \' endpoint_suffix: core.windows.net\', \' account_name: ""\', \' account_key: ""\', \' sas: ""\', \' use_managed_identity: false\', \' container: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' sse_key: ""\', \' buffer_size: 0\', \' buffer_count: 3\', \' timeout: 15m\', \'custom:\', \' upload_command: ""\', \' download_command: ""\', \' list_command: ""\', \' delete_command: ""\', \' command_timeout: 4h\', \' commandtimeoutduration: 4h0m0s\']'""" +default_config = r"""'[\'general:\', \' remote_storage: none\', \' disable_progress_bar: true\', \' backups_to_keep_local: 0\', \' backups_to_keep_remote: 0\', \' log_level: info\', \' allow_empty_backups: false\', \' use_resumable_state: true\', \' restore_schema_on_cluster: ""\', \' upload_by_part: true\', \' download_by_part: true\', \' restore_database_mapping: {}\', \' retries_on_failure: 3\', \' retries_pause: 30s\', \' watch_interval: 1h\', \' full_interval: 24h\', \' watch_backup_name_template: shard{shard}-{type}-{time:20060102150405}\', \' sharded_operation_mode: ""\', \' retriesduration: 100ms\', \' watchduration: 1h0m0s\', \' fullduration: 24h0m0s\', \'clickhouse:\', \' username: default\', \' password: ""\', \' host: localhost\', \' port: 9000\', \' disk_mapping: {}\', \' skip_tables:\', \' - system.*\', \' - INFORMATION_SCHEMA.*\', \' - information_schema.*\', \' - _temporary_and_external_tables.*\', \' skip_table_engines: []\', \' timeout: 5m\', \' freeze_by_part: false\', \' freeze_by_part_where: ""\', \' 
use_embedded_backup_restore: false\', \' embedded_backup_disk: ""\', \' backup_mutations: true\', \' restore_as_attach: false\', \' check_parts_columns: true\', \' secure: false\', \' skip_verify: false\', \' sync_replicated_tables: false\', \' log_sql_queries: true\', \' config_dir: /etc/clickhouse-server/\', \' restart_command: exec:systemctl restart clickhouse-server\', \' ignore_not_exists_error_during_freeze: true\', \' check_replicas_before_attach: true\', \' tls_key: ""\', \' tls_cert: ""\', \' tls_ca: ""\', \' debug: false\', \'s3:\', \' access_key: ""\', \' secret_key: ""\', \' bucket: ""\', \' endpoint: ""\', \' region: us-east-1\', \' acl: private\', \' assume_role_arn: ""\', \' force_path_style: false\', \' path: ""\', \' object_disk_path: ""\', \' disable_ssl: false\', \' compression_level: 1\', \' compression_format: tar\', \' sse: ""\', \' sse_kms_key_id: ""\', \' sse_customer_algorithm: ""\', \' sse_customer_key: ""\', \' sse_customer_key_md5: ""\', \' sse_kms_encryption_context: ""\', \' disable_cert_verification: false\', \' use_custom_storage_class: false\', \' storage_class: STANDARD\', \' custom_storage_class_map: {}\', \' part_size: 0\', \' allow_multipart_download: false\', \' object_labels: {}\', \' debug: false\', \'gcs:\', \' credentials_file: ""\', \' credentials_json: ""\', \' credentials_json_encoded: ""\', \' bucket: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' debug: false\', \' endpoint: ""\', \' storage_class: STANDARD\', \' object_labels: {}\', \' custom_storage_class_map: {}\', \'cos:\', \' url: ""\', \' timeout: 2m\', \' secret_id: ""\', \' secret_key: ""\', \' path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'api:\', \' listen: localhost:7171\', \' enable_metrics: true\', \' enable_pprof: false\', \' username: ""\', \' password: ""\', \' secure: false\', \' certificate_file: ""\', \' private_key_file: ""\', \' ca_cert_file: 
""\', \' ca_key_file: ""\', \' create_integration_tables: false\', \' integration_tables_host: ""\', \' allow_parallel: false\', \' complete_resumable_after_restart: true\', \'ftp:\', \' address: ""\', \' timeout: 2m\', \' username: ""\', \' password: ""\', \' tls: false\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'sftp:\', \' address: ""\', \' port: 22\', \' username: ""\', \' password: ""\', \' key: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_format: tar\', \' compression_level: 1\', \' debug: false\', \'azblob:\', \' endpoint_schema: https\', \' endpoint_suffix: core.windows.net\', \' account_name: ""\', \' account_key: ""\', \' sas: ""\', \' use_managed_identity: false\', \' container: ""\', \' path: ""\', \' object_disk_path: ""\', \' compression_level: 1\', \' compression_format: tar\', \' sse_key: ""\', \' buffer_size: 0\', \' buffer_count: 3\', \' timeout: 15m\', \'custom:\', \' upload_command: ""\', \' download_command: ""\', \' list_command: ""\', \' delete_command: ""\', \' command_timeout: 4h\', \' commandtimeoutduration: 4h0m0s\']'""" help_flag = r"""'NAME:\n clickhouse-backup - Tool for easy backup of ClickHouse with cloud supportUSAGE:\n clickhouse-backup [-t, --tables=.] 
DESCRIPTION:\n Run as \'root\' or \'clickhouse\' userCOMMANDS:\n tables List of tables, exclude skip_tables\n create Create new backup\n create_remote Create and upload new backup\n upload Upload backup to remote storage\n list List of backups\n download Download backup from remote storage\n restore Create schema and restore data from backup\n restore_remote Download and restore\n delete Delete specific backup\n default-config Print default config\n print-config Print current config merged with environment variables\n clean Remove data in \'shadow\' folder from all \'path\' folders available from \'system.disks\'\n clean_remote_broken Remove all broken remote backups\n watch Run infinite loop which create full + incremental backup sequence to allow efficient backup sequences\n server Run API server\n help, h Shows a list of commands or help for one commandGLOBAL OPTIONS:\n --config value, -c value Config \'FILE\' name. (default: "/etc/clickhouse-backup/config.yml") [$CLICKHOUSE_BACKUP_CONFIG]\n --help, -h show help\n --version, -v print the version'"""