From e61b3844199e413b477c75e3df76782c7a7daa21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Wed, 28 Feb 2024 19:18:06 +0100 Subject: [PATCH 1/4] add string serializer for raw data --- opencdc/data.go | 21 ++++++++++++ opencdc/json_test.go | 5 ++- opencdc/serializer.go | 28 +++++++++++++++ opencdc/serializer_test.go | 70 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 opencdc/serializer_test.go diff --git a/opencdc/data.go b/opencdc/data.go index 3b7c8b3..813cbce 100644 --- a/opencdc/data.go +++ b/opencdc/data.go @@ -16,6 +16,8 @@ package opencdc import ( "bytes" + "context" + "encoding/base64" "fmt" opencdcv1 "github.com/conduitio/conduit-commons/proto/opencdc/v1" @@ -82,3 +84,22 @@ func (d RawData) Bytes() []byte { func (d RawData) Clone() Data { return RawData(bytes.Clone(d)) } + +func (d RawData) MarshalJSON(ctx context.Context) ([]byte, error) { + if ctx != nil { + s := ctx.Value(jsonSerializerCtxKey{}) + if s != nil && s.(*JSONSerializer).RawDataAsString { + return json.Marshal(string(d)) + } + } + + if d == nil { + return []byte(`null`), nil + } + encodedLen := base64.StdEncoding.EncodedLen(len(d)) + out := make([]byte, encodedLen+2) + out[0] = '"' // add leading quote + base64.StdEncoding.Encode(out[1:], d) + out[encodedLen+1] = '"' // add trailing quote + return out, nil +} diff --git a/opencdc/json_test.go b/opencdc/json_test.go index fdbaa93..ae6dd66 100644 --- a/opencdc/json_test.go +++ b/opencdc/json_test.go @@ -23,7 +23,7 @@ import ( "github.com/matryer/is" ) -func TestRecord_UnmarshalJSON(t *testing.T) { +func TestRecord_JSON(t *testing.T) { is := is.New(t) have := Record{ Position: Position("standing"), @@ -46,6 +46,7 @@ func TestRecord_UnmarshalJSON(t *testing.T) { }, }, } + wantJSON := `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"cGFkbG9jay1rZXk=","payload":{"before":"eWVsbG93","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}` want := Record{ Position: Position("standing"), Operation: OperationUpdate, @@ -71,6 +72,8 @@ func TestRecord_UnmarshalJSON(t *testing.T) { b, err := json.Marshal(have) is.NoErr(err) + is.Equal(cmp.Diff(string(b), wantJSON), "") + var got Record err = json.Unmarshal(b, &got) is.NoErr(err) diff --git a/opencdc/serializer.go b/opencdc/serializer.go index cc9e592..6ea52d2 100644 --- a/opencdc/serializer.go +++ b/opencdc/serializer.go @@ -14,8 +14,36 @@ package opencdc +import ( + "context" + + "github.com/goccy/go-json" +) + // RecordSerializer is a type that can serialize a record to bytes. It's used in // destination connectors to change the output structure and format. type RecordSerializer interface { Serialize(Record) ([]byte, error) } + +// JSONSerializer is a RecordSerializer that serializes records to JSON. +type JSONSerializer struct { + // RawDataAsString is a flag that indicates if the RawData type should be + // serialized as a string. If set to false, RawData will be serialized as a + // base64 encoded string. If set to true, RawData will be serialized as a + // string without conversion. + RawDataAsString bool +} + +type jsonSerializerCtxKey struct{} + +func (s JSONSerializer) Serialize(r Record) ([]byte, error) { + ctx := context.WithValue(context.Background(), jsonSerializerCtxKey{}, &s) + defer func() { + // Workaround because of https://github.com/goccy/go-json/issues/499. + // TODO: Remove this when the issue is fixed and store value in context + // instead of pointer. + s = JSONSerializer{} + }() + return json.MarshalContext(ctx, r) +} diff --git a/opencdc/serializer_test.go b/opencdc/serializer_test.go new file mode 100644 index 0000000..fac3c26 --- /dev/null +++ b/opencdc/serializer_test.go @@ -0,0 +1,70 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencdc + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/matryer/is" +) + +func TestJSONSerializer(t *testing.T) { + t.Skip() + is := is.New(t) + rec := Record{ + Position: Position("standing"), + Operation: OperationUpdate, + Metadata: Metadata{"foo": "bar"}, + Key: RawData("padlock-key"), + Payload: Change{ + Before: RawData("yellow"), + After: StructuredData{ + "bool": true, + + "int": 1, + "int32": int32(1), + "int64": int64(1), + + "float32": float32(1.2), + "float64": 1.2, + + "string": "orange", + }, + }, + } + + testCases := []struct { + name string + serializer JSONSerializer + want string + }{{ + name: "default", + serializer: JSONSerializer{}, + want: `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"cGFkbG9jay1rZXk=","payload":{"before":"eWVsbG93","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`, + }, { + name: "raw data as string", + serializer: JSONSerializer{RawDataAsString: true}, + want: `{"position":"c3RhbmRpbmc=","operation":"update","metadata":{"foo":"bar"},"key":"padlock-key","payload":{"before":"yellow","after":{"bool":true,"float32":1.2,"float64":1.2,"int":1,"int32":1,"int64":1,"string":"orange"}}}`, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + rec.SetSerializer(tc.serializer) + b := rec.Bytes() + is.Equal(cmp.Diff(string(b), tc.want), "") + }) + } +} From 44c456787a2569a37add290276a87e7b6435307f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Wed, 28 Feb 2024 19:32:04 +0100 Subject: [PATCH 2/4] document code --- opencdc/data.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/opencdc/data.go b/opencdc/data.go index 813cbce..0fb5524 100644 --- a/opencdc/data.go +++ b/opencdc/data.go @@ -89,10 +89,14 @@ func (d RawData) MarshalJSON(ctx context.Context) ([]byte, error) { if ctx != nil { s := ctx.Value(jsonSerializerCtxKey{}) if s != nil && s.(*JSONSerializer).RawDataAsString { + // We should serialize RawData as a string. return json.Marshal(string(d)) } } + // We could use json.Marshal([]byte(d)) here, but it would be 3 times slower, + // and since this is in the hot path, we need to optimize it. + if d == nil { return []byte(`null`), nil } From d327c8f9327033ef5999ed6fd60f00401cd328b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Wed, 28 Feb 2024 19:54:58 +0100 Subject: [PATCH 3/4] expose json marshal options context func --- opencdc/data.go | 4 ++-- opencdc/json.go | 19 +++++++++++++++++++ opencdc/serializer.go | 15 ++++----------- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/opencdc/data.go b/opencdc/data.go index 0fb5524..962fb0d 100644 --- a/opencdc/data.go +++ b/opencdc/data.go @@ -87,8 +87,8 @@ func (d RawData) Clone() Data { func (d RawData) MarshalJSON(ctx context.Context) ([]byte, error) { if ctx != nil { - s := ctx.Value(jsonSerializerCtxKey{}) - if s != nil && s.(*JSONSerializer).RawDataAsString { + s := ctx.Value(jsonMarshalOptionsCtxKey{}) + if s != nil && s.(*JSONMarshalOptions).RawDataAsString { // We should serialize RawData as a string. return json.Marshal(string(d)) } diff --git a/opencdc/json.go b/opencdc/json.go index 2935fe6..fd785f0 100644 --- a/opencdc/json.go +++ b/opencdc/json.go @@ -15,11 +15,30 @@ package opencdc import ( + "context" "fmt" "github.com/goccy/go-json" ) +// JSONMarshalOptions can customize how a record is serialized to JSON. It can +// be attached to a context using WithJSONMarshalOptions and supplied to +// json.MarshalContext to customize the serialization behavior. +type JSONMarshalOptions struct { + // RawDataAsString is a flag that indicates if the RawData type should be + // serialized as a string. If set to false, RawData will be serialized as a + // base64 encoded string. If set to true, RawData will be serialized as a + // string without conversion. + RawDataAsString bool +} + +type jsonMarshalOptionsCtxKey struct{} + +// WithJSONMarshalOptions attaches JSONMarshalOptions to a context. +func WithJSONMarshalOptions(ctx context.Context, options *JSONMarshalOptions) context.Context { + return context.WithValue(ctx, jsonMarshalOptionsCtxKey{}, options) +} + func (r *Record) UnmarshalJSON(b []byte) error { var raw struct { Position Position `json:"position"` diff --git a/opencdc/serializer.go b/opencdc/serializer.go index 6ea52d2..d2e4879 100644 --- a/opencdc/serializer.go +++ b/opencdc/serializer.go @@ -26,19 +26,12 @@ type RecordSerializer interface { Serialize(Record) ([]byte, error) } -// JSONSerializer is a RecordSerializer that serializes records to JSON. -type JSONSerializer struct { - // RawDataAsString is a flag that indicates if the RawData type should be - // serialized as a string. If set to false, RawData will be serialized as a - // base64 encoded string. If set to true, RawData will be serialized as a - // string without conversion. - RawDataAsString bool -} - -type jsonSerializerCtxKey struct{} +// JSONSerializer is a RecordSerializer that serializes records to JSON using +// the configured options. +type JSONSerializer JSONMarshalOptions func (s JSONSerializer) Serialize(r Record) ([]byte, error) { - ctx := context.WithValue(context.Background(), jsonSerializerCtxKey{}, &s) + ctx := WithJSONMarshalOptions(context.Background(), (*JSONMarshalOptions)(&s)) defer func() { // Workaround because of https://github.com/goccy/go-json/issues/499. // TODO: Remove this when the issue is fixed and store value in context From 49ce3f31fbcdf1293f2ebbd139b4aea8264ca7ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Wed, 28 Feb 2024 19:59:05 +0100 Subject: [PATCH 4/4] fix linter errors --- opencdc/data.go | 2 ++ opencdc/serializer.go | 7 ++++++- opencdc/serializer_test.go | 3 +-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/opencdc/data.go b/opencdc/data.go index 962fb0d..f8b787f 100644 --- a/opencdc/data.go +++ b/opencdc/data.go @@ -88,8 +88,10 @@ func (d RawData) Clone() Data { func (d RawData) MarshalJSON(ctx context.Context) ([]byte, error) { if ctx != nil { s := ctx.Value(jsonMarshalOptionsCtxKey{}) + //nolint:forcetypeassert // We know the type of the value. if s != nil && s.(*JSONMarshalOptions).RawDataAsString { // We should serialize RawData as a string. + //nolint:wrapcheck // If we didn't implement MarshalJSON this would be done by the json package. return json.Marshal(string(d)) } } diff --git a/opencdc/serializer.go b/opencdc/serializer.go index d2e4879..c75d64b 100644 --- a/opencdc/serializer.go +++ b/opencdc/serializer.go @@ -16,6 +16,7 @@ package opencdc import ( "context" + "fmt" "github.com/goccy/go-json" ) @@ -38,5 +39,9 @@ func (s JSONSerializer) Serialize(r Record) ([]byte, error) { // instead of pointer. s = JSONSerializer{} }() - return json.MarshalContext(ctx, r) + bytes, err := json.MarshalContext(ctx, r) + if err != nil { + return nil, fmt.Errorf("failed to serialize record to JSON: %w", err) + } + return bytes, nil } diff --git a/opencdc/serializer_test.go b/opencdc/serializer_test.go index fac3c26..f1c6e58 100644 --- a/opencdc/serializer_test.go +++ b/opencdc/serializer_test.go @@ -22,8 +22,6 @@ import ( ) func TestJSONSerializer(t *testing.T) { - t.Skip() - is := is.New(t) rec := Record{ Position: Position("standing"), Operation: OperationUpdate, @@ -62,6 +60,7 @@ func TestJSONSerializer(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + is := is.New(t) rec.SetSerializer(tc.serializer) b := rec.Bytes() is.Equal(cmp.Diff(string(b), tc.want), "")