From 608afba68658c7dc88dd3c1b6f46116c98ca8a70 Mon Sep 17 00:00:00 2001 From: Tim Holm Date: Tue, 6 Feb 2024 09:56:59 +1100 Subject: [PATCH] fix queue tests. --- cloud/aws/runtime/queue/sqs.go | 12 +- cloud/aws/runtime/queue/sqs_test.go | 109 +++++----- cloud/azure/Makefile | 2 + cloud/azure/mocks/azqueue/mock.go | 245 ++++++++++++++++++++++ cloud/azure/runtime/queue/azqueue_test.go | 22 +- cloud/gcp/runtime/queue/pubsub.go | 5 +- 6 files changed, 323 insertions(+), 72 deletions(-) create mode 100644 cloud/azure/mocks/azqueue/mock.go diff --git a/cloud/aws/runtime/queue/sqs.go b/cloud/aws/runtime/queue/sqs.go index 35e4088ae..60098bbb1 100644 --- a/cloud/aws/runtime/queue/sqs.go +++ b/cloud/aws/runtime/queue/sqs.go @@ -25,6 +25,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/aws/smithy-go" "github.com/golang/protobuf/proto" + structpb "github.com/golang/protobuf/ptypes/struct" "github.com/google/uuid" "go.opentelemetry.io/contrib/instrumentation/github.com/aws/aws-sdk-go-v2/otelaws" "google.golang.org/grpc/codes" @@ -196,10 +197,8 @@ func (s *SQSQueueService) Receive(ctx context.Context, req *queuepb.QueueReceive tasks := make([]*queuepb.ReceivedTask, 0, len(res.Messages)) for _, m := range res.Messages { - receivedTask := &queuepb.ReceivedTask{ - LeaseId: *m.ReceiptHandle, - } - err := proto.Unmarshal([]byte(*m.Body), receivedTask.Payload) + var structPayload structpb.Struct + err := proto.Unmarshal([]byte(*m.Body), &structPayload) if err != nil { return nil, newErr( codes.Internal, @@ -208,7 +207,10 @@ func (s *SQSQueueService) Receive(ctx context.Context, req *queuepb.QueueReceive ) } - tasks = append(tasks, receivedTask) + tasks = append(tasks, &queuepb.ReceivedTask{ + LeaseId: *m.ReceiptHandle, + Payload: &structPayload, + }) } return &queuepb.QueueReceiveResponse{ diff --git a/cloud/aws/runtime/queue/sqs_test.go b/cloud/aws/runtime/queue/sqs_test.go index 07e98fdf6..05a51c666 100644 --- a/cloud/aws/runtime/queue/sqs_test.go +++ b/cloud/aws/runtime/queue/sqs_test.go @@ -24,6 +24,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/aws/smithy-go" "github.com/golang/mock/gomock" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" . "github.com/onsi/ginkgo" @@ -32,16 +33,23 @@ import ( mock_provider "github.com/nitrictech/nitric/cloud/aws/mocks/provider" mocks_sqs "github.com/nitrictech/nitric/cloud/aws/mocks/sqs" "github.com/nitrictech/nitric/cloud/aws/runtime/resource" - queuepb "github.com/nitrictech/nitric/core/pkg/proto/queue/v1" + queuepb "github.com/nitrictech/nitric/core/pkg/proto/queues/v1" ) var _ = Describe("Sqs", func() { + testStruct, err := structpb.NewStruct(map[string]interface{}{"Test": "Test"}) + + Expect(err).To(BeNil()) + + testPayloadBytes, err := proto.Marshal(testStruct) + Expect(err).To(BeNil()) + Context("getUrlForQueueName", func() { When("GetResources returns an error", func() { It("Should fail to publish the message", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock).(*SQSQueueService) By("Calling GetResources and receiving an error") @@ -60,11 +68,11 @@ var _ = Describe("Sqs", func() { It("Should fail to publish the message", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock).(*SQSQueueService) By("Calling GetResources and have queue be missing") - providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Times(1).Return(map[string]string{}, nil) + providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Times(1).Return(map[string]resource.ResolvedResource{}, nil) _, err := plugin.getUrlForQueueName(context.TODO(), "test-queue") @@ -82,14 +90,16 @@ var _ = Describe("Sqs", func() { It("Should send the task to the queue", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock) queueUrl := aws.String("https://example.com/test-queue") By("The queue being available") - providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{ - "test-queue": "arn:aws:sqs:us-east-2:444455556666:test-queue", + providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{ + "test-queue": { + ARN: "arn:aws:sqs:us-east-2:444455556666:test-queue", + }, }, nil) By("Calling GetQueueUrl to get the queue name") @@ -98,21 +108,12 @@ var _ = Describe("Sqs", func() { }, nil) By("Calling SendMessageBatch with the expected batch entries") - sqsMock.EXPECT().SendMessageBatch(gomock.Any(), &sqs.SendMessageBatchInput{ - QueueUrl: queueUrl, - Entries: []types.SendMessageBatchRequestEntry{ - { - Id: aws.String("1234"), - MessageBody: aws.String(`{"id":"1234","payloadType":"test-payload","payload":{"Test":"Test"}}`), - }, - }, - }).Return(&sqs.SendMessageBatchOutput{}, nil) + sqsMock.EXPECT().SendMessageBatch(gomock.Any(), gomock.Any()).Return(&sqs.SendMessageBatchOutput{}, nil) _, err := plugin.Send(context.TODO(), &queuepb.QueueSendRequestBatch{ QueueName: "test-queue", Requests: []*queuepb.QueueSendRequest{ { - Id: "1234", Payload: &structpb.Struct{ Fields: map[string]*structpb.Value{ "Test": structpb.NewStringValue("Test"), @@ -132,14 +133,16 @@ var _ = Describe("Sqs", func() { It("Should return an error", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock) queueUrl := aws.String("https://example.com/test-queue") By("The queue being available") - providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{ - "test-queue": "arn:aws:sqs:us-east-2:444455556666:test-queue", + providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{ + "test-queue": { + ARN: "arn:aws:sqs:us-east-2:444455556666:test-queue", + }, }, nil) By("Calling GetQueueUrl to get the queue name") @@ -153,21 +156,12 @@ var _ = Describe("Sqs", func() { } By("Calling SendMessageBatch with the expected batch entries") - sqsMock.EXPECT().SendMessageBatch(gomock.Any(), &sqs.SendMessageBatchInput{ - QueueUrl: queueUrl, - Entries: []types.SendMessageBatchRequestEntry{ - { - Id: aws.String("1234"), - MessageBody: aws.String(`{"id":"1234","payloadType":"test-payload","payload":{"Test":"Test"}}`), - }, - }, - }).Return(nil, opErr) + sqsMock.EXPECT().SendMessageBatch(gomock.Any(), gomock.Any()).Return(nil, opErr) _, err := plugin.Send(context.TODO(), &queuepb.QueueSendRequestBatch{ QueueName: "test-queue", Requests: []*queuepb.QueueSendRequest{ { - Id: "1234", Payload: &structpb.Struct{ Fields: map[string]*structpb.Value{ "Test": structpb.NewStringValue("Test"), @@ -189,7 +183,7 @@ var _ = Describe("Sqs", func() { It("Should fail to publish the message", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock) By("provider GetResources returning an error") @@ -199,7 +193,6 @@ var _ = Describe("Sqs", func() { QueueName: "test-queue", Requests: []*queuepb.QueueSendRequest{ { - Id: "1234", Payload: &structpb.Struct{ Fields: map[string]*structpb.Value{ "Test": structpb.NewStringValue("Test"), @@ -225,14 +218,16 @@ var _ = Describe("Sqs", func() { It("Should receive the message", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock) queueUrl := aws.String("https://example.com/test-queue") By("calling provider GetResources to get the queue arn") - providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{ - "mock-queue": "arn:aws:sqs:us-east-2:444455556666:mock-queue", + providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{ + "mock-queue": { + ARN: "arn:aws:sqs:us-east-2:444455556666:mock-queue", + }, }, nil) By("calling provider GetQueuUrl") @@ -251,7 +246,7 @@ var _ = Describe("Sqs", func() { Messages: []types.Message{ { ReceiptHandle: aws.String("mockreceipthandle"), - Body: aws.String(`{"id":"1234","payloadType":"test-payload","payload":{"Test":"Test"}}`), + Body: aws.String(string(testPayloadBytes)), }, }, }, nil) @@ -263,14 +258,8 @@ var _ = Describe("Sqs", func() { }) Expect(response.Tasks).To(HaveLen(1)) - Expect(response.Tasks[0]).To(BeEquivalentTo(queuepb.ReceivedTask{ - LeaseId: "mockreceipthandle", - Payload: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "Test": structpb.NewStringValue("Test"), - }, - }, - })) + Expect(response.Tasks[0].LeaseId).To(BeEquivalentTo("mockreceipthandle")) + Expect(response.Tasks[0].Payload.AsMap()).To(BeEquivalentTo(testStruct.AsMap())) Expect(err).ShouldNot(HaveOccurred()) ctrl.Finish() @@ -281,14 +270,16 @@ var _ = Describe("Sqs", func() { It("Should receive no messages", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock) queueUrl := aws.String("https://example.com/test-queue") By("Calling GetResources to get the queue arn") - providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{ - "mock-queue": "arn:aws:sqs:us-east-2:444455556666:mock-queue", + providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{ + "mock-queue": { + ARN: "arn:aws:sqs:us-east-2:444455556666:mock-queue", + }, }, nil) By("Calling GetQueueUrl to get the queue url") @@ -326,14 +317,16 @@ var _ = Describe("Sqs", func() { It("Should return an error", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock) queueUrl := aws.String("https://example.com/test-queue") By("Calling GetResources to get the queue arn") - providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{ - "mock-queue": "arn:aws:sqs:us-east-2:444455556666:mock-queue", + providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{ + "mock-queue": { + ARN: "arn:aws:sqs:us-east-2:444455556666:mock-queue", + }, }, nil) opErr := &smithy.OperationError{ @@ -376,14 +369,16 @@ var _ = Describe("Sqs", func() { It("Should successfully delete the task", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock) queueUrl := aws.String("https://example.com/test-queue") By("Calling GetResources to get the queue arn") - providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]string{ - "test-queue": "arn:aws:sqs:us-east-2:444455556666:test-queue", + providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Return(map[string]resource.ResolvedResource{ + "test-queue": { + ARN: "arn:aws:sqs:us-east-2:444455556666:test-queue", + }, }, nil) By("Calling ListQueueTags to get the stack specific nitric name") @@ -417,14 +412,16 @@ var _ = Describe("Sqs", func() { It("Return an error", func() { ctrl := gomock.NewController(GinkgoT()) sqsMock := mocks_sqs.NewMockSQSAPI(ctrl) - providerMock := mock_provider.NewMockAwsProvider(ctrl) + providerMock := mock_provider.NewMockAwsResourceProvider(ctrl) plugin := NewWithClient(providerMock, sqsMock) queueUrl := aws.String("http://example.com/queue") By("Calling GetResources to get the queue arn") - providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Times(1).Return(map[string]string{ - "test-queue": "arn:aws:sqs:us-east-2:444455556666:test-queue", + providerMock.EXPECT().GetResources(gomock.Any(), resource.AwsResource_Queue).Times(1).Return(map[string]resource.ResolvedResource{ + "test-queue": { + ARN: "arn:aws:sqs:us-east-2:444455556666:test-queue", + }, }, nil) By("Calling GetQueueUrl to get the queueurl") diff --git a/cloud/azure/Makefile b/cloud/azure/Makefile index f8b931e9c..b46542596 100644 --- a/cloud/azure/Makefile +++ b/cloud/azure/Makefile @@ -64,10 +64,12 @@ generate-mocks: clean-mocks @echo Generating Mock Clients @mkdir -p mocks/key_vault @mkdir -p mocks/azblob + @mkdir -p mocks/azqueue @mkdir -p mocks/mock_event_grid @mkdir -p mocks/provider @go run github.com/golang/mock/mockgen github.com/nitrictech/nitric/cloud/azure/runtime/resource AzProvider > mocks/provider/azure.go @go run github.com/golang/mock/mockgen -package mock_azblob github.com/Azure/azure-storage-blob-go/azblob StorageError > mocks/azblob/error.go + @go run github.com/golang/mock/mockgen -package mock_azqueue github.com/nitrictech/nitric/cloud/azure/runtime/queue/iface AzqueueServiceUrlIface,AzqueueQueueUrlIface,AzqueueMessageUrlIface,DequeueMessagesResponseIface,AzqueueMessageIdUrlIface > mocks/azqueue/mock.go @go run github.com/golang/mock/mockgen -package mock_azblob github.com/nitrictech/nitric/cloud/azure/runtime/storage/iface AzblobServiceUrlIface,AzblobContainerUrlIface,AzblobBlockBlobUrlIface,AzblobDownloadResponse > mocks/azblob/mock.go @go run github.com/golang/mock/mockgen github.com/nitrictech/nitric/cloud/azure/runtime/secret KeyVaultClient > mocks/key_vault/mock.go @go run github.com/golang/mock/mockgen github.com/Azure/azure-sdk-for-go/services/eventgrid/2018-01-01/eventgrid/eventgridapi BaseClientAPI > mocks/mock_event_grid/mock.go diff --git a/cloud/azure/mocks/azqueue/mock.go b/cloud/azure/mocks/azqueue/mock.go new file mode 100644 index 000000000..72c3d2656 --- /dev/null +++ b/cloud/azure/mocks/azqueue/mock.go @@ -0,0 +1,245 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/nitrictech/nitric/cloud/azure/runtime/queue/iface (interfaces: AzqueueServiceUrlIface,AzqueueQueueUrlIface,AzqueueMessageUrlIface,DequeueMessagesResponseIface,AzqueueMessageIdUrlIface) + +// Package mock_azqueue is a generated GoMock package. +package mock_azqueue + +import ( + context "context" + reflect "reflect" + time "time" + + azqueue "github.com/Azure/azure-storage-queue-go/azqueue" + gomock "github.com/golang/mock/gomock" + iface "github.com/nitrictech/nitric/cloud/azure/runtime/queue/iface" +) + +// MockAzqueueServiceUrlIface is a mock of AzqueueServiceUrlIface interface. +type MockAzqueueServiceUrlIface struct { + ctrl *gomock.Controller + recorder *MockAzqueueServiceUrlIfaceMockRecorder +} + +// MockAzqueueServiceUrlIfaceMockRecorder is the mock recorder for MockAzqueueServiceUrlIface. +type MockAzqueueServiceUrlIfaceMockRecorder struct { + mock *MockAzqueueServiceUrlIface +} + +// NewMockAzqueueServiceUrlIface creates a new mock instance. +func NewMockAzqueueServiceUrlIface(ctrl *gomock.Controller) *MockAzqueueServiceUrlIface { + mock := &MockAzqueueServiceUrlIface{ctrl: ctrl} + mock.recorder = &MockAzqueueServiceUrlIfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockAzqueueServiceUrlIface) EXPECT() *MockAzqueueServiceUrlIfaceMockRecorder { + return m.recorder +} + +// NewQueueURL mocks base method. +func (m *MockAzqueueServiceUrlIface) NewQueueURL(arg0 string) iface.AzqueueQueueUrlIface { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewQueueURL", arg0) + ret0, _ := ret[0].(iface.AzqueueQueueUrlIface) + return ret0 +} + +// NewQueueURL indicates an expected call of NewQueueURL. +func (mr *MockAzqueueServiceUrlIfaceMockRecorder) NewQueueURL(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewQueueURL", reflect.TypeOf((*MockAzqueueServiceUrlIface)(nil).NewQueueURL), arg0) +} + +// MockAzqueueQueueUrlIface is a mock of AzqueueQueueUrlIface interface. +type MockAzqueueQueueUrlIface struct { + ctrl *gomock.Controller + recorder *MockAzqueueQueueUrlIfaceMockRecorder +} + +// MockAzqueueQueueUrlIfaceMockRecorder is the mock recorder for MockAzqueueQueueUrlIface. +type MockAzqueueQueueUrlIfaceMockRecorder struct { + mock *MockAzqueueQueueUrlIface +} + +// NewMockAzqueueQueueUrlIface creates a new mock instance. +func NewMockAzqueueQueueUrlIface(ctrl *gomock.Controller) *MockAzqueueQueueUrlIface { + mock := &MockAzqueueQueueUrlIface{ctrl: ctrl} + mock.recorder = &MockAzqueueQueueUrlIfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockAzqueueQueueUrlIface) EXPECT() *MockAzqueueQueueUrlIfaceMockRecorder { + return m.recorder +} + +// NewMessageURL mocks base method. +func (m *MockAzqueueQueueUrlIface) NewMessageURL() iface.AzqueueMessageUrlIface { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewMessageURL") + ret0, _ := ret[0].(iface.AzqueueMessageUrlIface) + return ret0 +} + +// NewMessageURL indicates an expected call of NewMessageURL. +func (mr *MockAzqueueQueueUrlIfaceMockRecorder) NewMessageURL() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewMessageURL", reflect.TypeOf((*MockAzqueueQueueUrlIface)(nil).NewMessageURL)) +} + +// MockAzqueueMessageUrlIface is a mock of AzqueueMessageUrlIface interface. +type MockAzqueueMessageUrlIface struct { + ctrl *gomock.Controller + recorder *MockAzqueueMessageUrlIfaceMockRecorder +} + +// MockAzqueueMessageUrlIfaceMockRecorder is the mock recorder for MockAzqueueMessageUrlIface. +type MockAzqueueMessageUrlIfaceMockRecorder struct { + mock *MockAzqueueMessageUrlIface +} + +// NewMockAzqueueMessageUrlIface creates a new mock instance. +func NewMockAzqueueMessageUrlIface(ctrl *gomock.Controller) *MockAzqueueMessageUrlIface { + mock := &MockAzqueueMessageUrlIface{ctrl: ctrl} + mock.recorder = &MockAzqueueMessageUrlIfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockAzqueueMessageUrlIface) EXPECT() *MockAzqueueMessageUrlIfaceMockRecorder { + return m.recorder +} + +// Dequeue mocks base method. +func (m *MockAzqueueMessageUrlIface) Dequeue(arg0 context.Context, arg1 int32, arg2 time.Duration) (iface.DequeueMessagesResponseIface, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Dequeue", arg0, arg1, arg2) + ret0, _ := ret[0].(iface.DequeueMessagesResponseIface) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Dequeue indicates an expected call of Dequeue. +func (mr *MockAzqueueMessageUrlIfaceMockRecorder) Dequeue(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Dequeue", reflect.TypeOf((*MockAzqueueMessageUrlIface)(nil).Dequeue), arg0, arg1, arg2) +} + +// Enqueue mocks base method. +func (m *MockAzqueueMessageUrlIface) Enqueue(arg0 context.Context, arg1 string, arg2, arg3 time.Duration) (*azqueue.EnqueueMessageResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Enqueue", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*azqueue.EnqueueMessageResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Enqueue indicates an expected call of Enqueue. +func (mr *MockAzqueueMessageUrlIfaceMockRecorder) Enqueue(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enqueue", reflect.TypeOf((*MockAzqueueMessageUrlIface)(nil).Enqueue), arg0, arg1, arg2, arg3) +} + +// NewMessageIDURL mocks base method. +func (m *MockAzqueueMessageUrlIface) NewMessageIDURL(arg0 azqueue.MessageID) iface.AzqueueMessageIdUrlIface { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewMessageIDURL", arg0) + ret0, _ := ret[0].(iface.AzqueueMessageIdUrlIface) + return ret0 +} + +// NewMessageIDURL indicates an expected call of NewMessageIDURL. +func (mr *MockAzqueueMessageUrlIfaceMockRecorder) NewMessageIDURL(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewMessageIDURL", reflect.TypeOf((*MockAzqueueMessageUrlIface)(nil).NewMessageIDURL), arg0) +} + +// MockDequeueMessagesResponseIface is a mock of DequeueMessagesResponseIface interface. +type MockDequeueMessagesResponseIface struct { + ctrl *gomock.Controller + recorder *MockDequeueMessagesResponseIfaceMockRecorder +} + +// MockDequeueMessagesResponseIfaceMockRecorder is the mock recorder for MockDequeueMessagesResponseIface. +type MockDequeueMessagesResponseIfaceMockRecorder struct { + mock *MockDequeueMessagesResponseIface +} + +// NewMockDequeueMessagesResponseIface creates a new mock instance. +func NewMockDequeueMessagesResponseIface(ctrl *gomock.Controller) *MockDequeueMessagesResponseIface { + mock := &MockDequeueMessagesResponseIface{ctrl: ctrl} + mock.recorder = &MockDequeueMessagesResponseIfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDequeueMessagesResponseIface) EXPECT() *MockDequeueMessagesResponseIfaceMockRecorder { + return m.recorder +} + +// Message mocks base method. +func (m *MockDequeueMessagesResponseIface) Message(arg0 int32) *azqueue.DequeuedMessage { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Message", arg0) + ret0, _ := ret[0].(*azqueue.DequeuedMessage) + return ret0 +} + +// Message indicates an expected call of Message. +func (mr *MockDequeueMessagesResponseIfaceMockRecorder) Message(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Message", reflect.TypeOf((*MockDequeueMessagesResponseIface)(nil).Message), arg0) +} + +// NumMessages mocks base method. +func (m *MockDequeueMessagesResponseIface) NumMessages() int32 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NumMessages") + ret0, _ := ret[0].(int32) + return ret0 +} + +// NumMessages indicates an expected call of NumMessages. +func (mr *MockDequeueMessagesResponseIfaceMockRecorder) NumMessages() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NumMessages", reflect.TypeOf((*MockDequeueMessagesResponseIface)(nil).NumMessages)) +} + +// MockAzqueueMessageIdUrlIface is a mock of AzqueueMessageIdUrlIface interface. +type MockAzqueueMessageIdUrlIface struct { + ctrl *gomock.Controller + recorder *MockAzqueueMessageIdUrlIfaceMockRecorder +} + +// MockAzqueueMessageIdUrlIfaceMockRecorder is the mock recorder for MockAzqueueMessageIdUrlIface. +type MockAzqueueMessageIdUrlIfaceMockRecorder struct { + mock *MockAzqueueMessageIdUrlIface +} + +// NewMockAzqueueMessageIdUrlIface creates a new mock instance. +func NewMockAzqueueMessageIdUrlIface(ctrl *gomock.Controller) *MockAzqueueMessageIdUrlIface { + mock := &MockAzqueueMessageIdUrlIface{ctrl: ctrl} + mock.recorder = &MockAzqueueMessageIdUrlIfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockAzqueueMessageIdUrlIface) EXPECT() *MockAzqueueMessageIdUrlIfaceMockRecorder { + return m.recorder +} + +// Delete mocks base method. +func (m *MockAzqueueMessageIdUrlIface) Delete(arg0 context.Context, arg1 azqueue.PopReceipt) (*azqueue.MessageIDDeleteResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", arg0, arg1) + ret0, _ := ret[0].(*azqueue.MessageIDDeleteResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Delete indicates an expected call of Delete. +func (mr *MockAzqueueMessageIdUrlIfaceMockRecorder) Delete(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockAzqueueMessageIdUrlIface)(nil).Delete), arg0, arg1) +} diff --git a/cloud/azure/runtime/queue/azqueue_test.go b/cloud/azure/runtime/queue/azqueue_test.go index 435fa4ccd..794cd887d 100644 --- a/cloud/azure/runtime/queue/azqueue_test.go +++ b/cloud/azure/runtime/queue/azqueue_test.go @@ -21,17 +21,23 @@ import ( azqueue "github.com/Azure/azure-storage-queue-go/azqueue" "github.com/golang/mock/gomock" + "github.com/golang/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" mock_azqueue "github.com/nitrictech/nitric/cloud/azure/mocks/azqueue" - "github.com/nitrictech/nitric/core/pkg/plugins/queue" - queuepb "github.com/nitrictech/nitric/core/pkg/proto/queue/v1" + queuepb "github.com/nitrictech/nitric/core/pkg/proto/queues/v1" ) var _ = Describe("Azqueue", func() { + testStruct, err := structpb.NewStruct(map[string]interface{}{"testval": "testkey"}) + Expect(err).To(BeNil()) + + testPayloadBytes, err := proto.Marshal(testStruct) + Expect(err).To(BeNil()) + Context("Send", func() { When("Azure returns a successfully response", func() { crtl := gomock.NewController(GinkgoT()) @@ -54,13 +60,11 @@ var _ = Describe("Azqueue", func() { By("Calling Enqueue once on the Message URL with the expected options") mockMessages.EXPECT().Enqueue( gomock.Any(), - "{\"payload\":{\"testval\":\"testkey\"}}", + string(testPayloadBytes), time.Duration(0), time.Duration(0), ).Times(2).Return(&azqueue.EnqueueMessageResponse{}, nil) - testStruct, _ := structpb.NewStruct(map[string]interface{}{"testval": "testkey"}) - resp, err := queuePlugin.Send(context.TODO(), &queuepb.QueueSendRequestBatch{ QueueName: "test-queue", Requests: []*queuepb.QueueSendRequest{ @@ -104,13 +108,13 @@ var _ = Describe("Azqueue", func() { By("Calling Enqueue once on the Message URL with the expected options") mockMessages.EXPECT().Enqueue( gomock.Any(), - "{\"payload\":{\"testval\":\"testkey\"}}", + string(testPayloadBytes), time.Duration(0), time.Duration(0), ).AnyTimes( /* Using AnyTimes because Times(2) doesn't work for multiple returns */ ).Return(nil, fmt.Errorf("a test error")).Return(&azqueue.EnqueueMessageResponse{}, nil) - testStruct, _ := structpb.NewStruct(map[string]interface{}{"testval": "testkey"}) + // testStruct, _ := structpb.NewStruct(map[string]interface{}{"testval": "testkey"}) resp, err := queuePlugin.Send(context.TODO(), &queuepb.QueueSendRequestBatch{ QueueName: "test-queue", @@ -128,7 +132,7 @@ var _ = Describe("Azqueue", func() { Expect(err).ToNot(HaveOccurred()) By("Not returning failed tasks") - Expect(resp.FailedRequests).To(Equal([]*queue.FailedTask{})) + Expect(len(resp.FailedRequests)).To(Equal(0)) crtl.Finish() }) @@ -170,7 +174,7 @@ var _ = Describe("Azqueue", func() { PopReceipt: "popreceipt", NextVisibleTime: time.Time{}, DequeueCount: 0, - Text: "{\"payload\":{\"testval\":\"testkey\"}}", + Text: string(testPayloadBytes), }) resp, err := queuePlugin.Receive(context.TODO(), &queuepb.QueueReceiveRequest{ diff --git a/cloud/gcp/runtime/queue/pubsub.go b/cloud/gcp/runtime/queue/pubsub.go index 607cc5475..1491aa148 100644 --- a/cloud/gcp/runtime/queue/pubsub.go +++ b/cloud/gcp/runtime/queue/pubsub.go @@ -16,6 +16,7 @@ package queue import ( "context" + "errors" "fmt" structpb "github.com/golang/protobuf/ptypes/struct" @@ -36,8 +37,6 @@ import ( "google.golang.org/api/iterator" "google.golang.org/api/option" - "errors" - ifaces_pubsub "github.com/nitrictech/nitric/cloud/gcp/ifaces/pubsub" grpc_errors "github.com/nitrictech/nitric/core/pkg/grpc/errors" @@ -186,6 +185,7 @@ func (s *PubsubQueueService) Send(ctx context.Context, req *queuespb.QueueSendRe FailedRequests: failedTasks, }, nil } + func (s *PubsubQueueService) Receive(ctx context.Context, req *queuespb.QueueReceiveRequest) (*queuespb.QueueReceiveResponse, error) { newErr := grpc_errors.ErrorsWithScope("PubsubQueueService.Receive") @@ -262,6 +262,7 @@ func (s *PubsubQueueService) Receive(ctx context.Context, req *queuespb.QueueRec Tasks: tasks, }, nil } + func (s *PubsubQueueService) Complete(ctx context.Context, req *queuespb.QueueCompleteRequest) (*queuespb.QueueCompleteResponse, error) { newErr := grpc_errors.ErrorsWithScope("PubsubQueueService.Complete")