Skip to content

Commit

Permalink
refactor: opencdc test examples (#1363)
Browse files Browse the repository at this point in the history
* refactor: opencdc test examples

* refactor: address feedback
  • Loading branch information
raulb authored Feb 7, 2024
1 parent 33fd889 commit c149030
Showing 1 changed file with 171 additions and 31 deletions.
202 changes: 171 additions & 31 deletions pkg/processor/procbuiltin/unwrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,44 @@ const DebeziumRecordPayload = `{
"schema": {}
}`

const OpenCDCRecordCreatePayload = `{
const OpenCDCRecordUpdateWithBeforePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "create",
"operation": "update",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"before": {
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": 1250383582,
"triggered": false
},
"after": {
"event_id": 1747353658,
"msg": "string 0f5397c9-31f1-422a-9c9a-26e3574a5c31",
"pg_generator": false,
"sensor_id": 1250383580,
"triggered": false
}
}
}`

const OpenCDCRecordUpdateWithoutBeforePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "update",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"before": null,
"after": {
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
Expand All @@ -62,27 +90,24 @@ const OpenCDCRecordCreatePayload = `{
}
}`

const OpenCDCRecordDeletePayload = `{
"position": "Qy9ENDAwMjNCMA==",
"operation": "delete",
"metadata": {
"conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source",
"opencdc.readAt": "1707134319088931000",
"opencdc.version": "v1",
"postgres.table": "user_activity"
},
"key": {
"key": 3
},
"payload": {
"before": null,
"after": null
}
}`
const OpenCDCRecordDeleteWithoutBeforePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "delete",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"before": null,
"after": null
}
}`

const OpenCDCRecordUpdatePayload = `{
const OpenCDCRecordDeleteWithBeforePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "update",
"operation": "delete",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
Expand All @@ -97,7 +122,27 @@ const OpenCDCRecordUpdatePayload = `{
"sensor_id": 1250383582,
"triggered": false
},
"after": null
"after": null
}
}`
const OpenCDCRecordCreatePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "create",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"before": null,
"after": {
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": 1250383582,
"triggered": false
}
}
}`

Expand Down Expand Up @@ -755,7 +800,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: false,
},
{
name: "opencdc record delete with raw data",
name: "opencdc record delete with before and with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand All @@ -772,30 +817,74 @@ func TestUnwrap_Process(t *testing.T) {
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordDeletePayload),
Raw: []byte(OpenCDCRecordDeleteWithBeforePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationDelete,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source",
"opencdc.readAt": "1707134319088931000",
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: record.StructuredData{
"event_id": float64(1747353650),
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": float64(1250383582),
"triggered": false,
},
After: nil,
},
Key: record.RawData{Raw: []uint8("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
{
name: "opencdc record delete without before and with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key-raw-data")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordDeleteWithoutBeforePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationDelete,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
"postgres.table": "user_activity",
},
Payload: record.Change{
Before: nil,
After: nil,
},
Key: record.StructuredData{"key": float64(3)},
Key: record.RawData{Raw: []uint8("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
{
name: "opencdc record update with raw data",
name: "opencdc record update with before and with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand All @@ -812,7 +901,7 @@ func TestUnwrap_Process(t *testing.T) {
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordUpdatePayload),
Raw: []byte(OpenCDCRecordUpdateWithBeforePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
Expand All @@ -832,7 +921,58 @@ func TestUnwrap_Process(t *testing.T) {
"sensor_id": float64(1250383582),
"triggered": false,
},
After: nil,
After: record.StructuredData{
"event_id": float64(1.747353658e+09),
"msg": "string 0f5397c9-31f1-422a-9c9a-26e3574a5c31",
"pg_generator": false,
"sensor_id": float64(1.25038358e+09),
"triggered": false,
},
},
Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
{
name: "opencdc record update without before and with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key-raw-data")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordUpdateWithoutBeforePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationUpdate,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.StructuredData{
"event_id": float64(1747353650),
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": float64(1250383582),
"triggered": false,
},
},
Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
Expand Down

0 comments on commit c149030

Please sign in to comment.