diff --git a/discovery/node.go b/discovery/node.go index cdcd12ac..7d06ff9c 100644 --- a/discovery/node.go +++ b/discovery/node.go @@ -3,8 +3,6 @@ package discovery import ( "fmt" "github.com/eolinker/eosc/eocontext" - "sync/atomic" - "time" ) // NodeStatus 节点状态类型 @@ -25,7 +23,6 @@ type INode interface { Addr() string Port() int Status() NodeStatus - Last() time.Time Up() Down() Leave() @@ -39,21 +36,9 @@ type _BaseNode struct { port int status NodeStatus - lastTime atomic.Pointer[time.Time] statusChecker _INodeStatusCheck } -func (n *_BaseNode) Last() time.Time { - t := n.lastTime.Load() - if t != nil { - now := time.Now() - n.lastTime.Store(&now) - t = n.lastTime.Load() - } - return *t - -} - func newBaseNode(id string, ip string, port int, statusChecker _INodeStatusCheck) *_BaseNode { return &_BaseNode{id: id, ip: ip, port: port, status: Running, statusChecker: statusChecker} } diff --git a/drivers/service/service.go b/drivers/service/service.go index e154bda1..725c4e96 100644 --- a/drivers/service/service.go +++ b/drivers/service/service.go @@ -39,14 +39,8 @@ func (s *Service) PassHost() (eocontext.PassHostMod, string) { } func (s *Service) Nodes() []eocontext.INode { - all := s.app.Nodes() - ns := make([]eocontext.INode, 0, len(all)) - for _, n := range all { - if n.Status() == eocontext.Running { - ns = append(ns, n) - } - } - return ns + return s.app.Nodes() + } func (s *Service) Scheme() string { diff --git a/health-check-http/http-check.go b/health-check-http/http-check.go index bf91cfd9..ed4709b4 100644 --- a/health-check-http/http-check.go +++ b/health-check-http/http-check.go @@ -62,7 +62,8 @@ func (h *HTTPCheck) doCheckLoop(nodes discovery.INodes) { return case <-ticker.C: { - h.check(nodes.All()) + + h.check(nodes) } } } @@ -88,13 +89,13 @@ func (h *HTTPCheck) Stop() { } // check 对待检查的节点集合进行检测,入参:nodes map[agentID][nodeID]*checkNode -func (h *HTTPCheck) check(nodes []discovery.INode) { +func (h *HTTPCheck) check(nodes discovery.INodes) { /*对每个节点地址进行检测 成功则将属于该地址的所有节点的状态都置于可运行,并从HTTPCheck维护的待检测节点列表中移除 失败则下次定时检查再进行检测 */ - for _, ns := range nodes { + for _, ns := range nodes.All() { if ns.Status() != discovery.Down { continue } diff --git a/upstream/ip-hash/ip_hash.go b/upstream/ip-hash/ip_hash.go index ad365e77..d187c6a2 100644 --- a/upstream/ip-hash/ip_hash.go +++ b/upstream/ip-hash/ip_hash.go @@ -61,6 +61,9 @@ func (r *ipHash) Next(org eoscContext.EoContext) (eoscContext.INode, int, error) readIp := org.RealIP() nodes := r.Nodes() size := len(nodes) + if size == 1 { + return nodes[0], 0, nil + } if size < 1 { return nil, 0, errNoValidNode } diff --git a/upstream/round-robin/round_robin.go b/upstream/round-robin/round_robin.go index 9d020bd6..89b97fa9 100644 --- a/upstream/round-robin/round_robin.go +++ b/upstream/round-robin/round_robin.go @@ -3,11 +3,14 @@ package round_robin import ( "errors" "strconv" + "sync" + "sync/atomic" "time" + "github.com/eolinker/apinto/utils/queue" + eoscContext "github.com/eolinker/eosc/eocontext" - "github.com/eolinker/apinto/discovery" "github.com/eolinker/apinto/upstream/balance" ) @@ -25,6 +28,9 @@ var ( func Register() { balance.Register(name, newRoundRobinFactory()) } +func noValidNodeHandler() (eoscContext.INode, int, error) { + return nil, 0, errNoValidNode +} func newRoundRobinFactory() *roundRobinFactory { return &roundRobinFactory{} @@ -39,30 +45,29 @@ func (r roundRobinFactory) Create(app eoscContext.EoApp, scheme string, timeout } type node struct { - weight int - eoscContext.INode + index int + weight int64 + effectiveWeight int64 + node eoscContext.INode } type roundRobin struct { eoscContext.EoApp scheme string timeout time.Duration - // nodes 节点列表 - nodes []node - // 节点数量 + + // 节点数量,也是每次最大遍历数 size int - // index 当前索引 - index int - // gcdWeight 权重最大公约数 - gcdWeight int - // maxWeight 权重最大值 - maxWeight int - cw int + // gcdWeight 权重最大公约数 + gcdWeight int64 - updateTime time.Time + nextHandler func() (eoscContext.INode, int, error) - downNodes map[int]eoscContext.INode + locker sync.Mutex + nodeQueueNext queue.Queue[node] + nodeQueue queue.Queue[node] + updateTime int64 } func (r *roundRobin) Scheme() string { @@ -74,69 +79,94 @@ func (r *roundRobin) TimeOut() time.Duration { } func (r *roundRobin) Select(ctx eoscContext.EoContext) (eoscContext.INode, int, error) { + r.tryReset() + if r.nextHandler != nil { + return r.nextHandler() + } + return r.Next() } // Next 由现有节点根据round_Robin决策出一个可用节点 func (r *roundRobin) Next() (eoscContext.INode, int, error) { - if time.Now().Sub(r.updateTime) > time.Second*30 { - // 当上次节点更新时间与当前时间间隔超过30s,则重新设置节点 - r.set() - } - if r.size < 1 { - return nil, 0, errNoValidNode - } - for { - index := r.index % r.size - r.index = (r.index + 1) % r.size - if len(r.downNodes) >= r.size { - return nil, 0, errNoValidNode - } - if index == 0 { - r.cw = r.cw - r.gcdWeight - if r.cw <= 0 { - r.cw = r.maxWeight - if r.cw == 0 { - return nil, 0, errNoValidNode - } - } + r.locker.Lock() + defer r.locker.Unlock() + for i := 0; i < r.size; i++ { + + if r.nodeQueue.Empty() { + r.nodeQueue, r.nodeQueueNext = r.nodeQueueNext, r.nodeQueue } + entry := r.nodeQueue.Pop() + nodeValue := entry.Value() + + nodeValue.effectiveWeight -= r.gcdWeight - if r.nodes[index].weight >= r.cw { - if r.nodes[index].Status() == discovery.Down { - r.downNodes[index] = r.nodes[index] - continue - } - return r.nodes[index], index, nil + if nodeValue.effectiveWeight > 0 { + r.nodeQueue.Push(entry) + } else { + nodeValue.effectiveWeight = nodeValue.weight + r.nodeQueueNext.Push(entry) } + if nodeValue.node.Status() == eoscContext.Down { + // 如果节点down( 开启健康检查才会出现down 状态) 则去拿下一个节点 + continue + } + return nodeValue.node, nodeValue.index, nil } + return nil, 0, errNoValidNode + } -func (r *roundRobin) set() { - r.downNodes = make(map[int]eoscContext.INode) +func (r *roundRobin) tryReset() { + now := time.Now().Unix() + if now-atomic.LoadInt64(&r.updateTime) < 30 { + return + } + r.locker.Lock() + defer r.locker.Unlock() + if now-atomic.LoadInt64(&r.updateTime) < 30 { + return + } + atomic.StoreInt64(&r.updateTime, now) + nodes := r.Nodes() - r.size = len(nodes) - ns := make([]node, 0, r.size) - for i, n := range nodes { + size := len(nodes) + if size == 0 { + r.nextHandler = noValidNodeHandler + return + } + if size == 1 { + node := nodes[0] + r.nextHandler = func() (eoscContext.INode, int, error) { + return node, 0, nil + } + return + } + + ns := make([]*node, 0, size) + gcdWeight := int64(0) + for _, n := range nodes { weight, _ := n.GetAttrByName("weight") - w, _ := strconv.Atoi(weight) + w, _ := strconv.ParseInt(weight, 10, 64) if w == 0 { w = 1 } - nd := node{w, n} - ns = append(ns, nd) - if i == 0 { - r.maxWeight = w - r.gcdWeight = w - continue + nd := &node{ + weight: w, effectiveWeight: w, + node: n, } - r.gcdWeight = gcd(w, r.gcdWeight) - r.maxWeight = max(w, r.maxWeight) + ns = append(ns, nd) + + gcdWeight = gcd(w, gcdWeight) // 计算权重的最大公约数 } - r.nodes = ns - r.updateTime = time.Now() + r.size = size + + r.gcdWeight = gcdWeight + r.nodeQueue = queue.NewQueue(ns...) + r.nodeQueueNext = queue.NewQueue[node]() + r.nextHandler = nil } func newRoundRobin(app eoscContext.EoApp, scheme string, timeout time.Duration) *roundRobin { @@ -145,19 +175,22 @@ func newRoundRobin(app eoscContext.EoApp, scheme string, timeout time.Duration) scheme: scheme, timeout: timeout, } - r.set() + return r } -func gcd(a, b int) int { - c := a % b - if c == 0 { - return b +type intType interface { + int | int64 | int32 | int16 | int8 | uint64 | uint32 | uint16 | uint8 | uint +} + +func gcd[T intType](a, b T) T { + for b != 0 { + a, b = b, a%b } - return gcd(b, c) + return a } -func max(a, b int) int { +func max[T intType](a, b T) T { if a > b { return a } diff --git a/upstream/round-robin/round_robin_test.go b/upstream/round-robin/round_robin_test.go index c54215fd..5f8eb7b3 100644 --- a/upstream/round-robin/round_robin_test.go +++ b/upstream/round-robin/round_robin_test.go @@ -1,253 +1,154 @@ package round_robin -type nodeDemo struct { - label map[string]string - ip string - port int - down bool -} - -var testDemos = []struct { - name string - nodes map[string]nodeDemo - count map[string]int -}{ - { - name: "权重相等", - nodes: map[string]nodeDemo{ - "demo1": { - label: map[string]string{ - "weight": "10", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo2": { - label: map[string]string{ - "weight": "10", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo3": { - label: map[string]string{ - "weight": "10", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo4": { - label: map[string]string{ - "weight": "10", - }, - ip: "127.0.0.1", - port: 8580, - }, - }, - count: map[string]int{ - "demo1": 5, - "demo2": 5, - "demo3": 5, - "demo4": 5, - }, - }, - { - name: "权重4:3:2:1", - nodes: map[string]nodeDemo{ - "demo1": { - label: map[string]string{ - "weight": "40", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo2": { - label: map[string]string{ - "weight": "30", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo3": { - label: map[string]string{ - "weight": "20", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo4": { - label: map[string]string{ - "weight": "10", - }, - ip: "127.0.0.1", - port: 8580, - }, - }, - count: map[string]int{ - "demo1": 8, - "demo2": 6, - "demo3": 4, - "demo4": 2, - }, - }, - { - name: "权重4:3:2:1,down调权重40的节点", - nodes: map[string]nodeDemo{ - "demo1": { - label: map[string]string{ - "weight": "40", - }, - ip: "127.0.0.1", - port: 8580, - down: true, - }, - "demo2": { - label: map[string]string{ - "weight": "30", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo3": { - label: map[string]string{ - "weight": "20", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo4": { - label: map[string]string{ - "weight": "10", - }, - ip: "127.0.0.1", - port: 8580, - }, - }, - count: map[string]int{ - "demo1": 1, - "demo2": 10, - "demo3": 6, - "demo4": 3, - }, - }, - { - name: "权重4:3:2:1,down调权重30的节点", - nodes: map[string]nodeDemo{ - "demo1": { - label: map[string]string{ - "weight": "40", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo2": { - label: map[string]string{ - "weight": "30", - }, - ip: "127.0.0.1", - port: 8580, - down: true, - }, - "demo3": { - label: map[string]string{ - "weight": "20", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo4": { - label: map[string]string{ - "weight": "10", - }, - ip: "127.0.0.1", - port: 8580, - }, - }, - count: map[string]int{ - "demo1": 12, - "demo2": 1, - "demo3": 5, - "demo4": 2, - }, - }, - { - name: "权重4:3:2:1,down调权重20的节点", - nodes: map[string]nodeDemo{ - "demo1": { - label: map[string]string{ - "weight": "40", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo2": { - label: map[string]string{ - "weight": "30", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo3": { - label: map[string]string{ - "weight": "20", - }, - ip: "127.0.0.1", - port: 8580, - down: true, - }, - "demo4": { - label: map[string]string{ - "weight": "10", - }, - ip: "127.0.0.1", - port: 8580, - }, - }, - count: map[string]int{ - "demo1": 10, - "demo2": 7, - "demo3": 1, - "demo4": 2, - }, - }, - { - name: "权重4:3:2:1,down调权重10的节点", - nodes: map[string]nodeDemo{ - "demo1": { - label: map[string]string{ - "weight": "40", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo2": { - label: map[string]string{ - "weight": "30", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo3": { - label: map[string]string{ - "weight": "20", - }, - ip: "127.0.0.1", - port: 8580, - }, - "demo4": { - label: map[string]string{ - "weight": "10", - }, - ip: "127.0.0.1", - port: 8580, - down: true, - }, - }, - count: map[string]int{ - "demo1": 9, - "demo2": 6, - "demo3": 4, - "demo4": 1, - }, - }, +import ( + "runtime" + "sync" + "testing" + "time" + + "github.com/eolinker/apinto/drivers/discovery/static" + "github.com/eolinker/eosc/eocontext" +) + +type demoNode struct { +} + +func (d *demoNode) GetAttrs() eocontext.Attrs { + return make(eocontext.Attrs) +} + +func (d *demoNode) GetAttrByName(name string) (string, bool) { + return "", false +} + +func (d *demoNode) ID() string { + return "127.0.0.1:8080" +} + +func (d *demoNode) IP() string { + return "127.0.0.1" +} + +func (d *demoNode) Port() int { + return 8080 +} + +func (d *demoNode) Addr() string { + return "127.0.0.1:8080" +} + +func (d *demoNode) Status() eocontext.NodeStatus { + return eocontext.Running +} + +func (d *demoNode) Up() { +} + +func (d *demoNode) Down() { +} + +func (d *demoNode) Leave() { +} + +type demo struct { + nodeSing demoNode +} + +func (d *demo) Nodes() []eocontext.INode { + return []eocontext.INode{&d.nodeSing} +} + +func Test_roundRobin_Next_Retry_demo(t *testing.T) { + app := new(demo) + robin := newRoundRobin(app, "http", time.Second) + testDoRetry(robin, t) + +} +func testDoRetry(robin *roundRobin, t *testing.T) { + + timer := time.NewTimer(time.Second * 60) + for { + select { + case <-timer.C: + return + default: + + } + node, _, err := robin.Select(nil) + if err != nil { + t.Error(err) + return + } + + //t.Log(i, next.Addr()) + node.Down() + } +} +func Test_roundRobin_Next_Retry_Status(t *testing.T) { + runtime.GOMAXPROCS(runtime.NumCPU()) + discovery := static.CreateAnonymous(&static.Config{ + HealthOn: false, + Health: nil, + }) + + app, err := discovery.GetApp("127.0.0.1:8080;127.0.0.1:8081") + if err != nil { + return + } + + robin := newRoundRobin(app, "http", time.Second) + wg := sync.WaitGroup{} + wg.Add(runtime.NumCPU()) + for i := 0; i < runtime.NumCPU(); i++ { + go func() { + defer wg.Done() + testDoRetry(robin, t) + }() + + } + wg.Wait() +} + +func Test_roundRobin_Next_Retry_Status_2(t *testing.T) { + discovery := static.CreateAnonymous(&static.Config{ + HealthOn: false, + Health: nil, + }) + + app, err := discovery.GetApp("") + if err != nil { + return + } + robin := newRoundRobin(app, "http", time.Second) + for i := 0; i < 12; i++ { + n, _, err := robin.Select(nil) + if err != nil { + t.Error(err) + return + } + t.Log(i, n.Addr()) + } +} + +var () + +func Benchmark_roundRobin_Next_Retry_demo(b *testing.B) { + discovery := static.CreateAnonymous(&static.Config{ + HealthOn: false, + Health: nil, + }) + app, err := discovery.GetApp("127.0.0.1:8080;127.0.0.1:8081;127.0.0.1:8083;127.0.0.1:8084") + if err != nil { + b.Fatal(err) + } + + robin := newRoundRobin(app, "http", time.Second) + for i := 0; i < b.N; i++ { + _, _, err := robin.Select(nil) + if err != nil { + b.Fatal(err) + return + } + //b.Log(i, n.Addr()) + } } diff --git a/utils/queue/queue.go b/utils/queue/queue.go new file mode 100644 index 00000000..6055255a --- /dev/null +++ b/utils/queue/queue.go @@ -0,0 +1,68 @@ +package queue + +type Entry[T any] struct { + v *T + next *Entry[T] +} + +func (entry *Entry[T]) Value() *T { + return entry.v +} +func NewEntry[T any](v *T) *Entry[T] { + return &Entry[T]{ + v: v, + } +} + +type Queue[T any] interface { + Pop() *Entry[T] + Push(entry *Entry[T]) + Empty() bool +} + +var ( + _ Queue[any] = (*imlQueue[any])(nil) +) + +type imlQueue[T any] struct { + head *Entry[T] + tail *Entry[T] +} + +func (q *imlQueue[T]) Empty() bool { + return q.head == nil +} + +func NewQueue[T any](vs ...*T) Queue[T] { + q := &imlQueue[T]{} + for _, v := range vs { + q.Push(NewEntry(v)) + } + return q +} + +func (q *imlQueue[T]) Pop() *Entry[T] { + if q.head == nil { + return nil + } + head := q.head + next := head.next + head.next = nil + q.head = next + if next == nil { + q.tail = nil + } + return head +} + +func (q *imlQueue[T]) Push(entry *Entry[T]) { + + tail := q.tail + q.tail = entry + if tail == nil { + q.head = entry + } else { + tail.next = entry + } + +}