From 5a7aad1fdac708459b8970b2be55497ad3c6929c Mon Sep 17 00:00:00 2001 From: Prateek Malhotra Date: Fri, 8 Sep 2017 11:10:32 -0400 Subject: [PATCH] Fixes from testing Azure/B2 backends, added some logging. --- backends/azure_backend.go | 7 +++++++ backends/backblaze_b2_backend.go | 17 +++++++++++++++-- backends/gcs_backend.go | 7 +++---- backup/backup.go | 12 ++++++++---- 4 files changed, 33 insertions(+), 10 deletions(-) diff --git a/backends/azure_backend.go b/backends/azure_backend.go index 76c105f..6570ed8 100644 --- a/backends/azure_backend.go +++ b/backends/azure_backend.go @@ -26,6 +26,7 @@ import ( "crypto/md5" "encoding/base64" "encoding/binary" + "encoding/hex" "io" "net/http" "os" @@ -196,6 +197,12 @@ func (a *AzureBackend) Upload(ctx context.Context, vol *helpers.VolumeInfo) erro return err } + md5Raw, merr := hex.DecodeString(vol.MD5Sum) + if merr != nil { + return merr + } + blob.Properties.ContentMD5 = base64.StdEncoding.EncodeToString(md5Raw) + // Finally, finalize the storage blob by giving Azure the block list order err = blob.PutBlockList(blocks, nil) if err != nil { diff --git a/backends/backblaze_b2_backend.go b/backends/backblaze_b2_backend.go index 6299c24..d2c4632 100644 --- a/backends/backblaze_b2_backend.go +++ b/backends/backblaze_b2_backend.go @@ -26,6 +26,7 @@ import ( "net/http" "os" "strings" + "sync" "github.com/kurin/blazer/b2" @@ -39,6 +40,7 @@ const B2BackendPrefix = "b2" type B2Backend struct { conf *BackendConfig bucketCli *b2.Bucket + mutex sync.Mutex prefix string bucketName string } @@ -76,7 +78,12 @@ func (b *B2Backend) Init(ctx context.Context, conf *BackendConfig, opts ...Optio opt.Apply(b) } - client, err := b2.NewClient(ctx, accountID, accountKey, b2.Transport(bufferedRT{b.conf.MaxParallelUploadBuffer})) + var cliopts []b2.ClientOption + if conf.MaxParallelUploadBuffer != nil { + cliopts = append(cliopts, b2.Transport(bufferedRT{b.conf.MaxParallelUploadBuffer})) + } + + client, err := b2.NewClient(ctx, accountID, accountKey, cliopts...) if err != nil { return err } @@ -87,16 +94,22 @@ func (b *B2Backend) Init(ctx context.Context, conf *BackendConfig, opts ...Optio } _, _, err = b.bucketCli.ListCurrentObjects(ctx, 0, nil) + if err == io.EOF { + err = nil + } return err } // Upload will upload the provided volume to this B2Backend's configured bucket+prefix func (b *B2Backend) Upload(ctx context.Context, vol *helpers.VolumeInfo) error { + // We will be doing multipart uploads, no need to allow multiple calls of Upload to initiate new uploads. + b.mutex.Lock() + defer b.mutex.Unlock() + name := b.prefix + vol.ObjectName w := b.bucketCli.Object(name).NewWriter(ctx) w.ConcurrentUploads = b.conf.MaxParallelUploads - w.Resume = true w.ChunkSize = b.conf.UploadChunkSize if _, err := io.Copy(w, vol); err != nil { diff --git a/backends/gcs_backend.go b/backends/gcs_backend.go index 4cdcaba..ca7b1c2 100644 --- a/backends/gcs_backend.go +++ b/backends/gcs_backend.go @@ -162,12 +162,11 @@ func (g *GoogleCloudStorageBackend) Upload(ctx context.Context, vol *helpers.Vol objName := g.prefix + vol.ObjectName w := g.client.NewWriter(ctx, g.bucketName, objName, vol.CRC32CSum32, g.conf.UploadChunkSize) - defer w.Close() - _, err := io.Copy(w, vol) - if err != nil { + if _, err := io.Copy(w, vol); err != nil { + w.Close() helpers.AppLogger.Debugf("gs backend: Error while uploading volume %s - %v", vol.ObjectName, err) } - return err + return w.Close() } diff --git a/backup/backup.go b/backup/backup.go index 1d2e6ab..c8fc770 100644 --- a/backup/backup.go +++ b/backup/backup.go @@ -619,7 +619,7 @@ func retryUploadChainer(ctx context.Context, in <-chan *helpers.VolumeInfo, b ba be.MaxElapsedTime = j.MaxRetryTime retryconf := backoff.WithContext(be, ctx) - operation := volUploadWrapper(ctx, b, vol) + operation := volUploadWrapper(ctx, b, vol, prefix) if err := backoff.Retry(operation, retryconf); err != nil { helpers.AppLogger.Errorf("%s backend: Failed to upload volume %s due to error: %v", prefix, vol.ObjectName, err) return err @@ -642,14 +642,18 @@ func retryUploadChainer(ctx context.Context, in <-chan *helpers.VolumeInfo, b ba return out, gwg } -func volUploadWrapper(ctx context.Context, b backends.Backend, vol *helpers.VolumeInfo) func() error { +func volUploadWrapper(ctx context.Context, b backends.Backend, vol *helpers.VolumeInfo, prefix string) func() error { return func() error { if err := vol.OpenVolume(); err != nil { - helpers.AppLogger.Warningf("Error while opening volume %s - %v", vol.ObjectName, err) + helpers.AppLogger.Warningf("%s: Error while opening volume %s - %v", prefix, vol.ObjectName, err) return err } defer vol.Close() - return b.Upload(ctx, vol) + err := b.Upload(ctx, vol) + if err != nil { + helpers.AppLogger.Warningf("%s: Error while uploading volume %s - %v", prefix, vol.ObjectName, err) + } + return err } }