Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
Dot-Liu committed May 10, 2024
2 parents 680fa0c + afbf005 commit 3a1336c
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 341 deletions.
15 changes: 0 additions & 15 deletions discovery/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package discovery
import (
"fmt"
"github.com/eolinker/eosc/eocontext"
"sync/atomic"
"time"
)

// NodeStatus 节点状态类型
Expand All @@ -25,7 +23,6 @@ type INode interface {
Addr() string
Port() int
Status() NodeStatus
Last() time.Time
Up()
Down()
Leave()
Expand All @@ -39,21 +36,9 @@ type _BaseNode struct {
port int

status NodeStatus
lastTime atomic.Pointer[time.Time]
statusChecker _INodeStatusCheck
}

func (n *_BaseNode) Last() time.Time {
t := n.lastTime.Load()
if t != nil {
now := time.Now()
n.lastTime.Store(&now)
t = n.lastTime.Load()
}
return *t

}

func newBaseNode(id string, ip string, port int, statusChecker _INodeStatusCheck) *_BaseNode {
return &_BaseNode{id: id, ip: ip, port: port, status: Running, statusChecker: statusChecker}
}
Expand Down
10 changes: 2 additions & 8 deletions drivers/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,8 @@ func (s *Service) PassHost() (eocontext.PassHostMod, string) {
}

func (s *Service) Nodes() []eocontext.INode {
all := s.app.Nodes()
ns := make([]eocontext.INode, 0, len(all))
for _, n := range all {
if n.Status() == eocontext.Running {
ns = append(ns, n)
}
}
return ns
return s.app.Nodes()

}

func (s *Service) Scheme() string {
Expand Down
7 changes: 4 additions & 3 deletions health-check-http/http-check.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func (h *HTTPCheck) doCheckLoop(nodes discovery.INodes) {
return
case <-ticker.C:
{
h.check(nodes.All())

h.check(nodes)
}
}
}
Expand All @@ -88,13 +89,13 @@ func (h *HTTPCheck) Stop() {
}

// check 对待检查的节点集合进行检测,入参:nodes map[agentID][nodeID]*checkNode
func (h *HTTPCheck) check(nodes []discovery.INode) {
func (h *HTTPCheck) check(nodes discovery.INodes) {

/*对每个节点地址进行检测
成功则将属于该地址的所有节点的状态都置于可运行,并从HTTPCheck维护的待检测节点列表中移除
失败则下次定时检查再进行检测
*/
for _, ns := range nodes {
for _, ns := range nodes.All() {
if ns.Status() != discovery.Down {
continue
}
Expand Down
3 changes: 3 additions & 0 deletions upstream/ip-hash/ip_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func (r *ipHash) Next(org eoscContext.EoContext) (eoscContext.INode, int, error)
readIp := org.RealIP()
nodes := r.Nodes()
size := len(nodes)
if size == 1 {
return nodes[0], 0, nil
}
if size < 1 {
return nil, 0, errNoValidNode
}
Expand Down
163 changes: 98 additions & 65 deletions upstream/round-robin/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package round_robin
import (
"errors"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/eolinker/apinto/utils/queue"

eoscContext "github.com/eolinker/eosc/eocontext"

"github.com/eolinker/apinto/discovery"
"github.com/eolinker/apinto/upstream/balance"
)

Expand All @@ -25,6 +28,9 @@ var (
func Register() {
balance.Register(name, newRoundRobinFactory())
}
func noValidNodeHandler() (eoscContext.INode, int, error) {
return nil, 0, errNoValidNode
}

func newRoundRobinFactory() *roundRobinFactory {
return &roundRobinFactory{}
Expand All @@ -39,30 +45,29 @@ func (r roundRobinFactory) Create(app eoscContext.EoApp, scheme string, timeout
}

type node struct {
weight int
eoscContext.INode
index int
weight int64
effectiveWeight int64
node eoscContext.INode
}

type roundRobin struct {
eoscContext.EoApp
scheme string
timeout time.Duration
// nodes 节点列表
nodes []node
// 节点数量

// 节点数量,也是每次最大遍历数
size int
// index 当前索引
index int
// gcdWeight 权重最大公约数
gcdWeight int
// maxWeight 权重最大值
maxWeight int

cw int
// gcdWeight 权重最大公约数
gcdWeight int64

updateTime time.Time
nextHandler func() (eoscContext.INode, int, error)

downNodes map[int]eoscContext.INode
locker sync.Mutex
nodeQueueNext queue.Queue[node]
nodeQueue queue.Queue[node]
updateTime int64
}

func (r *roundRobin) Scheme() string {
Expand All @@ -74,69 +79,94 @@ func (r *roundRobin) TimeOut() time.Duration {
}

func (r *roundRobin) Select(ctx eoscContext.EoContext) (eoscContext.INode, int, error) {
r.tryReset()
if r.nextHandler != nil {
return r.nextHandler()
}

return r.Next()
}

// Next 由现有节点根据round_Robin决策出一个可用节点
func (r *roundRobin) Next() (eoscContext.INode, int, error) {
if time.Now().Sub(r.updateTime) > time.Second*30 {
// 当上次节点更新时间与当前时间间隔超过30s,则重新设置节点
r.set()
}
if r.size < 1 {
return nil, 0, errNoValidNode
}
for {
index := r.index % r.size
r.index = (r.index + 1) % r.size
if len(r.downNodes) >= r.size {
return nil, 0, errNoValidNode
}

if index == 0 {
r.cw = r.cw - r.gcdWeight
if r.cw <= 0 {
r.cw = r.maxWeight
if r.cw == 0 {
return nil, 0, errNoValidNode
}
}
r.locker.Lock()
defer r.locker.Unlock()
for i := 0; i < r.size; i++ {

if r.nodeQueue.Empty() {
r.nodeQueue, r.nodeQueueNext = r.nodeQueueNext, r.nodeQueue
}
entry := r.nodeQueue.Pop()
nodeValue := entry.Value()

nodeValue.effectiveWeight -= r.gcdWeight

if r.nodes[index].weight >= r.cw {
if r.nodes[index].Status() == discovery.Down {
r.downNodes[index] = r.nodes[index]
continue
}
return r.nodes[index], index, nil
if nodeValue.effectiveWeight > 0 {
r.nodeQueue.Push(entry)
} else {
nodeValue.effectiveWeight = nodeValue.weight
r.nodeQueueNext.Push(entry)
}
if nodeValue.node.Status() == eoscContext.Down {
// 如果节点down( 开启健康检查才会出现down 状态) 则去拿下一个节点
continue
}
return nodeValue.node, nodeValue.index, nil
}
return nil, 0, errNoValidNode

}

func (r *roundRobin) set() {
r.downNodes = make(map[int]eoscContext.INode)
func (r *roundRobin) tryReset() {
now := time.Now().Unix()
if now-atomic.LoadInt64(&r.updateTime) < 30 {
return
}
r.locker.Lock()
defer r.locker.Unlock()
if now-atomic.LoadInt64(&r.updateTime) < 30 {
return
}
atomic.StoreInt64(&r.updateTime, now)

nodes := r.Nodes()
r.size = len(nodes)
ns := make([]node, 0, r.size)
for i, n := range nodes {
size := len(nodes)
if size == 0 {
r.nextHandler = noValidNodeHandler
return
}
if size == 1 {
node := nodes[0]
r.nextHandler = func() (eoscContext.INode, int, error) {
return node, 0, nil
}
return
}

ns := make([]*node, 0, size)
gcdWeight := int64(0)
for _, n := range nodes {

weight, _ := n.GetAttrByName("weight")
w, _ := strconv.Atoi(weight)
w, _ := strconv.ParseInt(weight, 10, 64)
if w == 0 {
w = 1
}
nd := node{w, n}
ns = append(ns, nd)
if i == 0 {
r.maxWeight = w
r.gcdWeight = w
continue
nd := &node{
weight: w, effectiveWeight: w,
node: n,
}
r.gcdWeight = gcd(w, r.gcdWeight)
r.maxWeight = max(w, r.maxWeight)
ns = append(ns, nd)

gcdWeight = gcd(w, gcdWeight) // 计算权重的最大公约数
}
r.nodes = ns
r.updateTime = time.Now()
r.size = size

r.gcdWeight = gcdWeight
r.nodeQueue = queue.NewQueue(ns...)
r.nodeQueueNext = queue.NewQueue[node]()
r.nextHandler = nil
}

func newRoundRobin(app eoscContext.EoApp, scheme string, timeout time.Duration) *roundRobin {
Expand All @@ -145,19 +175,22 @@ func newRoundRobin(app eoscContext.EoApp, scheme string, timeout time.Duration)
scheme: scheme,
timeout: timeout,
}
r.set()

return r
}

func gcd(a, b int) int {
c := a % b
if c == 0 {
return b
type intType interface {
int | int64 | int32 | int16 | int8 | uint64 | uint32 | uint16 | uint8 | uint
}

func gcd[T intType](a, b T) T {
for b != 0 {
a, b = b, a%b
}
return gcd(b, c)
return a
}

func max(a, b int) int {
func max[T intType](a, b T) T {
if a > b {
return a
}
Expand Down
Loading

0 comments on commit 3a1336c

Please sign in to comment.