Skip to content

Commit

Permalink
add dynamic url to HTTP processor + fix RawData bug (#1498)
Browse files Browse the repository at this point in the history
* add dynamic url to HTTP processor + fix RawData bug

* fix conflicts

* make generate

* add example test

* address reviews + change default value for request.body and request.method

* make generate

* go mod tidy
  • Loading branch information
maha-hajja authored Apr 18, 2024
1 parent 73217be commit 4de440b
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 45 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/conduitio/conduit

go 1.22.2
go 1.21.1

toolchain go1.22.1

require (
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.33.0-20240416201300-ca2899286658.1
Expand Down
53 changes: 44 additions & 9 deletions pkg/plugin/processor/builtin/impl/webhook/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"io"
"net/http"
"strconv"
"strings"
"text/template"
"time"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand All @@ -33,10 +36,13 @@ import (
)

type httpConfig struct {
// URL used in the HTTP request.
// URL is a Go template expression for the URL used in the HTTP request, using Go [templates](https://pkg.go.dev/text/template).
// The value provided to the template is [opencdc.Record](https://github.com/ConduitIO/conduit-commons/blob/59ecfbe5d5be2ac4cd9a674d274862d164123f36/opencdc/record.go#L30),
// so the template has access to all its fields (e.g. .Position, .Key, .Metadata, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/)
// to make it easier to write templates.
URL string `json:"request.url" validate:"required"`
// Method is the HTTP request method to be used.
Method string `json:"request.method" default:"POST"`
Method string `json:"request.method" default:"GET"`
// The value of the `Content-Type` header.
ContentType string `json:"request.contentType" default:"application/json"`

Expand All @@ -53,7 +59,7 @@ type httpConfig struct {
// the HTTP request.
//
// For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).
RequestBodyRef string `json:"request.body" default:"."`
RequestBodyRef string `json:"request.body"`
// Specifies in which field should the response body be saved.
//
// For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).
Expand All @@ -76,6 +82,7 @@ type httpProcessor struct {
requestBodyRef *sdk.ReferenceResolver
responseBodyRef *sdk.ReferenceResolver
responseStatusRef *sdk.ReferenceResolver
urlTmpl *template.Template
}

func NewHTTPProcessor(l log.CtxLogger) sdk.Processor {
Expand Down Expand Up @@ -104,11 +111,13 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
return cerrors.New("invalid configuration: response.body and response.status set to same field")
}

requestBodyRef, err := sdk.NewReferenceResolver(p.config.RequestBodyRef)
if err != nil {
return cerrors.Errorf("failed parsing request.body %v: %w", p.config.RequestBodyRef, err)
if p.config.RequestBodyRef != "" {
requestBodyRef, err := sdk.NewReferenceResolver(p.config.RequestBodyRef)
if err != nil {
return cerrors.Errorf("failed parsing request.body %v: %w", p.config.RequestBodyRef, err)
}
p.requestBodyRef = &requestBodyRef
}
p.requestBodyRef = &requestBodyRef

responseBodyRef, err := sdk.NewReferenceResolver(p.config.ResponseBodyRef)
if err != nil {
Expand All @@ -124,6 +133,13 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
}
p.responseStatusRef = &responseStatusRef
}
if strings.Contains(p.config.URL, "{{") {
// create URL template
p.urlTmpl, err = template.New("").Funcs(sprig.FuncMap()).Parse(p.config.URL)
if err != nil {
return cerrors.Errorf("error while parsing the URL template: %w", err)
}
}

// preflight check
_, err = http.NewRequest(p.config.Method, p.config.URL, bytes.NewReader([]byte{}))
Expand All @@ -139,6 +155,18 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
return nil
}

func (p *httpProcessor) EvaluateURL(rec opencdc.Record) (string, error) {
if p.urlTmpl == nil {
return p.config.URL, nil
}
var b bytes.Buffer
err := p.urlTmpl.Execute(&b, rec)
if err != nil {
return "", cerrors.Errorf("error while evaluating URL template: %w", err)
}
return b.String(), nil
}

func (p *httpProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
out := make([]sdk.ProcessedRecord, 0, len(records))
for _, rec := range records {
Expand Down Expand Up @@ -223,7 +251,7 @@ func (p *httpProcessor) processRecord(ctx context.Context, r opencdc.Record) (sd
}

// Set response body
err = p.setField(&r, p.responseBodyRef, opencdc.RawData(body))
err = p.setField(&r, p.responseBodyRef, body)
if err != nil {
return nil, cerrors.Errorf("failed setting response body: %w", err)
}
Expand All @@ -241,10 +269,14 @@ func (p *httpProcessor) buildRequest(ctx context.Context, r opencdc.Record) (*ht
return nil, cerrors.Errorf("failed getting request body: %w", err)
}

url, err := p.EvaluateURL(r)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(
ctx,
p.config.Method,
p.config.URL,
url,
bytes.NewReader(reqBody),
)
if err != nil {
Expand All @@ -260,6 +292,9 @@ func (p *httpProcessor) buildRequest(ctx context.Context, r opencdc.Record) (*ht
// requestBody returns the request body for the given record,
// using the configured field reference (see: request.body configuration parameter).
func (p *httpProcessor) requestBody(r opencdc.Record) ([]byte, error) {
if p.requestBodyRef == nil {
return nil, nil
}
ref, err := p.requestBodyRef.Resolve(&r)
if err != nil {
return nil, cerrors.Errorf("failed resolving request.body: %w", err)
Expand Down
81 changes: 75 additions & 6 deletions pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"net"
"net/http"
"net/http/httptest"

"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil"
"strings"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
conduit_log "github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil"
)

//nolint:govet // we're using a more descriptive name of example
Expand All @@ -50,11 +50,15 @@ value of the HTTP response's code in the metadata field ` + "`http_status`" + `.
"response.status": `.Metadata["http_status"]`,
},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Position: opencdc.Position("pos-1"),
Payload: opencdc.Change{
After: opencdc.RawData("world"),
},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationUpdate,
Position: opencdc.Position("pos-1"),
Metadata: map[string]string{
"http_status": "200",
},
Expand All @@ -70,8 +74,8 @@ value of the HTTP response's code in the metadata field ` + "`http_status`" + `.
// +++ after
// @@ -1,10 +1,12 @@
// {
// "position": null,
// "operation": "Operation(0)",
// "position": "cG9zLTE=",
// "operation": "update",
// - "metadata": null,
// + "metadata": {
// + "http_status": "200"
Expand All @@ -85,15 +89,80 @@ value of the HTTP response's code in the metadata field ` + "`http_status`" + `.
// }
}

//nolint:govet // we're using a more descriptive name of example
func ExampleHTTPProcessor_DynamicURL() {
p := NewHTTPProcessor(conduit_log.Nop())

srv := newTestServer()
// Stop the server on return from the function.
defer srv.Close()

exampleutil.RunExample(p, exampleutil.Example{
Summary: `Send a request to an HTTP server with a dynamic URL`,
Description: `
This example shows how to use the HTTP processor to use a record's ` + "`.Payload.After.name`" + ` field in the URL path,
send it to a dummy HTTP server, and get a greeting with the name back.
The response will be written under the record's ` + "`.Payload.After.response`.",
Config: map[string]string{
"request.url": srv.URL + "/{{.Payload.After.name}}",
"response.body": ".Payload.After.response",
},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Position: opencdc.Position("pos-1"),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"name": "foo",
},
},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationCreate,
Position: opencdc.Position("pos-1"),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"name": "foo",
"response": []byte("hello, foo!"),
},
},
},
})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,12 +1,13 @@
// {
// "position": "cG9zLTE=",
// "operation": "create",
// "metadata": null,
// "key": null,
// "payload": {
// "before": null,
// "after": {
// - "name": "foo"
// + "name": "foo",
// + "response": "aGVsbG8sIGZvbyE="
// }
// }
// }
}

func newTestServer() *httptest.Server {
l, err := net.Listen("tcp", "127.0.0.1:54321")
if err != nil {
log.Fatalf("failed starting test server on port 54321: %v", err)
}

srv := httptest.NewUnstartedServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
body, _ := io.ReadAll(req.Body)
_, _ = resp.Write([]byte("hello, " + string(body)))
if req.URL.Path != "/" {
_, _ = resp.Write([]byte("hello, " + strings.TrimPrefix(req.URL.Path, "/") + "!"))
} else {
body, _ := io.ReadAll(req.Body)
_, _ = resp.Write([]byte("hello, " + string(body)))
}
}))

// NewUnstartedServer creates a listener. Close that listener and replace
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/processor/builtin/impl/webhook/http_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4de440b

Please sign in to comment.