-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatcher.go
92 lines (84 loc) · 1.65 KB
/
batcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package gopool
import (
"fmt"
"time"
)
type Batcher struct {
config *PoolConfig
taskQueue chan Task
executor BatchExecutor
id int
}
func (bat *Batcher) start() {
defer func() {
if r := recover(); r != nil {
fmt.Println("batcher panic", "r", r)
bat.start()
}
}()
batchSize := bat.config.batchSize
batch := make([]Task, 0, batchSize)
lingerTime := bat.config.linger
ticker := time.NewTimer(lingerTime)
if !ticker.Stop() {
<-ticker.C
}
defer ticker.Stop()
for {
select {
case task, ok := <-bat.taskQueue:
if !ok {
bat.process(batch)
batch = make([]Task, 0, batchSize)
return
}
batch = append(batch, task)
size := len(batch)
if size < batchSize {
if size == 1 { // first element, tick reset
ticker.Reset(lingerTime)
}
break
}
bat.process(batch)
if !ticker.Stop() {
<-ticker.C
}
batch = make([]Task, 0, batchSize)
case <-ticker.C:
if len(batch) == 0 {
break
}
bat.process(batch)
batch = make([]Task, 0, batchSize)
}
}
}
func (bat *Batcher) process(tasks []Task) {
if err := bat.executor(tasks); err != nil {
fmt.Println("process task err", "err", err)
if bat.config.batchErrCb != nil {
go bat.config.batchErrCb(err, tasks, bat.executor)
return
}
}
}
func (bat *Batcher) addTask(t Task) error {
bat.taskQueue <- t
return nil
}
func (bat *Batcher) quit() {
close(bat.taskQueue)
batch := make([]Task, 0, bat.config.batchSize)
for {
select {
case task, ok := <-bat.taskQueue:
if !ok {
fmt.Println("batcher close quit...", "id", bat.id, "batch", len(batch))
bat.process(batch)
return
}
batch = append(batch, task)
}
}
}