diff --git a/pkg/processor/procbuiltin/unwrap_test.go b/pkg/processor/procbuiltin/unwrap_test.go index 7e6ae615d..05ea6098a 100644 --- a/pkg/processor/procbuiltin/unwrap_test.go +++ b/pkg/processor/procbuiltin/unwrap_test.go @@ -42,7 +42,7 @@ const DebeziumRecordPayload = `{ "schema": {} }` -const OpenCDCRecordCreatePayload = `{ +const OpenCDCRecordWithAfterPayload = `{ "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", "operation": "create", "metadata": { @@ -52,6 +52,7 @@ const OpenCDCRecordCreatePayload = `{ }, "key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh", "payload": { + "before": null, "after": { "event_id": 1747353650, "msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d", @@ -62,7 +63,7 @@ const OpenCDCRecordCreatePayload = `{ } }` -const OpenCDCRecordDeletePayload = `{ +const OpenCDCRecordWithoutPayload = `{ "position": "Qy9ENDAwMjNCMA==", "operation": "delete", "metadata": { @@ -80,9 +81,9 @@ const OpenCDCRecordDeletePayload = `{ } }` -const OpenCDCRecordUpdatePayload = `{ +const OpenCDCRecordWithBeforePayload = `{ "position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0", - "operation": "update", + "operation": "delete", "metadata": { "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", "opencdc.readAt": "1706028953595546000", @@ -727,7 +728,7 @@ func TestUnwrap_Process(t *testing.T) { Payload: record.Change{ Before: nil, After: record.RawData{ - Raw: []byte(OpenCDCRecordCreatePayload), + Raw: []byte(OpenCDCRecordWithAfterPayload), }, }, Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), @@ -772,7 +773,7 @@ func TestUnwrap_Process(t *testing.T) { Payload: record.Change{ Before: nil, After: record.RawData{ - Raw: []byte(OpenCDCRecordDeletePayload), + Raw: []byte(OpenCDCRecordWithoutPayload), }, }, Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), @@ -812,13 +813,13 @@ func TestUnwrap_Process(t *testing.T) { Payload: record.Change{ Before: nil, After: record.RawData{ - Raw: []byte(OpenCDCRecordUpdatePayload), + Raw: []byte(OpenCDCRecordWithBeforePayload), }, }, Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"), }, want: record.Record{ - Operation: record.OperationUpdate, + Operation: record.OperationDelete, Metadata: record.Metadata{ "conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source", "opencdc.readAt": "1706028953595546000",