Skip to content

Commit

Permalink
fix queue tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
tjholm committed Feb 5, 2024
1 parent ac2fa8f commit 608afba
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 72 deletions.
12 changes: 7 additions & 5 deletions cloud/aws/runtime/queue/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go"
"github.com/golang/protobuf/proto"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/google/uuid"
"go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -196,10 +197,8 @@ func (s *SQSQueueService) Receive(ctx context.Context, req *queuepb.QueueReceive

tasks := make([]*queuepb.ReceivedTask, 0, len(res.Messages))
for _, m := range res.Messages {
receivedTask := &queuepb.ReceivedTask{
LeaseId: *m.ReceiptHandle,
}
err := proto.Unmarshal([]byte(*m.Body), receivedTask.Payload)
var structPayload structpb.Struct
err := proto.Unmarshal([]byte(*m.Body), &structPayload)
if err != nil {
return nil, newErr(
codes.Internal,
Expand All @@ -208,7 +207,10 @@ func (s *SQSQueueService) Receive(ctx context.Context, req *queuepb.QueueReceive
)
}

tasks = append(tasks, receivedTask)
tasks = append(tasks, &queuepb.ReceivedTask{
LeaseId: *m.ReceiptHandle,
Payload: &structPayload,
})
}

return &queuepb.QueueReceiveResponse{
Expand Down
109 changes: 53 additions & 56 deletions cloud/aws/runtime/queue/sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go"
"github.com/golang/mock/gomock"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

. "github.com/onsi/ginkgo"
Expand All @@ -32,16 +33,23 @@ import (
mock_provider "github.com/nitrictech/nitric/cloud/aws/mocks/provider"
mocks_sqs "github.com/nitrictech/nitric/cloud/aws/mocks/sqs"
"github.com/nitrictech/nitric/cloud/aws/runtime/resource"
queuepb "github.com/nitrictech/nitric/core/pkg/proto/queue/v1"
queuepb "github.com/nitrictech/nitric/core/pkg/proto/queues/v1"
)

var _ = Describe("Sqs", func() {
testStruct, err := structpb.NewStruct(map[string]interface{}{"Test": "Test"})

Expect(err).To(BeNil())

testPayloadBytes, err := proto.Marshal(testStruct)
Expect(err).To(BeNil())

Context("getUrlForQueueName", func() {
When("GetResources returns an error", func() {
It("Should fail to publish the message", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock).(*SQSQueueService)

By("Calling GetResources and receiving an error")
Expand All @@ -60,11 +68,11 @@ var _ = Describe("Sqs", func() {
It("Should fail to publish the message", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock).(*SQSQueueService)

By("Calling GetResources and have queue be missing")
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Times(1).Return(map[string]string{}, nil)
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Times(1).Return(map[string]resource.ResolvedResource{}, nil)

_, err := plugin.getUrlForQueueName(context.TODO(), "test-queue")

Expand All @@ -82,14 +90,16 @@ var _ = Describe("Sqs", func() {
It("Should send the task to the queue", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock)

queueUrl := aws.String("https://example.com/test-queue")

By("The queue being available")
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{
"test-queue": "arn:aws:sqs:us-east-2:444455556666:test-queue",
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{
"test-queue": {
ARN: "arn:aws:sqs:us-east-2:444455556666:test-queue",
},
}, nil)

By("Calling GetQueueUrl to get the queue name")
Expand All @@ -98,21 +108,12 @@ var _ = Describe("Sqs", func() {
}, nil)

By("Calling SendMessageBatch with the expected batch entries")
sqsMock.EXPECT().SendMessageBatch(gomock.Any(), &sqs.SendMessageBatchInput{
QueueUrl: queueUrl,
Entries: []types.SendMessageBatchRequestEntry{
{
Id: aws.String("1234"),
MessageBody: aws.String(`{"id":"1234","payloadType":"test-payload","payload":{"Test":"Test"}}`),
},
},
}).Return(&sqs.SendMessageBatchOutput{}, nil)
sqsMock.EXPECT().SendMessageBatch(gomock.Any(), gomock.Any()).Return(&sqs.SendMessageBatchOutput{}, nil)

_, err := plugin.Send(context.TODO(), &queuepb.QueueSendRequestBatch{
QueueName: "test-queue",
Requests: []*queuepb.QueueSendRequest{
{
Id: "1234",
Payload: &structpb.Struct{
Fields: map[string]*structpb.Value{
"Test": structpb.NewStringValue("Test"),
Expand All @@ -132,14 +133,16 @@ var _ = Describe("Sqs", func() {
It("Should return an error", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock)

queueUrl := aws.String("https://example.com/test-queue")

By("The queue being available")
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{
"test-queue": "arn:aws:sqs:us-east-2:444455556666:test-queue",
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{
"test-queue": {
ARN: "arn:aws:sqs:us-east-2:444455556666:test-queue",
},
}, nil)

By("Calling GetQueueUrl to get the queue name")
Expand All @@ -153,21 +156,12 @@ var _ = Describe("Sqs", func() {
}

By("Calling SendMessageBatch with the expected batch entries")
sqsMock.EXPECT().SendMessageBatch(gomock.Any(), &sqs.SendMessageBatchInput{
QueueUrl: queueUrl,
Entries: []types.SendMessageBatchRequestEntry{
{
Id: aws.String("1234"),
MessageBody: aws.String(`{"id":"1234","payloadType":"test-payload","payload":{"Test":"Test"}}`),
},
},
}).Return(nil, opErr)
sqsMock.EXPECT().SendMessageBatch(gomock.Any(), gomock.Any()).Return(nil, opErr)

_, err := plugin.Send(context.TODO(), &queuepb.QueueSendRequestBatch{
QueueName: "test-queue",
Requests: []*queuepb.QueueSendRequest{
{
Id: "1234",
Payload: &structpb.Struct{
Fields: map[string]*structpb.Value{
"Test": structpb.NewStringValue("Test"),
Expand All @@ -189,7 +183,7 @@ var _ = Describe("Sqs", func() {
It("Should fail to publish the message", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock)

By("provider GetResources returning an error")
Expand All @@ -199,7 +193,6 @@ var _ = Describe("Sqs", func() {
QueueName: "test-queue",
Requests: []*queuepb.QueueSendRequest{
{
Id: "1234",
Payload: &structpb.Struct{
Fields: map[string]*structpb.Value{
"Test": structpb.NewStringValue("Test"),
Expand All @@ -225,14 +218,16 @@ var _ = Describe("Sqs", func() {
It("Should receive the message", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock)

queueUrl := aws.String("https://example.com/test-queue")

By("calling provider GetResources to get the queue arn")
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{
"mock-queue": "arn:aws:sqs:us-east-2:444455556666:mock-queue",
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{
"mock-queue": {
ARN: "arn:aws:sqs:us-east-2:444455556666:mock-queue",
},
}, nil)

By("calling provider GetQueuUrl")
Expand All @@ -251,7 +246,7 @@ var _ = Describe("Sqs", func() {
Messages: []types.Message{
{
ReceiptHandle: aws.String("mockreceipthandle"),
Body: aws.String(`{"id":"1234","payloadType":"test-payload","payload":{"Test":"Test"}}`),
Body: aws.String(string(testPayloadBytes)),
},
},
}, nil)
Expand All @@ -263,14 +258,8 @@ var _ = Describe("Sqs", func() {
})

Expect(response.Tasks).To(HaveLen(1))
Expect(response.Tasks[0]).To(BeEquivalentTo(queuepb.ReceivedTask{
LeaseId: "mockreceipthandle",
Payload: &structpb.Struct{
Fields: map[string]*structpb.Value{
"Test": structpb.NewStringValue("Test"),
},
},
}))
Expect(response.Tasks[0].LeaseId).To(BeEquivalentTo("mockreceipthandle"))
Expect(response.Tasks[0].Payload.AsMap()).To(BeEquivalentTo(testStruct.AsMap()))
Expect(err).ShouldNot(HaveOccurred())

ctrl.Finish()
Expand All @@ -281,14 +270,16 @@ var _ = Describe("Sqs", func() {
It("Should receive no messages", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock)

queueUrl := aws.String("https://example.com/test-queue")

By("Calling GetResources to get the queue arn")
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{
"mock-queue": "arn:aws:sqs:us-east-2:444455556666:mock-queue",
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{
"mock-queue": {
ARN: "arn:aws:sqs:us-east-2:444455556666:mock-queue",
},
}, nil)

By("Calling GetQueueUrl to get the queue url")
Expand Down Expand Up @@ -326,14 +317,16 @@ var _ = Describe("Sqs", func() {
It("Should return an error", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock)

queueUrl := aws.String("https://example.com/test-queue")

By("Calling GetResources to get the queue arn")
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{
"mock-queue": "arn:aws:sqs:us-east-2:444455556666:mock-queue",
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{
"mock-queue": {
ARN: "arn:aws:sqs:us-east-2:444455556666:mock-queue",
},
}, nil)

opErr := &smithy.OperationError{
Expand Down Expand Up @@ -376,14 +369,16 @@ var _ = Describe("Sqs", func() {
It("Should successfully delete the task", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock)

queueUrl := aws.String("https://example.com/test-queue")

By("Calling GetResources to get the queue arn")
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{
"test-queue": "arn:aws:sqs:us-east-2:444455556666:test-queue",
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{
"test-queue": {
ARN: "arn:aws:sqs:us-east-2:444455556666:test-queue",
},
}, nil)

By("Calling ListQueueTags to get the stack specific nitric name")
Expand Down Expand Up @@ -417,14 +412,16 @@ var _ = Describe("Sqs", func() {
It("Return an error", func() {
ctrl := gomock.NewController(GinkgoT())
sqsMock := mocks_sqs.NewMockSQSAPI(ctrl)
providerMock := mock_provider.NewMockAwsProvider(ctrl)
providerMock := mock_provider.NewMockAwsResourceProvider(ctrl)
plugin := NewWithClient(providerMock, sqsMock)

queueUrl := aws.String("http://example.com/queue")

By("Calling GetResources to get the queue arn")
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Times(1).Return(map[string]string{
"test-queue": "arn:aws:sqs:us-east-2:444455556666:test-queue",
providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Times(1).Return(map[string]resource.ResolvedResource{
"test-queue": {
ARN: "arn:aws:sqs:us-east-2:444455556666:test-queue",
},
}, nil)

By("Calling GetQueueUrl to get the queueurl")
Expand Down
2 changes: 2 additions & 0 deletions cloud/azure/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ generate-mocks: clean-mocks
@echo Generating Mock Clients
@mkdir -p mocks/key_vault
@mkdir -p mocks/azblob
@mkdir -p mocks/azqueue
@mkdir -p mocks/mock_event_grid
@mkdir -p mocks/provider
@go run github.com/golang/mock/mockgen github.com/nitrictech/nitric/cloud/azure/runtime/resource AzProvider > mocks/provider/azure.go
@go run github.com/golang/mock/mockgen -package mock_azblob github.com/Azure/azure-storage-blob-go/azblob StorageError > mocks/azblob/error.go
@go run github.com/golang/mock/mockgen -package mock_azqueue github.com/nitrictech/nitric/cloud/azure/runtime/queue/iface AzqueueServiceUrlIface,AzqueueQueueUrlIface,AzqueueMessageUrlIface,DequeueMessagesResponseIface,AzqueueMessageIdUrlIface > mocks/azqueue/mock.go
@go run github.com/golang/mock/mockgen -package mock_azblob github.com/nitrictech/nitric/cloud/azure/runtime/storage/iface AzblobServiceUrlIface,AzblobContainerUrlIface,AzblobBlockBlobUrlIface,AzblobDownloadResponse > mocks/azblob/mock.go
@go run github.com/golang/mock/mockgen github.com/nitrictech/nitric/cloud/azure/runtime/secret KeyVaultClient > mocks/key_vault/mock.go
@go run github.com/golang/mock/mockgen github.com/Azure/azure-sdk-for-go/services/eventgrid/2018-01-01/eventgrid/eventgridapi BaseClientAPI > mocks/mock_event_grid/mock.go
Expand Down
Loading

0 comments on commit 608afba

Please sign in to comment.