Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add dynamic url to HTTP processor + fix RawData bug #1498

Merged
merged 8 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions pkg/plugin/processor/builtin/impl/webhook/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ import (
"io"
"net/http"
"strconv"
"strings"
"text/template"
"time"

"github.com/Masterminds/sprig/v3"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
Expand All @@ -33,7 +36,10 @@ import (
)

type httpConfig struct {
// URL used in the HTTP request.
// URL is a Go template expression for the URL used in the HTTP request, using Go [templates](https://pkg.go.dev/text/template).
// The value provided to the template is [opencdc.Record](https://github.com/ConduitIO/conduit-commons/blob/59ecfbe5d5be2ac4cd9a674d274862d164123f36/opencdc/record.go#L30),
// so the template has access to all its fields (e.g. .Position, .Key, .Metadata, and so on). We also inject all template functions provided by [sprig](https://masterminds.github.io/sprig/)
// to make it easier to write templates.
URL string `json:"request.url" validate:"required"`
// Method is the HTTP request method to be used.
Method string `json:"request.method" default:"POST"`
Expand Down Expand Up @@ -76,6 +82,7 @@ type httpProcessor struct {
requestBodyRef *sdk.ReferenceResolver
responseBodyRef *sdk.ReferenceResolver
responseStatusRef *sdk.ReferenceResolver
urlTmpl *template.Template
}

func NewHTTPProcessor(l log.CtxLogger) sdk.Processor {
Expand Down Expand Up @@ -124,6 +131,13 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
}
p.responseStatusRef = &responseStatusRef
}
if strings.Contains(p.config.URL, "{{") {
// create URL template
p.urlTmpl, err = template.New("").Funcs(sprig.FuncMap()).Parse(p.config.URL)
if err != nil {
return cerrors.Errorf("error while parsing the URL template: %w", err)
}
}

// preflight check
_, err = http.NewRequest(p.config.Method, p.config.URL, bytes.NewReader([]byte{}))
Expand All @@ -139,6 +153,18 @@ func (p *httpProcessor) Configure(ctx context.Context, m map[string]string) erro
return nil
}

func (p *httpProcessor) EvaluateURL(rec opencdc.Record) (string, error) {
if p.urlTmpl == nil {
return p.config.URL, nil
}
var b bytes.Buffer
err := p.urlTmpl.Execute(&b, rec)
if err != nil {
return p.config.URL, cerrors.Errorf("error while evaluating URL template: %w", err)
maha-hajja marked this conversation as resolved.
Show resolved Hide resolved
}
return b.String(), nil
}

func (p *httpProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
out := make([]sdk.ProcessedRecord, 0, len(records))
for _, rec := range records {
Expand Down Expand Up @@ -223,7 +249,7 @@ func (p *httpProcessor) processRecord(ctx context.Context, r opencdc.Record) (sd
}

// Set response body
err = p.setField(&r, p.responseBodyRef, opencdc.RawData(body))
err = p.setField(&r, p.responseBodyRef, body)
if err != nil {
return nil, cerrors.Errorf("failed setting response body: %w", err)
}
Expand All @@ -241,10 +267,14 @@ func (p *httpProcessor) buildRequest(ctx context.Context, r opencdc.Record) (*ht
return nil, cerrors.Errorf("failed getting request body: %w", err)
}

url, err := p.EvaluateURL(r)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(
ctx,
p.config.Method,
p.config.URL,
url,
bytes.NewReader(reqBody),
)
if err != nil {
Expand Down
69 changes: 65 additions & 4 deletions pkg/plugin/processor/builtin/impl/webhook/http_examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"net"
"net/http"
"net/http/httptest"

"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil"
"strings"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-processor-sdk"
conduit_log "github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/exampleutil"
)

//nolint:govet // we're using a more descriptive name of example
Expand Down Expand Up @@ -85,15 +85,76 @@ value of the HTTP response's code in the metadata field ` + "`http_status`" + `.
// }
}

//nolint:govet // we're using a more descriptive name of example
func ExampleHTTPProcessor_DynamicURL() {
p := NewHTTPProcessor(conduit_log.Nop())

srv := newTestServer()
// Stop the server on return from the function.
defer srv.Close()

exampleutil.RunExample(p, exampleutil.Example{
Summary: `Send a request to an HTTP server with a dynamic URL`,
Description: `
This example shows how to use the HTTP processor to use a record's ` + "`.Payload.After.name`" + ` field in the URL path,
send it to a dummy HTTP server, and get a greeting with the name back.

The response will be written under the record's ` + "`.Payload.After.response`.",
Config: map[string]string{
"request.url": srv.URL + "/{{.Payload.After.name}}",
"response.body": ".Payload.After.response",
},
Have: opencdc.Record{
Payload: opencdc.Change{
After: opencdc.StructuredData{
"name": "foo",
},
},
},
Want: sdk.SingleRecord{
Payload: opencdc.Change{
After: opencdc.StructuredData{
"name": "foo",
"response": []byte("hello, foo!"),
},
},
},
})

// Output:
// processor transformed record:
// --- before
// +++ after
// @@ -1,12 +1,13 @@
// {
// "position": null,
// "operation": "Operation(0)",
maha-hajja marked this conversation as resolved.
Show resolved Hide resolved
// "metadata": null,
// "key": null,
// "payload": {
// "before": null,
// "after": {
// - "name": "foo"
// + "name": "foo",
// + "response": "aGVsbG8sIGZvbyE="
// }
// }
// }
}

func newTestServer() *httptest.Server {
l, err := net.Listen("tcp", "127.0.0.1:54321")
if err != nil {
log.Fatalf("failed starting test server on port 54321: %v", err)
}

srv := httptest.NewUnstartedServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
body, _ := io.ReadAll(req.Body)
_, _ = resp.Write([]byte("hello, " + string(body)))
if req.URL.Path != "/" {
_, _ = resp.Write([]byte("hello, " + strings.TrimPrefix(req.URL.Path, "/") + "!"))
} else {
body, _ := io.ReadAll(req.Body)
_, _ = resp.Write([]byte("hello, " + string(body)))
}
}))

