Skip to content

Commit

Permalink
fix: prepare for persistence of more subscription properties (#33)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Lando <[email protected]>
  • Loading branch information
seriousme and robertsLando authored May 12, 2022
1 parent e168b6e commit bd81caa
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 92 deletions.
10 changes: 5 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@
"homepage": "https://github.com/mcollina/aedes-persistence-level#readme",
"devDependencies": {
"aedes": "^0.46.3",
"aedes-persistence": "^9.0.3",
"aedes-persistence": "^9.1.1",
"concat-stream": "^2.0.0",
"faucet": "0.0.1",
"level": "^8.0.0",
"mqemitter": "^4.5.0",
"pre-commit": "^1.2.2",
"release-it": "^14.14.2",
"release-it": "^15.0.0",
"standard": "^17.0.0",
"tape": "^4.13.3",
"tape": "^5.5.3",
"through2": "^4.0.2"
},
"dependencies": {
"aedes-packet": "^2.3.1",
"aedes-packet": "^3.0.0",
"msgpack-lite": "^0.1.26",
"qlobber": "^6.0.0"
"qlobber": "^7.0.0"
}
}
132 changes: 50 additions & 82 deletions persistence.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const Qlobber = require('qlobber').Qlobber
const Packet = require('aedes-packet')
const msgpack = require('msgpack-lite')
const EventEmitter = require('events').EventEmitter
const { EventEmitter } = require('events')
const { Readable } = require('stream')
const QlobberSub = require('qlobber/aedes/qlobber-sub')
const { QlobberTrue } = require('qlobber')

