-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* remove builders --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
- Loading branch information
1 parent
5fc29f4
commit 60e006b
Showing
19 changed files
with
680 additions
and
498 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,93 +1,87 @@ | ||
package main | ||
|
||
import ( | ||
"bufio" | ||
"context" | ||
"fmt" | ||
mq "github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" | ||
"os" | ||
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp" | ||
"time" | ||
) | ||
|
||
func main() { | ||
fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n") | ||
chStatusChanged := make(chan *mq.StatusChanged, 1) | ||
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1) | ||
|
||
go func(ch chan *mq.StatusChanged) { | ||
go func(ch chan *rabbitmq_amqp.StatusChanged) { | ||
for statusChanged := range ch { | ||
fmt.Printf("Status changed from %d to %d\n", statusChanged.From, statusChanged.To) | ||
fmt.Printf("%s\n", statusChanged) | ||
} | ||
}(chStatusChanged) | ||
|
||
amqpConnection := mq.NewAmqpConnection() | ||
amqpConnection.NotifyStatusChange(chStatusChanged) | ||
err := amqpConnection.Open(context.Background(), mq.NewConnectionSettings()) | ||
amqpConnection := rabbitmq_amqp.NewAmqpConnectionNotifyStatusChanged(chStatusChanged) | ||
err := amqpConnection.Open(context.Background(), rabbitmq_amqp.NewConnectionSettings()) | ||
if err != nil { | ||
fmt.Printf("Error opening connection: %v\n", err) | ||
return | ||
} | ||
|
||
fmt.Printf("AMQP Connection opened.\n") | ||
management := amqpConnection.Management() | ||
queueSpec := management.Queue("getting_started_queue"). | ||
QueueType(mq.QueueType{Type: mq.Quorum}). | ||
MaxLengthBytes(mq.CapacityGB(1)) | ||
exchangeSpec := management.Exchange("getting_started_exchange"). | ||
ExchangeType(mq.ExchangeType{Type: mq.Topic}) | ||
|
||
queueInfo, err := queueSpec.Declare(context.Background()) | ||
exchangeInfo, err := management.DeclareExchange(context.TODO(), &rabbitmq_amqp.ExchangeSpecification{ | ||
Name: "getting-started-exchange", | ||
}) | ||
if err != nil { | ||
fmt.Printf("Error declaring queue %s\n", err) | ||
fmt.Printf("Error declaring exchange: %v\n", err) | ||
return | ||
} | ||
fmt.Printf("Queue %s created.\n", queueInfo.GetName()) | ||
|
||
exchangeInfo, err := exchangeSpec.Declare(context.Background()) | ||
queueInfo, err := management.DeclareQueue(context.TODO(), &rabbitmq_amqp.QueueSpecification{ | ||
Name: "getting-started-queue", | ||
QueueType: rabbitmq_amqp.QueueType{Type: rabbitmq_amqp.Quorum}, | ||
}) | ||
|
||
if err != nil { | ||
fmt.Printf("Error declaring exchange %s\n", err) | ||
fmt.Printf("Error declaring queue: %v\n", err) | ||
return | ||
} | ||
fmt.Printf("Exchange %s created.\n", exchangeInfo.GetName()) | ||
|
||
bindingSpec := management.Binding().SourceExchange(exchangeSpec).DestinationQueue(queueSpec).Key("routing-key") | ||
bindingPath, err := management.Bind(context.TODO(), &rabbitmq_amqp.BindingSpecification{ | ||
SourceExchange: exchangeInfo.Name(), | ||
DestinationQueue: queueInfo.Name(), | ||
BindingKey: "routing-key", | ||
}) | ||
|
||
err = bindingSpec.Bind(context.Background()) | ||
if err != nil { | ||
fmt.Printf("Error binding %s\n", err) | ||
fmt.Printf("Error binding: %v\n", err) | ||
return | ||
} | ||
|
||
fmt.Printf("Binding between %s and %s created.\n", exchangeInfo.GetName(), queueInfo.GetName()) | ||
|
||
fmt.Println("Press any key to cleanup and exit") | ||
reader := bufio.NewReader(os.Stdin) | ||
_, _ = reader.ReadString('\n') | ||
err = management.Unbind(context.TODO(), bindingPath) | ||
|
||
err = bindingSpec.Unbind(context.Background()) | ||
if err != nil { | ||
fmt.Printf("Error unbinding %s\n", err) | ||
fmt.Printf("Error unbinding: %v\n", err) | ||
return | ||
} | ||
|
||
fmt.Printf("Binding between %s and %s deleted.\n", exchangeInfo.GetName(), queueInfo.GetName()) | ||
|
||
err = exchangeSpec.Delete(context.Background()) | ||
err = management.DeleteExchange(context.TODO(), exchangeInfo.Name()) | ||
if err != nil { | ||
fmt.Printf("Error deleting exchange %s\n", err) | ||
fmt.Printf("Error deleting exchange: %v\n", err) | ||
return | ||
} | ||
|
||
err = queueSpec.Delete(context.Background()) | ||
err = management.DeleteQueue(context.TODO(), queueInfo.Name()) | ||
if err != nil { | ||
fmt.Printf("Error deleting queue: %v\n", err) | ||
return | ||
} | ||
fmt.Printf("Queue %s deleted.\n", queueInfo.GetName()) | ||
|
||
err = amqpConnection.Close(context.Background()) | ||
if err != nil { | ||
fmt.Printf("Error closing connection: %v\n", err) | ||
return | ||
} | ||
|
||
fmt.Printf("AMQP Connection closed.\n") | ||
// Wait for the status change to be printed | ||
time.Sleep(500 * time.Millisecond) | ||
|
||
close(chStatusChanged) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
package rabbitmq_amqp | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"net/url" | ||
"strings" | ||
) | ||
|
||
type AddressBuilder struct { | ||
queue *string | ||
exchange *string | ||
key *string | ||
append *string | ||
} | ||
|
||
func NewAddressBuilder() *AddressBuilder { | ||
return &AddressBuilder{} | ||
} | ||
|
||
func (a *AddressBuilder) Queue(queue string) *AddressBuilder { | ||
a.queue = &queue | ||
return a | ||
} | ||
|
||
func (a *AddressBuilder) Exchange(exchange string) *AddressBuilder { | ||
a.exchange = &exchange | ||
return a | ||
} | ||
|
||
func (a *AddressBuilder) Key(key string) *AddressBuilder { | ||
a.key = &key | ||
return a | ||
} | ||
|
||
func (a *AddressBuilder) Append(append string) *AddressBuilder { | ||
a.append = &append | ||
return a | ||
} | ||
|
||
func (a *AddressBuilder) Address() (string, error) { | ||
if a.exchange == nil && a.queue == nil { | ||
return "", errors.New("exchange or queue must be set") | ||
} | ||
|
||
urlAppend := "" | ||
if !isStringNilOrEmpty(a.append) { | ||
urlAppend = *a.append | ||
} | ||
if !isStringNilOrEmpty(a.exchange) && !isStringNilOrEmpty(a.queue) { | ||
return "", errors.New("exchange and queue cannot be set together") | ||
} | ||
|
||
if !isStringNilOrEmpty(a.exchange) { | ||
if !isStringNilOrEmpty(a.key) { | ||
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + "/" + encodePathSegments(*a.key) + urlAppend, nil | ||
} | ||
return "/" + exchanges + "/" + encodePathSegments(*a.exchange) + urlAppend, nil | ||
} | ||
|
||
if a.queue == nil { | ||
return "", nil | ||
} | ||
|
||
if isStringNilOrEmpty(a.queue) { | ||
return "", errors.New("queue must be set") | ||
} | ||
|
||
return "/" + queues + "/" + encodePathSegments(*a.queue) + urlAppend, nil | ||
} | ||
|
||
// encodePathSegments takes a string and returns its percent-encoded representation. | ||
func encodePathSegments(input string) string { | ||
var encoded strings.Builder | ||
|
||
// Iterate over each character in the input string | ||
for _, char := range input { | ||
// Check if the character is an unreserved character (i.e., it doesn't need encoding) | ||
if isUnreserved(char) { | ||
encoded.WriteRune(char) // Append as is | ||
} else { | ||
// Encode character To %HH format | ||
encoded.WriteString(fmt.Sprintf("%%%02X", char)) | ||
} | ||
} | ||
|
||
return encoded.String() | ||
} | ||
|
||
// Decode takes a percent-encoded string and returns its decoded representation. | ||
func decode(input string) (string, error) { | ||
// Use url.QueryUnescape which properly decodes percent-encoded strings | ||
decoded, err := url.QueryUnescape(input) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
return decoded, nil | ||
} | ||
|
||
// isUnreserved checks if a character is an unreserved character in percent encoding | ||
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~ | ||
func isUnreserved(char rune) bool { | ||
return (char >= 'A' && char <= 'Z') || | ||
(char >= 'a' && char <= 'z') || | ||
(char >= '0' && char <= '9') || | ||
char == '-' || char == '.' || char == '_' || char == '~' | ||
} | ||
|
||
func bindingPath() string { | ||
return "/" + bindings | ||
} | ||
|
||
func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName, key string) string { | ||
sourceNameEncoded := encodePathSegments(sourceName) | ||
destinationNameEncoded := encodePathSegments(destinationName) | ||
keyEncoded := encodePathSegments(key) | ||
destinationType := "dste" | ||
if toQueue { | ||
destinationType = "dstq" | ||
} | ||
format := "/%s/src=%s;%s=%s;key=%s;args=" | ||
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package rabbitmq_amqp | ||
|
||
import ( | ||
. "github.com/onsi/ginkgo/v2" | ||
. "github.com/onsi/gomega" | ||
) | ||
|
||
var _ = Describe("Address builder test ", func() { | ||
It("With exchange, queue and key should raise and error", func() { | ||
addressBuilder := NewAddressBuilder() | ||
Expect(addressBuilder).NotTo(BeNil()) | ||
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) | ||
addressBuilder.Queue("queue").Exchange("exchange").Key("key") | ||
_, err := addressBuilder.Address() | ||
Expect(err).NotTo(BeNil()) | ||
Expect(err.Error()).To(Equal("exchange and queue cannot be set together")) | ||
}) | ||
|
||
It("Without exchange and queue should raise and error", func() { | ||
addressBuilder := NewAddressBuilder() | ||
Expect(addressBuilder).NotTo(BeNil()) | ||
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) | ||
_, err := addressBuilder.Address() | ||
Expect(err).NotTo(BeNil()) | ||
Expect(err.Error()).To(Equal("exchange or queue must be set")) | ||
}) | ||
|
||
It("With exchange and key should return address", func() { | ||
addressBuilder := NewAddressBuilder() | ||
Expect(addressBuilder).NotTo(BeNil()) | ||
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) | ||
addressBuilder.Exchange("my_exchange").Key("my_key") | ||
address, err := addressBuilder.Address() | ||
Expect(err).To(BeNil()) | ||
Expect(address).To(Equal("/exchanges/my_exchange/my_key")) | ||
}) | ||
|
||
It("With exchange should return address", func() { | ||
addressBuilder := NewAddressBuilder() | ||
Expect(addressBuilder).NotTo(BeNil()) | ||
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) | ||
addressBuilder.Exchange("my_exchange") | ||
address, err := addressBuilder.Address() | ||
Expect(err).To(BeNil()) | ||
Expect(address).To(Equal("/exchanges/my_exchange")) | ||
}) | ||
|
||
It("With exchange and key with names to encode should return the encoded address", func() { | ||
addressBuilder := NewAddressBuilder() | ||
Expect(addressBuilder).NotTo(BeNil()) | ||
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) | ||
addressBuilder.Exchange("my_ exchange/()").Key("my_key ") | ||
address, err := addressBuilder.Address() | ||
Expect(err).To(BeNil()) | ||
Expect(address).To(Equal("/exchanges/my_%20exchange%2F%28%29/my_key%20")) | ||
}) | ||
|
||
It("With queue should return address", func() { | ||
addressBuilder := NewAddressBuilder() | ||
Expect(addressBuilder).NotTo(BeNil()) | ||
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) | ||
addressBuilder.Queue("my_queue>") | ||
address, err := addressBuilder.Address() | ||
Expect(err).To(BeNil()) | ||
Expect(address).To(Equal("/queues/my_queue%3E")) | ||
}) | ||
|
||
It("With queue and append should return address", func() { | ||
addressBuilder := NewAddressBuilder() | ||
Expect(addressBuilder).NotTo(BeNil()) | ||
Expect(addressBuilder).To(BeAssignableToTypeOf(&AddressBuilder{})) | ||
addressBuilder.Queue("my_queue").Append("/messages") | ||
address, err := addressBuilder.Address() | ||
Expect(err).To(BeNil()) | ||
Expect(address).To(Equal("/queues/my_queue/messages")) | ||
}) | ||
|
||
}) |
Oops, something went wrong.