diff --git a/example.js b/example.js index 473bba5..9dd8f06 100644 --- a/example.js +++ b/example.js @@ -1,9 +1,9 @@ -var mq = require('mqemitter-redis')() -var persistence = require('.')() -var aedes = require('aedes')({ +const mq = require('mqemitter-redis')() +const persistence = require('.')() +const aedes = require('aedes')({ mq, persistence }) -var server = require('net').createServer(aedes.handle) +const server = require('net').createServer(aedes.handle) server.listen(1883) diff --git a/package.json b/package.json index 594cd9f..3abd581 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,7 @@ ], "repository": { "type": "git", - "url": "git+https://github.com/mcollina/aedes-persistence-redis.git" + "url": "git+https://github.com/moscajs/aedes-persistence-redis.git" }, "keywords": [ "mqtt", @@ -43,32 +43,30 @@ "author": "Matteo Collina ", "license": "MIT", "bugs": { - "url": "https://github.com/mcollina/aedes-persistence-redis/issues" + "url": "https://github.com/moscajs/aedes-persistence-redis/issues" }, - "homepage": "https://github.com/mcollina/aedes-persistence-redis#readme", + "homepage": "https://github.com/moscajs/aedes-persistence-redis#readme", "devDependencies": { - "concat-stream": "^2.0.0", - "fastq": "^1.8.0", + "fastq": "^1.13.0", "faucet": "0.0.1", "license-checker": "^25.0.1", - "mqemitter": "^4.4.0", - "mqemitter-redis": "^4.0.3", - "mqtt": "^4.2.1", + "mqemitter": "^4.5.0", + "mqemitter-redis": "^5.0.0", + "mqtt": "^4.3.7", "nyc": "^15.1.0", "pre-commit": "^1.2.2", - "release-it": "^14.0.3", + "release-it": "^15.0.0", "snazzy": "^9.0.0", - "standard": "^14.3.4", - "tape": "^4.13.2" + "standard": "^17.0.0", + "tape": "^4.15.1" }, "dependencies": { - "aedes-cached-persistence": "^8.1.0", - "from2": "^2.3.0", + "aedes-cached-persistence": "^8.1.1", "hashlru": "^2.3.0", - "ioredis": "^4.17.3", + "ioredis": "^5.0.4", "msgpack-lite": "^0.1.26", "pump": "^3.0.0", - "qlobber": "^5.0.3", + "qlobber": "^7.0.0", "through2": "^4.0.2", "throughv": "^1.0.4" } diff --git a/persistence.js b/persistence.js index 507f3d9..fad99a5 100644 --- a/persistence.js +++ b/persistence.js @@ -1,14 +1,11 @@ -'use strict' - const Redis = require('ioredis') -const from = require('from2') +const { Readable } = require('stream') const through = require('through2') const throughv = require('throughv') const msgpack = require('msgpack-lite') const pump = require('pump') const CachedPersistence = require('aedes-cached-persistence') const Packet = CachedPersistence.Packet -const inherits = require('util').inherits const HLRU = require('hashlru') const QlobberTrue = require('qlobber').QlobberTrue const qlobberOpts = { @@ -26,311 +23,535 @@ const outgoingKey = 'outgoing:' const outgoingIdKey = 'outgoing-id:' const incomingKey = 'incoming:' -function RedisPersistence (opts) { - if (!(this instanceof RedisPersistence)) { - return new RedisPersistence(opts) - } +class RedisPersistence extends CachedPersistence { + constructor (opts = {}) { + super(opts) + this.maxSessionDelivery = opts.maxSessionDelivery || 1000 + this.packetTTL = opts.packetTTL || (() => { return 0 }) - opts = opts || {} - this.maxSessionDelivery = opts.maxSessionDelivery || 1000 - this.packetTTL = opts.packetTTL || function () { return 0 } + this.messageIdCache = HLRU(100000) - this.messageIdCache = HLRU(100000) + if (opts.cluster) { + this._db = new Redis.Cluster(opts.cluster) + } else { + this._db = opts.conn || new Redis(opts) + } - if (opts.cluster) { - this._db = new Redis.Cluster(opts.cluster) - } else { - this._db = opts.conn || new Redis(opts) + this._getRetainedChunkBound = this._getRetainedChunk.bind(this) } - this._getRetainedChunkBound = this._getRetainedChunk.bind(this) - CachedPersistence.call(this, opts) -} - -inherits(RedisPersistence, CachedPersistence) + storeRetained (packet, cb) { + if (packet.payload.length === 0) { + this._db.hdel(retainedKey, packet.topic, cb) + } else { + this._db.hset(retainedKey, packet.topic, msgpack.encode(packet), cb) + } + } -RedisPersistence.prototype.storeRetained = function (packet, cb) { - if (packet.payload.length === 0) { - this._db.hdel(retainedKey, packet.topic, cb) - } else { - this._db.hset(retainedKey, packet.topic, msgpack.encode(packet), cb) + _getRetainedChunk (chunk, enc, cb) { + this._db.hgetBuffer(retainedKey, chunk, cb) } -} -RedisPersistence.prototype._getRetainedChunk = function (chunk, enc, cb) { - this._db.hgetBuffer(retainedKey, chunk, cb) -} + createRetainedStreamCombi (patterns) { + const that = this + const qlobber = new QlobberTrue(qlobberOpts) + + for (const pattern of patterns) { + qlobber.add(pattern) + } + + const stream = through.obj(that._getRetainedChunkBound) -RedisPersistence.prototype.createRetainedStreamCombi = function (patterns) { - var that = this - var qlobber = new QlobberTrue(qlobberOpts) + this._db.hkeys(retainedKey, function getKeys (err, keys) { + if (err) { + stream.emit('error', err) + } else { + matchRetained(stream, keys, qlobber) + } + }) - for (var i = 0; i < patterns.length; i++) { - qlobber.add(patterns[i]) + return pump(stream, throughv.obj(decodeRetainedPacket)) } - var stream = through.obj(that._getRetainedChunkBound) + createRetainedStream (pattern) { + return this.createRetainedStreamCombi([pattern]) + } - this._db.hkeys(retainedKey, function getKeys (err, keys) { - if (err) { - stream.emit('error', err) - } else { - matchRetained(stream, keys, qlobber) + addSubscriptions (client, subs, cb) { + if (!this.ready) { + this.once('ready', this.addSubscriptions.bind(this, client, subs, cb)) + return } - }) - return pump(stream, throughv.obj(decodeRetainedPacket)) -} + const clientSubKey = clientKey + client.id -RedisPersistence.prototype.createRetainedStream = function (pattern) { - return this.createRetainedStreamCombi([pattern]) -} + const toStore = {} + let published = 0 + let errored -function matchRetained (stream, keys, qlobber) { - for (var i = 0, l = keys.length; i < l; i++) { - if (qlobber.test(keys[i])) { - stream.write(keys[i]) + for (const sub of subs) { + toStore[sub.topic] = sub.qos + } + + this._db.sadd(clientsKey, client.id, finish) + this._db.hmset(clientSubKey, toStore, finish) + + this._addedSubscriptions(client, subs, finish) + + function finish (err) { + errored = err + published++ + if (published === 3) { + cb(errored, client) + } } } - stream.end() -} -function decodeRetainedPacket (chunk, enc, cb) { - cb(null, msgpack.decode(chunk)) -} + removeSubscriptions (client, subs, cb) { + if (!this.ready) { + this.once('ready', this.removeSubscriptions.bind(this, client, subs, cb)) + return + } + + const clientSubKey = clientKey + client.id + + let errored = false + let outstanding = 0 -RedisPersistence.prototype.addSubscriptions = function (client, subs, cb) { - if (!this.ready) { - this.once('ready', this.addSubscriptions.bind(this, client, subs, cb)) - return + function check (err) { + if (err) { + if (!errored) { + errored = true + cb(err) + } + } + + if (errored) { + return + } + + outstanding-- + if (outstanding === 0) { + cb(null, client) + } + } + + const that = this + this._db.hdel(clientSubKey, subs, function subKeysRemoved (err) { + if (err) { + return cb(err) + } + + outstanding++ + that._db.exists(clientSubKey, function checkAllSubsRemoved (err, subCount) { + if (err) { + return check(err) + } + if (subCount === 0) { + outstanding++ + that._db.del(outgoingKey + client.id, check) + return that._db.srem(clientsKey, client.id, check) + } + check() + }) + + outstanding++ + that._removedSubscriptions(client, subs.map(toSub), check) + }) } - var clientSubKey = clientKey + client.id + subscriptionsByClient (client, cb) { + const clientSubKey = clientKey + client.id - var toStore = {} - var published = 0 - var errored + this._db.hgetall(clientSubKey, function returnSubs (err, subs) { + const toReturn = returnSubsForClient(subs) + cb(err, toReturn.length > 0 ? toReturn : null, client) + }) + } + + countOffline (cb) { + const that = this + + this._db.scard(clientsKey, function countOfflineClients (err, count) { + if (err) { + return cb(err) + } - for (var i = 0; i < subs.length; i++) { - var sub = subs[i] - toStore[sub.topic] = sub.qos + cb(null, that._trie.subscriptionsCount, parseInt(count) || 0) + }) } - this._db.sadd(clientsKey, client.id, finish) - this._db.hmset(clientSubKey, toStore, finish) + subscriptionsByTopic (topic, cb) { + if (!this.ready) { + this.once('ready', this.subscriptionsByTopic.bind(this, topic, cb)) + return this + } - this._addedSubscriptions(client, subs, finish) + const result = this._trie.match(topic) - function finish (err) { - errored = err - published++ - if (published === 3) { - cb(errored, client) + cb(null, result) + } + + _setup () { + if (this.ready) { + return } + + const that = this + + const hgetallStream = throughv.obj(function getStream (clientId, enc, cb) { + const clientSubKey = clientKey + clientId + that._db.hgetall(clientSubKey, function clientHash (err, hash) { + cb(err, { clientHash: hash, clientId }) + }) + }, function emitReady (cb) { + that.ready = true + that.emit('ready') + cb() + }).on('data', function processKeys (data) { + processKeysForClient(data.clientId, data.clientHash, that) + }) + + this._db.smembers(clientsKey, function smembers (err, clientIds) { + if (err) { + hgetallStream.emit('error', err) + } else { + for (const clientId of clientIds) { + hgetallStream.write(clientId) + } + hgetallStream.end() + } + }) } -} -RedisPersistence.prototype.removeSubscriptions = function (client, subs, cb) { - if (!this.ready) { - this.once('ready', this.removeSubscriptions.bind(this, client, subs, cb)) - return + outgoingEnqueue (sub, packet, cb) { + this.outgoingEnqueueCombi([sub], packet, cb) } - var clientSubKey = clientKey + client.id + outgoingEnqueueCombi (subs, packet, cb) { + if (!subs || subs.length === 0) { + return cb(null, packet) + } + let count = 0 + let outstanding = 1 + let errored = false + const packetKey = `packet:${packet.brokerId}:${packet.brokerCounter}` + const countKey = `packet:${packet.brokerId}:${packet.brokerCounter}:offlineCount` + const ttl = this.packetTTL(packet) - var errored = false - var outstanding = 0 + const encoded = msgpack.encode(new Packet(packet)) - function check (err) { - if (err) { - if (!errored) { - errored = true - cb(err) - } + this._db.mset(packetKey, encoded, countKey, subs.length, finish) + if (ttl > 0) { + outstanding += 2 + this._db.expire(packetKey, ttl, finish) + this._db.expire(countKey, ttl, finish) } - if (errored) { - return + for (const sub of subs) { + const listKey = outgoingKey + sub.clientId + this._db.rpush(listKey, packetKey, finish) } - outstanding-- - if (outstanding === 0) { - cb(null, client) + function finish (err) { + count++ + if (err) { + errored = err + return cb(err) + } + if (count === (subs.length + outstanding) && !errored) { + cb(null, packet) + } } } - var that = this - this._db.hdel(clientSubKey, subs, function subKeysRemoved (err) { - if (err) { - return cb(err) + outgoingUpdate (client, packet, cb) { + const that = this + if (packet.brokerId && packet.messageId) { + updateWithClientData(this, client, packet, cb) + } else { + augmentWithBrokerData(this, client, packet, function updateClient (err) { + if (err) { return cb(err, client, packet) } + + updateWithClientData(that, client, packet, cb) + }) } + } + + outgoingClearMessageId (client, packet, cb) { + const that = this + const clientListKey = outgoingKey + client.id + const messageIdKey = `${outgoingIdKey + client.id}:${packet.messageId}` - outstanding++ - that._db.exists(clientSubKey, function checkAllSubsRemoved (err, subCount) { + const clientKey = this.messageIdCache.get(messageIdKey) + this.messageIdCache.remove(messageIdKey) + + if (!clientKey) { + return cb(null, packet) + } + + let count = 0 + let errored = false + + // TODO can be cached in case of wildcard deliveries + this._db.getBuffer(clientKey, function clearMessageId (err, buf) { + let origPacket + let packetKey + let countKey if (err) { - return check(err) + errored = err + return cb(err) } - if (subCount === 0) { - outstanding++ - that._db.del(outgoingKey + client.id, check) - return that._db.srem(clientsKey, client.id, check) + if (buf) { + origPacket = msgpack.decode(buf) + origPacket.messageId = packet.messageId + + packetKey = `packet:${origPacket.brokerId}:${origPacket.brokerCounter}` + countKey = `packet:${origPacket.brokerId}:${origPacket.brokerCounter}:offlineCount` + + if (clientKey !== packetKey) { // qos=2 + that._db.del(clientKey, finish) + } else { + finish() + } + } else { + finish() } - check() - }) - outstanding++ - that._removedSubscriptions(client, subs.map(toSub), check) - }) -} + that._db.lrem(clientListKey, 0, packetKey, finish) -function toSub (topic) { - return { - topic: topic + that._db.decr(countKey, (err, remained) => { + if (err) { + errored = err + return cb(err) + } + if (remained === 0) { + that._db.del(packetKey, countKey, finish) + } else { + finish() + } + }) + + function finish (err) { + count++ + if (err) { + errored = err + return cb(err) + } + if (count === 3 && !errored) { + cb(err, origPacket) + } + } + }) } -} -RedisPersistence.prototype.subscriptionsByClient = function (client, cb) { - var clientSubKey = clientKey + client.id + outgoingStream (client) { + const clientListKey = outgoingKey + client.id + const stream = throughv.obj(this._buildAugment(clientListKey)) - this._db.hgetall(clientSubKey, function returnSubs (err, subs) { - var toReturn = returnSubsForClient(subs) - cb(err, toReturn.length > 0 ? toReturn : null, client) - }) -} + this._db.lrange(clientListKey, 0, this.maxSessionDelivery, lrangeResult) -function returnSubsForClient (subs) { - var subKeys = Object.keys(subs) + function lrangeResult (err, results) { + if (err) { + stream.emit('error', err) + } else { + for (const result of results) { + stream.write(result) + } + stream.end() + } + } - var toReturn = [] + return stream + } - if (subKeys.length === 0) { - return toReturn + incomingStorePacket (client, packet, cb) { + const key = `${incomingKey + client.id}:${packet.messageId}` + const newp = new Packet(packet) + newp.messageId = packet.messageId + this._db.set(key, msgpack.encode(newp), cb) } - for (var i = 0; i < subKeys.length; i++) { - toReturn.push({ - topic: subKeys[i], - qos: parseInt(subs[subKeys[i]]) + incomingGetPacket (client, packet, cb) { + const key = `${incomingKey + client.id}:${packet.messageId}` + this._db.getBuffer(key, function decodeBuffer (err, buf) { + if (err) { + return cb(err) + } + + if (!buf) { + return cb(new Error('no such packet')) + } + + cb(null, msgpack.decode(buf), client) }) } - return toReturn -} + incomingDelPacket (client, packet, cb) { + const key = `${incomingKey + client.id}:${packet.messageId}` + this._db.del(key, cb) + } -RedisPersistence.prototype.countOffline = function (cb) { - var that = this + putWill (client, packet, cb) { + const key = `${willKey + this.broker.id}:${client.id}` + packet.clientId = client.id + packet.brokerId = this.broker.id + this._db.rpush(willsKey, key) + this._db.setBuffer(key, msgpack.encode(packet), encodeBuffer) - this._db.scard(clientsKey, function countOfflineClients (err, count) { - if (err) { - return cb(err) + function encodeBuffer (err) { + cb(err, client) } + } - cb(null, that._trie.subscriptionsCount, parseInt(count) || 0) - }) -} + getWill (client, cb) { + const key = `${willKey + this.broker.id}:${client.id}` + this._db.getBuffer(key, function getWillForClient (err, packet) { + if (err) { return cb(err) } + + let result = null + + if (packet) { + result = msgpack.decode(packet) + } -RedisPersistence.prototype.subscriptionsByTopic = function (topic, cb) { - if (!this.ready) { - this.once('ready', this.subscriptionsByTopic.bind(this, topic, cb)) - return this + cb(null, result, client) + }) } - var result = this._trie.match(topic) + delWill (client, cb) { + const key = `${willKey + client.brokerId}:${client.id}` + let result = null + const that = this + this._db.lrem(willsKey, 0, key) + this._db.getBuffer(key, function getClientWill (err, packet) { + if (err) { return cb(err) } - cb(null, result) -} + if (packet) { + result = msgpack.decode(packet) + } -RedisPersistence.prototype._setup = function () { - if (this.ready) { - return + that._db.del(key, function deleteWill (err) { + cb(err, result, client) + }) + }) } - var that = this + streamWill (brokers) { + const stream = throughv.obj(this._buildAugment(willsKey)) - var hgetallStream = throughv.obj(function getStream (clientId, enc, cb) { - var clientSubKey = clientKey + clientId - that._db.hgetall(clientSubKey, function clientHash (err, hash) { - cb(err, { clientHash: hash, clientId: clientId }) - }) - }, function emitReady (cb) { - that.ready = true - that.emit('ready') - cb() - }).on('data', function processKeys (data) { - processKeysForClient(data.clientId, data.clientHash, that) - }) + this._db.lrange(willsKey, 0, 10000, streamWill) - this._db.smembers(clientsKey, function smembers (err, clientIds) { - if (err) { - hgetallStream.emit('error', err) - } else { - for (var i = 0, l = clientIds.length; i < l; i++) { - hgetallStream.write(clientIds[i]) + function streamWill (err, results) { + if (err) { + stream.emit('error', err) + } else { + for (const result of results) { + if (!brokers || !brokers[result.split(':')[1]]) { + stream.write(result) + } + } + stream.end() } - hgetallStream.end() } - }) -} + return stream + } -function processKeysForClient (clientId, clientHash, that) { - var topics = Object.keys(clientHash) - for (var i = 0; i < topics.length; i++) { - var topic = topics[i] - that._trie.add(topic, { - clientId: clientId, - topic: topic, - qos: clientHash[topic] + * #getClientIdFromEntries (entries) { + for (const entry of entries) { + yield entry.clientId + } + } + + getClientList (topic) { + const entries = this._trie.match(topic, topic) + return Readable.from(this.#getClientIdFromEntries(entries)) + } + + _buildAugment (listKey) { + const that = this + return function decodeAndAugment (key, enc, cb) { + that._db.getBuffer(key, function decodeMessage (err, result) { + let decoded + if (result) { + decoded = msgpack.decode(result) + } + if (err || !decoded) { + that._db.lrem(listKey, 0, key) + } + cb(err, decoded) + }) + } + } + + destroy (cb) { + const that = this + CachedPersistence.prototype.destroy.call(this, function disconnect () { + that._db.disconnect() + + if (cb) { + that._db.on('end', cb) + } }) } } -RedisPersistence.prototype.outgoingEnqueue = function (sub, packet, cb) { - this.outgoingEnqueueCombi([sub], packet, cb) +function matchRetained (stream, keys, qlobber) { + for (const key of keys) { + if (qlobber.test(key)) { + stream.write(key) + } + } + stream.end() +} + +function decodeRetainedPacket (chunk, enc, cb) { + cb(null, msgpack.decode(chunk)) } -RedisPersistence.prototype.outgoingEnqueueCombi = function (subs, packet, cb) { - if (!subs || subs.length === 0) { - return cb(null, packet) +function toSub (topic) { + return { + topic } - var count = 0 - var outstanding = 1 - var errored = false - var packetKey = 'packet:' + packet.brokerId + ':' + packet.brokerCounter - var countKey = 'packet:' + packet.brokerId + ':' + packet.brokerCounter + ':offlineCount' - var ttl = this.packetTTL(packet) +} + +function returnSubsForClient (subs) { + const subKeys = Object.keys(subs) - var encoded = msgpack.encode(new Packet(packet)) + const toReturn = [] - this._db.mset(packetKey, encoded, countKey, subs.length, finish) - if (ttl > 0) { - outstanding += 2 - this._db.expire(packetKey, ttl, finish) - this._db.expire(countKey, ttl, finish) + if (subKeys.length === 0) { + return toReturn } - for (var i = 0; i < subs.length; i++) { - var listKey = outgoingKey + subs[i].clientId - this._db.rpush(listKey, packetKey, finish) + for (const subKey of subKeys) { + toReturn.push({ + topic: subKey, + qos: parseInt(subs[subKey]) + }) } - function finish (err) { - count++ - if (err) { - errored = err - return cb(err) - } - if (count === (subs.length + outstanding) && !errored) { - cb(null, packet) - } + return toReturn +} + +function processKeysForClient (clientId, clientHash, that) { + const topics = Object.keys(clientHash) + + for (const topic of topics) { + that._trie.add(topic, { + clientId, + topic, + qos: clientHash[topic] + }) } } function updateWithClientData (that, client, packet, cb) { - var messageIdKey = outgoingIdKey + client.id + ':' + packet.messageId - var clientListKey = outgoingKey + client.id - var packetKey = 'packet:' + packet.brokerId + ':' + packet.brokerCounter + const messageIdKey = `${outgoingIdKey + client.id}:${packet.messageId}` + const clientListKey = outgoingKey + client.id + const packetKey = `packet:${packet.brokerId}:${packet.brokerCounter}` - var ttl = that.packetTTL(packet) + const ttl = that.packetTTL(packet) if (packet.cmd && packet.cmd !== 'pubrel') { // qos=1 that.messageIdCache.set(messageIdKey, packetKey) if (ttl > 0) { @@ -341,11 +562,11 @@ function updateWithClientData (that, client, packet, cb) { } // qos=2 - var clientUpdateKey = outgoingKey + client.id + ':' + packet.brokerId + ':' + packet.brokerCounter + const clientUpdateKey = `${outgoingKey + client.id}:${packet.brokerId}:${packet.brokerCounter}` that.messageIdCache.set(messageIdKey, clientUpdateKey) - var count = 0 - that._db.lrem(clientListKey, 0, packetKey, function (err, removed) { + let count = 0 + that._db.lrem(clientListKey, 0, packetKey, (err, removed) => { if (err) { return cb(err) } @@ -356,7 +577,7 @@ function updateWithClientData (that, client, packet, cb) { } }) - var encoded = msgpack.encode(packet) + const encoded = msgpack.encode(packet) if (ttl > 0) { that._db.set(clientUpdateKey, encoded, 'EX', ttl, setPostKey) @@ -396,248 +617,17 @@ function updateWithClientData (that, client, packet, cb) { } function augmentWithBrokerData (that, client, packet, cb) { - var messageIdKey = outgoingIdKey + client.id + ':' + packet.messageId + const messageIdKey = `${outgoingIdKey + client.id}:${packet.messageId}` - var key = that.messageIdCache.get(messageIdKey) + const key = that.messageIdCache.get(messageIdKey) if (!key) { return cb(new Error('unknown key')) } - var tokens = key.split(':') + const tokens = key.split(':') packet.brokerId = tokens[tokens.length - 2] packet.brokerCounter = tokens[tokens.length - 1] cb(null) } -RedisPersistence.prototype.outgoingUpdate = function (client, packet, cb) { - var that = this - if (packet.brokerId && packet.messageId) { - updateWithClientData(this, client, packet, cb) - } else { - augmentWithBrokerData(this, client, packet, function updateClient (err) { - if (err) { return cb(err, client, packet) } - - updateWithClientData(that, client, packet, cb) - }) - } -} - -RedisPersistence.prototype.outgoingClearMessageId = function (client, packet, cb) { - var that = this - var clientListKey = outgoingKey + client.id - var messageIdKey = outgoingIdKey + client.id + ':' + packet.messageId - - var clientKey = this.messageIdCache.get(messageIdKey) - this.messageIdCache.remove(messageIdKey) - - if (!clientKey) { - return cb(null, packet) - } - - var count = 0 - var errored = false - - // TODO can be cached in case of wildcard deliveries - this._db.getBuffer(clientKey, function clearMessageId (err, buf) { - if (err) { - errored = err - return cb(err) - } - if (buf) { - var origPacket = msgpack.decode(buf) - origPacket.messageId = packet.messageId - - var packetKey = 'packet:' + origPacket.brokerId + ':' + origPacket.brokerCounter - var countKey = 'packet:' + origPacket.brokerId + ':' + origPacket.brokerCounter + ':offlineCount' - - if (clientKey !== packetKey) { // qos=2 - that._db.del(clientKey, finish) - } else { - finish() - } - } else { - finish() - } - - that._db.lrem(clientListKey, 0, packetKey, finish) - - that._db.decr(countKey, function (err, remained) { - if (err) { - errored = err - return cb(err) - } - if (remained === 0) { - that._db.del(packetKey, countKey, finish) - } else { - finish() - } - }) - - function finish (err) { - count++ - if (err) { - errored = err - return cb(err) - } - if (count === 3 && !errored) { - cb(err, origPacket) - } - } - }) -} - -RedisPersistence.prototype.outgoingStream = function (client) { - var clientListKey = outgoingKey + client.id - var stream = throughv.obj(this._buildAugment(clientListKey)) - - this._db.lrange(clientListKey, 0, this.maxSessionDelivery, lrangeResult) - - function lrangeResult (err, results) { - if (err) { - stream.emit('error', err) - } else { - for (var i = 0, l = results.length; i < l; i++) { - stream.write(results[i]) - } - stream.end() - } - } - - return stream -} - -RedisPersistence.prototype.incomingStorePacket = function (client, packet, cb) { - var key = incomingKey + client.id + ':' + packet.messageId - var newp = new Packet(packet) - newp.messageId = packet.messageId - this._db.set(key, msgpack.encode(newp), cb) -} - -RedisPersistence.prototype.incomingGetPacket = function (client, packet, cb) { - var key = incomingKey + client.id + ':' + packet.messageId - this._db.getBuffer(key, function decodeBuffer (err, buf) { - if (err) { - return cb(err) - } - - if (!buf) { - return cb(new Error('no such packet')) - } - - cb(null, msgpack.decode(buf), client) - }) -} - -RedisPersistence.prototype.incomingDelPacket = function (client, packet, cb) { - var key = incomingKey + client.id + ':' + packet.messageId - this._db.del(key, cb) -} - -RedisPersistence.prototype.putWill = function (client, packet, cb) { - var key = willKey + this.broker.id + ':' + client.id - packet.clientId = client.id - packet.brokerId = this.broker.id - this._db.rpush(willsKey, key) - this._db.setBuffer(key, msgpack.encode(packet), encodeBuffer) - - function encodeBuffer (err) { - cb(err, client) - } -} - -RedisPersistence.prototype.getWill = function (client, cb) { - var key = willKey + this.broker.id + ':' + client.id - this._db.getBuffer(key, function getWillForClient (err, packet) { - if (err) { return cb(err) } - - var result = null - - if (packet) { - result = msgpack.decode(packet) - } - - cb(null, result, client) - }) -} - -RedisPersistence.prototype.delWill = function (client, cb) { - var key = willKey + client.brokerId + ':' + client.id - var result = null - var that = this - this._db.lrem(willsKey, 0, key) - this._db.getBuffer(key, function getClientWill (err, packet) { - if (err) { return cb(err) } - - if (packet) { - result = msgpack.decode(packet) - } - - that._db.del(key, function deleteWill (err) { - cb(err, result, client) - }) - }) -} - -RedisPersistence.prototype.streamWill = function (brokers) { - var stream = throughv.obj(this._buildAugment(willsKey)) - - this._db.lrange(willsKey, 0, 10000, streamWill) - - function streamWill (err, results) { - if (err) { - stream.emit('error', err) - } else { - for (var i = 0, l = results.length; i < l; i++) { - if (!brokers || !brokers[results[i].split(':')[1]]) { - stream.write(results[i]) - } - } - stream.end() - } - } - return stream -} - -RedisPersistence.prototype.getClientList = function (topic) { - var entries = this._trie.match(topic, topic) - - function pushClientList (size, next) { - if (entries.length === 0) { - return next(null, null) - } - var chunk = entries.slice(0, 1) - entries = entries.slice(1) - next(null, chunk[0].clientId) - } - - return from.obj(pushClientList) -} - -RedisPersistence.prototype._buildAugment = function (listKey) { - var that = this - return function decodeAndAugment (key, enc, cb) { - that._db.getBuffer(key, function decodeMessage (err, result) { - var decoded - if (result) { - decoded = msgpack.decode(result) - } - if (err || !decoded) { - that._db.lrem(listKey, 0, key) - } - cb(err, decoded) - }) - } -} - -RedisPersistence.prototype.destroy = function (cb) { - var that = this - CachedPersistence.prototype.destroy.call(this, function disconnect () { - that._db.disconnect() - - if (cb) { - that._db.on('end', cb) - } - }) -} - -module.exports = RedisPersistence +module.exports = (opts) => new RedisPersistence(opts) diff --git a/test.js b/test.js index 987126d..993ff74 100644 --- a/test.js +++ b/test.js @@ -1,13 +1,11 @@ -'use strict' - -var test = require('tape').test -var persistence = require('./') -var Redis = require('ioredis') -var mqemitterRedis = require('mqemitter-redis') -var abs = require('aedes-cached-persistence/abstract') -var db = new Redis() - -db.on('error', function (e) { +const test = require('tape').test +const persistence = require('./') +const Redis = require('ioredis') +const mqemitterRedis = require('mqemitter-redis') +const abs = require('aedes-cached-persistence/abstract') +const db = new Redis() + +db.on('error', e => { console.trace(e) }) @@ -17,26 +15,26 @@ function unref () { this.connector.stream.unref() } -test('external Redis conn', function (t) { +test('external Redis conn', t => { t.plan(2) - var externalRedis = new Redis() - var emitter = mqemitterRedis() + const externalRedis = new Redis() + const emitter = mqemitterRedis() - db.on('error', function (e) { + db.on('error', e => { t.notOk(e) }) - db.on('connect', function () { + db.on('connect', () => { t.pass('redis connected') }) - var instance = persistence({ + const instance = persistence({ conn: externalRedis }) instance.broker = toBroker('1', emitter) - instance.on('ready', function () { + instance.on('ready', () => { t.pass('instance ready') externalRedis.disconnect() instance.destroy() @@ -45,15 +43,15 @@ test('external Redis conn', function (t) { }) abs({ - test: test, - buildEmitter: function () { + test, + buildEmitter () { const emitter = mqemitterRedis() emitter.subConn.on('connect', unref) emitter.pubConn.on('connect', unref) return emitter }, - persistence: function () { + persistence () { db.flushall() return persistence() }, @@ -62,30 +60,30 @@ abs({ function toBroker (id, emitter) { return { - id: id, + id, publish: emitter.emit.bind(emitter), subscribe: emitter.on.bind(emitter), unsubscribe: emitter.removeListener.bind(emitter) } } -test('packet ttl', function (t) { +test('packet ttl', t => { t.plan(4) db.flushall() - var emitter = mqemitterRedis() - var instance = persistence({ - packetTTL: function () { + const emitter = mqemitterRedis() + const instance = persistence({ + packetTTL () { return 1 } }) instance.broker = toBroker('1', emitter) - var subs = [{ + const subs = [{ clientId: 'ttlTest', topic: 'hello', qos: 1 }] - var packet = { + const packet = { cmd: 'publish', topic: 'hello', payload: 'ttl test', @@ -97,12 +95,12 @@ test('packet ttl', function (t) { instance.outgoingEnqueueCombi(subs, packet, function enqueued (err, saved) { t.notOk(err) t.deepEqual(saved, packet) - setTimeout(function () { - var offlineStream = instance.outgoingStream({ id: 'ttlTest' }) - offlineStream.on('data', function (offlinePacket) { + setTimeout(() => { + const offlineStream = instance.outgoingStream({ id: 'ttlTest' }) + offlineStream.on('data', offlinePacket => { t.notOk(offlinePacket) }) - offlineStream.on('end', function () { + offlineStream.on('end', () => { instance.destroy(t.pass.bind(t, 'stop instance')) emitter.close(t.pass.bind(t, 'stop emitter')) }) @@ -110,12 +108,12 @@ test('packet ttl', function (t) { }) }) -test('outgoingUpdate doesn\'t clear packet ttl', function (t) { +test('outgoingUpdate doesn\'t clear packet ttl', t => { t.plan(5) db.flushall() const emitter = mqemitterRedis() const instance = persistence({ - packetTTL: function () { + packetTTL () { return 1 } }) @@ -143,7 +141,7 @@ test('outgoingUpdate doesn\'t clear packet ttl', function (t) { t.notOk(err) t.deepEqual(saved, packet) instance.outgoingUpdate(client, packet, function updated () { - setTimeout(function () { + setTimeout(() => { db.exists('packet:1:42', (_, exists) => { t.notOk(exists, 'packet key should have expired') }) @@ -154,18 +152,18 @@ test('outgoingUpdate doesn\'t clear packet ttl', function (t) { }) }) -test('multiple persistences', function (t) { +test('multiple persistences', t => { t.plan(7) db.flushall() - var emitter = mqemitterRedis() - var emitter2 = mqemitterRedis() - var instance = persistence() - var instance2 = persistence() + const emitter = mqemitterRedis() + const emitter2 = mqemitterRedis() + const instance = persistence() + const instance2 = persistence() instance.broker = toBroker('1', emitter) instance2.broker = toBroker('2', emitter2) - var client = { id: 'multipleTest' } - var subs = [{ + const client = { id: 'multipleTest' } + const subs = [{ topic: 'hello', qos: 1 }, { @@ -176,8 +174,8 @@ test('multiple persistences', function (t) { qos: 1 }] - var gotSubs = false - var addedSubs = false + let gotSubs = false + let addedSubs = false function close () { if (gotSubs && addedSubs) { @@ -188,8 +186,8 @@ test('multiple persistences', function (t) { } } - instance2._waitFor(client, 'sub_' + 'hello', function () { - instance2.subscriptionsByTopic('hello', function (err, resubs) { + instance2._waitFor(client, 'sub_' + 'hello', () => { + instance2.subscriptionsByTopic('hello', (err, resubs) => { t.notOk(err, 'subs by topic no error') t.deepEqual(resubs, [{ clientId: client.id, @@ -205,12 +203,12 @@ test('multiple persistences', function (t) { }) }) - var ready = false - var ready2 = false + let ready = false + let ready2 = false function addSubs () { if (ready && ready2) { - instance.addSubscriptions(client, subs, function (err) { + instance.addSubscriptions(client, subs, err => { t.notOk(err, 'add subs no error') addedSubs = true close() @@ -218,28 +216,28 @@ test('multiple persistences', function (t) { } } - instance.on('ready', function () { + instance.on('ready', () => { ready = true addSubs() }) - instance2.on('ready', function () { + instance2.on('ready', () => { ready2 = true addSubs() }) }) -test('unknown cache key', function (t) { +test('unknown cache key', t => { t.plan(3) db.flushall() - var emitter = mqemitterRedis() - var instance = persistence() - var client = { id: 'unknown_pubrec' } + const emitter = mqemitterRedis() + const instance = persistence() + const client = { id: 'unknown_pubrec' } instance.broker = toBroker('1', emitter) // packet with no brokerId - var packet = { + const packet = { cmd: 'pubrec', topic: 'hello', qos: 2, @@ -251,12 +249,12 @@ test('unknown cache key', function (t) { emitter.close(t.pass.bind(t, 'emitter dies')) } - instance.outgoingUpdate(client, packet, function (err, client, packet) { + instance.outgoingUpdate(client, packet, (err, client, packet) => { t.equal(err.message, 'unknown key', 'Received unknown PUBREC') close() }) }) -test.onFinish(function () { +test.onFinish(() => { process.exit(0) }) diff --git a/tester.js b/tester.js index c9b2dec..2a50db4 100644 --- a/tester.js +++ b/tester.js @@ -1,18 +1,16 @@ // npm install mqtt fastq // command to run : node fastbench 25000 -'use strict' - -var queue = require('fastq')(worker, 1) -var mqtt = require('mqtt') -var connected = 1 -var clients = [] -var count = 0 -var subscriptions = 0 -var st = Date.now() +const queue = require('fastq')(worker, 1) +const mqtt = require('mqtt') +let connected = 1 +const clients = [] +let count = 0 +let subscriptions = 0 +const st = Date.now() console.log('Started connecting clients', st) -setInterval(function () { - console.log(new Date() + '-' + count + ' - ' + subscriptions) +setInterval(() => { + console.log(`${new Date()}-${count} - ${subscriptions}`) }, 1000) function worker (arg, cb) { @@ -20,23 +18,21 @@ function worker (arg, cb) { port: 1883, keepalive: 60 }) - clients[count].on('connect', - (function (clientObj) { - return function () { - connected++ - if (connected === process.argv[2]) { - console.log('done connecting clients', Date.now() - st) - } - clientObj.subscribe(clientObj.options.clientId, function () { - subscriptions++ - }) - cb(null, 42 * 2) + clients[count].on('connect', (clientObj => { + return () => { + connected++ + if (connected === process.argv[2]) { + console.log('done connecting clients', Date.now() - st) } - }(clients[count])) - ) + clientObj.subscribe(clientObj.options.clientId, () => { + subscriptions++ + }) + cb(null, 42 * 2) + } + })(clients[count])) count++ } -for (var i = 0; i < process.argv[2]; i++) { - queue.push(42, function () {}) +for (let i = 0; i < process.argv[2]; i++) { + queue.push(42, () => {}) }