diff --git a/pkg/processor/procbuiltin/unwrap_test.go b/pkg/processor/procbuiltin/unwrap_test.go index 05ea6098a..40590afe4 100644 --- a/pkg/processor/procbuiltin/unwrap_test.go +++ b/pkg/processor/procbuiltin/unwrap_test.go @@ -42,9 +42,36 @@ const DebeziumRecordPayload = `{ "schema": {} }` -const OpenCDCRecordWithAfterPayload = `{ +const OpenCDCRecordUpdateWithBeforePayload = `{ "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", - "operation": "create", + "operation": "update", + "metadata": { + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1" + }, + "key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh", + "payload": { + "before": { + "event_id": 1747353650, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": 1250383582, + "triggered": false + }, + "after": { + "event_id": 1747353658, + "msg": "string 0f5397c9-31f1-422a-9c9a-26e3574a5c31", + "pg_generator": false, + "sensor_id": 1250383580, + "triggered": false + } + } + }` + +const OpenCDCRecordUpdateWithoutBeforePayload = `{ + "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", + "operation": "update", "metadata": { "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", "opencdc.readAt": "1706028953595546000", @@ -63,25 +90,22 @@ const OpenCDCRecordWithAfterPayload = `{ } }` -const OpenCDCRecordWithoutPayload = `{ - "position": "Qy9ENDAwMjNCMA==", - "operation": "delete", - "metadata": { - "conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source", - "opencdc.readAt": "1707134319088931000", - "opencdc.version": "v1", - "postgres.table": "user_activity" - }, - "key": { - "key": 3 - }, - "payload": { - "before": null, - "after": null - } - }` +const OpenCDCRecordDeleteWithoutBeforePayload = `{ + "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", + "operation": "delete", + "metadata": { + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1" + }, + "key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh", + "payload": { + "before": null, + "after": null + } + }` -const OpenCDCRecordWithBeforePayload = `{ +const OpenCDCRecordDeleteWithBeforePayload = `{ "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", "operation": "delete", "metadata": { @@ -98,7 +122,27 @@ const OpenCDCRecordWithBeforePayload = `{ "sensor_id": 1250383582, "triggered": false }, - "after": null + "after": null + } + }` +const OpenCDCRecordCreatePayload = `{ + "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", + "operation": "create", + "metadata": { + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1" + }, + "key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh", + "payload": { + "before": null, + "after": { + "event_id": 1747353650, + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": 1250383582, + "triggered": false + } } }` @@ -728,7 +772,7 @@ func TestUnwrap_Process(t *testing.T) { Payload: record.Change{ Before: nil, After: record.RawData{ - Raw: []byte(OpenCDCRecordWithAfterPayload), + Raw: []byte(OpenCDCRecordCreatePayload), }, }, Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), @@ -756,7 +800,7 @@ func TestUnwrap_Process(t *testing.T) { wantErr: false, }, { - name: "opencdc record delete with raw data", + name: "opencdc record delete with before and with raw data", config: processor.Config{ Settings: map[string]string{"format": "opencdc"}, }, @@ -773,7 +817,7 @@ func TestUnwrap_Process(t *testing.T) { Payload: record.Change{ Before: nil, After: record.RawData{ - Raw: []byte(OpenCDCRecordWithoutPayload), + Raw: []byte(OpenCDCRecordDeleteWithBeforePayload), }, }, Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), @@ -781,22 +825,66 @@ func TestUnwrap_Process(t *testing.T) { want: record.Record{ Operation: record.OperationDelete, Metadata: record.Metadata{ - "conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source", - "opencdc.readAt": "1707134319088931000", + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: record.StructuredData{ + "event_id": float64(1747353650), + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": float64(1250383582), + "triggered": false, + }, + After: nil, + }, + Key: record.RawData{Raw: []uint8("17774941-57a2-42fa-b430-8912a9424b3a")}, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + wantErr: false, + }, + { + name: "opencdc record delete without before and with raw data", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key-raw-data")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.RawData{ + Raw: []byte(OpenCDCRecordDeleteWithoutBeforePayload), + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{ + Operation: record.OperationDelete, + Metadata: record.Metadata{ + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", "opencdc.version": "v1", - "postgres.table": "user_activity", }, Payload: record.Change{ Before: nil, After: nil, }, - Key: record.StructuredData{"key": float64(3)}, + Key: record.RawData{Raw: []uint8("17774941-57a2-42fa-b430-8912a9424b3a")}, Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), }, wantErr: false, }, { - name: "opencdc record update with raw data", + name: "opencdc record update with before and with raw data", config: processor.Config{ Settings: map[string]string{"format": "opencdc"}, }, @@ -813,13 +901,13 @@ func TestUnwrap_Process(t *testing.T) { Payload: record.Change{ Before: nil, After: record.RawData{ - Raw: []byte(OpenCDCRecordWithBeforePayload), + Raw: []byte(OpenCDCRecordUpdateWithBeforePayload), }, }, Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), }, want: record.Record{ - Operation: record.OperationDelete, + Operation: record.OperationUpdate, Metadata: record.Metadata{ "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", "opencdc.readAt": "1706028953595546000", @@ -833,7 +921,58 @@ func TestUnwrap_Process(t *testing.T) { "sensor_id": float64(1250383582), "triggered": false, }, - After: nil, + After: record.StructuredData{ + "event_id": float64(1.747353658e+09), + "msg": "string 0f5397c9-31f1-422a-9c9a-26e3574a5c31", + "pg_generator": false, + "sensor_id": float64(1.25038358e+09), + "triggered": false, + }, + }, + Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")}, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + wantErr: false, + }, + { + name: "opencdc record update without before and with raw data", + config: processor.Config{ + Settings: map[string]string{"format": "opencdc"}, + }, + record: record.Record{ + Key: record.RawData{Raw: []byte("one-key-raw-data")}, + Operation: record.OperationCreate, + Metadata: map[string]string{ + "conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka", + "kafka.topic": "stream-78lpnchx7tzpyqz-generator", + "opencdc.createdAt": "1706028953595000000", + "opencdc.readAt": "1706028953606997000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.RawData{ + Raw: []byte(OpenCDCRecordUpdateWithoutBeforePayload), + }, + }, + Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), + }, + want: record.Record{ + Operation: record.OperationUpdate, + Metadata: record.Metadata{ + "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", + "opencdc.readAt": "1706028953595546000", + "opencdc.version": "v1", + }, + Payload: record.Change{ + Before: nil, + After: record.StructuredData{ + "event_id": float64(1747353650), + "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", + "pg_generator": false, + "sensor_id": float64(1250383582), + "triggered": false, + }, }, Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")}, Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),