
backup: Addressing changes for adding sharding support
mskwon committed Jun 14, 2023
1 parent 11c101c commit e4e2260
Showing 14 changed files with 494 additions and 252 deletions.
2 changes: 1 addition & 1 deletion ReadMe.md
@@ -379,7 +379,7 @@ general:
full_interval: 24h # FULL_INTERVAL, use only for `watch` command, full backup will create every 24h
watch_backup_name_template: "shard{shard}-{type}-{time:20060102150405}" # WATCH_BACKUP_NAME_TEMPLATE, used only for `watch` command, macros values will apply from `system.macros` for time:XXX, look format in https://go.dev/src/time/format.go

-sharded_operation: false # SHARDED_OPERATION, backups to replicas will save the schema but will only save one replica copy of the data with tables sharded among replicas.
+sharded_operation_mode: none # SHARDED_OPERATION_MODE, determines how replicas divide responsibility for backing up table data. Options are: none (no sharding), table (table granularity), database (database granularity), first-replica (backup performed by the lexicographically first active replica). If left empty, "none" is used as the default.
clickhouse:
username: default # CLICKHOUSE_USERNAME
password: "" # CLICKHOUSE_PASSWORD
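For context, a minimal sketch of how the renamed option might be set in a config file — the placement under `general` follows the hunk header above, and the chosen value is hypothetical:

```yaml
general:
  # Shard backup responsibility at table granularity: each table's data is
  # saved by exactly one replica, while every replica still saves the schema.
  sharded_operation_mode: table
```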
19 changes: 10 additions & 9 deletions go.mod
@@ -7,16 +7,18 @@ require (
github.com/Azure/go-autorest/autorest v0.11.29
github.com/Azure/go-autorest/autorest/adal v0.9.23
github.com/ClickHouse/clickhouse-go/v2 v2.10.1
-github.com/aws/aws-sdk-go-v2 v1.17.6
-github.com/aws/aws-sdk-go-v2/config v1.18.16
-github.com/aws/aws-sdk-go-v2/credentials v1.13.16
-github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.56
-github.com/aws/aws-sdk-go-v2/service/s3 v1.30.6
-github.com/aws/aws-sdk-go-v2/service/sts v1.18.6
+github.com/apex/log v1.9.0
+github.com/aws/aws-sdk-go-v2 v1.18.1
+github.com/aws/aws-sdk-go-v2/config v1.18.26
+github.com/aws/aws-sdk-go-v2/credentials v1.13.25
+github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.68
+github.com/aws/aws-sdk-go-v2/service/s3 v1.34.0
+github.com/aws/aws-sdk-go-v2/service/sts v1.19.1
github.com/aws/smithy-go v1.13.5
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
github.com/eapache/go-resiliency v1.3.0
+github.com/go-logfmt/logfmt v0.6.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
@@ -29,7 +31,7 @@ require (
github.com/otiai10/copy v1.11.0
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.13.5
-github.com/prometheus/client_golang v1.14.0
+github.com/prometheus/client_golang v1.15.1
github.com/rs/zerolog v1.29.1
github.com/stretchr/testify v1.8.4
github.com/tencentyun/cos-go-sdk-v5 v0.7.41
@@ -75,7 +77,6 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect
-github.com/fatih/color v1.7.0 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
@@ -93,7 +94,7 @@
github.com/klauspost/pgzip v1.2.6 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
-github.com/mattn/go-ieproxy v0.0.9 // indirect
+github.com/mattn/go-ieproxy v0.0.11 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
88 changes: 53 additions & 35 deletions go.sum

Large diffs are not rendered by default.

100 changes: 84 additions & 16 deletions pkg/backup/backup_shard.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"hash/fnv"
"sort"
)

var (
@@ -15,6 +14,14 @@ var (

// errNoActiveReplicas is returned when a table has no current active replicas
errNoActiveReplicas = errors.New("no active replicas")

+    shardFuncRegistry = map[string]shardFunc{
+        "table":         fnvHashModTableShardFunc,
+        "database":      fnvHashModDatabaseShardFunc,
+        "first-replica": firstReplicaShardFunc,
+        "none":          noneShardFunc,
+        "":              noneShardFunc,
+    }
)

// shardDetermination is an object holding information on whether or not a table is within the
@@ -23,7 +30,7 @@ type shardDetermination map[string]bool

// inShard returns whether or not a given table is within a backup shard
func (d shardDetermination) inShard(database, table string) (bool, error) {
fullName := fmt.Sprintf("%s.%s", database, table)
fullName := fmt.Sprintf("`%s`.`%s`", database, table)
presentInShard, ok := d[fullName]
if !ok {
return false, fmt.Errorf("error determining backup shard state for %q: %w", fullName,
@@ -48,33 +55,94 @@ type tableReplicaMetadata struct {

// fullName returns the table name in the form of `database.table`
func (md *tableReplicaMetadata) fullName() string {
return fmt.Sprintf("%s.%s", md.Database, md.Table)
return fmt.Sprintf("`%s`.`%s`", md.Database, md.Table)
}

// querier is an interface that can query Clickhouse
type querier interface {
SelectContext(context.Context, interface{}, string, ...interface{}) error
}

-// shardFunc is a function that is used to determine whether or not a given database/table should be
-// handled by a given replica
+// shardFunc is a function that determines whether or not a given database/table should have its
+// data backed up by the replica calling this function
type shardFunc func(md *tableReplicaMetadata) (bool, error)

-// fnvHashModShardFunc performs a FNV hash of a table name in the form of `database.table` and then
-// performs a mod N operation (where N is the number of active replicas) in order to find an index
-// in an alphabetically sorted list of active replicas which corresponds to the replica that will
-// handle the backup of the table
-func fnvHashModShardFunc(md *tableReplicaMetadata) (bool, error) {
+func shardFuncByName(name string) (shardFunc, error) {
+    chosen, ok := shardFuncRegistry[name]
+    if !ok {
+        validOptions := make([]string, 0, len(shardFuncRegistry))
+        for k := range shardFuncRegistry {
+            if k == "" {
+                continue
+            }
+            validOptions = append(validOptions, k)
+        }
+        return nil, fmt.Errorf("unknown backup sharding option %q, valid options: %v", name,
+            validOptions)
+    }
+    return chosen, nil
+}
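A quick usage sketch of the registry lookup, trimmed to run standalone (the simplified `shardFunc` signature and the bogus mode string are illustrative only):

```go
package main

import "fmt"

type shardFunc func() bool // stand-in for the real signature

var shardFuncRegistry = map[string]shardFunc{
	"table":         func() bool { return true },
	"database":      func() bool { return true },
	"first-replica": func() bool { return true },
	"none":          func() bool { return true },
	"":              func() bool { return true },
}

func shardFuncByName(name string) (shardFunc, error) {
	chosen, ok := shardFuncRegistry[name]
	if !ok {
		validOptions := make([]string, 0, len(shardFuncRegistry)) // length 0, capacity 5
		for k := range shardFuncRegistry {
			if k == "" {
				continue // hide the empty alias from the error message
			}
			validOptions = append(validOptions, k)
		}
		return nil, fmt.Errorf("unknown backup sharding option %q, valid options: %v",
			name, validOptions)
	}
	return chosen, nil
}

func main() {
	if _, err := shardFuncByName("shard-by-vibes"); err != nil {
		fmt.Println(err) // lists table, database, first-replica, none (map order is nondeterministic)
	}
}
```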

+// fnvShardReplicaFromString returns a replica assignment from a slice of active replicas by taking
+// an arbitrary string, performing a FNV hash on it (mod NumActiveReplicas), and using the resulting
+// number as an index of the sorted slice of active replicas. It is assumed that the active replicas
+// slice is provided pre-sorted.
+func fnvShardReplicaFromString(str string, activeReplicas []string) (string, error) {
+    if len(activeReplicas) == 0 {
+        return "", fmt.Errorf("could not determine in-shard state for %s: %w", str,
+            errNoActiveReplicas)
+    }
+
+    h := fnv.New32a()
+    h.Write([]byte(str))
+    i := h.Sum32() % uint32(len(activeReplicas))
+    return activeReplicas[i], nil
+}

+// fnvHashModTableShardFunc determines whether a replica should handle backing up data based on the
+// table name in the form of `database.table`. It is assumed that the active replicas slice is
+// provided pre-sorted.
+func fnvHashModTableShardFunc(md *tableReplicaMetadata) (bool, error) {
+    assignedReplica, err := fnvShardReplicaFromString(md.fullName(), md.ActiveReplicas)
+    if err != nil {
+        return false, err
+    }
+    return assignedReplica == md.ReplicaName, nil
+}

+// fnvHashModDatabaseShardFunc determines whether a replica should handle backing up data based on
+// database name. It is assumed that the active replicas slice is provided pre-sorted.
+func fnvHashModDatabaseShardFunc(md *tableReplicaMetadata) (bool, error) {
+    assignedReplica, err := fnvShardReplicaFromString(md.Database, md.ActiveReplicas)
+    if err != nil {
+        return false, err
+    }
+    return assignedReplica == md.ReplicaName, nil
+}

+// firstReplicaShardFunc determines whether a replica should handle backing up data based on whether
+// or not it is the lexicographically first active replica. It is assumed that the active replicas
+// slice is provided pre-sorted.
+func firstReplicaShardFunc(md *tableReplicaMetadata) (bool, error) {
    if len(md.ActiveReplicas) == 0 {
        return false, fmt.Errorf("could not determine in-shard state for %s: %w", md.fullName(),
            errNoActiveReplicas)
    }
-    sort.Strings(md.ActiveReplicas)
+    return md.ReplicaName == md.ActiveReplicas[0], nil
}

-    h := fnv.New32a()
-    h.Write([]byte(md.fullName()))
-    i := h.Sum32() % uint32(len(md.ActiveReplicas))
-    return md.ActiveReplicas[i] == md.ReplicaName, nil
+// noneShardFunc always returns true
+func noneShardFunc(md *tableReplicaMetadata) (bool, error) {
+    return true, nil
}

+// doesShard returns whether a ShardedOperationMode configuration performs sharding or not
+func doesShard(mode string) bool {
+    _, ok := shardFuncRegistry[mode]
+    if !ok {
+        return false
+    }
+    return mode != "" && mode != "none"
+}
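The expected behavior, sketched as runnable checks (the helper re-creates doesShard against a literal copy of the registry's keys):

```go
package main

import "fmt"

func doesShard(mode string) bool {
	known := map[string]bool{
		"": true, "none": true, "table": true, "database": true, "first-replica": true,
	}
	if !known[mode] {
		return false // unknown modes never shard
	}
	return mode != "" && mode != "none"
}

func main() {
	fmt.Println(doesShard("table"))         // true
	fmt.Println(doesShard("first-replica")) // true
	fmt.Println(doesShard("none"))          // false
	fmt.Println(doesShard(""))              // false (defaults to none)
	fmt.Println(doesShard("bogus"))         // false
}
```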

// replicaDeterminer is a concrete struct that will query clickhouse to obtain a shard determination
@@ -97,7 +165,7 @@ func newReplicaDeterminer(q querier, sf shardFunc) *replicaDeterminer {
func (rd *replicaDeterminer) getReplicaState(ctx context.Context) ([]tableReplicaMetadata, error) {
md := []tableReplicaMetadata{}
// TODO: Change query to pull replica_is_active after upgrading to clickhouse-go v2
query := "select t.database, t.name as table, r.replica_name, mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active)) as active_replicas from system.tables t left join system.replicas r on t.database = r.database and t.name = r.table"
query := "SELECT t.database, t.name AS table, r.replica_name, arraySort(mapKeys(mapFilter((replica, active) -> (active == 1), r.replica_is_active))) AS active_replicas FROM system.tables t LEFT JOIN system.replicas r ON t.database = r.database AND t.name = r.table"
if err := rd.q.SelectContext(ctx, &md, query); err != nil {
return nil, fmt.Errorf("could not determine replication state: %w", err)
}
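Since querier is a one-method interface, getReplicaState can be exercised without a live ClickHouse. A sketch of a test stub, with the field mapping assumed from the query's column aliases (the real code may rely on struct tags):

```go
package main

import (
	"context"
	"fmt"
)

type tableReplicaMetadata struct {
	Database       string
	Table          string
	ReplicaName    string
	ActiveReplicas []string
}

// fakeQuerier satisfies the SelectContext signature and fills dest with one
// canned row, shaped like a result of the arraySort'ed query above.
type fakeQuerier struct{}

func (fakeQuerier) SelectContext(_ context.Context, dest interface{}, _ string, _ ...interface{}) error {
	rows, ok := dest.(*[]tableReplicaMetadata)
	if !ok {
		return fmt.Errorf("unexpected destination type %T", dest)
	}
	*rows = append(*rows, tableReplicaMetadata{
		Database:       "logs",
		Table:          "access",
		ReplicaName:    "replica-a",
		ActiveReplicas: []string{"replica-a", "replica-b"}, // already sorted, as arraySort guarantees
	})
	return nil
}

func main() {
	var md []tableReplicaMetadata
	if err := (fakeQuerier{}).SelectContext(context.Background(), &md, "SELECT ..."); err != nil {
		panic(err)
	}
	fmt.Println(md[0].Database, md[0].Table, md[0].ActiveReplicas)
}
```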
