Skip to content

Commit

Permalink
【优化队列持久化】优化案例,增加队列持久化参数解决已经存在的队列持久化为false的连接异常错误
Browse files Browse the repository at this point in the history
  • Loading branch information
chenxi2015 committed Aug 10, 2022
1 parent ddb140e commit 4c13dbf
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 22 deletions.
16 changes: 9 additions & 7 deletions examples/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,28 @@ package main

import (
"fmt"
go_rabbitmq "github.com/MQEnergy/go-rabbitmq"
gorabbitmq "github.com/MQEnergy/go-rabbitmq"
"sync"
"time"
)

func main() {
config := &go_rabbitmq.Config{
User: "root",
Password: "",
Host: "",
config := &gorabbitmq.Config{
User: "guest",
Password: "guest",
Host: "127.0.0.1",
Port: "5672",
Vhost: "",
}
mq := go_rabbitmq.New(config, "test", "", "", 0, 1)
// 注意 队列是否持久化.false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化!!!!
// 已存在的队列 查看 Features参数是否为持久化(D),不存在的队列按需设置是否持久化
mq := gorabbitmq.New(config, "test", "", "", 0, 10, false)
time.Sleep(time.Second * 1)
amqphandler(mq, 3)
}

// amqphandler 消息队列处理
func amqphandler(mq *go_rabbitmq.RabbitMQ, consumerNum int) error {
func amqphandler(mq *gorabbitmq.RabbitMQ, consumerNum int) error {
var wg sync.WaitGroup
cherrors := make(chan error)
wg.Add(consumerNum)
Expand Down
25 changes: 15 additions & 10 deletions examples/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,30 @@ package main

import (
"fmt"
go_rabbitmq "github.com/MQEnergy/go-rabbitmq"
gorabbitmq "github.com/MQEnergy/go-rabbitmq"
"time"
)

func main() {
config := &go_rabbitmq.Config{
User: "root",
Password: "",
Host: "",
config := &gorabbitmq.Config{
User: "guest",
Password: "guest",
Host: "127.0.0.1",
Port: "5672",
Vhost: "",
}
mq := go_rabbitmq.New(config, "test", "", "", 0, 1)
// 注意 队列是否持久化.false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化!!!!
// 已存在的队列 查看 Features参数是否为持久化(D),不存在的队列按需设置是否持久化
mq := gorabbitmq.New(config, "test", "bms", "", 0, 1, false)
// 需要等待一秒钟
time.Sleep(time.Second * 1)

data := []byte("{\"hello\":\"world " + time.Now().Format("2006-01-02 15:04:05") + "\"}")
if err := mq.Push(data); err != nil {
panic(err)
for {
time.Sleep(time.Second * 1)
data := []byte("{\"hello\":\"world " + time.Now().Format("2006-01-02 15:04:05") + "\"}")
if err := mq.Push(data); err != nil {
panic(err)
}
fmt.Println("Push succeeded!", string(data))
}
fmt.Println("Push succeeded!", string(data))
}
13 changes: 8 additions & 5 deletions rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type (
Type string // 交换机连接方式 direct topic fanout headers 可为空
Done chan bool
isReady bool
PrefetchCount int // 消费者消费数据限流数
PrefetchCount int // 消费者消费数据限流数
Durable bool // 是否queue队列持久化
}

// Config amqp配置
Expand Down Expand Up @@ -53,10 +54,11 @@ var (
errNotConnected = errors.New("not connected to a server")
errAlreadyClosed = errors.New("already closed: not connected to the server")
errShutdown = errors.New("session is shutting down")
errFailedToPush = errors.New("failed to push: not connected")
)

// New 创建一个新的消费者状态实例,并自动尝试连接到服务器
func New(config *Config, queueName, exchange, routeKey string, exchangeType, prefetchCount int) *RabbitMQ {
func New(config *Config, queueName, exchange, routeKey string, exchangeType, prefetchCount int, durable bool) *RabbitMQ {
// amqp 出现url.Parse导致的错误 是因为特殊字符需要进行urlencode编码
password := url.QueryEscape(config.Password)
// amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
Expand Down Expand Up @@ -85,6 +87,7 @@ func New(config *Config, queueName, exchange, routeKey string, exchangeType, pre
Addr: addr,
Done: make(chan bool),
PrefetchCount: prefetchCount,
Durable: durable,
}
go rabbitmq.handleReconnect(rabbitmq.Addr)
return rabbitmq
Expand Down Expand Up @@ -159,8 +162,8 @@ func (m *RabbitMQ) init(conn *amqp.Connection) error {
}
_, err = ch.QueueDeclare(
m.QueueName,
// 是否持久化
false,
// 是否持久化 队列是否持久化.false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化!!!!
m.Durable,
// 是否为自动删除
false,
// 是否具有排他性
Expand Down Expand Up @@ -222,7 +225,7 @@ func (m *RabbitMQ) changeChannel(ch *amqp.Channel) {
// 直到服务器发送确认信息。错误是只在推送操作本身失败时返回,参见UnsafePush。
func (m *RabbitMQ) Push(data []byte) error {
if m.isReady == false {
return errors.New("failed to push push: not connected")
return errFailedToPush
}
for {
if err := m.UnsafePush(data); err != nil {
Expand Down

0 comments on commit 4c13dbf

Please sign in to comment.