From 42718ca8a54982360814625072e16abaab4ddd0c Mon Sep 17 00:00:00 2001 From: Slach Date: Wed, 24 Jan 2024 15:10:07 +0400 Subject: [PATCH] refactoring create command to avoid race conditions, resolve direct error compare and add error handlers, improve logging --- Dockerfile | 2 +- ReadMe.md | 1 - go.mod | 1 + go.sum | 2 + pkg/backup/backup_shard.go | 4 +- pkg/backup/create.go | 28 +++---- pkg/backup/delete.go | 5 +- pkg/backup/restore_remote.go | 4 +- pkg/config/config_linux.go | 2 +- pkg/server/callback_test.go | 6 +- pkg/storage/azblob.go | 9 ++- pkg/storage/azblob/chunkwriting.go | 4 +- pkg/storage/cos.go | 4 +- pkg/storage/gcs.go | 9 ++- pkg/storage/object_disk/object_disk.go | 80 +++++++++---------- .../docker-compose/kafka-service.yml | 1 + 16 files changed, 85 insertions(+), 77 deletions(-) diff --git a/Dockerfile b/Dockerfile index 1c3cd888..6b4b471c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -102,7 +102,7 @@ ARG TARGETPLATFORM MAINTAINER Eugene Klimov RUN apt-get update && apt-get install -y gpg xxd bsdmainutils parallel && wget -qO- https://kopia.io/signing-key | gpg --dearmor -o /usr/share/keyrings/kopia-keyring.gpg && \ - echo "deb [signed-by=/usr/share/keyrings/kopia-keyring.gpg] http://packages.kopia.io/apt/ stable main" > /etc/apt/sources.list.d/kopia.list && \ + echo "deb [signed-by=/usr/share/keyrings/kopia-keyring.gpg] https://packages.kopia.io/apt/ stable main" > /etc/apt/sources.list.d/kopia.list && \ wget -c "https://github.com/mikefarah/yq/releases/latest/download/yq_linux_$(dpkg --print-architecture)" -O /usr/bin/yq && chmod +x /usr/bin/yq && \ apt-get update -y && \ apt-get install -y ca-certificates tzdata bash curl restic rsync rclone jq gpg kopia && \ diff --git a/ReadMe.md b/ReadMe.md index 404a6dce..d0e8a70a 100644 --- a/ReadMe.md +++ b/ReadMe.md @@ -827,7 +827,6 @@ fi - [How to convert MergeTree to ReplicatedMergeTree](Examples.md#how-to-convert-mergetree-to-replicatedmergetree) - [How to store backups on NFS or another server](Examples.md#how-to-store-backups-on-nfs-backup-drive-or-another-server-via-sftp) - [How to move data to another clickhouse server](Examples.md#how-to-move-data-to-another-clickhouse-server) -- [How to reduce the number of partitions](Examples.md#How-to-reduce-the-number-of-partitions) - [How to monitor that backups created and uploaded correctly](Examples.md#how-to-monitor-that-backups-were-created-and-uploaded-correctly) - [How to back up / restore a sharded cluster](Examples.md#how-to-back-up--restore-a-sharded-cluster) - [How to back up a sharded cluster with Ansible](Examples.md#how-to-back-up-a-sharded-cluster-with-ansible) diff --git a/go.mod b/go.mod index 7682e05a..a21e6b9a 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/pkg/sftp v1.13.6 github.com/prometheus/client_golang v1.17.0 + github.com/puzpuzpuz/xsync v1.5.2 github.com/stretchr/testify v1.8.4 github.com/tencentyun/cos-go-sdk-v5 v0.7.45 github.com/urfave/cli v1.22.14 diff --git a/go.sum b/go.sum index d29119c3..8b6b4ff9 100644 --- a/go.sum +++ b/go.sum @@ -337,6 +337,8 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= +github.com/puzpuzpuz/xsync v1.5.2 h1:yRAP4wqSOZG+/4pxJ08fPTwrfL0IzE/LKQ/cw509qGY= 
+github.com/puzpuzpuz/xsync v1.5.2/go.mod h1:K98BYhX3k1dQ2M63t1YNVDanbwUPmBCAhNmVrrxfiGg= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= diff --git a/pkg/backup/backup_shard.go b/pkg/backup/backup_shard.go index 6c6f1ca8..dc90c628 100644 --- a/pkg/backup/backup_shard.go +++ b/pkg/backup/backup_shard.go @@ -94,7 +94,9 @@ func fnvShardReplicaFromString(str string, activeReplicas []string) (string, err } h := fnv.New32a() - h.Write([]byte(str)) + if _, err := h.Write([]byte(str)); err != nil { + return "", fmt.Errorf("can't write %s to fnv.New32a") + } i := h.Sum32() % uint32(len(activeReplicas)) return activeReplicas[i], nil } diff --git a/pkg/backup/create.go b/pkg/backup/create.go index fe71fa89..0d7ef554 100644 --- a/pkg/backup/create.go +++ b/pkg/backup/create.go @@ -202,14 +202,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par var addTableToBackupErr error disksToPartsMap, realSize, addTableToBackupErr = b.AddTableToBackup(createCtx, backupName, shadowBackupUUID, disks, &table, partitionsIdMap[metadata.TableTitle{Database: table.Database, Table: table.Name}]) if addTableToBackupErr != nil { - log.Error(addTableToBackupErr.Error()) - if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil { - log.Error(removeBackupErr.Error()) - } - // fix corner cases after https://github.com/Altinity/clickhouse-backup/issues/379 - if cleanShadowErr := b.Clean(ctx); cleanShadowErr != nil { - log.Error(cleanShadowErr.Error()) - } + log.Errorf("b.AddTableToBackup error: %v", addTableToBackupErr) return addTableToBackupErr } // more precise data size calculation @@ -224,10 +217,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par var inProgressMutationsErr error inProgressMutations, inProgressMutationsErr = b.ch.GetInProgressMutations(createCtx, table.Database, table.Name) if inProgressMutationsErr != nil { - log.Error(inProgressMutationsErr.Error()) - if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil { - log.Error(removeBackupErr.Error()) - } + log.Errorf("b.ch.GetInProgressMutations error: %v", inProgressMutationsErr) return inProgressMutationsErr } } @@ -244,9 +234,7 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par MetadataOnly: schemaOnly || table.BackupType == clickhouse.ShardBackupSchema, }, disks) if createTableMetadataErr != nil { - if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil { - log.Error(removeBackupErr.Error()) - } + log.Errorf("b.createTableMetadata error: %v", createTableMetadataErr) return createTableMetadataErr } atomic.AddUint64(&backupMetadataSize, metadataSize) @@ -262,6 +250,14 @@ func (b *Backuper) createBackupLocal(ctx context.Context, backupName string, par }) } if wgWaitErr := createBackupWorkingGroup.Wait(); wgWaitErr != nil { + if removeBackupErr := b.RemoveBackupLocal(createCtx, backupName, disks); removeBackupErr != nil { + log.Errorf("b.RemoveBackupLocal error: %v", removeBackupErr) + } + // fix corner cases after https://github.com/Altinity/clickhouse-backup/issues/379 + if cleanShadowErr := b.Clean(ctx); cleanShadowErr != nil { + log.Errorf("b.Clean error: %v", cleanShadowErr) + log.Error(cleanShadowErr.Error()) + } return fmt.Errorf("one of 
createBackupLocal go-routine return error: %v", wgWaitErr) } backupRBACSize, backupConfigSize := uint64(0), uint64(0) @@ -658,7 +654,7 @@ func (b *Backuper) uploadObjectDiskParts(ctx context.Context, backupName, backup defer cancel() uploadObjectDiskPartsWorkingGroup, ctx := errgroup.WithContext(ctx) uploadObjectDiskPartsWorkingGroup.SetLimit(int(b.cfg.General.UploadConcurrency)) - srcDiskConnection, exists := object_disk.DisksConnections[disk.Name] + srcDiskConnection, exists := object_disk.DisksConnections.Load(disk.Name) if !exists { return 0, fmt.Errorf("uploadObjectDiskParts: %s not present in object_disk.DisksConnections", disk.Name) } diff --git a/pkg/backup/delete.go b/pkg/backup/delete.go index d0e5c73a..f5f7d638 100644 --- a/pkg/backup/delete.go +++ b/pkg/backup/delete.go @@ -369,7 +369,7 @@ func (b *Backuper) cleanRemoteBackupObjectDisks(ctx context.Context, backup stor if err := b.dst.DownloadCompressedStream(ctx, fName, localPath); err != nil { return err } - filepath.Walk(localPath, func(fPath string, fInfo fs.FileInfo, err error) error { + walkErr := filepath.Walk(localPath, func(fPath string, fInfo fs.FileInfo, err error) error { if err != nil { return err } @@ -388,6 +388,9 @@ func (b *Backuper) cleanRemoteBackupObjectDisks(ctx context.Context, backup stor } return nil }) + if walkErr != nil { + b.log.Warnf("filepath.Walk(%s) return error: %v", localPath, walkErr) + } if err := os.RemoveAll(localPath); err != nil { return err } diff --git a/pkg/backup/restore_remote.go b/pkg/backup/restore_remote.go index b7781094..b2eabf8d 100644 --- a/pkg/backup/restore_remote.go +++ b/pkg/backup/restore_remote.go @@ -1,9 +1,11 @@ package backup +import "errors" + func (b *Backuper) RestoreFromRemote(backupName, tablePattern string, databaseMapping, partitions []string, schemaOnly, dataOnly, dropTable, ignoreDependencies, restoreRBAC, rbacOnly, restoreConfigs, configsOnly, resume bool, commandId int) error { if err := b.Download(backupName, tablePattern, partitions, schemaOnly, resume, commandId); err != nil { // https://github.com/Altinity/clickhouse-backup/issues/625 - if err != ErrBackupIsAlreadyExists { + if !errors.Is(err, ErrBackupIsAlreadyExists) { return err } } diff --git a/pkg/config/config_linux.go b/pkg/config/config_linux.go index d485b0a7..ef770dc8 100644 --- a/pkg/config/config_linux.go +++ b/pkg/config/config_linux.go @@ -17,7 +17,7 @@ func (cfg *Config) SetPriority() error { } } if err = gionice.SetNicePri(0, gionice.PRIO_PROCESS, cfg.General.CPUNicePriority); err != nil { - log.Warnf("can't set CPU priority %s, error: %v", cfg.General.CPUNicePriority, err) + log.Warnf("can't set CPU priority %v, error: %v", cfg.General.CPUNicePriority, err) } return nil } diff --git a/pkg/server/callback_test.go b/pkg/server/callback_test.go index 4ca06886..b630dd4e 100644 --- a/pkg/server/callback_test.go +++ b/pkg/server/callback_test.go @@ -29,7 +29,11 @@ func TestParseCallback(t *testing.T) { passToChanHandler := func(ch chan *payload) http.HandlerFunc { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() + defer func() { + if err := r.Body.Close(); err != nil { + t.Fatalf("can't close r.Body: %v", err) + } + }() var data payload if err := json.NewDecoder(r.Body).Decode(&data); err != nil { diff --git a/pkg/storage/azblob.go b/pkg/storage/azblob.go index b0ed946b..34ce805e 100644 --- a/pkg/storage/azblob.go +++ b/pkg/storage/azblob.go @@ -126,7 +126,8 @@ func (a *AzureBlob) Connect(ctx context.Context) error { } testBlob := 
a.Container.NewBlockBlobURL(base64.URLEncoding.EncodeToString(testName)) if _, err = testBlob.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}); err != nil { - if se, ok := err.(azblob.StorageError); !ok || se.ServiceCode() != azblob.ServiceCodeBlobNotFound { + var se azblob.StorageError + if !errors.As(err, &se) || se.ServiceCode() != azblob.ServiceCodeBlobNotFound { return errors.Wrapf(err, "azblob: failed to access container %s", a.Config.Container) } } @@ -189,7 +190,8 @@ func (a *AzureBlob) StatFile(ctx context.Context, key string) (RemoteFile, error blob := a.Container.NewBlockBlobURL(path.Join(a.Config.Path, key)) r, err := blob.GetProperties(ctx, azblob.BlobAccessConditions{}, a.CPK) if err != nil { - if se, ok := err.(azblob.StorageError); !ok || se.ServiceCode() != azblob.ServiceCodeBlobNotFound { + var se azblob.StorageError + if !errors.As(err, &se) || se.ServiceCode() != azblob.ServiceCodeBlobNotFound { return nil, err } return nil, ErrNotFound @@ -332,7 +334,8 @@ func (f *azureBlobFile) LastModified() time.Time { func isContainerAlreadyExists(err error) bool { if err != nil { - if storageErr, ok := err.(azblob.StorageError); ok { // This error is a Service-specific + var storageErr azblob.StorageError + if errors.As(err, &storageErr) { // This error is a Service-specific switch storageErr.ServiceCode() { // Compare serviceCode to ServiceCodeXxx constants case azblob.ServiceCodeContainerAlreadyExists: return true diff --git a/pkg/storage/azblob/chunkwriting.go b/pkg/storage/azblob/chunkwriting.go index efbb8ce6..460b093a 100644 --- a/pkg/storage/azblob/chunkwriting.go +++ b/pkg/storage/azblob/chunkwriting.go @@ -145,11 +145,11 @@ func (c *copier) sendChunk() error { id: c.id.next(), } return nil - case err != nil && (err == io.EOF || err == io.ErrUnexpectedEOF) && n == 0: + case err != nil && (err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF)) && n == 0: return io.EOF } - if err == io.EOF || err == io.ErrUnexpectedEOF { + if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) { c.ch <- copierChunk{ buffer: buffer[0:n], id: c.id.next(), diff --git a/pkg/storage/cos.go b/pkg/storage/cos.go index 801e62f8..7f266638 100644 --- a/pkg/storage/cos.go +++ b/pkg/storage/cos.go @@ -2,6 +2,7 @@ package storage import ( "context" + "errors" "fmt" "github.com/Altinity/clickhouse-backup/pkg/config" "io" @@ -62,7 +63,8 @@ func (c *COS) StatFile(ctx context.Context, key string) (RemoteFile, error) { // file max size is 5Gb resp, err := c.client.Object.Get(ctx, path.Join(c.Config.Path, key), nil) if err != nil { - cosErr, ok := err.(*cos.ErrorResponse) + var cosErr *cos.ErrorResponse + ok := errors.As(err, &cosErr) if ok && cosErr.Code == "NoSuchKey" { return nil, ErrNotFound } diff --git a/pkg/storage/gcs.go b/pkg/storage/gcs.go index 50e1f507..e3fbdd63 100644 --- a/pkg/storage/gcs.go +++ b/pkg/storage/gcs.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "encoding/base64" + "errors" "fmt" "io" "net" @@ -196,8 +197,8 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces }) for { object, err := it.Next() - switch err { - case nil: + switch { + case err == nil: if object.Prefix != "" { if err := process(ctx, &gcsFile{ name: strings.TrimPrefix(object.Prefix, rootPath), @@ -213,7 +214,7 @@ func (gcs *GCS) Walk(ctx context.Context, gcsPath string, recursive bool, proces }); err != nil { return err } - case iterator.Done: + case errors.Is(err, iterator.Done): return nil default: return err @@ -282,7 +283,7 @@ func (gcs *GCS) 
PutFile(ctx context.Context, key string, r io.ReadCloser) error func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) { objAttr, err := gcs.client.Bucket(gcs.Config.Bucket).Object(path.Join(gcs.Config.Path, key)).Attrs(ctx) if err != nil { - if err == storage.ErrObjectNotExist { + if errors.Is(err, storage.ErrObjectNotExist) { return nil, ErrNotFound } return nil, err diff --git a/pkg/storage/object_disk/object_disk.go b/pkg/storage/object_disk/object_disk.go index 12302f6e..502705aa 100644 --- a/pkg/storage/object_disk/object_disk.go +++ b/pkg/storage/object_disk/object_disk.go @@ -11,7 +11,6 @@ import ( "path" "strconv" "strings" - "sync" "time" "github.com/Altinity/clickhouse-backup/pkg/clickhouse" @@ -19,6 +18,7 @@ import ( "github.com/Altinity/clickhouse-backup/pkg/storage" "github.com/antchfx/xmlquery" apexLog "github.com/apex/log" + "github.com/puzpuzpuz/xsync" ) type MetadataVersion uint32 @@ -190,7 +190,7 @@ type ObjectStorageCredentials struct { AzureContainerName string } -var DisksCredentials map[string]ObjectStorageCredentials +var DisksCredentials = xsync.NewMapOf[ObjectStorageCredentials]() type ObjectStorageConnection struct { Type string @@ -232,32 +232,24 @@ func (c *ObjectStorageConnection) GetRemotePath() string { return "" } -var DisksConnections map[string]ObjectStorageConnection -var DisksConnectionsMutex sync.Mutex +var DisksConnections = xsync.NewMapOf[*ObjectStorageConnection]() -var SystemDisks map[string]clickhouse.Disk -var SystemDisksMutex sync.Mutex +var SystemDisks = xsync.NewMapOf[clickhouse.Disk]() func InitCredentialsAndConnections(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName string) error { var err error - DisksConnectionsMutex.Lock() - if _, exists := DisksCredentials[diskName]; !exists { - DisksCredentials, err = getObjectDisksCredentials(ctx, ch) - if err != nil { + if _, exists := DisksCredentials.Load(diskName); !exists { + if err = getObjectDisksCredentials(ctx, ch); err != nil { return err } } - if _, exists := DisksConnections[diskName]; !exists { - if DisksConnections == nil { - DisksConnections = make(map[string]ObjectStorageConnection) - } + if _, exists := DisksConnections.Load(diskName); !exists { connection, err := makeObjectDiskConnection(ctx, ch, cfg, diskName) if err != nil { return err } - DisksConnections[diskName] = *connection + DisksConnections.Store(diskName, connection) } - DisksConnectionsMutex.Unlock() return nil } @@ -297,18 +289,17 @@ func WriteMetadataToFile(metadata *Metadata, path string) error { return metadata.writeToFile(metadataFile) } -func getObjectDisksCredentials(ctx context.Context, ch *clickhouse.ClickHouse) (map[string]ObjectStorageCredentials, error) { - credentials := make(map[string]ObjectStorageCredentials) +func getObjectDisksCredentials(ctx context.Context, ch *clickhouse.ClickHouse) error { var version int var err error if version, err = ch.GetVersion(ctx); err != nil { - return nil, err + return err } else if version <= 20006000 { - return credentials, nil + return nil } configFile, doc, err := ch.ParseXML(ctx, "config.xml") if err != nil { - return nil, err + return err } root := xmlquery.FindOne(doc, "/") disks := xmlquery.Find(doc, fmt.Sprintf("/%s/storage_configuration/disks/*", root.Data)) @@ -331,11 +322,11 @@ func getObjectDisksCredentials(ctx context.Context, ch *clickhouse.ClickHouse) ( // macros works only after 23.3+ https://github.com/Altinity/clickhouse-backup/issues/750 if version > 23003000 { if creds.EndPoint, err = 
ch.ApplyMacros(ctx, creds.EndPoint); err != nil { - return nil, fmt.Errorf("%s -> /%s/storage_configuration/disks/%s apply macros to error: %v", configFile, root.Data, diskName, err) + return fmt.Errorf("%s -> /%s/storage_configuration/disks/%s apply macros to error: %v", configFile, root.Data, diskName, err) } } } else { - return nil, fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) + return fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) } if regionNode := d.SelectElement("region"); regionNode != nil { creds.S3Region = strings.Trim(regionNode.InnerText(), "\r\n \t") @@ -358,7 +349,7 @@ func getObjectDisksCredentials(ctx context.Context, ch *clickhouse.ClickHouse) ( creds.S3SecretKey = os.Getenv("AWS_SECRET_ACCESS_KEY") } } - credentials[diskName] = creds + DisksCredentials.Store(diskName, creds) break case "azure_blob_storage": creds := ObjectStorageCredentials{ @@ -366,25 +357,25 @@ func getObjectDisksCredentials(ctx context.Context, ch *clickhouse.ClickHouse) ( } accountUrlNode := d.SelectElement("storage_account_url") if accountUrlNode == nil { - return nil, fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) + return fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) } creds.EndPoint = strings.Trim(accountUrlNode.InnerText(), "\r\n \t") containerNameNode := d.SelectElement("container_name") if containerNameNode == nil { - return nil, fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) + return fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) } creds.AzureContainerName = strings.Trim(containerNameNode.InnerText(), "\r\n \t") accountNameNode := d.SelectElement("account_name") if containerNameNode == nil { - return nil, fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) + return fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) } creds.AzureAccountName = strings.Trim(accountNameNode.InnerText(), "\r\n \t") accountKeyNode := d.SelectElement("account_key") if containerNameNode == nil { - return nil, fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) + return fmt.Errorf("%s -> /%s/storage_configuration/disks/%s doesn't contains ", configFile, root.Data, diskName) } creds.AzureAccountKey = strings.Trim(accountKeyNode.InnerText(), "\r\n \t") - credentials[diskName] = creds + DisksCredentials.Store(diskName, creds) break } } @@ -395,38 +386,39 @@ func getObjectDisksCredentials(ctx context.Context, ch *clickhouse.ClickHouse) ( diskType := diskTypeNode.InnerText() switch diskType { case "encrypted", "cache": - _, exists := credentials[diskName] + _, exists := DisksCredentials.Load(diskName) if !exists { if diskNode := d.SelectElement("disk"); diskNode != nil { childDiskName := diskNode.InnerText() - credentials[diskName] = credentials[childDiskName] + if childCreds, childExists := DisksCredentials.Load(childDiskName); childExists { + DisksCredentials.Store(diskName, childCreds) + } else { + apexLog.Warnf("disk %s with type %s, reference to childDisk %s which not contains DiskCredentials", diskName, diskType, childDiskName) + } } } } } } - return credentials, nil + return nil } func 
makeObjectDiskConnection(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Config, diskName string) (*ObjectStorageConnection, error) { - creds, exists := DisksCredentials[diskName] + creds, exists := DisksCredentials.Load(diskName) if !exists { return nil, fmt.Errorf("%s is not present in object_disk.DisksCredentials", diskName) } connection := ObjectStorageConnection{} - SystemDisksMutex.Lock() - if SystemDisks == nil || len(SystemDisks) == 0 { + if SystemDisks.Size() == 0 { disks, err := ch.GetDisks(ctx, false) if err != nil { return nil, err } - SystemDisks = make(map[string]clickhouse.Disk, len(disks)) for _, d := range disks { - SystemDisks[d.Name] = d + SystemDisks.Store(d.Name, d) } } - SystemDisksMutex.Unlock() - disk, exists := SystemDisks[diskName] + disk, exists := SystemDisks.Load(diskName) if !exists { return nil, fmt.Errorf("%s is not presnet in object_disk.SystemDisks", diskName) } @@ -533,7 +525,7 @@ func makeObjectDiskConnection(ctx context.Context, ch *clickhouse.ClickHouse, cf } func ConvertLocalPathToRemote(diskName, localPath string) (string, error) { - connection, exists := DisksConnections[diskName] + connection, exists := DisksConnections.Load(diskName) if !exists { return "", fmt.Errorf("%s is not present in object_disk.DisksConnections", diskName) } @@ -548,7 +540,7 @@ func ConvertLocalPathToRemote(diskName, localPath string) (string, error) { } func GetFileReader(ctx context.Context, diskName, remotePath string) (io.ReadCloser, error) { - connection, exists := DisksConnections[diskName] + connection, exists := DisksConnections.Load(diskName) if !exists { return nil, fmt.Errorf("%s not exits in object_disk.DisksConnections", diskName) } @@ -579,7 +571,7 @@ func ReadFileContent(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config } func PutFile(ctx context.Context, diskName, remotePath string, content []byte) error { - connection, exists := DisksConnections[diskName] + connection, exists := DisksConnections.Load(diskName) if !exists { return fmt.Errorf("%s not exits in object_disk.DisksConnections", diskName) } @@ -602,7 +594,7 @@ func WriteFileContent(ctx context.Context, ch *clickhouse.ClickHouse, cfg *confi } func DeleteFile(ctx context.Context, diskName, remotePath string) error { - connection, exists := DisksConnections[diskName] + connection, exists := DisksConnections.Load(diskName) if !exists { return fmt.Errorf("%s not exits in object_disk.DisksConnections", diskName) } @@ -644,7 +636,7 @@ func CopyObject(ctx context.Context, ch *clickhouse.ClickHouse, cfg *config.Conf if err := InitCredentialsAndConnections(ctx, ch, cfg, diskName); err != nil { return 0, err } - connection := DisksConnections[diskName] + connection, _ := DisksConnections.Load(diskName) remoteStorage := connection.GetRemoteStorage() return remoteStorage.CopyObject(ctx, srcSize, srcBucket, srcKey, dstPath) } diff --git a/test/testflows/clickhouse_backup/docker-compose/kafka-service.yml b/test/testflows/clickhouse_backup/docker-compose/kafka-service.yml index 1c23881a..506e1297 100644 --- a/test/testflows/clickhouse_backup/docker-compose/kafka-service.yml +++ b/test/testflows/clickhouse_backup/docker-compose/kafka-service.yml @@ -11,6 +11,7 @@ services: - ZOOKEEPER=zookeeper:2181 - KAFKA_LOG4J_LOGGERS=kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO healthcheck: + test: nc -z localhost 9092 || exit 1 interval: 3s timeout: 2s retries: 5
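
The patch's "resolve direct error compare" changes swap `err == Sentinel` comparisons and `err.(azblob.StorageError)` type assertions for `errors.Is` and `errors.As`, which also match errors that have been wrapped with `%w`. A minimal sketch of both patterns, using a hypothetical sentinel and a hypothetical typed error instead of the project's `ErrBackupIsAlreadyExists` and `azblob.StorageError`:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for ErrBackupIsAlreadyExists and azblob.StorageError.
var errAlreadyExists = errors.New("backup already exists")

type storageError struct{ code string }

func (e *storageError) Error() string { return "storage error: " + e.code }

func main() {
	// errors.Is matches the sentinel even when it is wrapped.
	err := fmt.Errorf("download failed: %w", errAlreadyExists)
	if errors.Is(err, errAlreadyExists) {
		fmt.Println("already exists, continue with restore")
	}

	// errors.As extracts the concrete error type from a wrapped chain,
	// replacing the old `se, ok := err.(azblob.StorageError)` assertion.
	wrapped := fmt.Errorf("stat failed: %w", &storageError{code: "BlobNotFound"})
	var se *storageError
	if errors.As(wrapped, &se) && se.code == "BlobNotFound" {
		fmt.Println("treat as not found")
	}
}
```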
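
To address the race conditions, object_disk.go drops the `map` + `sync.Mutex` pairs (`DisksConnections`/`DisksConnectionsMutex`, `SystemDisks`/`SystemDisksMutex`) in favour of `xsync.MapOf` from github.com/puzpuzpuz/xsync, so concurrent goroutines can `Load` and `Store` without explicit locking. A minimal sketch of the pattern, with a simplified `Disk` struct standing in for `clickhouse.Disk`:

```go
package main

import (
	"fmt"

	"github.com/puzpuzpuz/xsync"
)

// Simplified stand-in for clickhouse.Disk.
type Disk struct {
	Name string
	Path string
}

// String-keyed concurrent map, mirroring object_disk.SystemDisks in the patch.
var systemDisks = xsync.NewMapOf[Disk]()

func main() {
	// Store and Load are safe to call from multiple goroutines concurrently.
	systemDisks.Store("default", Disk{Name: "default", Path: "/var/lib/clickhouse"})

	if d, ok := systemDisks.Load("default"); ok {
		fmt.Println("found disk:", d.Name, d.Path)
	}

	// Size() replaces the old `len(SystemDisks) == 0` emptiness check.
	fmt.Println("known disks:", systemDisks.Size())
}
```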
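
In create.go the per-table cleanup (`RemoveBackupLocal`, `Clean`) moves out of the worker goroutines: workers now only log and return their error, and the cleanup runs once after `createBackupWorkingGroup.Wait()` fails, so several goroutines no longer race to delete the same half-written backup. A sketch of that shape under stated assumptions — `backupTable`, `removeBackupLocal` and `cleanShadow` are placeholders for the real methods, not the project's API:

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func backupTable(ctx context.Context, table string) error {
	if table == "db.broken" {
		return fmt.Errorf("can't backup %s", table)
	}
	return nil
}

// Placeholders for b.RemoveBackupLocal and b.Clean in the patch.
func removeBackupLocal(ctx context.Context, name string) error { return nil }
func cleanShadow(ctx context.Context) error                    { return nil }

func createBackupLocal(ctx context.Context, backupName string, tables []string) error {
	g, gCtx := errgroup.WithContext(ctx)
	g.SetLimit(4) // bounded concurrency, as with cfg.General.UploadConcurrency

	for _, table := range tables {
		table := table
		g.Go(func() error {
			// Workers only return the error; they no longer remove the backup themselves.
			return backupTable(gCtx, table)
		})
	}

	if wgWaitErr := g.Wait(); wgWaitErr != nil {
		// Cleanup happens exactly once, after all workers have stopped.
		if err := removeBackupLocal(ctx, backupName); err != nil {
			fmt.Printf("removeBackupLocal error: %v\n", err)
		}
		if err := cleanShadow(ctx); err != nil {
			fmt.Printf("cleanShadow error: %v\n", err)
		}
		return fmt.Errorf("one of createBackupLocal goroutines returned an error: %v", wgWaitErr)
	}
	return nil
}

func main() {
	err := createBackupLocal(context.Background(), "backup1", []string{"db.t1", "db.broken"})
	fmt.Println(err)
}
```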
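
backup_shard.go now checks the error returned by `hash.Hash32.Write` instead of discarding it, but the added `fmt.Errorf("can't write %s to fnv.New32a")` is missing its arguments, which `go vet` will flag. A sketch of the helper with the format arguments filled in (the function body otherwise follows the patch):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnvShardReplicaFromString deterministically picks a replica by hashing str.
func fnvShardReplicaFromString(str string, activeReplicas []string) (string, error) {
	h := fnv.New32a()
	if _, err := h.Write([]byte(str)); err != nil {
		// Pass both the input and the underlying error to the format string.
		return "", fmt.Errorf("can't write %s to fnv.New32a: %w", str, err)
	}
	i := h.Sum32() % uint32(len(activeReplicas))
	return activeReplicas[i], nil
}

func main() {
	replica, err := fnvShardReplicaFromString("shard1", []string{"replica1", "replica2"})
	fmt.Println(replica, err)
}
```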
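
delete.go now captures the error returned by `filepath.Walk` instead of dropping it, and only logs a warning because the walk is part of cleaning up already-downloaded object-disk metadata. A self-contained sketch of the same pattern, walking a temporary directory rather than a downloaded backup:

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

func main() {
	localPath, err := os.MkdirTemp("", "objectdisk")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(localPath)

	if err := os.WriteFile(filepath.Join(localPath, "part_0_0_0"), []byte("meta"), 0o644); err != nil {
		panic(err)
	}

	// Capture the error from filepath.Walk instead of discarding it.
	walkErr := filepath.Walk(localPath, func(fPath string, fInfo fs.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if fInfo.IsDir() {
			return nil
		}
		fmt.Println("would delete remote object for", fPath)
		return nil
	})
	if walkErr != nil {
		// Best-effort cleanup: warn and continue, as in cleanRemoteBackupObjectDisks.
		fmt.Printf("filepath.Walk(%s) returned error: %v\n", localPath, walkErr)
	}
}
```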
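
callback_test.go wraps `r.Body.Close()` in a deferred closure so the error from `Close` is checked rather than silently dropped. The same pattern outside a test, shown with a hypothetical body built from a string reader:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func readAll(body io.ReadCloser) (string, error) {
	defer func() {
		// Check the Close error instead of a bare `defer body.Close()`.
		if err := body.Close(); err != nil {
			fmt.Printf("can't close body: %v\n", err)
		}
	}()
	data, err := io.ReadAll(body)
	return string(data), err
}

func main() {
	s, err := readAll(io.NopCloser(strings.NewReader(`{"status":"ok"}`)))
	fmt.Println(s, err)
}
```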