From 469a7c0bbdbd2f239d9452e9ea0fea663f91a7b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Thu, 30 Nov 2023 19:36:04 +0100 Subject: [PATCH] refactor opencdc record, include serializer, rename ctxutil to ccontext, include csync functions from connector-sdk --- ctxutil/context.go => ccontext/detach.go | 8 +- .../detach_test.go | 4 +- csync/run.go | 47 +++++ csync/run_test.go | 55 ++++++ csync/valuewatcher.go | 100 ++++++++--- csync/valuewatcher_test.go | 79 ++++++++- csync/waitgroup.go | 14 +- opencdc/data.go | 81 +++++++++ opencdc/metadata.go | 2 + opencdc/operation.go | 64 +++++++ opencdc/position.go | 28 +++ opencdc/record.go | 164 ++---------------- opencdc/record_test.go | 19 +- opencdc/serializer.go | 21 +++ 14 files changed, 479 insertions(+), 207 deletions(-) rename ctxutil/context.go => ccontext/detach.go (91%) rename ctxutil/context_test.go => ccontext/detach_test.go (96%) create mode 100644 csync/run.go create mode 100644 csync/run_test.go create mode 100644 opencdc/data.go create mode 100644 opencdc/operation.go create mode 100644 opencdc/position.go create mode 100644 opencdc/serializer.go diff --git a/ctxutil/context.go b/ccontext/detach.go similarity index 91% rename from ctxutil/context.go rename to ccontext/detach.go index 166def2..96de37a 100644 --- a/ctxutil/context.go +++ b/ccontext/detach.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package ctxutil +package ccontext import ( "context" @@ -49,9 +49,9 @@ import ( // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// DetachContext returns a context that keeps all the values of its parent -// context but detaches from the cancellation and error handling. -func DetachContext(ctx context.Context) context.Context { return detachedContext{ctx} } +// Detach returns a context that keeps all the values of its parent context but +// detaches from the cancellation and error handling. +func Detach(ctx context.Context) context.Context { return detachedContext{ctx} } type detachedContext struct{ context.Context } //nolint:containedctx // we are wrapping a context diff --git a/ctxutil/context_test.go b/ccontext/detach_test.go similarity index 96% rename from ctxutil/context_test.go rename to ccontext/detach_test.go index df075d6..4556d0e 100644 --- a/ctxutil/context_test.go +++ b/ccontext/detach_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package ctxutil +package ccontext import ( "context" @@ -45,7 +45,7 @@ func TestDetachContext(t *testing.T) { defer cancel() // detach context and assert it is detached - detachedCtx := DetachContext(ctx) + detachedCtx := Detach(ctx) assertDetachedContext(detachedCtx) // cancel parent context and assert again diff --git a/csync/run.go b/csync/run.go new file mode 100644 index 0000000..4979aa2 --- /dev/null +++ b/csync/run.go @@ -0,0 +1,47 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package csync + +import ( + "context" + "time" + + "github.com/conduitio/conduit-commons/cchan" +) + +// Run executes fn in a goroutine and waits for it to return. If the context +// gets canceled before that happens the method returns the context error. +// +// This is useful for executing long-running functions like sync.WaitGroup.Wait +// that don't take a context and can potentially block the execution forever. +func Run(ctx context.Context, fn func()) error { + done := make(chan struct{}) + go func() { + defer close(done) + fn() + }() + + _, _, err := cchan.ChanOut[struct{}](done).Recv(ctx) + return err //nolint:wrapcheck // errors from cchan are context errors and we don't wrap them +} + +// RunTimeout executes fn in a goroutine and waits for it to return. If the +// context gets canceled before that happens or the timeout is reached the +// method returns the context error. +func RunTimeout(ctx context.Context, fn func(), timeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + return Run(ctx, fn) +} diff --git a/csync/run_test.go b/csync/run_test.go new file mode 100644 index 0000000..deebfb6 --- /dev/null +++ b/csync/run_test.go @@ -0,0 +1,55 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package csync + +import ( + "context" + "testing" + "time" + + "github.com/matryer/is" +) + +func TestRun_Success(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + var executed bool + err := Run(ctx, func() { executed = true }) + is.NoErr(err) + is.True(executed) +} + +func TestRun_Canceled(t *testing.T) { + is := is.New(t) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // run function that blocks for 1 second + err := Run(ctx, func() { <-time.After(time.Second) }) + is.Equal(err, context.Canceled) +} + +func TestRun_DeadlineReached(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + start := time.Now() + err := RunTimeout(ctx, func() { <-time.After(time.Second) }, time.Millisecond*100) + since := time.Since(start) + + is.Equal(err, context.DeadlineExceeded) + is.True(since >= time.Millisecond*100) +} diff --git a/csync/valuewatcher.go b/csync/valuewatcher.go index e6dbb4d..fb66b22 100644 --- a/csync/valuewatcher.go +++ b/csync/valuewatcher.go @@ -18,6 +18,7 @@ import ( "context" "sync" + "github.com/conduitio/conduit-commons/cchan" "github.com/google/uuid" ) @@ -32,11 +33,11 @@ type ValueWatcher[T any] struct { listeners map[string]chan T } -type ValueWatcherFunc[T any] func(val T) bool +type ValueWatcherCompareFunc[T any] func(val T) bool -// WatchValues is a utility function for creating a simple ValueWatcherFunc that +// WatchValues is a utility function for creating a simple ValueWatcherCompareFunc that // waits for one of the supplied values. -func WatchValues[T comparable](want ...T) ValueWatcherFunc[T] { +func WatchValues[T comparable](want ...T) ValueWatcherCompareFunc[T] { if len(want) == 0 { // this would block forever, prevent misuse panic("invalid use of WatchValues, need to supply at least one value") @@ -58,16 +59,28 @@ func WatchValues[T comparable](want ...T) ValueWatcherFunc[T] { } } -// Set stores val in ValueWatcher and notifies all goroutines that called Watch -// about the new value, if such goroutines exists. -func (vw *ValueWatcher[T]) Set(val T) { +// CompareAndSwap allows the caller to peek at the currently stored value in +// ValueWatcher and decide if it should be swapped for a new val, all while +// holding the lock. If the value was swapped, it notifies all goroutines +// that called Watch about the new value and returns true. Otherwise it returns +// false. +func (vw *ValueWatcher[T]) CompareAndSwap(val T, shouldSwap func(T) bool) bool { vw.m.Lock() defer vw.m.Unlock() - vw.val = val - for _, l := range vw.listeners { - l <- val + if !shouldSwap(vw.val) { + return true } + + vw.val = val + vw.notify(val) + return false +} + +// Set stores val in ValueWatcher and notifies all goroutines that called Watch +// about the new value, if such goroutines exists. +func (vw *ValueWatcher[T]) Set(val T) { + vw.CompareAndSwap(val, func(T) bool { return true }) } // Get returns the current value stored in ValueWatcher. @@ -79,34 +92,36 @@ func (vw *ValueWatcher[T]) Get() T { } // Watch blocks and calls f for every value that is put into the ValueWatcher. -// Once f returns true it stops blocking and returns nil. First call to f will +// Once f returns true it stops blocking and returns the value. First call to f will // be with the current value stored in ValueWatcher. Note that if no value was // stored in ValueWatcher yet, the zero value of type T will be passed to f. +// If the context gets canceled before the value is found, the function returns +// the last seen value and the context error. // // Watch can be safely called by multiple goroutines. If the context gets // cancelled before f returns true, the function will return the context error. -func (vw *ValueWatcher[T]) Watch(ctx context.Context, f ValueWatcherFunc[T]) (T, error) { - val, found, listener, unsubscribe := vw.findOrSubscribe(f) +func (vw *ValueWatcher[T]) Watch(ctx context.Context, f ValueWatcherCompareFunc[T]) (T, error) { + lastVal, found, listener, unsubscribe := vw.findOrSubscribe(f) if found { - return val, nil + return lastVal, nil } defer unsubscribe() // val was not found yet, we need to keep watching + clistener := cchan.ChanOut[T](listener) for { - select { - case <-ctx.Done(): - var empty T - return empty, ctx.Err() - case val = <-listener: - if f(val) { - return val, nil - } + val, _, err := clistener.Recv(ctx) + if err != nil { + return lastVal, ctx.Err() + } + if f(val) { + return val, nil } + lastVal = val } } -func (vw *ValueWatcher[T]) findOrSubscribe(f ValueWatcherFunc[T]) (T, bool, chan T, func()) { +func (vw *ValueWatcher[T]) findOrSubscribe(f ValueWatcherCompareFunc[T]) (T, bool, chan T, func()) { vw.m.Lock() defer vw.m.Unlock() @@ -116,8 +131,7 @@ func (vw *ValueWatcher[T]) findOrSubscribe(f ValueWatcherFunc[T]) (T, bool, chan } listener, unsubscribe := vw.subscribe() - var empty T - return empty, false, listener, unsubscribe + return vw.val, false, listener, unsubscribe } // subscribe creates a channel that will receive changes and returns it @@ -154,3 +168,41 @@ func (vw *ValueWatcher[T]) unsubscribe(id string, c chan T) { close(c) delete(vw.listeners, id) } + +func (vw *ValueWatcher[T]) notify(val T) { + for _, l := range vw.listeners { + l <- val + } +} + +// Lock locks the ValueWatcher and returns an instance of LockedValueWatcher +// that allows one to set and get the value while holding the lock. After the +// caller executed the operations and doesn't need the lock anymore it should +// call Unlock on the LockedValueWatcher before discarding it. +func (vw *ValueWatcher[T]) Lock() *LockedValueWatcher[T] { + vw.m.Lock() + return &LockedValueWatcher[T]{vw: vw} +} + +type LockedValueWatcher[T any] struct { + vw *ValueWatcher[T] +} + +// Unlock unlocks the ValueWatcher. After this the LockedValueWatcher should be +// discarded. +func (lvw *LockedValueWatcher[T]) Unlock() *ValueWatcher[T] { + lvw.vw.m.Unlock() + return lvw.vw +} + +// Set stores val in ValueWatcher and notifies all goroutines that called Watch +// about the new value, if such goroutines exists. +func (lvw *LockedValueWatcher[T]) Set(val T) { + lvw.vw.val = val + lvw.vw.notify(val) +} + +// Get returns the current value stored in ValueWatcher. +func (lvw *LockedValueWatcher[T]) Get() T { + return lvw.vw.val +} diff --git a/csync/valuewatcher_test.go b/csync/valuewatcher_test.go index 6e156ba..ee34c3d 100644 --- a/csync/valuewatcher_test.go +++ b/csync/valuewatcher_test.go @@ -61,7 +61,7 @@ func TestValueWatcher_PutGetPtr(t *testing.T) { } func TestValueWatcher_WatchSuccess(t *testing.T) { - goleak.VerifyNone(t) + goleak.VerifyNone(t, goleak.IgnoreCurrent()) is := is.New(t) var h ValueWatcher[int] @@ -111,11 +111,11 @@ func TestValueWatcher_WatchSuccess(t *testing.T) { } func TestValueWatcher_WatchContextCancel(t *testing.T) { - goleak.VerifyNone(t) + goleak.VerifyNone(t, goleak.IgnoreCurrent()) is := is.New(t) var h ValueWatcher[int] - h.Set(1) + h.Set(5) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) defer cancel() @@ -123,18 +123,18 @@ func TestValueWatcher_WatchContextCancel(t *testing.T) { i := 0 val, err := h.Watch(ctx, func(val int) bool { i++ - is.Equal(1, val) + is.Equal(5, val) return false }) is.Equal(ctx.Err(), err) is.Equal(1, i) - is.Equal(0, val) + is.Equal(5, val) } func TestValueWatcher_WatchMultiple(t *testing.T) { const watcherCount = 100 - goleak.VerifyNone(t) + goleak.VerifyNone(t, goleak.IgnoreCurrent()) is := is.New(t) var h ValueWatcher[int] @@ -186,7 +186,7 @@ func TestValueWatcher_Concurrency(t *testing.T) { const setterCount = 40 const setCount = 20 - goleak.VerifyNone(t) + goleak.VerifyNone(t, goleak.IgnoreCurrent()) is := is.New(t) var h ValueWatcher[int] @@ -238,3 +238,68 @@ func TestValueWatcher_Concurrency(t *testing.T) { err = (*WaitGroup)(&wg2).WaitTimeout(context.Background(), time.Second) is.NoErr(err) } + +func TestValueWatcher_Lock_Set(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + var h ValueWatcher[int] + lockedWatcher := h.Lock() + + want := 10 + + var wg WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + got := h.Get() + is.Equal(want, got) + }() + + // goroutine should be blocked because value watcher is locked + err := wg.WaitTimeout(ctx, time.Millisecond*100) + is.Equal(err, context.DeadlineExceeded) + + // we can set the value while we hold the lock + lockedWatcher.Set(want) + lockedWatcher.Unlock() + + // now that the watcher is unlocked the goroutine should be unblocked + err = wg.WaitTimeout(ctx, time.Millisecond*100) + is.NoErr(err) + + got := h.Get() + is.Equal(want, got) +} + +func TestValueWatcher_Lock_Get(t *testing.T) { + is := is.New(t) + + var h ValueWatcher[int] + lockedWatcher := h.Lock() + + want := 10 + + var wg WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + h.Set(want) + }() + + // goroutine should be blocked because value watcher is locked + err := wg.WaitTimeout(context.Background(), time.Millisecond*100) + is.Equal(err, context.DeadlineExceeded) + + got := lockedWatcher.Get() + is.Equal(0, got) + + lockedWatcher.Unlock() + + // now that the watcher is unlocked the goroutine should be unblocked + err = wg.WaitTimeout(context.Background(), time.Millisecond*100) + is.NoErr(err) + + got = h.Get() + is.Equal(want, got) +} diff --git a/csync/waitgroup.go b/csync/waitgroup.go index 4d01cb2..215fcd9 100644 --- a/csync/waitgroup.go +++ b/csync/waitgroup.go @@ -18,8 +18,6 @@ import ( "context" "sync" "time" - - "github.com/conduitio/conduit-commons/cchan" ) // WaitGroup is a sync.WaitGroup with utility methods. @@ -50,20 +48,12 @@ func (wg *WaitGroup) Done() { // Wait blocks until the WaitGroup counter is zero. If the context gets canceled // before that happens the method returns an error. func (wg *WaitGroup) Wait(ctx context.Context) error { - done := make(chan struct{}) - go func() { - (*sync.WaitGroup)(wg).Wait() - close(done) - }() - _, _, err := cchan.ChanOut[struct{}](done).Recv(ctx) - return err //nolint:wrapcheck // errors from cchan are already wrapped + return Run(ctx, (*sync.WaitGroup)(wg).Wait) } // WaitTimeout blocks until the WaitGroup counter is zero. If the context gets // canceled or the timeout is reached before that happens the method returns an // error. func (wg *WaitGroup) WaitTimeout(ctx context.Context, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - return wg.Wait(ctx) + return RunTimeout(ctx, (*sync.WaitGroup)(wg).Wait, timeout) } diff --git a/opencdc/data.go b/opencdc/data.go new file mode 100644 index 0000000..e985c9d --- /dev/null +++ b/opencdc/data.go @@ -0,0 +1,81 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +import ( + "bytes" + "encoding/json" + "fmt" +) + +// Data is a structure that contains some bytes. The only structs implementing +// Data are RawData and StructuredData. +type Data interface { + isData() // Ensure structs outside of this package can't implement this interface. + Bytes() []byte + Clone() Data +} + +type Change struct { + // Before contains the data before the operation occurred. This field is + // optional and should only be populated for operations OperationUpdate + // OperationDelete (if the system supports fetching the data before the + // operation). + Before Data `json:"before"` + // After contains the data after the operation occurred. This field should + // be populated for all operations except OperationDelete. + After Data `json:"after"` +} + +// StructuredData contains data in form of a map with string keys and arbitrary +// values. +type StructuredData map[string]interface{} + +func (StructuredData) isData() {} + +func (d StructuredData) Bytes() []byte { + b, err := json.Marshal(d) + if err != nil { + // Unlikely to happen, records travel from/to plugins through GRPC. + // If the content can be marshaled as protobuf it can be as JSON. + panic(fmt.Errorf("error while marshaling StructuredData as JSON: %w", err)) + } + return b +} + +func (d StructuredData) Clone() Data { + cloned := make(map[string]any, len(d)) + for k, v := range d { + if vmap, ok := v.(map[string]any); ok { + cloned[k] = StructuredData(vmap).Clone() + } else { + cloned[k] = v + } + } + return StructuredData(cloned) +} + +// RawData contains unstructured data in form of a byte slice. +type RawData []byte + +func (RawData) isData() {} + +func (d RawData) Bytes() []byte { + return d +} + +func (d RawData) Clone() Data { + return RawData(bytes.Clone(d)) +} diff --git a/opencdc/metadata.go b/opencdc/metadata.go index 17a1fa8..0d83772 100644 --- a/opencdc/metadata.go +++ b/opencdc/metadata.go @@ -67,6 +67,8 @@ const ( MetadataConduitDLQNackNodeID = "conduit.dlq.nack.node.id" ) +type Metadata map[string]string + // SetOpenCDCVersion sets the metadata value for key MetadataVersion to the // current version of OpenCDC used. func (m Metadata) SetOpenCDCVersion() { diff --git a/opencdc/operation.go b/opencdc/operation.go new file mode 100644 index 0000000..202df53 --- /dev/null +++ b/opencdc/operation.go @@ -0,0 +1,64 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate stringer -type=Operation -linecomment + +package opencdc + +import ( + "fmt" + "strconv" + "strings" +) + +const ( + OperationCreate Operation = iota + 1 // create + OperationUpdate // update + OperationDelete // delete + OperationSnapshot // snapshot +) + +// Operation defines what triggered the creation of a record. +type Operation int + +func (i Operation) MarshalText() ([]byte, error) { + return []byte(i.String()), nil +} + +func (i *Operation) UnmarshalText(b []byte) error { + if len(b) == 0 { + return nil // empty string, do nothing + } + + switch string(b) { + case OperationCreate.String(): + *i = OperationCreate + case OperationUpdate.String(): + *i = OperationUpdate + case OperationDelete.String(): + *i = OperationDelete + case OperationSnapshot.String(): + *i = OperationSnapshot + default: + // it's not a known operation, but we also allow Operation(int) + valIntRaw := strings.TrimSuffix(strings.TrimPrefix(string(b), "Operation("), ")") + valInt, err := strconv.Atoi(valIntRaw) + if err != nil { + return fmt.Errorf("operation %q: %w", b, ErrUnknownOperation) + } + *i = Operation(valInt) + } + + return nil +} diff --git a/opencdc/position.go b/opencdc/position.go new file mode 100644 index 0000000..e14d665 --- /dev/null +++ b/opencdc/position.go @@ -0,0 +1,28 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +// Position is a unique identifier for a record being process. +// It's a Source's responsibility to choose and assign record positions, +// as they will be used by the Source in subsequent pipeline runs. +type Position []byte + +// String is used when displaying the position in logs. +func (p Position) String() string { + if p != nil { + return string(p) + } + return "" +} diff --git a/opencdc/record.go b/opencdc/record.go index 2a8f491..bfd3439 100644 --- a/opencdc/record.go +++ b/opencdc/record.go @@ -12,60 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate stringer -type=Operation -linecomment - package opencdc import ( "bytes" - "encoding/base64" "encoding/json" "fmt" - "strconv" - "strings" -) - -const ( - OperationCreate Operation = iota + 1 // create - OperationUpdate // update - OperationDelete // delete - OperationSnapshot // snapshot ) -// Operation defines what triggered the creation of a record. -type Operation int - -func (i Operation) MarshalText() ([]byte, error) { - return []byte(i.String()), nil -} - -func (i *Operation) UnmarshalText(b []byte) error { - if len(b) == 0 { - return nil // empty string, do nothing - } - - switch string(b) { - case OperationCreate.String(): - *i = OperationCreate - case OperationUpdate.String(): - *i = OperationUpdate - case OperationDelete.String(): - *i = OperationDelete - case OperationSnapshot.String(): - *i = OperationSnapshot - default: - // it's not a known operation, but we also allow Operation(int) - valIntRaw := strings.TrimSuffix(strings.TrimPrefix(string(b), "Operation("), ")") - valInt, err := strconv.Atoi(valIntRaw) - if err != nil { - return fmt.Errorf("operation %q: %w", b, ErrUnknownOperation) - } - *i = Operation(valInt) - } - - return nil -} - // Record represents a single data record produced by a source and/or consumed // by a destination connector. type Record struct { @@ -87,30 +41,29 @@ type Record struct { // Payload holds the payload change (data before and after the operation // occurred). Payload Change `json:"payload"` + + serializer RecordSerializer } -// Bytes returns the JSON encoding of the Record. +// Bytes returns the serialized representation of the Record. By default, this +// function returns a JSON representation. The serialization can be changed +// using SetSerializer. func (r Record) Bytes() []byte { - if r.Metadata == nil { - // since we are dealing with a Record value this will not be seen - // outside this function - r.Metadata = make(map[string]string) + if r.serializer != nil { + b, err := r.serializer.Serialize(r) + if err == nil { + return b + } + // serializer produced an error, fallback to default format } - // before encoding the record set the opencdc version metadata field - r.Metadata.SetOpenCDCVersion() - // we don't want to mutate the metadata permanently, so we revert it - // when we are done - defer func() { - delete(r.Metadata, MetadataOpenCDCVersion) - }() - b, err := json.Marshal(r) if err != nil { - // Unlikely to happen, we receive content from a plugin through GRPC. - // If the content could be marshaled as protobuf it can be as JSON. - panic(fmt.Errorf("error while marshaling Entity as JSON: %w", err)) + // Unlikely to happen, records travel from/to plugins through GRPC. + // If the content can be marshaled as protobuf it can be as JSON. + panic(fmt.Errorf("error while marshaling Record as JSON: %w", err)) } + return b } @@ -140,7 +93,7 @@ func (r Record) mapData(d Data) interface{} { case StructuredData: return map[string]interface{}(d) case RawData: - return d.Raw + return []byte(d) } return nil } @@ -182,88 +135,3 @@ func (r Record) Clone() Record { } return clone } - -type Metadata map[string]string - -type Change struct { - // Before contains the data before the operation occurred. This field is - // optional and should only be populated for operations OperationUpdate - // OperationDelete (if the system supports fetching the data before the - // operation). - Before Data `json:"before"` - // After contains the data after the operation occurred. This field should - // be populated for all operations except OperationDelete. - After Data `json:"after"` -} - -// Position is a unique identifier for a record being process. -// It's a Source's responsibility to choose and assign record positions, -// as they will be used by the Source in subsequent pipeline runs. -type Position []byte - -// String is used when displaying the position in logs. -func (p Position) String() string { - if p != nil { - return string(p) - } - return "" -} - -// Data is a structure that contains some bytes. The only structs implementing -// Data are RawData and StructuredData. -type Data interface { - Bytes() []byte - Clone() Data -} - -// StructuredData contains data in form of a map with string keys and arbitrary -// values. -type StructuredData map[string]interface{} - -func (d StructuredData) Bytes() []byte { - // TODO add support for formatters - b, err := json.Marshal(d) - if err != nil { - // Unlikely to happen, we receive content from a plugin through GRPC. - // If the content could be marshaled as protobuf it can be as JSON. - panic(fmt.Errorf("StructuredData error while marshaling as JSON: %w", err)) - } - return b -} - -func (d StructuredData) Clone() Data { - cloned := make(map[string]any, len(d)) - for k, v := range d { - if vmap, ok := v.(map[string]any); ok { - cloned[k] = StructuredData(vmap).Clone() - } else { - cloned[k] = v - } - } - return StructuredData(cloned) -} - -// RawData contains unstructured data in form of a byte slice. -type RawData struct { - Raw []byte -} - -func (d RawData) MarshalText() ([]byte, error) { - buf := make([]byte, base64.StdEncoding.EncodedLen(len(d.Raw))) - base64.StdEncoding.Encode(buf, d.Raw) - return buf, nil -} - -func (d *RawData) UnmarshalText() ([]byte, error) { - return d.Raw, nil -} - -func (d RawData) Bytes() []byte { - return d.Raw -} - -func (d RawData) Clone() Data { - return RawData{ - Raw: bytes.Clone(d.Raw), - } -} diff --git a/opencdc/record_test.go b/opencdc/record_test.go index 81d4845..020b0b9 100644 --- a/opencdc/record_test.go +++ b/opencdc/record_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/matryer/is" ) @@ -40,9 +41,9 @@ func TestRecord_Clone(t *testing.T) { Position: Position("standing"), Operation: OperationUpdate, Metadata: Metadata{"foo": "bar"}, - Key: RawData{Raw: []byte("padlock-key")}, + Key: RawData("padlock-key"), Payload: Change{ - Before: RawData{Raw: []byte("yellow")}, + Before: RawData("yellow"), After: StructuredData{ "bool": true, @@ -70,11 +71,9 @@ func TestRecord_Clone(t *testing.T) { for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { + is := is.New(t) got := tc.input.Clone() - if !cmp.Equal(tc.input, got) { - t.Logf("diff: %v\n", cmp.Diff(tc.input, got)) - t.Fail() // clone not equal to original - } + is.Equal(cmp.Diff(tc.input, got, cmpopts.IgnoreUnexported(Record{})), "") }) } } @@ -88,7 +87,7 @@ func TestRecord_Bytes(t *testing.T) { Metadata: Metadata{ MetadataConduitSourcePluginName: "example", }, - Key: RawData{Raw: []byte("bar")}, + Key: RawData("bar"), Payload: Change{ Before: nil, After: StructuredData{ @@ -98,10 +97,10 @@ func TestRecord_Bytes(t *testing.T) { }, } - want := `{"position":"Zm9v","operation":"create","metadata":{"conduit.source.plugin.name":"example","opencdc.version":"v1"},"key":"YmFy","payload":{"before":null,"after":{"baz":"qux","foo":"bar"}}}` + want := `{"position":"Zm9v","operation":"create","metadata":{"conduit.source.plugin.name":"example"},"key":"YmFy","payload":{"before":null,"after":{"baz":"qux","foo":"bar"}}}` got := string(r.Bytes()) - is.Equal(got, want) + is.Equal(cmp.Diff(want, got), "") is.Equal(r.Metadata, Metadata{MetadataConduitSourcePluginName: "example"}) // expected metadata to stay unaltered } @@ -115,7 +114,7 @@ func TestRecord_ToMap(t *testing.T) { Metadata: Metadata{ MetadataConduitSourcePluginName: "example", }, - Key: RawData{Raw: []byte("bar")}, + Key: RawData("bar"), Payload: Change{ Before: nil, After: StructuredData{ diff --git a/opencdc/serializer.go b/opencdc/serializer.go new file mode 100644 index 0000000..cc9e592 --- /dev/null +++ b/opencdc/serializer.go @@ -0,0 +1,21 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +// RecordSerializer is a type that can serialize a record to bytes. It's used in +// destination connectors to change the output structure and format. +type RecordSerializer interface { + Serialize(Record) ([]byte, error) +}