Skip to content

Commit

Permalink
Support debezium records with raw data (MongoDB records) (#870)
Browse files Browse the repository at this point in the history
* allow raw data in debezium records

* support debezium mongo updates & deletes

* allow null values in parsejson processor

* refactor metadata unwrap, use structured data for patch field

* simplify parsejson

* replace after with patch

* put patch and filter in metadata if present

* use generic approach to put unknown debezium fields into metadata

---------

Co-authored-by: Samir Ketema <[email protected]>
  • Loading branch information
lovromazgon and samirketema authored Feb 13, 2023
1 parent 4343858 commit 0e04d77
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 90 deletions.
3 changes: 2 additions & 1 deletion pkg/processor/procbuiltin/parsejson.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func parseJSON(

case record.StructuredData:
// data is already structured

case nil:
// if the field is nil leave it as it is
default:
return record.Record{}, cerrors.Errorf("%s: unexpected data type %T", processorType, data)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/processor/procbuiltin/parsejson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ func TestParseJSONPayload_Process(t *testing.T) {
},
},
wantErr: false,
}, {
name: "nil after",
record: record.Record{
Payload: record.Change{
Before: nil,
After: nil,
},
},
want: record.Record{
Payload: record.Change{
Before: nil,
After: nil,
},
},
wantErr: false,
}, {
name: "invalid JSON payload",
record: record.Record{
Expand Down
180 changes: 104 additions & 76 deletions pkg/processor/procbuiltin/unwrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,16 @@ func (p *unwrapProcessor) Process(ctx context.Context, in record.Record) (record
}

/*
example of kafka-connect record:
`{
"payload": {
"description": "desc",
"id": 20
},
"schema": {} // will be ignored
}`
Example of a kafka-connect record:
{
"payload": {
"description": "desc",
"id": 20
},
"schema": {} // will be ignored
}
*/

// kafkaConnectUnwrapper unwraps a kafka connect record from the payload, expects rec.Payload.After to be of type record.StructuredData
type kafkaConnectUnwrapper struct{}

Expand All @@ -114,7 +115,7 @@ func (k *kafkaConnectUnwrapper) Unwrap(rec record.Record) (record.Record, error)
}

// get payload
structPayload, ok = structPayload["payload"].(map[string]interface{})
structPayload, ok = structPayload["payload"].(map[string]any)
if !ok {
return record.Record{}, cerrors.Errorf("payload doesn't contain a record")
}
Expand Down Expand Up @@ -166,26 +167,27 @@ func (k *kafkaConnectUnwrapper) UnwrapKey(key record.Data) record.Data {
}

/*
example of debezium record:
`{
"payload": {
"after": {
"description": "desc",
"id": 20
},
"before": null,
"op": "c",
"source": {
"opencdc.readAt": "1674061777225877000",
"opencdc.version": "v1",
},
"transaction": null,
"ts_ms": 1674061777225
},
"schema": {} // will be ignored
}`
Example of a debezium record:
{
"payload": {
"after": {
"description": "desc",
"id": 20
},
"before": null,
"op": "c",
"source": {
"opencdc.readAt": "1674061777225877000",
"opencdc.version": "v1",
},
"transaction": null,
"ts_ms": 1674061777225
},
"schema": {} // will be ignored
}
*/
// debeziumUnwrapper unwraps a debezium record from the payload, expects rec.Payload.After to be of type record.StructuredData

// debeziumUnwrapper unwraps a debezium record from the payload.
type debeziumUnwrapper struct {
kafkaConnectUnwrapper kafkaConnectUnwrapper
}
Expand All @@ -194,7 +196,8 @@ const (
debeziumOpCreate = "c"
debeziumOpUpdate = "u"
debeziumOpDelete = "d"
debeziumOpRead = "r" // snapshot
debeziumOpRead = "r" // snapshot
debeziumOpUnset = "$unset" // mongoDB unset operation

debeziumFieldBefore = "before"
debeziumFieldAfter = "after"
Expand All @@ -205,37 +208,33 @@ const (

func (d *debeziumUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
// record must be structured
structPayload, ok := rec.Payload.After.(record.StructuredData)
debeziumRec, ok := rec.Payload.After.(record.StructuredData)
if !ok {
return record.Record{}, cerrors.Errorf("record payload data must be structured data")
}
// get payload
structPayload, ok = structPayload["payload"].(map[string]interface{}) // the payload has the debezium record
debeziumRec, ok = debeziumRec["payload"].(map[string]any) // the payload has the debezium record
if !ok {
return record.Record{}, cerrors.Errorf("payload doesn't contain a record")
}

// check fields under payload
err := d.validateRecord(structPayload)
err := d.validateRecord(debeziumRec)
if err != nil {
return record.Record{}, err
}

var before record.StructuredData
beforeData := structPayload[debeziumFieldBefore]
before, ok = beforeData.(map[string]any)
if beforeData != nil && !ok {
return record.Record{}, cerrors.Errorf("%s field is not a map", debeziumFieldBefore)
before, err := d.valueToData(debeziumRec[debeziumFieldBefore])
if err != nil {
return record.Record{}, cerrors.Errorf("failed to parse field %s: %w", debeziumFieldBefore, err)
}

var after record.StructuredData
afterData := structPayload[debeziumFieldAfter]
after, ok = afterData.(map[string]any)
if afterData != nil && !ok {
return record.Record{}, cerrors.Errorf("%s field is not a map", debeziumFieldAfter)
after, err := d.valueToData(debeziumRec[debeziumFieldAfter])
if err != nil {
return record.Record{}, cerrors.Errorf("failed to parse field %s: %w", debeziumFieldAfter, err)
}

op, ok := structPayload[debeziumFieldOp].(string)
op, ok := debeziumRec[debeziumFieldOp].(string)
if !ok {
return record.Record{}, cerrors.Errorf("%s operation is not a string", op)
}
Expand All @@ -262,11 +261,22 @@ func (d *debeziumUnwrapper) Unwrap(rec record.Record) (record.Record, error) {
}, nil
}

func (d *debeziumUnwrapper) valueToData(val any) (record.Data, error) {
switch v := val.(type) {
case map[string]any:
return record.StructuredData(v), nil
case string:
return record.RawData{Raw: []byte(v)}, nil
case nil:
// nil is allowed
return nil, nil
default:
return nil, cerrors.Errorf("expected a map or a string, got %T", val)
}
}

func (d *debeziumUnwrapper) validateRecord(data record.StructuredData) error {
var multiErr error
if _, ok := data[debeziumFieldBefore]; !ok {
multiErr = multierror.Append(multiErr, cerrors.Errorf("the %q field is missing from debezium payload", debeziumFieldBefore))
}
if _, ok := data[debeziumFieldAfter]; !ok {
multiErr = multierror.Append(multiErr, cerrors.Errorf("the %q field is missing from debezium payload", debeziumFieldAfter))
}
Expand All @@ -281,44 +291,60 @@ func (d *debeziumUnwrapper) validateRecord(data record.StructuredData) error {
}

func (d *debeziumUnwrapper) unwrapMetadata(rec record.Record) (record.Metadata, error) {
meta := record.Metadata{}
// add original record's metadata to meta
for k, v := range rec.Metadata {
meta[k] = v
}

structPayload := rec.Payload.After.(record.StructuredData)["payload"].(map[string]interface{})

// set metadata readAt time
var t time.Time
tsMs := structPayload[debeziumFieldTimestamp]
if tsMs != nil {
floatTime, ok := tsMs.(float64)
if !ok {
return nil, cerrors.Errorf("%s is not a float", debeziumFieldTimestamp)
debeziumRec := rec.Payload.After.(record.StructuredData)["payload"].(map[string]any)

var source map[string]string
for field, val := range debeziumRec {
switch field {
case debeziumFieldAfter, debeziumFieldBefore, debeziumFieldOp:
continue // ignore
case debeziumFieldTimestamp:
tsMs, ok := val.(float64)
if !ok {
return nil, cerrors.Errorf("%s is not a float", debeziumFieldTimestamp)
}
readAt := time.UnixMilli(int64(tsMs))
rec.Metadata.SetReadAt(readAt)
case debeziumFieldSource:
// don't add prefix for source fields
source = d.flatten("", val)
default:
flattened := d.flatten("debezium."+field, val)
for k, v := range flattened {
rec.Metadata[k] = v
}
}
t = time.Unix(0, int64(floatTime)*int64(time.Millisecond))
}
meta.SetReadAt(t)

// return current metadata if "source" field doesn't exist
if structPayload[debeziumFieldSource] == nil {
return meta, nil
// source is added at the end to overwrite any other fields
for k, v := range source {
rec.Metadata[k] = v
}

mp, ok := structPayload[debeziumFieldSource].(map[string]interface{})
if !ok {
return nil, cerrors.Errorf("%q is not formatted as a map", debeziumFieldSource)
}
return rec.Metadata, nil
}

for k, v := range mp {
if str, ok := v.(string); ok {
meta[k] = str
} else {
return nil, cerrors.Errorf("the value %q from the field %q is not a string", v, debeziumFieldSource)
func (d *debeziumUnwrapper) flatten(key string, val any) map[string]string {
var prefix string
if len(key) > 0 {
prefix = key + "."
}
switch val := val.(type) {
case map[string]any:
out := make(map[string]string)
for k1, v1 := range val {
for k2, v2 := range d.flatten(prefix+k1, v1) {
out[k2] = v2
}
}
return out
case nil:
return nil
case string:
return map[string]string{key: val}
default:
return map[string]string{key: fmt.Sprint(val)}
}
return meta, nil
}

// convertOperation converts debezium operation to openCDC operation
Expand All @@ -332,6 +358,8 @@ func (d *debeziumUnwrapper) convertOperation(op string) (record.Operation, error
return record.OperationDelete, nil
case debeziumOpRead:
return record.OperationSnapshot, nil
case debeziumOpUnset:
return record.OperationUpdate, nil
}
return 0, cerrors.Errorf("%q is an invalid operation", op)
}
Loading

0 comments on commit 0e04d77

Please sign in to comment.