From 5bdd2aabc7d07dfe5482351db1230abe264a8277 Mon Sep 17 00:00:00 2001 From: Hans Klunder Date: Fri, 20 May 2022 16:34:01 +0200 Subject: [PATCH] feat: bump deps and sanitize clientId in redis keys (#96) --- README.md | 4 ++ UPGRADE.md | 6 ++ package.json | 6 +- persistence.js | 172 ++++++++++++++++++++++++++++--------------------- test.js | 15 +++-- 5 files changed, 121 insertions(+), 82 deletions(-) create mode 100644 UPGRADE.md diff --git a/README.md b/README.md index a801867..4b891aa 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,10 @@ See [aedes-persistence][aedes-persistence] for the full API, and [Aedes][aedes] npm install aedes aedes-persistence-redis --save ``` +## Upgrade + +Please check [UPGRADE.md](UPGRADE.md) when upgrading from a previous version. + ## API ### aedesPersistenceRedis([opts]) diff --git a/UPGRADE.md b/UPGRADE.md new file mode 100644 index 0000000..88011c6 --- /dev/null +++ b/UPGRADE.md @@ -0,0 +1,6 @@ +# Upgrade + +## x.x.x to 9.x.x +The database schema has changed between 8.x.x to 9.x.x. + +Start with a clean database if you migrate from x.x.x to 9.x.x diff --git a/package.json b/package.json index 3abd581..439c899 100644 --- a/package.json +++ b/package.json @@ -58,12 +58,12 @@ "release-it": "^15.0.0", "snazzy": "^9.0.0", "standard": "^17.0.0", - "tape": "^4.15.1" + "tape": "^5.5.3" }, "dependencies": { - "aedes-cached-persistence": "^8.1.1", + "aedes-cached-persistence": "^9.0.0", "hashlru": "^2.3.0", - "ioredis": "^5.0.4", + "ioredis": "^5.0.5", "msgpack-lite": "^0.1.26", "pump": "^3.0.0", "qlobber": "^7.0.0", diff --git a/persistence.js b/persistence.js index fad99a5..63ac9c7 100644 --- a/persistence.js +++ b/persistence.js @@ -7,21 +7,54 @@ const pump = require('pump') const CachedPersistence = require('aedes-cached-persistence') const Packet = CachedPersistence.Packet const HLRU = require('hashlru') -const QlobberTrue = require('qlobber').QlobberTrue +const { QlobberTrue } = require('qlobber') const qlobberOpts = { separator: '/', wildcard_one: '+', wildcard_some: '#', match_empty_levels: true } -const clientKey = 'client:' -const clientsKey = 'clients' -const willsKey = 'will' -const willKey = 'will:' -const retainedKey = 'retained' -const outgoingKey = 'outgoing:' -const outgoingIdKey = 'outgoing-id:' -const incomingKey = 'incoming:' +const CLIENTKEY = 'client:' +const CLIENTSKEY = 'clients' +const WILLSKEY = 'will' +const WILLKEY = 'will:' +const RETAINEDKEY = 'retained' +const OUTGOINGKEY = 'outgoing:' +const OUTGOINGIDKEY = 'outgoing-id:' +const INCOMINGKEY = 'incoming:' +const PACKETKEY = 'packet:' + +function clientSubKey (clientId) { + return `${CLIENTKEY}${encodeURIComponent(clientId)}` +} + +function willKey (brokerId, clientId) { + return `${WILLKEY}${brokerId}:${encodeURIComponent(clientId)}` +} + +function outgoingKey (clientId) { + return `${OUTGOINGKEY}${encodeURIComponent(clientId)}` +} + +function outgoingByBrokerKey (clientId, brokerId, brokerCounter) { + return `${outgoingKey(clientId)}:${brokerId}:${brokerCounter}` +} + +function outgoingIdKey (clientId, messageId) { + return `${OUTGOINGIDKEY}${encodeURIComponent(clientId)}:${messageId}` +} + +function incomingKey (clientId, messageId) { + return `${INCOMINGKEY}${encodeURIComponent(clientId)}:${messageId}` +} + +function packetKey (brokerId, brokerCounter) { + return `${PACKETKEY}${brokerId}:${brokerCounter}` +} + +function packetCountKey (brokerId, brokerCounter) { + return `${PACKETKEY}${brokerId}:${brokerCounter}:offlineCount` +} class RedisPersistence extends CachedPersistence { constructor (opts = {}) { @@ -42,14 +75,14 @@ class RedisPersistence extends CachedPersistence { storeRetained (packet, cb) { if (packet.payload.length === 0) { - this._db.hdel(retainedKey, packet.topic, cb) + this._db.hdel(RETAINEDKEY, packet.topic, cb) } else { - this._db.hset(retainedKey, packet.topic, msgpack.encode(packet), cb) + this._db.hset(RETAINEDKEY, packet.topic, msgpack.encode(packet), cb) } } _getRetainedChunk (chunk, enc, cb) { - this._db.hgetBuffer(retainedKey, chunk, cb) + this._db.hgetBuffer(RETAINEDKEY, chunk, cb) } createRetainedStreamCombi (patterns) { @@ -62,7 +95,7 @@ class RedisPersistence extends CachedPersistence { const stream = through.obj(that._getRetainedChunkBound) - this._db.hkeys(retainedKey, function getKeys (err, keys) { + this._db.hkeys(RETAINEDKEY, function getKeys (err, keys) { if (err) { stream.emit('error', err) } else { @@ -83,18 +116,16 @@ class RedisPersistence extends CachedPersistence { return } - const clientSubKey = clientKey + client.id - const toStore = {} let published = 0 let errored for (const sub of subs) { - toStore[sub.topic] = sub.qos + toStore[sub.topic] = msgpack.encode(sub) } - this._db.sadd(clientsKey, client.id, finish) - this._db.hmset(clientSubKey, toStore, finish) + this._db.sadd(CLIENTSKEY, client.id, finish) + this._db.hmsetBuffer(clientSubKey(client.id), toStore, finish) this._addedSubscriptions(client, subs, finish) @@ -113,7 +144,7 @@ class RedisPersistence extends CachedPersistence { return } - const clientSubKey = clientKey + client.id + const clientSK = clientSubKey(client.id) let errored = false let outstanding = 0 @@ -137,20 +168,20 @@ class RedisPersistence extends CachedPersistence { } const that = this - this._db.hdel(clientSubKey, subs, function subKeysRemoved (err) { + this._db.hdel(clientSK, subs, function subKeysRemoved (err) { if (err) { return cb(err) } outstanding++ - that._db.exists(clientSubKey, function checkAllSubsRemoved (err, subCount) { + that._db.exists(clientSK, function checkAllSubsRemoved (err, subCount) { if (err) { return check(err) } if (subCount === 0) { outstanding++ - that._db.del(outgoingKey + client.id, check) - return that._db.srem(clientsKey, client.id, check) + that._db.del(outgoingKey(client.id), check) + return that._db.srem(CLIENTSKEY, client.id, check) } check() }) @@ -161,9 +192,7 @@ class RedisPersistence extends CachedPersistence { } subscriptionsByClient (client, cb) { - const clientSubKey = clientKey + client.id - - this._db.hgetall(clientSubKey, function returnSubs (err, subs) { + this._db.hgetallBuffer(clientSubKey(client.id), function returnSubs (err, subs) { const toReturn = returnSubsForClient(subs) cb(err, toReturn.length > 0 ? toReturn : null, client) }) @@ -172,7 +201,7 @@ class RedisPersistence extends CachedPersistence { countOffline (cb) { const that = this - this._db.scard(clientsKey, function countOfflineClients (err, count) { + this._db.scard(CLIENTSKEY, function countOfflineClients (err, count) { if (err) { return cb(err) } @@ -200,8 +229,7 @@ class RedisPersistence extends CachedPersistence { const that = this const hgetallStream = throughv.obj(function getStream (clientId, enc, cb) { - const clientSubKey = clientKey + clientId - that._db.hgetall(clientSubKey, function clientHash (err, hash) { + that._db.hgetall(clientSubKey(clientId), function clientHash (err, hash) { cb(err, { clientHash: hash, clientId }) }) }, function emitReady (cb) { @@ -212,7 +240,7 @@ class RedisPersistence extends CachedPersistence { processKeysForClient(data.clientId, data.clientHash, that) }) - this._db.smembers(clientsKey, function smembers (err, clientIds) { + this._db.smembers(CLIENTSKEY, function smembers (err, clientIds) { if (err) { hgetallStream.emit('error', err) } else { @@ -235,22 +263,22 @@ class RedisPersistence extends CachedPersistence { let count = 0 let outstanding = 1 let errored = false - const packetKey = `packet:${packet.brokerId}:${packet.brokerCounter}` - const countKey = `packet:${packet.brokerId}:${packet.brokerCounter}:offlineCount` + const pktKey = packetKey(packet.brokerId, packet.brokerCounter) + const countKey = packetCountKey(packet.brokerId, packet.brokerCounter) const ttl = this.packetTTL(packet) const encoded = msgpack.encode(new Packet(packet)) - this._db.mset(packetKey, encoded, countKey, subs.length, finish) + this._db.mset(pktKey, encoded, countKey, subs.length, finish) if (ttl > 0) { outstanding += 2 - this._db.expire(packetKey, ttl, finish) + this._db.expire(pktKey, ttl, finish) this._db.expire(countKey, ttl, finish) } for (const sub of subs) { - const listKey = outgoingKey + sub.clientId - this._db.rpush(listKey, packetKey, finish) + const listKey = outgoingKey(sub.clientId) + this._db.rpush(listKey, pktKey, finish) } function finish (err) { @@ -267,7 +295,7 @@ class RedisPersistence extends CachedPersistence { outgoingUpdate (client, packet, cb) { const that = this - if (packet.brokerId && packet.messageId) { + if ('brokerId' in packet && 'messageId' in packet) { updateWithClientData(this, client, packet, cb) } else { augmentWithBrokerData(this, client, packet, function updateClient (err) { @@ -280,8 +308,8 @@ class RedisPersistence extends CachedPersistence { outgoingClearMessageId (client, packet, cb) { const that = this - const clientListKey = outgoingKey + client.id - const messageIdKey = `${outgoingIdKey + client.id}:${packet.messageId}` + const clientListKey = outgoingKey(client.id) + const messageIdKey = outgoingIdKey(client.id, packet.messageId) const clientKey = this.messageIdCache.get(messageIdKey) this.messageIdCache.remove(messageIdKey) @@ -296,7 +324,7 @@ class RedisPersistence extends CachedPersistence { // TODO can be cached in case of wildcard deliveries this._db.getBuffer(clientKey, function clearMessageId (err, buf) { let origPacket - let packetKey + let pktKey let countKey if (err) { errored = err @@ -306,10 +334,10 @@ class RedisPersistence extends CachedPersistence { origPacket = msgpack.decode(buf) origPacket.messageId = packet.messageId - packetKey = `packet:${origPacket.brokerId}:${origPacket.brokerCounter}` - countKey = `packet:${origPacket.brokerId}:${origPacket.brokerCounter}:offlineCount` + pktKey = packetKey(origPacket.brokerId, origPacket.brokerCounter) + countKey = packetCountKey(origPacket.brokerId, origPacket.brokerCounter) - if (clientKey !== packetKey) { // qos=2 + if (clientKey !== pktKey) { // qos=2 that._db.del(clientKey, finish) } else { finish() @@ -318,7 +346,7 @@ class RedisPersistence extends CachedPersistence { finish() } - that._db.lrem(clientListKey, 0, packetKey, finish) + that._db.lrem(clientListKey, 0, pktKey, finish) that._db.decr(countKey, (err, remained) => { if (err) { @@ -326,7 +354,7 @@ class RedisPersistence extends CachedPersistence { return cb(err) } if (remained === 0) { - that._db.del(packetKey, countKey, finish) + that._db.del(pktKey, countKey, finish) } else { finish() } @@ -346,7 +374,7 @@ class RedisPersistence extends CachedPersistence { } outgoingStream (client) { - const clientListKey = outgoingKey + client.id + const clientListKey = outgoingKey(client.id) const stream = throughv.obj(this._buildAugment(clientListKey)) this._db.lrange(clientListKey, 0, this.maxSessionDelivery, lrangeResult) @@ -366,14 +394,14 @@ class RedisPersistence extends CachedPersistence { } incomingStorePacket (client, packet, cb) { - const key = `${incomingKey + client.id}:${packet.messageId}` + const key = incomingKey(client.id, packet.messageId) const newp = new Packet(packet) newp.messageId = packet.messageId this._db.set(key, msgpack.encode(newp), cb) } incomingGetPacket (client, packet, cb) { - const key = `${incomingKey + client.id}:${packet.messageId}` + const key = incomingKey(client.id, packet.messageId) this._db.getBuffer(key, function decodeBuffer (err, buf) { if (err) { return cb(err) @@ -388,15 +416,15 @@ class RedisPersistence extends CachedPersistence { } incomingDelPacket (client, packet, cb) { - const key = `${incomingKey + client.id}:${packet.messageId}` + const key = incomingKey(client.id, packet.messageId) this._db.del(key, cb) } putWill (client, packet, cb) { - const key = `${willKey + this.broker.id}:${client.id}` + const key = willKey(this.broker.id, client.id) packet.clientId = client.id packet.brokerId = this.broker.id - this._db.rpush(willsKey, key) + this._db.rpush(WILLSKEY, key) this._db.setBuffer(key, msgpack.encode(packet), encodeBuffer) function encodeBuffer (err) { @@ -405,7 +433,7 @@ class RedisPersistence extends CachedPersistence { } getWill (client, cb) { - const key = `${willKey + this.broker.id}:${client.id}` + const key = willKey(this.broker.id, client.id) this._db.getBuffer(key, function getWillForClient (err, packet) { if (err) { return cb(err) } @@ -420,10 +448,10 @@ class RedisPersistence extends CachedPersistence { } delWill (client, cb) { - const key = `${willKey + client.brokerId}:${client.id}` + const key = willKey(client.brokerId, client.id) let result = null const that = this - this._db.lrem(willsKey, 0, key) + this._db.lrem(WILLSKEY, 0, key) this._db.getBuffer(key, function getClientWill (err, packet) { if (err) { return cb(err) } @@ -438,9 +466,9 @@ class RedisPersistence extends CachedPersistence { } streamWill (brokers) { - const stream = throughv.obj(this._buildAugment(willsKey)) + const stream = throughv.obj(this._buildAugment(WILLSKEY)) - this._db.lrange(willsKey, 0, 10000, streamWill) + this._db.lrange(WILLSKEY, 0, 10000, streamWill) function streamWill (err, results) { if (err) { @@ -525,10 +553,7 @@ function returnSubsForClient (subs) { } for (const subKey of subKeys) { - toReturn.push({ - topic: subKey, - qos: parseInt(subs[subKey]) - }) + toReturn.push(msgpack.decode(subs[subKey])) } return toReturn @@ -536,37 +561,34 @@ function returnSubsForClient (subs) { function processKeysForClient (clientId, clientHash, that) { const topics = Object.keys(clientHash) - for (const topic of topics) { - that._trie.add(topic, { - clientId, - topic, - qos: clientHash[topic] - }) + const sub = clientHash[topic] + sub.clientId = clientId + that._trie.add(topic, sub) } } function updateWithClientData (that, client, packet, cb) { - const messageIdKey = `${outgoingIdKey + client.id}:${packet.messageId}` - const clientListKey = outgoingKey + client.id - const packetKey = `packet:${packet.brokerId}:${packet.brokerCounter}` + const clientListKey = outgoingKey(client.id) + const messageIdKey = outgoingIdKey(client.id, packet.messageId) + const pktKey = packetKey(packet.brokerId, packet.brokerCounter) const ttl = that.packetTTL(packet) if (packet.cmd && packet.cmd !== 'pubrel') { // qos=1 - that.messageIdCache.set(messageIdKey, packetKey) + that.messageIdCache.set(messageIdKey, pktKey) if (ttl > 0) { - return that._db.set(packetKey, msgpack.encode(packet), 'EX', ttl, updatePacket) + return that._db.set(pktKey, msgpack.encode(packet), 'EX', ttl, updatePacket) } else { - return that._db.set(packetKey, msgpack.encode(packet), updatePacket) + return that._db.set(pktKey, msgpack.encode(packet), updatePacket) } } // qos=2 - const clientUpdateKey = `${outgoingKey + client.id}:${packet.brokerId}:${packet.brokerCounter}` + const clientUpdateKey = outgoingByBrokerKey(client.id, packet.brokerId, packet.brokerCounter) that.messageIdCache.set(messageIdKey, clientUpdateKey) let count = 0 - that._db.lrem(clientListKey, 0, packetKey, (err, removed) => { + that._db.lrem(clientListKey, 0, pktKey, (err, removed) => { if (err) { return cb(err) } @@ -617,7 +639,7 @@ function updateWithClientData (that, client, packet, cb) { } function augmentWithBrokerData (that, client, packet, cb) { - const messageIdKey = `${outgoingIdKey + client.id}:${packet.messageId}` + const messageIdKey = outgoingIdKey(client.id, packet.messageId) const key = that.messageIdCache.get(messageIdKey) if (!key) { diff --git a/test.js b/test.js index 993ff74..325fc3e 100644 --- a/test.js +++ b/test.js @@ -154,6 +154,7 @@ test('outgoingUpdate doesn\'t clear packet ttl', t => { test('multiple persistences', t => { t.plan(7) + t.timeoutAfter(60 * 1000) db.flushall() const emitter = mqemitterRedis() const emitter2 = mqemitterRedis() @@ -186,18 +187,24 @@ test('multiple persistences', t => { } } - instance2._waitFor(client, 'sub_' + 'hello', () => { + instance2._waitFor(client, true, 'hello', () => { instance2.subscriptionsByTopic('hello', (err, resubs) => { t.notOk(err, 'subs by topic no error') t.deepEqual(resubs, [{ clientId: client.id, topic: 'hello/#', - qos: 1 + qos: 1, + rh: undefined, + rap: undefined, + nl: undefined }, { clientId: client.id, topic: 'hello', - qos: 1 - }]) + qos: 1, + rh: undefined, + rap: undefined, + nl: undefined + }], 'received correct subs') gotSubs = true close() })