From 6ecad9f6b68b9e113f6b8270bbe30d36c5389d9f Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Tue, 20 Feb 2024 15:05:38 +0300 Subject: [PATCH 1/9] return specific schema registry unavailable error. --- avroregistry/errors.go | 15 +++++++++++++++ avroregistry/registry.go | 4 ++-- avroregistry/registry_test.go | 27 ++++++++++++++++++++++++++- 3 files changed, 43 insertions(+), 3 deletions(-) create mode 100644 avroregistry/errors.go diff --git a/avroregistry/errors.go b/avroregistry/errors.go new file mode 100644 index 0000000..5f4e2ea --- /dev/null +++ b/avroregistry/errors.go @@ -0,0 +1,15 @@ +package avroregistry + +import ( + "fmt" +) + +// UnavailableError reports an error when the schema registry is unavailable. +type UnavailableError struct { + Cause error +} + +// Error implements the error interface. +func (m *UnavailableError) Error() string { + return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause) +} diff --git a/avroregistry/registry.go b/avroregistry/registry.go index 6ef714c..e8a4d57 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -190,7 +190,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { resp, err := http.DefaultClient.Do(req) if err != nil { if !attempt.More() || !isTemporaryError(err) { - return err + return &UnavailableError{err} } continue } @@ -201,7 +201,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if !attempt.More() { return err } - if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 { + if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError { // It's not a 5xx error. We want to retry on 5xx // errors, because the Confluent Avro registry // can occasionally return them as a matter of diff --git a/avroregistry/registry_test.go b/avroregistry/registry_test.go index fc47aef..d929d54 100644 --- a/avroregistry/registry_test.go +++ b/avroregistry/registry_test.go @@ -36,6 +36,31 @@ func TestRegister(t *testing.T) { c.Assert(id1, qt.Equals, id) } +func TestSchemaRegistryUnavailableError(t *testing.T) { + c := qt.New(t) + ctx := context.Background() + + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + })) + + // close the server + testServer.Close() + + registry, err := avroregistry.New(avroregistry.Params{ + ServerURL: testServer.URL, + RetryStrategy: noRetry, + }) + c.Assert(err, qt.IsNil) + + type R struct { + X int + } + + _, err = registry.Register(ctx, randomString(), schemaOf(nil, R{})) + c.Assert(err, qt.ErrorMatches, "schema registry unavailability caused by: .*") +} + func TestRegisterWithEmptyStruct(t *testing.T) { c := qt.New(t) @@ -255,7 +280,7 @@ func TestRetryOnError(t *testing.T) { c.Assert(err, qt.Equals, nil) t0 := time.Now() err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `Put "?http://0.1.2.3/config/x"?: temporary test error true`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Put "?http://0.1.2.3/config/x"?: temporary test error true`) if d := time.Since(t0); d < 30*time.Millisecond { c.Errorf("retry duration too small, want >=30ms got %v", d) } From ce50231659c9ade85655aa3bdfe4c36b8a203c63 Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Thu, 22 Feb 2024 09:35:32 +0300 Subject: [PATCH 2/9] wrap errors. --- singledecoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singledecoder.go b/singledecoder.go index 4e0e674..5538982 100644 --- a/singledecoder.go +++ b/singledecoder.go @@ -92,7 +92,7 @@ func (c *SingleDecoder) Unmarshal(ctx context.Context, data []byte, x interface{ } prog, err := c.getProgram(ctx, vt, wID) if err != nil { - return nil, fmt.Errorf("cannot unmarshal: %v", err) + return nil, fmt.Errorf("cannot unmarshal: %w", err) } return unmarshal(nil, body, prog, v) } From 5753d28df13000ea068ce46cc1f17128a0c9a383 Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Mon, 4 Mar 2024 13:19:06 +0300 Subject: [PATCH 3/9] return unavailable errors for proxy errors too. --- avroregistry/registry.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index e8a4d57..f53022d 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -199,7 +199,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { return nil } if !attempt.More() { - return err + return &UnavailableError{err} } if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError { // It's not a 5xx error. We want to retry on 5xx @@ -208,6 +208,9 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { // course (and there could also be an // unavailable service that we're reaching // through a proxy). + if !attempt.More() { + return &UnavailableError{err} + } return err } } From c82aacb192124cdf2399e73788d96f6eb30f36da Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Mon, 4 Mar 2024 13:43:13 +0300 Subject: [PATCH 4/9] registry unavailable error for 5XX status code. Of course after exhausing all retries. --- avroregistry/registry.go | 28 +++++++++++++++++++--------- avroregistry/registry_test.go | 4 ++-- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index f53022d..ca1d6f8 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -198,20 +198,30 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if err == nil { return nil } - if !attempt.More() { - return &UnavailableError{err} - } - if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError { - // It's not a 5xx error. We want to retry on 5xx + if err, ok := err.(*apiError); ok { + // We want to retry on 5xx // errors, because the Confluent Avro registry // can occasionally return them as a matter of // course (and there could also be an // unavailable service that we're reaching // through a proxy). - if !attempt.More() { + switch err.StatusCode { + case http.StatusInternalServerError: + if !attempt.More() { + return &UnavailableError{err} + } + default: + return err + } + } + + if !attempt.More() { + switch resp.StatusCode { + case http.StatusInternalServerError: return &UnavailableError{err} + default: + return err } - return err } } if attempt.Stopped() { @@ -231,13 +241,13 @@ func unmarshalResponse(req *http.Request, resp *http.Response, result interface{ defer resp.Body.Close() if resp.StatusCode == http.StatusOK { if err := httprequest.UnmarshalJSONResponse(resp, result); err != nil { - return fmt.Errorf("cannot unmarshal JSON response from %v: %v", req.URL, err) + return fmt.Errorf("cannot unmarshal JSON response from %v: %w", req.URL, err) } return nil } var apiErr apiError if err := httprequest.UnmarshalJSONResponse(resp, &apiErr); err != nil { - return fmt.Errorf("cannot unmarshal JSON error response from %v: %v", req.URL, err) + return fmt.Errorf("cannot unmarshal JSON error response from %v: %w", req.URL, err) } apiErr.StatusCode = resp.StatusCode return &apiErr diff --git a/avroregistry/registry_test.go b/avroregistry/registry_test.go index d929d54..43858e2 100644 --- a/avroregistry/registry_test.go +++ b/avroregistry/registry_test.go @@ -340,7 +340,7 @@ func TestRetryOn500(t *testing.T) { // an error. failCount = 5 err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: Avro registry error \(code 50001; HTTP status 500\): Failed to update compatibility level`) } func TestNoRetryOnNon5XXStatus(t *testing.T) { @@ -392,7 +392,7 @@ func TestUnavailableError(t *testing.T) { }) c.Assert(err, qt.Equals, nil) err = registry.SetCompatibility(context.Background(), "x", avro.BackwardTransitive) - c.Assert(err, qt.ErrorMatches, `cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`) + c.Assert(err, qt.ErrorMatches, `schema registry unavailability caused by: cannot unmarshal JSON error response from .*/config/x: unexpected content type text/html; want application/json; content: 502 Proxy Error; Proxy Error; The whole world is bogus`) } var schemaEquivalenceTests = []struct { From d32fab68a814c54a36d17ee4a017dbcecbd82a28 Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Tue, 5 Mar 2024 07:43:27 +0300 Subject: [PATCH 5/9] refactor logic. --- avroregistry/registry.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index ca1d6f8..28183ed 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -198,32 +198,25 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if err == nil { return nil } - if err, ok := err.(*apiError); ok { + if apiErr, ok := err.(*apiError); ok { // We want to retry on 5xx // errors, because the Confluent Avro registry // can occasionally return them as a matter of // course (and there could also be an // unavailable service that we're reaching // through a proxy). - switch err.StatusCode { - case http.StatusInternalServerError: - if !attempt.More() { - return &UnavailableError{err} - } - default: - return err + if apiErr.StatusCode/100 == 5 { + return apiErr + } else { + err = &UnavailableError{apiErr} } } if !attempt.More() { - switch resp.StatusCode { - case http.StatusInternalServerError: - return &UnavailableError{err} - default: - return err - } + return err } } + if attempt.Stopped() { return ctx.Err() } From a9112539a2b1a3731ef7978dcc6fbc7d2489aa4a Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Tue, 5 Mar 2024 07:49:21 +0300 Subject: [PATCH 6/9] fix error handling logic. --- avroregistry/registry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index 28183ed..bb04e40 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -206,9 +206,9 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { // unavailable service that we're reaching // through a proxy). if apiErr.StatusCode/100 == 5 { - return apiErr - } else { err = &UnavailableError{apiErr} + } else { + return apiErr } } From 4f1eb1e71177631c19f785dcf62bcb5c0fdfe40c Mon Sep 17 00:00:00 2001 From: Harrison Sunda Date: Tue, 5 Mar 2024 08:07:19 +0300 Subject: [PATCH 7/9] Handle proxy errors with non-json response. --- avroregistry/registry.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/avroregistry/registry.go b/avroregistry/registry.go index bb04e40..97ff39e 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -210,6 +210,12 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { } else { return apiErr } + } else { + // some 5XX response body cannot be decoded + // hence an *apiError is not returned + if resp.StatusCode/100 == 5 { + err = &UnavailableError{err} + } } if !attempt.More() { From bed04dd2adca9a27422b4d43e175bfdc4d14d152 Mon Sep 17 00:00:00 2001 From: Siarhei Sharykhin Date: Wed, 10 Apr 2024 08:46:28 +0200 Subject: [PATCH 8/9] [BCN-458] Added Unwrap method --- avroregistry/errors.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/avroregistry/errors.go b/avroregistry/errors.go index 5f4e2ea..6feddf9 100644 --- a/avroregistry/errors.go +++ b/avroregistry/errors.go @@ -13,3 +13,8 @@ type UnavailableError struct { func (m *UnavailableError) Error() string { return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause) } + +// Unwrap unwraps and return Cause error. It is needed to properly handle %w usage in fmt.Errorf cases. +func (e *UnavailableError) Unwrap() error { + return e.Cause +} From d460ccdd01401a427b005e63cae4650890b251ca Mon Sep 17 00:00:00 2001 From: Siarhei Sharykhin Date: Wed, 10 Apr 2024 08:54:40 +0200 Subject: [PATCH 9/9] [BCN-458] Added unit tests for unwrap and error methods --- avroregistry/errors.go | 2 +- avroregistry/errors_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 avroregistry/errors_test.go diff --git a/avroregistry/errors.go b/avroregistry/errors.go index 6feddf9..f1923f1 100644 --- a/avroregistry/errors.go +++ b/avroregistry/errors.go @@ -14,7 +14,7 @@ func (m *UnavailableError) Error() string { return fmt.Sprintf("schema registry unavailability caused by: %v", m.Cause) } -// Unwrap unwraps and return Cause error. It is needed to properly handle %w usage in fmt.Errorf cases. +// Unwrap unwraps and return Cause error. It is needed to properly handle and compare errors. func (e *UnavailableError) Unwrap() error { return e.Cause } diff --git a/avroregistry/errors_test.go b/avroregistry/errors_test.go new file mode 100644 index 0000000..d83bd81 --- /dev/null +++ b/avroregistry/errors_test.go @@ -0,0 +1,34 @@ +package avroregistry_test + +import ( + "errors" + "testing" + + qt "github.com/frankban/quicktest" + + "github.com/heetch/avro/avroregistry" +) + +func TestUnavailableError_Unwrap(t *testing.T) { + c := qt.New(t) + var ErrExpect = errors.New("error") + + err := &avroregistry.UnavailableError{ + Cause: ErrExpect, + } + + c.Assert(errors.Is(err, ErrExpect), qt.IsTrue) + + var newErr *avroregistry.UnavailableError + c.Assert(errors.As(err, &newErr), qt.IsTrue) +} + +func TestUnavailableError_Error(t *testing.T) { + c := qt.New(t) + + err := &avroregistry.UnavailableError{ + Cause: errors.New("ECONNREFUSED"), + } + + c.Assert(err.Error(), qt.Equals, "schema registry unavailability caused by: ECONNREFUSED") +}