feat: add reconnect for mq producer #27

Open
wants to merge 1 commit into
base: main
81 changes: 80 additions & 1 deletion mq/producer.go
@@ -3,15 +3,23 @@ package mq
import (
"context"
"crypto/md5"
"errors"
"fmt"
"math"
"time"

amqp "github.com/rabbitmq/amqp091-go"
"go.opentelemetry.io/otel"
"golang.org/x/sync/singleflight"
)

type Producer struct {
Conn *amqp.Connection
Channel *amqp.Channel

amqpURI string // AMQP URI for RabbitMQ reconnection
sf *singleflight.Group

appId string
}

@@ -21,7 +29,9 @@ func NewProducer(appId string, amqpURI string) (*Producer, error) {
}

p := &Producer{
appId: appId,
appId: appId,
amqpURI: amqpURI,
sf: new(singleflight.Group),
}

var err error
@@ -34,6 +44,53 @@ func NewProducer(appId string, amqpURI string) (*Producer, error) {
return p, nil
}

func (p *Producer) isConnected() bool {
return !p.Conn.IsClosed() && !p.Channel.IsClosed()
}

func (p *Producer) connectFn() error {
if p.isConnected() {
return nil
}

_, err, _ := p.sf.Do("reconnect", func() (interface{}, error) {
var lastErr error
for i := 0; i < 3; i++ {
if p.isConnected() {
return nil, nil
}

if i > 0 {
time.Sleep(time.Second * time.Duration(math.Pow(2, float64(i-1))))
}

conn, channel, err := initConnection(p.amqpURI)
if err != nil {
lastErr = fmt.Errorf("reconnect attempt %d failed: %s", i+1, err)
continue
}

oldConn := p.Conn
oldChannel := p.Channel
p.Conn = conn
p.Channel = channel

if oldChannel != nil {
_ = oldChannel.Close()
}
if oldConn != nil {
_ = oldConn.Close()
}

return nil, nil
}

return nil, lastErr
})

return err
}
Comment on lines +51 to +92

⚠️ Potential issue

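Thread-safety issue: the connection swap needs to be protected by a lock

The current connection replacement logic (lines 73-83) has a race condition. Between reading the old connection and channel and closing them, other goroutines may still be using them, which can lead to concurrency bugs.

Consider protecting the connection swap with a mutex: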

type Producer struct {
	Conn    *amqp.Connection
	Channel *amqp.Channel
+	mu      sync.RWMutex  // guards reads and writes of Conn and Channel

	amqpURI string
	sf      *singleflight.Group

	appId string
}

Then, in connectFn:

-			oldConn := p.Conn
-			oldChannel := p.Channel
-			p.Conn = conn
-			p.Channel = channel
+			p.mu.Lock()
+			oldConn := p.Conn
+			oldChannel := p.Channel
+			p.Conn = conn
+			p.Channel = channel
+			p.mu.Unlock()

In addition, the isConnected() and publish() methods need to take the read lock when they access the connection.
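The locking pattern described above can be sketched as follows. This is a minimal illustration, not the PR's code: `closer`, `fakeLink`, `guarded`, and `swap` are hypothetical names, and `closer` stands in for `*amqp.Connection` / `*amqp.Channel` so the example runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical stand-in for *amqp.Connection and *amqp.Channel.
type closer interface {
	Close() error
	IsClosed() bool
}

// fakeLink is a test double used only in this sketch.
type fakeLink struct {
	mu     sync.Mutex
	closed bool
}

func (f *fakeLink) Close() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.closed = true
	return nil
}

func (f *fakeLink) IsClosed() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.closed
}

// guarded holds the current connection/channel pair behind an RWMutex.
type guarded struct {
	mu      sync.RWMutex
	conn    closer
	channel closer
}

// isConnected takes the read lock so it never observes a half-finished swap.
func (g *guarded) isConnected() bool {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return g.conn != nil && g.channel != nil &&
		!g.conn.IsClosed() && !g.channel.IsClosed()
}

// swap installs the new pair under the write lock, then closes the old pair
// after releasing it so that Close never runs while the lock is held.
func (g *guarded) swap(conn, channel closer) {
	g.mu.Lock()
	oldConn, oldChannel := g.conn, g.channel
	g.conn, g.channel = conn, channel
	g.mu.Unlock()

	if oldChannel != nil {
		_ = oldChannel.Close()
	}
	if oldConn != nil {
		_ = oldConn.Close()
	}
}

func main() {
	g := &guarded{}
	oldConn, oldCh := &fakeLink{}, &fakeLink{}
	g.swap(oldConn, oldCh)
	g.swap(&fakeLink{}, &fakeLink{})
	// prints: true true true
	fmt.Println(g.isConnected(), oldConn.IsClosed(), oldCh.IsClosed())
}
```

Note that the lock only makes the pointer swap atomic. A publish that already obtained the old channel can still fail when that channel is closed, so callers presumably still need the retry path.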

Also, consider changing the error format verb from %s to %v to better handle different error types:

-				lastErr = fmt.Errorf("reconnect attempt %d failed: %s", i+1, err)
+				lastErr = fmt.Errorf("reconnect attempt %d failed: %v", i+1, err)
🤖 Prompt for AI Agents
In mq/producer.go lines 51 to 92, the connection replacement code from lines 73
to 83 is not thread-safe and can cause race conditions when multiple goroutines
access or modify the connection concurrently. To fix this, introduce a mutex
lock to protect the connection and channel replacement operations inside the
reconnect logic. Also, modify the isConnected() and publish() methods to use a
read lock when accessing the connection to ensure safe concurrent reads.
Additionally, update the error formatting in the reconnect attempts from using
%s to %v to properly handle different error types.


func (p *Producer) PublishNotice(ctx context.Context, data *NoticeTemplate, options ...string) error {

if data == nil {
@@ -107,6 +164,28 @@ func (p *Producer) publish(ctx context.Context, key string, msg []byte, opts map
Headers: headers,
})

if err != nil && errors.Is(err, amqp.ErrClosed) {
if err = p.connectFn(); err != nil {
return err
}

err = p.Channel.PublishWithContext(
ctx,
exchangeName,
key,
false,
false,
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
Body: msg,
AppId: p.appId,
UserId: opts[UserIdKey],
MessageId: fmt.Sprintf("%x", md5.Sum(msg)),
Headers: headers,
})
}
Comment on lines +167 to +187

🛠️ Refactor suggestion

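Refactor suggestion: remove the duplicated code and improve the retry logic

The current implementation duplicates the publish call, and the retry logic could be more robust.

Consider extracting the publish logic into an internal method and implementing a more complete retry mechanism: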

-	err := p.Channel.PublishWithContext(
-		ctx,
-		exchangeName,
-		key,
-		false,
-		false,
-		amqp.Publishing{
-			ContentType:  "application/json",
-			DeliveryMode: amqp.Persistent,
-			Body:         msg,
-			AppId:        p.appId,
-			UserId:       opts[UserIdKey],
-			MessageId:    fmt.Sprintf("%x", md5.Sum(msg)),
-			Headers:      headers,
-		})
-
-	if err != nil && errors.Is(err, amqp.ErrClosed) {
-		if err = p.connectFn(); err != nil {
-			return err
-		}
-
-		err = p.Channel.PublishWithContext(
-			ctx,
-			exchangeName,
-			key,
-			false,
-			false,
-			amqp.Publishing{
-				ContentType:  "application/json",
-				DeliveryMode: amqp.Persistent,
-				Body:         msg,
-				AppId:        p.appId,
-				UserId:       opts[UserIdKey],
-				MessageId:    fmt.Sprintf("%x", md5.Sum(msg)),
-				Headers:      headers,
-			})
-	}
-
-	return err
+	publishing := amqp.Publishing{
+		ContentType:  "application/json",
+		DeliveryMode: amqp.Persistent,
+		Body:         msg,
+		AppId:        p.appId,
+		UserId:       opts[UserIdKey],
+		MessageId:    fmt.Sprintf("%x", md5.Sum(msg)),
+		Headers:      headers,
+	}
+
+	// Make at most 2 attempts
+	for i := 0; i < 2; i++ {
+		err := p.Channel.PublishWithContext(
+			ctx,
+			exchangeName,
+			key,
+			false,
+			false,
+			publishing,
+		)
+		
+		if err == nil {
+			return nil
+		}
+		
+		if !errors.Is(err, amqp.ErrClosed) {
+			return err
+		}
+		
+		// Try to reconnect
+		if reconnectErr := p.connectFn(); reconnectErr != nil {
+			return fmt.Errorf("publish failed: %v, reconnect failed: %v", err, reconnectErr)
+		}
+	}
+	
+	return fmt.Errorf("publish failed after %d attempts", 2)

This would:

  1. Eliminate the duplicated code
  2. Support multiple retries
  3. Provide clearer error messages

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In mq/producer.go around lines 167 to 187, the code duplicates the message
publishing logic and has a simplistic retry mechanism. Refactor by extracting
the publishing code into a separate internal function and implement a loop or
retry mechanism that attempts to publish multiple times upon failure, especially
when the connection is closed. This will remove duplication, allow multiple
retries, and improve error handling clarity.


return err

}