Skip to content

Commit

Permalink
move proto related code into single file, documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Jan 3, 2024
1 parent 6d75cb0 commit 69e7a9d
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 241 deletions.
94 changes: 0 additions & 94 deletions opencdc/fromproto.go

This file was deleted.

85 changes: 84 additions & 1 deletion opencdc/toproto.go → opencdc/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ package opencdc
import (
"fmt"

opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1"
"google.golang.org/protobuf/types/known/structpb"

Check failure on line 20 in opencdc/proto.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)

opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1"

Check failure on line 22 in opencdc/proto.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
)

func _() {
Expand All @@ -30,6 +31,83 @@ func _() {
_ = cTypes[int(OperationSnapshot)-int(opencdcv1.Operation_OPERATION_SNAPSHOT)]
}

// -- From Proto To OpenCDC ----------------------------------------------------

// FromProto takes data from the supplied proto object and populates the
// receiver. If the proto object is nil, the receiver is set to its zero value.
// If the function returns an error, the receiver could be partially populated.
func (r *Record) FromProto(proto *opencdcv1.Record) error {
if proto == nil {
*r = Record{}
return nil
}

var err error
r.Key, err = dataFromProto(proto.Key)
if err != nil {
return fmt.Errorf("error converting key: %w", err)
}

if proto.Payload != nil {
err := r.Payload.FromProto(proto.Payload)
if err != nil {
return fmt.Errorf("error converting payload: %w", err)
}
} else {
r.Payload = Change{}
}

r.Position = proto.Position
r.Metadata = proto.Metadata
r.Operation = Operation(proto.Operation)
return nil
}

// FromProto takes data from the supplied proto object and populates the
// receiver. If the proto object is nil, the receiver is set to its zero value.
// If the function returns an error, the receiver could be partially populated.
func (c *Change) FromProto(proto *opencdcv1.Change) error {
if proto == nil {
*c = Change{}
return nil
}

var err error
c.Before, err = dataFromProto(proto.Before)
if err != nil {
return fmt.Errorf("error converting before: %w", err)
}

c.After, err = dataFromProto(proto.After)
if err != nil {
return fmt.Errorf("error converting after: %w", err)
}

return nil
}

func dataFromProto(proto *opencdcv1.Data) (Data, error) {
if proto == nil {
return nil, nil //nolint:nilnil // This is the expected behavior.
}

switch v := proto.Data.(type) {
case *opencdcv1.Data_RawData:
return RawData(v.RawData), nil
case *opencdcv1.Data_StructuredData:
return StructuredData(v.StructuredData.AsMap()), nil
case nil:
return nil, nil //nolint:nilnil // This is the expected behavior.
default:
return nil, ErrInvalidProtoDataType
}
}

// -- From OpenCDC To Proto ----------------------------------------------------

// ToProto takes data from the receiver and populates the supplied proto object.
// If the function returns an error, the proto object could be partially
// populated.
func (r Record) ToProto(proto *opencdcv1.Record) error {
if r.Key != nil {
if proto.Key == nil {
Expand Down Expand Up @@ -57,6 +135,9 @@ func (r Record) ToProto(proto *opencdcv1.Record) error {
return nil
}

// ToProto takes data from the receiver and populates the supplied proto object.
// If the function returns an error, the proto object could be partially
// populated.
func (c Change) ToProto(proto *opencdcv1.Change) error {
if c.Before != nil {
if proto.Before == nil {
Expand Down Expand Up @@ -85,6 +166,7 @@ func (c Change) ToProto(proto *opencdcv1.Change) error {
return nil
}

// ToProto takes data from the receiver and populates the supplied proto object.
func (d RawData) ToProto(proto *opencdcv1.Data) error {
protoRawData, ok := proto.Data.(*opencdcv1.Data_RawData)
if !ok {
Expand All @@ -95,6 +177,7 @@ func (d RawData) ToProto(proto *opencdcv1.Data) error {
return nil
}

// ToProto takes data from the receiver and populates the supplied proto object.
func (d StructuredData) ToProto(proto *opencdcv1.Data) error {
protoStructuredData, ok := proto.Data.(*opencdcv1.Data_StructuredData)
if !ok {
Expand Down
117 changes: 117 additions & 0 deletions opencdc/fromproto_test.go → opencdc/proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package opencdc

import (
"fmt"
"testing"

opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1"
Expand Down Expand Up @@ -113,3 +114,119 @@ func BenchmarkRecord_FromProto_Structured(b *testing.B) {
}
_ = r2
}

func TestRecord_ToProto(t *testing.T) {
is := is.New(t)

r1 := Record{
Position: Position("standing"),
Operation: OperationUpdate,
Metadata: Metadata{"foo": "bar"},
Key: RawData("padlock-key"),
Payload: Change{
Before: RawData("yellow"),
After: StructuredData{
"bool": true,

"int": 1,
"int32": int32(1),
"int64": int64(1),

"float32": float32(1.2),
"float64": 1.2,

"string": "orange",

"nested": map[string]any{
"bool": true,
"float": 2.3,
"string": "blue",
},
},
},
}

after, err := structpb.NewStruct(r1.Payload.After.(StructuredData))
is.NoErr(err)
want := &opencdcv1.Record{
Position: r1.Position,
Operation: opencdcv1.Operation(r1.Operation),
Metadata: r1.Metadata,
Key: &opencdcv1.Data{Data: &opencdcv1.Data_RawData{RawData: r1.Key.(RawData)}},
Payload: &opencdcv1.Change{
Before: &opencdcv1.Data{Data: &opencdcv1.Data_RawData{RawData: r1.Payload.Before.(RawData)}},
After: &opencdcv1.Data{Data: &opencdcv1.Data_StructuredData{StructuredData: after}},
},
}

var got opencdcv1.Record
err = r1.ToProto(&got)
is.NoErr(err)
is.Equal(&got, want)

// writing another record to the same target should overwrite the previous

want2 := &opencdcv1.Record{
Payload: &opencdcv1.Change{}, // there's always a change
}
err = Record{}.ToProto(&got)
is.NoErr(err)
is.Equal(&got, want2)
}

func BenchmarkRecord_ToProto_Structured(b *testing.B) {
r1 := Record{
Position: Position("standing"),
Operation: OperationUpdate,
Metadata: Metadata{"foo": "bar"},
Key: RawData("padlock-key"),
Payload: Change{
Before: RawData("yellow"),
After: StructuredData{
"bool": true,

"int": 1,
"int32": int32(1),
"int64": int64(1),

"float32": float32(1.2),
"float64": 1.2,

"string": "orange",
},
},
}

// reuse the same target record
var r2 opencdcv1.Record

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = r1.ToProto(&r2)
}
}

func BenchmarkRecord_ToProto_Raw(b *testing.B) {
for _, size := range []int{1, 100, 10000, 1000000} {
payload := make([]byte, size)
r1 := Record{
Position: Position("standing"),
Operation: OperationUpdate,
Metadata: Metadata{"foo": "bar"},
Key: RawData("padlock-key"),
Payload: Change{
Before: RawData("yellow"),
After: RawData(payload),
},
}

b.Run(fmt.Sprintf("%d", size), func(b *testing.B) {
// reuse the same target record
var r2 opencdcv1.Record
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = r1.ToProto(&r2)
}
})
}
}
15 changes: 9 additions & 6 deletions opencdc/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (

// Record represents a single data record produced by a source and/or consumed
// by a destination connector.
// Record should be used as a value, not a pointer, except when (de)serializing
// the record. Note that methods related to (de)serializing the record mutate
// the record and are thus not thread-safe (see SetSerializer, FromProto and
// UnmarshalJSON).
type Record struct {
// Position uniquely represents the record.
Position Position `json:"position"`
Expand All @@ -46,17 +50,16 @@ type Record struct {
serializer RecordSerializer
}

// WithSerializer returns a new record which is serialized using the provided
// serializer when Bytes gets called. If serializer is nil, the serializing
// behavior is reset to the default (JSON).
func (r Record) WithSerializer(serializer RecordSerializer) Record {
// SetSerializer sets the serializer used to encode the record into bytes. If
// serializer is nil, the serializing behavior is reset to the default (JSON).
// This method mutates the receiver and is not thread-safe.
func (r *Record) SetSerializer(serializer RecordSerializer) {
r.serializer = serializer
return r
}

// Bytes returns the serialized representation of the Record. By default, this
// function returns a JSON representation. The serialization logic can be changed
// using WithSerializer.
// using SetSerializer.
func (r Record) Bytes() []byte {
if r.serializer != nil {
b, err := r.serializer.Serialize(r)
Expand Down
Loading

0 comments on commit 69e7a9d

Please sign in to comment.