Skip to content

Commit

Permalink
feat: impl avro marshal method
Browse files Browse the repository at this point in the history
  • Loading branch information
ucpr committed Jan 5, 2024
1 parent d7fbae8 commit 25752fb
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 0 deletions.
35 changes: 35 additions & 0 deletions internal/model/change_event.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package model

import (
_ "embed"
"encoding/json"

"github.com/hamba/avro/v2"
)

//go:embed schema/change_stream.avsc
var avroSchema string

type (
// ChangeEvent is a struct that represents a change stream event.
ChangeEvent struct {
Expand All @@ -24,3 +34,28 @@ type (
Coll string `avro:"coll" bson:"coll" json:"coll"`
}
)

// Avro returns the avro encoded byte array of the change stream event.
func (c ChangeEvent) Avro() ([]byte, error) {
schema, err := avro.Parse(avroSchema)
if err != nil {
return nil, err
}

b, err := avro.Marshal(schema, c)
if err != nil {
return nil, err
}

return b, nil
}

// JSON returns the json encoded byte array of the change stream event.
func (c ChangeEvent) JSON() ([]byte, error) {
b, err := json.Marshal(c)
if err != nil {
return nil, err
}

return b, nil
}
95 changes: 95 additions & 0 deletions internal/model/change_event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package model

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestChangeEvent_Avro(t *testing.T) {
t.Parallel()

patterns := []struct {
name string
in ChangeEvent
out []byte
}{
{
name: "success",
in: ChangeEvent{
ID: "3dade3fb-189a-4d22-9c62-c759069da1c8",
OperationType: "updated",
FullDocument: []byte("aabbccddeeffgg"),
DocumentKey: "96c316ca-39a4-4e5c-b7d6-716b869b5c08",
UpdateDescription: &UpdateDescription{
UpdatedFields: "aabbccddeeffgg",
RemovedFields: "aabbccddeeffgg",
},
Namespace: Namespace{
DB: "database",
Coll: "collection",
},
To: &Namespace{
DB: "database",
Coll: "collection",
},
},
out: []byte{0x48, 0x33, 0x64, 0x61, 0x64, 0x65, 0x33, 0x66, 0x62, 0x2d, 0x31, 0x38, 0x39, 0x61, 0x2d, 0x34, 0x64, 0x32, 0x32, 0x2d, 0x39, 0x63, 0x36, 0x32, 0x2d, 0x63, 0x37, 0x35, 0x39, 0x30, 0x36, 0x39, 0x64, 0x61, 0x31, 0x63, 0x38, 0xe, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x2, 0x1c, 0x61, 0x61, 0x62, 0x62, 0x63, 0x63, 0x64, 0x64, 0x65, 0x65, 0x66, 0x66, 0x67, 0x67, 0x48, 0x39, 0x36, 0x63, 0x33, 0x31, 0x36, 0x63, 0x61, 0x2d, 0x33, 0x39, 0x61, 0x34, 0x2d, 0x34, 0x65, 0x35, 0x63, 0x2d, 0x62, 0x37, 0x64, 0x36, 0x2d, 0x37, 0x31, 0x36, 0x62, 0x38, 0x36, 0x39, 0x62, 0x35, 0x63, 0x30, 0x38, 0x2, 0x1c, 0x61, 0x61, 0x62, 0x62, 0x63, 0x63, 0x64, 0x64, 0x65, 0x65, 0x66, 0x66, 0x67, 0x67, 0x1c, 0x61, 0x61, 0x62, 0x62, 0x63, 0x63, 0x64, 0x64, 0x65, 0x65, 0x66, 0x66, 0x67, 0x67, 0x10, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x14, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2, 0x10, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x14, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e},
},
}

for _, tt := range patterns {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

got, err := tt.in.Avro()
assert.NoError(t, err)
assert.Equal(t, tt.out, got)
})
}
}

func TestChangeEvent_JSON(t *testing.T) {
t.Parallel()

patterns := []struct {
name string
in ChangeEvent
out []byte
}{
{
name: "success",
in: ChangeEvent{
ID: "3dade3fb-189a-4d22-9c62-c759069da1c8",
OperationType: "updated",
FullDocument: []byte("aabbccddeeffgg"),
DocumentKey: "96c316ca-39a4-4e5c-b7d6-716b869b5c08",
UpdateDescription: &UpdateDescription{
UpdatedFields: "aabbccddeeffgg",
RemovedFields: "aabbccddeeffgg",
},
Namespace: Namespace{
DB: "database",
Coll: "collection",
},
To: &Namespace{
DB: "database",
Coll: "collection",
},
},
out: []byte(`{"_id":"3dade3fb-189a-4d22-9c62-c759069da1c8","operation_type":"updated","full_document":"YWFiYmNjZGRlZWZmZ2c=","document_key":"96c316ca-39a4-4e5c-b7d6-716b869b5c08","update_description":{"updated_fields":"aabbccddeeffgg","removed_fields":"aabbccddeeffgg"},"namespace":{"db":"database","coll":"collection"},"to":{"db":"database","coll":"collection"}}`),
},
}

for _, tt := range patterns {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

got, err := tt.in.JSON()
assert.NoError(t, err)
assert.Equal(t, tt.out, got)
})
}
}

0 comments on commit 25752fb

Please sign in to comment.