diff --git a/internal/model/change_event.go b/internal/model/change_event.go index d8503ec..5320aa0 100644 --- a/internal/model/change_event.go +++ b/internal/model/change_event.go @@ -1,5 +1,15 @@ package model +import ( + _ "embed" + "encoding/json" + + "github.com/hamba/avro/v2" +) + +//go:embed schema/change_stream.avsc +var avroSchema string + type ( // ChangeEvent is a struct that represents a change stream event. ChangeEvent struct { @@ -24,3 +34,28 @@ type ( Coll string `avro:"coll" bson:"coll" json:"coll"` } ) + +// Avro returns the avro encoded byte array of the change stream event. +func (c ChangeEvent) Avro() ([]byte, error) { + schema, err := avro.Parse(avroSchema) + if err != nil { + return nil, err + } + + b, err := avro.Marshal(schema, c) + if err != nil { + return nil, err + } + + return b, nil +} + +// JSON returns the json encoded byte array of the change stream event. +func (c ChangeEvent) JSON() ([]byte, error) { + b, err := json.Marshal(c) + if err != nil { + return nil, err + } + + return b, nil +} diff --git a/internal/model/change_event_test.go b/internal/model/change_event_test.go new file mode 100644 index 0000000..72bb840 --- /dev/null +++ b/internal/model/change_event_test.go @@ -0,0 +1,95 @@ +package model + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChangeEvent_Avro(t *testing.T) { + t.Parallel() + + patterns := []struct { + name string + in ChangeEvent + out []byte + }{ + { + name: "success", + in: ChangeEvent{ + ID: "3dade3fb-189a-4d22-9c62-c759069da1c8", + OperationType: "updated", + FullDocument: []byte("aabbccddeeffgg"), + DocumentKey: "96c316ca-39a4-4e5c-b7d6-716b869b5c08", + UpdateDescription: &UpdateDescription{ + UpdatedFields: "aabbccddeeffgg", + RemovedFields: "aabbccddeeffgg", + }, + Namespace: Namespace{ + DB: "database", + Coll: "collection", + }, + To: &Namespace{ + DB: "database", + Coll: "collection", + }, + }, + out: []byte{0x48, 0x33, 0x64, 0x61, 0x64, 0x65, 0x33, 0x66, 0x62, 0x2d, 0x31, 0x38, 0x39, 0x61, 0x2d, 0x34, 0x64, 0x32, 0x32, 0x2d, 0x39, 0x63, 0x36, 0x32, 0x2d, 0x63, 0x37, 0x35, 0x39, 0x30, 0x36, 0x39, 0x64, 0x61, 0x31, 0x63, 0x38, 0xe, 0x75, 0x70, 0x64, 0x61, 0x74, 0x65, 0x64, 0x2, 0x1c, 0x61, 0x61, 0x62, 0x62, 0x63, 0x63, 0x64, 0x64, 0x65, 0x65, 0x66, 0x66, 0x67, 0x67, 0x48, 0x39, 0x36, 0x63, 0x33, 0x31, 0x36, 0x63, 0x61, 0x2d, 0x33, 0x39, 0x61, 0x34, 0x2d, 0x34, 0x65, 0x35, 0x63, 0x2d, 0x62, 0x37, 0x64, 0x36, 0x2d, 0x37, 0x31, 0x36, 0x62, 0x38, 0x36, 0x39, 0x62, 0x35, 0x63, 0x30, 0x38, 0x2, 0x1c, 0x61, 0x61, 0x62, 0x62, 0x63, 0x63, 0x64, 0x64, 0x65, 0x65, 0x66, 0x66, 0x67, 0x67, 0x1c, 0x61, 0x61, 0x62, 0x62, 0x63, 0x63, 0x64, 0x64, 0x65, 0x65, 0x66, 0x66, 0x67, 0x67, 0x10, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x14, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x2, 0x10, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x14, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e}, + }, + } + + for _, tt := range patterns { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, err := tt.in.Avro() + assert.NoError(t, err) + assert.Equal(t, tt.out, got) + }) + } +} + +func TestChangeEvent_JSON(t *testing.T) { + t.Parallel() + + patterns := []struct { + name string + in ChangeEvent + out []byte + }{ + { + name: "success", + in: ChangeEvent{ + ID: "3dade3fb-189a-4d22-9c62-c759069da1c8", + OperationType: "updated", + FullDocument: []byte("aabbccddeeffgg"), + DocumentKey: "96c316ca-39a4-4e5c-b7d6-716b869b5c08", + UpdateDescription: &UpdateDescription{ + UpdatedFields: "aabbccddeeffgg", + RemovedFields: "aabbccddeeffgg", + }, + Namespace: Namespace{ + DB: "database", + Coll: "collection", + }, + To: &Namespace{ + DB: "database", + Coll: "collection", + }, + }, + out: []byte(`{"_id":"3dade3fb-189a-4d22-9c62-c759069da1c8","operation_type":"updated","full_document":"YWFiYmNjZGRlZWZmZ2c=","document_key":"96c316ca-39a4-4e5c-b7d6-716b869b5c08","update_description":{"updated_fields":"aabbccddeeffgg","removed_fields":"aabbccddeeffgg"},"namespace":{"db":"database","coll":"collection"},"to":{"db":"database","coll":"collection"}}`), + }, + } + + for _, tt := range patterns { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, err := tt.in.JSON() + assert.NoError(t, err) + assert.Equal(t, tt.out, got) + }) + } +}