-
Notifications
You must be signed in to change notification settings - Fork 10
/
partitions.js
86 lines (70 loc) · 2.44 KB
/
partitions.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// the module's single export is the Partitions constructor defined below
module.exports = Partitions
// Partitions constructor.
// Keeps a reference to the cache under `cache` (set through def) and
// subscribes to the cache's 'commit' event. Every committed change is passed
// to this.append; afterwards, when cache.peers.me is truthy, the change's
// buffer is multicast together with the commit command.
function Partitions(cache){
  def(this, 'cache', cache)

  const onCommit = change => {
    this.append(change)

    const peers = cache.peers
    if (!peers.me) return
    peers.multicast(change.buffer, peers.constants.commands.commit)
  }

  cache.on('commit', onCommit)
}
// get the latest offset of a particular partition: the ptime of its head
// entry, or 0 when the partition, its head or that ptime is missing/falsy
Partitions.prototype.head = function(id){
  const partition = this[id]
  if (!partition || !partition.head) return 0
  return partition.head.ptime || 0
}
// get the partition a change belongs to: the first dot-separated segment of
// its key. A key without a split method (e.g. a number) is returned whole,
// and a missing/falsy key is returned unchanged.
Partitions.prototype.lookup = function(change){
  const path = change.key
  if (!path) return path
  if (!path.split) return path
  return path.split('.')[0]
}
// get the latest offset of every partition: builds one { id, head } entry per
// partition, in sorted id order, and folds the entries into a single object
// with to.obj (presumably keyed by id, which is how diff reads it back)
Partitions.prototype.heads = function(){
  const ids = keys(this).sort()
  const offsets = ids.map(id => ({ id, head: this.head(id) }))
  return offsets.reduce(to.obj, {})
}
// total number of partitions, i.e. how many keys keys(this) reports
Partitions.prototype.size = function(){
  const ids = keys(this)
  return ids.length
}
// get change records relative to base
// base: object keyed by partition id, each entry carrying a `head` offset
// A partition contributes when base does not list it, or when its own head is
// greater than base's. It is sliced from base's head (0 when unlisted) -
// presumably yielding the changes after that offset - and the slices are
// flattened into one list of plain records.
Partitions.prototype.diff = function(base){
  const listed = id => id in base
  const ahead = id => !listed(id) || this.head(id) > base[id].head
  const missing = id => this[id].slice(listed(id) ? base[id].head : 0)
  const record = key(['type', 'key', 'value', 'ptime']) // TODO proper serialise

  return keys(this)
    .filter(ahead)
    .map(missing)
    .reduce(flatten, [])
    .map(record)
}
// Record a change: apply it to the cache, log it in its partition, and emit
// 'change' on the cache unless the change is flagged as a replay.
// An 'add' without a key is first given a generated key (nextID).
// Returns true when the change was both applied and logged, false otherwise.
// NOTE(review): the partition is created before the change is applied, so a
// rejected change still leaves a (possibly empty) partition behind.
Partitions.prototype.append = function(change){
  const cache = this.cache

  // NOTE: not sure if this is overloading add too much
  const keylessAdd = change.type == 'add' && !change.key
  if (keylessAdd) {
    const id = nextID(cache)
    if (is.lit(change.value)) change.value.id = id

    if (!(change instanceof Message)) {
      change.key = id
    } else {
      // a Message gets a fresh buffer built around the new key; its
      // _key/_value/_type fields are cleared (presumably decoded caches)
      const rekeyed = new Change(change.type, id, change.value)
      change.buffer = rekeyed.buffer
      change._key = change._value = change._type = undefined
    }
  }

  // create partition if doesn't exist
  const name = this.lookup(change)
  if (!this[name]) this[name] = new Partition(name, cache.peers)

  // TODO: inline?
  const applied = set(change, true)(cache)
  if (!applied) return false

  // append change
  const logged = this[name].append(change)
  if (!logged) return false //deb('append failed', change)

  if (!change.replay) cache.emit('change', change)
  return true
}
const deb = require('./deb')('par'.bgRed.bold)
, { last } = require('./utils')
, { def, set, is, clone, to, keys, flatten, key, str, az } = require('utilise/pure')
, Partition = require('./partition')
, Change = require('./messages/change')
, Message = require('./messages/message')
, nextID = cache => ((+keys(cache).sort(az()).pop() + 1) || 1)