Skip to content

Commit

Permalink
Merge pull request #17 from syncromatics/hotfix/amqp-publish-overload
Browse files Browse the repository at this point in the history
(+semver: fix) AMQP publish with routing key
  • Loading branch information
dkaminski authored Dec 2, 2021
2 parents 2de95da + 01f4da3 commit d4ad693
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
18 changes: 18 additions & 0 deletions amqp/exchangePublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,21 @@ func (p *ExchangePublisher) Publish(exchangeName string, headers map[string]stri

return nil
}

// PublishWithRoutingKey publishes a message to the given exchange, with a routing key to specify the queue
func (p *ExchangePublisher) PublishWithRoutingKey(exchangeName string, routingKey string, body []byte) error {
channel, err := p.connection.Channel()
if err != nil {
return errors.Wrap(err, "failed to open channel to broker")
}
defer channel.Close()

err = channel.Publish(exchangeName, routingKey, false, false, amqp.Publishing{
Body: body,
})
if err != nil {
return errors.Wrap(err, "failed to publish message")
}

return nil
}
41 changes: 41 additions & 0 deletions amqp/exchangePublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,44 @@ func Test_Publish_Successful(t *testing.T) {
}

}

func Test_PublishWithRoutingKey_MatchSuccessful(t *testing.T) {
// Arrange
conn, err := amqp.Dial(amqpURL)
assert.Nil(t, err)
defer conn.Close()

channel, err := conn.Channel()
assert.Nil(t, err)
defer channel.Close()

testQueueName := fmt.Sprintf("test.%v", uuid.NewString())
_, err = channel.QueueDeclare(testQueueName, false, true, true, false, nil)
assert.Nil(t, err)

err = channel.QueueBind(testQueueName, "prefix.123.#", EXCHANGE_NAME, false, nil)
assert.Nil(t, err)

actualMessages, err := channel.Consume(testQueueName, uuid.NewString(), true, true, false, false, nil)
assert.Nil(t, err)

publisher := sut.NewExchangePublisher(amqpURL)

err = publisher.EnsurePublisherIsReady()
assert.Nil(t, err)

expectedBody := []byte(`{"VehicleId":1}`)

// Act
err = publisher.PublishWithRoutingKey(EXCHANGE_NAME, "prefix.123.otherstuff.789", expectedBody)

// Assert
assert.Nil(t, err)

select {
case actual := <-actualMessages:
assert.Equal(t, expectedBody, actual.Body)
case <-time.After(3 * time.Second):
assert.Fail(t, "expected to receive message within a timely manner")
}
}

0 comments on commit d4ad693

Please sign in to comment.