diff --git a/avroregistry/message.go b/avroregistry/message.go index 30eca8f..d58e40e 100644 --- a/avroregistry/message.go +++ b/avroregistry/message.go @@ -6,6 +6,8 @@ import ( "encoding/binary" "encoding/json" "fmt" + "log" + "net/http" "github.com/heetch/avro" ) @@ -74,7 +76,8 @@ func (r decodingRegistry) DecodeSchemaID(msg []byte) (int64, []byte) { // // See https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id func (r decodingRegistry) SchemaForID(ctx context.Context, id int64) (*avro.Type, error) { - req := r.r.newRequest(ctx, "GET", fmt.Sprintf("/schemas/ids/%d", id), nil) + req := r.r.newRequest(ctx, http.MethodGet, fmt.Sprintf("/schemas/ids/%d", id), nil) + log.Printf("=======> %d getting schema with ID: %s", id, req.URL.String()) var resp struct { Schema string `json:"schema"` } diff --git a/avroregistry/registry.go b/avroregistry/registry.go index 6ef714c..a2b113d 100644 --- a/avroregistry/registry.go +++ b/avroregistry/registry.go @@ -6,11 +6,15 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" + "log" "net/http" "net/url" + "os" "strconv" + "syscall" "time" "github.com/heetch/avro" @@ -102,7 +106,7 @@ func (r *Registry) Register(ctx context.Context, subject string, schema *avro.Ty if err != nil { return 0, err } - req := r.newRequest(ctx, "POST", fmt.Sprintf("/subjects/%s/versions", subject), bytes.NewReader(data)) + req := r.newRequest(ctx, http.MethodPost, fmt.Sprintf("/subjects/%s/versions", subject), bytes.NewReader(data)) var resp struct { ID int64 `json:"id"` } @@ -122,14 +126,14 @@ func (r *Registry) SetCompatibility(ctx context.Context, subject string, mode av if err != nil { return err } - return r.doRequest(r.newRequest(ctx, "PUT", "/config/"+subject, bytes.NewReader(data)), nil) + return r.doRequest(r.newRequest(ctx, http.MethodPut, "/config/"+subject, bytes.NewReader(data)), nil) } // DeleteSubject deletes the given subject from the registry. // // See https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject) func (r *Registry) DeleteSubject(ctx context.Context, subject string) error { - return r.doRequest(r.newRequest(ctx, "DELETE", "/subjects/"+subject, nil), nil) + return r.doRequest(r.newRequest(ctx, http.MethodDelete, "/subjects/"+subject, nil), nil) } // Schema gets a specific version of the schema registered under this subject @@ -166,7 +170,17 @@ func validateVersion(version string) error { } func (r *Registry) newRequest(ctx context.Context, method string, urlStr string, body io.Reader) *http.Request { - req, err := http.NewRequestWithContext(ctx, method, r.params.ServerURL+urlStr, body) + var url string + if method == http.MethodGet { + if u, ok := os.LookupEnv("OVERRIDE_REGISTRY"); ok { + url = u + } else { + url = r.params.ServerURL + } + } else { + url = r.params.ServerURL + } + req, err := http.NewRequestWithContext(ctx, method, url+urlStr, body) if err != nil { // Should never happen, as we've checked the URL for validity when // creating the registry instance. @@ -184,11 +198,20 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if r.params.Username != "" { req.SetBasicAuth(r.params.Username, r.params.Password) } + log.Printf("===> foo request: %s: %s", req.Method, req.URL) ctx := req.Context() attempt := retry.StartWithCancel(r.params.RetryStrategy, nil, ctx.Done()) for attempt.Next() { resp, err := http.DefaultClient.Do(req) if err != nil { + var urlError *url.Error + if errors.As(err, &urlError) { + log.Printf("=====> net error: %v: temporary: %t, timeout: %t: detected: %t", urlError, urlError.Temporary(), urlError.Timeout(), isTemporaryError(err)) + } + if errors.Is(err, syscall.ECONNREFUSED) { + log.Printf("===> connection refused detected") + err = fmt.Errorf("should be a retryable error this one: %w", err) + } if !attempt.More() || !isTemporaryError(err) { return err } @@ -201,7 +224,7 @@ func (r *Registry) doRequest(req *http.Request, result interface{}) error { if !attempt.More() { return err } - if err, ok := err.(*apiError); ok && err.StatusCode/100 != 5 { + if err, ok := err.(*apiError); ok && err.StatusCode != http.StatusInternalServerError { // It's not a 5xx error. We want to retry on 5xx // errors, because the Confluent Avro registry // can occasionally return them as a matter of