rxjs components to interact with rabbitmq message broker
Based on the RabbitMQ tutorials, I created these rxjs observables to interact with the different queue and exchange types in RabbitMQ.
I did not implement yet the RPC functionality, hopefully I can add that in a later version.
basic queue sender:
const { rabbitmq$, basicSubmitter$ } = require ('rx-rabbitmq')
- or -
const { rabbitmq$ } = require ('rx-rabbitmq/connection')
const { basicSubmitter$ } = require ('rx-rabbitmq/basic')
const { switchMap } = require ('rxjs/operators')
rabbitmq$ () // creating connection on amqp://localhost:5672
.pipe (
switchMap (conn => basicSubmitter$ (conn, 'my_basic_queue'))
)
.subscribe ({
next: q => {
q.next ({ message: 'message 1' }) // send some messages
q.next ({ message: 'another message' })
q.next ({ message: { name: 'mymsg', ... } }) // you can also send an object as the message, will be JSON.stringify ()'d
q.complete () // closing the connection
},
complete: () => console.log ('Connection closed')
})
basic queue receiver:
const { rabbitmq$, basicReceiver$ } = require ('rx-rabbitmq')
- or -
const { rabbitmq$ } = require ('rx-rabbitmq/connection')
const { basicReceiver$ } = require ('rx-rabbitmq/basic')
const { switchMap } = require ('rxjs/operators')
rabbitmq$ () // creating connection on amqp://localhost:5672
.pipe (
switchMap (conn => basicReceiver$ (conn, 'my_basic_queue'))
)
.subscribe ({
next: msg => console.log (msg.message)
})
topic exchange sender:
const { rabbitmq$, topicSubmitter$ } = require ('rx-rabbitmq')
- or -
const { rabbitmq$ } = require ('rx-rabbitmq/connection')
const { topicSubmitter$ } = require ('rx-rabbitmq/topic')
const { switchMap } = require ('rxjs/operators')
rabbitmq$ () // creating connection on amqp://localhost:5672
.pipe (
switchMap (conn => topicSubmitter$ (conn, 'my_topic_exchange'))
)
.subscribe ({
next: ex => {
ex.next ({ key: 'level1.level2.level3', message: 'message 1' }) // send some messages
ex.next ({ key: 'level1.anotherlevel.level3', message: { name: 'mymsg', ... } }) // you can also send an object as the message, will be JSON.stringify ()'d
ex.complete () // closing the connection
},
complete: () => console.log ('Connection closed')
})
topic exchange receiver:
const { rabbitmq$, topicReceiver$ } = require ('rx-rabbitmq')
- or -
const { rabbitmq$ } = require ('rx-rabbitmq/connection')
const { topicReceiver$ } = require ('rx-rabbitmq/topic')
const { switchMap } = require ('rxjs/operators')
rabbitmq$ () // creating connection on amqp://localhost:5672
.pipe (
switchMap (conn => topicReceiver$ (conn, 'my_topic_exchange', 'level1.*.level3'))
)
.subscribe ({
next: msg => console.log (`Received ${msg.message} with key ${msg.routingKey}`)
})
// to import all objects, you can use:
const {
rabbitmq$,
basicReceiver$, basicSubmitter$,
workqueueReceiver$, workqueueSubmitter$,
pubsubReceiver$, pubsubSubmitter$,
routingReceiver$, routingSubmitter$,
topicReceiver$, topicSubmitter$
} = require ('rx-rabbitmq')
// or when you only want to import the necessary objects
const {rabbitmq$} = require ('rx-rabbitmq/connection')
const { basicSubmitter$, basicReceiver$ } = require ('rx-rabbitmq/basic')
const { workqueueSubmitter$, workqueueReceiver$ } = require ('rx-rabbitmq/workqueue')
const { pubsubSubmitter$, pubsubReceiver$ } = require ('rx-rabbitmq/pubsub')
const { routingSubmitter$, routingReceiver$ } = require ('rx-rabbitmq/routing')
const { topicSubmitter$, topicReceiver$ } = require ('rx-rabbitmq/topic')
With npm installed, run
$ npm install rx-rabbitmq
rx-rabbitmq was inspired by the creators of amqp and the excellent getting started guide of RabbitMQ
ISC