diff --git a/pkg/processor/procbuiltin/unwrap.go b/pkg/processor/procbuiltin/unwrap.go index 6130c3fb5..8cd396c8b 100644 --- a/pkg/processor/procbuiltin/unwrap.go +++ b/pkg/processor/procbuiltin/unwrap.go @@ -194,6 +194,31 @@ func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.D return key, nil } +func (o *openCDCUnwrapper) convertPayloadData(payload map[string]interface{}, key string) (record.Data, error) { + payloadData, ok := payload[key] + if !ok { + return nil, nil + } + + switch data := payloadData.(type) { + case map[string]interface{}: + convertedData := make(record.StructuredData, len(data)) + for k, v := range data { + convertedData[k] = v + } + return convertedData, nil + case string: + decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data))) + n, err := base64.StdEncoding.Decode(decoded, []byte(data)) + if err != nil { + return nil, cerrors.Errorf("couldn't decode payload %s: %w", err, key) + } + return record.RawData{Raw: decoded[:n]}, nil + default: + return nil, nil + } +} + // UnwrapPayload extracts payload from a structuredData record. func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (record.Change, error) { var payload record.Change @@ -206,34 +231,19 @@ func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (reco case record.Change: payload = p case map[string]interface{}: - afterData, ok := p["after"] - if !ok { - return record.Change{}, cerrors.Errorf("record payload after doesn't contain payload after") + before, err := o.convertPayloadData(p, "before") + if err != nil { + return record.Change{}, err } - switch data := afterData.(type) { - case map[string]interface{}: - convertedData := make(record.StructuredData, len(data)) - for k, v := range data { - convertedData[k] = v - } - payload = record.Change{ - Before: nil, - After: convertedData, - } - case string: - decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data))) - n, err := base64.StdEncoding.Decode(decoded, []byte(data)) - if err != nil { - return payload, cerrors.Errorf("couldn't decode payload after: %w", err) - } - convertedData := record.RawData{Raw: decoded[:n]} - payload = record.Change{ - Before: nil, - After: convertedData, - } - default: - return record.Change{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data) + after, err := o.convertPayloadData(p, "after") + if err != nil { + return record.Change{}, err + } + + payload = record.Change{ + Before: before, + After: after, } default: return record.Change{}, cerrors.Errorf("expected a record.Change or a map[string]interface{}, got %T", p) diff --git a/pkg/processor/procbuiltin/unwrap_test.go b/pkg/processor/procbuiltin/unwrap_test.go index f22cf2048..7e6ae615d 100644 --- a/pkg/processor/procbuiltin/unwrap_test.go +++ b/pkg/processor/procbuiltin/unwrap_test.go @@ -42,7 +42,7 @@ const DebeziumRecordPayload = `{ "schema": {} }` -const OpenCDCRecordPayload = `{ +const OpenCDCRecordCreatePayload = `{ "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", "operation": "create", "metadata": { @@ -62,6 +62,45 @@ const OpenCDCRecordPayload = `{ } }` +const OpenCDCRecordDeletePayload = `{ + "position": "Qy9ENDAwMjNCMA==", + "operation": "delete", + "metadata": { + "conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source", + "opencdc.readAt": "1707134319088931000", + "opencdc.version": "v1", + "postgres.table": "user_activity" + }, + "key": { + "key": 3 + }, + "payload": { + "before": null, + "after": null + } + }` + +const OpenCDCRecordUpdatePayload = `{ + "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", + "operation": "update", + "metadata": { + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1" + }, + "key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh", + "payload": { + "before": { + "event_id": 1747353650, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": 1250383582, + "triggered": false + }, + "after": null + } + }` + func TestUnwrap_Config(t *testing.T) { tests := []struct { name string @@ -422,7 +461,7 @@ func TestUnwrap_Process(t *testing.T) { wantErr: false, }, { - name: "opencdc with structured data and no payload after", + name: "opencdc record create with structured data and no payload after", config: processor.Config{ Settings: map[string]string{"format": "opencdc"}, }, @@ -440,7 +479,7 @@ func TestUnwrap_Process(t *testing.T) { wantErr: true, }, { - name: "opencdc with an invalid operation", + name: "opencdc record create with an invalid operation", config: processor.Config{ Settings: map[string]string{"format": "opencdc"}, }, @@ -485,7 +524,7 @@ func TestUnwrap_Process(t *testing.T) { wantErr: true, }, { - name: "opencdc with an invalid metadata", + name: "opencdc record create with an invalid metadata", config: processor.Config{ Settings: map[string]string{"format": "opencdc"}, }, @@ -526,7 +565,7 @@ func TestUnwrap_Process(t *testing.T) { wantErr: true, }, { - name: "opencdc with an invalid key", + name: "opencdc record create with an invalid key", config: processor.Config{ Settings: map[string]string{"format": "opencdc"}, }, @@ -571,7 +610,7 @@ func TestUnwrap_Process(t *testing.T) { wantErr: true, }, { - name: "opencdc with an invalid payload", + name: "opencdc record create with an invalid payload", config: processor.Config{ Settings: map[string]string{"format": "opencdc"}, }, @@ -607,7 +646,7 @@ func TestUnwrap_Process(t *testing.T) { wantErr: true, }, { - name: "opencdc with structured data", + name: "opencdc record create with structured data", config: processor.Config{ Settings: map[string]string{"format": "opencdc"}, }, @@ -671,7 +710,7 @@ func TestUnwrap_Process(t *testing.T) { wantErr: false, }, { - name: "opencdc with raw data", + name: "opencdc record create with raw data", config: processor.Config{ Settings: map[string]string{"format": "opencdc"}, }, @@ -688,7 +727,7 @@ func TestUnwrap_Process(t *testing.T) { Payload: record.Change{ Before: nil, After: record.RawData{ - Raw: []byte(OpenCDCRecordPayload), + Raw: []byte(OpenCDCRecordCreatePayload), }, }, Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), @@ -715,6 +754,91 @@ func TestUnwrap_Process(t *testing.T) { }, wantErr: false, }, + { + name: "opencdc record delete with raw data", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key-raw-data")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.RawData{ + Raw: []byte(OpenCDCRecordDeletePayload), + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{ + Operation: record.OperationDelete, + Metadata: record.Metadata{ + "conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source", + "opencdc.readAt": "1707134319088931000", + "opencdc.version": "v1", + "postgres.table": "user_activity", + }, + Payload: record.Change{ + Before: nil, + After: nil, + }, + Key: record.StructuredData{"key": float64(3)}, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + wantErr: false, + }, + { + name: "opencdc record update with raw data", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key-raw-data")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.RawData{ + Raw: []byte(OpenCDCRecordUpdatePayload), + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{ + Operation: record.OperationUpdate, + Metadata: record.Metadata{ + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: record.StructuredData{ + "event_id": float64(1747353650), + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": float64(1250383582), + "triggered": false, + }, + After: nil, + }, + Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")}, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + wantErr: false, + }, } for _, tt := range tests {