Skip to content

Commit

Permalink
Simplify monitor + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MKPLKN committed Sep 2, 2024
1 parent a6370d2 commit 49581f6
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 21 deletions.
33 changes: 13 additions & 20 deletions lib/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ module.exports = class Monitor extends ReadyResource {
this.blobs = null
this.name = opts.name
this.entry = opts.entry
this.upload = opts.upload !== false && opts.download !== true
this.download = opts.download !== false && !this.upload

this._boundOnAppend = this._onAppend.bind(this)
this._boundOnUpload = this._onUpload.bind(this)
this._boundOnDownload = this._onDownload.bind(this)
this._boundUpdateStats = this._updateStats.bind(this)
this.drive.on('close', () => this.close())

// Updated on each upload/download event
this.stats = {
type: this.upload ? 'upload' : 'download',
startTime: 0,
percentage: 0,
speed: null,
Expand All @@ -34,14 +31,14 @@ module.exports = class Monitor extends ReadyResource {
if (this.entry) this._setEntryInfo()
// Handlers
this.blobs.core.on('append', this._boundOnAppend)
if (this.upload) this.blobs.core.on('upload', this._boundOnUpload)
if (this.download) this.blobs.core.on('download', this._boundOnDownload)
this.blobs.core.on('upload', this._boundUpdateStats)
this.blobs.core.on('download', this._boundUpdateStats)
}

async _close () {
this.blobs.core.off('append', this._boundOnAppend)
if (this.upload) this.blobs.core.off('upload', this._boundOnUpload)
if (this.download) this.blobs.core.off('download', this._boundOnDownload)
this.blobs.core.off('upload', this._boundUpdateStats)
this.blobs.core.off('download', this._boundUpdateStats)
}

async _onAppend () {
Expand All @@ -51,24 +48,14 @@ module.exports = class Monitor extends ReadyResource {
if (this.entry) this._setEntryInfo()
}

async _onUpload (index, bytes, from) {
this._updateStats(index, bytes)
this.emit('update', { stats: this.stats })
}

async _onDownload (index, bytes, from) {
this._updateStats(index, bytes)
this.emit('update', { stats: this.stats })
}

_setEntryInfo () {
if (this.stats.totalBytes || this.stats.totalBlocks) return
this.stats.totalBytes = this.entry.value.blob.byteLength
this.stats.totalBlocks = this.entry.value.blob.blockLength
}

_updateStats (index, bytes) {
if (!this.entry) return
if (!this.entry || this.closing) return
if (!isWithinRange(index, this.entry)) return
if (!this.stats.startTime) this.stats.startTime = Date.now()

Expand All @@ -79,6 +66,12 @@ module.exports = class Monitor extends ReadyResource {
if (timeElapsed > 0) {
this.stats.speed = Math.floor(this.stats.bytes / timeElapsed) // Speed in bytes/sec
}

this.emit('update')
if (this.stats.totalBytes === this.stats.bytes) {
this.emit('done')
this.close()
}
}
}

Expand Down
64 changes: 63 additions & 1 deletion test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1501,6 +1501,65 @@ test('get drive key without using the constructor', async (t) => {
t.is(key.toString('hex'), drive.key.toString('hex'))
})

test('upload/download can be monitored', async (t) => {
t.plan(23)
const { corestore, drive, swarm, mirror } = await testenv(t.teardown)
swarm.on('connection', (conn) => corestore.replicate(conn))
swarm.join(drive.discoveryKey, { server: true, client: false })
await swarm.flush()

mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn))
mirror.swarm.join(drive.discoveryKey, { server: false, client: true })
await mirror.swarm.flush()

const file = '/example.md'
const bytes = 1024 * 100 // big enough to trigger more than one update event
const buffer = Buffer.alloc(bytes, '0')

{
// Start monitoring upload
const monitor = drive.monitor(file)
await monitor.ready()
t.is(monitor.name, file)
const expectedBlocks = [2, 1]
const expectedBytes = [bytes, 65536]
monitor.on('update', () => {
t.is(monitor.stats.blocks, expectedBlocks.pop())
t.is(monitor.stats.bytes, expectedBytes.pop())
t.is(monitor.stats.totalBlocks, 2)
t.is(monitor.stats.totalBytes, bytes)
})
monitor.on('done', () => {
t.is(monitor.stats.blocks, 2)
t.is(monitor.stats.bytes, bytes)
})
monitor.on('close', () => t.pass('Monitor is closed after its done'))
}

await drive.put(file, buffer)

{
// Start monitoring download
const monitor = mirror.drive.monitor(file)
await monitor.ready()
const expectedBlocks = [2, 1]
const expectedBytes = [bytes, 65536]
monitor.on('update', () => {
t.is(monitor.stats.blocks, expectedBlocks.pop())
t.is(monitor.stats.bytes, expectedBytes.pop())
t.is(monitor.stats.totalBlocks, 2)
t.is(monitor.stats.totalBytes, bytes)
})
monitor.on('done', () => {
t.is(monitor.stats.blocks, 2)
t.is(monitor.stats.bytes, bytes)
})
monitor.on('close', () => t.pass('Monitor is closed after its done'))
}

await mirror.drive.get(file)
})

async function testenv (teardown) {
const corestore = new Corestore(RAM)
await corestore.ready()
Expand Down Expand Up @@ -1573,7 +1632,10 @@ function eventFlush () {
}

async function replicate (drive, swarm, mirror) {
swarm.on('connection', (conn) => drive.corestore.replicate(conn))
swarm.on('connection', (conn) => {
drive.corestore.replicate(conn)
console.log('on con!')
})
const discovery = swarm.join(drive.discoveryKey, { server: true, client: false })
await discovery.flushed()

Expand Down

0 comments on commit 49581f6

Please sign in to comment.