forked from go-mgo/mgo
-
Notifications
You must be signed in to change notification settings - Fork 17
/
changestreams.go
313 lines (257 loc) · 8.43 KB
/
changestreams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
package mgo
import (
"fmt"
"reflect"
"sync"
"gopkg.in/mgo.v2/bson"
)
// ChangeStream holds the state of a server-side change stream cursor and
// supports transparently resuming iteration after recoverable errors.
type ChangeStream struct {
	iter           *Iter               // underlying aggregation ($changeStream) cursor
	isClosed       bool                // set by Close; subsequent Next calls fail
	options        ChangeStreamOptions // options the stream was opened with; reused on resume
	pipeline       interface{}         // user-supplied pipeline; reused on resume
	resumeToken    *bson.Raw           // _id of the most recently seen change document
	collection     *Collection         // collection the stream watches
	readPreference *ReadPreference
	err            error // first fatal error; surfaced via Err
	m              sync.Mutex // guards the fields above across Next/Err/Close/ResumeToken
}
// ChangeStreamOptions holds the optional parameters accepted by
// Collection.Watch when opening a change stream.
//
// NOTE(review): only FullDocument and ResumeAfter are applied by
// constructChangeStreamPipeline in this file — confirm the remaining
// options are wired into the query elsewhere.
type ChangeStreamOptions struct {
	// FullDocument controls the amount of data that the server will return when
	// returning a changes document.
	FullDocument string

	// ResumeAfter specifies the logical starting point for the new change stream.
	ResumeAfter *bson.Raw

	// MaxAwaitTimeMS specifies the maximum amount of time for the server to wait
	// on new documents to satisfy a change stream query.
	MaxAwaitTimeMS int64

	// BatchSize specifies the number of documents to return per batch.
	BatchSize int32

	// Collation specifies the way the server should collate returned data.
	Collation *Collation
}
// Watch opens a change stream on the collection, returning a ChangeStream
// that can be iterated to receive continuing change notifications from the
// database. A nil pipeline is treated as an empty aggregation pipeline.
func (coll *Collection) Watch(pipeline interface{},
	options ChangeStreamOptions) (*ChangeStream, error) {
	if pipeline == nil {
		pipeline = []bson.M{}
	}
	fullPipeline := constructChangeStreamPipeline(pipeline, options)
	iter := coll.Pipe(&fullPipeline).Iter()
	// A failure here surfaces immediately with an error from the server,
	// e.g. when running against a standalone deployment.
	if err := iter.Err(); err != nil {
		return nil, err
	}
	iter.isChangeStream = true
	stream := &ChangeStream{
		iter:        iter,
		collection:  coll,
		resumeToken: nil,
		options:     options,
		pipeline:    pipeline,
	}
	return stream, nil
}
// Next retrieves the next document from the change stream, blocking if necessary.
// Next returns true if a document was successfully unmarshalled into result,
// and false if an error occurred. When Next returns false, the Err method should
// be called to check what error occurred during iteration. If the error is
// resumable, Next attempts to resume the stream once before giving up.
//
// For example:
//
//    pipeline := []bson.M{}
//
//    changeStream, err := collection.Watch(pipeline, ChangeStreamOptions{})
//    if err != nil {
//        return err
//    }
//    for changeStream.Next(&changeDoc) {
//        fmt.Printf("Change: %v\n", changeDoc)
//    }
//
//    if err := changeStream.Close(); err != nil {
//        return err
//    }
//
// If the pipeline used removes the _id field from the result, Next will error
// because the _id field is needed to resume iteration when an error occurs.
//
func (changeStream *ChangeStream) Next(result interface{}) bool {
	// the err field is being constantly overwritten and we don't want the user to
	// attempt to read it at this point so we lock.
	changeStream.m.Lock()
	defer changeStream.m.Unlock()

	// if we are in a state of error, then don't continue.
	if changeStream.err != nil {
		return false
	}

	if changeStream.isClosed {
		changeStream.err = fmt.Errorf("illegal use of a closed ChangeStream")
		return false
	}

	var err error

	// attempt to fetch the change stream result.
	err = changeStream.fetchResultSet(result)
	if err == nil {
		return true
	}

	// check if the error is resumable
	if !isResumableError(err) {
		// error is not resumable, give up and return it to the user.
		changeStream.err = err
		return false
	}

	// try to resume.
	err = changeStream.resume()
	if err != nil {
		// we've not been able to successfully resume and should only try once,
		// so we give up.
		changeStream.err = err
		return false
	}

	// we've successfully resumed the changestream.
	// try to fetch the next result.
	err = changeStream.fetchResultSet(result)
	if err != nil {
		changeStream.err = err
		return false
	}
	return true
}
// Err reports the first error encountered during iteration, or nil if
// iteration has so far succeeded.
func (changeStream *ChangeStream) Err() error {
	changeStream.m.Lock()
	err := changeStream.err
	changeStream.m.Unlock()
	return err
}
// Close marks the change stream as closed and kills the server cursor used
// by the iterator, if any. It returns nil if no errors happened during
// iteration, or the actual error otherwise.
func (changeStream *ChangeStream) Close() error {
	changeStream.m.Lock()
	defer changeStream.m.Unlock()
	changeStream.isClosed = true
	if err := changeStream.iter.Close(); err != nil {
		changeStream.err = err
		return err
	}
	return nil
}
// ResumeToken returns a copy of the current resume token held by the change
// stream, or nil if no token has been observed yet. The token is opaque and
// may be used to instantiate a new change stream at the same logical point.
func (changeStream *ChangeStream) ResumeToken() *bson.Raw {
	changeStream.m.Lock()
	defer changeStream.m.Unlock()
	token := changeStream.resumeToken
	if token == nil {
		return nil
	}
	// Return a copy so callers cannot mutate the stream's internal token.
	copied := *token
	return &copied
}
// constructChangeStreamPipeline prepends a $changeStream stage, built from
// the supported options, to the user-supplied aggregation pipeline. It
// panics if pipeline is not a slice.
func constructChangeStreamPipeline(pipeline interface{},
	options ChangeStreamOptions) interface{} {
	v := reflect.ValueOf(pipeline)
	// ensure that the pipeline passed in is a slice.
	if v.Kind() != reflect.Slice {
		panic("pipeline argument must be a slice")
	}

	// Build the options document for the change notification stage.
	stageOptions := bson.M{}
	if options.FullDocument != "" {
		stageOptions["fullDocument"] = options.FullDocument
	}
	if options.ResumeAfter != nil {
		stageOptions["resumeAfter"] = options.ResumeAfter
	}

	// The change notification stage must come first in the aggregation,
	// followed by the user's stages converted to plain interfaces.
	stages := make([]interface{}, 0, v.Len()+1)
	stages = append(stages, bson.M{"$changeStream": stageOptions})
	for i := 0; i < v.Len(); i++ {
		stages = append(stages, v.Index(i).Addr().Interface())
	}

	var result interface{} = stages
	return result
}
// resume tears down the current cursor and re-opens the change stream on a
// fresh session. Callers must hold changeStream.m.
//
// NOTE(review): the rebuilt pipeline reuses changeStream.options as-is, so
// resuming from the last-seen position relies on options.ResumeAfter; the
// stored changeStream.resumeToken is not visibly incorporated here — confirm
// this is intended.
func (changeStream *ChangeStream) resume() error {
	// copy the information for the new socket.

	// Copy() destroys the sockets currently associated with this session
	// so future uses will acquire a new socket against the newly selected DB.
	newSession := changeStream.iter.session.Copy()

	// fetch the cursor from the iterator and use it to run a killCursors
	// on the connection.
	cursorId := changeStream.iter.op.cursorId
	err := runKillCursorsOnSession(newSession, cursorId)
	if err != nil {
		return err
	}

	// change out the old connection to the database with the new connection.
	changeStream.collection.Database.Session = newSession

	// make a new pipeline containing the resume token.
	changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, changeStream.options)

	// generate the new iterator with the new connection.
	newPipe := changeStream.collection.Pipe(changeStreamPipeline)
	changeStream.iter = newPipe.Iter()
	changeStream.iter.isChangeStream = true

	return nil
}
// fetchResumeToken extracts the _id field (the resume token) from the raw
// change document and stores it on the stream. It returns an error if the
// document cannot be unmarshalled or carries no _id field.
func (changeStream *ChangeStream) fetchResumeToken(rawResult *bson.Raw) error {
	var doc struct {
		ResumeToken *bson.Raw `bson:"_id,omitempty"`
	}
	if err := rawResult.Unmarshal(&doc); err != nil {
		return err
	}
	if doc.ResumeToken == nil {
		return fmt.Errorf("resume token missing from result")
	}
	changeStream.resumeToken = doc.ResumeToken
	return nil
}
// fetchResultSet reads the next change document from the underlying cursor,
// records its resume token, and unmarshals it into result. It returns
// ErrNotFound when the cursor produced no document yet reported no error.
func (changeStream *ChangeStream) fetchResultSet(result interface{}) error {
	rawResult := bson.Raw{}

	// fetch the next document from the cursor.
	gotNext := changeStream.iter.Next(&rawResult)
	if err := changeStream.iter.Err(); err != nil {
		return err
	}
	if !gotNext {
		// err is necessarily nil here (checked above); iter.Err() silences
		// the empty-batch case for change streams, so report it explicitly.
		return ErrNotFound
	}

	// grab the resumeToken from the result before handing it to the user.
	if err := changeStream.fetchResumeToken(&rawResult); err != nil {
		return err
	}

	// put the raw result into the data structure the user provided.
	return rawResult.Unmarshal(result)
}
// isResumableError reports whether change stream iteration may be resumed
// after err. Any error that is not a server query error is considered
// resumable, as is a notMaster query error (e.g. after a failover).
func isResumableError(err error) bool {
	if _, isQueryError := err.(*QueryError); !isQueryError {
		return true
	}
	return isNotMasterError(err)
}
// runKillCursorsOnSession issues a killCursors operation for cursorId on a
// socket acquired from the given session. The socket is released on every
// path; previously it leaked when the query returned an error.
func runKillCursorsOnSession(session *Session, cursorId int64) error {
	socket, err := session.acquireSocket(true)
	if err != nil {
		return err
	}
	// Release the socket even if the query fails.
	defer socket.Release()
	return socket.Query(&killCursorsOp{[]int64{cursorId}})
}