Skip to content

Commit

Permalink
Add schema serde utilities (#73)
Browse files Browse the repository at this point in the history
* add schema serde utilities

* update go version

* move avro builder into avro package, rename avro.Schema to avro.Serde

* resolve lint warnings

* do not expose Sort method

* use bytes instead of string

* clarify comment

* add comment about sorting
  • Loading branch information
lovromazgon authored Jul 9, 2024
1 parent d68a4ca commit d973cba
Show file tree
Hide file tree
Showing 20 changed files with 2,324 additions and 34 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/conduitio/conduit-commons

go 1.21.1
go 1.22.4

require (
github.com/bufbuild/buf v1.34.0
Expand All @@ -11,6 +11,8 @@ require (
github.com/hamba/avro/v2 v2.22.1
github.com/matryer/is v1.4.1
github.com/mitchellh/mapstructure v1.5.0
github.com/modern-go/reflect2 v1.0.2
github.com/twmb/go-cache v1.2.1
go.uber.org/goleak v1.3.0
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
golang.org/x/tools v0.23.0
Expand Down Expand Up @@ -158,7 +160,6 @@ require (
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/moricho/tparallel v0.3.1 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/nakabonne/nestif v0.3.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,8 @@ github.com/tomarrell/wrapcheck/v2 v2.8.3 h1:5ov+Cbhlgi7s/a42BprYoxsr73CbdMUTzE3b
github.com/tomarrell/wrapcheck/v2 v2.8.3/go.mod h1:g9vNIyhb5/9TQgumxQyOEqDHsmGYcGsVMOx/xGkqdMo=
github.com/tommy-muehle/go-mnd/v2 v2.5.1 h1:NowYhSdyE/1zwK9QCLeRb6USWdoif80Ie+v+yU8u1Zw=
github.com/tommy-muehle/go-mnd/v2 v2.5.1/go.mod h1:WsUAkMJMYww6l/ufffCD3m+P7LEvr8TnZn9lwVDlgzw=
github.com/twmb/go-cache v1.2.1 h1:yUkLutow4S2x5NMbqFW24o14OsucoFI5Fzmlb6uBinM=
github.com/twmb/go-cache v1.2.1/go.mod h1:lArg9KhCl+GTFMikitLGhIBh/i11OK0lhSveqlMbbrY=
github.com/ultraware/funlen v0.1.0 h1:BuqclbkY6pO+cvxoq7OsktIXZpgBSkYTQtmwhAK81vI=
github.com/ultraware/funlen v0.1.0/go.mod h1:XJqmOQja6DpxarLj6Jj1U7JuoS8PvL4nEqDaQhy22p4=
github.com/ultraware/whitespace v0.1.1 h1:bTPOGejYFulW3PkcrqkeQwOd6NKOOXvmGD9bo/Gk8VQ=
Expand Down
31 changes: 23 additions & 8 deletions proto/schema/v1/schema.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions proto/schema/v1/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@ message Schema {
TYPE_AVRO = 1;
}

// The subject of the schema. Together with the version, this uniquely
// identifies the schema.
string subject = 1;
// The version of the schema. Together with the subject, this uniquely
// identifies the schema.
int32 version = 2;
Type type = 3;
// The schema contents
bytes bytes = 4;

// Uniquely identifies the schema contents (not the schema itself!).
int32 id = 3;
// The schema type.
Type type = 4;
// The schema contents.
bytes bytes = 5;
}
86 changes: 86 additions & 0 deletions rabin/rabin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package rabin provides a Rabin fingerprint hash.Hash64 implementation compatible
// with the Avro spec: https://avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints.
package rabin

import "hash"

type digest uint64

// New constructs a new Rabin fingerprint hash.Hash64 initialized to the empty
// state according to the Avro spec: https://avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints.
func New() hash.Hash64 {
var d digest
d.Reset()
return &d
}

func (d *digest) Write(p []byte) (n int, err error) {
*d = update(*d, p)
return len(p), nil
}

func (d *digest) Sum64() uint64 {
return uint64(*d)
}

func (d *digest) Sum(in []byte) []byte {
s := d.Sum64()
return append(in, byte(s>>56), byte(s>>48), byte(s>>40), byte(s>>32), byte(s>>24), byte(s>>16), byte(s>>8), byte(s))
}

func (d *digest) Reset() {
*d = digest(rabinEmpty)
}

func (d *digest) Size() int { return 8 }
func (d *digest) BlockSize() int { return 1 }

const rabinEmpty = uint64(0xc15d213aa4d7a795)

// rabinTable is used to compute the CRC-64-AVRO fingerprint.
var rabinTable = newRabinFingerprintTable()

// newRabinFingerprintTable initializes the fingerprint table according to the
// spec: https://avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints
func newRabinFingerprintTable() [256]uint64 {
fpTable := [256]uint64{}
for i := 0; i < 256; i++ {
fp := uint64(i)
for j := 0; j < 8; j++ {
fp = (fp >> 1) ^ (rabinEmpty & -(fp & 1))
}
fpTable[i] = fp
}
return fpTable
}

// Bytes creates a Rabin fingerprint according to the spec:
// https://avro.apache.org/docs/1.8.2/spec.html#schema_fingerprints
func Bytes(buf []byte) uint64 {
h := New()
_, _ = h.Write(buf) // it never returns an error
return h.Sum64()
}

// update adds p to the running checksum d.
func update(d digest, p []byte) digest {
fp := uint64(d)
for i := 0; i < len(p); i++ {
fp = (fp >> 8) ^ rabinTable[(byte(fp)^p[i])&0xff]
}
return digest(fp)
}
52 changes: 52 additions & 0 deletions rabin/rabin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rabin

import (
"fmt"
"testing"

"github.com/matryer/is"
)

func TestRabin(t *testing.T) {
testCases := []struct {
have string
want uint64
}{
{have: `"int"`, want: 0x7275d51a3f395c8f},
{have: `"string"`, want: 0x8f014872634503c7},
{have: `"bool"`, want: 0x4a1c6b80ca0bcf48},
}

for _, tc := range testCases {
t.Run(fmt.Sprintf("Bytes_%v", tc.have), func(t *testing.T) {
is := is.New(t)
got := Bytes([]byte(tc.have))
is.Equal(tc.want, got)
})
t.Run(fmt.Sprintf("Hash_%v", tc.have), func(t *testing.T) {
is := is.New(t)
d := New()

n, err := d.Write([]byte(tc.have))
is.NoErr(err)
is.Equal(n, len(tc.have))

got := d.Sum64()
is.Equal(tc.want, got)
})
}
}
20 changes: 10 additions & 10 deletions schema/avro_builder.go → schema/avro/avro_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schema
package avro

import (
"errors"
Expand All @@ -21,21 +21,21 @@ import (
"github.com/hamba/avro/v2"
)

// AvroBuilder builds avro.RecordSchema instances and marshals them into JSON.
// AvroBuilder accepts arguments for creating fields and creates them internally
// Builder builds avro.RecordSchema instances and marshals them into JSON.
// Builder accepts arguments for creating fields and creates them internally
// (i.e. a user doesn't need to create the fields).
// All errors will be returned as a joined error when marshaling the schema to JSON.
type AvroBuilder struct {
type Builder struct {
errs []error
fields []*avro.Field
name string
namespace string
}

// NewAvroBuilder constructs a new AvroBuilder and initializes it
// NewBuilder constructs a new Builder and initializes it
// with the given name and namespace.
func NewAvroBuilder(name, namespace string) *AvroBuilder {
return &AvroBuilder{
func NewBuilder(name, namespace string) *Builder {
return &Builder{
name: name,
namespace: namespace,
}
Expand All @@ -44,7 +44,7 @@ func NewAvroBuilder(name, namespace string) *AvroBuilder {
// AddField adds a new field with the given name, schema and schema options.
// If creating the field returns an error, the error is saved, joined with
// other errors (if any), and returned when marshaling to JSON.
func (b *AvroBuilder) AddField(name string, typ avro.Schema, opts ...avro.SchemaOption) *AvroBuilder {
func (b *Builder) AddField(name string, typ avro.Schema, opts ...avro.SchemaOption) *Builder {
f, err := avro.NewField(name, typ, opts...)
if err != nil {
b.errs = append(b.errs, fmt.Errorf("field %v: %w", name, err))
Expand All @@ -58,7 +58,7 @@ func (b *AvroBuilder) AddField(name string, typ avro.Schema, opts ...avro.Schema
// Build builds the underlying schema.
// Errors that occurred while creating fields or constructing
// the schema will be returned as a joined error.
func (b *AvroBuilder) Build() (*avro.RecordSchema, error) {
func (b *Builder) Build() (*avro.RecordSchema, error) {
if b.errs != nil {
return nil, errors.Join(b.errs...)
}
Expand All @@ -74,7 +74,7 @@ func (b *AvroBuilder) Build() (*avro.RecordSchema, error) {
// MarshalJSON marshals the underlying schema to JSON.
// Errors that occurred while creating fields, constructing
// the schema or marshaling it will be returned as a joined error.
func (b *AvroBuilder) MarshalJSON() ([]byte, error) {
func (b *Builder) MarshalJSON() ([]byte, error) {
schema, err := b.Build()
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schema
package avro

import (
"fmt"
Expand All @@ -21,12 +21,12 @@ import (
"github.com/hamba/avro/v2"
)

func ExampleAvroBuilder() {
func ExampleBuilder() {
enumSchema, err := avro.NewEnumSchema("enum_schema", "enum_namespace", []string{"val1", "val2", "val3"})
if err != nil {
panic(err)
}
bytes, err := NewAvroBuilder("schema_name", "schema_namespace").
bytes, err := NewBuilder("schema_name", "schema_namespace").
AddField("int_field", avro.NewPrimitiveSchema(avro.Int, nil), avro.WithDefault(100)).
AddField("enum_field", enumSchema).
MarshalJSON()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package schema
package avro

import (
"testing"
Expand All @@ -21,7 +21,7 @@ import (
"github.com/matryer/is"
)

func TestAvroBuilder_Build(t *testing.T) {
func TestBuilder_Build(t *testing.T) {
is := is.New(t)

enumSchema, err := avro.NewEnumSchema("enum_schema", "enum_namespace", []string{"val1", "val2", "val3"})
Expand All @@ -43,7 +43,7 @@ func TestAvroBuilder_Build(t *testing.T) {
want, err := wantSchema.MarshalJSON()
is.NoErr(err)

got, err := NewAvroBuilder("schema_name", "schema_namespace").
got, err := NewBuilder("schema_name", "schema_namespace").
AddField("int_field", avro.NewPrimitiveSchema(avro.Int, nil), avro.WithDefault(100)).
AddField("enum_field", enumSchema).
MarshalJSON()
Expand Down
22 changes: 22 additions & 0 deletions schema/avro/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package avro

import "errors"

var (
ErrUnsupportedType = errors.New("unsupported avro type")
ErrSchemaValueMismatch = errors.New("avro schema doesn't match supplied value")
)
Loading

0 comments on commit d973cba

Please sign in to comment.