Skip to content

Commit

Permalink
feat: support avro
Browse files Browse the repository at this point in the history
  • Loading branch information
ucpr committed Jan 3, 2024
1 parent 6e1069f commit 54a49b6
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 10 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ require (
github.com/google/subcommands v1.2.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hamba/avro/v2 v2.18.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc=
github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
Expand All @@ -89,6 +90,10 @@ github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hamba/avro/v2 v2.18.0 h1:U7T0xI8MGw9+m3SS48E2KHUxas/Hb0EvS0CpkmVcLoI=
github.com/hamba/avro/v2 v2.18.0/go.mod h1:dEG+AHrykTpkXvBYsc+XXTuRlvGC645Ix5d2qR8EdEs=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand All @@ -97,6 +102,13 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand All @@ -118,6 +130,7 @@ github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YI
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
11 changes: 9 additions & 2 deletions internal/app/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package app

import (
"context"
"encoding/json"

"github.com/ucpr/mongo-streamer/internal/mongo"
"github.com/ucpr/mongo-streamer/internal/pubsub"
"github.com/ucpr/mongo-streamer/pkg/log"
)
Expand All @@ -17,9 +19,14 @@ func NewHandler(ps pubsub.Publisher) *Handler {
}
}

func (e *Handler) EventHandler(ctx context.Context, event []byte) error {
func (e *Handler) EventHandler(ctx context.Context, event mongo.ChangeEvent) error {
data, err := json.Marshal(event)
if err != nil {
return err
}

res := e.pubsub.AsyncPublish(ctx, pubsub.Message{
Data: event,
Data: data,
})
id, err := res.Get(ctx)
if err != nil {
Expand Down
38 changes: 30 additions & 8 deletions internal/mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package mongo

import (
"context"
"encoding/json"
"errors"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

Expand All @@ -14,6 +14,29 @@ import (
)

type (
// ChangeEvent is a struct that represents a change stream event.
ChangeEvent struct {
ID string `avro:"_id" bson:"_id" json:"_id"`
OperationType string `avro:"operationType" bson:"operation_type" json:"operation_type"`
FullDocument []byte `avro:"fullDocument" bson:"full_document" json:"full_document"`
DocumentKey string `avro:"documentKey" bson:"document_key" json:"document_key"`
UpdateDescription *UpdateDescription `avro:"updateDescription" bson:"update_description" json:"update_description"`
Namespace Namespace `avro:"ns" bson:"namespace" json:"namespace"`
To *Namespace `avro:"to" bson:"to" json:"to"`
}

// UpdateDescription is a struct that represents an update description of change stream event.
UpdateDescription struct {
UpdatedFields string `avro:"updatedFields" bson:"updated_fields" json:"updated_fields"`
RemovedFields string `avro:"removedFields" bson:"removed_fields" json:"removed_fields"`
}

// Namespace is a struct that represents a namespace of change stream event.
Namespace struct {
DB string `avro:"db" bson:"db" json:"db"`
Coll string `avro:"coll" bson:"coll" json:"coll"`
}

// ChangeStream is a struct that represents a change stream.
ChangeStream struct {
cs *mongo.ChangeStream
Expand All @@ -32,7 +55,7 @@ type (
ChangeStreamOption func(opts *ChangeStreamOptions)

// ChangeStreamHandler is a type of handler function that handles ChangeStream.
ChangeStreamHandler func(ctx context.Context, event []byte) error
ChangeStreamHandler func(ctx context.Context, event ChangeEvent) error
)

// WithBatchSize sets the batch size for ChangeStream.
Expand Down Expand Up @@ -104,23 +127,22 @@ func (c *ChangeStream) Run(ctx context.Context) {
for c.cs.Next(ctx) {
mmetric.ReceiveChangeStream(c.db, c.col)

var streamObject bson.M
var streamObject ChangeEvent
if err := c.cs.Decode(&streamObject); err != nil {
mmetric.HandleChangeEventFailed(c.db, c.col)
log.Error("failed to decode steream object", log.Ferror(err))
continue
}

// marshal stream object to json
jb, err := bson.MarshalExtJSON(streamObject, false, false)
jb, err := json.Marshal(streamObject)
if err != nil {
mmetric.HandleChangeEventFailed(c.db, c.col)
log.Error("failed to marshal stream object", log.Ferror(err))
continue
// skip for metrics retention use
log.Error("failed to marshal stream object to json", log.Ferror(err))
}
mmetric.ReceiveBytes(c.db, c.col, len(jb))

if err := c.handler(context.Background(), jb); err != nil {
if err := c.handler(context.Background(), streamObject); err != nil {
mmetric.HandleChangeEventFailed(c.db, c.col)
log.Error("failed to handle change stream", log.Ferror(err))
// TODO: If handle fails, the process is repeated again
Expand Down

0 comments on commit 54a49b6

Please sign in to comment.