-
Notifications
You must be signed in to change notification settings - Fork 5
/
kv.js
80 lines (64 loc) · 1.77 KB
/
kv.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
'use strict'
const clone = require('clone')
const nes = require('never-ending-stream')
const commands = require('./lib/commands')
module.exports = function (upring, opts, next) {
if (upring.kv) {
return next(new Error('kv property already exist'))
}
upring.kv = new UpRingKV(upring, opts)
next()
}
function UpRingKV (upring, opts) {
opts = opts || {}
this.upring = upring
this.closed = false
this.ns = opts.namespace || 'kv'
commands(this)
// expose the parent logger
this.log = this.upring.log
}
UpRingKV.prototype.put = function (key, value, cb) {
if (!this.upring.isReady) {
this.upring.once('up', this.put.bind(this, key, value, cb))
return
}
if (this.upring.allocatedToMe(key)) {
value = clone(value)
}
if (typeof cb === 'function') {
this.upring.request({ key, value, ns: this.ns, cmd: 'put' }, cb)
} else {
this.upring.requestp({ key, value, ns: this.ns, cmd: 'put' })
}
}
UpRingKV.prototype.get = function (key, cb) {
if (!this.upring.isReady) {
this.upring.once('up', this.get.bind(this, key, cb))
return
}
if (typeof cb === 'function') {
this.upring.request({ key, ns: this.ns, cmd: 'get' }, function (err, result) {
cb(err, result ? result.value : null)
})
} else {
return new Promise((resolve, reject) => {
this.upring.requestp({ key, ns: this.ns, cmd: 'get' })
.then(result => resolve(result.value))
.catch(err => reject(err))
})
}
}
UpRingKV.prototype.liveUpdates = function (key) {
const result = nes.obj((done) => {
this.upring.request({ key, ns: this.ns, cmd: 'liveUpdates' }, function (err, res) {
if (err) {
done(err)
return
}
result.emit('newStream')
done(null, res.streams.updates)
})
})
return result
}