We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pushConsumer 一直return consumer.ConsumeRetryLater,就只重试两次,然后就在admin后台查看变成了consumed 已消费状态
核心代码
topic := "GO_TEST_TOPIC" rocket_mq.NewConsumerProcessor(topic, func(c context.Context, msg []byte) error { xflog.Infof("msg:%v", string(msg)) return errors.New("111") }, consumer.WithConsumerModel(consumer.Clustering), consumer.WithMaxReconsumeTimes(16), consumer.WithGroupName("TEST_GROUP_4"), consumer.WithNameServer([]string{"127.0.0.1:9876"})).Start() c.Subscribe(cp.topic, cp.selector, func(ctx context.Context, messages ...*primitive.MessageExt) ( consumer.ConsumeResult, error) { // 处理消息 for _, message := range messages { err := cp.process(ctx, message) // 业务处理 if err != nil { fmt.Printf("messageId:%s, consumer times:%d", message.MsgId, message.ReconsumeTimes) concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx) concurrentCtx.DelayLevelWhenNextConsume = 2 // only run when return consumer.ConsumeRetryLater return consumer.ConsumeRetryLater, nil } } return consumer.ConsumeSuccess, nil })
The text was updated successfully, but these errors were encountered:
@twz915
Sorry, something went wrong.
我测试了,工作正常,和预期一致。你可以检查一下 rocketmq 服务端有没有什么设置只允许重试两次。 比如 broker.conf 文件,messageDelayLevel,maxReconsumeTimes 参数之类的
我使用的版本是 master 分支
No branches or pull requests
pushConsumer 一直return consumer.ConsumeRetryLater,就只重试两次,然后就在admin后台查看变成了consumed 已消费状态
核心代码
The text was updated successfully, but these errors were encountered: