diff --git a/avroregistry/errors.go b/avroregistry/errors.go new file mode 100644 index 0000000..f1923f1 --- /dev/null +++ b/avroregistry/errors.go @@ -0,0 +1,20 @@ +package avroregistry + +import ( + "fmt" +) + +// UnavailableError reports an error when the schema registry is unavailable. +type UnavailableError struct { + Cause error +} + +// Error implements the error interface. +func (m *UnavailableError) Error() string { + return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause) +} + +// Unwrap unwraps and return Cause error. It is needed to properly handle and compare errors. +func (e *UnavailableError) Unwrap() error { + return e.Cause +} diff --git a/avroregistry/errors_test.go b/avroregistry/errors_test.go new file mode 100644 index 0000000..d83bd81 --- /dev/null +++ b/avroregistry/errors_test.go @@ -0,0 +1,34 @@ +package avroregistry_test + +import ( + "errors" + "testing" + + qt "github.com/frankban/quicktest" + + "github.com/heetch/avro/avroregistry" +) + +func TestUnavailableError_Unwrap(t *testing.T) { + c := qt.New(t) + var ErrExpect = errors.New("error") + + err := &avroregistry.UnavailableError{ + Cause: ErrExpect, + } + + c.Assert(errors.Is(err, ErrExpect), qt.IsTrue) + + var newErr *avroregistry.UnavailableError + c.Assert(errors.As(err, &newErr), qt.IsTrue) +} + +func TestUnavailableError_Error(t *testing.T) { + c := qt.New(t) + + err := &avroregistry.UnavailableError{ + Cause: errors.New("ECONNREFUSED"), + } + + c.Assert(err.Error(), qt.Equals, "schema registry unavailability caused by: ECONNREFUSED") +} diff --git a/avroregistry/registry.go b/avroregistry/registry.go index 6ef714c..97ff39e 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -190,7 +190,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { resp, err := http.DefaultClient.Do(req) if err != nil { if !attempt.More() || !isTemporaryError(err) { - return err + return &UnavailableError{err} } continue } @@ -198,19 +198,31 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if err == nil { return nil } - if !attempt.More() { - return err - } - if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 { - // It's not a 5xx error. We want to retry on 5xx + if apiErr, ok := err.(*apiError); ok { + // We want to retry on 5xx // errors, because the Confluent Avro registry // can occasionally return them as a matter of // course (and there could also be an // unavailable service that we're reaching // through a proxy). + if apiErr.StatusCode/100 == 5 { + err = &UnavailableError{apiErr} + } else { + return apiErr + } + } else { + // some 5XX response body cannot be decoded + // hence an *apiError is not returned + if resp.StatusCode/100 == 5 { + err = &UnavailableError{err} + } + } + + if !attempt.More() { return err } } + if attempt.Stopped() { return ctx.Err() } @@ -228,13 +240,13 @@ func unmarshalResponse(req *http.Request, resp *http.Response, result interface{ defer resp.Body.Close() if resp.StatusCode == http.StatusOK { if err := httprequest.UnmarshalJSONResponse(resp, result); err != nil { - return fmt.Errorf("cannot unmarshal JSON response from %v: %v", req.URL, err) + return fmt.Errorf("cannot unmarshal JSON response from %v: %w", req.URL, err) } return nil } var apiErr apiError if err := httprequest.UnmarshalJSONResponse(resp, &apiErr); err != nil { - return fmt.Errorf("cannot unmarshal JSON error response from %v: %v", req.URL, err) + return fmt.Errorf("cannot unmarshal JSON error response from %v: %w", req.URL, err) } apiErr.StatusCode = resp.StatusCode return &apiErr diff --git a/avroregistry/registry_test.go b/avroregistry/registry_test.go index fc47aef..43858e2 100644 --- a/avroregistry/registry_test.go +++ b/avroregistry/registry_test.go @@ -36,6 +36,31 @@ func TestRegister(t *testing.T) { c.Assert(id1, qt.Equals, id) } +func TestSchemaRegistryUnavailableError(t *testing.T) { + c := qt.New(t) + ctx := context.Background() + + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + })) + + // close the server + testServer.Close() + + registry, err := avroregistry.New(avroregistry.Params{ + ServerURL: testServer.URL, + RetryStrategy: noRetry, + }) + c.Assert(err, qt.IsNil) + + type R struct { + X int + } + + _, err = registry.Register(ctx, randomString(), schemaOf(nil, R{})) + c.Assert(err, qt.ErrorMatches, "schema registry unavailability caused by: .*") +} + func TestRegisterWithEmptyStruct(t *testing.T) { c := qt.New(t) @@ -255,7 +280,7 @@ func TestRetryOnError(t *testing.T) { c.Assert(err, qt.Equals, nil) t0 := time.Now() err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `Put "?http://0.1.2.3/config/x"?: temporary test error true`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Put "?http://0.1.2.3/config/x"?: temporary test error true`) if d := time.Since(t0); d < 30*time.Millisecond { c.Errorf("retry duration too small, want >=30ms got %v", d) } @@ -315,7 +340,7 @@ func TestRetryOn500(t *testing.T) { // an error. failCount = 5 err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`) } func TestNoRetryOnNon5XXStatus(t *testing.T) { @@ -367,7 +392,7 @@ func TestUnavailableError(t *testing.T) { }) c.Assert(err, qt.Equals, nil) err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`) } var schemaEquivalenceTests = []struct { diff --git a/singledecoder.go b/singledecoder.go index 4e0e674..5538982 100644 --- a/singledecoder.go +++ b/singledecoder.go @@ -92,7 +92,7 @@ func (c *SingleDecoder) Unmarshal(ctx context.Context, data []byte, x interface{ } prog, err := c.getProgram(ctx, vt, wID) if err != nil { - return nil, fmt.Errorf("cannot unmarshal: %v", err) + return nil, fmt.Errorf("cannot unmarshal: %w", err) } return unmarshal(nil, body, prog, v) }