diff --git a/index.js b/index.js index 86ea2aa..7bacd75 100644 --- a/index.js +++ b/index.js @@ -6,6 +6,7 @@ var EE = require('events').EventEmitter var inherits = require('util').inherits var MultiStream = require('multistream') var parallel = require('fastparallel') +var from = require('from2') var QlobberOpts = { wildcard_one: '+', @@ -190,6 +191,57 @@ CachedPersistence.prototype.createRetainedStreamCombi = function (patterns) { return MultiStream.obj(streams) } +CachedPersistence.prototype.addSubsToCache = function (clientId, topics) { + for (var topic in topics) { + this._trie.add(topic, { + clientId: clientId, + topic: topic, + qos: topics[topic] + }) + } +} + +CachedPersistence.prototype.removeSubsFromCache = function (clientId, topics) { + for (var topic in topics) { + this._trie.remove(topic, { + clientId: clientId, + topic: topic, + qos: topics[topic] + }) + } +} + +CachedPersistence.prototype.getClientList = function (topic) { + var entries = this._trie.match(topic, topic) + + function pushClientList (size, next) { + if (entries.length === 0) { + return next(null, null) + } + var chunk = entries.slice(0, 1) + entries = entries.slice(1) + next(null, chunk[0].clientId) + } + + return from.obj(pushClientList) +} + +CachedPersistence.prototype.countOfflineClients = function (cb) { + cb(new Error('Not Implemented')) +} + +CachedPersistence.prototype.countOffline = function (cb) { + var that = this + + this.countOfflineClients(function (err, count) { + if (err) { + return cb(err) + } + + cb(null, that._trie.subscriptionsCount, parseInt(count) || 0) + }) +} + CachedPersistence.prototype.destroy = function (cb) { this.destroyed = true this.broker.unsubscribe(subTopic, this._onMessage, function () { diff --git a/package.json b/package.json index 2d94928..adb876c 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ "aedes-packet": "^2.1.0", "aedes-persistence": "^7.1.1", "fastparallel": "^2.3.0", + "from2": "^2.3.0", "multistream": "^4.0.0", "qlobber": "^3.1.0" }