// NewUnstartedServer creates a listener. Close that listener and replace
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

115 changes: 111 additions & 4 deletions pkg/plugin/processor/builtin/impl/webhook/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ func TestHTTPProcessor_Configure(t *testing.T) {
},
wantErr: "",
},
{
name: "valid url template returns processor",
config: map[string]string{
"request.url": "http://example.com/{{.Payload.After}}",
},
wantErr: "",
},
{
name: "invalid url template with a hyphen",
config: map[string]string{
"request.url": "http://example.com/{{.Payload.After.my-key}}",
},
wantErr: "error while parsing the URL template: template: :1: bad character U+002D '-'",
},
{
name: "valid url and method returns processor",
config: map[string]string{
Expand Down Expand Up @@ -220,7 +234,7 @@ func TestHTTPProcessor_Success(t *testing.T) {
Payload: opencdc.Change{
After: opencdc.StructuredData{
"a key": "random data",
"body": opencdc.RawData(respBody),
"body": respBody,
"status": "200",
},
},
Expand Down Expand Up @@ -249,7 +263,7 @@ func TestHTTPProcessor_Success(t *testing.T) {
},
After: opencdc.StructuredData{
"after-key": "after-data",
"httpResponse": opencdc.RawData("foo-bar/response"),
"httpResponse": []byte("foo-bar/response"),
},
},
}},
Expand All @@ -273,7 +287,7 @@ func TestHTTPProcessor_Success(t *testing.T) {
Before: opencdc.RawData("uncooked data"),
After: opencdc.StructuredData{
"after-key": "after-data",
"httpResponse": opencdc.RawData("foo-bar/response"),
"httpResponse": []byte("foo-bar/response"),
},
},
}},
Expand All @@ -297,7 +311,7 @@ func TestHTTPProcessor_Success(t *testing.T) {
After: opencdc.StructuredData{
"after-key": "after-data",
"contents": []byte{15, 2, 20, 24},
"httpResponse": opencdc.RawData("foo-bar/response"),
"httpResponse": []byte("foo-bar/response"),
},
},
}},
Expand Down Expand Up @@ -342,6 +356,99 @@ func TestHTTPProcessor_Success(t *testing.T) {
}
}

func TestHTTPProcessor_URLTemplate(t *testing.T) {
respBody := []byte("foo-bar/response")

tests := []struct {
name string
config map[string]string
pathTmpl string // will be attached to the URL
path string // expected result of the pathTmpl
args []opencdc.Record
want []sdk.ProcessedRecord
}{
{
name: "URL template, success",
config: map[string]string{"request.method": "GET"},
pathTmpl: "/{{.Payload.After.foo}}",
path: "/123",
args: []opencdc.Record{{
Payload: opencdc.Change{
Before: nil,
After: opencdc.StructuredData{
"foo": 123,
},
},
}},
want: []sdk.ProcessedRecord{sdk.SingleRecord{
Payload: opencdc.Change{
After: opencdc.RawData(respBody),
},
},
},
},
{
name: "URL template, key with a hyphen",
config: map[string]string{},
pathTmpl: `/{{index .Payload.After "foo-bar"}}`,
path: "/baz",
args: []opencdc.Record{{
Payload: opencdc.Change{
After: opencdc.StructuredData{
"foo-bar": "baz",
},
},
}},
want: []sdk.ProcessedRecord{sdk.SingleRecord{
Payload: opencdc.Change{
After: opencdc.RawData(respBody),
},
}},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)

wantMethod := tc.config["request.method"]
if wantMethod == "" {
wantMethod = "POST" // default
}
maha-hajja marked this conversation as resolved.
Show resolved Hide resolved

wantBody := getRequestBody(is, tc.config["request.body"], tc.args)

srv := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
// check the expected path with the evaluated URL
is.Equal(req.URL.Path, tc.path)

is.Equal(wantMethod, req.Method)

gotBody, err := io.ReadAll(req.Body)
is.NoErr(err)
is.Equal(wantBody, gotBody)

_, err = resp.Write(respBody)
is.NoErr(err)
}))
defer srv.Close()

// attach the path template to the URL
tc.config["request.url"] = srv.URL + tc.pathTmpl
underTest := NewHTTPProcessor(log.Test(t))
err := underTest.Configure(context.Background(), tc.config)
is.NoErr(err)

got := underTest.Process(context.Background(), tc.args)
diff := cmp.Diff(tc.want, got, cmpopts.IgnoreUnexported(sdk.SingleRecord{}))
if diff != "" {
t.Logf("mismatch (-want +got): %s", diff)
t.Fail()
}
})
}
}

func TestHTTPProcessor_RetrySuccess(t *testing.T) {
is := is.New(t)

Expand Down
Loading