diff --git a/package.json b/package.json index 863397f..94243bd 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,24 @@ "lint": "standard --verbose | snazzy", "test": "tape test.js | faucet", "coverage": "nyc --reporter=lcov tape test.js", - "license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause;Apache-2.0;Apache*'" + "license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause;Apache-2.0;Apache*'", + "release": "read -p 'GITHUB_TOKEN: ' GITHUB_TOKEN && export GITHUB_TOKEN=$GITHUB_TOKEN && release-it --disable-metrics" + }, + "release-it": { + "github": { + "release": true + }, + "git": { + "tagName": "v${version}" + }, + "hooks": { + "before:init": [ + "npm run test" + ] + }, + "npm": { + "publish": true + } }, "pre-commit": [ "lint", @@ -34,23 +51,24 @@ "fastq": "^1.6.0", "faucet": "0.0.1", "license-checker": "^25.0.1", - "mqemitter": "^3.0.0", - "mqemitter-redis": "^3.0.0", + "mqemitter": "^4.1.1", + "mqemitter-redis": "^4.0.1", "mqtt": "^3.0.0", - "nyc": "^14.1.1", + "nyc": "^15.0.0", "pre-commit": "^1.2.2", + "release-it": "^12.6.1", "snazzy": "^8.0.0", "standard": "^14.1.0", "tape": "^4.11.0" }, "dependencies": { - "aedes-cached-persistence": "^7.0.0", + "aedes-cached-persistence": "^8.1.0", "from2": "^2.3.0", "hashlru": "^2.3.0", "ioredis": "^4.14.0", "msgpack-lite": "^0.1.26", "pump": "^3.0.0", - "qlobber": "^3.1.0", + "qlobber": "^5.0.0", "through2": "^3.0.1", "throughv": "^1.0.4" } diff --git a/persistence.js b/persistence.js index 8ce9f73..8c49a26 100644 --- a/persistence.js +++ b/persistence.js @@ -1,29 +1,30 @@ 'use strict' -var Redis = require('ioredis') -var from = require('from2') -var through = require('through2') -var throughv = require('throughv') -var msgpack = require('msgpack-lite') -var pump = require('pump') -var CachedPersistence = require('aedes-cached-persistence') -var Packet = CachedPersistence.Packet -var inherits = require('util').inherits -var HLRU = require('hashlru') -var QlobberTrue = require('qlobber').QlobberTrue -var qlobberOpts = { +const Redis = require('ioredis') +const from = require('from2') +const through = require('through2') +const throughv = require('throughv') +const msgpack = require('msgpack-lite') +const pump = require('pump') +const CachedPersistence = require('aedes-cached-persistence') +const Packet = CachedPersistence.Packet +const inherits = require('util').inherits +const HLRU = require('hashlru') +const QlobberTrue = require('qlobber').QlobberTrue +const qlobberOpts = { separator: '/', wildcard_one: '+', - wildcard_some: '#' -} -var clientKey = 'client:' -var clientsKey = 'clients' -var willsKey = 'will' -var willKey = 'will:' -var retainedKey = 'retained' -var outgoingKey = 'outgoing:' -var outgoingIdKey = 'outgoing-id:' -var incomingKey = 'incoming:' + wildcard_some: '#', + match_empty_levels: true +} +const clientKey = 'client:' +const clientsKey = 'clients' +const willsKey = 'will' +const willKey = 'will:' +const retainedKey = 'retained' +const outgoingKey = 'outgoing:' +const outgoingIdKey = 'outgoing-id:' +const incomingKey = 'incoming:' function RedisPersistence (opts) { if (!(this instanceof RedisPersistence)) { @@ -293,6 +294,7 @@ RedisPersistence.prototype.outgoingEnqueueCombi = function (subs, packet, cb) { var packetKey = 'packet:' + packet.brokerId + ':' + packet.brokerCounter var countKey = 'packet:' + packet.brokerId + ':' + packet.brokerCounter + ':offlineCount' var ttl = this.packetTTL(packet) + var encoded = msgpack.encode(new Packet(packet)) this._db.mset(packetKey, encoded, countKey, subs.length, finish) @@ -326,7 +328,17 @@ function updateWithClientData (that, client, packet, cb) { if (packet.cmd && packet.cmd !== 'pubrel') { // qos=1 that.messageIdCache.set(messageIdKey, packetKey) - return cb(null, client, packet) + return that._db.set(packetKey, msgpack.encode(packet), function updatePacket (err, result) { + if (err) { + return cb(err, client, packet) + } + + if (result !== 'OK') { + cb(new Error('no such packet'), client, packet) + } else { + cb(null, client, packet) + } + }) } // qos=2 @@ -346,6 +358,7 @@ function updateWithClientData (that, client, packet, cb) { }) var ttl = that.packetTTL(packet) + var encoded = msgpack.encode(packet) if (ttl > 0) { @@ -389,7 +402,7 @@ function augmentWithBrokerData (that, client, packet, cb) { RedisPersistence.prototype.outgoingUpdate = function (client, packet, cb) { var that = this - if (packet.brokerId) { + if (packet.brokerId && packet.messageId) { updateWithClientData(this, client, packet, cb) } else { augmentWithBrokerData(this, client, packet, function updateClient (err) {