feat: add reconnect for mq producer #27

Open
wants to merge 1 commit into
base: main
Echin-h
Contributor

@Echin-h Echin-h commented Jun 26, 2025

Summary by CodeRabbit

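  • New Features
    • Improved the stability of the message producer: it now supports automatic reconnection and retries, making it more reliable when the connection drops.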

coderabbitai bot commented Jun 26, 2025

Walkthrough

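This change adds automatic reconnection and retry to the RabbitMQ Producer. It introduces a connection-state check, a new reconnect function, and deduplication of concurrent reconnects, which makes the producer more robust when the connection is closed; when publishing fails on a closed connection, it automatically reconnects and republishes the message.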

Changes

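File/Group | Change summary
mq/producer.go | The Producer struct gains amqpURI and singleflight.Group fields; the NewProducer constructor initializes the new fields; new isConnected and connectFn methods are added; the publish method gains automatic reconnect and retry logic.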

Sequence Diagram(s)

sequenceDiagram
    participant Producer
    participant RabbitMQ

    Producer->>RabbitMQ: Publish message
    alt Connection healthy
        RabbitMQ-->>Producer: Return success/failure
    else Connection closed
        Producer->>Producer: Call connectFn to reconnect
        alt Reconnect succeeded
            Producer->>RabbitMQ: Publish again
            RabbitMQ-->>Producer: Return result
        else Reconnect failed
            Producer-->>Producer: Return error
        end
    end

Poem

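The little rabbit drums out messages,
a dropped line brings no panic, just a reconnect.
Singleflight steps in to help,
three reconnect attempts keep things safe.
A code update lands like spring rain,
and reliable delivery is pure joy!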

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
mq/producer.go (1)

193-202: Consider improving the Shutdown method

Given the new reconnect mechanism, the Shutdown method could be improved as follows:

func (p *Producer) Shutdown() error {
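+	// Forget the reconnect key so later calls start a fresh reconnect
+	// (note: Forget does not wait for an in-flight reconnect to finish)
+	p.sf.Forget("reconnect")
+
+	// Close the channel first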
+	if p.Channel != nil && !p.Channel.IsClosed() {
+		if err := p.Channel.Close(); err != nil {
+			return fmt.Errorf("AMQP channel close error: %v", err)
+		}
+	}

	if err := p.Conn.Close(); err != nil {
-		return fmt.Errorf("AMQP connection close error: %s", err)
+		return fmt.Errorf("AMQP connection close error: %v", err)
	}

-	defer fmt.Printf("AMQP shutdown OK")
+	// Consider using a proper logger instead of fmt.Printf

	return nil
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 63b82f0 and 8e25763.

📒 Files selected for processing (1)
  • mq/producer.go (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
mq/producer.go (1)
mq/init.go (1)
  • UserIdKey (14-14)
🔇 Additional comments (4)
mq/producer.go (4)

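6-13: The imports are reasonable and necessary!

The newly imported packages are all needed to implement reconnection. In particular, using singleflight to prevent concurrent reconnects is a good design choice.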


20-22: The struct fields are well designed!

The new fields support reconnection; they are clearly named and clearly commented.


32-34: The constructor update is correct!

The new fields are initialized correctly and consistently with the existing code.


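47-49: The connection-state check is implemented correctly!

The isConnected() method checks the state of the connection and the channel concisely and effectively.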

Comment on lines +167 to +187
	if err != nil && errors.Is(err, amqp.ErrClosed) {
		if err = p.connectFn(); err != nil {
			return err
		}

		err = p.Channel.PublishWithContext(
			ctx,
			exchangeName,
			key,
			false,
			false,
			amqp.Publishing{
				ContentType:  "application/json",
				DeliveryMode: amqp.Persistent,
				Body:         msg,
				AppId:        p.appId,
				UserId:       opts[UserIdKey],
				MessageId:    fmt.Sprintf("%x", md5.Sum(msg)),
				Headers:      headers,
			})
	}
🛠️ Refactor suggestion

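Refactor suggestion: remove the code duplication and improve the retry logic

The current implementation duplicates the publish call, and the retry logic could be more robust.

Consider extracting the publish logic into an internal method and implementing a more complete retry mechanism: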

-	err := p.Channel.PublishWithContext(
-		ctx,
-		exchangeName,
-		key,
-		false,
-		false,
-		amqp.Publishing{
-			ContentType:  "application/json",
-			DeliveryMode: amqp.Persistent,
-			Body:         msg,
-			AppId:        p.appId,
-			UserId:       opts[UserIdKey],
-			MessageId:    fmt.Sprintf("%x", md5.Sum(msg)),
-			Headers:      headers,
-		})
-
-	if err != nil && errors.Is(err, amqp.ErrClosed) {
-		if err = p.connectFn(); err != nil {
-			return err
-		}
-
-		err = p.Channel.PublishWithContext(
-			ctx,
-			exchangeName,
-			key,
-			false,
-			false,
-			amqp.Publishing{
-				ContentType:  "application/json",
-				DeliveryMode: amqp.Persistent,
-				Body:         msg,
-				AppId:        p.appId,
-				UserId:       opts[UserIdKey],
-				MessageId:    fmt.Sprintf("%x", md5.Sum(msg)),
-				Headers:      headers,
-			})
-	}
-
-	return err
+	publishing := amqp.Publishing{
+		ContentType:  "application/json",
+		DeliveryMode: amqp.Persistent,
+		Body:         msg,
+		AppId:        p.appId,
+		UserId:       opts[UserIdKey],
+		MessageId:    fmt.Sprintf("%x", md5.Sum(msg)),
+		Headers:      headers,
+	}
+
+	// Try at most 2 times
+	for i := 0; i < 2; i++ {
+		err := p.Channel.PublishWithContext(
+			ctx,
+			exchangeName,
+			key,
+			false,
+			false,
+			publishing,
+		)
+		
+		if err == nil {
+			return nil
+		}
+		
+		if !errors.Is(err, amqp.ErrClosed) {
+			return err
+		}
+		
+		// Attempt to reconnect
+		if reconnectErr := p.connectFn(); reconnectErr != nil {
+			return fmt.Errorf("publish failed: %v, reconnect failed: %v", err, reconnectErr)
+		}
+	}
+	
+	return fmt.Errorf("publish failed after %d attempts", 2)

This would:

  1. Remove the code duplication
  2. Support multiple retries
  3. Provide clearer error messages

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In mq/producer.go around lines 167 to 187, the code duplicates the message
publishing logic and has a simplistic retry mechanism. Refactor by extracting
the publishing code into a separate internal function and implement a loop or
retry mechanism that attempts to publish multiple times upon failure, especially
when the connection is closed. This will remove duplication, allow multiple
retries, and improve error handling clarity.

Comment on lines +51 to +92
func (p *Producer) connectFn() error {
	if p.isConnected() {
		return nil
	}

	_, err, _ := p.sf.Do("reconnect", func() (interface{}, error) {
		var lastErr error
		for i := 0; i < 3; i++ {
			if p.isConnected() {
				return nil, nil
			}

			if i > 0 {
				time.Sleep(time.Second * time.Duration(math.Pow(2, float64(i-1))))
			}

			conn, channel, err := initConnection(p.amqpURI)
			if err != nil {
				lastErr = fmt.Errorf("reconnect attempt %d failed: %s", i+1, err)
				continue
			}

			oldConn := p.Conn
			oldChannel := p.Channel
			p.Conn = conn
			p.Channel = channel

			if oldChannel != nil {
				_ = oldChannel.Close()
			}
			if oldConn != nil {
				_ = oldConn.Close()
			}

			return nil, nil
		}

		return nil, lastErr
	})

	return err
}
⚠️ Potential issue

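Thread-safety issue: the connection swap needs to be protected by a lock

The current connection replacement logic (lines 73-83) has a race condition. Between reading the old connection and closing it, other goroutines may still be using it, which can lead to concurrency problems.

Consider protecting the connection swap with a mutex: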

type Producer struct {
	Conn    *amqp.Connection
	Channel *amqp.Channel
+	mu      sync.RWMutex  // guards reads and writes of the connection

	amqpURI string
	sf      *singleflight.Group

	appId string
}

Then, in connectFn:

-			oldConn := p.Conn
-			oldChannel := p.Channel
-			p.Conn = conn
-			p.Channel = channel
+			p.mu.Lock()
+			oldConn := p.Conn
+			oldChannel := p.Channel
+			p.Conn = conn
+			p.Channel = channel
+			p.mu.Unlock()

In addition, the isConnected() and publish() methods need to take the read lock when accessing the connection.
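
As a minimal sketch of the locking pattern described above, using only the standard library: `producer`, `conn`, and `swap` are hypothetical stand-ins, not the types in mq/producer.go. Readers take the read lock; the swap takes the write lock and hands back the old connection so it can be closed outside the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for an AMQP connection/channel pair.
type conn struct {
	id     int
	closed bool
}

// producer guards its connection pointer with an RWMutex.
type producer struct {
	mu sync.RWMutex
	c  *conn
}

// isConnected takes the read lock, so many goroutines can check concurrently.
func (p *producer) isConnected() bool {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.c != nil && !p.c.closed
}

// swap installs next under the write lock and returns the previous
// connection so the caller can close it after releasing the lock.
func (p *producer) swap(next *conn) *conn {
	p.mu.Lock()
	defer p.mu.Unlock()
	old := p.c
	p.c = next
	return old
}

func main() {
	p := &producer{c: &conn{id: 1, closed: true}}

	// Concurrent readers racing with one swap.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = p.isConnected()
		}()
	}
	old := p.swap(&conn{id: 2})
	wg.Wait()

	fmt.Println(old.id, p.isConnected())
}
```

The lock only makes the pointer swap safe. A goroutine that copied the old channel just before the swap can still see it closed mid-publish, so the publish path would either hold the read lock for the whole call or rely on the retry to recover.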

Also, consider changing the error formatting from %s to %v to better handle different error types:

-				lastErr = fmt.Errorf("reconnect attempt %d failed: %s", i+1, err)
+				lastErr = fmt.Errorf("reconnect attempt %d failed: %v", i+1, err)
🤖 Prompt for AI Agents
In mq/producer.go lines 51 to 92, the connection replacement code from lines 73
to 83 is not thread-safe and can cause race conditions when multiple goroutines
access or modify the connection concurrently. To fix this, introduce a mutex
lock to protect the connection and channel replacement operations inside the
reconnect logic. Also, modify the isConnected() and publish() methods to use a
read lock when accessing the connection to ensure safe concurrent reads.
Additionally, update the error formatting in the reconnect attempts from using
%s to %v to properly handle different error types.
