Skip to content

Commit

Permalink
fix: wills table de duplicate (#104)
Browse files Browse the repository at this point in the history
* Wills table de duplicate, streamWill on startup support

* Add test for duplicates, removed streamWill addition

* wills dedupe test switch to streamWill

Co-authored-by: Erik Friesen <[email protected]>
  • Loading branch information
friesendrywall and friesendrywall authored Aug 1, 2022
1 parent a607e8b commit f53ed12
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
1 change: 1 addition & 0 deletions persistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ class RedisPersistence extends CachedPersistence {
const key = willKey(this.broker.id, client.id)
packet.clientId = client.id
packet.brokerId = this.broker.id
this._db.lrem(WILLSKEY, 0, key) // Remove duplicates
this._db.rpush(WILLSKEY, key)
this._db.setBuffer(key, msgpack.encode(packet), encodeBuffer)

Expand Down
42 changes: 42 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,48 @@ test('unknown cache key', t => {
})
})

test('wills table de-duplicate', t => {
t.plan(5)
db.flushall()
const emitter = mqemitterRedis()
const instance = persistence()
const client = { id: 'willsTest' }

instance.broker = toBroker('1', emitter)

const packet = {
cmd: 'publish',
topic: 'hello',
payload: 'willsTest',
qos: 1,
retain: false,
brokerId: instance.broker.id,
brokerCounter: 42,
messageId: 123
}

instance.putWill(client, packet, err => {
t.notOk(err, 'putWill #1 no error')
instance.putWill(client, packet, err => {
t.notOk(err, 'putWill #2 no error')
let willCount = 0
const wills = instance.streamWill()
wills.on('data', function (chunk) {
willCount++
})
wills.on('end', function () {
t.equal(willCount, 1, 'should only be one will')
close()
})
})
})

function close () {
instance.destroy(t.pass.bind(t, 'instance dies'))
emitter.close(t.pass.bind(t, 'emitter dies'))
}
})

test.onFinish(() => {
process.exit(0)
})

0 comments on commit f53ed12

Please sign in to comment.