-
Notifications
You must be signed in to change notification settings - Fork 206
/
Copy pathtest_batch_consumer.go
57 lines (45 loc) · 1.11 KB
/
test_batch_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package rmq
import "sync"
type TestBatchConsumer struct {
mu sync.Mutex
// Deprecated: use Last() to avoid data races.
LastBatch Deliveries
// Deprecated use Consumed() to avoid data races.
ConsumedCount int64
AutoFinish bool
finish chan int
}
func NewTestBatchConsumer() *TestBatchConsumer {
return &TestBatchConsumer{
finish: make(chan int),
}
}
func (consumer *TestBatchConsumer) Last() Deliveries {
consumer.mu.Lock()
defer consumer.mu.Unlock()
return consumer.LastBatch
}
func (consumer *TestBatchConsumer) Consumed() int64 {
consumer.mu.Lock()
defer consumer.mu.Unlock()
return consumer.ConsumedCount
}
func (consumer *TestBatchConsumer) Consume(batch Deliveries) {
consumer.mu.Lock()
consumer.LastBatch = batch
consumer.ConsumedCount += int64(len(batch))
consumer.mu.Unlock()
if consumer.AutoFinish {
batch.Ack()
} else {
<-consumer.finish
// log.Printf("TestBatchConsumer.Consume() finished")
}
}
func (consumer *TestBatchConsumer) Finish() {
// log.Printf("TestBatchConsumer.Finish()")
consumer.mu.Lock()
consumer.LastBatch = nil
consumer.mu.Unlock()
consumer.finish <- 1
}