-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
126 lines (104 loc) · 4.23 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
const Union = require('sorted-union-stream')
const b4a = require('b4a')
const codecs = require('codecs')
function getKey (diffEntry) {
const { left, right } = diffEntry
return left ? left.key : right.key
}
function areEqual (diff1, diff2) {
if (diff1 === null && diff2 === null) return true
if (diff1 === null || diff2 === null) return false
return b4a.equals(diff1.value, diff2.value)
}
function unionCompare (e1, e2) {
const k1 = getKey(e1)
const k2 = getKey(e2)
return b4a.compare(k1, k2)
}
function decodeEntry (diffEntry, keyEncoding, valueEncoding) {
if (!diffEntry) return diffEntry
if (keyEncoding) diffEntry.key = keyEncoding.decode(diffEntry.key)
if (valueEncoding && diffEntry.value !== null) diffEntry.value = valueEncoding.decode(diffEntry.value)
return diffEntry
}
function createUnionMap (keyEncoding, valueEncoding) {
const decode = diffEntry => decodeEntry(diffEntry, keyEncoding, valueEncoding)
const filterSameValue = ({ left, right }) => {
// Diffs are also yielded when the value is the same, but the sequence
// is not. This filters out that case.
if (left?.value === right?.value) return null
return { left, right }
}
return function unionMap (undoDiffEntry, applyDiffEntry) {
if (undoDiffEntry === null) {
return filterSameValue({
left: decode(applyDiffEntry.left),
right: decode(applyDiffEntry.right)
}
)
}
if (applyDiffEntry === null) {
// requires undoing, so reverse
return filterSameValue({
left: decode(undoDiffEntry.right),
right: decode(undoDiffEntry.left)
})
}
const haveSameNewValue = areEqual(undoDiffEntry.left, applyDiffEntry.left)
if (!haveSameNewValue) {
// apply-entry wins, but the previous state (.right) is not the value
// at the last indexedLength, since a diffEntry to undo exists for the same key
// So we yield that to-undo diffEntry's final state as previous state for this change
return filterSameValue({
left: decode(applyDiffEntry.left),
right: decode(undoDiffEntry.left)
})
}
// else: already processed in prev getDiffs, so filter out
return null
}
}
function encodeKey (enc, key) {
return key ? (enc ? enc.encode(key) : key) : null
}
class BeeDiffStream extends Union {
constructor (leftSnapshot, rightSnapshot, opts = {}) {
const valueEncoding = opts.valueEncoding ? codecs(opts.valueEncoding) : leftSnapshot.valueEncoding
const keyEncoding = opts.keyEncoding ? codecs(opts.keyEncoding) : leftSnapshot.keyEncoding
const gt = encodeKey(keyEncoding, opts.gt)
const gte = encodeKey(keyEncoding, opts.gte)
const lt = encodeKey(keyEncoding, opts.lt)
const lte = encodeKey(keyEncoding, opts.lte)
// Binary encodings for easier comparison later
opts = { ...opts, gt, gte, lt, lte, valueEncoding: 'binary', keyEncoding: 'binary' }
if (leftSnapshot.core.indexedLength === undefined) {
throw new Error('Incompatible Hypercore version--must have indexedLength property')
}
// We know that everything indexed in both snapshots is shared
const sharedIndexedL = Math.min(
leftSnapshot.core.indexedLength, rightSnapshot.core.indexedLength
)
// TODO: consider optimisation for case where the version of both streams
// is lower than the sharedIndexedL (in which case only the changes from
// the oldest version to the newest must be calculated, on the newest stream)
// --currently it redundantly calcs diffStreams for both and filters out the
// shared entries
const toUndoDiffStream = leftSnapshot.createDiffStream(sharedIndexedL, opts)
const toApplyDiffStream = rightSnapshot.createDiffStream(sharedIndexedL, opts)
super(toUndoDiffStream, toApplyDiffStream, {
compare: unionCompare,
map: createUnionMap(keyEncoding, valueEncoding)
})
this.closeSnapshots = !(opts.closeSnapshots === false)
this._leftSnapshot = leftSnapshot
this._rightSnapshot = rightSnapshot
}
_destroy (cb) {
super._destroy((err) => {
if (!this.closeSnapshots) return cb(err)
const onclose = () => cb(err)
Promise.all([this._leftSnapshot.close(), this._rightSnapshot.close()]).then(onclose, cb)
})
}
}
module.exports = BeeDiffStream