diff --git a/index.js b/index.js index dd1ff12e..101c2d3a 100644 --- a/index.js +++ b/index.js @@ -402,6 +402,7 @@ module.exports = class Hypercore extends EventEmitter { this.replicator = new Replicator(this.core, this.key, { eagerUpgrade: true, + defaultUploading: opts.defaultUploading, notDownloadingLinger: opts.notDownloadingLinger, allowFork: opts.allowFork !== false, inflightRange: opts.inflightRange, diff --git a/lib/replicator.js b/lib/replicator.js index 7b1fcd9e..91dbc0fb 100644 --- a/lib/replicator.js +++ b/lib/replicator.js @@ -358,9 +358,17 @@ class Peer { this.lastExtensionSent = '' this.lastExtensionRecv = '' + this.uploading = this.replicator.defaultUploading + replicator._ifAvailable++ } + setUploading (uploading) { + if (uploading === this.uploading) return + this.uploading = uploading + this.sendSync() + } + get remoteContiguousLength () { return this.remoteBitfield.findFirst(false, this._remoteContiguousLength) } @@ -418,7 +426,7 @@ class Peer { length: this.core.tree.length, remoteLength: this.core.tree.fork === this.remoteFork ? this.remoteLength : 0, canUpgrade: this.canUpgrade, - uploading: true, + uploading: this.uploading, downloading: this.replicator.isDownloading(), hasManifest: !!this.core.header.manifest && this.core.compat === false }) @@ -646,6 +654,12 @@ class Peer { return } + // Respond to manifest requests even if not uploading + if (!this.uploading && proof.block) { + this.wireNoData.send({ request: msg.id }) + return + } + if (proof.block !== null) { this.replicator.onupload(proof.block.index, proof.block.value, this) } @@ -772,6 +786,7 @@ class Peer { } onwant ({ start, length }) { + if (!this.uploading) return this.replicator._onwant(this, start, length) } @@ -1042,6 +1057,9 @@ class Peer { return false } + // Send maybeWant before checking this? + if (!this.remoteUploading) return false + if (!this._hasTreeParent(b.index)) { return false } @@ -1058,6 +1076,7 @@ class Peer { } _requestRangeBlock (index, length) { + if (!this.remoteUploading) return false if (this.core.bitfield.get(index) === true || !this._hasTreeParent(index)) return false const b = this.replicator._blocks.add(index, PRIORITY.NORMAL) @@ -1093,6 +1112,7 @@ class Peer { } _requestRange (r) { + if (!this.remoteUploading) return false const { length, fork } = this.core.tree if (r.blocks) { @@ -1152,6 +1172,7 @@ class Peer { } _requestForkRange (f) { + if (!this.remoteUploading) return false if (f.fork !== this.remoteFork || f.batch.want === null) return false const end = Math.min(f.batch.want.end, this.remoteLength) @@ -1243,6 +1264,7 @@ module.exports = class Replicator { constructor (core, key, { notDownloadingLinger = NOT_DOWNLOADING_SLACK, + defaultUploading = true, eagerUpgrade = true, allowFork = true, inflightRange = null, @@ -1264,6 +1286,7 @@ module.exports = class Replicator { this.findingPeers = 0 // updateable from the outside this.destroyed = false this.downloading = false + this.defaultUploading = defaultUploading // should peers default to uploading this.activeSessions = 0 this.inflightRange = inflightRange || DEFAULT_MAX_INFLIGHT diff --git a/test/replicate.js b/test/replicate.js index b37d767d..4163a069 100644 --- a/test/replicate.js +++ b/test/replicate.js @@ -745,6 +745,73 @@ test.skip('can disable downloading from a peer', async function (t) { t.is(cUploads, a.length) }) +test('can disable uploading to a peer', async function (t) { + const a = await create({ defaultUploading: false }) + + await a.append(['a', 'b', 'c', 'd', 'e']) + + const b = await create(a.key, { valueEncoding: 'utf-8' }) + const c = await create(a.key, { valueEncoding: 'utf-8' }) + + const [bStream] = replicate(a, b, t) + const [cStream] = replicate(a, c, t) + replicate(b, c, t) + + await new Promise(resolve => a.on('peer-add', function onAdd () { + if (a.peers.length < 2) return + a.off('peer-add', onAdd) + resolve() + })) + + let aUploadsToB = 0 + let aUploadsToC = 0 + + a.on('upload', function (index, block, peer) { + if (peer.remotePublicKey.equals(bPeer.remotePublicKey)) { + aUploadsToB++ + } else { + aUploadsToC++ + } + }) + + await Promise.all([ + b.update({ wait: true }), + c.update({ wait: true }) + ]) + + t.is(a.length, b.length) + t.is(a.length, c.length) + + const bPeer = a.peers[0].stream.rawStream === bStream + ? a.peers[0] + : a.peers[1] + const cPeer = a.peers[0].stream.rawStream === cStream + ? a.peers[0] + : a.peers[1] + + const cRange = c.download({ start: 0, end: a.length }) + const bRange = b.download({ start: 0, end: a.length }) + + await new Promise(resolve => setTimeout(resolve, 200)) + + t.is(b.contiguousLength, 0) + t.is(c.contiguousLength, 0) + t.is(aUploadsToB, 0) + t.is(aUploadsToC, 0) + + cPeer.setUploading(true) + + await Promise.all([ + bRange.done(), + cRange.done() + ]) + + t.is(b.contiguousLength, 5) + t.is(c.contiguousLength, 5) + t.is(aUploadsToB, 0) + t.is(aUploadsToC, 5) +}) + test('contiguous length', async function (t) { const a = await create()