Skip to content

Commit

Permalink
refactor: address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
raulb committed Feb 7, 2024
1 parent 41ddc2f commit 181920a
Showing 1 changed file with 171 additions and 32 deletions.
203 changes: 171 additions & 32 deletions pkg/processor/procbuiltin/unwrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,36 @@ const DebeziumRecordPayload = `{
"schema": {}
}`

const OpenCDCRecordWithAfterPayload = `{
const OpenCDCRecordUpdateWithBeforePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "create",
"operation": "update",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"before": {
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": 1250383582,
"triggered": false
},
"after": {
"event_id": 1747353658,
"msg": "string 0f5397c9-31f1-422a-9c9a-26e3574a5c31",
"pg_generator": false,
"sensor_id": 1250383580,
"triggered": false
}
}
}`

const OpenCDCRecordUpdateWithoutBeforePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "update",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
Expand All @@ -63,25 +90,22 @@ const OpenCDCRecordWithAfterPayload = `{
}
}`

const OpenCDCRecordWithoutPayload = `{
"position": "Qy9ENDAwMjNCMA==",
"operation": "delete",
"metadata": {
"conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source",
"opencdc.readAt": "1707134319088931000",
"opencdc.version": "v1",
"postgres.table": "user_activity"
},
"key": {
"key": 3
},
"payload": {
"before": null,
"after": null
}
}`
const OpenCDCRecordDeleteWithoutBeforePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "delete",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"before": null,
"after": null
}
}`

const OpenCDCRecordWithBeforePayload = `{
const OpenCDCRecordDeleteWithBeforePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "delete",
"metadata": {
Expand All @@ -98,7 +122,27 @@ const OpenCDCRecordWithBeforePayload = `{
"sensor_id": 1250383582,
"triggered": false
},
"after": null
"after": null
}
}`
const OpenCDCRecordCreatePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "create",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"before": null,
"after": {
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": 1250383582,
"triggered": false
}
}
}`

Expand Down Expand Up @@ -728,7 +772,7 @@ func TestUnwrap_Process(t *testing.T) {
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordWithAfterPayload),
Raw: []byte(OpenCDCRecordCreatePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
Expand Down Expand Up @@ -756,7 +800,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: false,
},
{
name: "opencdc record delete with raw data",
name: "opencdc record delete with before and with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand All @@ -773,30 +817,74 @@ func TestUnwrap_Process(t *testing.T) {
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordWithoutPayload),
Raw: []byte(OpenCDCRecordDeleteWithBeforePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationDelete,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source",
"opencdc.readAt": "1707134319088931000",
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: record.StructuredData{
"event_id": float64(1747353650),
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": float64(1250383582),
"triggered": false,
},
After: nil,
},
Key: record.RawData{Raw: []uint8("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
{
name: "opencdc record delete without before and with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key-raw-data")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordDeleteWithoutBeforePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationDelete,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
"postgres.table": "user_activity",
},
Payload: record.Change{
Before: nil,
After: nil,
},
Key: record.StructuredData{"key": float64(3)},
Key: record.RawData{Raw: []uint8("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
{
name: "opencdc record update with raw data",
name: "opencdc record update with before and with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand All @@ -813,13 +901,13 @@ func TestUnwrap_Process(t *testing.T) {
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordWithBeforePayload),
Raw: []byte(OpenCDCRecordUpdateWithBeforePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationDelete,
Operation: record.OperationUpdate,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
Expand All @@ -833,7 +921,58 @@ func TestUnwrap_Process(t *testing.T) {
"sensor_id": float64(1250383582),
"triggered": false,
},
After: nil,
After: record.StructuredData{
"event_id": float64(1.747353658e+09),
"msg": "string 0f5397c9-31f1-422a-9c9a-26e3574a5c31",
"pg_generator": false,
"sensor_id": float64(1.25038358e+09),
"triggered": false,
},
},
Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
{
name: "opencdc record update without before and with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key-raw-data")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordUpdateWithoutBeforePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationUpdate,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.StructuredData{
"event_id": float64(1747353650),
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": float64(1250383582),
"triggered": false,
},
},
Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
Expand Down

0 comments on commit 181920a

Please sign in to comment.