From 09be70197e270c769778a476b1bdd150d5bcd11b Mon Sep 17 00:00:00 2001 From: Haris Osmanagic Date: Thu, 7 Mar 2024 14:20:45 +0100 Subject: [PATCH] fix linter --- .../v1/internal/fromplugin/specifier.go | 97 --- .../processor/builtin/impl/json/decode.go | 2 +- .../processor/builtin/impl/json/encode.go | 2 +- .../builtin/impl/unwrap/kafka_connect.go | 2 +- pkg/processor/procbuiltin/httprequest.go | 200 ------ pkg/processor/procbuiltin/httprequest_test.go | 355 ----------- pkg/processor/procbuiltin/parsejson.go | 77 --- pkg/processor/procbuiltin/unwrap.go | 571 ------------------ 8 files changed, 3 insertions(+), 1303 deletions(-) delete mode 100644 pkg/plugin/builtin/v1/internal/fromplugin/specifier.go delete mode 100644 pkg/processor/procbuiltin/httprequest.go delete mode 100644 pkg/processor/procbuiltin/httprequest_test.go delete mode 100644 pkg/processor/procbuiltin/parsejson.go delete mode 100644 pkg/processor/procbuiltin/unwrap.go diff --git a/pkg/plugin/builtin/v1/internal/fromplugin/specifier.go b/pkg/plugin/builtin/v1/internal/fromplugin/specifier.go deleted file mode 100644 index b5257a3d3..000000000 --- a/pkg/plugin/builtin/v1/internal/fromplugin/specifier.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright © 2022 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package fromplugin - -import ( - "github.com/conduitio/conduit-connector-protocol/cpluginv1" - "github.com/conduitio/conduit/pkg/plugin" -) - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - var vTypes [1]struct{} - _ = vTypes[int(cpluginv1.ValidationTypeRequired)-int(plugin.ValidationTypeRequired)] - _ = vTypes[int(cpluginv1.ValidationTypeLessThan)-int(plugin.ValidationTypeLessThan)] - _ = vTypes[int(cpluginv1.ValidationTypeGreaterThan)-int(plugin.ValidationTypeGreaterThan)] - _ = vTypes[int(cpluginv1.ValidationTypeInclusion)-int(plugin.ValidationTypeInclusion)] - _ = vTypes[int(cpluginv1.ValidationTypeExclusion)-int(plugin.ValidationTypeExclusion)] - _ = vTypes[int(cpluginv1.ValidationTypeRegex)-int(plugin.ValidationTypeRegex)] - // parameter types - _ = vTypes[int(cpluginv1.ParameterTypeString)-int(plugin.ParameterTypeString)] - _ = vTypes[int(cpluginv1.ParameterTypeInt)-int(plugin.ParameterTypeInt)] - _ = vTypes[int(cpluginv1.ParameterTypeFloat)-int(plugin.ParameterTypeFloat)] - _ = vTypes[int(cpluginv1.ParameterTypeBool)-int(plugin.ParameterTypeBool)] - _ = vTypes[int(cpluginv1.ParameterTypeFile)-int(plugin.ParameterTypeFile)] - _ = vTypes[int(cpluginv1.ParameterTypeDuration)-int(plugin.ParameterTypeDuration)] -} - -func SpecifierSpecifyResponse(in cpluginv1.SpecifierSpecifyResponse) (plugin.Specification, error) { - specMap := func(params map[string]cpluginv1.SpecifierParameter) map[string]plugin.Parameter { - out := make(map[string]plugin.Parameter) - for k, v := range params { - out[k] = SpecifierParameter(v) - } - return out - } - - return plugin.Specification{ - Name: in.Name, - Summary: in.Summary, - Description: in.Description, - Version: in.Version, - Author: in.Author, - DestinationParams: specMap(in.DestinationParams), - SourceParams: specMap(in.SourceParams), - }, nil -} - -func SpecifierParameter(in cpluginv1.SpecifierParameter) plugin.Parameter { - validations := make([]plugin.Validation, len(in.Validations)) - - requiredExists := false - for i, v := range in.Validations { - validations[i] = plugin.Validation{ - Type: plugin.ValidationType(v.Type), - Value: v.Value, - } - if v.Type == cpluginv1.ValidationTypeRequired { - requiredExists = true - } - } - //nolint:staticcheck // needed for backward compatibility, in.Required is - // converted to a validation of type ValidationTypeRequired making sure not - // to duplicate the required validation - if in.Required && !requiredExists { - //nolint:makezero // false positive, we actually want to append here - validations = append(validations, plugin.Validation{ - Type: plugin.ValidationTypeRequired, - }) - } - - return plugin.Parameter{ - Default: in.Default, - Type: cpluginv1ParamTypeToPluginParamType(in.Type), - Description: in.Description, - Validations: validations, - } -} - -func cpluginv1ParamTypeToPluginParamType(t cpluginv1.ParameterType) plugin.ParameterType { - // default type should be string - if t == 0 { - return plugin.ParameterTypeString - } - return plugin.ParameterType(t) -} diff --git a/pkg/plugin/processor/builtin/impl/json/decode.go b/pkg/plugin/processor/builtin/impl/json/decode.go index ceadb6e39..cfa453e7c 100644 --- a/pkg/plugin/processor/builtin/impl/json/decode.go +++ b/pkg/plugin/processor/builtin/impl/json/decode.go @@ -18,12 +18,12 @@ package json import ( "context" - "encoding/json" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/goccy/go-json" ) type decodeProcessor struct { diff --git a/pkg/plugin/processor/builtin/impl/json/encode.go b/pkg/plugin/processor/builtin/impl/json/encode.go index 522e22df0..9604b8b72 100644 --- a/pkg/plugin/processor/builtin/impl/json/encode.go +++ b/pkg/plugin/processor/builtin/impl/json/encode.go @@ -18,12 +18,12 @@ package json import ( "context" - "encoding/json" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/goccy/go-json" ) type encodeProcessor struct { diff --git a/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect.go b/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect.go index 3c21b9b3d..59f1c51a6 100644 --- a/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect.go +++ b/pkg/plugin/processor/builtin/impl/unwrap/kafka_connect.go @@ -18,13 +18,13 @@ package unwrap import ( "context" - "encoding/json" "fmt" "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/goccy/go-json" ) type kafkaConnectConfig struct { diff --git a/pkg/processor/procbuiltin/httprequest.go b/pkg/processor/procbuiltin/httprequest.go deleted file mode 100644 index e3be2e91c..000000000 --- a/pkg/processor/procbuiltin/httprequest.go +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright © 2022 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package procbuiltin - -import ( - "bytes" - "context" - "io" - "net/http" - "net/url" - "time" - - "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/conduitio/conduit/pkg/processor" - "github.com/conduitio/conduit/pkg/record" - "github.com/goccy/go-json" - "github.com/jpillora/backoff" -) - -const ( - httpRequestProcType = "httprequest" - - httpRequestConfigURL = "url" - httpRequestConfigMethod = "method" - httpRequestConfigContentType = "contentType" - httpRequestContentTypeDefault = "application/json" - httpRequestBackoffRetryCount = "backoffRetry.count" - httpRequestBackoffRetryMin = "backoffRetry.min" - httpRequestBackoffRetryMax = "backoffRetry.max" - httpRequestBackoffRetryFactor = "backoffRetry.factor" -) - -func init() { - processor.GlobalBuilderRegistry.MustRegister(httpRequestProcType, HTTPRequest) -} - -// HTTPRequest builds a processor that sends an HTTP request to the specified URL with the specified HTTP method -// (default is POST) with a content-type header as the specified value (default is application/json). the whole -// record as json will be used as the request body and the raw response body will be set under Record.Payload.After. -// if the response code is (204 No Content) then the record will be filtered out. -func HTTPRequest(config processor.Config) (processor.Interface, error) { - return httpRequest(httpRequestProcType, config) -} - -func httpRequest( - processorType string, - config processor.Config, -) (processor.Interface, error) { - var ( - err error - rawURL string - method string - ) - - if rawURL, err = getConfigFieldString(config, httpRequestConfigURL); err != nil { - return nil, cerrors.Errorf("%s: %w", processorType, err) - } - - _, err = url.Parse(rawURL) - if err != nil { - return nil, cerrors.Errorf("%s: error trying to parse url: %w", processorType, err) - } - - method = config.Settings[httpRequestConfigMethod] - if method == "" { - method = http.MethodPost - } - contentType := config.Settings[httpRequestConfigContentType] - if contentType == "" { - contentType = httpRequestContentTypeDefault - } - - // preflight check - _, err = http.NewRequest( - method, - rawURL, - bytes.NewReader([]byte{}), - ) - if err != nil { - return nil, cerrors.Errorf("%s: error trying to create HTTP request: %w", processorType, err) - } - - procFn := func(ctx context.Context, r record.Record) (record.Record, error) { - jsonRec, err := json.Marshal(r) - if err != nil { - return record.Record{}, cerrors.Errorf("%s: error creating json record: %w", processorType, err) - } - - req, err := http.NewRequestWithContext( - ctx, - method, - rawURL, - bytes.NewReader(jsonRec), - ) - if err != nil { - return record.Record{}, cerrors.Errorf("%s: error trying to create HTTP request: %w", processorType, err) - } - - req.Header.Set("Content-Type", contentType) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return record.Record{}, cerrors.Errorf("%s: error trying to execute HTTP request: %w", processorType, err) - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return record.Record{}, cerrors.Errorf("%s: error trying to read response body: %w", processorType, err) - } - - if resp.StatusCode > 299 { - // regard status codes over 299 as errors - return record.Record{}, cerrors.Errorf("%s: invalid status code %v (body: %q)", processorType, resp.StatusCode, string(body)) - } - // skip if body has no content - if resp.StatusCode == http.StatusNoContent { - return record.Record{}, processor.ErrSkipRecord - } - - r.Payload.After = record.RawData{Raw: body} - return r, nil - } - - return configureHTTPRequestBackoffRetry(processorType, config, procFn) -} - -func configureHTTPRequestBackoffRetry( - processorType string, - config processor.Config, - procFn func(context.Context, record.Record) (record.Record, error), -) (processor.Interface, error) { - // retryCount is a float64 to match the backoff library attempt type - var retryCount float64 - - tmp, err := getConfigFieldInt64(config, httpRequestBackoffRetryCount) - if err != nil && !cerrors.Is(err, errEmptyConfigField) { - return nil, cerrors.Errorf("%s: %w", processorType, err) - } - retryCount = float64(tmp) - - if retryCount == 0 { - // no retries configured, just use the plain processor - return NewFuncWrapper(procFn), nil - } - - // default retry values - b := &backoff.Backoff{ - Factor: 2, - Min: time.Millisecond * 100, - Max: time.Second * 5, - } - - min, err := getConfigFieldDuration(config, httpRequestBackoffRetryMin) - if err != nil && !cerrors.Is(err, errEmptyConfigField) { - return nil, cerrors.Errorf("%s: %w", processorType, err) - } else if err == nil { - b.Min = min - } - - max, err := getConfigFieldDuration(config, httpRequestBackoffRetryMax) - if err != nil && !cerrors.Is(err, errEmptyConfigField) { - return nil, cerrors.Errorf("%s: %w", processorType, err) - } else if err == nil { - b.Max = max - } - - factor, err := getConfigFieldFloat64(config, httpRequestBackoffRetryFactor) - if err != nil && !cerrors.Is(err, errEmptyConfigField) { - return nil, cerrors.Errorf("%s: %w", processorType, err) - } else if err == nil { - b.Factor = factor - } - - // wrap processor in a retry loop - return NewFuncWrapper(func(ctx context.Context, r record.Record) (record.Record, error) { - for { - r, err := procFn(ctx, r) - if err != nil && b.Attempt() < retryCount { - // TODO log message that we are retrying, include error cause (we don't have access to a proper logger) - time.Sleep(b.Duration()) - continue - } - b.Reset() // reset for next processor execution - return r, err - } - }), nil -} diff --git a/pkg/processor/procbuiltin/httprequest_test.go b/pkg/processor/procbuiltin/httprequest_test.go deleted file mode 100644 index fecc8e777..000000000 --- a/pkg/processor/procbuiltin/httprequest_test.go +++ /dev/null @@ -1,355 +0,0 @@ -// Copyright © 2022 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package procbuiltin - -import ( - "context" - "io" - "net/http" - "net/http/httptest" - "testing" - - "github.com/conduitio/conduit/pkg/processor" - "github.com/conduitio/conduit/pkg/record" - "github.com/goccy/go-json" - "github.com/matryer/is" -) - -func TestHTTPRequest_Build(t *testing.T) { - type args struct { - config processor.Config - } - tests := []struct { - name string - args args - wantErr bool - }{{ - name: "nil config returns error", - args: args{config: processor.Config{}}, - wantErr: true, - }, { - name: "empty config returns error", - args: args{config: processor.Config{ - Settings: map[string]string{}, - }}, - wantErr: true, - }, { - name: "empty url returns error", - args: args{config: processor.Config{ - Settings: map[string]string{httpRequestConfigURL: ""}, - }}, - wantErr: true, - }, { - name: "invalid url returns error", - args: args{config: processor.Config{ - Settings: map[string]string{httpRequestConfigURL: ":not/a/valid/url"}, - }}, - wantErr: true, - }, { - name: "invalid method returns error", - args: args{config: processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: "http://example.com", - httpRequestConfigMethod: ":foo", - }, - }}, - wantErr: true, - }, { - name: "invalid backoffRetry.count returns error", - args: args{config: processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: "http://example.com", - httpRequestBackoffRetryCount: "not-a-number", - }, - }}, - wantErr: true, - }, { - name: "invalid backoffRetry.min returns error", - args: args{config: processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: "http://example.com", - httpRequestBackoffRetryCount: "1", - httpRequestBackoffRetryMin: "not-a-duration", - }, - }}, - wantErr: true, - }, { - name: "invalid backoffRetry.max returns error", - args: args{config: processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: "http://example.com", - httpRequestBackoffRetryCount: "1", - httpRequestBackoffRetryMax: "not-a-duration", - }, - }}, - wantErr: true, - }, { - name: "invalid backoffRetry.factor returns error", - args: args{config: processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: "http://example.com", - httpRequestBackoffRetryCount: "1", - httpRequestBackoffRetryFactor: "not-a-number", - }, - }}, - wantErr: true, - }, { - name: "valid url returns processor", - args: args{config: processor.Config{ - Settings: map[string]string{httpRequestConfigURL: "http://example.com"}, - }}, - wantErr: false, - }, { - name: "valid url and method returns processor", - args: args{config: processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: "http://example.com", - httpRequestConfigMethod: "GET", - }, - }}, - wantErr: false, - }, { - name: "invalid backoff retry config is ignored", - args: args{config: processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: "http://example.com", - httpRequestBackoffRetryMin: "not-a-duration", - httpRequestBackoffRetryMax: "not-a-duration", - httpRequestBackoffRetryFactor: "not-a-number", - }, - }}, - wantErr: false, - }, { - name: "valid url, method and backoff retry config returns processor", - args: args{config: processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: "http://example.com", - httpRequestBackoffRetryCount: "1", - httpRequestBackoffRetryMin: "10ms", - httpRequestBackoffRetryMax: "1s", - httpRequestBackoffRetryFactor: "1.3", - httpRequestConfigContentType: "application/json", - }, - }}, - wantErr: false, - }} - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, err := HTTPRequest(tt.args.config) - if (err != nil) != tt.wantErr { - t.Errorf("HTTPRequest() error = %v, wantErr = %v", err, tt.wantErr) - } - }) - } -} - -func TestHTTPRequest_Success(t *testing.T) { - respBody := []byte("foo-bar/response") - - type args struct { - r record.Record - } - tests := []struct { - name string - config processor.Config - args args - want record.Record - }{{ - name: "structured data", - config: processor.Config{ - Settings: map[string]string{httpRequestConfigMethod: "GET"}, - }, - args: args{r: record.Record{ - Payload: record.Change{ - Before: nil, - After: record.StructuredData{ - "bar": 123, - "baz": nil, - }, - }, - }}, - want: record.Record{ - Payload: record.Change{ - Before: nil, - After: record.RawData{Raw: respBody}, - }, - }, - }, { - name: "raw data", - config: processor.Config{ - Settings: map[string]string{}, - }, - args: args{r: record.Record{ - Payload: record.Change{ - Before: nil, - After: record.RawData{Raw: []byte("random data")}, - }, - }}, - want: record.Record{ - Payload: record.Change{ - Before: nil, - After: record.RawData{Raw: respBody}, - }, - }, - }} - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - is := is.New(t) - - wantMethod := tt.config.Settings[httpRequestConfigMethod] - if wantMethod == "" { - wantMethod = "POST" // default - } - - wantBody, err := json.Marshal(tt.args.r) - is.NoErr(err) - - srv := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { - is.Equal(wantMethod, req.Method) - - gotBody, err := io.ReadAll(req.Body) - is.NoErr(err) - is.Equal(wantBody, gotBody) - - _, err = resp.Write(respBody) - is.NoErr(err) - })) - defer srv.Close() - - tt.config.Settings[httpRequestConfigURL] = srv.URL - underTest, err := HTTPRequest(tt.config) - is.NoErr(err) - - got, err := underTest.Process(context.Background(), tt.args.r) - is.NoErr(err) - is.Equal(got.Payload.After, record.RawData{Raw: respBody}) - }) - } -} - -func TestHTTPRequest_RetrySuccess(t *testing.T) { - is := is.New(t) - - respBody := []byte("foo-bar/response") - - wantMethod := "POST" - rec := record.Record{Payload: record.Change{After: record.RawData{Raw: []byte("random data")}}} - wantBody, err := json.Marshal(rec) - is.NoErr(err) - - srvHandlerCount := 0 - - srv := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { - srvHandlerCount++ - - is.Equal(wantMethod, req.Method) - - gotBody, err := io.ReadAll(req.Body) - is.NoErr(err) - is.Equal(wantBody, gotBody) - - if srvHandlerCount < 5 { - // first 4 requests will fail with an internal server error - resp.WriteHeader(http.StatusInternalServerError) - } else { - _, err := resp.Write(respBody) - is.NoErr(err) - } - })) - defer srv.Close() - - config := processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: srv.URL, - httpRequestBackoffRetryCount: "4", - httpRequestBackoffRetryMin: "5ms", - httpRequestBackoffRetryMax: "10ms", - httpRequestBackoffRetryFactor: "1.2", - }, - } - - underTest, err := HTTPRequest(config) - is.NoErr(err) - - got, err := underTest.Process(context.Background(), rec) - is.NoErr(err) - is.Equal(got.Payload.After, record.RawData{Raw: respBody}) - is.Equal(srvHandlerCount, 5) -} - -func TestHTTPRequest_RetryFail(t *testing.T) { - is := is.New(t) - - srvHandlerCount := 0 - - srv := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { - srvHandlerCount++ - // all requests fail - resp.WriteHeader(http.StatusInternalServerError) - })) - defer srv.Close() - - config := processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: srv.URL, - httpRequestBackoffRetryCount: "5", - httpRequestBackoffRetryMin: "5ms", - httpRequestBackoffRetryMax: "10ms", - httpRequestBackoffRetryFactor: "1.2", - }, - } - - underTest, err := HTTPRequest(config) - is.NoErr(err) - - got, err := underTest.Process(context.Background(), record.Record{Payload: record.Change{After: record.RawData{}}}) - is.True(err != nil) // expected an error - is.Equal(got, record.Record{}) - is.Equal(srvHandlerCount, 6) // expected 6 requests (1 regular and 5 retries) -} - -func TestHTTPRequest_FilterRecord(t *testing.T) { - is := is.New(t) - - wantMethod := "POST" - rec := record.Record{Payload: record.Change{After: record.RawData{Raw: []byte("random data")}}} - wantBody, err := json.Marshal(rec) - is.NoErr(err) - - srv := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { - is.Equal(wantMethod, req.Method) - - gotBody, err := io.ReadAll(req.Body) - is.NoErr(err) - is.Equal(wantBody, gotBody) - - resp.WriteHeader(http.StatusNoContent) - })) - defer srv.Close() - - config := processor.Config{ - Settings: map[string]string{ - httpRequestConfigURL: srv.URL, - }, - } - - underTest, err := HTTPRequest(config) - is.NoErr(err) - - got, err := underTest.Process(context.Background(), rec) - is.Equal(err, processor.ErrSkipRecord) - is.Equal(got, record.Record{}) -} diff --git a/pkg/processor/procbuiltin/parsejson.go b/pkg/processor/procbuiltin/parsejson.go deleted file mode 100644 index 5c92e4350..000000000 --- a/pkg/processor/procbuiltin/parsejson.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright © 2023 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package procbuiltin - -import ( - "context" - - "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/conduitio/conduit/pkg/processor" - "github.com/conduitio/conduit/pkg/record" - "github.com/goccy/go-json" -) - -const ( - parseJSONKeyProcType = "parsejsonkey" - parseJSONPayloadProcType = "parsejsonpayload" -) - -func init() { - processor.GlobalBuilderRegistry.MustRegister(parseJSONKeyProcType, ParseJSONKey) - processor.GlobalBuilderRegistry.MustRegister(parseJSONPayloadProcType, ParseJSONPayload) -} - -// ParseJSONKey parses the record key from raw to structured data -func ParseJSONKey(_ processor.Config) (processor.Interface, error) { - return parseJSON(parseJSONKeyProcType, recordKeyGetSetter{}) -} - -// ParseJSONPayload parses the record payload from raw to structured data -func ParseJSONPayload(_ processor.Config) (processor.Interface, error) { - return parseJSON(parseJSONPayloadProcType, recordPayloadGetSetter{}) -} - -func parseJSON( - processorType string, - getSetter recordDataGetSetter, -) (processor.Interface, error) { - return NewFuncWrapper(func(_ context.Context, r record.Record) (record.Record, error) { - data := getSetter.Get(r) - - switch data.(type) { - case record.RawData: - var jsonData record.StructuredData - if len(data.Bytes()) == 0 { - // change empty raw data to empty structured data - r = getSetter.Set(r, jsonData) - return r, nil - } - err := json.Unmarshal(data.Bytes(), &jsonData) - if err != nil { - return record.Record{}, cerrors.Errorf("%s: failed to unmarshal raw data as JSON: %w", processorType, err) - } - r = getSetter.Set(r, jsonData) - - case record.StructuredData: - // data is already structured - case nil: - // if the field is nil leave it as it is - default: - return record.Record{}, cerrors.Errorf("%s: unexpected data type %T", processorType, data) - } - - return r, nil - }), nil -} diff --git a/pkg/processor/procbuiltin/unwrap.go b/pkg/processor/procbuiltin/unwrap.go deleted file mode 100644 index 69c76646a..000000000 --- a/pkg/processor/procbuiltin/unwrap.go +++ /dev/null @@ -1,571 +0,0 @@ -// Copyright © 2023 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package procbuiltin - -import ( - "context" - "encoding/base64" - "fmt" - "time" - - "github.com/conduitio/conduit/pkg/foundation/cerrors" - "github.com/conduitio/conduit/pkg/foundation/multierror" - "github.com/conduitio/conduit/pkg/processor" - "github.com/conduitio/conduit/pkg/record" - "github.com/goccy/go-json" -) - -type unwrapProcessor struct { - unwrapper unwrapper -} - -// unwrapper unwraps the formatted record from the openCDC record -type unwrapper interface { - // Unwrap gets the unwrapped record - Unwrap(record.Record) (record.Record, error) -} - -const ( - unwrapProcType = "unwrap" - - unwrapConfigFormat = "format" - - FormatDebezium = "debezium" - FormatKafkaConnect = "kafka-connect" - FormatOpenCDC = "opencdc" -) - -func init() { - processor.GlobalBuilderRegistry.MustRegister(unwrapProcType, Unwrap) -} - -func Unwrap(config processor.Config) (processor.Interface, error) { - if _, ok := config.Settings[unwrapConfigFormat]; !ok { - return nil, cerrors.Errorf("%s: %q config not specified", unwrapProcType, unwrapConfigFormat) - } - format := config.Settings[unwrapConfigFormat] - proc := &unwrapProcessor{} - switch format { - case FormatDebezium: - proc.unwrapper = &debeziumUnwrapper{} - case FormatKafkaConnect: - proc.unwrapper = &kafkaConnectUnwrapper{} - case FormatOpenCDC: - proc.unwrapper = &openCDCUnwrapper{} - default: - return nil, cerrors.Errorf("%s: %q is not a valid format", unwrapProcType, format) - } - - return NewFuncWrapper(proc.Process), nil -} - -func (p *unwrapProcessor) Process(_ context.Context, in record.Record) (record.Record, error) { - data := in.Payload.After - var structData record.StructuredData - switch d := data.(type) { - case record.RawData: - // todo: take this section out, after platform team support ordering processors - // unmarshal raw data to structured - err := json.Unmarshal(data.Bytes(), &structData) - if err != nil { - return record.Record{}, cerrors.Errorf("failed to unmarshal raw data as JSON: %w", unwrapProcType, err) - } - case record.StructuredData: - structData = d - default: - return record.Record{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data) - } - // assign the structured data to payload.After - in.Payload.After = structData - - out, err := p.unwrapper.Unwrap(in) - if err != nil { - return record.Record{}, cerrors.Errorf("%s: error unwrapping record: %w", unwrapProcType, err) - } - return out, nil -} - -/* -Example of an OpenCDC record: -{ - "key": "NWQ0N2UwZGQtNTkxYi00MGEyLTk3YzMtYzc1MDY0MWU3NTc1", - "metadata": { - "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", - "opencdc.readAt": "1706028881541916000", - "opencdc.version": "v1" - }, - "operation": "create", - "payload": { - "after": { - "event_id": 2041181862, - "msg": "string 4c88f20f-aa77-4f4b-9354-e4fdb1989a52", - "pg_generator": false, - "sensor_id": 54434691, - "triggered": false - }, - "before": null - }, - "position": "ZWIwNmJiMmMtNWNhMS00YjUyLWE2ZmMtYzc0OTFlZDQ3OTYz" -} -*/ - -// openCDCUnwrapper unwraps an OpenCDC record from the payload, by unmarhsalling rec.Payload.After into type Record. -type openCDCUnwrapper struct{} - -// UnwrapOperation extracts operation from a structuredData record. -func (o *openCDCUnwrapper) UnwrapOperation(structData record.StructuredData) (record.Operation, error) { - var operation record.Operation - op, ok := structData["operation"] - if !ok { - return operation, cerrors.Errorf("record payload after doesn't contain operation") - } - - switch opType := op.(type) { - case record.Operation: - operation = opType - case string: - if err := operation.UnmarshalText([]byte(opType)); err != nil { - return operation, cerrors.Errorf("couldn't unmarshal record operation") - } - default: - return operation, cerrors.Errorf("expected a record.Operation or a string, got %T", opType) - } - return operation, nil -} - -// UnwrapMetadata extracts metadata from a structuredData record. -func (o *openCDCUnwrapper) UnwrapMetadata(structData record.StructuredData) (record.Metadata, error) { - var metadata record.Metadata - meta, ok := structData["metadata"] - if !ok { - return metadata, cerrors.Errorf("record payload after doesn't contain metadata") - } - - switch m := meta.(type) { - case record.Metadata: - metadata = m - case map[string]interface{}: - metadata = make(record.Metadata, len(m)) - for k, v := range m { - metadata[k] = fmt.Sprint(v) - } - default: - return metadata, cerrors.Errorf("expected a record.Metadata or a map[string]interface{}, got %T", m) - } - return metadata, nil -} - -// UnwrapKey extracts key from a structuredData record. -func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.Data, error) { - var key record.Data - ky, ok := structData["key"] - if !ok { - return key, cerrors.Errorf("record payload after doesn't contain key") - } - switch k := ky.(type) { - case map[string]interface{}: - convertedData := make(record.StructuredData, len(k)) - for kk, v := range k { - convertedData[kk] = v - } - key = convertedData - case string: - decoded := make([]byte, base64.StdEncoding.DecodedLen(len(k))) - n, err := base64.StdEncoding.Decode(decoded, []byte(k)) - if err != nil { - return key, cerrors.Errorf("couldn't decode key: %w", err) - } - key = record.RawData{Raw: decoded[:n]} - default: - return key, cerrors.Errorf("expected a record.Data or a string, got %T", k) - } - return key, nil -} - -func (o *openCDCUnwrapper) convertPayloadData(payload map[string]interface{}, key string) (record.Data, error) { - payloadData, ok := payload[key] - if !ok { - return nil, nil - } - - switch data := payloadData.(type) { - case map[string]interface{}: - convertedData := make(record.StructuredData, len(data)) - for k, v := range data { - convertedData[k] = v - } - return convertedData, nil - case string: - decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data))) - n, err := base64.StdEncoding.Decode(decoded, []byte(data)) - if err != nil { - return nil, cerrors.Errorf("couldn't decode payload %s: %w", err, key) - } - return record.RawData{Raw: decoded[:n]}, nil - default: - return nil, nil - } -} - -// UnwrapPayload extracts payload from a structuredData record. -func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (record.Change, error) { - var payload record.Change - pl, ok := structData["payload"] - if !ok { - return payload, cerrors.Errorf("record payload doesn't contain payload") - } - - switch p := pl.(type) { - case record.Change: - payload = p - case map[string]interface{}: - before, err := o.convertPayloadData(p, "before") - if err != nil { - return record.Change{}, err - } - - after, err := o.convertPayloadData(p, "after") - if err != nil { - return record.Change{}, err - } - - payload = record.Change{ - Before: before, - After: after, - } - default: - return record.Change{}, cerrors.Errorf("expected a record.Change or a map[string]interface{}, got %T", p) - } - return payload, nil -} - -// Unwrap replaces the whole record.payload with record.payload.after.payload except position. -func (o *openCDCUnwrapper) Unwrap(rec record.Record) (record.Record, error) { - var structData record.StructuredData - data := rec.Payload.After - switch d := data.(type) { - case record.RawData: - // unmarshal raw data to structured - if err := json.Unmarshal(data.Bytes(), &structData); err != nil { - return record.Record{}, cerrors.Errorf("failed to unmarshal raw data as JSON: %w", unwrapProcType, err) - } - case record.StructuredData: - structData = d - default: - return record.Record{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data) - } - - operation, err := o.UnwrapOperation(structData) - if err != nil { - return record.Record{}, err - } - - metadata, err := o.UnwrapMetadata(structData) - if err != nil { - return record.Record{}, err - } - - key, err := o.UnwrapKey(structData) - if err != nil { - return record.Record{}, err - } - - payload, err := o.UnwrapPayload(structData) - if err != nil { - return record.Record{}, err - } - - // Position is the only key we preserve from the original record to maintain the reference respect other messages - // that will be coming from in the event of chaining pipelines (e.g.: source -> kafka, kafka -> destination) - return record.Record{ - Key: key, - Position: rec.Position, - Metadata: metadata, - Payload: payload, - Operation: operation, - }, nil -} - -/* -Example of a kafka-connect record: - { - "payload": { - "description": "desc", - "id": 20 - }, - "schema": {} // will be ignored - } -*/ - -// kafkaConnectUnwrapper unwraps a kafka connect record from the payload, expects rec.Payload.After to be of type record.StructuredData -type kafkaConnectUnwrapper struct{} - -func (k *kafkaConnectUnwrapper) Unwrap(rec record.Record) (record.Record, error) { - // record must be structured - structPayload, ok := rec.Payload.After.(record.StructuredData) - if !ok { - return record.Record{}, cerrors.Errorf("record payload data must be structured data") - } - - // get payload - structPayload, ok = structPayload["payload"].(map[string]any) - if !ok { - return record.Record{}, cerrors.Errorf("payload doesn't contain a record") - } - - return record.Record{ - Key: k.UnwrapKey(rec.Key), - Position: rec.Position, - Metadata: nil, - Payload: record.Change{ - Before: nil, - After: structPayload, - }, - Operation: record.OperationSnapshot, - }, nil -} - -// UnwrapKey unwraps key as a kafka connect formatted record, returns the key's payload content, or returns the -// original key if payload doesn't exist. -func (k *kafkaConnectUnwrapper) UnwrapKey(key record.Data) record.Data { - // convert the key to structured data - var structKey record.StructuredData - switch d := key.(type) { - case record.RawData: - // try unmarshalling raw key - err := json.Unmarshal(key.Bytes(), &structKey) - // if key is not json formatted, return the original key - if err != nil { - return key - } - case record.StructuredData: - structKey = d - } - - payload, ok := structKey["payload"] - // return the original key if it doesn't contain a payload - if !ok { - return key - } - - // if payload is a map, return the payload as structured data - if p, ok := payload.(map[string]any); ok { - return record.StructuredData(p) - } - - // otherwise, convert the payload to string, then return it as raw data - raw := fmt.Sprint(payload) - - return record.RawData{Raw: []byte(raw)} -} - -/* -Example of a debezium record: - { - "payload": { - "after": { - "description": "desc", - "id": 20 - }, - "before": null, - "op": "c", - "source": { - "opencdc.readAt": "1674061777225877000", - "opencdc.version": "v1", - }, - "transaction": null, - "ts_ms": 1674061777225 - }, - "schema": {} // will be ignored - } -*/ -// debeziumUnwrapper unwraps a debezium record from the payload. -type debeziumUnwrapper struct { - kafkaConnectUnwrapper kafkaConnectUnwrapper -} - -const ( - debeziumOpCreate = "c" - debeziumOpUpdate = "u" - debeziumOpDelete = "d" - debeziumOpRead = "r" // snapshot - debeziumOpUnset = "$unset" // mongoDB unset operation - - debeziumFieldBefore = "before" - debeziumFieldAfter = "after" - debeziumFieldSource = "source" - debeziumFieldOp = "op" - debeziumFieldTimestamp = "ts_ms" -) - -func (d *debeziumUnwrapper) Unwrap(rec record.Record) (record.Record, error) { - // record must be structured - debeziumRec, ok := rec.Payload.After.(record.StructuredData) - if !ok { - return record.Record{}, cerrors.Errorf("record payload data must be structured data") - } - // get payload - debeziumRec, ok = debeziumRec["payload"].(map[string]any) // the payload has the debezium record - if !ok { - return record.Record{}, cerrors.Errorf("payload doesn't contain a record") - } - - // check fields under payload - err := d.validateRecord(debeziumRec) - if err != nil { - return record.Record{}, err - } - - before, err := d.valueToData(debeziumRec[debeziumFieldBefore]) - if err != nil { - return record.Record{}, cerrors.Errorf("failed to parse field %s: %w", debeziumFieldBefore, err) - } - - after, err := d.valueToData(debeziumRec[debeziumFieldAfter]) - if err != nil { - return record.Record{}, cerrors.Errorf("failed to parse field %s: %w", debeziumFieldAfter, err) - } - - op, ok := debeziumRec[debeziumFieldOp].(string) - if !ok { - return record.Record{}, cerrors.Errorf("%s operation is not a string", op) - } - - operation, err := d.convertOperation(op) - if err != nil { - return record.Record{}, cerrors.Errorf("error unwrapping operation: %w", err) - } - - metadata, err := d.unwrapMetadata(rec) - if err != nil { - return record.Record{}, cerrors.Errorf("error unwrapping metadata: %w", err) - } - - return record.Record{ - Key: d.kafkaConnectUnwrapper.UnwrapKey(rec.Key), - Position: rec.Position, - Operation: operation, - Payload: record.Change{ - Before: before, - After: after, - }, - Metadata: metadata, - }, nil -} - -func (d *debeziumUnwrapper) valueToData(val any) (record.Data, error) { - switch v := val.(type) { - case map[string]any: - return record.StructuredData(v), nil - case string: - return record.RawData{Raw: []byte(v)}, nil - case nil: - // nil is allowed - return nil, nil - default: - return nil, cerrors.Errorf("expected a map or a string, got %T", val) - } -} - -func (d *debeziumUnwrapper) validateRecord(data record.StructuredData) error { - var multiErr error - if _, ok := data[debeziumFieldAfter]; !ok { - multiErr = multierror.Append(multiErr, cerrors.Errorf("the %q field is missing from debezium payload", debeziumFieldAfter)) - } - if _, ok := data[debeziumFieldSource]; !ok { - multiErr = multierror.Append(multiErr, cerrors.Errorf("the %q field is missing from debezium payload", debeziumFieldSource)) - } - if _, ok := data[debeziumFieldOp]; !ok { - multiErr = multierror.Append(multiErr, cerrors.Errorf("the %q field is missing from debezium payload", debeziumFieldOp)) - } - // ts_ms and transaction can be empty - return multiErr -} - -func (d *debeziumUnwrapper) unwrapMetadata(rec record.Record) (record.Metadata, error) { - debeziumRec := rec.Payload.After.(record.StructuredData)["payload"].(map[string]any) - - var source map[string]string - for field, val := range debeziumRec { - switch field { - case debeziumFieldAfter, debeziumFieldBefore, debeziumFieldOp: - continue // ignore - case debeziumFieldTimestamp: - tsMs, ok := val.(float64) - if !ok { - return nil, cerrors.Errorf("%s is not a float", debeziumFieldTimestamp) - } - readAt := time.UnixMilli(int64(tsMs)) - rec.Metadata.SetReadAt(readAt) - case debeziumFieldSource: - // don't add prefix for source fields to be consistent with the - // behavior of the debezium converter in the SDK - it puts all - // metadata fields into the `source` field - source = d.flatten("", val) - default: - flattened := d.flatten("debezium."+field, val) - for k, v := range flattened { - rec.Metadata[k] = v - } - } - } - - // source is added at the end to overwrite any other fields - for k, v := range source { - rec.Metadata[k] = v - } - - return rec.Metadata, nil -} - -func (d *debeziumUnwrapper) flatten(key string, val any) map[string]string { - var prefix string - if len(key) > 0 { - prefix = key + "." - } - switch val := val.(type) { - case map[string]any: - out := make(map[string]string) - for k1, v1 := range val { - for k2, v2 := range d.flatten(prefix+k1, v1) { - out[k2] = v2 - } - } - return out - case nil: - return nil - case string: - return map[string]string{key: val} - default: - return map[string]string{key: fmt.Sprint(val)} - } -} - -// convertOperation converts debezium operation to openCDC operation -func (d *debeziumUnwrapper) convertOperation(op string) (record.Operation, error) { - switch op { - case debeziumOpCreate: - return record.OperationCreate, nil - case debeziumOpUpdate: - return record.OperationUpdate, nil - case debeziumOpDelete: - return record.OperationDelete, nil - case debeziumOpRead: - return record.OperationSnapshot, nil - case debeziumOpUnset: - return record.OperationUpdate, nil - } - return 0, cerrors.Errorf("%q is an invalid operation", op) -}