Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/adam/assert-packages' into adam/…
Browse files Browse the repository at this point in the history
…assert-packages
  • Loading branch information
AdamHaffar committed Sep 15, 2023
2 parents 636b641 + b3a5114 commit 67d6196
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 37 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
echo "GORELEASER_PREVIOUS_TAG=$prev_tag" >> $GITHUB_ENV
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v4
uses: goreleaser/goreleaser-action@v5
with:
distribution: goreleaser
version: latest
Expand All @@ -54,15 +54,15 @@ jobs:
uses: actions/checkout@v4

- name: Log in to the Container registry
uses: docker/login-action@v2.2.0
uses: docker/login-action@v3.0.0
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v4.6.0
uses: docker/metadata-action@v5.0.0
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
# Makes no sense to have an image with tag v0, hence the check in the last line.
Expand All @@ -78,7 +78,7 @@ jobs:
org.opencontainers.image.vendor=ConduitIO
- name: Build and push Docker image
uses: docker/build-push-action@v4.2.1
uses: docker/build-push-action@v5.0.0
with:
context: .
push: true
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/google/uuid v1.3.1
github.com/gorilla/websocket v1.5.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0
github.com/hamba/avro/v2 v2.15.1
github.com/hamba/avro/v2 v2.16.0
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.5.1
github.com/jackc/pgx/v5 v5.4.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 h1:RtRsiaGvWxcwd8y3BiRZxsylPT8hLWZ5SPcfI+3IDNk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0/go.mod h1:TzP6duP4Py2pHLVPPQp42aoYI92+PCrVotyR5e8Vqlk=
github.com/hamba/avro/v2 v2.15.1 h1:vIeEMKARCNXLa52NurXOF3X4DUpH8AdjAlDI/Z9j4dg=
github.com/hamba/avro/v2 v2.15.1/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/hamba/avro/v2 v2.16.0 h1:0XhyP65Hs8iMLtdSR0v7ZrwRjsbIZdvr7KzYgmx1Mbo=
github.com/hamba/avro/v2 v2.16.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-plugin v1.5.1 h1:oGm7cWBaYIp3lJpx1RUEfLWophprE2EV/KUeqBYo+6k=
Expand Down
27 changes: 23 additions & 4 deletions pkg/processor/procbuiltin/httprequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package procbuiltin
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/url"
Expand All @@ -33,6 +34,8 @@ const (

httpRequestConfigURL = "url"
httpRequestConfigMethod = "method"
httpRequestConfigContentType = "contentType"
httpRequestContentTypeDefault = "application/json"
httpRequestBackoffRetryCount = "backoffRetry.count"
httpRequestBackoffRetryMin = "backoffRetry.min"
httpRequestBackoffRetryMax = "backoffRetry.max"
Expand All @@ -43,9 +46,10 @@ func init() {
processor.GlobalBuilderRegistry.MustRegister(httpRequestProcType, HTTPRequest)
}

// HTTPRequest builds a processor that sends an HTTP request to the specified
// URL with the specified HTTP method (default is POST). Record.Payload.After is
// used as the request body and the raw response body overwrites the field.
// HTTPRequest builds a processor that sends an HTTP request to the specified URL with the specified HTTP method
// (default is POST) with a content-type header as the specified value (default is application/json). the whole
// record as json will be used as the request body and the raw response body will be set under Record.Payload.After.
// if the response code is (204 No Content) then the record will be filtered out.
func HTTPRequest(config processor.Config) (processor.Interface, error) {
return httpRequest(httpRequestProcType, config)
}
Expand Down Expand Up @@ -73,6 +77,10 @@ func httpRequest(
if method == "" {
method = http.MethodPost
}
contentType := config.Settings[httpRequestConfigContentType]
if contentType == "" {
contentType = httpRequestContentTypeDefault
}

// preflight check
_, err = http.NewRequest(
Expand All @@ -85,16 +93,23 @@ func httpRequest(
}

procFn := func(ctx context.Context, r record.Record) (record.Record, error) {
jsonRec, err := json.Marshal(r)
if err != nil {
return record.Record{}, cerrors.Errorf("%s: error creating json record: %w", processorType, err)
}

req, err := http.NewRequestWithContext(
ctx,
method,
rawURL,
bytes.NewReader(r.Payload.After.Bytes()),
bytes.NewReader(jsonRec),
)
if err != nil {
return record.Record{}, cerrors.Errorf("%s: error trying to create HTTP request: %w", processorType, err)
}

req.Header.Set("Content-Type", contentType)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return record.Record{}, cerrors.Errorf("%s: error trying to execute HTTP request: %w", processorType, err)
Expand All @@ -110,6 +125,10 @@ func httpRequest(
// regard status codes over 299 as errors
return record.Record{}, cerrors.Errorf("%s: invalid status code %v (body: %q)", processorType, resp.StatusCode, string(body))
}
// skip if body has no content
if resp.StatusCode == http.StatusNoContent {
return record.Record{}, processor.ErrSkipRecord
}

r.Payload.After = record.RawData{Raw: body}
return r, nil
Expand Down
45 changes: 42 additions & 3 deletions pkg/processor/procbuiltin/httprequest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package procbuiltin

import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -139,6 +140,7 @@ func TestHTTPRequest_Build(t *testing.T) {
httpRequestBackoffRetryMin: "10ms",
httpRequestBackoffRetryMax: "1s",
httpRequestBackoffRetryFactor: "1.3",
httpRequestConfigContentType: "application/json",
},
}},
wantErr: false,
Expand Down Expand Up @@ -211,7 +213,9 @@ func TestHTTPRequest_Success(t *testing.T) {
if wantMethod == "" {
wantMethod = "POST" // default
}
wantBody := tt.args.r.Payload.After.Bytes()

wantBody, err := json.Marshal(tt.args.r)
is.NoErr(err)

srv := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
is.Equal(wantMethod, req.Method)
Expand Down Expand Up @@ -242,7 +246,9 @@ func TestHTTPRequest_RetrySuccess(t *testing.T) {
respBody := []byte("foo-bar/response")

wantMethod := "POST"
wantBody := []byte("random data")
rec := record.Record{Payload: record.Change{After: record.RawData{Raw: []byte("random data")}}}
wantBody, err := json.Marshal(rec)
is.NoErr(err)

srvHandlerCount := 0

Expand Down Expand Up @@ -278,7 +284,7 @@ func TestHTTPRequest_RetrySuccess(t *testing.T) {
underTest, err := HTTPRequest(config)
is.NoErr(err)

got, err := underTest.Process(context.Background(), record.Record{Payload: record.Change{After: record.RawData{Raw: wantBody}}})
got, err := underTest.Process(context.Background(), rec)
is.NoErr(err)
is.Equal(got.Payload.After, record.RawData{Raw: respBody})
is.Equal(srvHandlerCount, 5)
Expand Down Expand Up @@ -314,3 +320,36 @@ func TestHTTPRequest_RetryFail(t *testing.T) {
is.Equal(got, record.Record{})
is.Equal(srvHandlerCount, 6) // expected 6 requests (1 regular and 5 retries)
}

func TestHTTPRequest_FilterRecord(t *testing.T) {
is := is.New(t)

wantMethod := "POST"
rec := record.Record{Payload: record.Change{After: record.RawData{Raw: []byte("random data")}}}
wantBody, err := json.Marshal(rec)
is.NoErr(err)

srv := httptest.NewServer(http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
is.Equal(wantMethod, req.Method)

gotBody, err := io.ReadAll(req.Body)
is.NoErr(err)
is.Equal(wantBody, gotBody)

resp.WriteHeader(http.StatusNoContent)
}))
defer srv.Close()

config := processor.Config{
Settings: map[string]string{
httpRequestConfigURL: srv.URL,
},
}

underTest, err := HTTPRequest(config)
is.NoErr(err)

got, err := underTest.Process(context.Background(), rec)
is.Equal(err, processor.ErrSkipRecord)
is.Equal(got, record.Record{})
}
46 changes: 23 additions & 23 deletions pkg/provisioning/config/yaml/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,41 +29,41 @@ var Changelog = internal.Changelog{
}

type Configuration struct {
Version string `yaml:"version"`
Pipelines []Pipeline `yaml:"pipelines"`
Version string `yaml:"version" json:"version"`
Pipelines []Pipeline `yaml:"pipelines" json:"pipelines"`
}

type Pipeline struct {
ID string `yaml:"id"`
Status string `yaml:"status"`
Name string `yaml:"name"`
Description string `yaml:"description"`
Connectors []Connector `yaml:"connectors"`
Processors []Processor `yaml:"processors"`
DLQ DLQ `yaml:"dead-letter-queue"`
ID string `yaml:"id" json:"id"`
Status string `yaml:"status" json:"status"`
Name string `yaml:"name" json:"name"`
Description string `yaml:"description" json:"description"`
Connectors []Connector `yaml:"connectors" json:"connectors"`
Processors []Processor `yaml:"processors" json:"processors"`
DLQ DLQ `yaml:"dead-letter-queue" json:"dead-letter-queue"`
}

type Connector struct {
ID string `yaml:"id"`
Type string `yaml:"type"`
Plugin string `yaml:"plugin"`
Name string `yaml:"name"`
Settings map[string]string `yaml:"settings"`
Processors []Processor `yaml:"processors"`
ID string `yaml:"id" json:"id"`
Type string `yaml:"type" json:"type"`
Plugin string `yaml:"plugin" json:"plugin"`
Name string `yaml:"name" json:"name"`
Settings map[string]string `yaml:"settings" json:"settings"`
Processors []Processor `yaml:"processors" json:"processors"`
}

type Processor struct {
ID string `yaml:"id"`
Type string `yaml:"type"`
Settings map[string]string `yaml:"settings"`
Workers int `yaml:"workers"`
ID string `yaml:"id" json:"id"`
Type string `yaml:"type" json:"type"`
Settings map[string]string `yaml:"settings" json:"settings"`
Workers int `yaml:"workers" json:"workers"`
}

type DLQ struct {
Plugin string `yaml:"plugin"`
Settings map[string]string `yaml:"settings"`
WindowSize *int `yaml:"window-size"`
WindowNackThreshold *int `yaml:"window-nack-threshold"`
Plugin string `yaml:"plugin" json:"plugin"`
Settings map[string]string `yaml:"settings" json:"settings"`
WindowSize *int `yaml:"window-size" json:"window-size"`
WindowNackThreshold *int `yaml:"window-nack-threshold" json:"window-nack-threshold"`
}

func (c Configuration) ToConfig() []config.Pipeline {
Expand Down
Loading

0 comments on commit 67d6196

Please sign in to comment.