diff --git a/spec/mqtt/integrations/subscribe_spec.cr b/spec/mqtt/integrations/subscribe_spec.cr index ee9b84382d..6081abc59f 100644 --- a/spec/mqtt/integrations/subscribe_spec.cr +++ b/spec/mqtt/integrations/subscribe_spec.cr @@ -93,7 +93,7 @@ module MqttSpecs end end - pending "should replace old subscription with new [MQTT-3.8.4-3]" do + it "should replace old subscription with new [MQTT-3.8.4-3]" do with_server do |server| with_client_io(server) do |io| connect(io) @@ -124,7 +124,7 @@ module MqttSpecs publish(io, topic: "a/b", payload: "a".to_slice, qos: 1u8) # ... consume it... packet = read_packet(io).as(MQTT::Protocol::Publish) - # ... and verify it be qos0 (i.e. our subscribe is correct) + # ... and verify it be qos1 (i.e. our second subscribe is correct) packet.qos.should eq(1u8) io.should be_drained @@ -132,22 +132,4 @@ module MqttSpecs end end end - - describe "amqp" do - pending "should create a queue and subscribe queue to amq.topic" do - with_server do |server| - with_client_io(server) do |io| - connect(io) - - topic_filters = mk_topic_filters({"a/b", 0}) - suback = subscribe(io, topic_filters: topic_filters) - suback.should be_a(MQTT::Protocol::SubAck) - - q = server.vhosts["/"].queues["mqtt.client_id"] - binding = q.bindings.find { |a, b| a.is_a?(LavinMQ::TopicExchange) && b[0] == "a.b" } - binding.should_not be_nil - end - end - end - end end diff --git a/src/lavinmq/mqtt/broker.cr b/src/lavinmq/mqtt/broker.cr index 680e33a790..d04f7380ad 100644 --- a/src/lavinmq/mqtt/broker.cr +++ b/src/lavinmq/mqtt/broker.cr @@ -1,7 +1,6 @@ module LavinMQ module MQTT struct Sessions - @queues : Hash(String, Queue) def initialize( @vhost : VHost) @@ -17,11 +16,10 @@ module LavinMQ end def declare(client_id : String, clean_session : Bool) - if session = self[client_id]? - return session + self[client_id]? || begin + @vhost.declare_queue("amq.mqtt-#{client_id}", !clean_session, clean_session, AMQP::Table.new({"x-queue-type": "mqtt"})) + self[client_id] end - @vhost.declare_queue("amq.mqtt-#{client_id}", !clean_session, clean_session, AMQP::Table.new({"x-queue-type": "mqtt"})) - return self[client_id] end def delete(client_id : String) @@ -30,7 +28,6 @@ module LavinMQ end class Broker - getter vhost, sessions def initialize(@vhost : VHost) @@ -38,7 +35,12 @@ module LavinMQ @clients = Hash(String, Client).new end - #remember to remove the old client entry form the hash if you replace a client. (maybe it already does?) + def session_present?(client_id : String, clean_session) : Bool + session = @sessions[client_id]? + return false if session.nil? || clean_session + true + end + def connect_client(socket, connection_info, user, vhost, packet) if prev_client = @clients[packet.client_id]? Log.trace { "Found previous client connected with client_id: #{packet.client_id}, closing" } @@ -50,18 +52,18 @@ module LavinMQ end def subscribe(client, packet) - name = "amq.mqtt-#{client.client_id}" - durable = false - auto_delete = false - pp "clean_session: #{client.@clean_session}" - @sessions.declare(client.client_id, client.@clean_session) - # Handle bindings, packet.topics + session = @sessions.declare(client.client_id, client.@clean_session) + qos = Array(MQTT::SubAck::ReturnCode).new(packet.topic_filters.size) + packet.topic_filters.each do |tf| + qos << MQTT::SubAck::ReturnCode.from_int(tf.qos) + rk = topicfilter_to_routingkey(tf.topic) + session.subscribe(rk, tf.qos) + end + qos end - def session_present?(client_id : String, clean_session) : Bool - session = @sessions[client_id]? - return false if session.nil? || clean_session - true + def topicfilter_to_routingkey(tf) : String + tf.gsub("/", ".") end def clear_session(client_id) diff --git a/src/lavinmq/mqtt/client.cr b/src/lavinmq/mqtt/client.cr index 5828bc4383..b3d7b7c7a8 100644 --- a/src/lavinmq/mqtt/client.cr +++ b/src/lavinmq/mqtt/client.cr @@ -42,6 +42,7 @@ module LavinMQ end private def read_loop + loop do @log.trace { "waiting for packet" } packet = read_and_handle_packet @@ -95,7 +96,7 @@ module LavinMQ end def recieve_publish(packet : MQTT::Publish) - rk = topicfilter_to_routingkey(packet.topic) + rk = @broker.topicfilter_to_routingkey(packet.topic) props = AMQ::Protocol::Properties.new( message_id: packet.packet_id.to_s ) @@ -112,25 +113,13 @@ module LavinMQ end def recieve_subscribe(packet : MQTT::Subscribe) - @broker.subscribe(self, packet) - qos = Array(MQTT::SubAck::ReturnCode).new - packet.topic_filters.each do |tf| - qos << MQTT::SubAck::ReturnCode.from_int(tf.qos) - rk = topicfilter_to_routingkey(tf.topic) - #handle bindings in broker. - @broker.vhost.bind_queue("amq.mqtt-#{client_id}", "amq.topic", rk) - end - # handle add_consumer in broker. - queue = @broker.vhost.queues["amq.mqtt-#{client_id}"] - consumer = MqttConsumer.new(self, queue) - queue.add_consumer(consumer) + qos = @broker.subscribe(self, packet) + session = @broker.sessions[@client_id] + consumer = MqttConsumer.new(self, session) + session.add_consumer(consumer) send(MQTT::SubAck.new(qos, packet.packet_id)) end - def topicfilter_to_routingkey(tf) : String - tf.gsub("/", ".") - end - def recieve_unsubscribe(packet) end diff --git a/src/lavinmq/mqtt/session.cr b/src/lavinmq/mqtt/session.cr index f8cb01706e..22d8438771 100644 --- a/src/lavinmq/mqtt/session.cr +++ b/src/lavinmq/mqtt/session.cr @@ -1,8 +1,10 @@ module LavinMQ module MQTT class Session < Queue - @clean_session : Bool = false - getter clean_session + @clean_session : Bool = false + @subscriptions : Int32 = 0 + getter clean_session + def initialize(@vhost : VHost, @name : String, @auto_delete = false, @@ -10,17 +12,23 @@ module LavinMQ super(@vhost, @name, false, @auto_delete, arguments) end - def clean_session? - @auto_delete - end + def clean_session?; @auto_delete; end + def durable?; !clean_session?; end - def durable? - !clean_session? + # TODO: "amq.tocpic" is hardcoded, should be the mqtt-exchange when that is finished + def subscribe(rk, qos) + arguments = AMQP::Table.new({"x-mqtt-qos": qos}) + if binding = bindings.find { |b| b.binding_key.routing_key == rk } + return if binding.binding_key.arguments == arguments + @vhost.unbind_queue(@name, "amq.topic", rk, binding.binding_key.arguments || AMQP::Table.new) + end + @vhost.bind_queue(@name, "amq.topic", rk, arguments) end - #TODO: implement subscribers array and session_present? and send instead of false - def connect(client) - client.send(MQTT::Connack.new(false, MQTT::Connack::ReturnCode::Accepted)) + def unsubscribe(rk) + # unbind session from the exchange + # decrease @subscriptions by 1 + # if subscriptions is empty, delete the session(do that from broker?) end end end