diff --git a/hack/tester/main.go b/hack/tester/main.go new file mode 100644 index 0000000..4df7541 --- /dev/null +++ b/hack/tester/main.go @@ -0,0 +1,128 @@ +package main + +import ( + "context" + "fmt" + "math/rand" + "os/signal" + "syscall" + "time" + + "go.mongodb.org/mongo-driver/mongo/options" + + "github.com/ucpr/mongo-streamer/internal/mongo" + "github.com/ucpr/mongo-streamer/pkg/log" +) + +const ( + gracefulShutdownTimeout = 5 * time.Second +) + +type Tweet struct { + ID string `bson:"_id"` + Text string `bson:"text"` + UserID string `bson:"userId"` + CreatedAt int64 `bson:"createdAt"` + UpdateAt int64 `bson:"updateAt"` +} + +func NewRandomTweet() *Tweet { + return &Tweet{ + ID: fmt.Sprintf("tweet-%d", time.Now().UnixNano()), + Text: "Hello, World!", + UserID: fmt.Sprintf("user-%d", time.Now().UnixNano()), + CreatedAt: time.Now().Unix(), + UpdateAt: time.Now().Unix(), + } +} + +type App struct { + mcli *mongo.Client +} + +func NewApp(mcli *mongo.Client) *App { + return &App{ + mcli: mcli, + } +} + +func (a *App) Run(ctx context.Context) error { + col := a.mcli.Collection("tweets") + for { + select { + case <-ctx.Done(): + log.Info("Received signal to stop the application...") + break + default: + operation := rand.Intn(4) // 0: insert, 1: insert & update, 2: upsert, 4: insert and delete + tweet := NewRandomTweet() + switch operation { + case 0: + if _, err := col.InsertOne(ctx, tweet); err != nil { + log.Error("Failed to insert tweet", log.Ferror(err)) + } + case 1: + if _, err := col.InsertOne(ctx, tweet); err != nil { + log.Error("Failed to insert tweet", log.Ferror(err)) + continue + } + tweet.Text = "Hello, World! Updated" + if _, err := col.UpdateOne(ctx, tweet.ID, tweet); err != nil { + log.Error("Failed to update tweet", log.Ferror(err)) + } + case 2: + opts := &options.UpdateOptions{ + Upsert: &[]bool{true}[0], + } + if _, err := col.UpdateOne(ctx, tweet.ID, tweet, opts); err != nil { + log.Error("Failed to upsert tweet", log.Ferror(err)) + } + case 3: + if _, err := col.InsertOne(ctx, tweet); err != nil { + log.Error("Failed to insert tweet", log.Ferror(err)) + continue + } + if _, err := col.DeleteOne(ctx, tweet.ID); err != nil { + log.Error("Failed to delete tweet", log.Ferror(err)) + } + } + } + + select { + case <-ctx.Done(): + log.Info("Received signal to stop the application...") + break + case <-time.After(time.Duration(rand.Intn(5)+1) * time.Second): + } + } +} + +func (a *App) Close(ctx context.Context) error { + return a.mcli.Disconnect(ctx) +} + +func main() { + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + defer stop() + + app, err := inject(ctx) + if err != nil { + log.Panic("Failed to inject", log.Ferror(err)) + } + + go func() { + if err := app.Run(ctx); err != nil { + log.Error("Failed to run application", log.Ferror(err)) + } + }() + + <-ctx.Done() + tctx, cancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout) + defer cancel() + if err := app.Close(tctx); err != nil { + log.Error("Failed to close application", log.Ferror(err)) + return + } + + log.Info("Successfully graceful shutdown") +} diff --git a/hack/tester/wire.go b/hack/tester/wire.go new file mode 100644 index 0000000..8348d32 --- /dev/null +++ b/hack/tester/wire.go @@ -0,0 +1,21 @@ +//go:build wireinject + +package main + +import ( + "context" + + "github.com/google/wire" + + "github.com/ucpr/mongo-streamer/internal/config" + "github.com/ucpr/mongo-streamer/internal/mongo" +) + +func inject(ctx context.Context) (*App, error) { + wire.Build( + config.Set, + mongo.Set, + NewApp, + ) + return nil, nil +} diff --git a/hack/tester/wire_gen.go b/hack/tester/wire_gen.go new file mode 100644 index 0000000..5a923b3 --- /dev/null +++ b/hack/tester/wire_gen.go @@ -0,0 +1,28 @@ +// Code generated by Wire. DO NOT EDIT. + +//go:generate go run github.com/google/wire/cmd/wire +//go:build !wireinject +// +build !wireinject + +package main + +import ( + "context" + "github.com/ucpr/mongo-streamer/internal/config" + "github.com/ucpr/mongo-streamer/internal/mongo" +) + +// Injectors from wire.go: + +func inject(ctx context.Context) (*App, error) { + mongoDB, err := config.NewMongoDB(ctx) + if err != nil { + return nil, err + } + client, err := mongo.NewClient(ctx, mongoDB) + if err != nil { + return nil, err + } + app := NewApp(client) + return app, nil +} diff --git a/internal/mongo/mongo.go b/internal/mongo/mongo.go index ce93005..999402b 100644 --- a/internal/mongo/mongo.go +++ b/internal/mongo/mongo.go @@ -55,3 +55,8 @@ func NewClient(ctx context.Context, cfg *config.MongoDB) (*Client, error) { func (c *Client) Disconnect(ctx context.Context) error { return c.cli.Disconnect(ctx) } + +// Collection returns a collection from the MongoDB client. +func (c *Client) Collection(name string) *mongo.Collection { + return c.cli.Database(c.db).Collection(name) +}