diff --git a/pkg/plugin/processor/builtin/impl/webhook/http.go b/pkg/plugin/processor/builtin/impl/webhook/http.go index a31635c22..cf02391e7 100644 --- a/pkg/plugin/processor/builtin/impl/webhook/http.go +++ b/pkg/plugin/processor/builtin/impl/webhook/http.go @@ -32,20 +32,19 @@ import ( sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/log" - "github.com/goccy/go-json" "github.com/jpillora/backoff" ) type httpConfig struct { // URL is a Go template expression for the URL used in the HTTP request, using Go [templates](https://pkg.go.dev/text/template). - // The value provided to the template is [opencdc.Record](https://github.com/ConduitIO/conduit-commons/blob/59ecfbe5d5be2ac4cd9a674d274862d164123f36/opencdc/record.go#L30), - // so the template has access to all its fields (e.g. .Position, .Key, .Metadata, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/) + // The value provided to the template is [opencdc.Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record), + // so the template has access to all its fields (e.g. `.Position`, `.Key`, `.Metadata`, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/) // to make it easier to write templates. URL string `json:"request.url" validate:"required"` // Method is the HTTP request method to be used. Method string `json:"request.method" default:"GET"` // Deprecated: use `headers.Content-Type` instead. - ContentType string `json:"request.contentType" default:"application/json"` + ContentType string `json:"request.contentType"` // Headers to add to the request, use `headers.*` to specify the header and its value (e.g. `headers.Authorization: "Bearer key"`). Headers map[string]string `json:"headers"` @@ -58,11 +57,13 @@ type httpConfig struct { // The maximum waiting time before retrying. BackoffRetryMax time.Duration `json:"backoffRetry.max" default:"5s"` - // Specifies which field from the input record should be used as the body in - // the HTTP request. + // Specifies the body that will be sent in the HTTP request. The field accepts + // a Go [templates](https://pkg.go.dev/text/template) that's evaluated using the + // [opencdc.Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record) + // as input. By default, the body is empty. // - // For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). - RequestBodyRef string `json:"request.body"` + // To send the whole record as JSON you can use `{{ toJson . }}`. + RequestBodyTmpl string `json:"request.body"` // Specifies in which field should the response body be saved. // // For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). @@ -79,20 +80,25 @@ func (c *httpConfig) parseHeaders() error { c.Headers = make(map[string]string) } - if c.ContentType == "" { - return nil // Nothing to replace in headers - } - + var isContentTypeSet bool for name, _ := range c.Headers { if strings.ToLower(name) == "content-type" { - return cerrors.Errorf("Configuration error, cannot provide both \"request.contentType\" and \"headers.Content-Type\", use \"headers.Content-Type\" only.") + isContentTypeSet = true + break } } - c.Headers["Content-Type"] = c.ContentType - // the ContentType field is deprecated, - // so we're preparing for completely removing it in a later release - c.ContentType = "" + switch { + case isContentTypeSet && c.ContentType != "": + return cerrors.Errorf(`configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only`) + case !isContentTypeSet && c.ContentType != "": + // Use contents of deprecated field. + c.Headers["Content-Type"] = c.ContentType + c.ContentType = "" + case !isContentTypeSet: + // By default, we set the Content-Type to application/json. + c.Headers["Content-Type"] = "application/json" + } return nil } @@ -105,10 +111,11 @@ type httpProcessor struct { config httpConfig backoffCfg *backoff.Backoff - requestBodyRef *sdk.ReferenceResolver + urlTmpl *template.Template + requestBodyTmpl *template.Template + responseBodyRef *sdk.ReferenceResolver responseStatusRef *sdk.ReferenceResolver - urlTmpl *template.Template } func NewHTTPProcessor(l log.CtxLogger) sdk.Processor { @@ -120,7 +127,10 @@ func (p *httpProcessor) Specification() (sdk.Specification, error) { Name: "webhook.http", Summary: "Trigger an HTTP request for every record.", Description: `A processor that sends an HTTP request to the specified URL, retries on error and -saves the response body and, optionally, the response status.`, +saves the response body and, optionally, the response status. + +A status code over 500 is regarded as an error and will cause the processor to retry the request. +The processor will retry the request according to the backoff configuration.`, Version: "v0.1.0", Author: "Meroxa, Inc.", Parameters: httpConfig{}.Parameters(), @@ -142,12 +152,13 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro return cerrors.New("invalid configuration: response.body and response.status set to same field") } - if p.config.RequestBodyRef != "" { - requestBodyRef, err := sdk.NewReferenceResolver(p.config.RequestBodyRef) + // parse request body template + if strings.Contains(p.config.RequestBodyTmpl, "{{") { + // create URL template + p.requestBodyTmpl, err = template.New("").Funcs(sprig.FuncMap()).Parse(p.config.RequestBodyTmpl) if err != nil { - return cerrors.Errorf("failed parsing request.body %v: %w", p.config.RequestBodyRef, err) + return cerrors.Errorf("failed parsing request.body %v: %w", err) } - p.requestBodyRef = &requestBodyRef } responseBodyRef, err := sdk.NewReferenceResolver(p.config.ResponseBodyRef) @@ -164,6 +175,8 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro } p.responseStatusRef = &responseStatusRef } + + // parse URL template if strings.Contains(p.config.URL, "{{") { // create URL template p.urlTmpl, err = template.New("").Funcs(sprig.FuncMap()).Parse(p.config.URL) @@ -186,27 +199,6 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro return nil } -func (p *httpProcessor) EvaluateURL(rec opencdc.Record) (string, error) { - if p.urlTmpl == nil { - return p.config.URL, nil - } - var b bytes.Buffer - err := p.urlTmpl.Execute(&b, rec) - if err != nil { - return "", cerrors.Errorf("error while evaluating URL template: %w", err) - } - u, err := url.Parse(b.String()) - if err != nil { - return "", cerrors.Errorf("error parsing URL: %w", err) - } - q, err := url.ParseQuery(u.RawQuery) - if err != nil { - return "", cerrors.Errorf("error parsing URL query: %w", err) - } - u.RawQuery = q.Encode() - return u.String(), nil -} - func (p *httpProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord { out := make([]sdk.ProcessedRecord, 0, len(records)) for _, rec := range records { @@ -281,14 +273,10 @@ func (p *httpProcessor) processRecord(ctx context.Context, r opencdc.Record) (sd return nil, cerrors.Errorf("error reading response body: %w", err) } - if resp.StatusCode >= 300 { - // regard status codes over 299 as errors + if resp.StatusCode >= 500 { + // regard status codes over 500 as errors return nil, cerrors.Errorf("error status code %v (body: %q)", resp.StatusCode, string(body)) } - // skip if body has no content - if resp.StatusCode == http.StatusNoContent { - return sdk.FilterRecord{}, nil - } // Set response body err = p.setField(&r, p.responseBodyRef, body) @@ -309,7 +297,7 @@ func (p *httpProcessor) buildRequest(ctx context.Context, r opencdc.Record) (*ht return nil, cerrors.Errorf("failed getting request body: %w", err) } - url, err := p.EvaluateURL(r) + url, err := p.evaluateURL(r) if err != nil { return nil, err } @@ -331,25 +319,44 @@ func (p *httpProcessor) buildRequest(ctx context.Context, r opencdc.Record) (*ht return req, nil } +func (p *httpProcessor) evaluateURL(rec opencdc.Record) (string, error) { + if p.urlTmpl == nil { + return p.config.URL, nil + } + var b bytes.Buffer + err := p.urlTmpl.Execute(&b, rec) + if err != nil { + return "", cerrors.Errorf("error while evaluating URL template: %w", err) + } + u, err := url.Parse(b.String()) + if err != nil { + return "", cerrors.Errorf("error parsing URL: %w", err) + } + q, err := url.ParseQuery(u.RawQuery) + if err != nil { + return "", cerrors.Errorf("error parsing URL query: %w", err) + } + u.RawQuery = q.Encode() + return u.String(), nil +} + // requestBody returns the request body for the given record, // using the configured field reference (see: request.body configuration parameter). func (p *httpProcessor) requestBody(r opencdc.Record) ([]byte, error) { - if p.requestBodyRef == nil { + if p.requestBodyTmpl == nil { + if p.config.RequestBodyTmpl != "" { + return []byte(p.config.RequestBodyTmpl), nil + } return nil, nil } - ref, err := p.requestBodyRef.Resolve(&r) - if err != nil { - return nil, cerrors.Errorf("failed resolving request.body: %w", err) - } - val := ref.Get() - // Raw byte data should be sent as it is, as that's most often what we want - // If we json.Marshal it first, it will be Base64-encoded. - if raw, ok := val.(opencdc.RawData); ok { - return raw.Bytes(), nil + var b bytes.Buffer + err := p.requestBodyTmpl.Execute(&b, r) + if err != nil { + return nil, cerrors.Errorf("error while evaluating request body template: %w", err) } - return json.Marshal(val) + return b.Bytes(), nil } func (p *httpProcessor) setField(r *opencdc.Record, refRes *sdk.ReferenceResolver, data any) error { diff --git a/pkg/plugin/processor/builtin/impl/webhook/http_config_test.go b/pkg/plugin/processor/builtin/impl/webhook/http_config_test.go index c13f5dbf5..89cd3baba 100644 --- a/pkg/plugin/processor/builtin/impl/webhook/http_config_test.go +++ b/pkg/plugin/processor/builtin/impl/webhook/http_config_test.go @@ -15,11 +15,12 @@ package webhook import ( - "github.com/matryer/is" "testing" + + "github.com/matryer/is" ) -func TestHTTPConfig_ValidateHeaders(t *testing.T) { +func TestHTTPConfig_ParseHeaders(t *testing.T) { testCases := []struct { name string input httpConfig @@ -34,7 +35,7 @@ func TestHTTPConfig_ValidateHeaders(t *testing.T) { "Content-Type": "application/json", }, }, - wantErr: `Configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only.`, + wantErr: `configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only`, }, { name: "ContentType field present, header present, different case", @@ -44,7 +45,7 @@ func TestHTTPConfig_ValidateHeaders(t *testing.T) { "content-type": "application/json", }, }, - wantErr: `Configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only.`, + wantErr: `configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only`, }, { name: "ContentType field presents, header not present", diff --git a/pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go b/pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go index 2df0c469a..6c6c36548 100644 --- a/pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go +++ b/pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go @@ -39,15 +39,16 @@ func ExampleHTTPProcessor() { exampleutil.RunExample(p, exampleutil.Example{ Summary: `Send a request to an HTTP server`, Description: ` -This example shows how to use the HTTP processor to send a record's ` + "`.Payload.After`" + ` field to a dummy HTTP server -that replies back with a greeting. +This example shows how to use the HTTP processor to send a record's ` + "`.Payload.After`" + ` field as a string to a dummy +HTTP server that replies back with a greeting. -The record's ` + "`.Payload.After`" + ` is overwritten with the response. Additionally, the example shows how to store the -value of the HTTP response's code in the metadata field ` + "`http_status`" + `.`, +The record's ` + "`.Payload.After`" + ` is overwritten with the response. Additionally, the example shows how to set a request +header and how to store the value of the HTTP response's code in the metadata field ` + "`http_status`" + `.`, Config: map[string]string{ - "request.url": srv.URL, - "request.body": ".Payload.After", - "response.status": `.Metadata["http_status"]`, + "request.url": srv.URL, + "request.body": `{{ printf "%s" .Payload.After }}`, + "response.status": `.Metadata["http_status"]`, + "headers.content-type": "application/json", }, Have: opencdc.Record{ Operation: opencdc.OperationUpdate, diff --git a/pkg/plugin/processor/builtin/impl/webhook/http_paramgen.go b/pkg/plugin/processor/builtin/impl/webhook/http_paramgen.go index fc86fe27e..b695cc430 100644 --- a/pkg/plugin/processor/builtin/impl/webhook/http_paramgen.go +++ b/pkg/plugin/processor/builtin/impl/webhook/http_paramgen.go @@ -45,12 +45,12 @@ func (httpConfig) Parameters() map[string]config.Parameter { }, "request.body": { Default: "", - Description: "Specifies which field from the input record should be used as the body in\nthe HTTP request.\n\nFor more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).", + Description: "Specifies the body that will be sent in the HTTP request. The field accepts\na Go [templates](https://pkg.go.dev/text/template) that's evaluated using the\n[opencdc.Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record)\nas input. By default, the body is empty.\n\nTo send the whole record as JSON you can use `{{ toJson . }}`.", Type: config.ParameterTypeString, Validations: []config.Validation{}, }, "request.contentType": { - Default: "application/json", + Default: "", Description: "Deprecated: use `headers.Content-Type` instead.", Type: config.ParameterTypeString, Validations: []config.Validation{}, @@ -63,7 +63,7 @@ func (httpConfig) Parameters() map[string]config.Parameter { }, "request.url": { Default: "", - Description: "URL is a Go template expression for the URL used in the HTTP request, using Go [templates](https://pkg.go.dev/text/template).\nThe value provided to the template is [opencdc.Record](https://github.com/ConduitIO/conduit-commons/blob/59ecfbe5d5be2ac4cd9a674d274862d164123f36/opencdc/record.go#L30),\nso the template has access to all its fields (e.g. .Position, .Key, .Metadata, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/)\nto make it easier to write templates.", + Description: "URL is a Go template expression for the URL used in the HTTP request, using Go [templates](https://pkg.go.dev/text/template).\nThe value provided to the template is [opencdc.Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record),\nso the template has access to all its fields (e.g. `.Position`, `.Key`, `.Metadata`, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/)\nto make it easier to write templates.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, diff --git a/pkg/plugin/processor/builtin/impl/webhook/http_test.go b/pkg/plugin/processor/builtin/impl/webhook/http_test.go index 7ca3d7e48..ac551453d 100644 --- a/pkg/plugin/processor/builtin/impl/webhook/http_test.go +++ b/pkg/plugin/processor/builtin/impl/webhook/http_test.go @@ -24,10 +24,8 @@ import ( "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-processor-sdk" "github.com/conduitio/conduit/pkg/foundation/log" - "github.com/goccy/go-json" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/matryer/is" ) @@ -140,6 +138,23 @@ func TestHTTPProcessor_Configure(t *testing.T) { }, wantErr: "", }, + { + name: "content-type header", + config: map[string]string{ + "request.url": "http://example.com", + "headers.content-type": "application/json", + }, + wantErr: "", + }, + { + name: "invalid: content-type header and request.contentType", + config: map[string]string{ + "request.url": "http://example.com", + "request.contentType": "application/json", + "headers.content-type": "application/json", + }, + wantErr: `configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only`, + }, { name: "invalid: same value of response.body and response.status", config: map[string]string{ @@ -179,18 +194,22 @@ func TestHTTPProcessor_Success(t *testing.T) { respBody := []byte("foo-bar/response") tests := []struct { - name string - config map[string]string - args []opencdc.Record - want []sdk.ProcessedRecord + name string + config map[string]string + status int + record opencdc.Record + wantBody string + want sdk.ProcessedRecord }{ { name: "structured data", config: map[string]string{ "request.method": "POST", - "request.body": ".", + "request.body": "{{ toJson . }}", }, - args: []opencdc.Record{{ + status: 200, + record: opencdc.Record{ + Operation: opencdc.OperationCreate, Payload: opencdc.Change{ Before: nil, After: opencdc.StructuredData{ @@ -198,30 +217,35 @@ func TestHTTPProcessor_Success(t *testing.T) { "baz": nil, }, }, - }}, - want: []sdk.ProcessedRecord{sdk.SingleRecord{ + }, + wantBody: `{"position":null,"operation":"create","metadata":null,"key":null,"payload":{"before":null,"after":{"bar":123,"baz":null}}}`, + want: sdk.SingleRecord{ + Operation: opencdc.OperationCreate, Payload: opencdc.Change{ After: opencdc.RawData(respBody), }, }, - }, }, { name: "raw data", config: map[string]string{ "request.method": "GET", - "request.body": ".", + "request.body": "{{ toJson . }}", }, - args: []opencdc.Record{{ + status: 200, + record: opencdc.Record{ + Operation: opencdc.OperationUpdate, Payload: opencdc.Change{ After: opencdc.RawData("random data"), }, - }}, - want: []sdk.ProcessedRecord{sdk.SingleRecord{ + }, + wantBody: `{"position":null,"operation":"update","metadata":null,"key":null,"payload":{"before":null,"after":"cmFuZG9tIGRhdGE="}}`, + want: sdk.SingleRecord{ + Operation: opencdc.OperationUpdate, Payload: opencdc.Change{ After: opencdc.RawData(respBody), }, - }}, + }, }, { name: "custom field for response body and status", @@ -229,33 +253,39 @@ func TestHTTPProcessor_Success(t *testing.T) { "response.body": ".Payload.After.body", "response.status": ".Payload.After.status", "request.method": "POST", - "request.body": ".", + "request.body": "{{ toJson . }}", }, - args: []opencdc.Record{{ + status: 404, + record: opencdc.Record{ + Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{ After: opencdc.StructuredData{ "a key": "random data", }, }, - }}, - want: []sdk.ProcessedRecord{sdk.SingleRecord{ + }, + wantBody: `{"position":null,"operation":"snapshot","metadata":null,"key":null,"payload":{"before":null,"after":{"a key":"random data"}}}`, + want: sdk.SingleRecord{ + Operation: opencdc.OperationSnapshot, Payload: opencdc.Change{ After: opencdc.StructuredData{ "a key": "random data", "body": respBody, - "status": "200", + "status": "404", }, }, - }}, + }, }, { name: "request body: custom field, structured", config: map[string]string{ - "request.body": ".", + "request.body": "{{ toJson . }}", "response.body": ".Payload.After.httpResponse", "request.method": "POST", }, - args: []opencdc.Record{{ + status: 200, + record: opencdc.Record{ + Operation: opencdc.OperationDelete, Payload: opencdc.Change{ Before: opencdc.StructuredData{ "before-key": "before-data", @@ -264,8 +294,10 @@ func TestHTTPProcessor_Success(t *testing.T) { "after-key": "after-data", }, }, - }}, - want: []sdk.ProcessedRecord{sdk.SingleRecord{ + }, + wantBody: `{"position":null,"operation":"delete","metadata":null,"key":null,"payload":{"before":{"before-key":"before-data"},"after":{"after-key":"after-data"}}}`, + want: sdk.SingleRecord{ + Operation: opencdc.OperationDelete, Payload: opencdc.Change{ Before: opencdc.StructuredData{ "before-key": "before-data", @@ -275,24 +307,26 @@ func TestHTTPProcessor_Success(t *testing.T) { "httpResponse": []byte("foo-bar/response"), }, }, - }}, + }, }, { name: "request body: custom field, raw data", config: map[string]string{ - "request.body": ".Payload.Before", + "request.body": `{{ printf "%s" .Payload.Before }}`, "response.body": ".Payload.After.httpResponse", "request.method": "POST", }, - args: []opencdc.Record{{ + status: 200, + record: opencdc.Record{ Payload: opencdc.Change{ Before: opencdc.RawData("uncooked data"), After: opencdc.StructuredData{ "after-key": "after-data", }, }, - }}, - want: []sdk.ProcessedRecord{sdk.SingleRecord{ + }, + wantBody: `uncooked data`, + want: sdk.SingleRecord{ Payload: opencdc.Change{ Before: opencdc.RawData("uncooked data"), After: opencdc.StructuredData{ @@ -300,53 +334,53 @@ func TestHTTPProcessor_Success(t *testing.T) { "httpResponse": []byte("foo-bar/response"), }, }, - }}, + }, }, { - name: "request body: custom field, []byte data", + name: "request body: static", config: map[string]string{ - "request.body": ".Payload.After.contents", + "request.body": `foo`, "response.body": ".Payload.After.httpResponse", "request.method": "POST", }, - args: []opencdc.Record{{ + status: 200, + record: opencdc.Record{ Payload: opencdc.Change{ After: opencdc.StructuredData{ - "contents": []byte{15, 2, 20, 24}, - "after-key": "after-data", + "unused": "data", }, }, - }}, - want: []sdk.ProcessedRecord{sdk.SingleRecord{ + }, + wantBody: `foo`, + want: sdk.SingleRecord{ Payload: opencdc.Change{ After: opencdc.StructuredData{ - "after-key": "after-data", - "contents": []byte{15, 2, 20, 24}, + "unused": "data", "httpResponse": []byte("foo-bar/response"), }, }, - }}, + }, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { is := is.New(t) + ctx := context.Background() wantMethod := tc.config["request.method"] if wantMethod == "" { wantMethod = "GET" // default } - wantBody := getRequestBody(is, tc.config["request.body"], tc.args) - srv := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { is.Equal(wantMethod, req.Method) gotBody, err := io.ReadAll(req.Body) is.NoErr(err) - is.Equal(wantBody, gotBody) + is.Equal(tc.wantBody, string(gotBody)) + resp.WriteHeader(tc.status) _, err = resp.Write(respBody) is.NoErr(err) })) @@ -354,15 +388,18 @@ func TestHTTPProcessor_Success(t *testing.T) { tc.config["request.url"] = srv.URL underTest := NewHTTPProcessor(log.Test(t)) - err := underTest.Configure(context.Background(), tc.config) + err := underTest.Configure(ctx, tc.config) is.NoErr(err) - got := underTest.Process(context.Background(), tc.args) - diff := cmp.Diff(tc.want, got, cmpopts.IgnoreUnexported(sdk.SingleRecord{})) - if diff != "" { - t.Logf("mismatch (-want +got): %s", diff) - t.Fail() - } + got := underTest.Process(ctx, []opencdc.Record{tc.record}) + is.Equal( + "", + cmp.Diff( + []sdk.ProcessedRecord{tc.want}, + got, + cmpopts.IgnoreUnexported(sdk.SingleRecord{}), + ), + ) }) } } @@ -372,44 +409,44 @@ func TestHTTPProcessor_URLTemplate(t *testing.T) { name string pathTmpl string // will be attached to the URL path string // expected result of the pathTmpl - args []opencdc.Record + record opencdc.Record }{ { name: "URL template, success", pathTmpl: "/{{.Payload.After.foo}}", path: "/123", - args: []opencdc.Record{{ + record: opencdc.Record{ Payload: opencdc.Change{ Before: nil, After: opencdc.StructuredData{ "foo": 123, }, }, - }}, + }, }, { name: "URL template, key with a hyphen", pathTmpl: `/{{index .Payload.After "foo-bar"}}`, path: "/baz", - args: []opencdc.Record{{ + record: opencdc.Record{ Payload: opencdc.Change{ After: opencdc.StructuredData{ "foo-bar": "baz", }, }, - }}, + }, }, { name: "URL template, path and query have spaces", pathTmpl: `/{{.Payload.Before.url}}`, path: "/what%20is%20conduit?id=my+id", - args: []opencdc.Record{{ + record: opencdc.Record{ Payload: opencdc.Change{ Before: opencdc.StructuredData{ "url": "what is conduit?id=my id", }, }, - }}, + }, }, } @@ -431,7 +468,7 @@ func TestHTTPProcessor_URLTemplate(t *testing.T) { err := underTest.Configure(context.Background(), config) is.NoErr(err) - got := underTest.Process(context.Background(), tc.args) + got := underTest.Process(context.Background(), []opencdc.Record{tc.record}) is.Equal(1, len(got)) _, ok := got[0].(sdk.SingleRecord) is.True(ok) @@ -477,7 +514,7 @@ func TestHTTPProcessor_RetrySuccess(t *testing.T) { "backoffRetry.min": "5ms", "backoffRetry.max": "10ms", "backoffRetry.factor": "1.2", - "request.body": ".", + "request.body": "{{ toJson . }}", } underTest := NewHTTPProcessor(log.Test(t)) @@ -529,60 +566,3 @@ func TestHTTPProcessor_RetryFail(t *testing.T) { is.True(isErr) // expected an error is.Equal(srvHandlerCount, 6) // expected 6 requests (1 regular and 5 retries) } - -func TestHTTPProcessor_FilterRecord(t *testing.T) { - is := is.New(t) - - wantMethod := "GET" - rec := []opencdc.Record{ - {Payload: opencdc.Change{After: opencdc.RawData("random data")}}, - } - - wantBody := getRequestBody(is, ".", rec) - - srv := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) { - is.Equal(wantMethod, req.Method) - - gotBody, err := io.ReadAll(req.Body) - is.NoErr(err) - is.Equal(wantBody, gotBody) - - resp.WriteHeader(http.StatusNoContent) - })) - defer srv.Close() - - config := map[string]string{ - "request.url": srv.URL, - "request.body": ".", - } - - underTest := NewHTTPProcessor(log.Test(t)) - err := underTest.Configure(context.Background(), config) - is.NoErr(err) - - got := underTest.Process(context.Background(), rec) - is.Equal(got, []sdk.ProcessedRecord{sdk.FilterRecord{}}) -} - -func getRequestBody(is *is.I, field string, records []opencdc.Record) []byte { - f := field - if f == "" { - f = "." - } - - refRes, err := sdk.NewReferenceResolver(f) - is.NoErr(err) - - ref, err := refRes.Resolve(&records[0]) - is.NoErr(err) - - val := ref.Get() - if raw, ok := val.(opencdc.RawData); ok { - return raw.Bytes() - } - - bytes, err := json.Marshal(ref.Get()) - is.NoErr(err) - - return bytes -} diff --git a/pkg/plugin/processor/builtin/internal/exampleutil/specs/webhook.http.json b/pkg/plugin/processor/builtin/internal/exampleutil/specs/webhook.http.json index 54a5e2ac2..0bcf6702e 100644 --- a/pkg/plugin/processor/builtin/internal/exampleutil/specs/webhook.http.json +++ b/pkg/plugin/processor/builtin/internal/exampleutil/specs/webhook.http.json @@ -2,7 +2,7 @@ "specification": { "name": "webhook.http", "summary": "Trigger an HTTP request for every record.", - "description": "A processor that sends an HTTP request to the specified URL, retries on error and \nsaves the response body and, optionally, the response status.", + "description": "A processor that sends an HTTP request to the specified URL, retries on error and \nsaves the response body and, optionally, the response status.\n\nA status code over 500 is regarded as an error and will cause the processor to retry the request.\nThe processor will retry the request according to the backoff configuration.", "version": "v0.1.0", "author": "Meroxa, Inc.", "parameters": { @@ -48,12 +48,12 @@ }, "request.body": { "default": "", - "description": "Specifies which field from the input record should be used as the body in\nthe HTTP request.\n\nFor more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).", + "description": "Specifies the body that will be sent in the HTTP request. The field accepts\na Go [templates](https://pkg.go.dev/text/template) that's evaluated using the\n[opencdc.Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record)\nas input. By default, the body is empty.\n\nTo send the whole record as JSON you can use `{{ toJson . }}`.", "type": "string", "validations": [] }, "request.contentType": { - "default": "application/json", + "default": "", "description": "Deprecated: use `headers.Content-Type` instead.", "type": "string", "validations": [] @@ -66,7 +66,7 @@ }, "request.url": { "default": "", - "description": "URL is a Go template expression for the URL used in the HTTP request, using Go [templates](https://pkg.go.dev/text/template).\nThe value provided to the template is [opencdc.Record](https://github.com/ConduitIO/conduit-commons/blob/59ecfbe5d5be2ac4cd9a674d274862d164123f36/opencdc/record.go#L30),\nso the template has access to all its fields (e.g. .Position, .Key, .Metadata, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/)\nto make it easier to write templates.", + "description": "URL is a Go template expression for the URL used in the HTTP request, using Go [templates](https://pkg.go.dev/text/template).\nThe value provided to the template is [opencdc.Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record),\nso the template has access to all its fields (e.g. `.Position`, `.Key`, `.Metadata`, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/)\nto make it easier to write templates.", "type": "string", "validations": [ { @@ -92,14 +92,14 @@ "examples": [ { "summary": "Send a request to an HTTP server", - "description": "\nThis example shows how to use the HTTP processor to send a record's `.Payload.After` field to a dummy HTTP server\nthat replies back with a greeting.\n\nThe record's `.Payload.After` is overwritten with the response. Additionally, the example shows how to store the\nvalue of the HTTP response's code in the metadata field `http_status`.", + "description": "\nThis example shows how to use the HTTP processor to send a record's `.Payload.After` field as a string to a dummy\nHTTP server that replies back with a greeting.\n\nThe record's `.Payload.After` is overwritten with the response. Additionally, the example shows how to set a request\nheader and how to store the value of the HTTP response's code in the metadata field `http_status`.", "config": { "backoffRetry.count": "0", "backoffRetry.factor": "2", "backoffRetry.max": "5s", "backoffRetry.min": "100ms", - "request.body": ".Payload.After", - "request.contentType": "application/json", + "headers.content-type": "application/json", + "request.body": "{{ printf \"%s\" .Payload.After }}", "request.method": "GET", "request.url": "http://127.0.0.1:54321", "response.body": ".Payload.After", @@ -136,7 +136,6 @@ "backoffRetry.factor": "2", "backoffRetry.max": "5s", "backoffRetry.min": "100ms", - "request.contentType": "application/json", "request.method": "GET", "request.url": "http://127.0.0.1:54321/{{.Payload.After.name}}", "response.body": ".Payload.After.response"