Skip to content

Commit

Permalink
Merge branch 'main' into release
Browse files Browse the repository at this point in the history
  • Loading branch information
massenz committed Nov 2, 2022
2 parents d71d6c0 + 6da3914 commit bedf63c
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 86 deletions.
2 changes: 1 addition & 1 deletion build.settings
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Build configuration

version = 0.6.2
version = 0.6.3

2 changes: 1 addition & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ services:

localstack:
container_name: "awslocal"
image: "localstack/localstack:latest"
image: "localstack/localstack:1.2"
hostname: awslocal
environment:
- AWS_REGION=us-west-2
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/massenz/slf4go v0.3.1-gb35df61
github.com/massenz/slf4go v0.3.2-g4eb5504
github.com/massenz/statemachine-proto/golang v0.6.0-ga901a76
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.18.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/massenz/slf4go v0.3.1-gb35df61 h1:X/rcmd918F2nkkPbahMcQE0Qb8wc6xg37rpsfunjGMM=
github.com/massenz/slf4go v0.3.1-gb35df61/go.mod h1:ZJjthXAnZMJGwXUz3Z3v5uyban00uAFFoDYODOoLFpw=
github.com/massenz/slf4go v0.3.2-g4eb5504 h1:tRrxPOKcqNKQn25eS8Dy9bW3NMNPpuK4Sla9jKfWmSs=
github.com/massenz/slf4go v0.3.2-g4eb5504/go.mod h1:ZJjthXAnZMJGwXUz3Z3v5uyban00uAFFoDYODOoLFpw=
github.com/massenz/statemachine-proto/golang v0.6.0-ga901a76 h1:tik7Xn5GL+w9U5RTJZ3mieoP2sun6RDM+cUBi7WGrUU=
github.com/massenz/statemachine-proto/golang v0.6.0-ga901a76/go.mod h1:EkwQg7wD6c/cmXVxfqNaUOVSrBLlti+xYljIxaQNJqA=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
Expand Down
7 changes: 0 additions & 7 deletions pubsub/pubsub_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"

log "github.com/massenz/slf4go/logging"
)

const (
Expand Down Expand Up @@ -53,11 +51,9 @@ var (
Region: &region,
},
})))
testLog = log.NewLog("PUBSUB")
)

