+ 'use strict';
+
+const async = require('async');
+const { Writable: WritableStream } = require('stream');
+const constants = require('./constants');
+const { knuthShuffle: shuffle } = require('knuth-shuffle');
+const utils = require('./utils');
+const AbstractNode = require('./node-abstract');
+const KademliaRules = require('./rules-kademlia');
+const ContactList = require('./contact-list');
+const MetaPipe = require('metapipe');
+
+
+/**
+ * Extends {@link AbstractNode} with Kademlia-specific rules
+ * @class
+ * @extends {AbstractNode}
+ */
+class KademliaNode extends AbstractNode {
+
+ /**
+ * @typedef {object} KademliaNode~entry
+ * @property {string|object|array} value - The primary entry value
+ * @property {string} publisher - Node identity of the original publisher
+ * @property {number} timestamp - Last update/replicate time
+ */
+
+ /**
+ * @constructor
+ */
+ constructor(options) {
+ super(options);
+
+ this._lookups = new Map(); // NB: Track the last lookup time for buckets
+ this._pings = new Map();
+ this._updateContactQueue = async.queue(
+ (task, cb) => this._updateContactWorker(task, cb),
+ 1
+ );
+
+ this.replicatePipeline = new MetaPipe({ objectMode: true });
+ this.expirePipeline = new MetaPipe({ objectMode: true });
+ }
+
+ /**
+ * Adds the kademlia rule handlers before calling super#listen()
+ */
+ listen() {
+ let handlers = new KademliaRules(this);
+
+ this.use('PING', handlers.ping.bind(handlers));
+ this.use('STORE', handlers.store.bind(handlers));
+ this.use('FIND_NODE', handlers.findNode.bind(handlers));
+ this.use('FIND_VALUE', handlers.findValue.bind(handlers));
+
+ setInterval(
+ utils.preventConvoy(() => this.refresh(0)),
+ constants.T_REFRESH
+ );
+ setInterval(
+ utils.preventConvoy(() => this.replicate(() => this.expire())),
+ constants.T_REPLICATE
+ );
+
+ super.listen(...arguments);
+ }
+
+ /**
+ * Inserts the given contact into the routing table and uses it to perform
+ * a {@link KademliaNode#iterativeFindNode} for this node's identity,
+ * then refreshes all buckets further than it's closest neighbor, which will
+ * be in the occupied bucket with the lowest index
+ * @param {Bucket~contact} peer - Peer to bootstrap from
+ * @param {function} [joinListener] - Function to set as join listener
+ * @returns {Promise}
+ */
+ join(peer, callback) {
+ if (typeof callback === 'function') {
+ return this._join(peer).then(function() {
+ callback(null, ...arguments);
+ }, callback);
+ } else {
+ return this._join(peer);
+ }
+ }
+
+ /**
+ * @private
+ */
+ _join([identity, contact]) {
+ return new Promise((resolve, reject) => {
+ this.router.addContactByNodeId(identity, contact);
+ async.series([
+ (next) => this.iterativeFindNode(this.identity.toString('hex'), next),
+ (next) => this.refresh(this.router.getClosestBucket() + 1, next)
+ ], (err) => {
+ if (err) {
+ reject(err);
+ } else {
+ resolve();
+ }
+ });
+ });
+ }
+
+ /**
+ * Sends a PING message to the supplied contact, resolves with latency
+ * @param {Bucket~contact} peer
+ * @param {KademliaNode~pingCallback} [callback]
+ * @returns {Promise<number>}
+ */
+ ping(contact, callback) {
+ if (typeof callback ==='function') {
+ return this._ping(contact).then(function() {
+ callback(null, ...arguments);
+ }, callback);
+ } else {
+ return this._ping(contact);
+ }
+ }
+ /**
+ * @callback KademliaNode~pingCallback
+ * @param {error|null} error
+ * @param {number} latency - Milliseconds before response received
+ */
+
+ /**
+ * @private
+ */
+ _ping(contact) {
+ return new Promise((resolve, reject) => {
+ const start = Date.now();
+
+ this.send('PING', [], contact, (err) => {
+ if (err) {
+ return reject(err);
+ }
+
+ resolve(Date.now() - start);
+ });
+ });
+ }
+
+ /**
+ * @private
+ */
+ _createStorageItem(value) {
+ const keys = Object.keys(value);
+ const alreadyHasMetadata = keys.includes('value') &&
+ keys.includes('publisher') &&
+ keys.includes('timestamp');
+
+ if (alreadyHasMetadata) {
+ value.timestamp = Date.now();
+ value.publisher = value.publisher.toString('hex');
+ return value;
+ }
+
+ return {
+ value: value,
+ timestamp: Date.now(),
+ publisher: this.identity.toString('hex')
+ };
+ }
+
+ /**
+ * Performs a {@link KademliaNode#iterativeFindNode} to collect K contacts
+ * nearest to the given key, sending a STORE message to each of them.
+ * @param {buffer|string} key - Key to store data under
+ * @param {buffer|string|object} value - Value to store by key
+ * @param {KademliaNode~iterativeStoreCallback} callback
+ * @returns {Promise<number>}
+ */
+ iterativeStore(key, value, callback) {
+ if (typeof callback === 'function') {
+ return this._iterativeStore(key, value).then(function() {
+ callback(null, ...arguments);
+ }, callback);
+ } else {
+ return this._iterativeStore(key, value);
+ }
+ }
+ /**
+ * Note that if there is a protocol/validation error, you will not receive
+ * it as an error in the callback. Be sure to also check that stored > 0 as
+ * part of error handling here.
+ * @callback KademliaNode~iterativeStoreCallback
+ * @param {error|null} error
+ * @param {number} stored - Total nodes who stored the pair
+ */
+
+ /**
+ * @private
+ */
+ _iterativeStore(key, value) {
+ return new Promise((resolve, reject) => {
+ key = key.toString('hex');
+ let stored = 0;
+
+ const createStoreRpc = (target) => {
+ return ['STORE', [key, this._createStorageItem(value)], target];
+ };
+
+ const dispatchStoreRpcs = (contacts, callback) => {
+ async.eachLimit(contacts, constants.ALPHA, (target, done) => {
+ this.send(...createStoreRpc(target), (err) => {
+ stored = err ? stored : stored + 1;
+ done();
+ });
+ }, callback);
+ };
+
+ async.waterfall([
+ (next) => this.iterativeFindNode(key, next),
+ (contacts, next) => dispatchStoreRpcs(contacts, next),
+ (next) => {
+ this.storage.put(key, this._createStorageItem(value), {
+ valueEncoding: 'json'
+ }, next);
+ }
+ ], () => {
+ if (stored === 0) {
+ return reject(new Error('Failed to stored entry with peers'));
+ }
+ resolve(stored);
+ });
+ });
+ }
+
+ /**
+ * Basic kademlia lookup operation that builds a set of K contacts closest
+ * to the given key
+ * @param {buffer|string} key - Reference key for node lookup
+ * @param {KademliaNode~iterativeFindNodeCallback} [callback]
+ * @returns {Promise<Bucket~contact[]>}
+ */
+ iterativeFindNode(key, callback) {
+ key = key.toString('hex');
+
+ if (typeof callback === 'function') {
+ return this._iterativeFind('FIND_NODE', key).then(function() {
+ callback(null, ...arguments);
+ }, callback);
+ } else {
+ return this._iterativeFind('FIND_NODE', key);
+ }
+ }
+ /**
+ * @callback KademliaNode~iterativeFindNodeCallback
+ * @param {error|null} error
+ * @param {Bucket~contact[]} contacts - Result of the lookup operation
+ */
+
+ /**
+ * Kademlia search operation that is conducted as a node lookup and builds
+ * a list of K closest contacts. If at any time during the lookup the value
+ * is returned, the search is abandoned. If no value is found, the K closest
+ * contacts are returned. Upon success, we must store the value at the
+ * nearest node seen during the search that did not return the value.
+ * @param {buffer|string} key - Key for value lookup
+ * @param {KademliaNode~iterativeFindValueCallback} [callback]
+ * @returns {Promise<object>}
+ */
+ iterativeFindValue(key, callback) {
+ key = key.toString('hex');
+
+ if (typeof callback === 'function') {
+ return this._iterativeFind('FIND_VALUE', key).then(function() {
+ callback(null, ...arguments);
+ }, callback);
+ } else {
+ return this._iterativeFind('FIND_VALUE', key);
+ }
+ }
+ /**
+ * @callback KademliaNode~iterativeFindValueCallback
+ * @param {error|null} error
+ * @param {KademliaNode~entry} value
+ * @param {null|Bucket~contact} contact - Contact responded with entry
+ */
+
+ /**
+ * Performs a scan of the storage adapter and performs
+ * republishing/replication of items stored. Items that we did not publish
+ * ourselves get republished every T_REPLICATE. Items we did publish get
+ * republished every T_REPUBLISH.
+ * @param {KademliaNode~replicateCallback} [callback]
+ * @returns {Promise}
+ */
+ replicate(callback) {
+ if (typeof callback === 'function') {
+ return this._replicate().then(callback, callback);
+ } else {
+ return this._replicate();
+ }
+ }
+ /**
+ * @callback KademliaNode~replicateCallback
+ * @param {error|null} error
+ */
+
+ /**
+ * @private
+ */
+ _replicate() {
+ const self = this;
+ const now = Date.now();
+
+ return new Promise((resolve, reject) => {
+ const itemStream = this.storage.createReadStream({
+ valueEncoding: 'json'
+ });
+ const replicateStream = new WritableStream({
+ objectMode: true,
+ write: maybeReplicate
+ });
+
+ function maybeReplicate({ key, value }, enc, next) {
+ const isPublisher = value.publisher === self.identity.toString('hex');
+ const republishDue = (value.timestamp + constants.T_REPUBLISH) <= now;
+ const replicateDue = (value.timestamp + constants.T_REPLICATE) <= now;
+ const shouldRepublish = isPublisher && republishDue;
+ const shouldReplicate = !isPublisher && replicateDue;
+
+ if (shouldReplicate || shouldRepublish) {
+ return self.iterativeStore(key, value, next);
+ }
+
+ next();
+ }
+
+ function triggerCallback(err) {
+ itemStream.removeAllListeners();
+ replicateStream.removeAllListeners();
+
+ if (err) {
+ return reject(err);
+ }
+
+ resolve();
+ }
+
+ itemStream.on('error', triggerCallback);
+ replicateStream.on('error', triggerCallback);
+ replicateStream.on('finish', triggerCallback);
+ itemStream.pipe(this.replicatePipeline).pipe(replicateStream);
+ });
+ }
+
+ /**
+ * Items expire T_EXPIRE seconds after the original publication. All items
+ * are assigned an expiration time which is "exponentially inversely
+ * proportional to the number of nodes between the current node and the node
+ * whose ID is closest to the key", where this number is "inferred from the
+ * bucket structure of the current node".
+ * @param {KademliaNode~expireCallback} [callback]
+ * @returns {Promise}
+ */
+ expire(callback) {
+ if (typeof callback === 'function') {
+ return this._expire().then(callback, callback);
+ } else {
+ return this._expire();
+ }
+ }
+ /**
+ * @callback KademliaNode~expireCallback
+ * @param {error|null} error
+ */
+
+ /**
+ * @private
+ */
+ _expire() {
+ const self = this;
+ const now = Date.now();
+
+ return new Promise((resolve, reject) => {
+ const itemStream = this.storage.createReadStream({
+ valueEncoding: 'json'
+ });
+ const expireStream = new WritableStream({
+ objectMode: true,
+ write: maybeExpire
+ });
+
+ function maybeExpire({ key, value }, enc, next) {
+ if ((value.timestamp + constants.T_EXPIRE) <= now) {
+ return self.storage.del(key, next);
+ }
+
+ next();
+ }
+
+ function triggerCallback(err) {
+ itemStream.removeAllListeners();
+ expireStream.removeAllListeners();
+
+ if (err) {
+ return reject(err);
+ }
+
+ resolve();
+ }
+
+ itemStream.on('error', triggerCallback);
+ expireStream.on('error', triggerCallback);
+ expireStream.on('finish', triggerCallback);
+ itemStream.pipe(this.expirePipeline).pipe(expireStream);
+ });
+ }
+
+ /**
+ * If no node lookups have been performed in any given bucket's range for
+ * T_REFRESH, the node selects a random number in that range and does a
+ * refresh, an iterativeFindNode using that number as key.
+ * @param {number} startIndex - bucket index to start refresh from
+ * @param {KademliaNode~refreshCallback} [callback]
+ * @returns {Promise}
+ */
+ refresh(startIndex = 0, callback) {
+ if (typeof callback === 'function') {
+ return this._refresh(startIndex).then(callback, callback);
+ } else {
+ return this._refresh(startIndex);
+ }
+ }
+ /**
+ * @callback KademliaNode~refreshCallback
+ * @param {error|null} error
+ * @param {array} bucketsRefreshed
+ */
+
+ /**
+ * @private
+ */
+ _refresh(startIndex) {
+ const now = Date.now();
+ const indices = [
+ ...this.router.entries()
+ ].slice(startIndex).map((entry) => entry[0]);
+
+ // NB: We want to avoid high churn during refresh and prevent further
+ // NB: refreshes if lookups in the next bucket do not return any new
+ // NB: contacts. To do this we will shuffle the bucket indexes we are
+ // NB: going to check and only continue to refresh if new contacts were
+ // NB: discovered in the last MAX_UNIMPROVED_REFRESHES consecutive lookups.
+ let results = new Set(), consecutiveUnimprovedLookups = 0;
+
+ function isDiscoveringNewContacts() {
+ return consecutiveUnimprovedLookups < constants.MAX_UNIMPROVED_REFRESHES;
+ }
+
+ return new Promise((resolve, reject) => {
+ async.eachSeries(shuffle(indices), (index, next) => {
+ if (!isDiscoveringNewContacts()) {
+ return resolve();
+ }
+
+ const lastBucketLookup = this._lookups.get(index) || 0;
+ const needsRefresh = lastBucketLookup + constants.T_REFRESH <= now;
+
+ if (needsRefresh) {
+ return this.iterativeFindNode(
+ utils.getRandomBufferInBucketRange(this.identity, index)
+ .toString('hex'),
+ (err, contacts) => {
+ if (err) {
+ return next(err);
+ }
+
+ let discoveredNewContacts = false;
+
+ for (let [identity] of contacts) {
+ if (!results.has(identity)) {
+ discoveredNewContacts = true;
+ consecutiveUnimprovedLookups = 0;
+ results.add(identity);
+ }
+ }
+
+ if (!discoveredNewContacts) {
+ consecutiveUnimprovedLookups++;
+ }
+
+ next();
+ }
+ );
+ }
+
+ next();
+ }, (err) => {
+ if (err) {
+ return reject(err);
+ }
+
+ resolve();
+ });
+ });
+ }
+
+ /**
+ * Builds an list of closest contacts for a particular RPC
+ * @private
+ */
+ _iterativeFind(method, key) {
+ return new Promise((resolve) => {
+ function createRpc(target) {
+ return [method, [key], target];
+ }
+
+ let shortlist = new ContactList(key, [
+ ...this.router.getClosestContactsToKey(key, constants.ALPHA)
+ ]);
+ let closest = shortlist.closest;
+
+ this._lookups.set(utils.getBucketIndex(this.identity, key), Date.now());
+
+ function iterativeLookup(selection, continueLookup = true) {
+ if (!selection.length) {
+ return resolve(shortlist.active.slice(0, constants.K));
+ }
+
+ async.each(selection, (contact, next) => {
+ // NB: mark this node as contacted so as to avoid repeats
+ shortlist.contacted(contact);
+
+ this.send(...createRpc(contact), (err, result) => {
+ if (err) {
+ return next();
+ }
+
+ // NB: mark this node as active to include it in any return values
+ shortlist.responded(contact);
+
+ // NB: If the result is a contact/node list, just keep track of it
+ // NB: Otherwise, do not proceed with iteration, just callback
+ if (Array.isArray(result) || method !== 'FIND_VALUE') {
+ shortlist
+ .add(Array.isArray(result) ? result : [])
+ .forEach(contact => {
+ // NB: If it wasn't in the shortlist, we haven't added to the
+ // NB: routing table, so do that now.
+ this._updateContact(...contact);
+ });
+
+ return next();
+ }
+
+ // NB: If we did get an item back, get the closest node we contacted
+ // NB: who is missing the value and store a copy with them
+ const closestMissingValue = shortlist.active[0]
+
+ if (closestMissingValue) {
+ this.send('STORE', [
+ key,
+ this._createStorageItem(result)
+ ], closestMissingValue, () => null);
+ }
+
+ // NB: we found a value, so stop searching
+ resolve(result, contact);
+ });
+ }, () => {
+
+ // NB: If we have reached at least K active nodes, or haven't found a
+ // NB: closer node, even on our finishing trip, return to the caller
+ // NB: the K closest active nodes.
+ if (shortlist.active.length >= constants.K ||
+ (closest[0] === shortlist.closest[0] && !continueLookup)
+ ) {
+ return resolve(shortlist.active.slice(0, constants.K));
+ }
+
+ // NB: we haven't discovered a closer node, call k uncalled nodes and
+ // NB: finish up
+ if (closest[0] === shortlist.closest[0]) {
+ return iterativeLookup.call(
+ this,
+ shortlist.uncontacted.slice(0, constants.K),
+ false
+ );
+ }
+
+ closest = shortlist.closest;
+
+ // NB: continue the lookup with ALPHA close, uncontacted nodes
+ iterativeLookup.call(
+ this,
+ shortlist.uncontacted.slice(0, constants.ALPHA),
+ true
+ );
+ });
+ }
+
+ iterativeLookup.call(
+ this,
+ shortlist.uncontacted.slice(0, constants.ALPHA),
+ true
+ );
+ });
+ }
+ /**
+ * Adds the given contact to the routing table
+ * @private
+ */
+ _updateContact(identity, contact) {
+ this._updateContactQueue.push({ identity, contact }, (err, headId) => {
+ if (err) {
+ this.router.removeContactByNodeId(headId);
+ this.router.addContactByNodeId(identity, contact);
+ }
+ });
+ }
+
+ /**
+ * Worker for updating contact in a routing table bucket
+ * @private
+ */
+ _updateContactWorker(task, callback) {
+ const { identity, contact } = task;
+
+ if (identity === this.identity.toString('hex')) {
+ return callback();
+ }
+
+ const now = Date.now();
+ const reset = 600000;
+ const [, bucket, contactIndex] = this.router.addContactByNodeId(
+ identity,
+ contact
+ );
+
+ const [headId, headContact] = bucket.head;
+ const lastPing = this._pings.get(headId);
+
+ if (contactIndex !== -1) {
+ return callback();
+ }
+
+ if (lastPing && lastPing.responded && lastPing.timestamp > (now - reset)) {
+ return callback();
+ }
+
+ this.ping([headId, headContact], (err) => {
+ this._pings.set(headId, { timestamp: Date.now(), responded: !err });
+ callback(err, headId);
+ });
+ }
+
+}
+
+module.exports = KademliaNode;
+
+
+