熔断的做法和限流有点像,都是统计一定时间内的计数。对限流来说是统计访问量,超过阈值则拒绝或放进队列;对熔断来说是统计错误量,超过阈值则拒绝,并在一定时间后恢复
但两个应用场景不一样,但限流主要是为了保护自己的服务,熔断主要是为了保护下游、别人的服务
熔断有两个关键词,一个是暂时,一个是停止。
停止是说,当前服务一旦对下游服务进行熔断,当请求到达时,当前服务不再对下游服务进行调用,而是使用设定好的策略(如构建默认值)直接返回。
暂时是说,熔断后,并不会一直不再调用下游服务,而是以一定的策略(如每分钟调用 10 次,若均返回成功,则增大调用量)试探调用下游服务,当下游服务恢复可用时,自动停止熔断。
这两个都做的自适应的处理
熔断状态有三个:
- 正常状态:正常执行查询
- 熔断状态:该状态下直接拒绝读写。同时设置一个定时器,在一段时间后,进入半开路状态
- 半开路状态:允许读写。如果查询正常,恢复至正常状态。如果报错,则回到熔断状态
type CircuitBreakerStatus uint8
const (
CircuitBreakerClosed CircuitBreakerStatus = iota // 正常状态
CircuitBreakerOpen // 熔断状态
CircuitBreakerHalfOpen // 半开路
)
type CircuitBreaker struct {
failedCount int64 // 失败计数
threshold int64 // 熔断器触发阈值
retryDur time.Duration
callerInfo fmt.Stringer
rwm sync.RWMutex
timer *time.Timer
s CircuitBreakerStatus
}
// 执行查询
func (cb *CircuitBreaker) Run(f func() (err error, skip bool)) error {
switch cb.Status() {
case CircuitBreakerOpen: // 熔断状态:直接返回
return fmt.Errorf("circuit breaker is open %s", cb.callerInfo)
case CircuitBreakerClosed, CircuitBreakerHalfOpen: // 正常/半开路状态:执行查询,并判断结果
err, skip := f()
switch {
case err == nil: // 无错:看是否恢复至正常状态
cb.setStatus(CircuitBreakerClosed)
case !skip:
cb.errHappened() // 出错:判断是否需要熔断
}
return err
}
return nil
}
// 出错
func (cb *CircuitBreaker) errHappened() {
switch cb.Status() {
case CircuitBreakerClosed: // 正常状态
if atomic.AddInt64(&cb.failedCount, 1) > cb.threshold { // 增加错误计数并和阈值比较
cb.setStatus(CircuitBreakerOpen) // 如果超过阈值,则进入熔断
}
case CircuitBreakerHalfOpen: // 半开路:增加错误计数,并立即熔断
atomic.AddInt64(&cb.failedCount, 1)
cb.setStatus(CircuitBreakerOpen)
case CircuitBreakerOpen: // 熔断:增加错误计数
atomic.AddInt64(&cb.failedCount, 1)
}
}
// 设置熔断器状态
func (cb *CircuitBreaker) setStatus(cbs CircuitBreakerStatus) CircuitBreakerStatus {
cb.rwm.Lock()
defer cb.rwm.Unlock()
if cb.s != cbs {
switch cbs {
case CircuitBreakerOpen:
slf.Errorf("circuit breaker is open %s", cb.callerInfo)
case CircuitBreakerHalfOpen:
slf.Infof("circuit breaker is half open %s", cb.callerInfo)
case CircuitBreakerClosed:
slf.Infof("circuit breaker is closed %s", cb.callerInfo)
}
}
cb.s = cbs
switch cb.s {
case CircuitBreakerOpen:
if cb.timer == nil { // 这里设置了一个定时器,在一定时间后将熔断器恢复至半开路状态
cb.timer = time.AfterFunc(cb.retryDur, func() {
cb.setStatus(CircuitBreakerHalfOpen)
})
} else {
cb.timer.Reset(cb.retryDur)
}
case CircuitBreakerClosed: // 半开路状态下无错,则立即将计数器清零,恢复为正常状态
atomic.StoreInt64(&cb.failedCount, 0)
}
return cb.s
}
func (cb *CircuitBreaker) Status() CircuitBreakerStatus {
cb.rwm.RLock()
defer cb.rwm.RUnlock()
return cb.s
}