Skip to content

Commit

Permalink
support empty .payload.After / .payload.Before in avro.decode (#1678)
Browse files Browse the repository at this point in the history
* support empty rawData in avro.decode

* add test for empty payload.after

* simplify len check

* add payload.before test

---------

Co-authored-by: Raúl Barroso <[email protected]>
  • Loading branch information
samirketema and raulb authored Jun 27, 2024
1 parent 782c890 commit b6fa5be
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 1 deletion.
7 changes: 6 additions & 1 deletion pkg/plugin/processor/builtin/impl/avro/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,12 @@ func (p *encodeProcessor) structuredData(data any) (opencdc.StructuredData, erro
var sd opencdc.StructuredData
switch v := data.(type) {
case opencdc.RawData:
err := json.Unmarshal(v.Bytes(), &sd)
b := v.Bytes()
// if data is empty, then return empty structured data
if len(b) == 0 {
return sd, nil
}
err := json.Unmarshal(b, &sd)
if err != nil {
return nil, cerrors.Errorf("failed unmarshalling JSON from raw data: %w", err)
}
Expand Down
87 changes: 87 additions & 0 deletions pkg/plugin/processor/builtin/impl/avro/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,90 @@ func TestEncodeProcessor_Process_RawData_CustomField(t *testing.T) {
})
}
}

func TestEncodeProcessor_Process_EmptyPayloadField(t *testing.T) {
testCases := []struct {
name string
field string
input opencdc.Record
mockEncoder func(ctx context.Context) *MockEncoder
wantPayloadBefore opencdc.Data
wantPayloadAfter opencdc.Data
}{
{
name: "empty payload.Before encoding",
field: ".Payload.Before",
input: opencdc.Record{
Payload: opencdc.Change{
Before: opencdc.StructuredData{},
After: opencdc.StructuredData{
"something": opencdc.RawData(`{"field_int": 123}`),
},
},
},
wantPayloadBefore: opencdc.RawData(`{}`),
wantPayloadAfter: opencdc.StructuredData{
"something": opencdc.RawData(`{"field_int": 123}`),
},
mockEncoder: func(ctx context.Context) *MockEncoder {
m := NewMockEncoder(gomock.NewController(t))
m.EXPECT().
Encode(ctx, opencdc.StructuredData{}).
Return(opencdc.RawData(`{}`), nil)
return m
},
},
{
name: "empty payload.After encoding",
field: ".Payload.After",
input: opencdc.Record{
Payload: opencdc.Change{
Before: opencdc.StructuredData{
"something": opencdc.RawData(`{"field_int": 123}`),
},
After: opencdc.StructuredData{},
},
},
wantPayloadBefore: opencdc.StructuredData{
"something": opencdc.RawData(`{"field_int": 123}`),
},
wantPayloadAfter: opencdc.RawData(`{}`),
mockEncoder: func(ctx context.Context) *MockEncoder {
m := NewMockEncoder(gomock.NewController(t))
m.EXPECT().
Encode(ctx, opencdc.StructuredData{}).
Return(opencdc.RawData(`{}`), nil)
return m
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
ctx := context.Background()

config := map[string]string{
"url": "http://localhost",
"field": tc.field,
"schema.strategy": "autoRegister",
"schema.autoRegister.subject": "testsubject",
}

want := sdk.SingleRecord(tc.input.Clone())
want.Payload.After = tc.wantPayloadAfter
want.Payload.Before = tc.wantPayloadBefore

underTest := NewEncodeProcessor(log.Nop())
err := underTest.Configure(ctx, config)
is.NoErr(err)

// skipping Open(), so we can inject a mock encoder
mockEncoder := tc.mockEncoder(ctx)
underTest.(*encodeProcessor).encoder = mockEncoder

got := underTest.Process(ctx, []opencdc.Record{tc.input})
is.Equal(1, len(got))
is.Equal("", cmp.Diff(want, got[0], internal.CmpProcessedRecordOpts...))
})
}
}

0 comments on commit b6fa5be

Please sign in to comment.