Skip to content

Commit

Permalink
Add support for querying queue info (#13)
Browse files Browse the repository at this point in the history
Request `GET /queues/:queue` and return the details or ErrDoesNotExist
  • Loading branch information
mkuratczyk authored Oct 1, 2024
1 parent e662d95 commit 5fc29f4
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 9 deletions.
1 change: 1 addition & 0 deletions rabbitmq_amqp/amqp_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbitmq_amqp

import (
"context"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
Expand Down
23 changes: 19 additions & 4 deletions rabbitmq_amqp/amqp_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/Azure/go-amqp"
"github.com/google/uuid"
"strconv"
"time"

"github.com/Azure/go-amqp"
"github.com/google/uuid"
)

var ErrPreconditionFailed = errors.New("precondition Failed")
var ErrDoesNotExist = errors.New("does not exist")

type AmqpManagement struct {
session *amqp.Session
Expand Down Expand Up @@ -185,20 +187,33 @@ func (a *AmqpManagement) request(ctx context.Context, id string, body any, path
return msg.Value.(map[string]any), nil
}

i, _ := strconv.Atoi(*msg.Properties.Subject)
responseCode, _ := strconv.Atoi(*msg.Properties.Subject)

err = a.validateResponseCode(i, expectedResponseCodes)
err = a.validateResponseCode(responseCode, expectedResponseCodes)
if err != nil {
return nil, err
}

if responseCode == responseCode404 {
return nil, ErrDoesNotExist
}

return make(map[string]any), nil
}

func (a *AmqpManagement) Queue(queueName string) IQueueSpecification {
return newAmqpQueue(a, queueName)
}

func (a *AmqpManagement) QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error) {
path := queuePath(queueName)
result, err := a.Request(ctx, amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404})
if err != nil {
return nil, err
}
return newAmqpQueueInfo(result), nil
}

func (a *AmqpManagement) QueueClientName() IQueueSpecification {
return newAmqpQueue(a, "")
}
Expand Down
22 changes: 21 additions & 1 deletion rabbitmq_amqp/amqp_management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package rabbitmq_amqp

import (
"context"
"time"

"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"time"
)

var _ = Describe("Management tests", func() {
Expand Down Expand Up @@ -68,4 +70,22 @@ var _ = Describe("Management tests", func() {
Expect(management.Close(context.Background())).To(BeNil())
amqpConnection.Close(context.Background())
})

It("GET on non-existing queue returns ErrDoesNotExist", func() {
amqpConnection := NewAmqpConnection()
Expect(amqpConnection).NotTo(BeNil())
Expect(amqpConnection).To(BeAssignableToTypeOf(&AmqpConnection{}))

connectionSettings := NewConnectionSettings()
Expect(connectionSettings).NotTo(BeNil())
Expect(connectionSettings).To(BeAssignableToTypeOf(&ConnectionSettings{}))
err := amqpConnection.Open(context.Background(), connectionSettings)
Expect(err).To(BeNil())

management := amqpConnection.Management()
path := "/queues/i-do-not-exist"
result, err := management.Request(context.Background(), amqp.Null{}, path, commandGet, []int{responseCode200, responseCode404})
Expect(err).To(Equal(ErrDoesNotExist))
Expect(result).To(BeNil())
})
})
34 changes: 30 additions & 4 deletions rabbitmq_amqp/amqp_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(connection.Close(context.Background())).To(BeNil())
})

It("AMQP Queue Declare With Response and Delete should succeed", func() {
It("AMQP Queue Declare With Response and Get/Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Response and Delete should succeed"
queueSpec := management.Queue(queueName)
queueInfo, err := queueSpec.Declare(context.TODO())
Expand All @@ -40,11 +40,16 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Classic))

// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))

err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})

It("AMQP Queue Declare With Parameters and Delete should succeed", func() {
It("AMQP Queue Declare With Parameters and Get/Delete should succeed", func() {
const queueName = "AMQP Queue Declare With Parameters and Delete should succeed"
queueSpec := management.Queue(queueName).Exclusive(true).
AutoDelete(true).
Expand All @@ -67,11 +72,15 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("x-dead-letter-routing-key", "dead-letter-routing-key"))
Expect(queueInfo.GetArguments()).To(HaveKeyWithValue("max-length-bytes", int64(1000000000)))

// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))

err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})

It("AMQP Declare Quorum Queue and Delete should succeed", func() {
It("AMQP Declare Quorum Queue and Get/Delete should succeed", func() {
const queueName = "AMQP Declare Quorum Queue and Delete should succeed"
// Quorum queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
Expand All @@ -86,11 +95,16 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Quorum))

// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))

err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})

It("AMQP Declare Stream Queue and Delete should succeed", func() {
It("AMQP Declare Stream Queue and Get/Delete should succeed", func() {
const queueName = "AMQP Declare Stream Queue and Delete should succeed"
// Stream queue will ignore Exclusive and AutoDelete settings
// since they are not supported by quorum queues
Expand All @@ -105,6 +119,11 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(queueInfo.IsAutoDelete()).To(BeFalse())
Expect(queueInfo.IsExclusive()).To(BeFalse())
Expect(queueInfo.Type()).To(Equal(Stream))

// validate GET (query queue info)
queueInfoReceived, err := management.QueueInfo(context.TODO(), queueName)
Expect(queueInfoReceived).To(Equal(queueInfo))

err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})
Expand Down Expand Up @@ -160,6 +179,13 @@ var _ = Describe("AMQP Queue test ", func() {
Expect(err).To(BeNil())
Expect(purged).To(Equal(10))
})

It("AMQP GET on non-existing queue should return ErrDoesNotExist", func() {
const queueName = "This queue does not exist"
result, err := management.QueueInfo(context.TODO(), queueName)
Expect(err).To(Equal(ErrDoesNotExist))
Expect(result).To(BeNil())
})
})

// TODO: This should be replaced with this library's publish function
Expand Down
1 change: 1 addition & 0 deletions rabbitmq_amqp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
responseCode200 = 200
responseCode201 = 201
responseCode204 = 204
responseCode404 = 404
responseCode409 = 409
commandPut = "PUT"
commandGet = "GET"
Expand Down
1 change: 1 addition & 0 deletions rabbitmq_amqp/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type IManagement interface {
Open(ctx context.Context, connection IConnection) error
Close(ctx context.Context) error
Queue(queueName string) IQueueSpecification
QueueInfo(ctx context.Context, queueName string) (IQueueInfo, error)
Exchange(exchangeName string) IExchangeSpecification
Binding() IBindingSpecification
QueueClientName() IQueueSpecification
Expand Down

0 comments on commit 5fc29f4

Please sign in to comment.