This repository has been archived by the owner on Oct 17, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 215
/
Copy pathbulk.go
152 lines (139 loc) · 3.61 KB
/
bulk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package mongodb
import (
"sync"
"time"
"github.com/compose/transporter/client"
"github.com/compose/transporter/log"
"github.com/compose/transporter/message"
"github.com/compose/transporter/message/ops"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
// maxBSONObjSize is the byte threshold for a single pending bulk operation:
// Write flushes a collection's batch when adding the next message would bring
// its accumulated size to this value or beyond.
const maxBSONObjSize int = 16000000
// Compile-time assertion that *Bulk satisfies the client.Writer interface.
var _ client.Writer = (*Bulk)(nil)
// Bulk implements client.Writer for use with MongoDB and takes advantage of the Bulk API for
// performance improvements.
//
// Messages are buffered per collection and sent either when Write finds that a
// batch has reached its operation-count or size threshold, or when the
// background goroutine started by newBulker calls flushAll.
type Bulk struct {
// bulkMap holds the pending bulk operation for each collection, keyed by the
// message namespace.
bulkMap map[string]*bulkOperation
// The embedded lock is taken by Write and flushAll around every access to
// bulkMap and confirmChan.
*sync.RWMutex
// confirmChan is overwritten on every Write with that message's Confirms()
// channel; when non-nil, an empty struct is sent on it after a successful
// flush. newBulker leaves it unset.
confirmChan chan struct{}
}
// bulkOperation is the pending batch of writes for a single collection.
type bulkOperation struct {
// s is the cloned session that bulk was created from; flush closes it once
// the batch has been run successfully.
s *mgo.Session
// bulk accumulates the queued insert, update and remove operations.
bulk *mgo.Bulk
// opCounter is the number of operations queued so far.
opCounter int
// bsonOpSize is the running total, in bytes, of the marshaled size of each
// queued message plus 4 bytes per message.
bsonOpSize int
}
// newBulker builds an empty Bulk and launches its background flush goroutine.
//
// The goroutine is registered on wg before it starts and runs b.run, which
// exits (calling wg.Done) once done is signalled or a periodic flush fails.
func newBulker(done chan struct{}, wg *sync.WaitGroup) *Bulk {
	bulker := new(Bulk)
	bulker.RWMutex = new(sync.RWMutex)
	bulker.bulkMap = map[string]*bulkOperation{}

	wg.Add(1)
	go bulker.run(done, wg)

	return bulker
}
// Write returns a function that queues msg on the pending bulk operation for
// its namespace (collection). The returned function expects s to be a *Session
// and panics otherwise.
//
// Before queueing, the pending batch is flushed and replaced with a fresh one
// when it already holds maxWriteBatchSize operations or when adding msg would
// bring its accumulated size to maxBSONObjSize or beyond. After a successful
// flush an empty struct is sent on the message's confirms channel, if any.
//
// The message is always queued. The returned error is the flush error when a
// flush was attempted, otherwise the BSON marshal error (if any) encountered
// while measuring the message.
func (b *Bulk) Write(msg message.Msg) func(client.Session) (message.Msg, error) {
	return func(s client.Session) (message.Msg, error) {
		coll := msg.Namespace()

		// Assert the concrete session type before taking the lock so a wrong
		// session type cannot panic while the mutex is held.
		sess := s.(*Session)

		// newOp starts an empty bulk operation for coll on its own cloned session.
		newOp := func() *bulkOperation {
			clone := sess.mgoSession.Clone()
			return &bulkOperation{
				s:    clone,
				bulk: clone.DB("").C(coll).Bulk(),
			}
		}

		b.Lock()
		// Deferred so the lock is released on every exit path, including a panic
		// raised while queueing or flushing.
		defer b.Unlock()

		b.confirmChan = msg.Confirms()
		bOp, ok := b.bulkMap[coll]
		if !ok {
			bOp = newOp()
			b.bulkMap[coll] = bOp
		}

		// A marshal failure only means the size cannot be measured; the message
		// is still queued and counted as header-only.
		bs, err := bson.Marshal(msg.Data())
		if err != nil {
			log.Infof("unable to marshal doc to BSON, can't calculate size: %v", err)
		}
		// add the 4 bytes for the MsgHeader
		// https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#standard-message-header
		msgSize := len(bs) + 4

		// if the next op is going to put us over, flush and recreate bOp
		if bOp.opCounter >= sess.maxWriteBatchSize || bOp.bsonOpSize+msgSize >= maxBSONObjSize {
			err = b.flush(coll, bOp)
			if err != nil {
				log.With("collection", coll).Infof("error flushing collection that has reached its size capacity: %s\n", err.Error())
				// flush closes the session only on success. The failed batch is
				// about to be replaced in the map, so close its session here to
				// avoid leaking it.
				bOp.s.Close()
			} else if b.confirmChan != nil {
				b.confirmChan <- struct{}{}
			}
			bOp = newOp()
			b.bulkMap[coll] = bOp
		}

		switch msg.OP() {
		case ops.Delete:
			bOp.bulk.Remove(bson.M{"_id": msg.Data().Get("_id")})
		case ops.Insert:
			bOp.bulk.Insert(msg.Data())
		case ops.Update:
			bOp.bulk.Update(bson.M{"_id": msg.Data().Get("_id")}, msg.Data())
		}
		bOp.bsonOpSize += msgSize
		bOp.opCounter++
		return msg, err
	}
}
// run is the background flush loop started by newBulker.
//
// It calls flushAll each time two seconds elapse without a done signal, and
// stops for good if one of those flushes fails. When done is signalled it
// performs one last flushAll (logging, not propagating, any error) and
// returns. wg.Done is called on every exit.
func (b *Bulk) run(done chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()

	const flushInterval = 2 * time.Second

	for {
		select {
		case <-done:
			log.Infoln("received done channel")
			if flushErr := b.flushAll(); flushErr != nil {
				log.Errorf("flush error, %s", flushErr)
			}
			return
		case <-time.After(flushInterval):
			flushErr := b.flushAll()
			if flushErr == nil {
				continue
			}
			log.Errorf("flush error, %s", flushErr)
			return
		}
	}
}
// flushAll flushes the pending bulk operation of every collection while
// holding the lock. It stops at, and returns, the first flush error. When all
// flushes succeed and a confirms channel is set, an empty struct is sent on it
// before returning nil.
func (b *Bulk) flushAll() error {
	b.Lock()
	// Deferred so the lock is also released on the early error return below;
	// otherwise a failed flush would leave the mutex held and block every
	// subsequent Write and flushAll.
	defer b.Unlock()

	// flush deletes the entry it was given from bulkMap on success; deleting
	// map entries during a range loop is permitted in Go.
	for c, bOp := range b.bulkMap {
		if err := b.flush(c, bOp); err != nil {
			return err
		}
	}
	if b.confirmChan != nil {
		b.confirmChan <- struct{}{}
	}
	return nil
}
// flush runs the pending bulk operation bOp for collection c.
//
// If the first run reports a duplicate-key error, the bulk is switched to
// unordered mode and run once more; duplicate-key errors from that second run
// are ignored. Any other error is logged and returned, leaving the session
// open and the entry in bulkMap. On success the session is closed and the
// entry for c is removed from bulkMap. Callers in this file hold the lock
// while calling flush.
func (b *Bulk) flush(c string, bOp *bulkOperation) error {
	log.With("collection", c).
		With("opCounter", bOp.opCounter).
		With("bsonOpSize", bOp.bsonOpSize).
		Infoln("flushing bulk messages")

	_, runErr := bOp.bulk.Run()
	isDup := mgo.IsDup(runErr)

	if runErr != nil && !isDup {
		log.With("collection", c).Errorf("flush error, %s\n", runErr)
		return runErr
	}

	if isDup {
		bOp.bulk.Unordered()
		_, retryErr := bOp.bulk.Run()
		if retryErr != nil && !mgo.IsDup(retryErr) {
			log.With("collection", c).Errorf("flush error with unordered, %s\n", retryErr)
			return retryErr
		}
	}

	bOp.s.Close()
	log.With("collection", c).Infoln("flush complete")
	delete(b.bulkMap, c)
	return nil
}