Skip to content

Commit

Permalink
return specific schema registry unavailable error.
Browse files Browse the repository at this point in the history
  • Loading branch information
hchienjo committed Feb 20, 2024
1 parent 6c34fe4 commit 6ecad9f
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
15 changes: 15 additions & 0 deletions avroregistry/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package avroregistry

import (
"fmt"
)

// UnavailableError reports an error when the schema registry is unavailable.
type UnavailableError struct {
Cause error
}

// Error implements the error interface.
func (m *UnavailableError) Error() string {
return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause)
}
4 changes: 2 additions & 2 deletions avroregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error {
resp, err := http.DefaultClient.Do(req)
if err != nil {
if !attempt.More() || !isTemporaryError(err) {
return err
return &UnavailableError{err}
}
continue
}
Expand All @@ -201,7 +201,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error {
if !attempt.More() {
return err
}
if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 {
if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError {
// It's not a 5xx error. We want to retry on 5xx
// errors, because the Confluent Avro registry
// can occasionally return them as a matter of
Expand Down
27 changes: 26 additions & 1 deletion avroregistry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,31 @@ func TestRegister(t *testing.T) {
c.Assert(id1, qt.Equals, id)
}

func TestSchemaRegistryUnavailableError(t *testing.T) {
c := qt.New(t)
ctx := context.Background()

testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

}))

// close the server
testServer.Close()

registry, err := avroregistry.New(avroregistry.Params{
ServerURL: testServer.URL,
RetryStrategy: noRetry,
})
c.Assert(err, qt.IsNil)

type R struct {
X int
}

_, err = registry.Register(ctx, randomString(), schemaOf(nil, R{}))
c.Assert(err, qt.ErrorMatches, "schema registry unavailability caused by: .*")
}

func TestRegisterWithEmptyStruct(t *testing.T) {
c := qt.New(t)

Expand Down Expand Up @@ -255,7 +280,7 @@ func TestRetryOnError(t *testing.T) {
c.Assert(err, qt.Equals, nil)
t0 := time.Now()
err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive)
c.Assert(err, qt.ErrorMatches, `Put "?http://0.1.2.3/config/x"?: temporary test error true`)
c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Put "?http://0.1.2.3/config/x"?: temporary test error true`)
if d := time.Since(t0); d < 30*time.Millisecond {
c.Errorf("retry duration too small, want >=30ms got %v", d)
}
Expand Down

0 comments on commit 6ecad9f

Please sign in to comment.