Skip to content

Commit

Permalink
Webhook processor fixes (#1597)
Browse files Browse the repository at this point in the history
* remove default for deprecated field, add default content type manually, update example

* allow http status <500

* update link to opencdc record

* allow go template in request.body

---------

Co-authored-by: Haris Osmanagić <[email protected]>
  • Loading branch information
lovromazgon and hariso authored May 16, 2024
1 parent a79386e commit 4641fb4
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 203 deletions.
133 changes: 70 additions & 63 deletions pkg/plugin/processor/builtin/impl/webhook/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,19 @@ import (
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/goccy/go-json"
"github.com/jpillora/backoff"
)

type httpConfig struct {
// URL is a Go template expression for the URL used in the HTTP request, using Go [templates](https://pkg.go.dev/text/template).
// The value provided to the template is [opencdc.Record](https://github.com/ConduitIO/conduit-commons/blob/59ecfbe5d5be2ac4cd9a674d274862d164123f36/opencdc/record.go#L30),
// so the template has access to all its fields (e.g. .Position, .Key, .Metadata, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/)
// The value provided to the template is [opencdc.Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record),
// so the template has access to all its fields (e.g. `.Position`, `.Key`, `.Metadata`, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/)
// to make it easier to write templates.
URL string `json:"request.url" validate:"required"`
// Method is the HTTP request method to be used.
Method string `json:"request.method" default:"GET"`
// Deprecated: use `headers.Content-Type` instead.
ContentType string `json:"request.contentType" default:"application/json"`
ContentType string `json:"request.contentType"`
// Headers to add to the request, use `headers.*` to specify the header and its value (e.g. `headers.Authorization: "Bearer key"`).
Headers map[string]string `json:"headers"`

Expand All @@ -58,11 +57,13 @@ type httpConfig struct {
// The maximum waiting time before retrying.
BackoffRetryMax time.Duration `json:"backoffRetry.max" default:"5s"`

// Specifies which field from the input record should be used as the body in
// the HTTP request.
// Specifies the body that will be sent in the HTTP request. The field accepts
// a Go [templates](https://pkg.go.dev/text/template) that's evaluated using the
// [opencdc.Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record)
// as input. By default, the body is empty.
//
// For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).
RequestBodyRef string `json:"request.body"`
// To send the whole record as JSON you can use `{{ toJson . }}`.
RequestBodyTmpl string `json:"request.body"`
// Specifies in which field should the response body be saved.
//
// For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).
Expand All @@ -79,20 +80,25 @@ func (c *httpConfig) parseHeaders() error {
c.Headers = make(map[string]string)
}

if c.ContentType == "" {
return nil // Nothing to replace in headers
}

var isContentTypeSet bool
for name, _ := range c.Headers {
if strings.ToLower(name) == "content-type" {
return cerrors.Errorf("Configuration error, cannot provide both \"request.contentType\" and \"headers.Content-Type\", use \"headers.Content-Type\" only.")
isContentTypeSet = true
break
}
}

c.Headers["Content-Type"] = c.ContentType
// the ContentType field is deprecated,
// so we're preparing for completely removing it in a later release
c.ContentType = ""
switch {
case isContentTypeSet && c.ContentType != "":
return cerrors.Errorf(`configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only`)
case !isContentTypeSet && c.ContentType != "":
// Use contents of deprecated field.
c.Headers["Content-Type"] = c.ContentType
c.ContentType = ""
case !isContentTypeSet:
// By default, we set the Content-Type to application/json.
c.Headers["Content-Type"] = "application/json"
}

return nil
}
Expand All @@ -105,10 +111,11 @@ type httpProcessor struct {
config httpConfig
backoffCfg *backoff.Backoff

requestBodyRef *sdk.ReferenceResolver
urlTmpl *template.Template
requestBodyTmpl *template.Template

responseBodyRef *sdk.ReferenceResolver
responseStatusRef *sdk.ReferenceResolver
urlTmpl *template.Template
}

func NewHTTPProcessor(l log.CtxLogger) sdk.Processor {
Expand All @@ -120,7 +127,10 @@ func (p *httpProcessor) Specification() (sdk.Specification, error) {
Name: "webhook.http",
Summary: "Trigger an HTTP request for every record.",
Description: `A processor that sends an HTTP request to the specified URL, retries on error and
saves the response body and, optionally, the response status.`,
saves the response body and, optionally, the response status.
A status code over 500 is regarded as an error and will cause the processor to retry the request.
The processor will retry the request according to the backoff configuration.`,
Version: "v0.1.0",
Author: "Meroxa, Inc.",
Parameters: httpConfig{}.Parameters(),
Expand All @@ -142,12 +152,13 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
return cerrors.New("invalid configuration: response.body and response.status set to same field")
}

if p.config.RequestBodyRef != "" {
requestBodyRef, err := sdk.NewReferenceResolver(p.config.RequestBodyRef)
// parse request body template
if strings.Contains(p.config.RequestBodyTmpl, "{{") {
// create URL template
p.requestBodyTmpl, err = template.New("").Funcs(sprig.FuncMap()).Parse(p.config.RequestBodyTmpl)
if err != nil {
return cerrors.Errorf("failed parsing request.body %v: %w", p.config.RequestBodyRef, err)
return cerrors.Errorf("failed parsing request.body %v: %w", err)
}
p.requestBodyRef = &requestBodyRef
}

responseBodyRef, err := sdk.NewReferenceResolver(p.config.ResponseBodyRef)
Expand All @@ -164,6 +175,8 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
}
p.responseStatusRef = &responseStatusRef
}

// parse URL template
if strings.Contains(p.config.URL, "{{") {
// create URL template
p.urlTmpl, err = template.New("").Funcs(sprig.FuncMap()).Parse(p.config.URL)
Expand All @@ -186,27 +199,6 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
return nil
}

func (p *httpProcessor) EvaluateURL(rec opencdc.Record) (string, error) {
if p.urlTmpl == nil {
return p.config.URL, nil
}
var b bytes.Buffer
err := p.urlTmpl.Execute(&b, rec)
if err != nil {
return "", cerrors.Errorf("error while evaluating URL template: %w", err)
}
u, err := url.Parse(b.String())
if err != nil {
return "", cerrors.Errorf("error parsing URL: %w", err)
}
q, err := url.ParseQuery(u.RawQuery)
if err != nil {
return "", cerrors.Errorf("error parsing URL query: %w", err)
}
u.RawQuery = q.Encode()
return u.String(), nil
}

func (p *httpProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
out := make([]sdk.ProcessedRecord, 0, len(records))
for _, rec := range records {
Expand Down Expand Up @@ -281,14 +273,10 @@ func (p *httpProcessor) processRecord(ctx context.Context, r opencdc.Record) (sd
return nil, cerrors.Errorf("error reading response body: %w", err)
}

if resp.StatusCode >= 300 {
// regard status codes over 299 as errors
if resp.StatusCode >= 500 {
// regard status codes over 500 as errors
return nil, cerrors.Errorf("error status code %v (body: %q)", resp.StatusCode, string(body))
}
// skip if body has no content
if resp.StatusCode == http.StatusNoContent {
return sdk.FilterRecord{}, nil
}

// Set response body
err = p.setField(&r, p.responseBodyRef, body)
Expand All @@ -309,7 +297,7 @@ func (p *httpProcessor) buildRequest(ctx context.Context, r opencdc.Record) (*ht
return nil, cerrors.Errorf("failed getting request body: %w", err)
}

url, err := p.EvaluateURL(r)
url, err := p.evaluateURL(r)
if err != nil {
return nil, err
}
Expand All @@ -331,25 +319,44 @@ func (p *httpProcessor) buildRequest(ctx context.Context, r opencdc.Record) (*ht
return req, nil
}

func (p *httpProcessor) evaluateURL(rec opencdc.Record) (string, error) {
if p.urlTmpl == nil {
return p.config.URL, nil
}
var b bytes.Buffer
err := p.urlTmpl.Execute(&b, rec)
if err != nil {
return "", cerrors.Errorf("error while evaluating URL template: %w", err)
}
u, err := url.Parse(b.String())
if err != nil {
return "", cerrors.Errorf("error parsing URL: %w", err)
}
q, err := url.ParseQuery(u.RawQuery)
if err != nil {
return "", cerrors.Errorf("error parsing URL query: %w", err)
}
u.RawQuery = q.Encode()
return u.String(), nil
}

// requestBody returns the request body for the given record,
// using the configured field reference (see: request.body configuration parameter).
func (p *httpProcessor) requestBody(r opencdc.Record) ([]byte, error) {
if p.requestBodyRef == nil {
if p.requestBodyTmpl == nil {
if p.config.RequestBodyTmpl != "" {
return []byte(p.config.RequestBodyTmpl), nil
}
return nil, nil
}
ref, err := p.requestBodyRef.Resolve(&r)
if err != nil {
return nil, cerrors.Errorf("failed resolving request.body: %w", err)
}

val := ref.Get()
// Raw byte data should be sent as it is, as that's most often what we want
// If we json.Marshal it first, it will be Base64-encoded.
if raw, ok := val.(opencdc.RawData); ok {
return raw.Bytes(), nil
var b bytes.Buffer
err := p.requestBodyTmpl.Execute(&b, r)
if err != nil {
return nil, cerrors.Errorf("error while evaluating request body template: %w", err)
}

return json.Marshal(val)
return b.Bytes(), nil
}

func (p *httpProcessor) setField(r *opencdc.Record, refRes *sdk.ReferenceResolver, data any) error {
Expand Down
9 changes: 5 additions & 4 deletions pkg/plugin/processor/builtin/impl/webhook/http_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package webhook

import (
"github.com/matryer/is"
"testing"

"github.com/matryer/is"
)

func TestHTTPConfig_ValidateHeaders(t *testing.T) {
func TestHTTPConfig_ParseHeaders(t *testing.T) {
testCases := []struct {
name string
input httpConfig
Expand All @@ -34,7 +35,7 @@ func TestHTTPConfig_ValidateHeaders(t *testing.T) {
"Content-Type": "application/json",
},
},
wantErr: `Configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only.`,
wantErr: `configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only`,
},
{
name: "ContentType field present, header present, different case",
Expand All @@ -44,7 +45,7 @@ func TestHTTPConfig_ValidateHeaders(t *testing.T) {
"content-type": "application/json",
},
},
wantErr: `Configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only.`,
wantErr: `configuration error, cannot provide both "request.contentType" and "headers.Content-Type", use "headers.Content-Type" only`,
},
{
name: "ContentType field presents, header not present",
Expand Down
15 changes: 8 additions & 7 deletions pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ func ExampleHTTPProcessor() {
exampleutil.RunExample(p, exampleutil.Example{
Summary: `Send a request to an HTTP server`,
Description: `
This example shows how to use the HTTP processor to send a record's ` + "`.Payload.After`" + ` field to a dummy HTTP server
that replies back with a greeting.
This example shows how to use the HTTP processor to send a record's ` + "`.Payload.After`" + ` field as a string to a dummy
HTTP server that replies back with a greeting.
The record's ` + "`.Payload.After`" + ` is overwritten with the response. Additionally, the example shows how to store the
value of the HTTP response's code in the metadata field ` + "`http_status`" + `.`,
The record's ` + "`.Payload.After`" + ` is overwritten with the response. Additionally, the example shows how to set a request
header and how to store the value of the HTTP response's code in the metadata field ` + "`http_status`" + `.`,
Config: map[string]string{
"request.url": srv.URL,
"request.body": ".Payload.After",
"response.status": `.Metadata["http_status"]`,
"request.url": srv.URL,
"request.body": `{{ printf "%s" .Payload.After }}`,
"response.status": `.Metadata["http_status"]`,
"headers.content-type": "application/json",
},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/processor/builtin/impl/webhook/http_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4641fb4

Please sign in to comment.