Skip to content

Commit

Permalink
refactor: add ChangeStreamParams
Browse files Browse the repository at this point in the history
  • Loading branch information
ucpr committed Dec 28, 2023
1 parent b1b920c commit bd39c56
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
8 changes: 7 additions & 1 deletion cmd/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ func NewStreamer(ctx context.Context, cli *mongo.Client, mcfg *config.MongoDB, e
if err != nil {
return nil, err
}
cs, err := mongo.NewChangeStream(ctx, cli, mcfg.Database, mcfg.Collection, eh.EventHandler, st)
cs, err := mongo.NewChangeStream(ctx, mongo.ChangeStreamParams{
Client: cli,
Handler: eh.EventHandler,
Storage: st,
Database: mcfg.Database,
Collection: mcfg.Collection,
})
if err != nil {
return nil, err
}
Expand Down
22 changes: 16 additions & 6 deletions internal/mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,23 @@ func WithBatchSize(batchSize int32) ChangeStreamOption {
}
}

// ChangeStreamParams is a struct that represents parameters for creating a ChangeStream.
type ChangeStreamParams struct {
Client *Client
Handler ChangeStreamHandler
Storage persistent.StorageBuffer
Database string
Collection string
}

// NewChangeStream creates a new change stream instance.
func NewChangeStream(ctx context.Context, cli *Client, db, col string, handler ChangeStreamHandler, st persistent.StorageBuffer, opts ...ChangeStreamOption) (*ChangeStream, error) {
func NewChangeStream(ctx context.Context, params ChangeStreamParams, opts ...ChangeStreamOption) (*ChangeStream, error) {
chopts := &options.ChangeStreamOptions{}
for _, opt := range opts {
opt(&ChangeStreamOptions{chopts})
}

rt, err := st.Get()
rt, err := params.Storage.Get()
if err != nil {
return nil, err
}
Expand All @@ -60,14 +69,15 @@ func NewChangeStream(ctx context.Context, cli *Client, db, col string, handler C

// TODO: refactor
pipeline := mongo.Pipeline{}
collection := cli.cli.Database(db).Collection(col)
db, col := params.Database, params.Collection
collection := params.Client.cli.Database(db).Collection(col)
changeStream, err := collection.Watch(ctx, pipeline, chopts)
if err != nil {
// if resume token is not found, reset resume token and retry
if errors.Is(err, mongo.ErrMissingResumeToken) {
log.Warn("resume token is not found, reset resume token and retry", slog.String("db", db), slog.String("col", col))
chopts.SetResumeAfter(nil)
if err := st.Clear(); err != nil {
if err := params.Storage.Clear(); err != nil {
return nil, err
}

Expand All @@ -82,8 +92,8 @@ func NewChangeStream(ctx context.Context, cli *Client, db, col string, handler C

cs := &ChangeStream{
cs: changeStream,
handler: handler,
tokenManager: st,
handler: params.Handler,
tokenManager: params.Storage,
db: db,
col: col,
}
Expand Down

0 comments on commit bd39c56

Please sign in to comment.