diff --git a/README.md b/README.md index 1eb071b..a81bae0 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,11 @@ aliyun-mns是对阿里云消息服务的封装,具有以下特点: * 动态创建队列 * 可以设置消费者数目 -* 长轮询 * 消息处理时长自适应 -* 发送消息重试 +* 发送消息重试。目前基于网络错误、阿里云MNS错误码表InternalError重试 * 监控报警 * 优雅的关闭消费者 +* 处理函数处理最大时间限制 # 消费者 diff --git a/base64.go b/base64.go index 6b633f0..0ec171c 100644 --- a/base64.go +++ b/base64.go @@ -6,6 +6,7 @@ const ( base64Regex = `^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$` ) +// IsBase54 返回字符串是否是base64编码 func IsBase54(s string) bool { rp, _ := regexp.Compile(base64Regex) return rp.MatchString(s) diff --git a/consumer.go b/consumer.go index 1f204dd..4c3f665 100644 --- a/consumer.go +++ b/consumer.go @@ -2,6 +2,7 @@ package alimns import ( "encoding/base64" + "math/rand" "os" "os/signal" "runtime" @@ -161,6 +162,10 @@ func (c *Consumer) CreateQueueList(fetchQueueReady chan struct{}) chan struct{} return createQueueReady } +func randInRange(min, max int) int { + return rand.Intn(max-min) + min +} + // Schedule 使消息队列开始运作起来 func (c *Consumer) Schedule(createQueueReady chan struct{}) { go func() { @@ -168,6 +173,8 @@ func (c *Consumer) Schedule(createQueueReady chan struct{}) { select { case <-createQueueReady: for _, queue := range c.queues { + time.Sleep(time.Duration(randInRange(20, 51)) * time.Millisecond) + if c.isClosed { continue } @@ -366,7 +373,7 @@ func (c *Consumer) OnReceive(queue *Queue, receiveMsg *ReceiveMessage) { } } } - case <-time.After(5 * time.Hour): + case <-time.After(10 * time.Hour): { close(tickerStop) } diff --git a/error.go b/error.go index 8630875..5f501c7 100644 --- a/error.go +++ b/error.go @@ -72,6 +72,11 @@ func IsHandleCrash(err error) bool { return err == handleCrashError } +// IsInternalError 是否内部错误 +func IsInternalError(err error) bool { + return err == internalError +} + // RespErr 阿里云回复错误 type RespErr struct { XMLName xml.Name `xml:"Error"` diff --git a/send_message.go b/send_message.go index befb099..a947796 100644 --- a/send_message.go +++ b/send_message.go @@ -32,6 +32,9 @@ func (c *Client) SendBase64EncodedJSONMessage(name string, messageBody interface case shouldRetry(err): time.Sleep(time.Millisecond * 100) continue + case IsInternalError(err): + time.Sleep(time.Millisecond * 100) + continue default: close(ended) return @@ -129,6 +132,8 @@ func (c *Client) sendMessage(name, messageBody string, setters ...MessageSetter) switch respErr.Code { case queueNotExistError.Error(): return nil, queueNotExistError + case internalError.Error(): + return nil, internalError } return nil, errors.New(respErr.Message) }