1
1
package sink
2
2
3
3
import (
4
- "context"
5
4
"strconv"
6
5
"time"
7
6
8
7
"os"
9
8
10
9
"fmt"
11
- "encoding/json"
12
10
13
11
log "github.com/sirupsen/logrus"
14
12
15
- mongo "github.com/mongodb/mongo-go-driver/mongo"
13
+ "github.com/globalsign/mgo"
14
+ "github.com/globalsign/mgo/bson"
16
15
)
17
16
18
17
// MongodbSink ...
19
18
type MongodbSink struct {
20
- conn * mongo. Client
19
+ session * mgo. Session
21
20
database string
22
21
collection string
23
22
workerCount int
@@ -32,7 +31,7 @@ func NewMongodb() (*MongodbSink, error) {
32
31
return nil , fmt .Errorf ("[sink/mongodb] Missing SINK_MONGODB_CONNECTION (example: mongodb://foo:bar@localhost:27017)" )
33
32
}
34
33
35
- database := os .Getenv ("SINK_MONGODB_DATABASE" )
34
+ database := os .Getenv ("SINK_MONGODB_DATABASE" )
36
35
if database == "" {
37
36
return nil , fmt .Errorf ("[sink/mongodb] Mising SINK_MONGODB_DATABASE" )
38
37
}
@@ -46,24 +45,24 @@ func NewMongodb() (*MongodbSink, error) {
46
45
if workerCountStr == "" {
47
46
workerCountStr = "1"
48
47
}
48
+
49
49
workerCount , err := strconv .Atoi (workerCountStr )
50
50
if err != nil {
51
51
return nil , fmt .Errorf ("Invalid SINK_MONGODB_WORKERS, must be an integer" )
52
52
}
53
53
54
- conn , err := mongo .NewClient (connStr )
55
54
if err != nil {
56
- return nil , fmt .Errorf ("[sink/mongodb] Invalid to connect to string: %s" , err )
55
+ return nil , fmt .Errorf ("Invalid SINK_MONGODB_WORKERS, must be an integer" )
57
56
}
58
57
59
- err = conn . Connect ( context . Background () )
58
+ session , err := mgo . Dial ( connStr )
60
59
if err != nil {
61
60
return nil , fmt .Errorf ("[sink/mongodb] failed to connect to string: %s" , err )
62
61
}
63
62
64
63
65
64
return & MongodbSink {
66
- conn : conn ,
65
+ session : session ,
67
66
database : database ,
68
67
collection : collection ,
69
68
workerCount : workerCount ,
@@ -103,7 +102,7 @@ func (s *MongodbSink) Stop() {
103
102
}
104
103
105
104
close (s .stopCh )
106
- defer s .conn . Disconnect ( context . Background () )
105
+ defer s .session . Close ( )
107
106
}
108
107
109
108
// Put ..
@@ -116,19 +115,20 @@ func (s *MongodbSink) Put(data []byte) error {
116
115
func (s * MongodbSink ) write (id int ) {
117
116
log .Infof ("[sink/mongodb/%d] Starting writer" , id )
118
117
119
- collection := s .conn . Database (s .database ).Collection (s .collection )
118
+ c := s .session . DB (s .database ).C (s .collection )
120
119
121
120
for {
122
121
select {
123
122
case data := <- s .putCh :
124
- m := make (map [string ]interface {})
125
- err := json .Unmarshal (data , & m )
126
-
127
- if err != nil {
123
+ var record bson.M
124
+ err := bson .UnmarshalJSON (data , & record )
125
+ if (err != nil ) {
128
126
log .Errorf ("[sink/mongodb/%d] %s" , id , err )
129
127
continue
130
128
}
131
- _ , err = collection .InsertOne (context .Background (), m )
129
+ idRecord := bson.M {}
130
+ update := bson.M {"$set" : record }
131
+ _ , err = c .Upsert (idRecord , update )
132
132
if err != nil {
133
133
log .Errorf ("[sink/mongodb/%d] %s" , id , err )
134
134
} else {
0 commit comments