-
Notifications
You must be signed in to change notification settings - Fork 9
/
itsalive.coffee
167 lines (143 loc) · 5.21 KB
/
itsalive.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# Turn a mongo db into a live-db.
# You should probably run `redis-cli flushdb` before running this script.
async = require 'async'
argv = require('optimist').argv
backend = require './backend'
livedb = require 'livedb'
LiveDbMongo = require 'livedb-mongo'
# Node-style callback that prints a truthy error to stderr and otherwise
# does nothing.
consoleError = (err) ->
  console.error err if err
# Taken from livedb-mongo/mongo.js
# Builds a fresh plain object holding every enumerable property of `object`.
# The loop is deliberately not `own`, so inherited enumerable properties are
# copied too. Values are copied by reference (one level deep only).
shallowClone = (object) ->
  copy = {}
  copy[name] = value for name, value of object
  copy
# Convert a snapshot-shaped object ({data, type, v}) into a mongo document.
#
# When `data.data` is a non-null, non-array object its properties are copied
# onto the document itself; any other value (including arrays) is stored under
# `_data`, with undefined normalised to null. `_type`, `_v` and `_id` are then
# set from `data.type` (null when falsy), `data.v` and `docName`.
castToDoc = (docName, data) ->
  payload = data.data
  isPlainObject = typeof payload is 'object' and payload isnt null and not Array.isArray(payload)
  doc = if isPlainObject then shallowClone(payload) else {_data: payload ? null}
  doc._type = data.type or null
  doc._v = data.v
  doc._id = docName
  doc
# Convert a mongo document back into a snapshot {data, type, v, docName}.
#
# Returns undefined for a falsy `doc`. When the document has no `_data`
# property (strictly undefined), the snapshot data is a shallow copy of the
# document with the `_type`, `_v` and `_id` bookkeeping fields removed;
# otherwise it is the `_data` value itself. The input document is not mutated.
castToSnapshot = (doc) ->
  return unless doc
  {_type: type, _v: v, _id: docName, _data: data} = doc
  if data is undefined
    data = shallowClone(doc)
    delete data[field] for field in ['_type', '_v', '_id']
  {data, type, v, docName}
exports = module.exports

# Turn every eligible collection of a mongo database into a livedb collection.
#
# Collections named in the blacklist, and collections whose name ends in
# "_ops", are skipped. For every document of the remaining collections the
# op-log version reported by `db.getVersion` decides what happens:
#   * version 0 (no ops): a `create` op carrying the current snapshot data is
#     written, then the snapshot is stored with v = 1;
#   * version different from the document's `_v`: the stored snapshot is reset
#     to {v: 0}, fetched again through the livedb client and written back;
#   * otherwise the document is left as it is.
#
# options  - handed to backend.createMongo / backend.createRedis; `options.b`
#            (falling back to the `-b` command line flag) is used as the
#            cursor batch size. null/undefined is treated as {}.
# callback - called as callback(err) when the first error is reported, or with
#            no arguments after all collections are done and `db.close()` has
#            been called.
exports.itsalive = (options = {}, callback) ->
  # The command line entry point passes an explicit null.
  options ?= {}

  # don't need to make these collections live
  blacklist = ['system.indexes', 'system.profile', 'system.users', 'system.js', 'configs', 'sessions']
  batch = options.b or argv.b

  # Setup the db
  mongo = backend.createMongo options
  db = new LiveDbMongo mongo
  redis = backend.createRedis options
  redisObserver = backend.createRedis options
  driver = livedb.redisDriver db, redis, redisObserver
  ldbc = livedb.client {db: db, driver: driver}

  ottypes = require('ottypes')
  jsonType = ottypes.json0.uri

  # Go through all of the docs in one collection and create an op log for it.
  # `cb` is guaranteed to be called exactly once per collection.
  convertCollection = (collection, cb) ->
    cName = collection.collectionName
    mongo.collection(cName + "_ops").ensureIndex {name: 1, v: 1}, false, consoleError
    collection.count (err, count) ->
      return cb err if err
      console.log cName, "DOCS", count

      counter = 0        # documents completely processed
      pending = 0        # documents handed out by the cursor, still in flight
      exhausted = false  # cursor has delivered its terminating null
      finished = false   # cb has already been invoked

      # Report the outcome once. Completion is derived from the cursor itself
      # (exhausted + nothing in flight) rather than from comparing against
      # `count`, which could fire cb twice or never.
      finish = (err) ->
        return if finished
        finished = true
        if err then cb err else cb null, counter

      # Bring one document in line with its op log; calls done(err) once.
      convertDoc = (doc, done) ->
        doc._type ?= jsonType
        snapshot = castToSnapshot doc
        docName = snapshot.docName

        fail = (err, opts) ->
          console.log "Error with #{cName} #{docName}"
          console.error err
          console.log "doc", JSON.stringify(doc)
          console.log "opts", JSON.stringify(opts)
          done err

        db.getVersion cName, docName, (err, opVersion) ->
          return fail err, {fn: "getVersion"} if err
          if opVersion is 0
            console.log "no ops", cName, docName, counter
            # Create an operation that creates the document snapshot
            opData = {create: {type: jsonType, data: snapshot.data}, v: 0}
            db.writeOp cName, docName, opData, (err) ->
              return fail err, {fn: "writeOp", opData: opData} if err
              snapshot.v = 1
              db.writeSnapshot cName, docName, snapshot, (err) ->
                return fail err, {fn: "writeSnapshot", snapshot: snapshot} if err
                done()
          else if opVersion isnt doc._v
            # replay oplog, which is the source of truth
            console.log "diff ops", counter
            db.writeSnapshot cName, docName, {v: 0}, (err) ->
              return fail err, {fn: "writeSnapshot"} if err
              ldbc.fetch cName, docName, (err, fetched) ->
                return fail err, {fn: "fetch", snapshot: fetched} if err
                db.writeSnapshot cName, docName, fetched, (err) ->
                  return fail err, {fn: "writeSnapshot", snapshot: fetched} if err
                  done()
          else
            console.log "ok", cName, docName, counter
            done()

      cursor = collection.find()
      cursor.batchSize(+batch) if batch
      # Each must be used for large datasets, toArray won't fit in memory: http://mongodb.github.io/node-mongodb-native/api-generated/cursor.html#each
      cursor.each (err, doc) ->
        # Nothing more to do for this collection once a result was reported.
        return if finished
        if err
          console.log "uh oh", cName, err
          return finish err
        # we are done reading when we get a null doc
        unless doc
          exhausted = true
          finish null if pending is 0
          return
        pending++
        convertDoc doc, (err) ->
          return finish err if err
          counter++
          pending--
          finish null if exhausted and pending is 0

  mongo.collections (err, cols) ->
    return callback err if err
    collections = cols.filter (c) ->
      console.log "collection", c.collectionName
      (c.collectionName not in blacklist) and not /_ops$/.test(c.collectionName)
    async.each collections, convertCollection, (err) ->
      console.log("callback?", err)
      return callback err if err
      db.close()
      callback()
# Called directly from the command line (not required as a module): run the
# conversion with default options, report the outcome and terminate the
# process. The exit status is non-zero on failure so shells and scripts can
# detect an unfinished run.
if require.main == module
  exports.itsalive {}, (err) ->
    if err
      console.log "ERROR! NOT FINISHED!"
      console.log err
    else
      console.log "ALL DONE"
    process.exit(if err then 1 else 0)