Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCN-458 handle specific decoding error when schema registry unavailable #133

20 changes: 20 additions & 0 deletions avroregistry/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package avroregistry

import (
"fmt"
)

// UnavailableError reports an error when the schema registry is unavailable.
type UnavailableError struct {
Cause error
}

// Error implements the error interface.
func (m *UnavailableError) Error() string {
return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause)
Sharykhin marked this conversation as resolved.
Show resolved Hide resolved
}

// Unwrap unwraps and return Cause error. It is needed to properly handle and compare errors.
func (e *UnavailableError) Unwrap() error {
return e.Cause
}
34 changes: 34 additions & 0 deletions avroregistry/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package avroregistry_test

import (
"errors"
"testing"

qt "github.com/frankban/quicktest"

"github.com/heetch/avro/avroregistry"
)

func TestUnavailableError_Unwrap(t *testing.T) {
c := qt.New(t)
var ErrExpect = errors.New("error")

err := &avroregistry.UnavailableError{
Cause: ErrExpect,
}

c.Assert(errors.Is(err, ErrExpect), qt.IsTrue)

var newErr *avroregistry.UnavailableError
c.Assert(errors.As(err, &newErr), qt.IsTrue)
}

func TestUnavailableError_Error(t *testing.T) {
c := qt.New(t)

err := &avroregistry.UnavailableError{
Cause: errors.New("ECONNREFUSED"),
}

c.Assert(err.Error(), qt.Equals, "schema registry unavailability caused by: ECONNREFUSED")
}
28 changes: 20 additions & 8 deletions avroregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,39 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error {
resp, err := http.DefaultClient.Do(req)
if err != nil {
if !attempt.More() || !isTemporaryError(err) {
return err
return &UnavailableError{err}
}
continue
}
err = unmarshalResponse(req, resp, result)
if err == nil {
return nil
}
if !attempt.More() {
return err
}
if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 {
// It's not a 5xx error. We want to retry on 5xx
if apiErr, ok := err.(*apiError); ok {
// We want to retry on 5xx
// errors, because the Confluent Avro registry
// can occasionally return them as a matter of
// course (and there could also be an
// unavailable service that we're reaching
// through a proxy).
if apiErr.StatusCode/100 == 5 {
err = &UnavailableError{apiErr}
} else {
return apiErr
}
} else {
// some 5XX response body cannot be decoded
// hence an *apiError is not returned
if resp.StatusCode/100 == 5 {
err = &UnavailableError{err}
}
}

if !attempt.More() {
return err
}
}

if attempt.Stopped() {
return ctx.Err()
}
Expand All @@ -228,13 +240,13 @@ func unmarshalResponse(req *http.Request, resp *http.Response, result interface{
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
if err := httprequest.UnmarshalJSONResponse(resp, result); err != nil {
return fmt.Errorf("cannot unmarshal JSON response from %v: %v", req.URL, err)
return fmt.Errorf("cannot unmarshal JSON response from %v: %w", req.URL, err)
}
return nil
}
var apiErr apiError
if err := httprequest.UnmarshalJSONResponse(resp, &apiErr); err != nil {
return fmt.Errorf("cannot unmarshal JSON error response from %v: %v", req.URL, err)
return fmt.Errorf("cannot unmarshal JSON error response from %v: %w", req.URL, err)
}
apiErr.StatusCode = resp.StatusCode
return &apiErr
Expand Down
31 changes: 28 additions & 3 deletions avroregistry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@ func TestRegister(t *testing.T) {
c.Assert(id1, qt.Equals, id)
}

func TestSchemaRegistryUnavailableError(t *testing.T) {
c := qt.New(t)
ctx := context.Background()

testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

}))

// close the server
testServer.Close()

registry, err := avroregistry.New(avroregistry.Params{
ServerURL: testServer.URL,
RetryStrategy: noRetry,
})
c.Assert(err, qt.IsNil)

type R struct {
X int
}

_, err = registry.Register(ctx, randomString(), schemaOf(nil, R{}))
c.Assert(err, qt.ErrorMatches, "schema registry unavailability caused by: .*")
}

func TestRegisterWithEmptyStruct(t *testing.T) {
c := qt.New(t)

Expand Down Expand Up @@ -255,7 +280,7 @@ func TestRetryOnError(t *testing.T) {
c.Assert(err, qt.Equals, nil)
t0 := time.Now()
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `Put "?http://0.1.2.3/config/x"?: temporary test error true`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Put "?http://0.1.2.3/config/x"?: temporary test error true`)
if d := time.Since(t0); d < 30*time.Millisecond {
c.Errorf("retry duration too small, want >=30ms got %v", d)
}
Expand Down Expand Up @@ -315,7 +340,7 @@ func TestRetryOn500(t *testing.T) {
// an error.
failCount = 5
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`)
}

func TestNoRetryOnNon5XXStatus(t *testing.T) {
Expand Down Expand Up @@ -367,7 +392,7 @@ func TestUnavailableError(t *testing.T) {
})
c.Assert(err, qt.Equals, nil)
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`)
}

var schemaEquivalenceTests = []struct {
Expand Down
2 changes: 1 addition & 1 deletion singledecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *SingleDecoder) Unmarshal(ctx context.Context, data []byte, x interface{
}
prog, err := c.getProgram(ctx, vt, wID)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal: %v", err)
return nil, fmt.Errorf("cannot unmarshal: %w", err)
}
return unmarshal(nil, body, prog, v)
}
Expand Down
Loading