diff --git a/index.js b/index.js index 25c7fcf7..f21db034 100644 --- a/index.js +++ b/index.js @@ -77,9 +77,13 @@ class Hypercore extends EventEmitter { this._snapshot = null this._findingPeers = 0 this._active = opts.active !== false + this._stateIndex = -1 // maintained by session state + this._monitorIndex = -1 // maintained by replication state this.opening = this._open(storage, key, opts) this.opening.catch(safetyCatch) + + this.on('newListener', maybeAddMonitor) } [inspect] (depth, opts) { @@ -251,16 +255,18 @@ class Hypercore extends EventEmitter { if (key === null) key = opts.key || null if (this.core === null) initOnce(this, storage, key, opts) - - this.core.addSession(this) + if (this._monitorIndex === -2) this.core.addMonitor(this) try { await this._openSession(key, opts) } catch (err) { - this.core.removeSession(this) - if (this.core.autoClose && this.core.sessions.length === 0) await this.core.close() + if (this.core.autoClose && this.core.hasSession() === false) await this.core.close() if (this.exclusive) this.core.unlockExclusive() - this.emit('close', this.sessions.length === 0) + + this.core.removeMonitor(this) + if (this.state !== null) this.state.removeSession(this) + + this.emit('close', this.core.hasSession() === false) throw err } @@ -312,6 +318,8 @@ class Hypercore extends EventEmitter { this.state = this.core.state.ref() } + this.state.addSession(this) + if (opts.userData) { const batch = this.state.storage.createWriteBatch() for (const [key, value] of Object.entries(opts.userData)) { @@ -325,6 +333,7 @@ class Hypercore extends EventEmitter { } this.core.replicator.updateActivity(this._active ? 1 : 0) + this.opened = true } @@ -336,8 +345,7 @@ class Hypercore extends EventEmitter { return { length: this.state.tree.length, byteLength: this.state.tree.byteLength, - fork: this.state.tree.fork, - compatLength: this.state.tree.length + fork: this.state.tree.fork } } @@ -364,7 +372,8 @@ class Hypercore extends EventEmitter { if (this.opened === false) await this.opening if (this.closed === true) return - this.core.removeSession(this) + this.core.removeMonitor(this) + this.state.removeSession(this) this.readable = false this.writable = false @@ -386,7 +395,7 @@ class Hypercore extends EventEmitter { if (this.exclusive) this.core.unlockExclusive() - if (this.core.sessions.length) { + if (this.core.hasSession()) { // emit "fake" close as this is a session this.closed = true this.emit('close', false) @@ -499,8 +508,9 @@ class Hypercore extends EventEmitter { return this.opened === false ? null : this.core.globalCache } + // deprecated get sessions () { - return this.opened === false ? [] : this.core.sessions + return this.opened === false ? [] : this.core.allSessions() } ready () { @@ -627,6 +637,7 @@ class Hypercore extends EventEmitter { // Copy the block as it might be shared with other sessions. block = b4a.from(block) + if (this.encryption.compat !== this.core.compat) this._updateEncryption() this.encryption.decrypt(index, block) } @@ -670,10 +681,10 @@ class Hypercore extends EventEmitter { // snapshot should check if core has block if (this._snapshot !== null) { - checkSnapshot(this._snapshot, index) + checkSnapshot(this, index) const coreBlock = await readBlock(this.core.state.storage.createReadBatch(), index) - checkSnapshot(this._snapshot, index) + checkSnapshot(this, index) if (coreBlock !== null) return coreBlock } @@ -697,7 +708,7 @@ class Hypercore extends EventEmitter { if (timeout) req.context.setTimeout(req, timeout) const replicatedBlock = await req.promise - if (this._snapshot !== null) checkSnapshot(this._snapshot, index) + if (this._snapshot !== null) checkSnapshot(this, index) return maybeUnslab(replicatedBlock) } @@ -851,6 +862,7 @@ class Hypercore extends EventEmitter { } this.extensions.set(name, ext) + this.core.addMonitor(this) for (const peer of this.peers) { peer.extensions.set(name, ext) } @@ -889,6 +901,12 @@ class Hypercore extends EventEmitter { } return block } + + _updateEncryption () { + const e = this.encryption + this.encryption = new BlockEncryption(e.key, this.key, { compat: this.core.compat, isBlockKey: e.isBlockKey }) + if (e === this.core.encryption) this.core.encryption = this.encryption + } } module.exports = Hypercore @@ -905,6 +923,8 @@ function preappend (blocks) { const offset = this.state.tree.length const fork = this.state.tree.fork + if (this.encryption.compat !== this.core.compat) this._updateEncryption() + for (let i = 0; i < blocks.length; i++) { this.encryption.encrypt(offset + i, blocks[i], fork) } @@ -920,7 +940,7 @@ function maybeUnslab (block) { } function checkSnapshot (snapshot, index) { - if (index >= snapshot.compatLength) throw SNAPSHOT_NOT_AVAILABLE() + if (index >= snapshot.state.snapshotCompatLength) throw SNAPSHOT_NOT_AVAILABLE() } function readBlock (reader, index) { @@ -947,3 +967,14 @@ function initOnce (session, storage, key, opts) { globalCache: opts.globalCache || null // session is a temp option, not to be relied on unless you know what you are doing (no semver guarantees) }) } + +function maybeAddMonitor (name) { + if (name === 'append' || name === 'truncate') return + if (this._monitorIndex >= 0 || this.closing) return + + if (this.core === null) { + this._monitorIndex = -2 + } else { + this.core.addMonitor(this) + } +} diff --git a/lib/big-header.js b/lib/big-header.js deleted file mode 100644 index 9bf15162..00000000 --- a/lib/big-header.js +++ /dev/null @@ -1,55 +0,0 @@ -const c = require('compact-encoding') -const { oplog } = require('./messages') - -module.exports = class BigHeader { - constructor (storage) { - this.storage = storage - } - - async load (external) { - const buf = await new Promise((resolve, reject) => { - this.storage.read(external.start, external.length, (err, buf) => { - if (err) return reject(err) - resolve(buf) - }) - }) - - const header = c.decode(oplog.header, buf) - header.external = external - return header - } - - async flush (header) { - const external = header.external || { start: 0, length: 0 } - header.external = null - - const buf = c.encode(oplog.header, header) - - let start = 0 - if (buf.byteLength > external.start) { - start = external.start + external.length - const rem = start & 4095 - if (rem > 0) start += (4096 - rem) - } - - header.external = { start, length: buf.byteLength } - - await new Promise((resolve, reject) => { - this.storage.write(start, buf, (err) => { - if (err) return reject(err) - resolve() - }) - }) - - return header - } - - close () { - return new Promise((resolve, reject) => { - this.storage.close((err) => { - if (err) return reject(err) - resolve() - }) - }) - } -} diff --git a/lib/copy-prologue.js b/lib/copy-prologue.js index 9b61122d..9ebc0afc 100644 --- a/lib/copy-prologue.js +++ b/lib/copy-prologue.js @@ -178,9 +178,14 @@ async function flushBatch (prologue, src, dst, batch) { } function signalReplicator (core, upgraded, start, length) { - const status = upgraded ? 0b0011 : 0b0010 - const bitfield = { drop: false, start, length } - core._onupdate({ status, bitfield, value: null, from: null }) + if (upgraded) { + core.replicator.cork() + core.replicator.onhave(start, length, false) + core.replicator.onupgrade() + core.replicator.uncork() + } else { + core.replicator.onhave(start, length, false) + } } function prologueToTree (prologue) { diff --git a/lib/core.js b/lib/core.js index 3a4131c6..ea2443f5 100644 --- a/lib/core.js +++ b/lib/core.js @@ -16,11 +16,9 @@ const { BAD_ARGUMENT, STORAGE_EMPTY, STORAGE_CONFLICT, INVALID_OPERATION, INVALI const Verifier = require('./verifier') const audit = require('./audit') const copyPrologue = require('./copy-prologue') -const BlockEncryption = require('./block-encryption') const Replicator = require('./replicator') const HEAD = Symbol.for('head') -const CORE = Symbol.for('core') const CONTIG = Symbol.for('contig') const TREE = Symbol.for('tree') const BITFIELD = Symbol.for('bitfield') @@ -85,21 +83,13 @@ class Update { this.updates.push({ type: DEPENDENCY, update: dependency }) } - coreUpdate (update) { - let { bitfield, status, value, from } = update + updateContig (bitfield) { + const contig = updateContigBatch(this.contiguousLength, bitfield, this.bitfield) - if (bitfield) { - const contig = updateContigBatch(this.contiguousLength, bitfield, this.bitfield) - - status |= contig.status - - if (contig.length > this.contiguousLength || (bitfield.drop && contig.length < this.contiguousLength)) { - this.contiguousLength = contig.length - this._coreUpdates.push({ type: CONTIG, update: contig.length }) - } + if (contig.length > this.contiguousLength || (bitfield.drop && contig.length < this.contiguousLength)) { + this.contiguousLength = contig.length + this._coreUpdates.push({ type: CONTIG, update: contig.length }) } - - this._coreUpdates.push({ type: CORE, update: { status, bitfield, value, from } }) } async flush () { @@ -130,19 +120,19 @@ class Update { this.bitfield.setRange(batch.ancestors, batch.treeLength, false) this.batch.deleteBlockRange(bitfield.start, bitfield.start + bitfield.length) - const status = (batch.length > batch.ancestors) ? 0b0011 : 0b0010 - this.flushTreeBatch(batch) - this.coreUpdate({ status, bitfield, value: null, from }) + this.updateContig(bitfield) } } class SessionState { constructor (core, storage, blocks, tree, bitfield, snapshotLength, name) { this.core = core + this.index = this.core.sessionStates.push(this) - 1 this.storage = storage this.name = name + this.sessions = [] this.mutex = new Mutex() @@ -150,6 +140,7 @@ class SessionState { this.tree = tree this.bitfield = bitfield this.snapshotLength = snapshotLength + this.snapshotCompatLength = snapshotLength this.active = 0 @@ -158,17 +149,28 @@ class SessionState { this._activeBatch = null } - get isSnapshot () { + isSnapshot () { return this.storage.snapshotted } - get isDefault () { + isDefault () { return this.core.state === this } + addSession (s) { + s._stateIndex = this.sessions.push(s) - 1 + } + + removeSession (s) { + if (s._stateIndex === -1) return + const head = this.sessions.pop() + if (head !== s) this.sessions[(head._stateIndex = s._stateIndex)] = head + s._stateIndex = -1 + } + flushedLength () { - if (this.isDefault) return this.tree.length - if (this.isSnapshot) return this.snapshotLength + if (this.isDefault()) return this.tree.length + if (this.isSnapshot()) return this.snapshotLength return this.storage.dependencyLength() } @@ -184,9 +186,16 @@ class SessionState { } destroy () { + if (this.index === -1) return + this.active = 0 this.storage.destroy() this.mutex.destroy(new Error('Closed')).catch(noop) + + const head = this.core.sessionStates.pop() + if (head !== this) this.core.sessionStates[(head.index = this.index)] = head + + this.index = -1 } snapshot () { @@ -219,7 +228,7 @@ class SessionState { } _updateDependencies (dependency) { - assert(!this.isDefault, 'Default state should have no dependencies') + assert(!this.isDefault(), 'Default state should have no dependencies') const deps = this.storage.dependencies @@ -293,7 +302,7 @@ class SessionState { for (const { type, update } of u.updates) { switch (type) { case TREE: // tree - if (!this.isDefault) this.tree.onupdate(update) + if (!this.isDefault()) this.tree.onupdate(update) break case BITFIELD: // bitfield @@ -306,7 +315,7 @@ class SessionState { } } - if (!this.isDefault) return + if (!this.isDefault()) return this.core._processUpdates(u.updates) } @@ -329,7 +338,7 @@ class SessionState { throw INVALID_OPERATION('Truncation breaks prologue') } - if (!keyPair && this.isDefault) keyPair = this.core.header.keyPair + if (!keyPair && this.isDefault()) keyPair = this.core.header.keyPair await this.mutex.lock() @@ -349,6 +358,8 @@ class SessionState { if (batch.length < this.flushedLength()) update.updateDependency(batch.length) await this.flushUpdate(update) + + this.ontruncate(length, batch.treeLength, fork) } finally { this._unlock() } @@ -378,18 +389,22 @@ class SessionState { if (start === -1 || start >= this.tree.length) return this.blocks.clear(update.batch, start, end - start) - update.coreUpdate({ status: 0, bitfield, value: null, from: null }) + update.updateContig(bitfield) if (start < this.flushedLength()) update.updateDependency(start) await this.flushUpdate(update) + + if (this.isDefault()) { + this.core.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) + } } finally { this._unlock() } } async append (values, { signature, keyPair, preappend } = {}) { - if (!keyPair && this.isDefault) keyPair = this.core.header.keyPair + if (!keyPair && this.isDefault()) keyPair = this.core.header.keyPair await this.mutex.lock() @@ -428,21 +443,54 @@ class SessionState { length: values.length } - update.coreUpdate({ - bitfield, - status: 0b0001, - value: null, - from: null - }) + update.updateContig(bitfield) await this.flushUpdate(update) + this.onappend(bitfield) return { length: batch.length, byteLength: batch.byteLength } } finally { this._unlock() } } + onappend (bitfield) { + if (this.isDefault()) { + if (bitfield) { + this.core.replicator.cork() + this.core.replicator.onupgrade() + this.core.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) + this.core.replicator.uncork() + } else { + this.core.replicator.onupgrade() + } + } + + for (let i = this.sessions.length - 1; i >= 0; i--) { + this.sessions[i].emit('append') + } + } + + ontruncate (to, from, fork) { + if (this.isDefault()) { + const length = from - to + + this.core.replicator.cork() + this.core.replicator.ontruncate(to, length) + this.core.replicator.onhave(to, length, true) + this.core.replicator.onupgrade() + this.core.replicator.uncork() + + for (const sessionState of this.core.sessionStates) { + if (to < sessionState.snapshotCompatLength) sessionState.snapshotCompatLength = to + } + } + + for (let i = this.sessions.length - 1; i >= 0; i--) { + this.sessions[i].emit('truncate', to, fork) + } + } + async _overwrite (source, length, treeLength, signature) { const blockPromises = [] const treePromises = [] @@ -536,7 +584,7 @@ class SessionState { } async overwrite (state, { length = state.tree.length, treeLength = state.flushedLength() } = {}) { - assert(!this.isDefault, 'Cannot overwrite signed state') // TODO: make this check better + assert(!this.isDefault(), 'Cannot overwrite signed state') // TODO: make this check better await this.mutex.lock() @@ -560,6 +608,8 @@ module.exports = class Core { this.db = db this.storage = null this.replicator = new Replicator(this, opts) + this.sessionStates = [] + this.monitors = [] this.id = opts.key ? z32.encode(opts.key) : null this.key = opts.key || null @@ -580,7 +630,6 @@ module.exports = class Core { this.updating = false this.closed = false this.skipBitfield = null - this.sessions = [] this.globalCache = opts.globalCache || null this.autoClose = opts.autoClose !== false this.encryption = null @@ -606,19 +655,35 @@ module.exports = class Core { return this.opening } - addSession (session) { - this.sessions.push(session) + addMonitor (s) { + s._monitorIndex = this.monitors.push(s) - 1 } - removeSession (session) { - const index = this.sessions.indexOf(session) - if (index === -1) return - this.sessions.splice(index, 1) - this.checkIfIdle() + removeMonitor (s) { + if (s._monitorIndex < 0) return + const head = this.monitors.pop() + if (head !== s) this.monitors[(head._monitorIndex = s._monitorIndex)] = head + s._monitorIndex = -1 + } + + emitManifest () { + for (let i = this.monitors.length - 1; i >= 0; i--) { + this.monitors[i].emit('manifest') + } + } + + allSessions () { + const sessions = [] + for (const state of this.sessionStates) sessions.push(...state.sessions) + return sessions + } + + hasSession () { + return this.sessionStates.length > 1 || (this.state !== null && this.state.sessions.length !== 0) } checkIfIdle () { - if (this.sessions.length === 0 && this.replicator.idle() && this.state !== null && this.state.mutex.idle()) this.onidle() + if (this.hasSession() === false && this.replicator.idle() && this.state !== null && this.state.mutex.idle()) this.onidle() } async lockExclusive () { @@ -865,6 +930,7 @@ module.exports = class Core { this._setManifest(update, Verifier.createManifest(manifest), null) await this.state.flushUpdate(update) + this.replicator.onupgrade() } } finally { this.state._unlock() @@ -887,7 +953,8 @@ module.exports = class Core { this.verifier = verifier this._manifestFlushed = false - update.coreUpdate({ status: 0b10000, bitfield: null, value: null, from: null }) + this.replicator.onupgrade() + this.emitManifest() } async copyPrologue (src) { @@ -936,11 +1003,6 @@ module.exports = class Core { break } - case CORE: { // core - this._onupdate(update) - break - } - case CONTIG: { // contig this.header.hints.contiguousLength = update break @@ -1040,15 +1102,12 @@ module.exports = class Core { // update in memory bitfield this.bitfield.setRange(treeLength, length, true) - let status = 0 - const bitfield = { start: treeLength, length: length - treeLength, drop: false } - if (this.header.tree.length < tree.length || treeLength < this.header.tree.length) { - status = 0b0001 this.header.tree = tree } - this._onupdate({ status, bitfield, value: null, from: null }) + const bitfield = { start: treeLength, length: length - treeLength, drop: false } + this.state.onappend(bitfield) return { length: this.tree.length, @@ -1067,25 +1126,6 @@ module.exports = class Core { } } - // async purge () { - // return new Promise((resolve, reject) => { - // let missing = 4 - // let error = null - - // this.oplog.storage.unlink(done) - // this.tree.storage.unlink(done) - // this.bitfield.storage.unlink(done) - // this.blocks.storage.unlink(done) - - // function done (err) { - // if (err) error = err - // if (--missing) return - // if (error) reject(error) - // else resolve() - // } - // }) - // } - _verifyBatchUpgrade (update, batch, manifest) { if (!this.header.manifest) { if (!manifest && this.compat) manifest = Verifier.defaultSignerManifest(this.header.key) @@ -1122,12 +1162,15 @@ module.exports = class Core { if (bitfield) { update.bitfield.setRange(bitfield.start, bitfield.start + 1, true) + update.updateContig(bitfield) } - update.coreUpdate({ status: 0b0001, bitfield, value, from }) update.flushTreeBatch(batch) await this.state.flushUpdate(update) + + if (batch.upgraded) this.state.onappend(bitfield) + else if (bitfield) this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) } finally { this.state._clearActiveBatch() this.updating = false @@ -1158,7 +1201,7 @@ module.exports = class Core { } for (let i = 0; i < verifies.length; i++) { - const { batch, bitfield, value, manifest, from } = verifies[i] + const { batch, bitfield, manifest } = verifies[i] if (!batch.commitable()) { verifies[i] = null // signal that we cannot commit this one @@ -1175,11 +1218,17 @@ module.exports = class Core { this._setManifest(update, manifest, null) } - update.coreUpdate({ status: 0, bitfield, value, from }) + if (bitfield) update.updateContig(bitfield) + update.flushTreeBatch(batch) } await this.state.flushUpdate(update) + + for (let i = 0; i < verifies.length; i++) { + const bitfield = verifies[i] && verifies[i].bitfield + if (bitfield) this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) + } } finally { this.state._clearActiveBatch() this.state.mutex.unlock() @@ -1238,7 +1287,6 @@ module.exports = class Core { batch, bitfield: value && { drop: false, start: proof.block.index, length: 1 }, value, - status: 0, manifest: proof.manifest, from } @@ -1273,6 +1321,8 @@ module.exports = class Core { await update.truncate(batch, from) await this.state.flushUpdate(update) + + this.state.ontruncate(batch.ancestors, batch.treeLength, batch.fork) } finally { this.state._clearActiveBatch() this.truncating-- @@ -1302,71 +1352,13 @@ module.exports = class Core { return this.closing } - // session management - should be moved to some session manager next - _onupdate ({ status, bitfield, value, from }) { - if (this.sessions.length === 0 || this.replicator === null) return - - if (status !== 0) { - const truncated = (status & 0b0010) !== 0 - const appended = (status & 0b0001) !== 0 - - if (truncated) { - this.replicator.ontruncate(bitfield.start, bitfield.length) - } - - if ((status & 0b10011) !== 0) { - this.replicator.onupgrade() - } - - if (status & 0b10000) { - for (let i = 0; i < this.sessions.length; i++) { - const s = this.sessions[i] - - if (s.encryption && s.encryption.compat !== this.compat) { - s.encryption = this.encryption = new BlockEncryption(s.encryption.key, this.key, { compat: this.compat, isBlockKey: s.encryption.isBlockKey }) - } - } - - for (let i = 0; i < this.sessions.length; i++) { - this.sessions[i].emit('manifest') - } - } - - for (let i = 0; i < this.sessions.length; i++) { - const s = this.sessions[i] - - if (truncated) { - // If snapshotted, make sure to update our compat so we can fail gets - if (s._snapshot && bitfield.start < s._snapshot.compatLength) s._snapshot.compatLength = bitfield.start - } - - if (truncated) { - s.emit('truncate', bitfield.start, this.tree.fork) - } - - if (appended) { - s.emit('append') - } - } - } - - if (bitfield) { - this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop) - } - - if (value) { - const byteLength = value.byteLength - this.padding - - for (let i = 0; i < this.sessions.length; i++) { - this.sessions[i].emit('download', bitfield.start, byteLength, from) - } - } - } - async _onconflict (proof, from) { await this.replicator.onconflict(from) - for (const s of this.sessions) s.emit('conflict', proof.upgrade.length, proof.fork, proof) + for (let i = this.monitors.length - 1; i >= 0; i--) { + const s = this.monitors[i] + s.emit('conflict', proof.upgrade.length, proof.fork, proof) + } const err = new Error('Two conflicting signatures exist for length ' + proof.upgrade.length) await this.closeAllSessions(err) @@ -1375,7 +1367,7 @@ module.exports = class Core { async closeAllSessions (err) { // this.sessions modifies itself when a session closes // This way we ensure we indeed iterate over all sessions - const sessions = [...this.sessions] + const sessions = this.allSessions() const all = [] for (const s of sessions) all.push(s.close({ error: err, force: false })) // force false or else infinite recursion @@ -1389,7 +1381,7 @@ module.exports = class Core { async _close () { if (this.opened === false) await this.opening - if (this.sessions.length !== 0) throw new Error('Cannot close while sessions are open') + if (this.hasSession() === true) throw new Error('Cannot close while sessions are open') if (this.replicator) await this.replicator.close() @@ -1419,20 +1411,17 @@ function updateContigBatch (start, upd, bitfield) { if (c === start) { return { - status: 0b0000, length: null } } if (c > start) { return { - status: 0b0100, length: c } } return { - status: 0b1000, length: c } } diff --git a/lib/replicator.js b/lib/replicator.js index ae7a63c5..2b5f34b6 100644 --- a/lib/replicator.js +++ b/lib/replicator.js @@ -780,7 +780,7 @@ class Peer { } if (proof.block !== null) { - this.replicator._onupload(proof.block.index, proof.block.value, this) + this.replicator._onupload(proof.block.index, proof.block.value.byteLength, this) } this.wireData.send({ @@ -2063,6 +2063,7 @@ module.exports = class Replicator { _ondata (peer, req, data) { if (data.block !== null) { this._resolveBlockRequest(this._blocks, data.block.index, data.block.value, req) + this._ondownload(data.block.index, data.block.value.byteLength, peer) } if (data.hash !== null && (data.hash.index & 1) === 0) { @@ -2424,9 +2425,9 @@ module.exports = class Replicator { _onpeerupdate (added, peer) { const name = added ? 'peer-add' : 'peer-remove' - const sessions = this.core.sessions + const sessions = this.core.monitors - for (let i = 0; i < sessions.length; i++) { + for (let i = sessions.length - 1; i >= 0; i--) { sessions[i].emit(name, peer) if (added) { @@ -2437,18 +2438,26 @@ module.exports = class Replicator { } } - _onupload (index, value, from) { - const padding = this.core.encryption ? this.core.encryption.padding : 0 - const byteLength = value.byteLength - padding - const sessions = this.core.sessions + _ondownload (index, byteLength, from) { + const sessions = this.core.monitors - for (let i = 0; i < sessions.length; i++) { - sessions[i].emit('upload', index, byteLength, from) + for (let i = sessions.length - 1; i >= 0; i--) { + const s = sessions[i] + s.emit('download', index, byteLength - s.padding, from) + } + } + + _onupload (index, byteLength, from) { + const sessions = this.core.monitors + + for (let i = sessions.length - 1; i >= 0; i--) { + const s = sessions[i] + s.emit('upload', index, byteLength - s.padding, from) } } _oninvalid (err, req, res, from) { - const sessions = this.core.sessions + const sessions = this.core.monitors for (let i = 0; i < sessions.length; i++) { sessions[i].emit('verification-error', err, req, res, from) diff --git a/test/sessions.js b/test/sessions.js index e43891c4..4678a6ed 100644 --- a/test/sessions.js +++ b/test/sessions.js @@ -57,23 +57,6 @@ test('sessions - custom valueEncoding on session', async function (t) { await core1.close() }) -test('session on a from instance, pre-ready', async function (t) { - const a = await create(t) - - const b = new Hypercore({ core: a.core }) - const c = b.session() - - await a.ready() - await b.ready() - await c.ready() - - t.is(a.sessions, b.sessions) - t.is(a.sessions, c.sessions) - - await b.close() - await c.close() -}) - test('session on a from instance does not inject itself to other sessions', async function (t) { const a = await create(t, { })