Skip to content

Commit

Permalink
clone record without reflection
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Oct 20, 2023
1 parent 0b3dd54 commit c7c2989
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 26 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ require (
github.com/hashicorp/go-hclog v1.5.0
github.com/hashicorp/go-plugin v1.5.2
github.com/jackc/pgx/v5 v5.4.3
github.com/jinzhu/copier v0.4.0
github.com/jpillora/backoff v1.0.0
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230630140346-bb9ce3f90f4a
github.com/matryer/is v1.4.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,6 @@ github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0
github.com/jdx/go-netrc v1.0.0 h1:QbLMLyCZGj0NA8glAhxUpf1zDg6cxnWgMBbjq40W0gQ=
github.com/jdx/go-netrc v1.0.0/go.mod h1:Gh9eFQJnoTNIRHXl2j5bJXA1u84hQWJWgGh569zF3v8=
github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls=
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
3 changes: 1 addition & 2 deletions pkg/processor/procjs/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"
"time"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit/pkg/foundation/cchan"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/processor"
Expand Down Expand Up @@ -414,7 +413,7 @@ func TestJSProcessor_Inspect(t *testing.T) {
Position: record.Position("test-pos"),
Operation: record.OperationUpdate,
Metadata: record.Metadata{"test": "true"},
Key: sdk.RawData("test-key"),
Key: record.RawData{Raw: []byte("test-key")},
Payload: record.Change{},
}
recOut, err := underTest.Process(ctx, recIn)
Expand Down
74 changes: 53 additions & 21 deletions pkg/record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package record

import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
Expand All @@ -25,7 +26,6 @@ import (

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/record/schema"
"github.com/jinzhu/copier"
)

const (
Expand Down Expand Up @@ -149,27 +149,39 @@ func (r Record) mapData(d Data) interface{} {
}

func (r Record) Clone() Record {
clone := Record{}
// todo copier uses reflection under the hood
// we should optimize it, because Clone() is on a hot path.
// https://github.com/ConduitIO/conduit/issues/885
err := copier.CopyWithOption(
&clone,
&r,
copier.Option{DeepCopy: true, IgnoreEmpty: true},
var (
metadata map[string]string
key Data
payloadBefore Data
payloadAfter Data
)
if err != nil {
// At the moment, a clone error cannot happen because of the input data
// (i.e. the record itself).
// It can only happen if the clone destination is not addressable
// or if reflect.ValueOf(&r) is invalid.
// The first should never happen, because &Record{} is always addressable.
// The second, reflect.ValueOf(&r) is invalid if &r is nil, which in our case
// is also not possible.
// Hence, if the copier returns an error, it's a bug related to how we use copier
// which would cause all pipelines to fail anyway, so we panic here.
// This also makes the method signature simpler.
panic(cerrors.Errorf("record clone error: %w", err))

if r.Metadata != nil {
metadata = make(map[string]string, len(r.Metadata))
for k, v := range r.Metadata {
metadata[k] = v
}
}

if r.Key != nil {
key = r.Key.Clone()
}
if r.Payload.Before != nil {
payloadBefore = r.Payload.Before.Clone()
}
if r.Payload.After != nil {
payloadAfter = r.Payload.After.Clone()
}

clone := Record{
Position: bytes.Clone(r.Position),
Operation: r.Operation,
Metadata: metadata,
Key: key,
Payload: Change{
Before: payloadBefore,
After: payloadAfter,
},
}
return clone
}
Expand Down Expand Up @@ -204,6 +216,7 @@ func (p Position) String() string {
// Data are RawData and StructuredData.
type Data interface {
Bytes() []byte
Clone() Data
}

// StructuredData contains data in form of a map with string keys and arbitrary
Expand All @@ -220,6 +233,18 @@ func (d StructuredData) Bytes() []byte {
return b
}

func (d StructuredData) Clone() Data {
cloned := make(map[string]any, len(d))
for k, v := range d {
if vmap, ok := v.(map[string]any); ok {
cloned[k] = StructuredData(vmap).Clone()
} else {
cloned[k] = v
}
}
return StructuredData(cloned)
}

// RawData contains unstructured data in form of a byte slice.
type RawData struct {
Raw []byte
Expand All @@ -239,3 +264,10 @@ func (d *RawData) UnmarshalText() ([]byte, error) {
func (d RawData) Bytes() []byte {
return d.Raw
}

func (d RawData) Clone() Data {
return RawData{
Raw: bytes.Clone(d.Raw),
Schema: d.Schema, // this field is currently unused, we don't care about cloning it atm
}
}

0 comments on commit c7c2989

Please sign in to comment.