
Commit

refactoring create command to avoid race conditions, resolve direct error compare and add error handlers, improve logging
Slach committed Jan 24, 2024
1 parent a037207 commit 42718ca
Showing 16 changed files with 85 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -102,7 +102,7 @@ ARG TARGETPLATFORM
MAINTAINER Eugene Klimov <[email protected]>

RUN apt-get update && apt-get install -y gpg xxd bsdmainutils parallel && wget -qO- https://kopia.io/signing-key | gpg --dearmor -o /usr/share/keyrings/kopia-keyring.gpg && \
echo "deb [signed-by=/usr/share/keyrings/kopia-keyring.gpg] http://packages.kopia.io/apt/ stable main" > /etc/apt/sources.list.d/kopia.list && \
echo "deb [signed-by=/usr/share/keyrings/kopia-keyring.gpg] https://packages.kopia.io/apt/ stable main" > /etc/apt/sources.list.d/kopia.list && \
wget -c "https://github.com/mikefarah/yq/releases/latest/download/yq_linux_$(dpkg --print-architecture)" -O /usr/bin/yq && chmod +x /usr/bin/yq && \
apt-get update -y && \
apt-get install -y ca-certificates tzdata bash curl restic rsync rclone jq gpg kopia && \
1 change: 0 additions & 1 deletion ReadMe.md
@@ -827,7 +827,6 @@ fi
- [How to convert MergeTree to ReplicatedMergeTree](Examples.md#how-to-convert-mergetree-to-replicatedmergetree)
- [How to store backups on NFS or another server](Examples.md#how-to-store-backups-on-nfs-backup-drive-or-another-server-via-sftp)
- [How to move data to another clickhouse server](Examples.md#how-to-move-data-to-another-clickhouse-server)
- [How to reduce the number of partitions](Examples.md#How-to-reduce-the-number-of-partitions)
- [How to monitor that backups created and uploaded correctly](Examples.md#how-to-monitor-that-backups-were-created-and-uploaded-correctly)
- [How to back up / restore a sharded cluster](Examples.md#how-to-back-up--restore-a-sharded-cluster)
- [How to back up a sharded cluster with Ansible](Examples.md#how-to-back-up-a-sharded-cluster-with-ansible)
1 change: 1 addition & 0 deletions go.mod
@@ -34,6 +34,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/pkg/sftp v1.13.6
github.com/prometheus/client_golang v1.17.0
github.com/puzpuzpuz/xsync v1.5.2
github.com/stretchr/testify v1.8.4
github.com/tencentyun/cos-go-sdk-v5 v0.7.45
github.com/urfave/cli v1.22.14
2 changes: 2 additions & 0 deletions go.sum
@@ -337,6 +337,8 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne
github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/puzpuzpuz/xsync v1.5.2 h1:yRAP4wqSOZG+/4pxJ08fPTwrfL0IzE/LKQ/cw509qGY=
github.com/puzpuzpuz/xsync v1.5.2/go.mod h1:K98BYhX3k1dQ2M63t1YNVDanbwUPmBCAhNmVrrxfiGg=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
4 changes: 3 additions & 1 deletion pkg/backup/backup_shard.go
@@ -94,7 +94,9 @@ func fnvShardReplicaFromString(str string, activeReplicas []string) (string, err
}

h := fnv.New32a()
h.Write([]byte(str))
if _, err := h.Write([]byte(str)); err != nil {
return "", fmt.Errorf("can't write %s to fnv.New32a")

}
i := h.Sum32() % uint32(len(activeReplicas))
return activeReplicas[i], nil
}
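
With the hash write error now checked, the replica-selection helper behaves roughly as in this standalone sketch; `pickReplica`, `main`, and the sample replica names are illustrative stand-ins for the project's `fnvShardReplicaFromString`, not its actual API:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// pickReplica deterministically maps a string (e.g. a table identifier) onto
// one of the active replicas using an FNV-1a hash modulo the replica count,
// mirroring the pattern in fnvShardReplicaFromString above.
func pickReplica(str string, activeReplicas []string) (string, error) {
	if len(activeReplicas) == 0 {
		return "", fmt.Errorf("no active replicas")
	}
	h := fnv.New32a()
	// hash.Hash.Write is documented never to return an error, but checking it
	// keeps static analysis (like the vet/errcheck run in CI) satisfied.
	if _, err := h.Write([]byte(str)); err != nil {
		return "", fmt.Errorf("can't write %s to fnv.New32a: %w", str, err)
	}
	i := h.Sum32() % uint32(len(activeReplicas))
	return activeReplicas[i], nil
}

func main() {
	replica, err := pickReplica("default.events", []string{"replica1", "replica2", "replica3"})
	if err != nil {
		panic(err)
	}
	fmt.Println("selected replica:", replica)
}
```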
28 changes: 12 additions & 16 deletions pkg/backup/create.go
@@ -202,14 +202,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
var addTableToBackupErr error
disksToPartsMap, realSize, addTableToBackupErr = b.AddTableToBackup(createCtx, backupName, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}])
if addTableToBackupErr != nil {
log.Error(addTableToBackupErr.Error())
if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil {
log.Error(removeBackupErr.Error())
}
// fix corner cases after https://github.com/Altinity/clickhouse-backup/issues/379
if cleanShadowErr := b.Clean(ctx); cleanShadowErr != nil {
log.Error(cleanShadowErr.Error())
}
log.Errorf("b.AddTableToBackup error: %v", addTableToBackupErr)
return addTableToBackupErr
}
// more precise data size calculation
@@ -224,10 +217,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
var inProgressMutationsErr error
inProgressMutations, inProgressMutationsErr = b.ch.GetInProgressMutations(createCtx, table.Database, table.Name)
if inProgressMutationsErr != nil {
log.Error(inProgressMutationsErr.Error())
if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil {
log.Error(removeBackupErr.Error())
}
log.Errorf("b.ch.GetInProgressMutations error: %v", inProgressMutationsErr)
return inProgressMutationsErr
}
}
@@ -244,9 +234,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
MetadataOnly: schemaOnly || table.BackupType == clickhouse.ShardBackupSchema,
}, disks)
if createTableMetadataErr != nil {
if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil {
log.Error(removeBackupErr.Error())
}
log.Errorf("b.createTableMetadata error: %v", createTableMetadataErr)
return createTableMetadataErr
}
atomic.AddUint64(&backupMetadataSize, metadataSize)
@@ -262,6 +250,14 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par
})
}
if wgWaitErr := createBackupWorkingGroup.Wait(); wgWaitErr != nil {
if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil {
log.Errorf("b.RemoveBackupLocal error: %v", removeBackupErr)
}
// fix corner cases after https://github.com/Altinity/clickhouse-backup/issues/379
if cleanShadowErr := b.Clean(ctx); cleanShadowErr != nil {
log.Errorf("b.Clean error: %v", cleanShadowErr)
}
return fmt.Errorf("one of createBackupLocal go-routine return error: %v", wgWaitErr)
}
backupRBACSize, backupConfigSize := uint64(0), uint64(0)
@@ -658,7 +654,7 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backup
defer cancel()
uploadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx)
uploadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.UploadConcurrency))
srcDiskConnection, exists := object_disk.DisksConnections[disk.Name]
srcDiskConnection, exists := object_disk.DisksConnections.Load(disk.Name)
if !exists {
return 0, fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name)
}
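
The lookup changes from map indexing to `.Load(...)`, which together with the new `github.com/puzpuzpuz/xsync` dependency in go.mod suggests `DisksConnections` became a concurrency-safe map, so the per-table goroutines can read it without racing. A rough sketch of that pattern, assuming the non-generic `xsync.Map`; the `connection` type and stored values are placeholders, not the real `object_disk` types:

```go
package main

import (
	"fmt"

	"github.com/puzpuzpuz/xsync"
)

// connection stands in for whatever object_disk keeps per disk.
type connection struct {
	endpoint string
}

// DisksConnections as an xsync.Map is safe for concurrent Load/Store,
// unlike a plain map indexed from several goroutines.
var DisksConnections = xsync.NewMap()

func main() {
	DisksConnections.Store("s3_disk", &connection{endpoint: "https://example-bucket.s3.amazonaws.com"})

	v, exists := DisksConnections.Load("s3_disk")
	if !exists {
		fmt.Println("s3_disk not present in DisksConnections")
		return
	}
	conn := v.(*connection) // values are interface{}, so assert back to the concrete type
	fmt.Println("using endpoint:", conn.endpoint)
}
```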
5 changes: 4 additions & 1 deletion pkg/backup/delete.go
@@ -369,7 +369,7 @@ func (b *Backuper) cleanRemoteBackupObjectDisks(ctx context.Context, backup stor
if err := b.dst.DownloadCompressedStream(ctx, fName, localPath); err != nil {
return err
}
filepath.Walk(localPath, func(fPath string, fInfo fs.FileInfo, err error) error {
walkErr := filepath.Walk(localPath, func(fPath string, fInfo fs.FileInfo, err error) error {
if err != nil {
return err
}
@@ -388,6 +388,9 @@ func (b *Backuper) cleanRemoteBackupObjectDisks(ctx context.Context, backup stor
}
return nil
})
if walkErr != nil {
b.log.Warnf("filepath.Walk(%s) return error: %v", localPath, walkErr)
}
if err := os.RemoveAll(localPath); err != nil {
return err
}
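
The walk result is now captured and logged instead of being silently discarded. A self-contained sketch of the same shape, with an illustrative path and standard-library logging rather than the project's logger:

```go
package main

import (
	"fmt"
	"io/fs"
	"log"
	"os"
	"path/filepath"
)

// cleanExtracted walks a freshly extracted directory, logs (rather than
// ignores) any walk error, then removes the directory, mirroring the flow
// of cleanRemoteBackupObjectDisks above.
func cleanExtracted(localPath string) error {
	walkErr := filepath.Walk(localPath, func(fPath string, fInfo fs.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !fInfo.IsDir() {
			fmt.Println("found", fPath, fInfo.Size(), "bytes")
		}
		return nil
	})
	if walkErr != nil {
		// not fatal here, but it should not be dropped on the floor
		log.Printf("filepath.Walk(%s) return error: %v", localPath, walkErr)
	}
	return os.RemoveAll(localPath)
}

func main() {
	if err := cleanExtracted(filepath.Join(os.TempDir(), "example-extracted")); err != nil {
		log.Fatal(err)
	}
}
```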
4 changes: 3 additions & 1 deletion pkg/backup/restore_remote.go
@@ -1,9 +1,11 @@
package backup

import "errors"

func (b *Backuper) RestoreFromRemote(backupName, tablePattern string, databaseMapping, partitions []string, schemaOnly, dataOnly, dropTable, ignoreDependencies, restoreRBAC, rbacOnly, restoreConfigs, configsOnly, resume bool, commandId int) error {
if err := b.Download(backupName, tablePattern, partitions, schemaOnly, resume, commandId); err != nil {
// https://github.com/Altinity/clickhouse-backup/issues/625
if err != ErrBackupIsAlreadyExists {
if !errors.Is(err, ErrBackupIsAlreadyExists) {
return err
}
}
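
`errors.Is` keeps matching the sentinel even when it arrives wrapped in another error, which the old direct `!=` comparison would miss. A toy illustration of the difference; only the sentinel's name is taken from the diff, the rest is made up:

```go
package main

import (
	"errors"
	"fmt"
)

// ErrBackupIsAlreadyExists plays the role of the project's sentinel error.
var ErrBackupIsAlreadyExists = errors.New("backup already exists")

// download pretends the backup is already present locally and wraps the sentinel.
func download(backupName string) error {
	return fmt.Errorf("download %s: %w", backupName, ErrBackupIsAlreadyExists)
}

func main() {
	err := download("2024-01-24")
	// `err != ErrBackupIsAlreadyExists` is true here because of the wrapping;
	// errors.Is unwraps the chain and still recognizes the sentinel.
	if err != nil && !errors.Is(err, ErrBackupIsAlreadyExists) {
		fmt.Println("fatal:", err)
		return
	}
	fmt.Println("backup already downloaded, continuing with restore")
}
```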
2 changes: 1 addition & 1 deletion pkg/config/config_linux.go
@@ -17,7 +17,7 @@ func (cfg *Config) SetPriority() error {
}
}
if err = gionice.SetNicePri(0, gionice.PRIO_PROCESS, cfg.General.CPUNicePriority); err != nil {
log.Warnf("can't set CPU priority %s, error: %v", cfg.General.CPUNicePriority, err)
log.Warnf("can't set CPU priority %v, error: %v", cfg.General.CPUNicePriority, err)
}
return nil
}
6 changes: 5 additions & 1 deletion pkg/server/callback_test.go
@@ -29,7 +29,11 @@ func TestParseCallback(t *testing.T) {

passToChanHandler := func(ch chan *payload) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
defer func() {
if err := r.Body.Close(); err != nil {
t.Fatalf("can't close r.Body: %v", err)
}
}()

var data payload
if err := json.NewDecoder(r.Body).Decode(&data); err != nil {
9 changes: 6 additions & 3 deletions pkg/storage/azblob.go
@@ -126,7 +126,8 @@ func (a *AzureBlob) Connect(ctx context.Context) error {
}
testBlob := a.Container.NewBlockBlobURL(base64.URLEncoding.EncodeToString(testName))
if _, err = testBlob.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}); err != nil {
if se, ok := err.(azblob.StorageError); !ok || se.ServiceCode() != azblob.ServiceCodeBlobNotFound {
var se azblob.StorageError
if !errors.As(err, &se) || se.ServiceCode() != azblob.ServiceCodeBlobNotFound {
return errors.Wrapf(err, "azblob: failed to access container %s", a.Config.Container)
}
}
@@ -189,7 +190,8 @@ func (a *AzureBlob) StatFile(ctx context.Context, key string) (RemoteFile, error
blob := a.Container.NewBlockBlobURL(path.Join(a.Config.Path, key))
r, err := blob.GetProperties(ctx, azblob.BlobAccessConditions{}, a.CPK)
if err != nil {
if se, ok := err.(azblob.StorageError); !ok || se.ServiceCode() != azblob.ServiceCodeBlobNotFound {
var se azblob.StorageError
if !errors.As(err, &se) || se.ServiceCode() != azblob.ServiceCodeBlobNotFound {
return nil, err
}
return nil, ErrNotFound
@@ -332,7 +334,8 @@ func (f *azureBlobFile) LastModified() time.Time {

func isContainerAlreadyExists(err error) bool {
if err != nil {
if storageErr, ok := err.(azblob.StorageError); ok { // This error is a Service-specific
var storageErr azblob.StorageError
if errors.As(err, &storageErr) { // This error is a Service-specific
switch storageErr.ServiceCode() { // Compare serviceCode to ServiceCodeXxx constants
case azblob.ServiceCodeContainerAlreadyExists:
return true
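
The same idea applies to typed errors: `errors.As` replaces the direct type assertion on `azblob.StorageError`, so wrapped errors are still recognized. A toy illustration with a stand-in error type; the real azblob types and service codes differ:

```go
package main

import (
	"errors"
	"fmt"
)

// storageError stands in for a service-specific error such as azblob.StorageError.
type storageError struct {
	serviceCode string
}

func (e *storageError) Error() string { return "storage error: " + e.serviceCode }

// statBlob returns the service error wrapped in extra context.
func statBlob() error {
	return fmt.Errorf("get properties: %w", &storageError{serviceCode: "BlobNotFound"})
}

func main() {
	err := statBlob()
	// A direct assertion err.(*storageError) fails once the error is wrapped;
	// errors.As walks the chain and fills in the target when it matches.
	var se *storageError
	if !errors.As(err, &se) || se.serviceCode != "BlobNotFound" {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println("blob not found, treating it as ErrNotFound")
}
```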
4 changes: 2 additions & 2 deletions pkg/storage/azblob/chunkwriting.go
@@ -145,11 +145,11 @@ func (c *copier) sendChunk() error {
id: c.id.next(),
}
return nil
case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0:
case err != nil && (err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF)) && n == 0:
return io.EOF
}

if err == io.EOF || err == io.ErrUnexpectedEOF {
if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
c.ch <- copierChunk{
buffer: buffer[0:n],
id: c.id.next(),
4 changes: 3 additions & 1 deletion pkg/storage/cos.go
@@ -2,6 +2,7 @@ package storage

import (
"context"
"errors"
"fmt"
"github.com/Altinity/clickhouse-backup/pkg/config"
"io"
@@ -62,7 +63,8 @@ func (c *COS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
// file max size is 5Gb
resp, err := c.client.Object.Get(ctx, path.Join(c.Config.Path, key), nil)
if err != nil {
cosErr, ok := err.(*cos.ErrorResponse)
var cosErr *cos.ErrorResponse
ok := errors.As(err, &cosErr)
if ok && cosErr.Code == "NoSuchKey" {
return nil, ErrNotFound
}
9 changes: 5 additions & 4 deletions pkg/storage/gcs.go
@@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"encoding/base64"
"errors"
"fmt"
"io"
"net"
@@ -196,8 +197,8 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces
})
for {
object, err := it.Next()
switch err {
case nil:
switch {
case err == nil:
if object.Prefix != "" {
if err := process(ctx, &gcsFile{
name: strings.TrimPrefix(object.Prefix, rootPath),
@@ -213,7 +214,7 @@ }); err != nil {
}); err != nil {
return err
}
case iterator.Done:
case errors.Is(err, iterator.Done):
return nil
default:
return err
@@ -282,7 +283,7 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error
func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx)
if err != nil {
if err == storage.ErrObjectNotExist {
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, ErrNotFound
}
return nil, err
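
Rewriting `switch err { case iterator.Done: ... }` as boolean cases with `errors.Is` keeps the Walk loop correct even if the sentinel ever arrives wrapped. A sketch of that loop shape with a toy iterator standing in for the GCS object iterator; `Done` mirrors `google.golang.org/api/iterator.Done`:

```go
package main

import (
	"errors"
	"fmt"
)

// Done is a sentinel returned by Next when iteration is finished.
var Done = errors.New("no more items in iterator")

type objectIterator struct {
	names []string
	pos   int
}

func (it *objectIterator) Next() (string, error) {
	if it.pos >= len(it.names) {
		return "", Done
	}
	name := it.names[it.pos]
	it.pos++
	return name, nil
}

func walk(it *objectIterator) error {
	for {
		name, err := it.Next()
		// `switch err { case Done: ... }` only matches the bare sentinel;
		// boolean cases with errors.Is keep working if it is ever wrapped.
		switch {
		case err == nil:
			fmt.Println("object:", name)
		case errors.Is(err, Done):
			return nil
		default:
			return err
		}
	}
}

func main() {
	if err := walk(&objectIterator{names: []string{"backup/metadata.json", "backup/shadow/part_1"}}); err != nil {
		panic(err)
	}
}
```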