From bb4394c818f11c428421db901d5870a2bcab21b9 Mon Sep 17 00:00:00 2001 From: Christina Date: Mon, 14 Oct 2024 10:44:07 +0200 Subject: [PATCH] rebase main --- spec/message_routing_spec.cr | 12 +++ spec/mqtt/integrations/connect_spec.cr | 6 +- spec/mqtt/integrations/message_qos_spec.cr | 12 ++- spec/mqtt/integrations/publish_spec.cr | 26 ------ src/lavinmq/exchange/exchange.cr | 1 + src/lavinmq/exchange/mqtt.cr | 97 +++++++++------------- src/lavinmq/mqtt/session.cr | 7 +- 7 files changed, 68 insertions(+), 93 deletions(-) diff --git a/spec/message_routing_spec.cr b/spec/message_routing_spec.cr index 17c39c015b..649d49c253 100644 --- a/spec/message_routing_spec.cr +++ b/spec/message_routing_spec.cr @@ -434,4 +434,16 @@ describe LavinMQ::MQTTExchange do end end end + + it "publish messages to queues with it's own publish method" do + with_amqp_server do |s| + vhost = s.vhosts.create("x") + s1 = LavinMQ::MQTT::Session.new(vhost, "session 1") + x = LavinMQ::MQTTExchange.new(vhost, "mqtt.default") + x.bind(s1, "s1", LavinMQ::AMQP::Table.new) + msg = LavinMQ::Message.new("mqtt.default", "s1", "hej") + x.publish(msg, false) + s1.message_count.should eq 1 + end + end end diff --git a/spec/mqtt/integrations/connect_spec.cr b/spec/mqtt/integrations/connect_spec.cr index ee2cbfd8f1..eccbac53db 100644 --- a/spec/mqtt/integrations/connect_spec.cr +++ b/spec/mqtt/integrations/connect_spec.cr @@ -31,6 +31,7 @@ module MqttSpecs packet_id: 1u16 ) disconnect(io) + sleep 0.1 end with_client_io(server) do |io| connack = connect(io, clean_session: true) @@ -50,6 +51,7 @@ module MqttSpecs packet_id: 1u16 ) disconnect(io) + sleep 0.1 end with_client_io(server) do |io| connack = connect(io, clean_session: false) @@ -69,6 +71,7 @@ module MqttSpecs packet_id: 1u16 ) disconnect(io) + sleep 0.1 end with_client_io(server) do |io| connack = connect(io, clean_session: true) @@ -88,6 +91,7 @@ module MqttSpecs packet_id: 1u16 ) disconnect(io) + sleep 0.1 end with_client_io(server) do |io| connack = connect(io, clean_session: false) @@ -106,7 +110,6 @@ module MqttSpecs connack = connect(io) connack.should be_a(MQTT::Protocol::Connack) connack = connack.as(MQTT::Protocol::Connack) - pp connack.return_code connack.return_code.should eq(MQTT::Protocol::Connack::ReturnCode::Accepted) end end @@ -271,7 +274,6 @@ module MqttSpecs topics = mk_topic_filters({"a/b", 1}) subscribe(io, topic_filters: topics) disconnect(io) - pp server.vhosts["/"].queues["amq.mqtt-client_id"].consumers end sleep 0.1 server.vhosts["/"].queues["amq.mqtt-client_id"].consumers.should be_empty diff --git a/spec/mqtt/integrations/message_qos_spec.cr b/spec/mqtt/integrations/message_qos_spec.cr index 467da52e2b..96cd3e1ecd 100644 --- a/spec/mqtt/integrations/message_qos_spec.cr +++ b/spec/mqtt/integrations/message_qos_spec.cr @@ -4,7 +4,7 @@ module MqttSpecs extend MqttHelpers extend MqttMatchers describe "message qos" do - pending "both qos bits can't be set [MQTT-3.3.1-4]" do + it "both qos bits can't be set [MQTT-3.3.1-4]" do with_server do |server| with_client_io(server) do |io| connect(io) @@ -19,7 +19,7 @@ module MqttSpecs end end - pending "qos is set according to subscription qos [MYRA non-normative]" do + it "qos is set according to subscription qos [LavinMQ non-normative]" do with_server do |server| with_client_io(server) do |io| connect(io) @@ -46,13 +46,14 @@ module MqttSpecs end end - pending "qos1 messages are stored for offline sessions [MQTT-3.1.2-5]" do + it "qos1 messages are stored for offline sessions [MQTT-3.1.2-5]" do with_server do |server| with_client_io(server) do |io| connect(io) topic_filters = mk_topic_filters({"a/b", 1u8}) subscribe(io, topic_filters: topic_filters) disconnect(io) + sleep 0.1 end with_client_io(server) do |publisher_io| @@ -62,6 +63,7 @@ module MqttSpecs publish(publisher_io, topic: "a/b", qos: 0u8) end disconnect(publisher_io) + sleep 0.1 end with_client_io(server) do |io| @@ -78,7 +80,7 @@ module MqttSpecs end end - pending "acked qos1 message won't be sent again" do + it "acked qos1 message won't be sent again" do with_server do |server| with_client_io(server) do |io| connect(io) @@ -90,6 +92,7 @@ module MqttSpecs publish(publisher_io, topic: "a/b", payload: "1".to_slice, qos: 0u8) publish(publisher_io, topic: "a/b", payload: "2".to_slice, qos: 0u8) disconnect(publisher_io) + sleep 0.1 end pkt = read_packet(io) @@ -98,6 +101,7 @@ module MqttSpecs puback(io, pub.packet_id) end disconnect(io) + sleep 0.1 end with_client_io(server) do |io| diff --git a/spec/mqtt/integrations/publish_spec.cr b/spec/mqtt/integrations/publish_spec.cr index 68fad531ac..35c2ea1df9 100644 --- a/spec/mqtt/integrations/publish_spec.cr +++ b/spec/mqtt/integrations/publish_spec.cr @@ -27,31 +27,5 @@ module MqttSpecs end end end - - it "should put the message in a queue" do - with_server do |server| - with_channel(server) do |ch| - x = ch.exchange("amq.topic", "topic") - q = ch.queue("test") - q.bind(x.name, q.name) - - with_client_io(server) do |io| - connect(io) - - payload = slice = Bytes[1, 254, 200, 197, 123, 4, 87] - ack = publish(io, topic: "test", payload: payload, qos: 1u8) - ack.should_not be_nil - - body = q.get(no_ack: true).try do |v| - s = Slice(UInt8).new(payload.size) - v.body_io.read(s) - s - end - body.should eq(payload) - disconnect(io) - end - end - end - end end end diff --git a/src/lavinmq/exchange/exchange.cr b/src/lavinmq/exchange/exchange.cr index 57ee5db4cb..84d830df34 100644 --- a/src/lavinmq/exchange/exchange.cr +++ b/src/lavinmq/exchange/exchange.cr @@ -152,6 +152,7 @@ module LavinMQ queues : Set(Queue) = Set(Queue).new, exchanges : Set(Exchange) = Set(Exchange).new) : Int32 @publish_in_count += 1 + pp "pub" count = do_publish(msg, immediate, queues, exchanges) @unroutable_count += 1 if count.zero? @publish_out_count += count diff --git a/src/lavinmq/exchange/mqtt.cr b/src/lavinmq/exchange/mqtt.cr index 71e05bc534..5c459df0c9 100644 --- a/src/lavinmq/exchange/mqtt.cr +++ b/src/lavinmq/exchange/mqtt.cr @@ -3,7 +3,21 @@ require "../mqtt/subscription_tree" module LavinMQ class MQTTExchange < Exchange - @bindings = Hash(BindingKey, Set(Destination)).new do |h, k| + struct MqttBindingKey + def initialize(routing_key : String, arguments : AMQP::Table? = nil) + @binding_key = BindingKey.new(routing_key, arguments) + end + + def inner + @binding_key + end + + def hash + @binding_key.routing_key.hash + end + end + + @bindings = Hash(MqttBindingKey, Set(Destination)).new do |h, k| h[k] = Set(Destination).new end @tree = MQTT::SubscriptionTree.new @@ -12,40 +26,13 @@ module LavinMQ "mqtt" end - def bindings_details : Iterator(BindingDetails) - @bindings.each.flat_map do |binding_key, ds| - ds.each.map do |d| - BindingDetails.new(name, vhost.name, binding_key, d) - end - end - end - - # TODO: we can probably clean this up a bit - def publish(msg : Message, immediate : Bool, - queues : Set(Queue) = Set(Queue).new, - exchanges : Set(Exchange) = Set(Exchange).new) : Int32 - @publish_in_count += 1 - headers = msg.properties.headers - find_queues(msg.routing_key, headers, queues, exchanges) - if queues.empty? - @unroutable_count += 1 - return 0 - end - return 0 if immediate && !queues.any? &.immediate_delivery? - + private def _publish(msg : Message, immediate : Bool, + queues : Set(Queue) = Set(Queue).new, + exchanges : Set(Exchange) = Set(Exchange).new) : Int32 count = 0 - queues.each do |queue| - qos = 0_u8 - bindings_details.each do |binding_detail| - next unless binding_detail.destination == queue - next unless arg = binding_detail.binding_key.arguments - next unless qos_value = arg["x-mqtt-qos"]? - qos = qos_value.try &.as(UInt8) - end + @tree.each_entry(msg.routing_key) do |queue, qos| msg.properties.delivery_mode = qos - if queue.publish(msg) - @publish_out_count += 1 count += 1 msg.body_io.seek(-msg.bodysize.to_i64, IO::Seek::Current) # rewind end @@ -53,52 +40,46 @@ module LavinMQ count end + def bindings_details : Iterator(BindingDetails) + @bindings.each.flat_map do |binding_key, ds| + ds.each.map do |d| + BindingDetails.new(name, vhost.name, binding_key.inner, d) + end + end + end + + # Only here to make superclass happy + protected def bindings(routing_key, headers) : Iterator(Destination) + Iterator(Destination).empty + end + def bind(destination : Destination, routing_key : String, headers = nil) : Bool raise LavinMQ::Exchange::AccessRefused.new(self) unless destination.is_a?(MQTT::Session) - binding_key = BindingKey.new(routing_key, headers) - return false unless @bindings[binding_key].add? destination - - qos = headers.try { |h| h.fetch("x-mqtt-qos", "0").as(UInt8) } + qos = headers.try { |h| h["x-mqtt-qos"]?.try(&.as(UInt8)) } || 0u8 + binding_key = MqttBindingKey.new(routing_key, headers) + @bindings[binding_key].add destination @tree.subscribe(routing_key, destination, qos) - data = BindingDetails.new(name, vhost.name, binding_key, destination) + data = BindingDetails.new(name, vhost.name, binding_key.inner, destination) notify_observers(ExchangeEvent::Bind, data) true end def unbind(destination : Destination, routing_key, headers = nil) : Bool raise LavinMQ::Exchange::AccessRefused.new(self) unless destination.is_a?(MQTT::Session) - binding_key = BindingKey.new(routing_key, headers) + binding_key = MqttBindingKey.new(routing_key, headers) rk_bindings = @bindings[binding_key] - return false unless rk_bindings.delete destination + rk_bindings.delete destination @bindings.delete binding_key if rk_bindings.empty? @tree.unsubscribe(routing_key, destination) - data = BindingDetails.new(name, vhost.name, binding_key, destination) + data = BindingDetails.new(name, vhost.name, binding_key.inner, destination) notify_observers(ExchangeEvent::Unbind, data) delete if @auto_delete && @bindings.each_value.all?(&.empty?) true end - - protected def bindings : Iterator(Destination) - @bindings.values.each.flat_map(&.each) - end - - protected def bindings(routing_key, headers) : Iterator(Destination) - binding_key = BindingKey.new(routing_key, headers) - matches(binding_key).each - end - - private def matches(binding_key : BindingKey) : Iterator(Destination) - @tree.each_entry(binding_key.routing_key) do |session, qos| - end - - @bindings.each.select do |binding, destinations| - binding.routing_key == binding_key.routing_key - end.flat_map { |_, v| v.each } - end end end diff --git a/src/lavinmq/mqtt/session.cr b/src/lavinmq/mqtt/session.cr index 8a56c63cba..ab6ab7be6e 100644 --- a/src/lavinmq/mqtt/session.cr +++ b/src/lavinmq/mqtt/session.cr @@ -81,14 +81,14 @@ module LavinMQ end private def get(no_ack : Bool, & : Envelope -> Nil) : Bool - #let packet_id be message counter, look at myra for counter raise ClosedError.new if @closed loop do # retry if msg expired or deliver limit hit env = @msg_store_lock.synchronize { @msg_store.shift? } || break sp = env.segment_position no_ack = env.message.properties.delivery_mode == 0 - if false + if no_ack + pp "no ack" begin yield env # deliver the message rescue ex # requeue failed delivery @@ -113,7 +113,8 @@ module LavinMQ end def ack(sp : SegmentPosition) : Nil - # TDO: maybe risky to not have locka round this + # TODO: maybe risky to not have lock around this + pp "Acking?" @unacked.delete sp super sp end