Skip to content

Commit 3faa016

Browse files
committed
mongodb: define which field is used as mongo _id
1 parent 593253b commit 3faa016

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ The `redis` sink is configured using `$SINK_REDIS_URL` (`redis://[user]:[passwor
8888

8989
The `kafka` sink is configured using `$SINK_KAFKA_BROKERS` (`kafka1:9092,kafka2:9092,kafka3:9092`), and `$SINK_KAFKA_TOPIC` environment variables.
9090

91-
The `mongo` sink is configured using `$SINK_MONGODB_CONNECTION` (`mongodb://localhost:27017/`), `$SINK_MONGODB_DATABASE` and `$SINK_MONGODB_COLLECTION` environment variables.
91+
The `mongo` sink is configured using `$SINK_MONGODB_CONNECTION` (`mongodb://localhost:27017/`), `$SINK_MONGODB_DATABASE`, `$SINK_MONGODB_COLLECTION` and `$SINK_MONGODB_ID` environment variables.
9292

9393
The `http` sink is configured using `$SINK_HTTP_ADDRESS` (`localhost:8080/allocations`)` environment variable.
9494

sink/mongodb.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ type MongodbSink struct {
1919
session *mgo.Session
2020
database string
2121
collection string
22+
idField string
2223
workerCount int
24+
timestamps bool
2325
stopCh chan interface{}
2426
putCh chan []byte
2527
}
@@ -41,6 +43,8 @@ func NewMongodb() (*MongodbSink, error) {
4143
return nil, fmt.Errorf("[sink/mongodb] Missing SINK_MONGODB_COLLECTION")
4244
}
4345

46+
idField := os.Getenv("SINK_MONGODB_ID")
47+
4448
workerCountStr := os.Getenv("SINK_MONGODB_WORKERS")
4549
if workerCountStr == "" {
4650
workerCountStr = "1"
@@ -65,6 +69,7 @@ func NewMongodb() (*MongodbSink, error) {
6569
session: session,
6670
database: database,
6771
collection: collection,
72+
idField: idField,
6873
workerCount: workerCount,
6974
stopCh: make(chan interface{}),
7075
putCh: make(chan []byte, 1000),
@@ -126,9 +131,18 @@ func (s *MongodbSink) write(id int) {
126131
log.Errorf("[sink/mongodb/%d] %s", id, err)
127132
continue
128133
}
129-
idRecord := bson.M{}
130-
update := bson.M{"$set": record}
131-
_, err = c.Upsert(idRecord, update)
134+
135+
if (s.idField != "") {
136+
id := record[s.idField].(string)
137+
if (id == "") {
138+
log.Errorf("[sink/mongodb/%d] missing id field $s", id, s.idField)
139+
continue
140+
}
141+
_, err = c.UpsertId(bson.ObjectIdHex(id), update)
142+
} else {
143+
err = c.Insert(update)
144+
}
145+
132146
if err != nil {
133147
log.Errorf("[sink/mongodb/%d] %s", id, err)
134148
} else {

0 commit comments

Comments
 (0)