-
Notifications
You must be signed in to change notification settings - Fork 226
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
clickhouse-backup: Add support for sharded backup
This change adds a new configuration 'general.sharded_operation' which shards tables for backup across replicas, allowing for a uniform backup and restore call to the server without consideration for table replication state. Fixes #639 clickhouse-backup/backup_shard: Use Array for active replicas clickhouse-go v1 does not support clickhouse Map types. Force the Map(String, UInt8) column replica_is_active to a string array for now. clickhouse-backup/backuper: Skip shard assignment for skipped tables Skip shard assignment for skipped tables. Also add the new ShardBackupType "ShardBackupNone", which is assigned to skipped tables clickhouse-backup/backuper: Use b.GetTables for CreateBackup Use b.GetTables for CreateBackup instead of b.ch.GetTables and move b.populateBackupShardField to b.GetTables so as to populate the field for the server API.
- Loading branch information
Showing
12 changed files
with
795 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package backup | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"hash/fnv" | ||
"sort" | ||
) | ||
|
||
var ( | ||
// errUnknownBackupShard is returned when sharding assignment is requested for a table for which | ||
// active replication state is not known. | ||
errUnknownBackupShard = errors.New("unknown backup shard") | ||
|
||
// errNoActiveReplicas is returned when a table is has no current active replicas | ||
errNoActiveReplicas = errors.New("no active replicas") | ||
) | ||
|
||
// shardDetermination is an object holding information on whether or not a table is within the | ||
// backup shard | ||
type shardDetermination map[string]bool | ||
|
||
// inShard returns whether or not a given table is within a backup shard | ||
func (d shardDetermination) inShard(database, table string) (bool, error) { | ||
fullName := fmt.Sprintf("%s.%s", database, table) | ||
presentInShard, ok := d[fullName] | ||
if !ok { | ||
return false, fmt.Errorf("error determining backup shard state for %q: %w", fullName, | ||
errUnknownBackupShard) | ||
} | ||
return presentInShard, nil | ||
} | ||
|
||
// backupSharder is an interface which can obtain a shard determination at a given point in time | ||
type backupSharder interface { | ||
determineShards(ctx context.Context) (shardDetermination, error) | ||
} | ||
|
||
// tableReplicaMetadata is data derived from `system.replicas` | ||
type tableReplicaMetadata struct { | ||
Database string `db:"database" json:"database"` | ||
Table string `db:"table" json:"table"` | ||
ReplicaName string `db:"replica_name" json:"replica_name"` | ||
// TODO: Change type to use replica_is_active directly after upgrade to clickhouse-go v2 | ||
ActiveReplicas []string `db:"active_replicas" json:"replica_is_active"` | ||
} | ||
|
||
// fullName returns the table name in the form of `database.table` | ||
func (md *tableReplicaMetadata) fullName() string { | ||
return fmt.Sprintf("%s.%s", md.Database, md.Table) | ||
} | ||
|
||
// querier is an interface that can query Clickhouse | ||
type querier interface { | ||
StructSelectContext(ctx context.Context, dest any, query string) error | ||
} | ||
|
||
// shardFunc is a function that is used to determine whether or not a given database/table should be | ||
// handled by a given replica | ||
type shardFunc func(md *tableReplicaMetadata) (bool, error) | ||
|
||
// fnvHashModShardFunc performs a FNV hash of a table name in the form of `database.table` and then | ||
// performs a mod N operation (where N is the number of active replicas) in order to find an index | ||
// in an alphabetically sorted list of active replicas which corresponds to the replica that will | ||
// handle the backup of the table | ||
func fnvHashModShardFunc(md *tableReplicaMetadata) (bool, error) { | ||
if len(md.ActiveReplicas) == 0 { | ||
return false, fmt.Errorf("could not determine in-shard state for %s: %w", md.fullName(), | ||
errNoActiveReplicas) | ||
} | ||
sort.Strings(md.ActiveReplicas) | ||
|
||
h := fnv.New32a() | ||
h.Write([]byte(md.fullName())) | ||
i := h.Sum32() % uint32(len(md.ActiveReplicas)) | ||
return md.ActiveReplicas[i] == md.ReplicaName, nil | ||
} | ||
|
||
// replicaDeterminer is a concrete struct that will query clickhouse to obtain a shard determination | ||
// by examining replica information | ||
type replicaDeterminer struct { | ||
q querier | ||
sf shardFunc | ||
} | ||
|
||
// newReplicaDeterminer returns a new shardDeterminer | ||
func newReplicaDeterminer(q querier, sf shardFunc) *replicaDeterminer { | ||
sd := &replicaDeterminer{ | ||
q: q, | ||
sf: sf, | ||
} | ||
return sd | ||
} | ||
|
||
// getReplicaState obtains the local replication state through a query to `system.replicas` | ||
func (rd *replicaDeterminer) getReplicaState(ctx context.Context) ([]tableReplicaMetadata, error) { | ||
md := []tableReplicaMetadata{} | ||
// TODO: Change query to pull replica_is_active after upgrading to clickhouse-go v2 | ||
query := "select database, table, replica_name, mapKeys(mapFilter((replica, active) -> (active == 1), replica_is_active)) as active_replicas from system.replicas" | ||
if err := rd.q.StructSelectContext(ctx, &md, query); err != nil { | ||
return nil, fmt.Errorf("could not determine replication state: %w", err) | ||
} | ||
return md, nil | ||
} | ||
|
||
func (rd *replicaDeterminer) determineShards(ctx context.Context) (shardDetermination, error) { | ||
md, err := rd.getReplicaState(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
sd := shardDetermination{} | ||
for _, entry := range md { | ||
assigned, err := rd.sf(&entry) | ||
if err != nil { | ||
return nil, err | ||
} | ||
sd[entry.fullName()] = assigned | ||
} | ||
return sd, nil | ||
} |
Oops, something went wrong.