Skip to content

Commit

Permalink
fix: unwrap openCDC records on create and update (#1362)
Browse files Browse the repository at this point in the history
* fix: unwrap delete opencdc records

* fix: unwrap payload before
  • Loading branch information
raulb authored Feb 5, 2024
1 parent 9c45825 commit f71d078
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 35 deletions.
62 changes: 36 additions & 26 deletions pkg/processor/procbuiltin/unwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,31 @@ func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.D
return key, nil
}

func (o *openCDCUnwrapper) convertPayloadData(payload map[string]interface{}, key string) (record.Data, error) {
payloadData, ok := payload[key]
if !ok {
return nil, nil
}

switch data := payloadData.(type) {
case map[string]interface{}:
convertedData := make(record.StructuredData, len(data))
for k, v := range data {
convertedData[k] = v
}
return convertedData, nil
case string:
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
n, err := base64.StdEncoding.Decode(decoded, []byte(data))
if err != nil {
return nil, cerrors.Errorf("couldn't decode payload %s: %w", err, key)
}
return record.RawData{Raw: decoded[:n]}, nil
default:
return nil, nil
}
}

// UnwrapPayload extracts payload from a structuredData record.
func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (record.Change, error) {
var payload record.Change
Expand All @@ -206,34 +231,19 @@ func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (reco
case record.Change:
payload = p
case map[string]interface{}:
afterData, ok := p["after"]
if !ok {
return record.Change{}, cerrors.Errorf("record payload after doesn't contain payload after")
before, err := o.convertPayloadData(p, "before")
if err != nil {
return record.Change{}, err
}
switch data := afterData.(type) {
case map[string]interface{}:
convertedData := make(record.StructuredData, len(data))
for k, v := range data {
convertedData[k] = v
}
payload = record.Change{
Before: nil,
After: convertedData,
}
case string:
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
n, err := base64.StdEncoding.Decode(decoded, []byte(data))
if err != nil {
return payload, cerrors.Errorf("couldn't decode payload after: %w", err)
}
convertedData := record.RawData{Raw: decoded[:n]}

payload = record.Change{
Before: nil,
After: convertedData,
}
default:
return record.Change{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data)
after, err := o.convertPayloadData(p, "after")
if err != nil {
return record.Change{}, err
}

payload = record.Change{
Before: before,
After: after,
}
default:
return record.Change{}, cerrors.Errorf("expected a record.Change or a map[string]interface{}, got %T", p)
Expand Down
142 changes: 133 additions & 9 deletions pkg/processor/procbuiltin/unwrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const DebeziumRecordPayload = `{
"schema": {}
}`

const OpenCDCRecordPayload = `{
const OpenCDCRecordCreatePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "create",
"metadata": {
Expand All @@ -62,6 +62,45 @@ const OpenCDCRecordPayload = `{
}
}`

const OpenCDCRecordDeletePayload = `{
"position": "Qy9ENDAwMjNCMA==",
"operation": "delete",
"metadata": {
"conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source",
"opencdc.readAt": "1707134319088931000",
"opencdc.version": "v1",
"postgres.table": "user_activity"
},
"key": {
"key": 3
},
"payload": {
"before": null,
"after": null
}
}`

const OpenCDCRecordUpdatePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "update",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"before": {
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": 1250383582,
"triggered": false
},
"after": null
}
}`

func TestUnwrap_Config(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -422,7 +461,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: false,
},
{
name: "opencdc with structured data and no payload after",
name: "opencdc record create with structured data and no payload after",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand All @@ -440,7 +479,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with an invalid operation",
name: "opencdc record create with an invalid operation",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -485,7 +524,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with an invalid metadata",
name: "opencdc record create with an invalid metadata",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -526,7 +565,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with an invalid key",
name: "opencdc record create with an invalid key",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -571,7 +610,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with an invalid payload",
name: "opencdc record create with an invalid payload",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -607,7 +646,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with structured data",
name: "opencdc record create with structured data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -671,7 +710,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: false,
},
{
name: "opencdc with raw data",
name: "opencdc record create with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand All @@ -688,7 +727,7 @@ func TestUnwrap_Process(t *testing.T) {
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordPayload),
Raw: []byte(OpenCDCRecordCreatePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
Expand All @@ -715,6 +754,91 @@ func TestUnwrap_Process(t *testing.T) {
},
wantErr: false,
},
{
name: "opencdc record delete with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key-raw-data")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordDeletePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationDelete,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source",
"opencdc.readAt": "1707134319088931000",
"opencdc.version": "v1",
"postgres.table": "user_activity",
},
Payload: record.Change{
Before: nil,
After: nil,
},
Key: record.StructuredData{"key": float64(3)},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
{
name: "opencdc record update with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key-raw-data")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordUpdatePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationUpdate,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: record.StructuredData{
"event_id": float64(1747353650),
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": float64(1250383582),
"triggered": false,
},
After: nil,
},
Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
}

for _, tt := range tests {
Expand Down

0 comments on commit f71d078

Please sign in to comment.