From 87fa8c726851c20912108f9d9b12eb2c8e93eb1d Mon Sep 17 00:00:00 2001 From: kumavis Date: Tue, 30 Apr 2019 02:34:51 +0800 Subject: [PATCH 1/2] refactor(async): refactor just internal private methods --- package.json | 2 + src/private.js | 601 ++++++++++++++++++++++++++----------------------- 2 files changed, 327 insertions(+), 276 deletions(-) diff --git a/package.json b/package.json index 48864ee7..02a4cdb6 100644 --- a/package.json +++ b/package.json @@ -55,7 +55,9 @@ "multihashing-async": "~0.5.2", "peer-id": "~0.12.2", "peer-info": "~0.15.1", + "pify": "^4.0.1", "priorityqueue": "~0.2.1", + "promise-to-callback": "^1.0.0", "protons": "^1.0.1", "pull-length-prefixed": "^1.3.2", "pull-stream": "^3.6.9", diff --git a/src/private.js b/src/private.js index 70f72cb1..8f61548e 100644 --- a/src/private.js +++ b/src/private.js @@ -3,9 +3,10 @@ const PeerId = require('peer-id') const libp2pRecord = require('libp2p-record') const waterfall = require('async/waterfall') -const each = require('async/each') const timeout = require('async/timeout') const PeerInfo = require('peer-info') +const pify = require('pify') +const promiseToCallback = require('promise-to-callback') const errcode = require('err-code') @@ -28,24 +29,18 @@ module.exports = (dht) => ({ * @private */ _nearestPeersToQuery (msg, callback) { - utils.convertBuffer(msg.key, (err, key) => { - if (err) { - return callback(err) - } - let ids - try { - ids = dht.routingTable.closestPeers(key, dht.ncp) - } catch (err) { - return callback(err) - } + promiseToCallback(this._nearestPeersToQueryAsync(msg))(callback) + }, - callback(null, ids.map((p) => { - if (dht.peerBook.has(p)) { - return dht.peerBook.get(p) - } else { - return dht.peerBook.put(new PeerInfo(p)) - } - })) + async _nearestPeersToQueryAsync (msg) { + const key = await pify(utils.convertBuffer)(msg.key) + + const ids = dht.routingTable.closestPeers(key, dht.ncp) + return ids.map((p) => { + if (dht.peerBook.has(p)) { + return dht.peerBook.get(p) + } + return dht.peerBook.put(new PeerInfo(p)) }) }, /** @@ -58,31 +53,31 @@ module.exports = (dht) => ({ * @returns {undefined} * @private */ + _betterPeersToQuery (msg, peer, callback) { - dht._log('betterPeersToQuery') - dht._nearestPeersToQuery(msg, (err, closer) => { - if (err) { - return callback(err) - } + promiseToCallback(this._betterPeersToQueryAsync(msg, peer))(callback) + }, - const filtered = closer.filter((closer) => { - if (dht._isSelf(closer.id)) { - // Should bail, not sure - dht._log.error('trying to return self as closer') - return false - } + async _betterPeersToQueryAsync (msg, peer) { + dht._log('betterPeersToQuery') + const closer = await dht._nearestPeersToQueryAsync(msg) - return !closer.id.isEqual(peer.id) - }) + return closer.filter((closer) => { + if (dht._isSelf(closer.id)) { + // Should bail, not sure + dht._log.error('trying to return self as closer') + return false + } - callback(null, filtered) + return !closer.id.isEqual(peer.id) }) }, + /** * Try to fetch a given record by from the local datastore. * Returns the record iff it is still valid, meaning * - it was either authored by this node, or - * - it was receceived less than `MAX_RECORD_AGE` ago. + * - it was received less than `MAX_RECORD_AGE` ago. * * @param {Buffer} key * @param {function(Error, Record)} callback @@ -90,51 +85,43 @@ module.exports = (dht) => ({ * *@private */ + _checkLocalDatastore (key, callback) { + promiseToCallback(this._checkLocalDatastoreAsync(key))(callback) + }, + + async _checkLocalDatastoreAsync (key) { dht._log('checkLocalDatastore: %b', key) const dsKey = utils.bufferToKey(key) - // 2. fetch value from ds - dht.datastore.has(dsKey, (err, exists) => { - if (err) { - return callback(err) - } - if (!exists) { - return callback() + // Fetch value from ds + let rawRecord + try { + rawRecord = await pify(cb => dht.datastore.get(dsKey, cb))() + } catch (err) { + if (err.code === 'ERR_NOT_FOUND') { + return undefined } - - dht.datastore.get(dsKey, (err, res) => { - if (err) { - return callback(err) - } - - const rawRecord = res - - // 4. create record from the returned bytes - let record - try { - record = Record.deserialize(rawRecord) - } catch (err) { - return callback(err) - } - - if (!record) { - return callback(errcode(new Error('Invalid record'), 'ERR_INVALID_RECORD')) - } - - // 5. check validity - - // compare recvtime with maxrecordage - if (record.timeReceived == null || - utils.now() - record.timeReceived > c.MAX_RECORD_AGE) { - // 6. if: record is bad delete it and return - return dht.datastore.delete(dsKey, callback) - } - - // else: return good record - callback(null, record) - }) - }) + throw err + } + + // Create record from the returned bytes + const record = Record.deserialize(rawRecord) + + if (!record) { + throw errcode('Invalid record', 'ERR_INVALID_RECORD') + } + + // Check validity: compare time received with max record age + if (record.timeReceived == null || + utils.now() - record.timeReceived > c.MAX_RECORD_AGE) { + // If record is bad delete it and return + await pify(cb => dht.datastore.delete(dsKey, cb))() + return undefined + } + + // Record is valid + return record }, /** * Add the peer to the routing table and update it in the peerbook. @@ -145,9 +132,15 @@ module.exports = (dht) => ({ * * @private */ + _add (peer, callback) { + promiseToCallback(this._addAsync(peer))(err => callback(err)) + }, + + async _addAsync (peer) { peer = dht.peerBook.put(peer) - dht.routingTable.add(peer.id, callback) + await pify(cb => dht.routingTable.add(peer.id, cb))() + return undefined }, /** * Verify a record without searching the DHT. @@ -158,64 +151,78 @@ module.exports = (dht) => ({ * * @private */ + _verifyRecordLocally (record, callback) { + promiseToCallback(this._verifyRecordLocallyAsync(record))(err => callback(err)) + }, + + async _verifyRecordLocallyAsync (record) { dht._log('verifyRecordLocally') - libp2pRecord.validator.verifyRecord( + await pify(cb => libp2pRecord.validator.verifyRecord( dht.validators, record, - callback - ) + cb + ))() + return undefined }, + /** * Find close peers for a given peer * * @param {Buffer} key * @param {PeerId} peer - * @param {function(Error)} callback + * @param {function(Error, Array)} callback * @returns {void} * * @private */ - _closerPeersSingle (key, peer, callback) { - dht._log('_closerPeersSingle %b from %s', key, peer.toB58String()) - dht._findPeerSingle(peer, new PeerId(key), (err, msg) => { - if (err) { - return callback(err) - } - const out = msg.closerPeers - .filter((pInfo) => !dht._isSelf(pInfo.id)) - .map((pInfo) => dht.peerBook.put(pInfo)) + _closerPeersSingle (key, peer, callback) { + promiseToCallback(this._closerPeersSingleAsync(key, peer))(callback) + }, - callback(null, out) - }) + async _closerPeersSingleAsync (key, peer) { + dht._log('_closerPeersSingle %b from %s', key, peer.toB58String()) + const msg = await dht._findPeerSingleAsync(peer, new PeerId(key)) + return msg.closerPeers + .filter((pInfo) => !dht._isSelf(pInfo.id)) + .map((pInfo) => dht.peerBook.put(pInfo)) }, + /** - * Is the given peer id the peer id? + * Is the given peer id our PeerId? * * @param {PeerId} other * @returns {bool} * * @private */ + _isSelf (other) { return other && dht.peerInfo.id.id.equals(other.id) }, + /** * Ask peer `peer` if they know where the peer with id `target` is. * * @param {PeerId} peer * @param {PeerId} target - * @param {function(Error)} callback + * @param {function(Error, Message)} callback * @returns {void} * * @private */ + _findPeerSingle (peer, target, callback) { + promiseToCallback(this._findPeerSingleAsync(peer, target))(callback) + }, + + async _findPeerSingleAsync (peer, target) { dht._log('_findPeerSingle %s', peer.toB58String()) const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0) - dht.network.sendRequest(peer, msg, callback) + return pify(callback => dht.network.sendRequest(peer, msg, callback))() }, + /** * Store the given key/value pair at the peer `target`. * @@ -227,22 +234,22 @@ module.exports = (dht) => ({ * * @private */ + _putValueToPeer (key, rec, target, callback) { + promiseToCallback(this._putValueToPeerAsync(key, rec, target))(callback) + }, + + async _putValueToPeerAsync (key, rec, target) { const msg = new Message(Message.TYPES.PUT_VALUE, key, 0) msg.record = rec - dht.network.sendRequest(target, msg, (err, resp) => { - if (err) { - return callback(err) - } + const resp = await pify(cb => dht.network.sendRequest(target, msg, cb))() - if (!resp.record.value.equals(Record.deserialize(rec).value)) { - return callback(errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID')) - } - - callback() - }) + if (!resp.record.value.equals(Record.deserialize(rec).value)) { + throw errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID') + } }, + /** * Store the given key/value pair locally, in the datastore. * @param {Buffer} key @@ -252,11 +259,18 @@ module.exports = (dht) => ({ * * @private */ + _putLocal (key, rec, callback) { - dht.datastore.put(utils.bufferToKey(key), rec, callback) + promiseToCallback(this._putLocalAsync(key, rec))(err => callback(err)) }, + + async _putLocalAsync (key, rec) { + await pify(cb => dht.datastore.put(utils.bufferToKey(key), rec, cb))() + return undefined + }, + /** - * Get the value to the given key. + * Get the value for given key. * * @param {Buffer} key * @param {Object} options - get options @@ -266,61 +280,80 @@ module.exports = (dht) => ({ * * @private */ + _get (key, options, callback) { + promiseToCallback(this._getAsync(key, options))(callback) + }, + + async _getAsync (key, options) { dht._log('_get %b', key) - waterfall([ - (cb) => dht.getMany(key, c.GET_MANY_RECORD_COUNT, options, cb), - (vals, cb) => { - const recs = vals.map((v) => v.val) - let i = 0 - try { - i = libp2pRecord.selection.bestRecord(dht.selectors, key, recs) - } catch (err) { - // Assume the first record if no selector available - if (err.code !== 'ERR_NO_SELECTOR_FUNCTION_FOR_RECORD_KEY') { - return cb(err) - } - } + const vals = await pify(cb => dht.getMany(key, c.GET_MANY_RECORD_COUNT, options, cb))() - const best = recs[i] - dht._log('GetValue %b %s', key, best) + const recs = vals.map((v) => v.val) + let i = 0 - if (!best) { - return cb(errcode(new Error('best value was not found'), 'ERR_NOT_FOUND')) - } + try { + i = libp2pRecord.selection.bestRecord(dht.selectors, key, recs) + } catch (err) { + // Assume the first record if no selector available + if (err.code !== 'ERR_NO_SELECTOR_FUNCTION_FOR_RECORD_KEY') { + throw err + } + } - // Send out correction record - waterfall([ - (cb) => utils.createPutRecord(key, best, cb), - (fixupRec, cb) => each(vals, (v, cb) => { - // no need to do anything - if (v.val.equals(best)) { - return cb() - } + const best = recs[i] + dht._log('GetValue %b %s', key, best) - // correct ourself - if (dht._isSelf(v.from)) { - return dht._putLocal(key, fixupRec, (err) => { - if (err) { - dht._log.error('Failed error correcting self', err) - } - cb() - }) - } + if (!best) { + throw errcode(new Error('best value was not found'), 'ERR_NOT_FOUND') + } - // send correction - dht._putValueToPeer(key, fixupRec, v.from, (err) => { - if (err) { - dht._log.error('Failed error correcting entry', err) - } - cb() - }) - }, cb) - ], (err) => cb(err, err ? null : best)) + await this._sendCorrectionRecord(key, vals, best) + + return best + }, + + /** + * Send the best record found to any peers that have an out of date record. + * + * @param {Buffer} key + * @param {Array} vals - values retrieved from the DHT + * @param {Object} best - the best record that was found + * @returns {Promise} + * + * @private + */ + async _sendCorrectionRecord (key, vals, best) { + const fixupRec = await pify(cb => utils.createPutRecord(key, best, cb))() + + return Promise.all(vals.map(async (v) => { + // no need to do anything + if (v.val.equals(best)) { + return + } + + // correct ourself + if (dht._isSelf(v.from)) { + try { + await pify(cb => dht._putLocal(key, fixupRec, cb))() + // await dht._putLocalAsync(key, fixupRec) + } catch (err) { + dht._log.error('Failed error correcting self', err) + } + return + } + + // send correction + try { + await pify(cb => dht._putValueToPeer(key, fixupRec, v.from, cb))() + // await dht._putValueToPeerAsync(key, fixupRec, v.from) + } catch (err) { + dht._log.error('Failed error correcting entry', err) } - ], callback) + })) }, + /** * Attempt to retrieve the value for the given key from * the local datastore. @@ -332,29 +365,20 @@ module.exports = (dht) => ({ * @private */ _getLocal (key, callback) { - dht._log('getLocal %b', key) + promiseToCallback(this._getLocalAsync(key))(callback) + }, - waterfall([ - (cb) => dht.datastore.get(utils.bufferToKey(key), cb), - (raw, cb) => { - dht._log('found %b in local datastore', key) - let rec - try { - rec = Record.deserialize(raw) - } catch (err) { - return cb(err) - } + async _getLocalAsync (key) { + dht._log('getLocal %b', key) - dht._verifyRecordLocally(rec, (err) => { - if (err) { - return cb(err) - } + const raw = await pify(cb => dht.datastore.get(utils.bufferToKey(key), cb))() + dht._log('found %b in local datastore', key) + const rec = Record.deserialize(raw) - cb(null, rec) - }) - } - ], callback) + await dht._verifyRecordLocallyAsync(rec) + return rec }, + /** * Query a particular peer for the value for the given key. * It will either return the value or a list of closer peers. @@ -368,35 +392,41 @@ module.exports = (dht) => ({ * * @private */ + _getValueOrPeers (peer, key, callback) { - waterfall([ - (cb) => dht._getValueSingle(peer, key, cb), - (msg, cb) => { - const peers = msg.closerPeers - const record = msg.record - - if (record) { - // We have a record - return dht._verifyRecordOnline(record, (err) => { - if (err) { - const errMsg = 'invalid record received, discarded' - - dht._log(errMsg) - return cb(errcode(new Error(errMsg), 'ERR_INVALID_RECORD')) - } + promiseToCallback(this._getValueOrPeersAsync(peer, key))((err, result) => { + if (err) return callback(err) + callback(null, result.record, result.peers) + }) + }, - return cb(null, record, peers) - }) - } + async _getValueOrPeersAsync (peer, key) { + const msg = await pify(cb => dht._getValueSingle(peer, key, cb))() - if (peers.length > 0) { - return cb(null, null, peers) - } + const peers = msg.closerPeers + const record = msg.record - cb(errcode(new Error('Not found'), 'ERR_NOT_FOUND')) + if (record) { + // We have a record + try { + // await dht._verifyRecordOnlineAsync(record) + await pify(cb => dht._verifyRecordOnline(record, cb))() + } catch (err) { + const errMsg = 'invalid record received, discarded' + dht._log(errMsg) + throw errcode(new Error(errMsg), 'ERR_INVALID_RECORD') } - ], callback) + + return { record, peers } + } + + if (peers.length > 0) { + return { peers } + } + + throw errcode(new Error('Not found'), 'ERR_NOT_FOUND') }, + /** * Get a value via rpc call for the given parameters. * @@ -407,10 +437,16 @@ module.exports = (dht) => ({ * * @private */ + _getValueSingle (peer, key, callback) { + promiseToCallback(this._getValueSingleAsync(peer, key))(callback) + }, + + async _getValueSingleAsync (peer, key) { const msg = new Message(Message.TYPES.GET_VALUE, key, 0) - dht.network.sendRequest(peer, msg, callback) + return pify(cb => dht.network.sendRequest(peer, msg, cb))() }, + /** * Verify a record, fetching missing public keys from the network. * Calls back with an error if the record is invalid. @@ -421,9 +457,15 @@ module.exports = (dht) => ({ * * @private */ + _verifyRecordOnline (record, callback) { - libp2pRecord.validator.verifyRecord(dht.validators, record, callback) + promiseToCallback(this._verifyRecordOnlineAsync(record))(err => callback(err)) + }, + + async _verifyRecordOnlineAsync (record) { + return pify(cb => libp2pRecord.validator.verifyRecord(dht.validators, record, cb))() }, + /** * Get the public key directly from a node. * @@ -433,27 +475,30 @@ module.exports = (dht) => ({ * * @private */ + _getPublicKeyFromNode (peer, callback) { + promiseToCallback(this._getPublicKeyFromNodeAsync(peer))(callback) + }, + + async _getPublicKeyFromNodeAsync (peer) { const pkKey = utils.keyForPublicKey(peer) - waterfall([ - (cb) => dht._getValueSingle(peer, pkKey, cb), - (msg, cb) => { - if (!msg.record || !msg.record.value) { - return cb(errcode(new Error(`Node not responding with its public key: ${peer.toB58String()}`), 'ERR_INVALID_RECORD')) - } + // const msg = await dht._getValueSingleAsync(peer, pkKey) + const msg = await pify(cb => dht._getValueSingle(peer, pkKey, cb))() - PeerId.createFromPubKey(msg.record.value, cb) - }, - (recPeer, cb) => { - // compare hashes of the pub key - if (!recPeer.isEqual(peer)) { - return cb(errcode(new Error('public key does not match id'), 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID')) - } + if (!msg.record || !msg.record.value) { + throw errcode(`Node not responding with its public key: ${peer.toB58String()}`, 'ERR_INVALID_RECORD') + } - cb(null, recPeer.pubKey) - } - ], callback) + const recPeer = await pify(cb => PeerId.createFromPubKey(msg.record.value, cb))() + + // compare hashes of the pub key + if (!recPeer.isEqual(peer)) { + throw errcode('public key does not match id', 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID') + } + + return recPeer.pubKey }, + /** * Search the dht for up to `n` providers of the given CID. * @@ -466,85 +511,85 @@ module.exports = (dht) => ({ * @private */ _findNProviders (key, providerTimeout, n, callback) { - let out = new LimitedPeerList(n) + promiseToCallback(this._findNProvidersAsync(key, providerTimeout, n))(callback) + }, - dht.providers.getProviders(key, (err, provs) => { - if (err) { - return callback(err) - } + async _findNProvidersAsync (key, providerTimeout, n) { + const out = new LimitedPeerList(n) - provs.forEach((id) => { - let info - if (dht.peerBook.has(id)) { - info = dht.peerBook.get(id) - } else { - info = dht.peerBook.put(new PeerInfo(id)) - } - out.push(info) - }) + const provs = await pify(cb => dht.providers.getProviders(key, cb))() - // All done - if (out.length >= n) { - return callback(null, out.toArray()) + provs.forEach((id) => { + let info + if (dht.peerBook.has(id)) { + info = dht.peerBook.get(id) + } else { + info = dht.peerBook.put(new PeerInfo(id)) } + out.push(info) + }) - // need more, query the network - const paths = [] - const query = new Query(dht, key.buffer, (pathIndex, numPaths) => { - // This function body runs once per disjoint path - const pathSize = utils.pathSize(out.length - n, numPaths) - const pathProviders = new LimitedPeerList(pathSize) - paths.push(pathProviders) - - // Here we return the query function to use on this particular disjoint path - return (peer, cb) => { - waterfall([ - (cb) => dht._findProvidersSingle(peer, key, cb), - (msg, cb) => { - const provs = msg.providerPeers - dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length) - - provs.forEach((prov) => { - pathProviders.push(dht.peerBook.put(prov)) - }) - - // hooray we have all that we want - if (pathProviders.length >= pathSize) { - return cb(null, { pathComplete: true }) - } - - // it looks like we want some more - cb(null, { - closerPeers: msg.closerPeers - }) - } - ], cb) - } - }) - - const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA) + // All done + if (out.length >= n) { + return out.toArray() + } + + // need more, query the network + const paths = [] + const query = new Query(dht, key.buffer, (pathIndex, numPaths) => { + // This function body runs once per disjoint path + const pathSize = utils.pathSize(out.length - n, numPaths) + const pathProviders = new LimitedPeerList(pathSize) + paths.push(pathProviders) + + // Here we return the query function to use on this particular disjoint path + return (peer, cb) => { + waterfall([ + (cb) => dht._findProvidersSingle(peer, key, cb), + (msg, cb) => { + const provs = msg.providerPeers + dht._log('(%s) found %s provider entries', dht.peerInfo.id.toB58String(), provs.length) - timeout((cb) => query.run(peers, cb), providerTimeout)((err) => { - query.stop() + provs.forEach((prov) => { + pathProviders.push(dht.peerBook.put(prov)) + }) - // combine peers from each path - paths.forEach((path) => { - path.toArray().forEach((peer) => { - out.push(peer) - }) - }) + // hooray we have all that we want + if (pathProviders.length >= pathSize) { + return cb(null, { pathComplete: true }) + } - if (err) { - if (err.code === 'ETIMEDOUT' && out.length > 0) { - return callback(null, out.toArray()) + // it looks like we want some more + cb(null, { + closerPeers: msg.closerPeers + }) } - return callback(err) - } + ], cb) + } + }) + + const peers = dht.routingTable.closestPeers(key.buffer, c.ALPHA) - callback(null, out.toArray()) + try { + await pify(callback => timeout((cb) => query.run(peers, cb), providerTimeout)(callback))() + } catch (err) { + if (err.code !== 'ETIMEDOUT' || out.length === 0) { + throw err + } + } finally { + query.stop() + } + + // combine peers from each path + paths.forEach((path) => { + path.toArray().forEach((peer) => { + out.push(peer) }) }) + + return out.toArray() }, + /** * Check for providers from a single node. * @@ -556,7 +601,11 @@ module.exports = (dht) => ({ * @private */ _findProvidersSingle (peer, key, callback) { + promiseToCallback(this._findProvidersSingleAsync(peer, key))(callback) + }, + + async _findProvidersSingleAsync (peer, key) { const msg = new Message(Message.TYPES.GET_PROVIDERS, key.buffer, 0) - dht.network.sendRequest(peer, msg, callback) + return pify(cb => dht.network.sendRequest(peer, msg, cb))() } }) From 1d228e0783884872e5dbb60922d899be78261e32 Mon Sep 17 00:00:00 2001 From: kumavis Date: Wed, 15 May 2019 15:52:56 +0800 Subject: [PATCH 2/2] feat: use promisify-es6 instead of pify --- package.json | 2 +- src/private.js | 46 +++++++++++++++++++++++----------------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/package.json b/package.json index 6d192104..200e70d0 100644 --- a/package.json +++ b/package.json @@ -56,9 +56,9 @@ "multihashing-async": "~0.5.2", "peer-id": "~0.12.2", "peer-info": "~0.15.1", - "pify": "^4.0.1", "priorityqueue": "~0.2.1", "promise-to-callback": "^1.0.0", + "promisify-es6": "^1.0.3", "protons": "^1.0.1", "pull-length-prefixed": "^1.3.2", "pull-stream": "^3.6.9", diff --git a/src/private.js b/src/private.js index f9444595..05da806b 100644 --- a/src/private.js +++ b/src/private.js @@ -5,7 +5,7 @@ const libp2pRecord = require('libp2p-record') const waterfall = require('async/waterfall') const timeout = require('async/timeout') const PeerInfo = require('peer-info') -const pify = require('pify') +const promisify = require('promisify-es6') const promiseToCallback = require('promise-to-callback') const errcode = require('err-code') @@ -33,7 +33,7 @@ module.exports = (dht) => ({ }, async _nearestPeersToQueryAsync (msg) { - const key = await pify(utils.convertBuffer)(msg.key) + const key = await promisify(utils.convertBuffer)(msg.key) const ids = dht.routingTable.closestPeers(key, dht.kBucketSize) return ids.map((p) => { @@ -97,7 +97,7 @@ module.exports = (dht) => ({ // Fetch value from ds let rawRecord try { - rawRecord = await pify(cb => dht.datastore.get(dsKey, cb))() + rawRecord = await promisify(cb => dht.datastore.get(dsKey, cb))() } catch (err) { if (err.code === 'ERR_NOT_FOUND') { return undefined @@ -116,7 +116,7 @@ module.exports = (dht) => ({ if (record.timeReceived == null || utils.now() - record.timeReceived > c.MAX_RECORD_AGE) { // If record is bad delete it and return - await pify(cb => dht.datastore.delete(dsKey, cb))() + await promisify(cb => dht.datastore.delete(dsKey, cb))() return undefined } @@ -139,7 +139,7 @@ module.exports = (dht) => ({ async _addAsync (peer) { peer = dht.peerBook.put(peer) - await pify(cb => dht.routingTable.add(peer.id, cb))() + await promisify(cb => dht.routingTable.add(peer.id, cb))() return undefined }, /** @@ -158,7 +158,7 @@ module.exports = (dht) => ({ async _verifyRecordLocallyAsync (record) { dht._log('verifyRecordLocally') - await pify(cb => libp2pRecord.validator.verifyRecord( + await promisify(cb => libp2pRecord.validator.verifyRecord( dht.validators, record, cb @@ -220,7 +220,7 @@ module.exports = (dht) => ({ async _findPeerSingleAsync (peer, target) { dht._log('_findPeerSingle %s', peer.toB58String()) const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0) - return pify(callback => dht.network.sendRequest(peer, msg, callback))() + return promisify(callback => dht.network.sendRequest(peer, msg, callback))() }, /** @@ -243,7 +243,7 @@ module.exports = (dht) => ({ const msg = new Message(Message.TYPES.PUT_VALUE, key, 0) msg.record = rec - const resp = await pify(cb => dht.network.sendRequest(target, msg, cb))() + const resp = await promisify(cb => dht.network.sendRequest(target, msg, cb))() if (!resp.record.value.equals(Record.deserialize(rec).value)) { throw errcode(new Error('value not put correctly'), 'ERR_PUT_VALUE_INVALID') @@ -265,7 +265,7 @@ module.exports = (dht) => ({ }, async _putLocalAsync (key, rec) { - await pify(cb => dht.datastore.put(utils.bufferToKey(key), rec, cb))() + await promisify(cb => dht.datastore.put(utils.bufferToKey(key), rec, cb))() return undefined }, @@ -288,7 +288,7 @@ module.exports = (dht) => ({ async _getAsync (key, options) { dht._log('_get %b', key) - const vals = await pify(cb => dht.getMany(key, c.GET_MANY_RECORD_COUNT, options, cb))() + const vals = await promisify(cb => dht.getMany(key, c.GET_MANY_RECORD_COUNT, options, cb))() const recs = vals.map((v) => v.val) let i = 0 @@ -325,7 +325,7 @@ module.exports = (dht) => ({ * @private */ async _sendCorrectionRecord (key, vals, best) { - const fixupRec = await pify(cb => utils.createPutRecord(key, best, cb))() + const fixupRec = await promisify(cb => utils.createPutRecord(key, best, cb))() return Promise.all(vals.map(async (v) => { // no need to do anything @@ -336,7 +336,7 @@ module.exports = (dht) => ({ // correct ourself if (dht._isSelf(v.from)) { try { - await pify(cb => dht._putLocal(key, fixupRec, cb))() + await promisify(cb => dht._putLocal(key, fixupRec, cb))() // await dht._putLocalAsync(key, fixupRec) } catch (err) { dht._log.error('Failed error correcting self', err) @@ -346,7 +346,7 @@ module.exports = (dht) => ({ // send correction try { - await pify(cb => dht._putValueToPeer(key, fixupRec, v.from, cb))() + await promisify(cb => dht._putValueToPeer(key, fixupRec, v.from, cb))() // await dht._putValueToPeerAsync(key, fixupRec, v.from) } catch (err) { dht._log.error('Failed error correcting entry', err) @@ -371,7 +371,7 @@ module.exports = (dht) => ({ async _getLocalAsync (key) { dht._log('getLocal %b', key) - const raw = await pify(cb => dht.datastore.get(utils.bufferToKey(key), cb))() + const raw = await promisify(cb => dht.datastore.get(utils.bufferToKey(key), cb))() dht._log('found %b in local datastore', key) const rec = Record.deserialize(raw) @@ -401,7 +401,7 @@ module.exports = (dht) => ({ }, async _getValueOrPeersAsync (peer, key) { - const msg = await pify(cb => dht._getValueSingle(peer, key, cb))() + const msg = await promisify(cb => dht._getValueSingle(peer, key, cb))() const peers = msg.closerPeers const record = msg.record @@ -410,7 +410,7 @@ module.exports = (dht) => ({ // We have a record try { // await dht._verifyRecordOnlineAsync(record) - await pify(cb => dht._verifyRecordOnline(record, cb))() + await promisify(cb => dht._verifyRecordOnline(record, cb))() } catch (err) { const errMsg = 'invalid record received, discarded' dht._log(errMsg) @@ -444,7 +444,7 @@ module.exports = (dht) => ({ async _getValueSingleAsync (peer, key) { const msg = new Message(Message.TYPES.GET_VALUE, key, 0) - return pify(cb => dht.network.sendRequest(peer, msg, cb))() + return promisify(cb => dht.network.sendRequest(peer, msg, cb))() }, /** @@ -463,7 +463,7 @@ module.exports = (dht) => ({ }, async _verifyRecordOnlineAsync (record) { - return pify(cb => libp2pRecord.validator.verifyRecord(dht.validators, record, cb))() + return promisify(cb => libp2pRecord.validator.verifyRecord(dht.validators, record, cb))() }, /** @@ -483,13 +483,13 @@ module.exports = (dht) => ({ async _getPublicKeyFromNodeAsync (peer) { const pkKey = utils.keyForPublicKey(peer) // const msg = await dht._getValueSingleAsync(peer, pkKey) - const msg = await pify(cb => dht._getValueSingle(peer, pkKey, cb))() + const msg = await promisify(cb => dht._getValueSingle(peer, pkKey, cb))() if (!msg.record || !msg.record.value) { throw errcode(`Node not responding with its public key: ${peer.toB58String()}`, 'ERR_INVALID_RECORD') } - const recPeer = await pify(cb => PeerId.createFromPubKey(msg.record.value, cb))() + const recPeer = await promisify(cb => PeerId.createFromPubKey(msg.record.value, cb))() // compare hashes of the pub key if (!recPeer.isEqual(peer)) { @@ -517,7 +517,7 @@ module.exports = (dht) => ({ async _findNProvidersAsync (key, providerTimeout, n) { const out = new LimitedPeerList(n) - const provs = await pify(cb => dht.providers.getProviders(key, cb))() + const provs = await promisify(cb => dht.providers.getProviders(key, cb))() provs.forEach((id) => { let info @@ -571,7 +571,7 @@ module.exports = (dht) => ({ const peers = dht.routingTable.closestPeers(key.buffer, dht.kBucketSize) try { - await pify(callback => timeout((cb) => query.run(peers, cb), providerTimeout)(callback))() + await promisify(callback => timeout((cb) => query.run(peers, cb), providerTimeout)(callback))() } catch (err) { if (err.code !== 'ETIMEDOUT' || out.length === 0) { throw err @@ -606,6 +606,6 @@ module.exports = (dht) => ({ async _findProvidersSingleAsync (peer, key) { const msg = new Message(Message.TYPES.GET_PROVIDERS, key.buffer, 0) - return pify(cb => dht.network.sendRequest(peer, msg, cb))() + return promisify(cb => dht.network.sendRequest(peer, msg, cb))() } })