diff --git a/pkg/plugin/processor/builtin/impl/avro/encode.go b/pkg/plugin/processor/builtin/impl/avro/encode.go
index 867f40c59..c1e45552e 100644
--- a/pkg/plugin/processor/builtin/impl/avro/encode.go
+++ b/pkg/plugin/processor/builtin/impl/avro/encode.go
@@ -211,7 +211,12 @@ func (p *encodeProcessor) structuredData(data any) (opencdc.StructuredData, erro
 	var sd opencdc.StructuredData
 	switch v := data.(type) {
 	case opencdc.RawData:
-		err := json.Unmarshal(v.Bytes(), &sd)
+		b := v.Bytes()
+		// if data is empty, then return empty structured data
+		if len(b) == 0 {
+			return sd, nil
+		}
+		err := json.Unmarshal(b, &sd)
 		if err != nil {
 			return nil, cerrors.Errorf("failed unmarshalling JSON from raw data: %w", err)
 		}
diff --git a/pkg/plugin/processor/builtin/impl/avro/encode_test.go b/pkg/plugin/processor/builtin/impl/avro/encode_test.go
index 2a6b06701..674c47125 100644
--- a/pkg/plugin/processor/builtin/impl/avro/encode_test.go
+++ b/pkg/plugin/processor/builtin/impl/avro/encode_test.go
@@ -163,3 +163,97 @@ func TestEncodeProcessor_Process_RawData_CustomField(t *testing.T) {
 		})
 	}
 }
+
+// TestEncodeProcessor_Process_EmptyPayloadField runs the encode processor
+// against records in which the configured payload field holds empty
+// structured data. It expects that field to be replaced by the mock
+// encoder's output and the other payload field to stay untouched.
+//
+// Each case builds its mock with the subtest's t, so gomock reports
+// expectation failures against the subtest that actually ran.
+func TestEncodeProcessor_Process_EmptyPayloadField(t *testing.T) {
+	testCases := []struct {
+		name              string
+		field             string
+		input             opencdc.Record
+		mockEncoder       func(ctx context.Context, t *testing.T) *MockEncoder
+		wantPayloadBefore opencdc.Data
+		wantPayloadAfter  opencdc.Data
+	}{
+		{
+			name:  "empty payload.Before encoding",
+			field: ".Payload.Before",
+			input: opencdc.Record{
+				Payload: opencdc.Change{
+					Before: opencdc.StructuredData{},
+					After: opencdc.StructuredData{
+						"something": opencdc.RawData(`{"field_int": 123}`),
+					},
+				},
+			},
+			wantPayloadBefore: opencdc.RawData(`{}`),
+			wantPayloadAfter: opencdc.StructuredData{
+				"something": opencdc.RawData(`{"field_int": 123}`),
+			},
+			mockEncoder: func(ctx context.Context, t *testing.T) *MockEncoder {
+				m := NewMockEncoder(gomock.NewController(t))
+				m.EXPECT().
+					Encode(ctx, opencdc.StructuredData{}).
+					Return(opencdc.RawData(`{}`), nil)
+				return m
+			},
+		},
+		{
+			name:  "empty payload.After encoding",
+			field: ".Payload.After",
+			input: opencdc.Record{
+				Payload: opencdc.Change{
+					Before: opencdc.StructuredData{
+						"something": opencdc.RawData(`{"field_int": 123}`),
+					},
+					After: opencdc.StructuredData{},
+				},
+			},
+			wantPayloadBefore: opencdc.StructuredData{
+				"something": opencdc.RawData(`{"field_int": 123}`),
+			},
+			wantPayloadAfter: opencdc.RawData(`{}`),
+			mockEncoder: func(ctx context.Context, t *testing.T) *MockEncoder {
+				m := NewMockEncoder(gomock.NewController(t))
+				m.EXPECT().
+					Encode(ctx, opencdc.StructuredData{}).
+					Return(opencdc.RawData(`{}`), nil)
+				return m
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			is := is.New(t)
+			ctx := context.Background()
+
+			config := map[string]string{
+				"url":                         "http://localhost",
+				"field":                       tc.field,
+				"schema.strategy":             "autoRegister",
+				"schema.autoRegister.subject": "testsubject",
+			}
+
+			want := sdk.SingleRecord(tc.input.Clone())
+			want.Payload.After = tc.wantPayloadAfter
+			want.Payload.Before = tc.wantPayloadBefore
+
+			underTest := NewEncodeProcessor(log.Nop())
+			err := underTest.Configure(ctx, config)
+			is.NoErr(err)
+
+			// skipping Open(), so we can inject a mock encoder
+			mockEncoder := tc.mockEncoder(ctx, t)
+			underTest.(*encodeProcessor).encoder = mockEncoder
+
+			got := underTest.Process(ctx, []opencdc.Record{tc.input})
+			is.Equal(1, len(got))
+			is.Equal("", cmp.Diff(want, got[0], internal.CmpProcessedRecordOpts...))
+		})
+	}
+}