PogoMQ is a lightweight, type-safe, reliable message queue system built on top of PostgreSQL's LISTEN/NOTIFY. It supports multiple topics, retries, and concurrent processing.
You can use the provided migrations with goose to set up your database.
See the full example
// msg is the message body type used by this example. It has no fields: the
// example tells messages apart by their ID, not by their body.
type msg struct{}
// main runs a ping-pong demo: it connects a client to the "pingpong" topic,
// subscribes a handler that answers every message with its counterpart one
// second later, publishes the initial "ping", and then blocks until the
// subscriber reports an error.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	connectionString := "host=localhost port=5432 user=postgres password=postgres dbname=postgres"

	// Create a new client
	client, err := pogomq.NewClient[msg](ctx, connectionString, pogomq.WithAutoComplete(), pogomq.WithTopic("pingpong"))
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// The message handler receives messages from the queue and returns a result
	// indicating whether processing completed or failed, with optional messages
	// to be sent back to the queue.
	messageHandler := func(ctx context.Context, message pogomq.Message[msg]) pogomq.MessageResult {
		// Pick the ID of the reply: a "ping" is answered with a "pong",
		// anything else with a "ping".
		var next string
		switch message.GetId() {
		case "ping":
			println("Received ping message")
			next = "pong"
		default:
			println("Received pong message")
			next = "ping"
		}

		// Schedule the reply one second from now, and delete the handled
		// message instead of keeping it around as completed.
		reply := pogomq.NewScheduledMessage(next, msg{}, time.Now().Add(time.Second))
		return message.Complete().Delete().Publish(reply)
	}

	// Start the subscriber with the message handler
	errs, err := client.Subscribe(ctx, messageHandler)
	if err != nil {
		panic(err)
	}

	// Create a new message with a unique ID
	ping := pogomq.NewMessage("ping", msg{})

	// Publish the message to the queue
	if err := client.Publish(ctx, ping); err != nil {
		// Cancel explicitly: the deferred client.Close runs before the
		// deferred cancel, so this makes the context done before Close.
		cancel()
		panic(err)
	}

	// Catch subscriber errors. A receive on a closed channel yields a nil
	// error, so only panic when an actual error was delivered.
	// NOTE(review): whether Subscribe ever closes errs is not visible here.
	if err := <-errs; err != nil {
		panic(err)
	}
}
PogoMQ clients can be configured with several options:
| Option | Description | Default |
|---|---|---|
| `WithAutoComplete` | Automatically complete messages after successful delivery | false |
| `WithMaxDeliveryCount` | Maximum number of delivery attempts | 1 |
| `WithTopic` | Topic name for the client | "default" |
| `WithTTL` | Time-to-live duration for messages | none |
| `WithWorkerCount` | Number of concurrent workers | 1 |
- `client.Close()`: Close the client and release resources
- `client.MessageCounts(ctx)`: Get counts of messages by status; returns a struct with `Active`, `Completed`, `Failed`, and `Scheduled` integer fields
- `client.Publish(ctx, message)`: Publish a message to the queue
- `client.PurgeAllMessages(ctx)`: Delete all messages
- `client.PurgeCompletedMessages(ctx)`: Delete all completed messages
- `client.PurgeFailedMessages(ctx)`: Delete all failed messages
- `client.PurgeTTLMessages(ctx)`: Delete all messages with expired TTL
- `client.ReadFailedMessages(ctx, limit)`: Read failed messages up to the specified limit
- `client.ResetFailedMessages(ctx)`: Reschedule all failed messages that exceeded max delivery count for reprocessing
- `client.ResetFailedMessage(ctx, id)`: Reschedule an individual failed message that exceeded max delivery count for reprocessing
- `client.Subscribe(ctx, messageHandler)`: Start listening for messages; returns a channel of subscriber errors and an error
- `msg.Complete()`: Mark the message as completed; returns a `messageResultCompleted`
- `msg.GetBody()`: The message body
- `msg.GetDeliveryCount()`: The number of times the message has been delivered, useful for calculating retry intervals
- `msg.GetId()`: The message ID
- `msg.GetScheduled()`: The scheduled delivery time
- `msg.GetTTL()`: The TTL timestamp, if set
- `msg.Fail()`: Mark the message as failed, to be retried immediately; returns a `messageResultFailed`
- `msg.SetScheduled(time)`: Set the time for delivery
- `msg.SetTTL(duration)`: Set a TTL different from the default set in the client
On a completed result (`messageResultCompleted`):

- `result.Delete()`: Delete the message from the queue instead of marking it as completed
- `result.Publish(messages...)`: Publish additional messages to the queue
On a failed result (`messageResultFailed`):

- `result.Delete()`: Delete the message from the queue instead of marking it as failed
- `result.Publish(messages...)`: Publish additional messages to the queue
- `result.Reschedule(time)`: Reschedule the message to be processed at a specified time