diff --git a/compose.yaml b/compose.yaml index 3144acc..70261d8 100644 --- a/compose.yaml +++ b/compose.yaml @@ -56,9 +56,14 @@ services: - setup-pubsub-emulator ports: - "8080:8080" + develop: + watch: + - action: rebuild + files: + - ./ environment: MONGO_DB_URI: "mongodb://mongo:27017/test?tls=false&directConnection=true&retryWrites=false&replicaSet=rs0" - MONGO_DB_COLLECTION: test + MONGO_DB_COLLECTION: tweets MONGO_DB_DATABASE: test METRICS_ADDR: :8080 PUBSUB_PROJECT_ID: "dummy-project" diff --git a/hack/tester/main.go b/hack/tester/main.go index 4df7541..8e266c7 100644 --- a/hack/tester/main.go +++ b/hack/tester/main.go @@ -8,6 +8,7 @@ import ( "syscall" "time" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo/options" "github.com/ucpr/mongo-streamer/internal/mongo" @@ -52,7 +53,7 @@ func (a *App) Run(ctx context.Context) error { select { case <-ctx.Done(): log.Info("Received signal to stop the application...") - break + return nil default: operation := rand.Intn(4) // 0: insert, 1: insert & update, 2: upsert, 4: insert and delete tweet := NewRandomTweet() @@ -66,15 +67,22 @@ func (a *App) Run(ctx context.Context) error { log.Error("Failed to insert tweet", log.Ferror(err)) continue } - tweet.Text = "Hello, World! Updated" - if _, err := col.UpdateOne(ctx, tweet.ID, tweet); err != nil { + updated := "Hello, World! Updated" + if _, err := col.UpdateOne(ctx, bson.M{"_id": tweet.ID}, bson.M{ + "$set": bson.M{ + "text": updated, + }, + }); err != nil { log.Error("Failed to update tweet", log.Ferror(err)) } case 2: opts := &options.UpdateOptions{ Upsert: &[]bool{true}[0], } - if _, err := col.UpdateOne(ctx, tweet.ID, tweet, opts); err != nil { + if _, err := col.UpdateOne(ctx, bson.M{"_id": tweet.ID}, + bson.M{ + "$set": tweet, + }, opts); err != nil { log.Error("Failed to upsert tweet", log.Ferror(err)) } case 3: @@ -82,7 +90,9 @@ func (a *App) Run(ctx context.Context) error { log.Error("Failed to insert tweet", log.Ferror(err)) continue } - if _, err := col.DeleteOne(ctx, tweet.ID); err != nil { + if _, err := col.DeleteOne(ctx, bson.M{ + "_id": tweet.ID, + }); err != nil { log.Error("Failed to delete tweet", log.Ferror(err)) } } @@ -91,7 +101,7 @@ func (a *App) Run(ctx context.Context) error { select { case <-ctx.Done(): log.Info("Received signal to stop the application...") - break + return nil case <-time.After(time.Duration(rand.Intn(5)+1) * time.Second): } } @@ -102,6 +112,7 @@ func (a *App) Close(ctx context.Context) error { } func main() { + log.Info("Starting hack/tester...") ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) defer stop()