Skip to content

Commit

Permalink
feat: 基于InternalError重试
Browse files Browse the repository at this point in the history
  • Loading branch information
zenghur committed Apr 8, 2019
1 parent 1b9729d commit 416cbfb
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 3 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ aliyun-mns是对阿里云消息服务的封装,具有以下特点:

* 动态创建队列
* 可以设置消费者数目
* 长轮询
* 消息处理时长自适应
* 发送消息重试
* 发送消息重试。目前基于网络错误、阿里云MNS错误码表InternalError重试
* 监控报警
* 优雅的关闭消费者
* 处理函数处理最大时间限制

# 消费者

Expand Down
1 change: 1 addition & 0 deletions base64.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
base64Regex = `^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$`
)

// IsBase54 返回字符串是否是base64编码
func IsBase54(s string) bool {
rp, _ := regexp.Compile(base64Regex)
return rp.MatchString(s)
Expand Down
9 changes: 8 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package alimns

import (
"encoding/base64"
"math/rand"
"os"
"os/signal"
"runtime"
Expand Down Expand Up @@ -161,13 +162,19 @@ func (c *Consumer) CreateQueueList(fetchQueueReady chan struct{}) chan struct{}
return createQueueReady
}

func randInRange(min, max int) int {
return rand.Intn(max-min) + min
}

// Schedule 使消息队列开始运作起来
func (c *Consumer) Schedule(createQueueReady chan struct{}) {
go func() {
for {
select {
case <-createQueueReady:
for _, queue := range c.queues {
time.Sleep(time.Duration(randInRange(20, 51)) * time.Millisecond)

if c.isClosed {
continue
}
Expand Down Expand Up @@ -366,7 +373,7 @@ func (c *Consumer) OnReceive(queue *Queue, receiveMsg *ReceiveMessage) {
}
}
}
case <-time.After(5 * time.Hour):
case <-time.After(10 * time.Hour):
{
close(tickerStop)
}
Expand Down
5 changes: 5 additions & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func IsHandleCrash(err error) bool {
return err == handleCrashError
}

// IsInternalError 是否内部错误
func IsInternalError(err error) bool {
return err == internalError
}

// RespErr 阿里云回复错误
type RespErr struct {
XMLName xml.Name `xml:"Error"`
Expand Down
5 changes: 5 additions & 0 deletions send_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func (c *Client) SendBase64EncodedJSONMessage(name string, messageBody interface
case shouldRetry(err):
time.Sleep(time.Millisecond * 100)
continue
case IsInternalError(err):
time.Sleep(time.Millisecond * 100)
continue
default:
close(ended)
return
Expand Down Expand Up @@ -129,6 +132,8 @@ func (c *Client) sendMessage(name, messageBody string, setters ...MessageSetter)
switch respErr.Code {
case queueNotExistError.Error():
return nil, queueNotExistError
case internalError.Error():
return nil, internalError
}
return nil, errors.New(respErr.Message)
}
Expand Down

0 comments on commit 416cbfb

Please sign in to comment.