Skip to content

Commit

Permalink
feat: allow control over uploading to peers
Browse files Browse the repository at this point in the history
Partial fix to holepunchto#305
  • Loading branch information
gmaclennan committed Mar 27, 2024
1 parent 0a6efc5 commit c9a23c6
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 1 deletion.
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ module.exports = class Hypercore extends EventEmitter {

this.replicator = new Replicator(this.core, this.key, {
eagerUpgrade: true,
defaultUploading: opts.defaultUploading,
notDownloadingLinger: opts.notDownloadingLinger,
allowFork: opts.allowFork !== false,
inflightRange: opts.inflightRange,
Expand Down
25 changes: 24 additions & 1 deletion lib/replicator.js
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,17 @@ class Peer {
this.lastExtensionSent = ''
this.lastExtensionRecv = ''

this.uploading = this.replicator.defaultUploading

replicator._ifAvailable++
}

setUploading (uploading) {
if (uploading === this.uploading) return
this.uploading = uploading
this.sendSync()
}

get remoteContiguousLength () {
return this.remoteBitfield.findFirst(false, this._remoteContiguousLength)
}
Expand Down Expand Up @@ -418,7 +426,7 @@ class Peer {
length: this.core.tree.length,
remoteLength: this.core.tree.fork === this.remoteFork ? this.remoteLength : 0,
canUpgrade: this.canUpgrade,
uploading: true,
uploading: this.uploading,
downloading: this.replicator.isDownloading(),
hasManifest: !!this.core.header.manifest && this.core.compat === false
})
Expand Down Expand Up @@ -646,6 +654,12 @@ class Peer {
return
}

// Respond to manifest requests even if not uploading
if (!this.uploading && proof.block) {
this.wireNoData.send({ request: msg.id })
return
}

if (proof.block !== null) {
this.replicator.onupload(proof.block.index, proof.block.value, this)
}
Expand Down Expand Up @@ -772,6 +786,7 @@ class Peer {
}

onwant ({ start, length }) {
if (!this.uploading) return
this.replicator._onwant(this, start, length)
}

Expand Down Expand Up @@ -1042,6 +1057,9 @@ class Peer {
return false
}

// Send maybeWant before checking this?
if (!this.remoteUploading) return false

if (!this._hasTreeParent(b.index)) {
return false
}
Expand All @@ -1058,6 +1076,7 @@ class Peer {
}

_requestRangeBlock (index, length) {
if (!this.remoteUploading) return false
if (this.core.bitfield.get(index) === true || !this._hasTreeParent(index)) return false

const b = this.replicator._blocks.add(index, PRIORITY.NORMAL)
Expand Down Expand Up @@ -1093,6 +1112,7 @@ class Peer {
}

_requestRange (r) {
if (!this.remoteUploading) return false
const { length, fork } = this.core.tree

if (r.blocks) {
Expand Down Expand Up @@ -1152,6 +1172,7 @@ class Peer {
}

_requestForkRange (f) {
if (!this.remoteUploading) return false
if (f.fork !== this.remoteFork || f.batch.want === null) return false

const end = Math.min(f.batch.want.end, this.remoteLength)
Expand Down Expand Up @@ -1243,6 +1264,7 @@ module.exports = class Replicator {

constructor (core, key, {
notDownloadingLinger = NOT_DOWNLOADING_SLACK,
defaultUploading = true,
eagerUpgrade = true,
allowFork = true,
inflightRange = null,
Expand All @@ -1264,6 +1286,7 @@ module.exports = class Replicator {
this.findingPeers = 0 // updateable from the outside
this.destroyed = false
this.downloading = false
this.defaultUploading = defaultUploading // should peers default to uploading
this.activeSessions = 0

this.inflightRange = inflightRange || DEFAULT_MAX_INFLIGHT
Expand Down
67 changes: 67 additions & 0 deletions test/replicate.js
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,73 @@ test.skip('can disable downloading from a peer', async function (t) {
t.is(cUploads, a.length)
})

test('can disable uploading to a peer', async function (t) {
const a = await create({ defaultUploading: false })

await a.append(['a', 'b', 'c', 'd', 'e'])

const b = await create(a.key, { valueEncoding: 'utf-8' })
const c = await create(a.key, { valueEncoding: 'utf-8' })

const [bStream] = replicate(a, b, t)
const [cStream] = replicate(a, c, t)
replicate(b, c, t)

await new Promise(resolve => a.on('peer-add', function onAdd () {
if (a.peers.length < 2) return
a.off('peer-add', onAdd)
resolve()
}))

let aUploadsToB = 0
let aUploadsToC = 0

a.on('upload', function (index, block, peer) {
if (peer.remotePublicKey.equals(bPeer.remotePublicKey)) {
aUploadsToB++
} else {
aUploadsToC++
}
})

await Promise.all([
b.update({ wait: true }),
c.update({ wait: true })
])

t.is(a.length, b.length)
t.is(a.length, c.length)

const bPeer = a.peers[0].stream.rawStream === bStream
? a.peers[0]
: a.peers[1]
const cPeer = a.peers[0].stream.rawStream === cStream
? a.peers[0]
: a.peers[1]

const cRange = c.download({ start: 0, end: a.length })
const bRange = b.download({ start: 0, end: a.length })

await new Promise(resolve => setTimeout(resolve, 200))

t.is(b.contiguousLength, 0)
t.is(c.contiguousLength, 0)
t.is(aUploadsToB, 0)
t.is(aUploadsToC, 0)

cPeer.setUploading(true)

await Promise.all([
bRange.done(),
cRange.done()
])

t.is(b.contiguousLength, 5)
t.is(c.contiguousLength, 5)
t.is(aUploadsToB, 0)
t.is(aUploadsToC, 5)
})

test('contiguous length', async function (t) {
const a = await create()

Expand Down

0 comments on commit c9a23c6

Please sign in to comment.