Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: unwrap openCDC records on create and update #1362

Merged
merged 2 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 36 additions & 26 deletions pkg/processor/procbuiltin/unwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,31 @@ func (o *openCDCUnwrapper) UnwrapKey(structData record.StructuredData) (record.D
return key, nil
}

func (o *openCDCUnwrapper) convertPayloadData(payload map[string]interface{}, key string) (record.Data, error) {
payloadData, ok := payload[key]
if !ok {
return nil, nil
}

switch data := payloadData.(type) {
case map[string]interface{}:
convertedData := make(record.StructuredData, len(data))
for k, v := range data {
convertedData[k] = v
}
return convertedData, nil
case string:
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
n, err := base64.StdEncoding.Decode(decoded, []byte(data))
if err != nil {
return nil, cerrors.Errorf("couldn't decode payload %s: %w", err, key)
}
return record.RawData{Raw: decoded[:n]}, nil
default:
return nil, nil
}
}

// UnwrapPayload extracts payload from a structuredData record.
func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (record.Change, error) {
var payload record.Change
Expand All @@ -206,34 +231,19 @@ func (o *openCDCUnwrapper) UnwrapPayload(structData record.StructuredData) (reco
case record.Change:
payload = p
case map[string]interface{}:
afterData, ok := p["after"]
if !ok {
return record.Change{}, cerrors.Errorf("record payload after doesn't contain payload after")
before, err := o.convertPayloadData(p, "before")
if err != nil {
return record.Change{}, err
}
switch data := afterData.(type) {
case map[string]interface{}:
convertedData := make(record.StructuredData, len(data))
for k, v := range data {
convertedData[k] = v
}
payload = record.Change{
Before: nil,
After: convertedData,
}
case string:
decoded := make([]byte, base64.StdEncoding.DecodedLen(len(data)))
n, err := base64.StdEncoding.Decode(decoded, []byte(data))
if err != nil {
return payload, cerrors.Errorf("couldn't decode payload after: %w", err)
}
convertedData := record.RawData{Raw: decoded[:n]}

payload = record.Change{
Before: nil,
After: convertedData,
}
default:
return record.Change{}, cerrors.Errorf("unexpected data type %T", unwrapProcType, data)
after, err := o.convertPayloadData(p, "after")
if err != nil {
return record.Change{}, err
}

payload = record.Change{
Before: before,
After: after,
}
default:
return record.Change{}, cerrors.Errorf("expected a record.Change or a map[string]interface{}, got %T", p)
Expand Down
142 changes: 133 additions & 9 deletions pkg/processor/procbuiltin/unwrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const DebeziumRecordPayload = `{
"schema": {}
}`

const OpenCDCRecordPayload = `{
const OpenCDCRecordCreatePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "create",
"metadata": {
Expand All @@ -62,6 +62,45 @@ const OpenCDCRecordPayload = `{
}
}`

const OpenCDCRecordDeletePayload = `{
"position": "Qy9ENDAwMjNCMA==",
"operation": "delete",
"metadata": {
"conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source",
"opencdc.readAt": "1707134319088931000",
"opencdc.version": "v1",
"postgres.table": "user_activity"
},
"key": {
"key": 3
},
"payload": {
"before": null,
"after": null
}
}`

const OpenCDCRecordUpdatePayload = `{
"position": "NzgyNjJmODUtODNmMS00ZGQwLWEyZDAtNTRmNjA1ZjkyYTg0",
"operation": "update",
"metadata": {
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1"
},
"key": "MTc3NzQ5NDEtNTdhMi00MmZhLWI0MzAtODkxMmE5NDI0YjNh",
"payload": {
"before": {
"event_id": 1747353650,
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": 1250383582,
"triggered": false
},
"after": null
}
}`

func TestUnwrap_Config(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -422,7 +461,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: false,
},
{
name: "opencdc with structured data and no payload after",
name: "opencdc record create with structured data and no payload after",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand All @@ -440,7 +479,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with an invalid operation",
name: "opencdc record create with an invalid operation",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -485,7 +524,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with an invalid metadata",
name: "opencdc record create with an invalid metadata",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -526,7 +565,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with an invalid key",
name: "opencdc record create with an invalid key",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -571,7 +610,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with an invalid payload",
name: "opencdc record create with an invalid payload",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -607,7 +646,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: true,
},
{
name: "opencdc with structured data",
name: "opencdc record create with structured data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand Down Expand Up @@ -671,7 +710,7 @@ func TestUnwrap_Process(t *testing.T) {
wantErr: false,
},
{
name: "opencdc with raw data",
name: "opencdc record create with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
Expand All @@ -688,7 +727,7 @@ func TestUnwrap_Process(t *testing.T) {
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordPayload),
Raw: []byte(OpenCDCRecordCreatePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
Expand All @@ -715,6 +754,91 @@ func TestUnwrap_Process(t *testing.T) {
},
wantErr: false,
},
{
name: "opencdc record delete with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key-raw-data")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordDeletePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationDelete,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-pg-source-to7iktk7mnnhhml:source",
"opencdc.readAt": "1707134319088931000",
"opencdc.version": "v1",
"postgres.table": "user_activity",
},
Payload: record.Change{
Before: nil,
After: nil,
},
Key: record.StructuredData{"key": float64(3)},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
{
name: "opencdc record update with raw data",
config: processor.Config{
Settings: map[string]string{"format": "opencdc"},
},
record: record.Record{
Key: record.RawData{Raw: []byte("one-key-raw-data")},
Operation: record.OperationCreate,
Metadata: map[string]string{
"conduit.source.connector.id": "dest-log-78lpnchx7tzpyqz:source-kafka",
"kafka.topic": "stream-78lpnchx7tzpyqz-generator",
"opencdc.createdAt": "1706028953595000000",
"opencdc.readAt": "1706028953606997000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: nil,
After: record.RawData{
Raw: []byte(OpenCDCRecordUpdatePayload),
},
},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
want: record.Record{
Operation: record.OperationUpdate,
Metadata: record.Metadata{
"conduit.source.connector.id": "source-generator-78lpnchx7tzpyqz:source",
"opencdc.readAt": "1706028953595546000",
"opencdc.version": "v1",
},
Payload: record.Change{
Before: record.StructuredData{
"event_id": float64(1747353650),
"msg": "string 0e8955b3-7fb5-4dda-8064-e10dc007f00d",
"pg_generator": false,
"sensor_id": float64(1250383582),
"triggered": false,
},
After: nil,
},
Key: record.RawData{Raw: []byte("17774941-57a2-42fa-b430-8912a9424b3a")},
Position: []byte("eyJHcm91cElEIjoiNGQ2ZTBhMjktNzAwZi00Yjk4LWEzY2MtZWUyNzZhZTc4MjVjIiwiVG9waWMiOiJzdHJlYW0tNzhscG5jaHg3dHpweXF6LWdlbmVyYXRvciIsIlBhcnRpdGlvbiI6MCwiT2Zmc2V0IjoyMjF9"),
},
wantErr: false,
},
}

for _, tt := range tests {
Expand Down
Loading