@@ -19,7 +19,9 @@ type MongodbSink struct {
19
19
session * mgo.Session
20
20
database string
21
21
collection string
22
+ idField string
22
23
workerCount int
24
+ timestamps bool
23
25
stopCh chan interface {}
24
26
putCh chan []byte
25
27
}
@@ -41,6 +43,8 @@ func NewMongodb() (*MongodbSink, error) {
41
43
return nil , fmt .Errorf ("[sink/mongodb] Missing SINK_MONGODB_COLLECTION" )
42
44
}
43
45
46
+ idField := os .Getenv ("SINK_MONGODB_ID" )
47
+
44
48
workerCountStr := os .Getenv ("SINK_MONGODB_WORKERS" )
45
49
if workerCountStr == "" {
46
50
workerCountStr = "1"
@@ -65,6 +69,7 @@ func NewMongodb() (*MongodbSink, error) {
65
69
session : session ,
66
70
database : database ,
67
71
collection : collection ,
72
+ idField : idField ,
68
73
workerCount : workerCount ,
69
74
stopCh : make (chan interface {}),
70
75
putCh : make (chan []byte , 1000 ),
@@ -126,9 +131,18 @@ func (s *MongodbSink) write(id int) {
126
131
log .Errorf ("[sink/mongodb/%d] %s" , id , err )
127
132
continue
128
133
}
129
- idRecord := bson.M {}
130
- update := bson.M {"$set" : record }
131
- _ , err = c .Upsert (idRecord , update )
134
+
135
+ if (s .idField != "" ) {
136
+ id := record [s .idField ].(string )
137
+ if (id == "" ) {
138
+ log .Errorf ("[sink/mongodb/%d] missing id field $s" , id , s .idField )
139
+ continue
140
+ }
141
+ _ , err = c .UpsertId (bson .ObjectIdHex (id ), update )
142
+ } else {
143
+ err = c .Insert (update )
144
+ }
145
+
132
146
if err != nil {
133
147
log .Errorf ("[sink/mongodb/%d] %s" , id , err )
134
148
} else {
0 commit comments