diff --git a/nmqtt.nim b/nmqtt.nim index 095e3f2..f2eb009 100644 --- a/nmqtt.nim +++ b/nmqtt.nim @@ -847,6 +847,15 @@ proc onPublish(ctx: MqttCtx, pkt: Pkt) {.async.} = topicw.removeSuffix("#") if topic.contains(topicw): cb.cb(topic, message) + if top.contains("+"): + var topelem = split(top, '/') + if len(topelem) == count(topic, '/') + 1: + var i = 0 + for e in split(topic, '/'): + if topelem[i] != "+" and e != topelem[i]: break + i = i+1 + if i == len(topelem): + cb.cb(topic, message) if qos == 1: ctx.workQueue[msgId] = Work(wk: PubWork, msgId: msgId, state: WorkNew, qos: 1, typ: PubAck) diff --git a/tests/subscribe.nim b/tests/subscribe.nim index de20561..94d14ae 100644 --- a/tests/subscribe.nim +++ b/tests/subscribe.nim @@ -210,6 +210,49 @@ suite "test suite for subscribe": waitFor conn() + test "subscribe to test/+": + let (_, msg) = tdata("subscribe to test/+") + + proc conn() {.async.} = + var msgCount: int + proc on_data_sub_wild(topic: string, message: string) = + msgCount += 1 + + await ctxListen.subscribe("test/+", 0, on_data_sub_wild) + await sleepAsync 500 + await ctxMain.publish("test/random1", msg, 0) + await ctxMain.publish("second/random2", msg, 0) + await ctxMain.publish("test", msg, 0) + await ctxMain.publish("test/random3", msg, 0) + await sleepAsync 500 + await ctxListen.unsubscribe("test/+") + await sleepAsync 500 + check(msgCount == 2) + + waitFor conn() + + test "subscribe to test/+/test": + let (_, msg) = tdata("subscribe to test/+/test") + + proc conn() {.async.} = + var msgCount: int + proc on_data_sub_wild(topic: string, message: string) = + msgCount += 1 + + await ctxListen.subscribe("test/+/data", 0, on_data_sub_wild) + await sleepAsync 500 + await ctxMain.publish("test/random1/data", msg, 0) + await ctxMain.publish("second/random2/data", msg, 0) + await ctxMain.publish("test/random3", msg, 0) + await ctxMain.publish("test/random4/data", msg, 0) + await ctxMain.publish("test/random5/data/random6", msg, 0) + await sleepAsync 500 + await ctxListen.unsubscribe("test/+/data") + await sleepAsync 500 + check(msgCount == 2) + + waitFor conn() + test "stay subscribed after disconnect with reconnect": let (tpc, msg) = tdata("stay subscribed after disconnect with reconnect")