diff --git a/index.js b/index.js index 1ee1a67..54be5a1 100644 --- a/index.js +++ b/index.js @@ -10,6 +10,7 @@ const safetyCatch = require('safety-catch') const crypto = require('hypercore-crypto') const Hypercore = require('hypercore') const { BLOCK_NOT_AVAILABLE, BAD_ARGUMENT } = require('hypercore-errors') +const Monitor = require('./lib/monitor') const keyEncoding = new SubEncoder('files', 'utf-8') @@ -30,6 +31,7 @@ module.exports = class Hyperdrive extends ReadyResource { this.blobs = null this.supportsMetadata = true this.encryptionKey = opts.encryptionKey || null + this.monitors = new Set() this._active = opts.active !== false this._openingBlobs = null @@ -188,6 +190,8 @@ module.exports = class Hyperdrive extends ReadyResource { if (!this._checkout && !this._batching) { await this.corestore.close() } + + await this.closeMonitors() } async _openBlobsFromHeader (opts) { @@ -278,6 +282,18 @@ module.exports = class Hyperdrive extends ReadyResource { return this.blobs } + monitor (name, opts = {}) { + const monitor = new Monitor(this, { name, ...opts }) + this.monitors.add(monitor) + return monitor + } + + async closeMonitors () { + const closing = [] + for (const monitor of this.monitors) closing.push(monitor.close()) + await Promise.allSettled(closing) + } + async get (name, opts) { const node = await this.entry(name, opts) if (!node?.value.blob) return null diff --git a/lib/monitor.js b/lib/monitor.js new file mode 100644 index 0000000..c41d30d --- /dev/null +++ b/lib/monitor.js @@ -0,0 +1,111 @@ +const ReadyResource = require('ready-resource') +const safetyCatch = require('safety-catch') +const speedometer = require('speedometer') + +module.exports = class Monitor extends ReadyResource { + constructor (drive, opts = {}) { + super() + this.drive = drive + this.blobs = null + this.name = opts.name || null + this.entry = opts.entry || null + + this._boundOnUpload = this._onUpload.bind(this) + this._boundOnDownload = this._onDownload.bind(this) + + const stats = { + startTime: 0, + percentage: 0, + peers: 0, + speed: 0, + blocks: 0, + totalBytes: 0, // local + bytes loaded during monitoring + monitoringBytes: 0, // bytes loaded during monitoring + targetBytes: 0, + targetBlocks: 0 + } + + // Updated on each upload/download event + this.uploadStats = { ...stats } + this.downloadStats = { ...stats } + + this.uploadSpeedometer = null + this.downloadSpeedometer = null + + this.ready().catch(safetyCatch) + } + + async _open () { + await this.drive.ready() + this.blobs = await this.drive.getBlobs() + if (!this.entry && this.name) this.entry = await this.drive.entry(this.name) + if (this.entry) this._setEntryInfo() + + // Handlers + this.blobs.core.on('upload', this._boundOnUpload) + this.blobs.core.on('download', this._boundOnDownload) + } + + async _close () { + this.blobs.core.off('upload', this._boundOnUpload) + this.blobs.core.off('download', this._boundOnDownload) + this.drive.monitors.delete(this) + } + + _setEntryInfo () { + if (!this.downloadStats.targetBytes || !this.downloadStats.targetBlocks) { + this.downloadStats.targetBytes = this.entry.value.blob.byteLength + this.downloadStats.targetBlocks = this.entry.value.blob.blockLength + } + + if (!this.uploadStats.targetBytes || !this.uploadStats.targetBlocks) { + this.uploadStats.targetBytes = this.entry.value.blob.byteLength + this.uploadStats.targetBlocks = this.entry.value.blob.blockLength + } + } + + _onUpload (index, bytes, from) { + if (!this.uploadSpeedometer) this.uploadSpeedometer = speedometer() + this.uploadStats.speed = this.uploadSpeedometer(bytes) + this._updateStats(this.uploadStats, index, bytes, from) + } + + _onDownload (index, bytes, from) { + if (!this.downloadSpeedometer) this.downloadSpeedometer = speedometer() + this.downloadStats.speed = this.downloadSpeedometer(bytes) + this._updateStats(this.downloadStats, index, bytes, from) + } + + _updateStats (stats, index, bytes, from) { + if (!this.entry || this.closing) return + if (!isWithinRange(index, this.entry)) return + + if (!stats.startTime) stats.startTime = Date.now() + stats.peers = from.replicator.peers.length + stats.blocks++ + stats.monitoringBytes += bytes + stats.totalBytes += bytes + // NOTE: you should not rely on the percentage until the monitor is initialized with the local state of the file + stats.percentage = toFixed(stats.blocks / stats.targetBlocks * 100) + + this.emit('update') + } + + downloadSpeed () { + return this.downloadSpeedometer ? this.downloadSpeedometer() : 0 + } + + uploadSpeed () { + return this.uploadSpeedometer ? this.uploadSpeedometer() : 0 + } +} + +function isWithinRange (index, entry) { + if (!entry || !entry.value) return + const { blockOffset, blockLength } = entry.value.blob + return index >= blockOffset && index < blockOffset + blockLength +} + +function toFixed (n) { + return Math.round(n * 100) / 100 +} diff --git a/package.json b/package.json index 2791626..01e508a 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "mirror-drive": "^1.2.0", "ready-resource": "^1.0.0", "safety-catch": "^1.0.2", + "speedometer": "^1.1.0", "streamx": "^2.12.4", "sub-encoder": "^2.1.1", "unix-path-resolve": "^1.0.2" diff --git a/test.js b/test.js index 2fb7c4f..38bcfc0 100644 --- a/test.js +++ b/test.js @@ -1562,6 +1562,69 @@ test('drive.list (recursive false) ignore', async (t) => { t.alike(entries, expectedEntries) }) +test('upload/download can be monitored', async (t) => { + t.plan(27) + const { corestore, drive, swarm, mirror } = await testenv(t.teardown) + swarm.on('connection', (conn) => corestore.replicate(conn)) + swarm.join(drive.discoveryKey, { server: true, client: false }) + await swarm.flush() + + mirror.swarm.on('connection', (conn) => mirror.corestore.replicate(conn)) + mirror.swarm.join(drive.discoveryKey, { server: false, client: true }) + await mirror.swarm.flush() + + const file = '/example.md' + const bytes = 1024 * 100 // big enough to trigger more than one update event + const buffer = Buffer.alloc(bytes, '0') + await drive.put(file, buffer) + + { + // Start monitoring upload + const monitor = drive.monitor(file) + await monitor.ready() + t.is(monitor.name, file) + const expectedBlocks = [2, 1] + const expectedBytes = [bytes, 65536] + monitor.on('update', () => { + t.is(monitor.uploadStats.blocks, expectedBlocks.pop()) + t.is(monitor.uploadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.uploadStats.targetBlocks, 2) + t.is(monitor.uploadStats.targetBytes, bytes) + t.is(monitor.uploadSpeed(), monitor.uploadStats.speed) + if (!expectedBlocks.length) t.is(monitor.uploadStats.percentage, 100) + t.absent(monitor.downloadStats.blocks) + }) + } + + { + // Start monitoring download + const monitor = mirror.drive.monitor(file) + await monitor.ready() + const expectedBlocks = [2, 1] + const expectedBytes = [bytes, 65536] + monitor.on('update', () => { + t.is(monitor.downloadStats.blocks, expectedBlocks.pop()) + t.is(monitor.downloadStats.monitoringBytes, expectedBytes.pop()) + t.is(monitor.downloadStats.targetBlocks, 2) + t.is(monitor.downloadStats.targetBytes, bytes) + t.is(monitor.downloadSpeed(), monitor.downloadStats.speed) + if (!expectedBlocks.length) t.is(monitor.downloadStats.percentage, 100) + t.absent(monitor.uploadStats.blocks) + }) + } + + await mirror.drive.get(file) +}) + +test('monitor is removed from the Set on close', async (t) => { + const { drive } = await testenv(t.teardown) + const monitor = drive.monitor('/example.md') + await monitor.ready() + t.is(drive.monitors.size, 1) + await monitor.close() + t.is(drive.monitors.size, 0) +}) + async function testenv (teardown) { const corestore = new Corestore(RAM) await corestore.ready()