Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add dynamic url to HTTP processor + fix RawData bug #1498

Merged
merged 8 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/conduitio/conduit

go 1.22.2
go 1.21.1

toolchain go1.22.1

require (
buf.build/gen/go/grpc-ecosystem/grpc-gateway/protocolbuffers/go v1.33.0-20240416201300-ca2899286658.1
Expand Down
53 changes: 44 additions & 9 deletions pkg/plugin/processor/builtin/impl/webhook/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"io"
"net/http"
"strconv"
"strings"
"text/template"
"time"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand All @@ -33,10 +36,13 @@ import (
)

type httpConfig struct {
// URL used in the HTTP request.
// URL is a Go template expression for the URL used in the HTTP request, using Go [templates](https://pkg.go.dev/text/template).
// The value provided to the template is [opencdc.Record](https://github.com/ConduitIO/conduit-commons/blob/59ecfbe5d5be2ac4cd9a674d274862d164123f36/opencdc/record.go#L30),
// so the template has access to all its fields (e.g. .Position, .Key, .Metadata, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/)
// to make it easier to write templates.
URL string `json:"request.url" validate:"required"`
// Method is the HTTP request method to be used.
Method string `json:"request.method" default:"POST"`
Method string `json:"request.method" default:"GET"`
// The value of the `Content-Type` header.
ContentType string `json:"request.contentType" default:"application/json"`

Expand All @@ -53,7 +59,7 @@ type httpConfig struct {
// the HTTP request.
//
// For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).
RequestBodyRef string `json:"request.body" default:"."`
RequestBodyRef string `json:"request.body"`
// Specifies in which field should the response body be saved.
//
// For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).
Expand All @@ -76,6 +82,7 @@ type httpProcessor struct {
requestBodyRef *sdk.ReferenceResolver
responseBodyRef *sdk.ReferenceResolver
responseStatusRef *sdk.ReferenceResolver
urlTmpl *template.Template
}

func NewHTTPProcessor(l log.CtxLogger) sdk.Processor {
Expand Down Expand Up @@ -104,11 +111,13 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
return cerrors.New("invalid configuration: response.body and response.status set to same field")
}

requestBodyRef, err := sdk.NewReferenceResolver(p.config.RequestBodyRef)
if err != nil {
return cerrors.Errorf("failed parsing request.body %v: %w", p.config.RequestBodyRef, err)
if p.config.RequestBodyRef != "" {
requestBodyRef, err := sdk.NewReferenceResolver(p.config.RequestBodyRef)
if err != nil {
return cerrors.Errorf("failed parsing request.body %v: %w", p.config.RequestBodyRef, err)
}
p.requestBodyRef = &requestBodyRef
}
p.requestBodyRef = &requestBodyRef

responseBodyRef, err := sdk.NewReferenceResolver(p.config.ResponseBodyRef)
if err != nil {
Expand All @@ -124,6 +133,13 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
}
p.responseStatusRef = &responseStatusRef
}
if strings.Contains(p.config.URL, "{{") {
// create URL template
p.urlTmpl, err = template.New("").Funcs(sprig.FuncMap()).Parse(p.config.URL)
if err != nil {
return cerrors.Errorf("error while parsing the URL template: %w", err)
}
}

// preflight check
_, err = http.NewRequest(p.config.Method, p.config.URL, bytes.NewReader([]byte{}))
Expand All @@ -139,6 +155,18 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
return nil
}

func (p *httpProcessor) EvaluateURL(rec opencdc.Record) (string, error) {
if p.urlTmpl == nil {
return p.config.URL, nil
}
var b bytes.Buffer
err := p.urlTmpl.Execute(&b, rec)
if err != nil {
return "", cerrors.Errorf("error while evaluating URL template: %w", err)
}
return b.String(), nil
}

func (p *httpProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
out := make([]sdk.ProcessedRecord, 0, len(records))
for _, rec := range records {
Expand Down Expand Up @@ -223,7 +251,7 @@ func (p *httpProcessor) processRecord(ctx context.Context, r opencdc.Record) (sd
}

// Set response body
err = p.setField(&r, p.responseBodyRef, opencdc.RawData(body))
err = p.setField(&r, p.responseBodyRef, body)
if err != nil {
return nil, cerrors.Errorf("failed setting response body: %w", err)
}
Expand All @@ -241,10 +269,14 @@ func (p *httpProcessor) buildRequest(ctx context.Context, r opencdc.Record) (*ht
return nil, cerrors.Errorf("failed getting request body: %w", err)
}

url, err := p.EvaluateURL(r)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(
ctx,
p.config.Method,
p.config.URL,
url,
bytes.NewReader(reqBody),
)
if err != nil {
Expand All @@ -260,6 +292,9 @@ func (p *httpProcessor) buildRequest(ctx context.Context, r opencdc.Record) (*ht
// requestBody returns the request body for the given record,
// using the configured field reference (see: request.body configuration parameter).
func (p *httpProcessor) requestBody(r opencdc.Record) ([]byte, error) {
if p.requestBodyRef == nil {
return nil, nil
}
ref, err := p.requestBodyRef.Resolve(&r)
if err != nil {
return nil, cerrors.Errorf("failed resolving request.body: %w", err)
Expand Down
81 changes: 75 additions & 6 deletions pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"net"
"net/http"
"net/http/httptest"

"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil"
"strings"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
conduit_log "github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil"
)

//nolint:govet // we're using a more descriptive name of example
Expand All @@ -50,11 +50,15 @@ value of the HTTP response's code in the metadata field ` + "`http_status`" + `.
"response.status": `.Metadata["http_status"]`,
},
Have: opencdc.Record{
Operation: opencdc.OperationUpdate,
Position: opencdc.Position("pos-1"),
Payload: opencdc.Change{
After: opencdc.RawData("world"),
},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationUpdate,
Position: opencdc.Position("pos-1"),
Metadata: map[string]string{
"http_status": "200",
},
Expand All @@ -70,8 +74,8 @@ value of the HTTP response's code in the metadata field ` + "`http_status`" + `.
// +++ after
// @@ -1,10 +1,12 @@
// {
// "position": null,
// "operation": "Operation(0)",
// "position": "cG9zLTE=",
// "operation": "update",
// - "metadata": null,
// + "metadata": {
// + "http_status": "200"
Expand All @@ -85,15 +89,80 @@ value of the HTTP response's code in the metadata field ` + "`http_status`" + `.
// }
}

//nolint:govet // we're using a more descriptive name of example
func ExampleHTTPProcessor_DynamicURL() {
p := NewHTTPProcessor(conduit_log.Nop())

srv := newTestServer()
// Stop the server on return from the function.
defer srv.Close()

exampleutil.RunExample(p, exampleutil.Example{
Summary: `Send a request to an HTTP server with a dynamic URL`,
Description: `
This example shows how to use the HTTP processor to use a record's ` + "`.Payload.After.name`" + ` field in the URL path,
send it to a dummy HTTP server, and get a greeting with the name back.

The response will be written under the record's ` + "`.Payload.After.response`.",
Config: map[string]string{
"request.url": srv.URL + "/{{.Payload.After.name}}",
"response.body": ".Payload.After.response",
},
Have: opencdc.Record{
Operation: opencdc.OperationCreate,
Position: opencdc.Position("pos-1"),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"name": "foo",
},
},
},
Want: sdk.SingleRecord{
Operation: opencdc.OperationCreate,
Position: opencdc.Position("pos-1"),
Payload: opencdc.Change{
After: opencdc.StructuredData{
"name": "foo",
"response": []byte("hello, foo!"),
},
},
},
})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,12 +1,13 @@
// {
// "position": "cG9zLTE=",
// "operation": "create",
// "metadata": null,
// "key": null,
// "payload": {
// "before": null,
// "after": {
// - "name": "foo"
// + "name": "foo",
// + "response": "aGVsbG8sIGZvbyE="
// }
// }
// }
}

func newTestServer() *httptest.Server {
l, err := net.Listen("tcp", "127.0.0.1:54321")
if err != nil {
log.Fatalf("failed starting test server on port 54321: %v", err)
}

srv := httptest.NewUnstartedServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
body, _ := io.ReadAll(req.Body)
_, _ = resp.Write([]byte("hello, " + string(body)))
if req.URL.Path != "/" {
_, _ = resp.Write([]byte("hello, " + strings.TrimPrefix(req.URL.Path, "/") + "!"))
} else {
body, _ := io.ReadAll(req.Body)
_, _ = resp.Write([]byte("hello, " + string(body)))
}
}))

// NewUnstartedServer creates a listener. Close that listener and replace
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/processor/builtin/impl/webhook/http_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading