messaging
Benjamin Bennett edited this page Dec 13, 2020 · 15 revisions
This is a simple implementation of message publishing using RabbitMQ and a Go RabbitMQ Client Library.
cd go-bits/messaging
docker-compose up
go run -race main.go
- A connection manager is created and handles:
  - Initial connection to RabbitMQ and automated re-connection if the connection is closed.
  - Obtaining a server channel (not to be confused with a Go channel).
- Connection to RabbitMQ is initiated.
- A queue is configured.
- A message publisher is created.
- A loop is used to publish messages.
Configuration for the connection manager is passed to `NewConnManager` and used to set:
- The address for RabbitMQ.
- The retry interval.
- The maximum number of retry attempts.
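
The three settings above could be carried in a small struct. The following is a minimal sketch only: the `Config` field names, the `ConnManager` layout, the validation rules, and the signature of `NewConnManager` are assumptions for illustration and may differ from the repository's code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config holds the three settings described above. The field names are
// assumptions for illustration, not the repository's actual names.
type Config struct {
	Address       string        // RabbitMQ address, for example an AMQP URL
	RetryInterval time.Duration // wait between reconnection attempts
	MaxRetries    int           // maximum number of reconnection attempts
}

// ConnManager is a stand-in for the connection manager struct.
type ConnManager struct {
	cfg Config
}

// NewConnManager validates the configuration and stores it on the manager.
// The validation is an addition made for this sketch.
func NewConnManager(cfg Config) (*ConnManager, error) {
	if cfg.Address == "" {
		return nil, errors.New("address must not be empty")
	}
	if cfg.RetryInterval <= 0 {
		return nil, errors.New("retry interval must be positive")
	}
	if cfg.MaxRetries < 0 {
		return nil, errors.New("max retries must not be negative")
	}
	return &ConnManager{cfg: cfg}, nil
}

func main() {
	cm, err := NewConnManager(Config{
		Address:       "amqp://guest:guest@localhost:5672/",
		RetryInterval: 2 * time.Second,
		MaxRetries:    5,
	})
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("%+v\n", cm.cfg)
}
```

Rejecting bad values at construction time keeps the reconnection logic free of defensive checks.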
- A new connection is created on the connection manager struct by calling `dial()`.
- `NotifyClose` registers a listener (a channel of `*amqp.Error`) and assigns it to `notifyConnClose` on the connection manager struct.
  - If the connection is closed normally, the channel will be closed.
  - If there is a connection error, the channel will receive a pointer to an `Error`.
- A goroutine is started to handle reconnecting to RabbitMQ if the connection is closed.
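
The two outcomes of the close listener can be told apart with Go's two-value receive. The sketch below uses a local `Error` type as a stand-in for `amqp.Error` so that it runs without the client library; `waitForClose` is a hypothetical helper, not a function from the repository.

```go
package main

import "fmt"

// Error is a local stand-in for amqp.Error so the sketch has no
// external dependency. Only two of its fields are mirrored here.
type Error struct {
	Code   int
	Reason string
}

// waitForClose blocks until the close listener fires. It returns nil when
// the channel was closed without a value (a normal shutdown) and the
// received error otherwise.
func waitForClose(notify <-chan *Error) *Error {
	err, ok := <-notify
	if !ok {
		return nil // channel closed: the connection was closed normally
	}
	return err // a value arrived: the connection failed
}

func main() {
	graceful := make(chan *Error)
	close(graceful)
	fmt.Println(waitForClose(graceful) == nil) // prints "true"

	failed := make(chan *Error, 1)
	failed <- &Error{Code: 320, Reason: "CONNECTION_FORCED"}
	fmt.Println(waitForClose(failed).Reason) // prints "CONNECTION_FORCED"
}
```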
- The `reconnect` func blocks waiting for context cancellation, for a connection error, or for the connection to be closed.
- The number of retries relative to the maximum permitted number of retries is checked.
- The `dial` func is called to attempt to establish a connection.
- If `dial` returns an error then the process blocks until context is cancelled or the retry interval has passed.
- If the connection is successful then `NotifyClose` is called (see above) and `reconnect` calls itself.
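
The retry portion of the steps above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the repository's `reconnect`: the name `redial`, its parameters, and the `simulate` helper are hypothetical, a loop replaces the recursive call, and the initial wait on the close listener is left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errMaxRetries = errors.New("maximum number of retries reached")

// redial calls dial until it succeeds, the context is cancelled, or
// maxRetries attempts have failed. After each failure it waits for the
// retry interval, or returns early if the context is cancelled.
func redial(ctx context.Context, dial func() error, interval time.Duration, maxRetries int) error {
	for attempt := 1; ; attempt++ {
		if attempt > maxRetries {
			return errMaxRetries
		}
		if err := dial(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
}

// simulate runs redial against a fake dial func that fails the given
// number of times before succeeding, and reports how often it was called.
func simulate(failures, maxRetries int) (attempts int, err error) {
	dial := func() error {
		attempts++
		if attempts <= failures {
			return errors.New("connection refused")
		}
		return nil
	}
	err = redial(context.Background(), dial, time.Millisecond, maxRetries)
	return attempts, err
}

func main() {
	attempts, err := simulate(2, 5)
	fmt.Println(attempts, err) // prints "3 <nil>"
}
```

A loop avoids growing the call stack during a long outage, which is one reason to prefer it over recursion in this sketch.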
- A channel is obtained from the connection.
- An exchange is declared.
- A queue is declared.
- The queue is then bound to the exchange.
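
The order of the declarations above matters, because a queue can only be bound to an exchange that already exists. The sketch below shows that order against a small interface. The `topology` interface, `setupQueue`, and `recorder` are hypothetical, and the method signatures are simplified: the client library's `ExchangeDeclare`, `QueueDeclare`, and `QueueBind` take additional arguments.

```go
package main

import "fmt"

// topology lists the three calls in the order described above. The
// signatures are simplified stand-ins for the client library's methods.
type topology interface {
	ExchangeDeclare(name, kind string) error
	QueueDeclare(name string) error
	QueueBind(queue, key, exchange string) error
}

// setupQueue declares the exchange, declares the queue, then binds the
// queue to the exchange, stopping at the first error.
func setupQueue(ch topology, exchange, kind, queue, key string) error {
	if err := ch.ExchangeDeclare(exchange, kind); err != nil {
		return fmt.Errorf("declare exchange: %w", err)
	}
	if err := ch.QueueDeclare(queue); err != nil {
		return fmt.Errorf("declare queue: %w", err)
	}
	if err := ch.QueueBind(queue, key, exchange); err != nil {
		return fmt.Errorf("bind queue: %w", err)
	}
	return nil
}

// recorder is a fake channel that records the calls it receives.
type recorder struct{ calls []string }

func (r *recorder) ExchangeDeclare(name, kind string) error {
	r.calls = append(r.calls, "exchange "+name+" "+kind)
	return nil
}

func (r *recorder) QueueDeclare(name string) error {
	r.calls = append(r.calls, "queue "+name)
	return nil
}

func (r *recorder) QueueBind(queue, key, exchange string) error {
	r.calls = append(r.calls, "bind "+queue+" -> "+exchange+" ["+key+"]")
	return nil
}

func main() {
	rec := &recorder{}
	if err := setupQueue(rec, "events", "direct", "events.queue", "created"); err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	for _, c := range rec.calls {
		fmt.Println(c)
	}
}
```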
- `NewPublisher` returns a pointer to a populated struct.
- A channel is obtained from the connection.
- `Confirm` puts the channel in confirm mode in order to verify that all messages have been successfully received by the server.
- A message is published on the channel.
- The `select` blocks until either the publishing is confirmed and acknowledged or a timeout has been triggered waiting for the notification and confirmation.
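
The final `select` can be sketched as a wait on a confirmation channel raced against a timer. This is a minimal sketch using only the standard library: `confirmation` is a local stand-in for `amqp.Confirmation`, and `awaitConfirm`, `simulate`, and the error values are hypothetical names. With the client library, the confirmation channel would typically come from `NotifyPublish` on a channel that has been put in confirm mode.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// confirmation is a local stand-in for amqp.Confirmation.
type confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

var (
	errTimeout = errors.New("timed out waiting for confirmation")
	errNack    = errors.New("message was not acknowledged")
	errClosed  = errors.New("confirmation channel closed")
)

// awaitConfirm blocks until the server confirms the publish or the
// timeout elapses, whichever happens first.
func awaitConfirm(confirms <-chan confirmation, timeout time.Duration) error {
	select {
	case c, ok := <-confirms:
		if !ok {
			return errClosed
		}
		if !c.Ack {
			return errNack
		}
		return nil
	case <-time.After(timeout):
		return errTimeout
	}
}

// simulate stands in for a publish: it optionally delivers a confirmation
// and then waits for it with a short timeout.
func simulate(respond, ack bool) error {
	confirms := make(chan confirmation, 1)
	if respond {
		confirms <- confirmation{DeliveryTag: 1, Ack: ack}
	}
	return awaitConfirm(confirms, 50*time.Millisecond)
}

func main() {
	fmt.Println(simulate(true, true))   // prints "<nil>"
	fmt.Println(simulate(true, false))  // prints "message was not acknowledged"
	fmt.Println(simulate(false, false)) // prints "timed out waiting for confirmation"
}
```

Without the timeout branch, a publisher would block forever if the broker never sends a confirmation.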