This repository is a fork of https://github.com/rhinof/grabbit. RabbitBus separated transactional database provider from source code. We'll manage a separate project to use database providers as driver to store transactional information.
RabbitMQ based service bus for GoLang
Install library in your existing go project
go get github.com/logiqbits/go-rabbitbus
Pattern wise messaging examples
Service bus builder
func createServiceBus(conn, serviceName string) gbus.Bus {
return builder.
New().
Bus(conn).
WithPolicies(&policy.Durable{}).
WithConfirms().
Build(serviceName)
}
Define command and reply structure, connections and service name strings
type Command1 struct{}
func (Command1) SchemaName() string {
return "cmd1"
}
type Reply1 struct{}
func (Reply1) SchemaName() string {
return "reply1"
}
connection := "amqp://guest:guest@localhost"
commandServiceName := "svc.cmd"
replyServiceName := "svc.cmd.reply"
Command service
commandService := createServiceBus(connection, commandServiceName)
commandService.HandleMessage(Reply1{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error {
log.Println("[Received Reply]")
return nil
})
commandService.Start()
defer commandService.Shutdown()
commandService.Send(context.Background(), replyServiceName, gbus.NewBusMessage(Command1{}))
Reply service
replyService := createServiceBus(connection, replyServiceName)
replyService.HandleMessage(Command1{}, func(invocation gbus.Invocation, message *gbus.BusMessage) error {
log.Println("[Received Command]")
log.Println("[Dispatching Reply]")
return invocation.Reply(context.Background(), gbus.NewBusMessage(Reply1{}))
})
replyService.Start()
defer replyService.Shutdown()
Define event
type Event1 struct {
Data string
}
func (Event1) SchemaName() string {
return "Event1"
}
Subscriber
connection := "amqp://guest:guest@localhost"
const eventName = "service.event.customname"
bus := builder.
New().
Bus(connection).
WithPolicies(&policy.Durable{}).
WithConfirms().
Build(eventName)
eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
log.Println(message.Payload)
return nil
}
bus.HandleEvent("test_exchange", "test_topic", Event1{}, eventHandler)
bus.Start()
defer bus.Shutdown()
Publisher
connection := "amqp://guest:guest@localhost"
const eventName = "service.event.customname"
bus := builder.
New().
Bus(connection).
WithPolicies(&policy.Durable{}).
WithConfirms().
Build(eventName)
//need to start the bus before sending/publishing
bus.Start()
err := bus.Publish(context.Background(), "test_exchange", "test_topic", gbus.NewBusMessage(&Event1{Data: time.Now().String()}))
if err != nil {
log.Fatal(err)
}
defer bus.Shutdown()
Defining service names, connection string, service bus builder method
const (
serverServiceName = "serverServiceName"
invokerServiceName = "invokerServiceName"
)
type RpcRequest struct {
Data string
}
func (RpcRequest) SchemaName() string {
return "logiqbits.rpc.request"
}
type RpcResponse struct {
Data string
}
func (RpcResponse) SchemaName() string {
return "logiqbits.rpc.response"
}
func createRpcBus(conn, svcName string) gbus.Bus {
return builder.
New().
Bus(conn).
WithPolicies(&policy.Durable{}).
WithConfirms().
WithDeadlettering("dead-logiqbits-rabbitbus").
PurgeOnStartUp().
Build(svcName)
}
Server
connection := "amqp://guest:guest@localhost"
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
req, ok := message.Payload.(*RpcRequest)
if !ok {
log.Fatalln("failed to parse request body")
}
return invocation.Reply(context.Background(), gbus.NewBusMessage(RpcResponse{
Data: fmt.Sprintf("Hello %s", req.Data),
}))
}
serverService := createRpcBus(connection, serverServiceName)
serverService.HandleMessage(RpcRequest{}, handler)
serverService.Start()
defer serverService.Shutdown()
Invoker
invokerService := createRpcBus(connection, invokerServiceName)
invokerService.Start()
defer invokerService.Shutdown()
log.Println("Sending RPC")
reply, err := invokerService.RPC(
context.Background(),
serverServiceName,
gbus.NewBusMessage(RpcRequest{Data: "Mr. Jack"}),
gbus.NewBusMessage(RpcResponse{}),
5*time.Second)
if err != nil {
log.Fatal(err)
}
log.Println(reply.Payload) // should be 'Hello Mr. Jack'