Skip to content

Commit

Permalink
Merge pull request #68 from Azure/opt_zc_align
Browse files Browse the repository at this point in the history
Optimize retry and fix SAS recourse type.
  • Loading branch information
jiacfan authored Sep 5, 2018
2 parents 4df8286 + 34f42e3 commit b3f0c7d
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 17 deletions.
8 changes: 3 additions & 5 deletions 2017-07-29/azblob/highlevel.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func UploadFileToBlockBlob(ctx context.Context, file *os.File,

///////////////////////////////////////////////////////////////////////////////


const BlobDefaultDownloadBlockSize = int64(4 * 1024 * 1024) // 4MB

// DownloadFromAzureFileOptions identifies options used by the DownloadAzureFileToBuffer and DownloadAzureFileToFile functions.
Expand Down Expand Up @@ -211,11 +210,11 @@ func downloadBlobToBuffer(ctx context.Context, blobURL BlobURL, offset int64, co

err := doBatchTransfer(ctx, batchTransferOptions{
operationName: "downloadBlobToBuffer",
transferSize: count,
transferSize: count,
chunkSize: o.BlockSize,
parallelism: o.Parallelism,
operation: func(chunkStart int64, count int64) error {
dr, err := blobURL.Download(ctx, chunkStart+ offset, count, ac, false)
dr, err := blobURL.Download(ctx, chunkStart+offset, count, ac, false)
body := dr.Body(o.RetryReaderOptionsPerBlock)
if o.Progress != nil {
rangeProgress := int64(0)
Expand Down Expand Up @@ -296,7 +295,6 @@ func DownloadBlobToFile(ctx context.Context, blobURL BlobURL, offset int64, coun
}
}


///////////////////////////////////////////////////////////////////////////////

// BatchTransferOptions identifies options used by doBatchTransfer.
Expand Down Expand Up @@ -416,7 +414,7 @@ func (t *uploadStreamToBlockBlobOptions) end(ctx context.Context) (interface{},
}
// Multiple blocks staged, commit them all now
blockID := newUuidBlockID(t.blockIDPrefix)
blockIDs := make([]string, t.maxBlockNum + 1)
blockIDs := make([]string, t.maxBlockNum+1)
for bn := uint32(0); bn <= t.maxBlockNum; bn++ {
blockIDs[bn] = blockID.WithBlockNumber(bn).ToBase64()
}
Expand Down
118 changes: 108 additions & 10 deletions 2018-03-28/azblob/zc_policy_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package azblob

import (
"context"
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/Azure/azure-pipeline-go/pipeline"
"io/ioutil"
"io"
)

// RetryPolicy tells the pipeline what kind of retry policy to use. See the RetryPolicy* constants.
Expand Down Expand Up @@ -57,11 +58,11 @@ type RetryOptions struct {
// If RetryReadsFromSecondaryHost is "" (the default) then operations are not retried against another host.
// NOTE: Before setting this field, make sure you understand the issues around reading stale & potentially-inconsistent
// data at this webpage: https://docs.microsoft.com/en-us/azure/storage/common/storage-designing-ha-apps-with-ragrs
RetryReadsFromSecondaryHost string // Comment this our for non-Blob SDKs
RetryReadsFromSecondaryHost string // Comment this our for non-Blob SDKs
}

func (o RetryOptions) retryReadsFromSecondaryHost() string {
return o.RetryReadsFromSecondaryHost // This is for the Blob SDK only
return o.RetryReadsFromSecondaryHost // This is for the Blob SDK only
//return "" // This is for non-blob SDKs
}

Expand Down Expand Up @@ -221,9 +222,27 @@ func NewRetryPolicyFactory(o RetryOptions) pipeline.Factory {
considerSecondary = false
action = "Retry: Secondary URL returned 404"
case err != nil:
// NOTE: Protocol Responder returns non-nil if REST API returns invalid status code for the invoked operation
if netErr, ok := err.(net.Error); ok && (netErr.Temporary() || netErr.Timeout()) {
action = "Retry: net.Error and Temporary() or Timeout()"
// NOTE: Protocol Responder returns non-nil if REST API returns invalid status code for the invoked operation.
// Use ServiceCode to verify if the error is related to storage service-side,
// ServiceCode is set only when error related to storage service happened.
if stErr, ok := err.(StorageError); ok {
if stErr.Temporary() {
action = "Retry: StorageError with error service code and Temporary()"
} else if stErr.Response() != nil && isSuccessStatusCode(stErr.Response()) { // TODO: This is a temporarily work around, remove this after protocol layer fix the issue that net.Error is wrapped as storageError
action = "Retry: StorageError with success status code"
} else {
action = "NoRetry: StorageError not Temporary() and without retriable status code"
}
} else if netErr, ok := err.(net.Error); ok {
// Use non-retriable net.Error list, but not retriable list.
// As there are errors without Temporary() implementation,
// while need be retried, like 'connection reset by peer', 'transport connection broken' and etc.
// So the SDK do retry for most of the case, unless the error should not be retried for sure.
if !isNotRetriable(netErr) {
action = "Retry: net.Error and not in the non-retriable list"
} else {
action = "NoRetry: net.Error and in the non-retriable list"
}
} else {
action = "NoRetry: unrecognized error"
}
Expand All @@ -237,11 +256,18 @@ func NewRetryPolicyFactory(o RetryOptions) pipeline.Factory {
if err != nil {
tryCancel() // If we're returning an error, cancel this current/last per-retry timeout context
} else {
// TODO: Right now, we've decided to leak the per-try Context until the user's Context is canceled.
// Another option is that we wrap the last per-try context in a body and overwrite the Response's Body field with our wrapper.
// We wrap the last per-try context in a body and overwrite the Response's Body field with our wrapper.
// So, when the user closes the Body, the our per-try context gets closed too.
// Another option, is that the Last Policy do this wrapping for a per-retry context (not for the user's context)
_ = tryCancel // So, for now, we don't call cancel: cancel()
if response == nil || response.Response() == nil {
// We do panic in the case response or response.Response() is nil,
// as for client, the response should not be nil if request is sent and the operations is executed successfully.
// Another option, is that execute the cancel function when response or response.Response() is nil,
// as in this case, current per-try has nothing to do in future.
panic("invalid state, response should not be nil when the operation is executed successfully")
}

response.Response().Body = &contextCancelReadCloser{cf: tryCancel, body: response.Response().Body}
}
break // Don't retry
}
Expand All @@ -259,6 +285,78 @@ func NewRetryPolicyFactory(o RetryOptions) pipeline.Factory {
})
}

// contextCancelReadCloser helps to invoke context's cancelFunc properly when the ReadCloser is closed.
type contextCancelReadCloser struct {
cf context.CancelFunc
body io.ReadCloser
}

func (rc *contextCancelReadCloser) Read(p []byte) (n int, err error) {
return rc.body.Read(p)
}

func (rc *contextCancelReadCloser) Close() error {
err := rc.body.Close()
if rc.cf != nil {
rc.cf()
}
return err
}

// isNotRetriable checks if the provided net.Error isn't retriable.
func isNotRetriable(errToParse net.Error) bool {
// No error, so this is NOT retriable.
if errToParse == nil {
return true
}

// The error is either temporary or a timeout so it IS retriable (not not retriable).
if errToParse.Temporary() || errToParse.Timeout() {
return false
}

genericErr := error(errToParse)

// From here all the error are neither Temporary() nor Timeout().
switch err := errToParse.(type) {
case *net.OpError:
// The net.Error is also a net.OpError but the inner error is nil, so this is not retriable.
if err.Err == nil {
return true
}
genericErr = err.Err
}

switch genericErr.(type) {
case *net.AddrError, net.UnknownNetworkError, *net.DNSError, net.InvalidAddrError, *net.ParseError, *net.DNSConfigError:
// If the error is one of the ones listed, then it is NOT retriable.
return true
}

// If it's invalid header field name/value error thrown by http module, then it is NOT retriable.
// This could happen when metadata's key or value is invalid. (RoundTrip in transport.go)
if strings.Contains(genericErr.Error(), "invalid header field") {
return true
}

// Assume the error is retriable.
return false
}

var successStatusCodes = []int{http.StatusOK, http.StatusCreated, http.StatusAccepted, http.StatusNoContent, http.StatusPartialContent}

func isSuccessStatusCode(resp *http.Response) bool {
if resp == nil {
return false
}
for _, i := range successStatusCodes {
if i == resp.StatusCode {
return true
}
}
return false
}

// According to https://github.com/golang/go/wiki/CompilerOptimizations, the compiler will inline this method and hopefully optimize all calls to it away
var logf = func(format string, a ...interface{}) {}

Expand Down
2 changes: 1 addition & 1 deletion 2018-03-28/azblob/zc_retry_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *retryReader) Read(p []byte) (n int, err error) {
return n, err // All retries exhausted
}

if netErr, ok := err.(net.Error); ok && (netErr.Timeout() || netErr.Temporary()) {
if _, ok := err.(net.Error); ok {
continue
// Loop around and try to get and read from new stream.
}
Expand Down
2 changes: 1 addition & 1 deletion 2018-03-28/azblob/zc_sas_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (rt *AccountSASResourceTypes) Parse(s string) error {
switch r {
case 's':
rt.Service = true
case 'q':
case 'c':
rt.Container = true
case 'o':
rt.Object = true
Expand Down

0 comments on commit b3f0c7d

Please sign in to comment.