Skip to content

Commit

Permalink
Bumped deps and fixed tests (#62)
Browse files Browse the repository at this point in the history
* Bump [email protected] [email protected]

* Qlobber match empty levels

* Release-it setup

* Handle error when update packet

* Bump [email protected] and [email protected]

* Remove 'delete messageId' and fixed tests
  • Loading branch information
robertsLando authored Feb 28, 2020
1 parent a9803ae commit 2857d2f
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
30 changes: 24 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,24 @@
"lint": "standard --verbose | snazzy",
"test": "tape test.js | faucet",
"coverage": "nyc --reporter=lcov tape test.js",
"license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause;Apache-2.0;Apache*'"
"license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause;Apache-2.0;Apache*'",
"release": "read -p 'GITHUB_TOKEN: ' GITHUB_TOKEN && export GITHUB_TOKEN=$GITHUB_TOKEN && release-it --disable-metrics"
},
"release-it": {
"github": {
"release": true
},
"git": {
"tagName": "v${version}"
},
"hooks": {
"before:init": [
"npm run test"
]
},
"npm": {
"publish": true
}
},
"pre-commit": [
"lint",
Expand All @@ -34,23 +51,24 @@
"fastq": "^1.6.0",
"faucet": "0.0.1",
"license-checker": "^25.0.1",
"mqemitter": "^3.0.0",
"mqemitter-redis": "^3.0.0",
"mqemitter": "^4.1.1",
"mqemitter-redis": "^4.0.1",
"mqtt": "^3.0.0",
"nyc": "^14.1.1",
"nyc": "^15.0.0",
"pre-commit": "^1.2.2",
"release-it": "^12.6.1",
"snazzy": "^8.0.0",
"standard": "^14.1.0",
"tape": "^4.11.0"
},
"dependencies": {
"aedes-cached-persistence": "^7.0.0",
"aedes-cached-persistence": "^8.1.0",
"from2": "^2.3.0",
"hashlru": "^2.3.0",
"ioredis": "^4.14.0",
"msgpack-lite": "^0.1.26",
"pump": "^3.0.0",
"qlobber": "^3.1.0",
"qlobber": "^5.0.0",
"through2": "^3.0.1",
"throughv": "^1.0.4"
}
Expand Down
61 changes: 37 additions & 24 deletions persistence.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
'use strict'

var Redis = require('ioredis')
var from = require('from2')
var through = require('through2')
var throughv = require('throughv')
var msgpack = require('msgpack-lite')
var pump = require('pump')
var CachedPersistence = require('aedes-cached-persistence')
var Packet = CachedPersistence.Packet
var inherits = require('util').inherits
var HLRU = require('hashlru')
var QlobberTrue = require('qlobber').QlobberTrue
var qlobberOpts = {
const Redis = require('ioredis')
const from = require('from2')
const through = require('through2')
const throughv = require('throughv')
const msgpack = require('msgpack-lite')
const pump = require('pump')
const CachedPersistence = require('aedes-cached-persistence')
const Packet = CachedPersistence.Packet
const inherits = require('util').inherits
const HLRU = require('hashlru')
const QlobberTrue = require('qlobber').QlobberTrue
const qlobberOpts = {
separator: '/',
wildcard_one: '+',
wildcard_some: '#'
}
var clientKey = 'client:'
var clientsKey = 'clients'
var willsKey = 'will'
var willKey = 'will:'
var retainedKey = 'retained'
var outgoingKey = 'outgoing:'
var outgoingIdKey = 'outgoing-id:'
var incomingKey = 'incoming:'
wildcard_some: '#',
match_empty_levels: true
}
const clientKey = 'client:'
const clientsKey = 'clients'
const willsKey = 'will'
const willKey = 'will:'
const retainedKey = 'retained'
const outgoingKey = 'outgoing:'
const outgoingIdKey = 'outgoing-id:'
const incomingKey = 'incoming:'

function RedisPersistence (opts) {
if (!(this instanceof RedisPersistence)) {
Expand Down Expand Up @@ -293,6 +294,7 @@ RedisPersistence.prototype.outgoingEnqueueCombi = function (subs, packet, cb) {
var packetKey = 'packet:' + packet.brokerId + ':' + packet.brokerCounter
var countKey = 'packet:' + packet.brokerId + ':' + packet.brokerCounter + ':offlineCount'
var ttl = this.packetTTL(packet)

var encoded = msgpack.encode(new Packet(packet))

this._db.mset(packetKey, encoded, countKey, subs.length, finish)
Expand Down Expand Up @@ -326,7 +328,17 @@ function updateWithClientData (that, client, packet, cb) {

if (packet.cmd && packet.cmd !== 'pubrel') { // qos=1
that.messageIdCache.set(messageIdKey, packetKey)
return cb(null, client, packet)
return that._db.set(packetKey, msgpack.encode(packet), function updatePacket (err, result) {
if (err) {
return cb(err, client, packet)
}

if (result !== 'OK') {
cb(new Error('no such packet'), client, packet)
} else {
cb(null, client, packet)
}
})
}

// qos=2
Expand All @@ -346,6 +358,7 @@ function updateWithClientData (that, client, packet, cb) {
})

var ttl = that.packetTTL(packet)

var encoded = msgpack.encode(packet)

if (ttl > 0) {
Expand Down Expand Up @@ -389,7 +402,7 @@ function augmentWithBrokerData (that, client, packet, cb) {

RedisPersistence.prototype.outgoingUpdate = function (client, packet, cb) {
var that = this
if (packet.brokerId) {
if (packet.brokerId && packet.messageId) {
updateWithClientData(this, client, packet, cb)
} else {
augmentWithBrokerData(this, client, packet, function updateClient (err) {
Expand Down

0 comments on commit 2857d2f

Please sign in to comment.