var _ = BeforeSuite(func() {
testLog.Level = log.NONE
Expect(os.Setenv("AWS_REGION", region)).ToNot(HaveOccurred())
for _, topic := range []string{eventsQueue, notificationsQueue, acksQueue} {
topic = fmt.Sprintf("%s-%d", topic, GinkgoParallelProcess())
Expand All @@ -67,7 +63,6 @@ var _ = BeforeSuite(func() {
})
if err != nil {
// the queue does not exist and ought to be created
testLog.Info("Creating SQS Queue %s", topic)
_, err = testSqsClient.CreateQueue(&sqs.CreateQueueInput{
QueueName: &topic,
})
Expand All @@ -85,7 +80,6 @@ var _ = AfterSuite(func() {
})
Expect(err).NotTo(HaveOccurred())
if out != nil {
testLog.Info("Deleting SQS Queue %s", topic)
_, err = testSqsClient.DeleteQueue(&sqs.DeleteQueueInput{QueueUrl: out.QueueUrl})
Expect(err).NotTo(HaveOccurred())
}
Expand Down Expand Up @@ -121,7 +115,6 @@ func getSqsMessage(queue string) *sqs.Message {
// send it over the `queue`, so that we can test the Publisher can correctly receive it.
func postSqsMessage(queue string, msg *api.EventRequest) error {
q := pubsub.GetQueueUrl(testSqsClient, queue)
testLog.Debug("Post Message -- Timestamp: %v", msg.Event.Timestamp)
_, err := testSqsClient.SendMessage(&sqs.SendMessageInput{
MessageBody: aws.String(proto.MarshalTextString(msg)),
QueueUrl: &q,
Expand Down
10 changes: 5 additions & 5 deletions pubsub/sqs_pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/golang/protobuf/proto"
log "github.com/massenz/slf4go/logging"
slf4go "github.com/massenz/slf4go/logging"
protos "github.com/massenz/statemachine-proto/golang/api"
"strconv"
)
Expand All @@ -30,16 +30,16 @@ func NewSqsPublisher(channel <-chan protos.EventResponse, awsUrl *string) *SqsPu
return nil
}
return &SqsPublisher{
logger: log.NewLog("SQS-Pub"),
logger: slf4go.NewLog("SQS-Pub"),
client: client,
notifications: channel,
}
}

// SetLogLevel allows the SqsSubscriber to implement the log.Loggable interface
func (s *SqsPublisher) SetLogLevel(level log.LogLevel) {
func (s *SqsPublisher) SetLogLevel(level slf4go.LogLevel) {
if s == nil {
fmt.Println("WARN: attempting to set log level on nil Publisher")
slf4go.RootLog.Warn("attempting to set log level on nil Publisher, ignoring")
return
}
s.logger.Level = level
Expand All @@ -52,7 +52,7 @@ func GetQueueUrl(client *sqs.SQS, topic string) string {
})
if err != nil || out.QueueUrl == nil {
// From the Google School: fail fast and noisily from an unrecoverable error
log.RootLog.Fatal(fmt.Errorf("cannot get SQS Queue URL for topic %s: %v", topic, err))
slf4go.RootLog.Fatal(fmt.Errorf("cannot get SQS Queue URL for topic %s: %v", topic, err))
}
return *out.QueueUrl
}
Expand Down
97 changes: 34 additions & 63 deletions pubsub/sqs_pub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package pubsub_test

import (
. "github.com/JiaYongfei/respect/gomega"
"github.com/google/uuid"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

Expand Down Expand Up @@ -66,12 +67,7 @@ var _ = Describe("SQS Publisher", func() {
Expect(receivedEvt).To(Respect(notification))

close(notificationsCh)
select {
case <-done:
Succeed()
case <-time.After(timeout):
Fail("timed out waiting for Publisher to exit")
}
Eventually(done).Should(BeClosed())
})
It("will publish successful outcomes", func() {
notification := protos.EventResponse{
Expand All @@ -93,23 +89,17 @@ var _ = Describe("SQS Publisher", func() {
g.Expect(&response).To(Respect(&notification))
}).Should(Succeed())
close(notificationsCh)

select {
case <-done:
Succeed()
case <-time.After(timeout):
Fail("timed out waiting for Publisher to exit")
}
Eventually(done).Should(BeClosed())
})
It("will publish OK outcomes to acks queue if configured", func() {
notification := protos.EventResponse{
EventId: "dead-pork",
errorResponse := protos.EventResponse{
EventId: uuid.NewString(),
Outcome: &protos.EventOutcome{
Code: protos.EventOutcome_InternalError,
},
}
ack := protos.EventResponse{
EventId: "dead-beef",
okResponse := protos.EventResponse{
EventId: uuid.NewString(),
Outcome: &protos.EventOutcome{
Code: protos.EventOutcome_Ok,
},
Expand All @@ -119,29 +109,23 @@ var _ = Describe("SQS Publisher", func() {
defer close(done)
go testPublisher.Publish(getQueueName(notificationsQueue), getQueueName(acksQueue), false)
}()
var response protos.EventResponse

// Confirm notificationsQueue received the error
notificationsCh <- notification
res := getSqsMessage(getQueueName(notificationsQueue))
Eventually(func(g Gomega) {
g.Expect(proto.UnmarshalText(*res.Body, &response)).ShouldNot(HaveOccurred())
g.Expect(&response).To(Respect(&notification))
}).Should(Succeed())

// Confirm acksQueue received the Ok
notificationsCh <- ack
res = getSqsMessage(getQueueName(acksQueue))
notificationsCh <- errorResponse
notificationsCh <- okResponse
Eventually(func(g Gomega) {
g.Expect(proto.UnmarshalText(*res.Body, &response)).ShouldNot(HaveOccurred())
g.Expect(&response).To(Respect(&ack))
var response protos.EventResponse
errMsg := getSqsMessage(getQueueName(notificationsQueue))
g.Expect(proto.UnmarshalText(*errMsg.Body, &response)).ShouldNot(HaveOccurred())
g.Expect(&response).To(Respect(&errorResponse))

okMsg := getSqsMessage(getQueueName(acksQueue))
g.Expect(proto.UnmarshalText(*okMsg.Body, &response)).ShouldNot(HaveOccurred())
g.Expect(&response).To(Respect(&okResponse))
// There are no more messages in the notifications queue
g.Expect(getSqsMessage(getQueueName(notificationsQueue))).Should(BeNil())
}).Should(Succeed())
// Confirm notificationsQueue did not receive the Ok
res = getSqsMessage(getQueueName(notificationsQueue))
Eventually(res).Should(BeNil())

close(notificationsCh)

select {
case <-done:
Succeed()
Expand All @@ -156,12 +140,7 @@ var _ = Describe("SQS Publisher", func() {
go testPublisher.Publish(getQueueName(notificationsQueue), "", false)
}()
close(notificationsCh)
select {
case <-done:
Succeed()
case <-time.After(timeout):
Fail("Publisher did not exit within timeout")
}
Eventually(done).Should(BeClosed())
})
It("will survive an empty Message", func() {
go testPublisher.Publish(getQueueName(notificationsQueue), "", false)
Expand Down Expand Up @@ -204,42 +183,34 @@ var _ = Describe("SQS Publisher", func() {
}
}()
close(notificationsCh)
select {
case <-done:
Succeed()
case <-time.After(timeout):
Fail("timed out waiting for Publisher to exit")
}
Eventually(done).Should(BeClosed())
})
It("will only notify error outcomes if configured to", func() {
ack := protos.EventResponse{
EventId: "dead-beef",
responseOk := protos.EventResponse{
EventId: uuid.NewString(),
Outcome: &protos.EventOutcome{
Code: protos.EventOutcome_Ok,
},
}
testPublisher = pubsub.NewSqsPublisher(notificationsCh, &sqsUrl)
done := make(chan interface{})
go func() {
defer close(done)
go testPublisher.Publish(getQueueName(notificationsQueue), getQueueName(acksQueue), true)
}()

notificationsCh <- ack
// Confirm both acksQueue and notificationsQueue do not get the Ok message
res := getSqsMessage(getQueueName(notificationsQueue))
Expect(res).To(BeNil())
res = getSqsMessage(getQueueName(acksQueue))
Expect(res).To(BeNil())
notificationsCh <- responseOk
// Confirm neither queues got the Ok message
// Note we need to "consistently" check, or we may get a false positive if
// we check only once and the message is not yet in the queue.
Consistently(func(g Gomega) {
res := getSqsMessage(getQueueName(notificationsQueue))
Expect(res).To(BeNil())
res = getSqsMessage(getQueueName(acksQueue))
Expect(res).To(BeNil())
}, "200ms").Should(Succeed())

close(notificationsCh)

select {
case <-done:
Succeed()
case <-time.After(timeout):
Fail("timed out waiting for Publisher to exit")
}
Eventually(done).Should(BeClosed())
})
})
})
2 changes: 1 addition & 1 deletion pubsub/sqs_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (s *SqsSubscriber) SetLogLevel(level log.LogLevel) {
// Subscribe runs until signaled on the Done channel and listens for incoming Events
func (s *SqsSubscriber) Subscribe(topic string, done <-chan interface{}) {
queueUrl := GetQueueUrl(s.client, topic)
s.logger = log.NewLog(fmt.Sprintf("SQS-Sub{%s}", topic))
s.logger.Name = fmt.Sprintf("%s{%s}", s.logger.Name, topic)
s.logger.Info("SQS Subscriber started for queue: %s", queueUrl)

timeout := int64(s.Timeout.Seconds())
Expand Down
1 change: 0 additions & 1 deletion pubsub/sqs_sub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ var _ = Describe("SQS Subscriber", func() {

select {
case req := <-eventsCh:
testLog.Debug("Received Event -- Timestamp: %v", req.Event.Timestamp)
// We null the timestamp as we don't want to compare that with Respect
msg.Event.Timestamp = nil
req.Event.Timestamp = nil
Expand Down
18 changes: 14 additions & 4 deletions storage/redis_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,23 +192,27 @@ func (csm *RedisStore) get(key string, value proto.Message) error {
if err == redis.Nil {
// The key isn't there, no point in retrying
csm.logger.Debug("Key `%s` not found", key)
cancel()
return err
} else if err != nil {
if ctx.Err() == context.DeadlineExceeded {
// The error here may be recoverable, so we'll keep trying until we run out of attempts
csm.logger.Error(err.Error())
if attemptsLeft == 0 {
csm.logger.Error("max retries reached, giving up")
cancel()
return err
}
csm.logger.Trace("retrying after timeout, attempts left: %d", attemptsLeft)
csm.wait()
} else {
// This is a different error, we'll just return it
csm.logger.Error(err.Error())
cancel()
return err
}
} else {
cancel()
return proto.Unmarshal(data, value)
}
}
Expand All @@ -226,10 +230,12 @@ func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) e
for {
var ctx context.Context
ctx, cancel = context.WithTimeout(context.Background(), csm.Timeout)

attemptsLeft--
data, err := proto.Marshal(value)
if err != nil {
csm.logger.Error("cannot convert proto to bytes: %q", err)
cancel()
return err
}
cmd := csm.client.Set(ctx, key, data, ttl)
Expand All @@ -240,15 +246,20 @@ func (csm *RedisStore) put(key string, value proto.Message, ttl time.Duration) e
csm.logger.Error(err.Error())
if attemptsLeft == 0 {
csm.logger.Error("max retries reached, giving up")
cancel()
return err
}
csm.logger.Trace("retrying after timeout, attempts left: %d", attemptsLeft)
csm.logger.Debug("retrying after timeout, attempts left: %d", attemptsLeft)
csm.wait()
} else {
cancel()
return err
}
} else {
csm.logger.Debug("Stored key `%s`", key)
cancel()
return nil
}
return nil
}
}

Expand All @@ -259,7 +270,7 @@ func (csm *RedisStore) Health() error {
_, err := csm.client.Ping(ctx).Result()
if err != nil {
csm.logger.Error("Error pinging redis: %s", err.Error())
return fmt.Errorf("Redis health check failed: %w", err)
return fmt.Errorf("redis health check failed: %w", err)
}
return nil
}
Expand All @@ -272,5 +283,4 @@ func (csm *RedisStore) Health() error {
func (csm *RedisStore) wait() {
waitForMsec := rand.Intn(500)
time.Sleep(time.Duration(waitForMsec) * time.Millisecond)

}

0 comments on commit bedf63c

Please sign in to comment.