Skip to content

Commit

Permalink
use clear to optimise bitfields
Browse files Browse the repository at this point in the history
  • Loading branch information
mafintosh committed Nov 8, 2023
1 parent 0ff08c1 commit b7a0d70
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 62 deletions.
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ module.exports = class Hypercore extends EventEmitter {
const appended = (status & 0b0001) !== 0

if (truncated) {
this.replicator.ontruncate(bitfield.start)
this.replicator.ontruncate(bitfield.start, bitfield.length)
}

if ((status & 0b10011) !== 0) {
Expand Down
8 changes: 8 additions & 0 deletions lib/bitfield.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,14 @@ module.exports = class Bitfield {
}
}

getBitfield (index, length) {
const j = index & (BITS_PER_PAGE - 1)
const i = (index - j) / BITS_PER_PAGE

const p = this._pages.get(i)
return p || null
}

get (index) {
const j = index & (BITS_PER_PAGE - 1)
const i = (index - j) / BITS_PER_PAGE
Expand Down
13 changes: 13 additions & 0 deletions lib/remote-bitfield.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class RemoteBitfieldPage {

insert (start, bitfield) {
this.bitfield.set(bitfield, start / 32)
this.segment.refresh()
}

clear (start, bitfield) {
Expand All @@ -70,6 +71,10 @@ class RemoteBitfieldSegment {
return this.tree.chunks
}

refresh () {
this.tree = quickbit.Index.from(this.tree.chunks, BYTES_PER_SEGMENT)
}

add (page) {
this.pages[page.index - this.index * PAGES_PER_SEGMENT] = page

Expand Down Expand Up @@ -142,6 +147,14 @@ module.exports = class RemoteBitfield {
this._segments = new BigSparseArray()
}

getBitfield (index) {
const j = index & (BITS_PER_PAGE - 1)
const i = (index - j) / BITS_PER_PAGE

const p = this._pages.get(i)
return p || null
}

get (index) {
const j = index & (BITS_PER_PAGE - 1)
const i = (index - j) / BITS_PER_PAGE
Expand Down
177 changes: 116 additions & 61 deletions lib/replicator.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ class Peer {

this.remoteOpened = false
this.remoteBitfield = new RemoteBitfield()
this.skipList = new RemoteBitfield()
this.missingBlocks = new RemoteBitfield()

this.remoteFork = 0
this.remoteLength = 0
Expand Down Expand Up @@ -350,7 +350,9 @@ class Peer {
}

broadcastRange (start, length, drop) {
if (drop) this.skipList.setRange(start, length, false)
if (drop) this._unclearLocalRange(start, length)
else this._clearLocalRange(start, length)

this.wireRange.send({
drop,
start,
Expand Down Expand Up @@ -699,6 +701,7 @@ class Peer {
// might be a fork, verify
this._checkIfConflict()
}

this.replicator._onnodata(this, req)
this.replicator.oninvalid(err, req, data, this)
return
Expand Down Expand Up @@ -732,25 +735,76 @@ class Peer {
}

onbitfield ({ start, bitfield }) {
if (this.remoteBitfield.insert(start, bitfield)) {
this.skipList.setRange(start, bitfield.byteLength * 8, false)
this._update()
this.remoteBitfield.insert(start, bitfield)
this.missingBlocks.insert(start, bitfield)
this._clearLocalRange(start, bitfield.byteLength * 8)
this._update()
}

_clearLocalRange (start, length) {
if (length === 1) {
this.missingBlocks.set(start, false)
return
}

const contig = Math.min(this.core.tree.length, this.core.header.hints.contiguousLength)

if (start < contig) {
const delta = contig - start
this.missingBlocks.setRange(start, delta)
start = contig
length -= delta
}

if ((start & 31) > 0) start -= (start & 31)

const end = start + Math.min(length, this.core.tree.length)
while (start < end) {
const local = this.core.bitfield.getBitfield(start)

if (local && local.bitfield) {
this.missingBlocks.clear(start, local.bitfield)
}

start += 32768
}
}

_unclearLocalRange (start, length) {
if (length === 1) {
this.missingBlocks.set(start, this.remoteBitfield.get(start))
return
}

if ((start & 31) > 0) start -= (start & 31)

const end = start + Math.min(length, this.remoteLength)
while (start < end) {
const remote = this.remoteBitfield.getBitfield(start)
if (remote && remote.bitfield) {
this.missingBlocks.insert(start, remote.bitfield)
}

start += 2097152
}

this._clearLocalRange(start, length)
}

onrange ({ drop, start, length }) {
const has = drop === false

if (length === 1) {
this.remoteBitfield.setRange(start, length, has)
this.skipList.set(start, false)
this.missingBlocks.set(start, has && !this.core.bitfield.get(start))
} else {
const rangeStart = this.remoteBitfield.findFirst(!has, start)
const rangeLength = length - (rangeStart - start)

if (rangeLength > 0) {
this.remoteBitfield.setRange(rangeStart, rangeLength, has)
this.skipList.setRange(rangeStart, rangeLength, false)
this.missingBlocks.setRange(rangeStart, rangeLength, has)
if (has) this._clearLocalRange(rangeStart, rangeLength)
}
}

Expand Down Expand Up @@ -942,74 +996,73 @@ class Peer {
return true
}

_requestRange (r) {
const { length, fork } = this.core.tree
_requestRangeBlock (index, length) {
if (this.core.bitfield.get(index) === true || !this._hasTreeParent(index)) return false

const end = Math.min(r.end === -1 ? this.remoteLength : r.end, this.remoteLength)
if (end < r.start || fork !== this.remoteFork) return false
const b = this.replicator._blocks.add(index, PRIORITY.NORMAL)
if (b.inflight.length > 0) return false

const len = end - r.start
const off = r.start + (r.linear ? 0 : Math.floor(Math.random() * len))
const req = this._makeRequest(index >= length, b.priority)

// TODO: we should weight this to request blocks < .length first
// as they are "cheaper" and will trigger an auto upgrade if possible
// If no blocks < .length is avaible then try the "needs upgrade" range
// If the request cannot be satisfied, dealloc the block request if no one is subscribed to it
if (req === null) {
b.gc()
return false
}

let wrapped = 0
req.block = { index, nodes: 0 }

for (let i = 0; i < len && wrapped < 2; i++) {
let index = off + i
if (index >= end) index -= len
b.inflight.push(req)
this._send(req)

if (r.blocks !== null) {
index = r.blocks[index]
} else { // TODO: make this loop better (something like a for loop that skips with the skip list)
index = this.skipList.findFirst(false, index)
if (index === -1 || index >= end) {
wrapped++
index = this.skipList.findFirst(false, r.start)
if (index === -1 || index >= end) {
return false
}
}
}
// Don't think this will ever happen, as the pending queue is drained before the range queue
// but doesn't hurt to check this explicitly here also.
if (b.queued) b.queued = false
return true
}

if (this.remoteBitfield.get(index) === false || this.core.bitfield.get(index) === true) {
this.skipList.set(index, true)
continue
}
_requestRange (r) {
const { length, fork } = this.core.tree

if (!this._hasTreeParent(index)) {
continue
if (r.blocks) {
let min = -1
let max = -1
for (let i = r.start; i < r.end; i++) {
const index = r.blocks[i]
if (min === -1 || index < min) min = index
if (max === -1 || index > max) max = index
if (this.missingBlocks.get(index) === true && this._requestRangeBlock(index, length)) return true
}
if (min > -1) this._maybeWant(min, max - min)
return false
}

const b = this.replicator._blocks.add(index, PRIORITY.NORMAL)
const end = Math.min(r.end === -1 ? this.remoteLength : r.end, this.remoteLength)
if (end < r.start || fork !== this.remoteFork) return false

if (b.inflight.length > 0) {
this.skipList.set(index, true)
continue
}
const len = end - r.start
const off = r.start + (r.linear ? 0 : Math.floor(Math.random() * len))

const req = this._makeRequest(index >= length, b.priority)
let i = off

// If the request cannot be satisfied, dealloc the block request if no one is subscribed to it
if (req === null) {
b.gc()
return false
}
while (true) {
i = this.missingBlocks.findFirst(true, i)

this.skipList.set(index, true)
if (i === -1 || i >= end) break

req.block = { index, nodes: 0 }
if (this._requestRangeBlock(i, length)) return true
i++
}

b.inflight.push(req)
this._send(req)
i = r.start

// Don't think this will ever happen, as the pending queue is drained before the range queue
// but doesn't hurt to check this explicitly here also.
if (b.queued) b.queued = false
while (true) {
i = this.missingBlocks.findFirst(true, i)

return true
if (i === -1 || i >= off) break

if (this.core.bitfield.get(i) === false && this._hasTreeParent(i) && this._requestRangeBlock(i, length)) return true
i++
}

this._maybeWant(r.start, len)
Expand Down Expand Up @@ -1197,7 +1250,7 @@ module.exports = class Replicator {
}

// Called externally when a truncation upgrade has been processed
ontruncate (newLength) {
ontruncate (newLength, truncated) {
const notify = []

for (const blk of this._blocks) {
Expand All @@ -1211,6 +1264,8 @@ module.exports = class Replicator {
blk.detach(r, SNAPSHOT_NOT_AVAILABLE())
}
}

for (const peer of this.peers) peer._unclearLocalRange(newLength, truncated)
}

// Called externally when a upgrade has been processed
Expand Down Expand Up @@ -1526,9 +1581,9 @@ module.exports = class Replicator {

if (b === null || removeInflight(b.inflight, req) === false) return

if (isBlock && this.core.bitfield.get(index) === false) {
for (const peer of this.peers) peer.skipList.set(index, false)
}
// if (isBlock && this.core.bitfield.get(index) === false) {
// for (const peer of this.peers) peer.skipList.set(index, false)
// }

if (b.refs.length > 0 && isBlock === true) {
this._queueBlock(b)
Expand Down
1 change: 1 addition & 0 deletions test/replicate.js
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,7 @@ test('sparse replication without gossiping', async function (t) {
t.teardown(() => unreplicate(s))

await c.download({ blocks: [4, 6] }).done()

t.pass('resolved')
})

Expand Down

0 comments on commit b7a0d70

Please sign in to comment.