Commit b212bda
Merge pull request #784 from minguyen9988/awiik
Improve GCP uploading
Slach authored Nov 19, 2023
2 parents ab7f339 + 97aa700 commit b212bda
Showing 4 changed files with 85 additions and 13 deletions.
1 change: 1 addition & 0 deletions ReadMe.md
@@ -519,6 +519,7 @@ gcs:
   # GCS_CUSTOM_STORAGE_CLASS_MAP, allow setup storage class depends on backup name regexp pattern, format nameRegexp > className
   custom_storage_class_map: {}
   debug: false # GCS_DEBUG
+  force_http: false # GCS_FORCE_HTTP
 cos:
   url: "" # COS_URL
   timeout: 2m # COS_TIMEOUT
17 changes: 15 additions & 2 deletions pkg/backup/upload.go
@@ -19,6 +19,7 @@ import (
"github.com/Altinity/clickhouse-backup/pkg/custom"
"github.com/Altinity/clickhouse-backup/pkg/resumable"
"github.com/Altinity/clickhouse-backup/pkg/status"
"github.com/Altinity/clickhouse-backup/pkg/storage"
"github.com/eapache/go-resiliency/retrier"

"golang.org/x/sync/errgroup"
@@ -479,7 +480,13 @@ func (b *Backuper) uploadBackupRelatedDir(ctx context.Context, localBackupRelate
 	if err != nil {
 		return 0, fmt.Errorf("can't RBAC or config upload compressed %s: %v", destinationRemote, err)
 	}
-	remoteUploaded, err := b.dst.StatFile(ctx, destinationRemote)
+
+	var remoteUploaded storage.RemoteFile
+	retry = retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
+	err = retry.RunCtx(ctx, func(ctx context.Context) error {
+		remoteUploaded, err = b.dst.StatFile(ctx, destinationRemote)
+		return err
+	})
 	if err != nil {
 		return 0, fmt.Errorf("can't check uploaded destinationRemote: %s, error: %v", destinationRemote, err)
 	}
@@ -577,7 +584,13 @@ breakByError:
 		log.Errorf("UploadCompressedStream return error: %v", err)
 		return fmt.Errorf("can't upload: %v", err)
 	}
-	remoteFile, err := b.dst.StatFile(ctx, remoteDataFile)
+
+	var remoteFile storage.RemoteFile
+	retry = retrier.New(retrier.ConstantBackoff(b.cfg.General.RetriesOnFailure, b.cfg.General.RetriesDuration), nil)
+	err = retry.RunCtx(ctx, func(ctx context.Context) error {
+		remoteFile, err = b.dst.StatFile(ctx, remoteDataFile)
+		return err
+	})
 	if err != nil {
 		return fmt.Errorf("can't check uploaded remoteDataFile: %s, error: %v", remoteDataFile, err)
 	}
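Both upload.go hunks wrap b.dst.StatFile in the same retry pattern from github.com/eapache/go-resiliency. A self-contained sketch of that pattern, with an illustrative flaky statOnce standing in for StatFile (the retry count and backoff here are made up, not the configured values):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/eapache/go-resiliency/retrier"
)

func main() {
	attempts := 0
	// statOnce stands in for b.dst.StatFile: it fails twice, then succeeds.
	statOnce := func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("transient stat error")
		}
		return nil
	}

	// Same shape as the PR: N retries with a constant backoff between them,
	// and a nil classifier so every error is treated as retryable.
	retry := retrier.New(retrier.ConstantBackoff(5, 100*time.Millisecond), nil)
	if err := retry.RunCtx(context.Background(), statOnce); err != nil {
		fmt.Println("stat failed after retries:", err)
		return
	}
	fmt.Printf("stat succeeded after %d attempts\n", attempts)
}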
1 change: 1 addition & 0 deletions pkg/config/config.go
@@ -75,6 +75,7 @@ type GCSConfig struct {
 	CompressionLevel  int               `yaml:"compression_level" envconfig:"GCS_COMPRESSION_LEVEL"`
 	CompressionFormat string            `yaml:"compression_format" envconfig:"GCS_COMPRESSION_FORMAT"`
 	Debug             bool              `yaml:"debug" envconfig:"GCS_DEBUG"`
+	ForceHttp         bool              `yaml:"force_http" envconfig:"GCS_FORCE_HTTP"`
 	Endpoint          string            `yaml:"endpoint" envconfig:"GCS_ENDPOINT"`
 	StorageClass      string            `yaml:"storage_class" envconfig:"GCS_STORAGE_CLASS"`
 	ObjectLabels      map[string]string `yaml:"object_labels" envconfig:"GCS_OBJECT_LABELS"`
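The envconfig tag means the new option can be set either as force_http: true in the YAML config or via the GCS_FORCE_HTTP environment variable. A minimal sketch of how such struct tags are consumed, assuming the github.com/kelseyhightower/envconfig semantics the tags imply:

package main

import (
	"fmt"

	"github.com/kelseyhightower/envconfig"
)

// Trimmed-down copy of GCSConfig with just the fields relevant here.
type GCSConfig struct {
	Debug     bool `yaml:"debug" envconfig:"GCS_DEBUG"`
	ForceHttp bool `yaml:"force_http" envconfig:"GCS_FORCE_HTTP"`
}

func main() {
	// Run as: GCS_FORCE_HTTP=true go run main.go
	var cfg GCSConfig
	if err := envconfig.Process("", &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("force_http=%v\n", cfg.ForceHttp) // force_http=true
}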
79 changes: 68 additions & 11 deletions pkg/storage/gcs.go
@@ -2,9 +2,11 @@ package storage

 import (
 	"context"
+	"crypto/tls"
 	"encoding/base64"
 	"fmt"
 	"io"
+	"net"
 	"net/http"
 	"path"
 	"strings"
@@ -65,6 +67,19 @@ func (gcs *GCS) Kind() string {
return "GCS"
}

type rewriteTransport struct {
base http.RoundTripper
}

// forces requests to target varnish and use HTTP, required to get uploading
// via varnish working
func (r rewriteTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if req.URL.Scheme == "https" {
req.URL.Scheme = "http"
}
return r.base.RoundTrip(req)
}

 // Connect - connect to GCS
 func (gcs *GCS) Connect(ctx context.Context) error {
 	var err error
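The rewriteTransport wrapper above drops into any http.Client. A self-contained sketch of the same scheme-rewriting idea, using an httptest server as a stand-in for an HTTP-only proxy like Varnish (illustrative, not from this commit):

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

type rewriteTransport struct {
	base http.RoundTripper
}

func (r rewriteTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	if req.URL.Scheme == "https" {
		req.URL.Scheme = "http" // downgrade before the request leaves the client
	}
	return r.base.RoundTrip(req)
}

func main() {
	// Plain-HTTP server standing in for the HTTP-only proxy.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "served over http")
	}))
	defer srv.Close()

	client := &http.Client{Transport: rewriteTransport{base: http.DefaultTransport}}
	// Request the https:// form of the URL; the transport rewrites it to http://.
	resp, err := client.Get("https" + srv.URL[len("http"):])
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}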
@@ -85,6 +100,47 @@ func (gcs *GCS) Connect(ctx context.Context) error {
 		clientOptions = append(clientOptions, option.WithCredentialsFile(gcs.Config.CredentialsFile))
 	}

+	if gcs.Config.ForceHttp {
+		customTransport := &http.Transport{
+			WriteBufferSize: 8388608, // 8 MiB
+			Proxy:           http.ProxyFromEnvironment,
+			DialContext: (&net.Dialer{
+				Timeout:   30 * time.Second,
+				KeepAlive: 30 * time.Second,
+			}).DialContext,
+			// ForceAttemptHTTP2 must stay false: when a custom TLSClientConfig
+			// is provided, Go would otherwise try to set up an HTTP/2 transport.
+			ForceAttemptHTTP2:     false,
+			MaxIdleConns:          100,
+			MaxIdleConnsPerHost:   100,
+			IdleConnTimeout:       90 * time.Second,
+			TLSHandshakeTimeout:   10 * time.Second,
+			ExpectContinueTimeout: 1 * time.Second,
+			TLSClientConfig: &tls.Config{
+				NextProtos: []string{"http/1.1"},
+			},
+		}
+		// storage.NewClient normally applies these options itself; when we
+		// supply a custom HTTP client we have to pass them all in manually.
+		if gcs.Config.Endpoint == "" {
+			clientOptions = append([]option.ClientOption{option.WithScopes(storage.ScopeFullControl)}, clientOptions...)
+		}
+		clientOptions = append(clientOptions, internaloption.WithDefaultEndpoint(endpoint))
+
+		customRoundTripper := &rewriteTransport{base: customTransport}
+		gcpTransport, _, err := googleHTTPTransport.NewClient(ctx, clientOptions...)
+		if err != nil {
+			return fmt.Errorf("failed to create GCP HTTP client: %v", err)
+		}
+		transport, err := googleHTTPTransport.NewTransport(ctx, customRoundTripper, clientOptions...)
+		if err != nil {
+			return fmt.Errorf("failed to create GCP transport: %v", err)
+		}
+		gcpTransport.Transport = transport
+
+		clientOptions = append(clientOptions, option.WithHTTPClient(gcpTransport))
+	}
+
 	if gcs.Config.Debug {
 		if gcs.Config.Endpoint == "" {
 			clientOptions = append([]option.ClientOption{option.WithScopes(storage.ScopeFullControl)}, clientOptions...)
@@ -114,7 +170,7 @@ func (gcs *GCS) Connect(ctx context.Context) error {
 			nil
 	})
 	gcs.clientPool = pool.NewObjectPoolWithDefaultConfig(ctx, factory)
-	gcs.clientPool.Config.MaxTotal = gcs.Config.ClientPoolSize
+	gcs.clientPool.Config.MaxTotal = gcs.Config.ClientPoolSize * 3
 	gcs.client, err = storage.NewClient(ctx, clientOptions...)
 	return err
 }
@@ -199,27 +255,28 @@ func (gcs *GCS) PutFile(ctx context.Context, key string, r io.ReadCloser) error
 	pClient := pClientObj.(*clientObject).Client
 	key = path.Join(gcs.Config.Path, key)
 	obj := pClient.Bucket(gcs.Config.Bucket).Object(key)
+
 	writer := obj.NewWriter(ctx)
 	writer.StorageClass = gcs.Config.StorageClass
 	writer.ChunkRetryDeadline = 60 * time.Minute
 	if len(gcs.Config.ObjectLabels) > 0 {
 		writer.Metadata = gcs.Config.ObjectLabels
 	}
 	defer func() {
-		if err := writer.Close(); err != nil {
-			log.Warnf("gcs.PutFile: can't close writer: %+v", err)
-			if err = gcs.clientPool.InvalidateObject(ctx, pClientObj); err != nil {
-				log.Warnf("gcs.PutFile: gcs.clientPool.InvalidateObject error: %+v", err)
-			}
-			return
-		}
-		if err = gcs.clientPool.ReturnObject(ctx, pClientObj); err != nil {
+		if err := gcs.clientPool.ReturnObject(ctx, pClientObj); err != nil {
 			log.Warnf("gcs.PutFile: gcs.clientPool.ReturnObject error: %+v", err)
 		}
 	}()
 	buffer := make([]byte, 128*1024)
 	_, err = io.CopyBuffer(writer, r, buffer)
-	return err
+	if err != nil {
+		log.Warnf("gcs.PutFile: can't copy buffer: %+v", err)
+		return err
+	}
+	if err = writer.Close(); err != nil {
+		log.Warnf("gcs.PutFile: can't close writer: %+v", err)
+		return err
+	}
+	return nil
 }

 func (gcs *GCS) StatFile(ctx context.Context, key string) (RemoteFile, error) {
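Moving writer.Close() out of the defer matters because a cloud.google.com/go/storage upload is only committed when Close returns nil; inside the defer, that error never reached the caller. A minimal sketch of the resulting close-before-return pattern (putObject and the package name are illustrative, not part of this commit):

package gcsupload

import (
	"context"
	"io"

	"cloud.google.com/go/storage"
)

// putObject streams r into obj and surfaces both copy and commit errors.
func putObject(ctx context.Context, obj *storage.ObjectHandle, r io.Reader) error {
	w := obj.NewWriter(ctx)
	if _, err := io.Copy(w, r); err != nil {
		_ = w.Close() // best effort; the upload is abandoned anyway
		return err
	}
	// Close performs the final commit of the object; deferring it would
	// swallow exactly the error the caller needs to see.
	return w.Close()
}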