const QlobberOpts = {
wildcard_one: '+',
Expand Down Expand Up @@ -44,11 +45,11 @@ async function loadSubscriptions (db, trie) {
}

async function * retainedMessagesByPattern (db, pattern) {
const qlobber = new Qlobber(QlobberOpts)
qlobber.add(pattern, true)
const qlobber = new QlobberTrue(QlobberOpts)
qlobber.add(pattern)

for await (const packet of decodedDbValues(db, RETAINED)) {
if (qlobber.match(packet.topic).length) {
if (qlobber.test(packet.topic)) {
yield packet
}
}
Expand All @@ -57,7 +58,8 @@ async function * retainedMessagesByPattern (db, pattern) {
async function subscriptionsByClient (db, client) {
const resubs = []
for await (const sub of decodedDbValues(db, subByClientKey(client.id))) {
const resub = rmClientId(sub)
// remove clientId from sub
const { clientId, ...resub } = sub
resubs.push(resub)
}
return ((resubs.length > 0) ? resubs : null)
Expand Down Expand Up @@ -130,20 +132,22 @@ function subByClientKey (clientId) {
return `${SUBSCRIPTIONS}${encodeURIComponent(clientId)}`
}

function toSubKey (sub) {
return `${subByClientKey(sub.clientId)}:${sub.topic}`
}

class LevelPersistence extends EventEmitter {
// private class members start with #
#db
#trie
#ready
#keyPadLength

constructor (db) {
super()
this.#db = db
this.#trie = new Qlobber(QlobberOpts)
this.#trie = new QlobberSub(QlobberOpts)
this.#ready = false

this.#keyPadLength = 16
const that = this

loadSubscriptions(this.#db, this.#trie)
Expand Down Expand Up @@ -200,34 +204,37 @@ class LevelPersistence extends EventEmitter {
return
}

subs = subs
.map(withClientId, client)
.map(addSubToTrie, this.#trie)
const opArray = []
subs.forEach((sub) => {
const ops = {}
ops.type = 'put'
ops.key = toSubKey(sub)
ops.value = msgpack.encode(sub)
for (const subscription of subs) {
const sub = Object.assign({}, subscription)
sub.clientId = client.id
addSubToTrie(this.#trie, sub)
const ops = {
type: 'put',
key: toSubKey(sub),
value: msgpack.encode(sub)
}
opArray.push(ops)
})
}
this.#dbBatch(opArray, (err) => {
cb(err, client)
})
}

removeSubscriptions (client, subs, cb) {
subs = subs
.map(toSubObj, client)
.map(delSubFromTrie, this.#trie)
removeSubscriptions (client, topics, cb) {
const opArray = []
subs.forEach((sub) => {
const ops = {}
ops.type = 'del'
ops.key = toSubKey(sub)
ops.value = msgpack.encode(sub)
for (const topic of topics) {
const sub = {
clientId: client.id,
topic
}
delSubFromTrie(this.#trie, sub)
const ops = {
type: 'del',
key: toSubKey(sub)
}
opArray.push(ops)
})
}
this.#dbBatch(opArray, (err) => {
cb(err, client)
})
Expand Down Expand Up @@ -260,9 +267,7 @@ class LevelPersistence extends EventEmitter {

that.removeSubscriptions(
client,
subs.map((sub) => {
return sub.topic
}),
subs.map(sub => sub.topic),
(err) => {
cb(err, client)
}
Expand Down Expand Up @@ -451,30 +456,18 @@ class LevelPersistence extends EventEmitter {
}
}

function withClientId (sub) {
return {
topic: sub.topic,
clientId: this.id,
qos: sub.qos
}
}

function toSubKey (sub) {
return `${subByClientKey(sub.clientId)}:${sub.topic}`
}

function addSubToTrie (sub) {
let add
const matched = this.match(sub.topic)
function addSubToTrie (trie, sub) {
let add = false
const matched = trie.match(sub.topic)
if (matched.length > 0) {
add = true
for (let i = 0; i < matched.length; i++) {
if (matched[i].clientId === sub.clientId) {
if (matched[i].qos === sub.qos) {
for (const match of matched) {
if (match.clientId === sub.clientId) {
if (match.qos === sub.qos) {
add = false
break
} else {
this.remove(matched[i].topic, matched[i])
trie.remove(match.topic, match)
if (sub.qos === 0) {
add = false
}
Expand All @@ -484,42 +477,17 @@ function addSubToTrie (sub) {
} else if (sub.qos > 0) {
add = true
}

if (add) {
this.add(sub.topic, sub)
}

return sub
}

function toSubObj (topic) {
return {
clientId: this.id,
topic
trie.add(sub.topic, sub)
}
}

function delSubFromTrie (sub) {
this
.match(sub.topic)
.filter(matching, sub)
.forEach(rmSub, this)

return sub
}

function matching (sub) {
return sub.topic === this.topic && sub.clientId === this.clientId
}

function rmSub (sub) {
this.remove(sub.topic, sub)
}

function rmClientId (sub) {
return {
topic: sub.topic,
qos: sub.qos
function delSubFromTrie (trie, sub) {
const matches = trie.match(sub.topic)
for (const match of matches) {
if (sub.clientId === match.clientId && sub.topic === match.topic) {
trie.remove(sub.topic, sub)
}
}
}

Expand Down
25 changes: 20 additions & 5 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@ test('restore', t => {
t.deepEqual(resubs, [{
clientId: client.id,
topic: 'hello/#',
qos: 1
qos: 1,
rh: undefined,
rap: undefined,
nl: undefined
}, {
clientId: client.id,
topic: 'hello',
qos: 1
qos: 1,
rh: undefined,
rap: undefined,
nl: undefined
}])
instance.destroy(t.end.bind(t))
})
Expand Down Expand Up @@ -131,11 +137,17 @@ test('Dont replace subscriptions with different QoS if client id is different',
t.deepEqual(resubs, [{
topic: 'test/television/dev/about',
clientId: 'test.1',
qos: 1
qos: 1,
rh: undefined,
rap: undefined,
nl: undefined
}, {
topic: 'test/+/dev/#',
clientId: 'test',
qos: 2
qos: 2,
rh: undefined,
rap: undefined,
nl: undefined
}])
instance.destroy(t.end.bind(t))
})
Expand Down Expand Up @@ -169,7 +181,10 @@ test('Replace subscriptions with different QoS if client id is same', t => {
t.deepEqual(resubs, [{
topic: 'test/television/dev/about',
clientId: 'test',
qos: 1
qos: 1,
rh: undefined,
rap: undefined,
nl: undefined
}])
instance.destroy(t.end.bind(t))
})
Expand Down

0 comments on commit bd81caa

Please sign in to comment.