Skip to content

Commit

Permalink
Add AmqpQueue.Purge()
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Oct 1, 2024
1 parent 6d8aaeb commit fd45ee2
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 1 deletion.
7 changes: 7 additions & 0 deletions rabbitmq_amqp/amqp_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rabbitmq_amqp

import (
"context"

"github.com/Azure/go-amqp"
)

Expand Down Expand Up @@ -164,6 +165,12 @@ func (a *AmqpQueue) Delete(ctx context.Context) error {
return err
}

func (a *AmqpQueue) Purge(ctx context.Context) (int, error) {
path := queuePurgePath(a.name)
response, err := a.management.Request(ctx, amqp.Null{}, path, commandDelete, []int{responseCode200})
return int(response["message_count"].(uint64)), err
}

func (a *AmqpQueue) Name(queueName string) IQueueSpecification {
a.name = queueName
return a
Expand Down
39 changes: 39 additions & 0 deletions rabbitmq_amqp/amqp_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package rabbitmq_amqp

import (
"context"
"strconv"

"github.com/Azure/go-amqp"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
Expand Down Expand Up @@ -146,4 +149,40 @@ var _ = Describe("AMQP Queue test ", func() {
err = queueSpec.Delete(context.TODO())
Expect(err).To(BeNil())
})

It("AMQP Purge Queue should succeed and return the number of messages purged", func() {
const queueName = "AMQP Purge Queue should succeed and return the number of messages purged"
queueSpec := management.Queue(queueName)
_, err := queueSpec.Declare(context.TODO())
Expect(err).To(BeNil())
publishMessages(queueName, 10)
purged, err := queueSpec.Purge(context.TODO())
Expect(err).To(BeNil())
Expect(purged).To(Equal(10))
})
})

// TODO: This should be replaced with this library's publish function
// but for the time being, we need a way to publish messages or test purposes
func publishMessages(queueName string, count int) {
conn, err := amqp.Dial(context.TODO(), "amqp://guest:guest@localhost", nil)
if err != nil {
Fail(err.Error())
}
session, err := conn.NewSession(context.TODO(), nil)
if err != nil {
Fail(err.Error())
}
sender, err := session.NewSender(context.TODO(), queuePath(queueName), nil)
if err != nil {
Fail(err.Error())
}

for i := 0; i < count; i++ {
err = sender.Send(context.TODO(), amqp.NewMessage([]byte("Message #"+strconv.Itoa(i))), nil)
if err != nil {
Fail(err.Error())
}
}

}
7 changes: 6 additions & 1 deletion rabbitmq_amqp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"crypto/md5"
"encoding/base64"
"fmt"
"github.com/google/uuid"
"net/url"
"strings"

"github.com/google/uuid"
)

const (
Expand Down Expand Up @@ -69,6 +70,10 @@ func queuePath(queueName string) string {
return "/" + queues + "/" + encodePathSegments(queueName)
}

func queuePurgePath(queueName string) string {
return "/" + queues + "/" + encodePathSegments(queueName) + "/messages"
}

func exchangePath(exchangeName string) string {
return "/" + exchanges + "/" + encodePathSegments(exchangeName)
}
Expand Down
1 change: 1 addition & 0 deletions rabbitmq_amqp/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type IQueueSpecification interface {
MaxLengthBytes(length int64) IQueueSpecification
DeadLetterExchange(dlx string) IQueueSpecification
DeadLetterRoutingKey(dlrk string) IQueueSpecification
Purge(ctx context.Context) (int, error)
}

// IQueueInfo represents the information of a queue
Expand Down

0 comments on commit fd45ee2

Please sign in to comment.