From bd6e8570094fd1971cdfe4eaa1a049a6418a68d4 Mon Sep 17 00:00:00 2001 From: Marco Massenzio <1153951+massenz@users.noreply.github.com> Date: Wed, 19 Oct 2022 14:29:01 -1000 Subject: [PATCH 1/4] Refactored tests to make them more readable (#57) Also removed all the annoying logs --- go.mod | 2 +- go.sum | 4 +- pubsub/pubsub_suite_test.go | 7 --- pubsub/sqs_pub.go | 10 ++-- pubsub/sqs_pub_test.go | 97 +++++++++++++------------------------ pubsub/sqs_sub.go | 2 +- pubsub/sqs_sub_test.go | 1 - 7 files changed, 43 insertions(+), 80 deletions(-) diff --git a/go.mod b/go.mod index ff0ef27..9ed3fed 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 - github.com/massenz/slf4go v0.3.1-gb35df61 + github.com/massenz/slf4go v0.3.2-g4eb5504 github.com/massenz/statemachine-proto/golang v0.6.0-ga901a76 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.18.1 diff --git a/go.sum b/go.sum index da0e7b2..97b5c66 100644 --- a/go.sum +++ b/go.sum @@ -55,8 +55,8 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= -github.com/massenz/slf4go v0.3.1-gb35df61 h1:X/rcmd918F2nkkPbahMcQE0Qb8wc6xg37rpsfunjGMM= -github.com/massenz/slf4go v0.3.1-gb35df61/go.mod h1:ZJjthXAnZMJGwXUz3Z3v5uyban00uAFFoDYODOoLFpw= +github.com/massenz/slf4go v0.3.2-g4eb5504 h1:tRrxPOKcqNKQn25eS8Dy9bW3NMNPpuK4Sla9jKfWmSs= +github.com/massenz/slf4go v0.3.2-g4eb5504/go.mod h1:ZJjthXAnZMJGwXUz3Z3v5uyban00uAFFoDYODOoLFpw= github.com/massenz/statemachine-proto/golang v0.6.0-ga901a76 h1:tik7Xn5GL+w9U5RTJZ3mieoP2sun6RDM+cUBi7WGrUU= github.com/massenz/statemachine-proto/golang v0.6.0-ga901a76/go.mod h1:EkwQg7wD6c/cmXVxfqNaUOVSrBLlti+xYljIxaQNJqA= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= diff --git a/pubsub/pubsub_suite_test.go b/pubsub/pubsub_suite_test.go index d000eb8..36304cf 100644 --- a/pubsub/pubsub_suite_test.go +++ b/pubsub/pubsub_suite_test.go @@ -24,8 +24,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" - - log "github.com/massenz/slf4go/logging" ) const ( @@ -53,11 +51,9 @@ var ( Region: ®ion, }, }))) - testLog = log.NewLog("PUBSUB") ) var _ = BeforeSuite(func() { - testLog.Level = log.NONE Expect(os.Setenv("AWS_REGION", region)).ToNot(HaveOccurred()) for _, topic := range []string{eventsQueue, notificationsQueue, acksQueue} { topic = fmt.Sprintf("%s-%d", topic, GinkgoParallelProcess()) @@ -67,7 +63,6 @@ var _ = BeforeSuite(func() { }) if err != nil { // the queue does not exist and ought to be created - testLog.Info("Creating SQS Queue %s", topic) _, err = testSqsClient.CreateQueue(&sqs.CreateQueueInput{ QueueName: &topic, }) @@ -85,7 +80,6 @@ var _ = AfterSuite(func() { }) Expect(err).NotTo(HaveOccurred()) if out != nil { - testLog.Info("Deleting SQS Queue %s", topic) _, err = testSqsClient.DeleteQueue(&sqs.DeleteQueueInput{QueueUrl: out.QueueUrl}) Expect(err).NotTo(HaveOccurred()) } @@ -121,7 +115,6 @@ func getSqsMessage(queue string) *sqs.Message { // send it over the `queue`, so that we can test the Publisher can correctly receive it. func postSqsMessage(queue string, msg *api.EventRequest) error { q := pubsub.GetQueueUrl(testSqsClient, queue) - testLog.Debug("Post Message -- Timestamp: %v", msg.Event.Timestamp) _, err := testSqsClient.SendMessage(&sqs.SendMessageInput{ MessageBody: aws.String(proto.MarshalTextString(msg)), QueueUrl: &q, diff --git a/pubsub/sqs_pub.go b/pubsub/sqs_pub.go index 671313e..7fb80a1 100644 --- a/pubsub/sqs_pub.go +++ b/pubsub/sqs_pub.go @@ -14,7 +14,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/sqs" "github.com/golang/protobuf/proto" - log "github.com/massenz/slf4go/logging" + slf4go "github.com/massenz/slf4go/logging" protos "github.com/massenz/statemachine-proto/golang/api" "strconv" ) @@ -30,16 +30,16 @@ func NewSqsPublisher(channel <-chan protos.EventResponse, awsUrl *string) *SqsPu return nil } return &SqsPublisher{ - logger: log.NewLog("SQS-Pub"), + logger: slf4go.NewLog("SQS-Pub"), client: client, notifications: channel, } } // SetLogLevel allows the SqsSubscriber to implement the log.Loggable interface -func (s *SqsPublisher) SetLogLevel(level log.LogLevel) { +func (s *SqsPublisher) SetLogLevel(level slf4go.LogLevel) { if s == nil { - fmt.Println("WARN: attempting to set log level on nil Publisher") + slf4go.RootLog.Warn("attempting to set log level on nil Publisher, ignoring") return } s.logger.Level = level @@ -52,7 +52,7 @@ func GetQueueUrl(client *sqs.SQS, topic string) string { }) if err != nil || out.QueueUrl == nil { // From the Google School: fail fast and noisily from an unrecoverable error - log.RootLog.Fatal(fmt.Errorf("cannot get SQS Queue URL for topic %s: %v", topic, err)) + slf4go.RootLog.Fatal(fmt.Errorf("cannot get SQS Queue URL for topic %s: %v", topic, err)) } return *out.QueueUrl } diff --git a/pubsub/sqs_pub_test.go b/pubsub/sqs_pub_test.go index 24493d3..a30b9fa 100644 --- a/pubsub/sqs_pub_test.go +++ b/pubsub/sqs_pub_test.go @@ -11,6 +11,7 @@ package pubsub_test import ( . "github.com/JiaYongfei/respect/gomega" + "github.com/google/uuid" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -66,12 +67,7 @@ var _ = Describe("SQS Publisher", func() { Expect(receivedEvt).To(Respect(notification)) close(notificationsCh) - select { - case <-done: - Succeed() - case <-time.After(timeout): - Fail("timed out waiting for Publisher to exit") - } + Eventually(done).Should(BeClosed()) }) It("will publish successful outcomes", func() { notification := protos.EventResponse{ @@ -93,23 +89,17 @@ var _ = Describe("SQS Publisher", func() { g.Expect(&response).To(Respect(¬ification)) }).Should(Succeed()) close(notificationsCh) - - select { - case <-done: - Succeed() - case <-time.After(timeout): - Fail("timed out waiting for Publisher to exit") - } + Eventually(done).Should(BeClosed()) }) It("will publish OK outcomes to acks queue if configured", func() { - notification := protos.EventResponse{ - EventId: "dead-pork", + errorResponse := protos.EventResponse{ + EventId: uuid.NewString(), Outcome: &protos.EventOutcome{ Code: protos.EventOutcome_InternalError, }, } - ack := protos.EventResponse{ - EventId: "dead-beef", + okResponse := protos.EventResponse{ + EventId: uuid.NewString(), Outcome: &protos.EventOutcome{ Code: protos.EventOutcome_Ok, }, @@ -119,29 +109,23 @@ var _ = Describe("SQS Publisher", func() { defer close(done) go testPublisher.Publish(getQueueName(notificationsQueue), getQueueName(acksQueue), false) }() - var response protos.EventResponse - - // Confirm notificationsQueue received the error - notificationsCh <- notification - res := getSqsMessage(getQueueName(notificationsQueue)) - Eventually(func(g Gomega) { - g.Expect(proto.UnmarshalText(*res.Body, &response)).ShouldNot(HaveOccurred()) - g.Expect(&response).To(Respect(¬ification)) - }).Should(Succeed()) - // Confirm acksQueue received the Ok - notificationsCh <- ack - res = getSqsMessage(getQueueName(acksQueue)) + notificationsCh <- errorResponse + notificationsCh <- okResponse Eventually(func(g Gomega) { - g.Expect(proto.UnmarshalText(*res.Body, &response)).ShouldNot(HaveOccurred()) - g.Expect(&response).To(Respect(&ack)) + var response protos.EventResponse + errMsg := getSqsMessage(getQueueName(notificationsQueue)) + g.Expect(proto.UnmarshalText(*errMsg.Body, &response)).ShouldNot(HaveOccurred()) + g.Expect(&response).To(Respect(&errorResponse)) + + okMsg := getSqsMessage(getQueueName(acksQueue)) + g.Expect(proto.UnmarshalText(*okMsg.Body, &response)).ShouldNot(HaveOccurred()) + g.Expect(&response).To(Respect(&okResponse)) + // There are no more messages in the notifications queue + g.Expect(getSqsMessage(getQueueName(notificationsQueue))).Should(BeNil()) }).Should(Succeed()) - // Confirm notificationsQueue did not receive the Ok - res = getSqsMessage(getQueueName(notificationsQueue)) - Eventually(res).Should(BeNil()) close(notificationsCh) - select { case <-done: Succeed() @@ -156,12 +140,7 @@ var _ = Describe("SQS Publisher", func() { go testPublisher.Publish(getQueueName(notificationsQueue), "", false) }() close(notificationsCh) - select { - case <-done: - Succeed() - case <-time.After(timeout): - Fail("Publisher did not exit within timeout") - } + Eventually(done).Should(BeClosed()) }) It("will survive an empty Message", func() { go testPublisher.Publish(getQueueName(notificationsQueue), "", false) @@ -204,42 +183,34 @@ var _ = Describe("SQS Publisher", func() { } }() close(notificationsCh) - select { - case <-done: - Succeed() - case <-time.After(timeout): - Fail("timed out waiting for Publisher to exit") - } + Eventually(done).Should(BeClosed()) }) It("will only notify error outcomes if configured to", func() { - ack := protos.EventResponse{ - EventId: "dead-beef", + responseOk := protos.EventResponse{ + EventId: uuid.NewString(), Outcome: &protos.EventOutcome{ Code: protos.EventOutcome_Ok, }, } - testPublisher = pubsub.NewSqsPublisher(notificationsCh, &sqsUrl) done := make(chan interface{}) go func() { defer close(done) go testPublisher.Publish(getQueueName(notificationsQueue), getQueueName(acksQueue), true) }() - notificationsCh <- ack - // Confirm both acksQueue and notificationsQueue do not get the Ok message - res := getSqsMessage(getQueueName(notificationsQueue)) - Expect(res).To(BeNil()) - res = getSqsMessage(getQueueName(acksQueue)) - Expect(res).To(BeNil()) + notificationsCh <- responseOk + // Confirm neither queues got the Ok message + // Note we need to "consistently" check, or we may get a false positive if + // we check only once and the message is not yet in the queue. + Consistently(func(g Gomega) { + res := getSqsMessage(getQueueName(notificationsQueue)) + Expect(res).To(BeNil()) + res = getSqsMessage(getQueueName(acksQueue)) + Expect(res).To(BeNil()) + }, "200ms").Should(Succeed()) close(notificationsCh) - - select { - case <-done: - Succeed() - case <-time.After(timeout): - Fail("timed out waiting for Publisher to exit") - } + Eventually(done).Should(BeClosed()) }) }) }) diff --git a/pubsub/sqs_sub.go b/pubsub/sqs_sub.go index d0d29a6..aa3f6df 100644 --- a/pubsub/sqs_sub.go +++ b/pubsub/sqs_sub.go @@ -77,7 +77,7 @@ func (s *SqsSubscriber) SetLogLevel(level log.LogLevel) { // Subscribe runs until signaled on the Done channel and listens for incoming Events func (s *SqsSubscriber) Subscribe(topic string, done <-chan interface{}) { queueUrl := GetQueueUrl(s.client, topic) - s.logger = log.NewLog(fmt.Sprintf("SQS-Sub{%s}", topic)) + s.logger.Name = fmt.Sprintf("%s{%s}", s.logger.Name, topic) s.logger.Info("SQS Subscriber started for queue: %s", queueUrl) timeout := int64(s.Timeout.Seconds()) diff --git a/pubsub/sqs_sub_test.go b/pubsub/sqs_sub_test.go index 9b766cf..399661a 100644 --- a/pubsub/sqs_sub_test.go +++ b/pubsub/sqs_sub_test.go @@ -56,7 +56,6 @@ var _ = Describe("SQS Subscriber", func() { select { case req := <-eventsCh: - testLog.Debug("Received Event -- Timestamp: %v", req.Event.Timestamp) // We null the timestamp as we don't want to compare that with Respect msg.Event.Timestamp = nil req.Event.Timestamp = nil From 927f71bef58e6662ebe9b11cf753b7ed9780f1e4 Mon Sep 17 00:00:00 2001 From: Marco Massenzio Date: Wed, 26 Oct 2022 01:46:14 -0700 Subject: [PATCH 2/4] Updated localstack to 1.2 image --- docker/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 2b34876..73625f1 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -18,7 +18,7 @@ services: localstack: container_name: "awslocal" - image: "localstack/localstack:latest" + image: "localstack/localstack:1.2" hostname: awslocal environment: - AWS_REGION=us-west-2 From 28211d476b34effba3379893675cced58d3dc43b Mon Sep 17 00:00:00 2001 From: Marco Massenzio <1153951+massenz@users.noreply.github.com> Date: Tue, 1 Nov 2022 19:27:01 -0700 Subject: [PATCH 3/4] Fixed issue w retries when creating FSMs (#59) --- storage/redis_store.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/storage/redis_store.go b/storage/redis_store.go index 1f2a094..392b5a6 100644 --- a/storage/redis_store.go +++ b/storage/redis_store.go @@ -192,6 +192,7 @@ func (csm *RedisStore) get(key string, value proto.Message) error { if err == redis.Nil { // The key isn't there, no point in retrying csm.logger.Debug("Key `%s` not found", key) + cancel() return err } else if err != nil { if ctx.Err() == context.DeadlineExceeded { @@ -199,6 +200,7 @@ func (csm *RedisStore) get(key string, value proto.Message) error { csm.logger.Error(err.Error()) if attemptsLeft == 0 { csm.logger.Error("max retries reached, giving up") + cancel() return err } csm.logger.Trace("retrying after timeout, attempts left: %d", attemptsLeft) @@ -206,9 +208,11 @@ func (csm *RedisStore) get(key string, value proto.Message) error { } else { // This is a different error, we'll just return it csm.logger.Error(err.Error()) + cancel() return err } } else { + cancel() return proto.Unmarshal(data, value) } } @@ -226,10 +230,12 @@ func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) e for { var ctx context.Context ctx, cancel = context.WithTimeout(context.Background(), csm.Timeout) + attemptsLeft-- data, err := proto.Marshal(value) if err != nil { csm.logger.Error("cannot convert proto to bytes: %q", err) + cancel() return err } cmd := csm.client.Set(ctx, key, data, ttl) @@ -240,15 +246,20 @@ func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) e csm.logger.Error(err.Error()) if attemptsLeft == 0 { csm.logger.Error("max retries reached, giving up") + cancel() return err } - csm.logger.Trace("retrying after timeout, attempts left: %d", attemptsLeft) + csm.logger.Debug("retrying after timeout, attempts left: %d", attemptsLeft) csm.wait() } else { + cancel() return err } + } else { + csm.logger.Debug("Stored key `%s`", key) + cancel() + return nil } - return nil } } @@ -259,7 +270,7 @@ func (csm *RedisStore) Health() error { _, err := csm.client.Ping(ctx).Result() if err != nil { csm.logger.Error("Error pinging redis: %s", err.Error()) - return fmt.Errorf("Redis health check failed: %w", err) + return fmt.Errorf("redis health check failed: %w", err) } return nil } @@ -272,5 +283,4 @@ func (csm *RedisStore) Health() error { func (csm *RedisStore) wait() { waitForMsec := rand.Intn(500) time.Sleep(time.Duration(waitForMsec) * time.Millisecond) - } From 6da39141592ffaa6c92fa58c893d077062947c36 Mon Sep 17 00:00:00 2001 From: Marco Massenzio Date: Tue, 1 Nov 2022 19:31:31 -0700 Subject: [PATCH 4/4] Rel. 0.6.3 --- build.settings | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.settings b/build.settings index 22c9436..3b8581b 100644 --- a/build.settings +++ b/build.settings @@ -1,4 +1,4 @@ # Build configuration -version = 0.6.2 +version = 0.6.3