Skip to content

Commit

Permalink
Single level '+' wildcard support
Browse files Browse the repository at this point in the history
  • Loading branch information
Laurent Papier committed Dec 31, 2022
1 parent 13b3b1f commit 71642d9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
9 changes: 9 additions & 0 deletions nmqtt.nim
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,15 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} =
topicw.removeSuffix("#")
if topic.contains(topicw):
cb.cb(topic, message)
if top.contains("+"):
var topelem = split(top, '/')
if len(topelem) == count(topic, '/') + 1:
var i = 0
for e in split(topic, '/'):
if topelem[i] != "+" and e != topelem[i]: break
i = i+1
if i == len(topelem):
cb.cb(topic, message)

if qos == 1:
ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 1, typ: PubAck)
Expand Down
43 changes: 43 additions & 0 deletions tests/subscribe.nim
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,49 @@ suite "test suite for subscribe":

waitFor conn()

test "subscribe to test/+":
let (_, msg) = tdata("subscribe to test/+")

proc conn() {.async.} =
var msgCount: int
proc on_data_sub_wild(topic: string, message: string) =
msgCount += 1

await ctxListen.subscribe("test/+", 0, on_data_sub_wild)
await sleepAsync 500
await ctxMain.publish("test/random1", msg, 0)
await ctxMain.publish("second/random2", msg, 0)
await ctxMain.publish("test", msg, 0)
await ctxMain.publish("test/random3", msg, 0)
await sleepAsync 500
await ctxListen.unsubscribe("test/+")
await sleepAsync 500
check(msgCount == 2)

waitFor conn()

test "subscribe to test/+/test":
let (_, msg) = tdata("subscribe to test/+/test")

proc conn() {.async.} =
var msgCount: int
proc on_data_sub_wild(topic: string, message: string) =
msgCount += 1

await ctxListen.subscribe("test/+/data", 0, on_data_sub_wild)
await sleepAsync 500
await ctxMain.publish("test/random1/data", msg, 0)
await ctxMain.publish("second/random2/data", msg, 0)
await ctxMain.publish("test/random3", msg, 0)
await ctxMain.publish("test/random4/data", msg, 0)
await ctxMain.publish("test/random5/data/random6", msg, 0)
await sleepAsync 500
await ctxListen.unsubscribe("test/+/data")
await sleepAsync 500
check(msgCount == 2)

waitFor conn()

test "stay subscribed after disconnect with reconnect":
let (tpc, msg) = tdata("stay subscribed after disconnect with reconnect")

Expand Down

0 comments on commit 71642d9

Please sign in to comment.