Skip to content

Commit

Permalink
Merge pull request #133 from heetch/BCN-458-handle-specific-decoding-…
Browse files Browse the repository at this point in the history
…error-when-schema-registry-unavailable

BCN-458 handle specific decoding error when schema registry unavailable
  • Loading branch information
Sharykhin authored Apr 12, 2024
2 parents 6c34fe4 + d460ccd commit eaf42ab
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 12 deletions.
20 changes: 20 additions & 0 deletions avroregistry/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package avroregistry

import (
"fmt"
)

// UnavailableError reports an error when the schema registry is unavailable.
type UnavailableError struct {
Cause error
}

// Error implements the error interface.
func (m *UnavailableError) Error() string {
return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause)
}

// Unwrap unwraps and return Cause error. It is needed to properly handle and compare errors.
func (e *UnavailableError) Unwrap() error {
return e.Cause
}
34 changes: 34 additions & 0 deletions avroregistry/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package avroregistry_test

import (
"errors"
"testing"

qt "github.com/frankban/quicktest"

"github.com/heetch/avro/avroregistry"
)

func TestUnavailableError_Unwrap(t *testing.T) {
c := qt.New(t)
var ErrExpect = errors.New("error")

err := &avroregistry.UnavailableError{
Cause: ErrExpect,
}

c.Assert(errors.Is(err, ErrExpect), qt.IsTrue)

var newErr *avroregistry.UnavailableError
c.Assert(errors.As(err, &newErr), qt.IsTrue)
}

func TestUnavailableError_Error(t *testing.T) {
c := qt.New(t)

err := &avroregistry.UnavailableError{
Cause: errors.New("ECONNREFUSED"),
}

c.Assert(err.Error(), qt.Equals, "schema registry unavailability caused by: ECONNREFUSED")
}
28 changes: 20 additions & 8 deletions avroregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,39 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error {
resp, err := http.DefaultClient.Do(req)
if err != nil {
if !attempt.More() || !isTemporaryError(err) {
return err
return &UnavailableError{err}
}
continue
}
err = unmarshalResponse(req, resp, result)
if err == nil {
return nil
}
if !attempt.More() {
return err
}
if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 {
// It's not a 5xx error. We want to retry on 5xx
if apiErr, ok := err.(*apiError); ok {
// We want to retry on 5xx
// errors, because the Confluent Avro registry
// can occasionally return them as a matter of
// course (and there could also be an
// unavailable service that we're reaching
// through a proxy).
if apiErr.StatusCode/100 == 5 {
err = &UnavailableError{apiErr}
} else {
return apiErr
}
} else {
// some 5XX response body cannot be decoded
// hence an *apiError is not returned
if resp.StatusCode/100 == 5 {
err = &UnavailableError{err}
}
}

if !attempt.More() {
return err
}
}

if attempt.Stopped() {
return ctx.Err()
}
Expand All @@ -228,13 +240,13 @@ func unmarshalResponse(req *http.Request, resp *http.Response, result interface{
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
if err := httprequest.UnmarshalJSONResponse(resp, result); err != nil {
return fmt.Errorf("cannot unmarshal JSON response from %v: %v", req.URL, err)
return fmt.Errorf("cannot unmarshal JSON response from %v: %w", req.URL, err)
}
return nil
}
var apiErr apiError
if err := httprequest.UnmarshalJSONResponse(resp, &apiErr); err != nil {
return fmt.Errorf("cannot unmarshal JSON error response from %v: %v", req.URL, err)
return fmt.Errorf("cannot unmarshal JSON error response from %v: %w", req.URL, err)
}
apiErr.StatusCode = resp.StatusCode
return &apiErr
Expand Down
31 changes: 28 additions & 3 deletions avroregistry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@ func TestRegister(t *testing.T) {
c.Assert(id1, qt.Equals, id)
}

func TestSchemaRegistryUnavailableError(t *testing.T) {
c := qt.New(t)
ctx := context.Background()

testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

}))

// close the server
testServer.Close()

registry, err := avroregistry.New(avroregistry.Params{
ServerURL: testServer.URL,
RetryStrategy: noRetry,
})
c.Assert(err, qt.IsNil)

type R struct {
X int
}

_, err = registry.Register(ctx, randomString(), schemaOf(nil, R{}))
c.Assert(err, qt.ErrorMatches, "schema registry unavailability caused by: .*")
}

func TestRegisterWithEmptyStruct(t *testing.T) {
c := qt.New(t)

Expand Down Expand Up @@ -255,7 +280,7 @@ func TestRetryOnError(t *testing.T) {
c.Assert(err, qt.Equals, nil)
t0 := time.Now()
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `Put "?http://0.1.2.3/config/x"?: temporary test error true`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Put "?http://0.1.2.3/config/x"?: temporary test error true`)
if d := time.Since(t0); d < 30*time.Millisecond {
c.Errorf("retry duration too small, want >=30ms got %v", d)
}
Expand Down Expand Up @@ -315,7 +340,7 @@ func TestRetryOn500(t *testing.T) {
// an error.
failCount = 5
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`)
}

func TestNoRetryOnNon5XXStatus(t *testing.T) {
Expand Down Expand Up @@ -367,7 +392,7 @@ func TestUnavailableError(t *testing.T) {
})
c.Assert(err, qt.Equals, nil)
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`)
}

var schemaEquivalenceTests = []struct {
Expand Down
2 changes: 1 addition & 1 deletion singledecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *SingleDecoder) Unmarshal(ctx context.Context, data []byte, x interface{
}
prog, err := c.getProgram(ctx, vt, wID)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal: %v", err)
return nil, fmt.Errorf("cannot unmarshal: %w", err)
}
return unmarshal(nil, body, prog, v)
}
Expand Down

0 comments on commit eaf42ab

Please sign in to comment.