Skip to content

Commit

Permalink
Merge pull request #36 from lpapier/plus_wildcard
Browse files Browse the repository at this point in the history
Plus wildcard
  • Loading branch information
ThomasTJdev authored Jan 14, 2023
2 parents 4791cba + 71642d9 commit 2eac588
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 0 deletions.
13 changes: 13 additions & 0 deletions nmqtt.nim
Original file line number Diff line number Diff line change
Expand Up @@ -839,10 +839,23 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
if top == topic or top == "#":
cb.cb(topic, message)
if top.endsWith("/#"):
# the multi-level wildcard can represent zero levels.
if topic == top[0 .. ^3]:
cb.cb(topic, message)
continue
var topicw = top
topicw.removeSuffix("#")
if topic.contains(topicw):
cb.cb(topic, message)
if top.contains("+"):
var topelem = split(top, '/')
if len(topelem) == count(topic, '/') + 1:
var i = 0
for e in split(topic, '/'):
if topelem[i] != "+" and e != topelem[i]: break
i = i+1
if i == len(topelem):
cb.cb(topic, message)

if qos == 1:
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 1, typ: PubAck)
Expand Down
44 changes: 44 additions & 0 deletions tests/subscribe.nim
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,54 @@ suite "test suite for subscribe":
await sleepAsync 500
await ctxMain.publish("test/random1", msg, 0)
await ctxMain.publish("second/random2", msg, 0)
await ctxMain.publish("test", msg, 0)
await ctxMain.publish("test/random3", msg, 0)
await sleepAsync 500
await ctxListen.unsubscribe("test/#")
await sleepAsync 500
check(msgCount == 3)

waitFor conn()

test "subscribe to test/+":
let (_, msg) = tdata("subscribe to test/+")

proc conn() {.async.} =
var msgCount: int
proc on_data_sub_wild(topic: string, message: string) =
msgCount += 1

await ctxListen.subscribe("test/+", 0, on_data_sub_wild)
await sleepAsync 500
await ctxMain.publish("test/random1", msg, 0)
await ctxMain.publish("second/random2", msg, 0)
await ctxMain.publish("test", msg, 0)
await ctxMain.publish("test/random3", msg, 0)
await sleepAsync 500
await ctxListen.unsubscribe("test/+")
await sleepAsync 500
check(msgCount == 2)

waitFor conn()

test "subscribe to test/+/test":
let (_, msg) = tdata("subscribe to test/+/test")

proc conn() {.async.} =
var msgCount: int
proc on_data_sub_wild(topic: string, message: string) =
msgCount += 1

await ctxListen.subscribe("test/+/data", 0, on_data_sub_wild)
await sleepAsync 500
await ctxMain.publish("test/random1/data", msg, 0)
await ctxMain.publish("second/random2/data", msg, 0)
await ctxMain.publish("test/random3", msg, 0)
await ctxMain.publish("test/random4/data", msg, 0)
await ctxMain.publish("test/random5/data/random6", msg, 0)
await sleepAsync 500
await ctxListen.unsubscribe("test/+/data")
await sleepAsync 500
check(msgCount == 2)

waitFor conn()
Expand Down

0 comments on commit 2eac588

Please sign in to comment.