Skip to content

Commit

Permalink
pass connection throgh broker and use session for connack
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Sep 18, 2024
1 parent 2497723 commit cb77ee8
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 120 deletions.
2 changes: 1 addition & 1 deletion spec/mqtt/integrations/connect_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ module MqttSpecs
end
end

pending "session present when reconnecting a non-clean session [MQTT-3.1.2-4]" do
it "session present when reconnecting a non-clean session [MQTT-3.1.2-4]" do
with_server do |server|
with_client_io(server) do |io|
connect(io, clean_session: false)
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require "./in_memory_backend"

module LavinMQ
class Config
DEFAULT_LOG_LEVEL = Log::Severity::Info
DEFAULT_LOG_LEVEL = Log::Severity::Trace

property data_dir : String = ENV.fetch("STATE_DIRECTORY", "/var/lib/lavinmq")
property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : ""
Expand Down
32 changes: 16 additions & 16 deletions src/lavinmq/http/controller/queues.cr
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ module LavinMQ
end
end

get "/api/queues/:vhost/:name/unacked" do |context, params|
with_vhost(context, params) do |vhost|
refuse_unless_management(context, user(context), vhost)
q = queue(context, params, vhost)
unacked_messages = q.consumers.each.flat_map do |c|
c.unacked_messages.each.compact_map do |u|
next unless u.queue == q
if consumer = u.consumer
UnackedMessage.new(c.channel, u.tag, u.delivered_at, consumer.tag)
end
end
end
unacked_messages = unacked_messages.chain(q.basic_get_unacked.each)
page(context, unacked_messages)
end
end
# get "/api/queues/:vhost/:name/unacked" do |context, params|
# with_vhost(context, params) do |vhost|
# refuse_unless_management(context, user(context), vhost)
# q = queue(context, params, vhost)
# # unacked_messages = q.consumers.each.flat_map do |c|
# # c.unacked_messages.each.compact_map do |u|
# # next unless u.queue == q
# # if consumer = u.consumer
# # UnackedMessage.new(c.channel, u.tag, u.delivered_at, consumer.tag)
# # end
# # end
# # end
# # unacked_messages = unacked_messages.chain(q.basic_get_unacked.each)
# # page(context, unacked_messages)
# end
# end

put "/api/queues/:vhost/:name" do |context, params|
with_vhost(context, params) do |vhost|
Expand Down
6 changes: 6 additions & 0 deletions src/lavinmq/mqtt/broker.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
module LavinMQ
module MQTT
class Broker

def connected(client) : MQTT::Session
session = Session.new(client.vhost, client.client_id)
session.connect(client)
session
end
end
end
end
189 changes: 95 additions & 94 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,120 +6,33 @@ require "./session"

module LavinMQ
module MQTT
class MqttConsumer < LavinMQ::Client::Channel::Consumer
getter unacked = 0_u32
getter tag : String = "mqtt"
property prefetch_count = 1

def initialize(@client : Client, @queue : Queue)
@has_capacity.try_send? true
spawn deliver_loop, name: "Consumer deliver loop", same_thread: true
end

private def deliver_loop
queue = @queue
i = 0
loop do
queue.consume_get(self) do |env|
deliver(env.message, env.segment_position, env.redelivered)
end
Fiber.yield if (i &+= 1) % 32768 == 0
end
rescue LavinMQ::Queue::ClosedError
rescue ex
puts "deliver loop exiting: #{ex.inspect}"
end

def details_tuple
{
queue: {
name: "mqtt.client_id",
vhost: "mqtt",
},
}
end

def no_ack?
true
end

def accepts? : Bool
true
end

def deliver(msg, sp, redelivered = false, recover = false)
packet_id = nil
if message_id = msg.properties.message_id
packet_id = message_id.to_u16 unless message_id.empty?
end
pub_args = {
packet_id: packet_id,
payload: msg.body,
dup: false,
qos: 0u8,
retain: false,
topic: "test",
}
@client.send(::MQTT::Protocol::Publish.new(**pub_args))
# MQTT::Protocol::PubAck.from_io(io) if pub_args[:qos].positive? && expect_response
end

def exclusive?
true
end

def cancel
end

def close
end

def closed?
false
end

def flow(active : Bool)
end

getter has_capacity = ::Channel(Bool).new

def ack(sp)
end

def reject(sp, requeue = false)
end

def priority
0
end
end

class Client < LavinMQ::Client
include Stats
include SortableJSON

getter vhost, channels, log, name, user, client_id
getter vhost, channels, log, name, user, client_id, socket
@channels = Hash(UInt16, Client::Channel).new
session : MQTT::Session
@session : MQTT::Session?
rate_stats({"send_oct", "recv_oct"})
Log = ::Log.for "MQTT.client"

def initialize(@socket : ::IO,
@connection_info : ConnectionInfo,
@vhost : VHost,
@user : User,
@broker : MQTT::Broker,
@client_id : String,
@clean_session = false,
@will : MQTT::Will? = nil)
@will : MQTT::Will? = nil
)
@io = MQTT::IO.new(@socket)
@lock = Mutex.new
@remote_address = @connection_info.src
@local_address = @connection_info.dst
@name = "#{@remote_address} -> #{@local_address}"
@metadata = ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s})
@log = Logger.new(Log, @metadata)
@vhost.add_connection(self)
session = start_session(self)
@session = @broker.connected(self)
@log.info { "Connection established for user=#{@user.name}" }
spawn read_loop
end
Expand Down Expand Up @@ -178,7 +91,7 @@ module LavinMQ
send MQTT::PingResp.new
end

def recieve_publish(packet)
def recieve_publish(packet : MQTT::Publish)
rk = topicfilter_to_routingkey(packet.topic)
props = AMQ::Protocol::Properties.new(
message_id: packet.packet_id.to_s
Expand Down Expand Up @@ -261,5 +174,93 @@ module LavinMQ
def force_close
end
end

class MqttConsumer < LavinMQ::Client::Channel::Consumer
getter unacked = 0_u32
getter tag : String = "mqtt"
property prefetch_count = 1

def initialize(@client : Client, @queue : Queue)
@has_capacity.try_send? true
spawn deliver_loop, name: "Consumer deliver loop", same_thread: true
end

private def deliver_loop
queue = @queue
i = 0
loop do
queue.consume_get(self) do |env|
deliver(env.message, env.segment_position, env.redelivered)
end
Fiber.yield if (i &+= 1) % 32768 == 0
end
rescue LavinMQ::Queue::ClosedError
rescue ex
puts "deliver loop exiting: #{ex.inspect}"
end

def details_tuple
{
queue: {
name: "mqtt.client_id",
vhost: "mqtt",
},
}
end

def no_ack?
true
end

def accepts? : Bool
true
end

def deliver(msg, sp, redelivered = false, recover = false)
packet_id = nil
if message_id = msg.properties.message_id
packet_id = message_id.to_u16 unless message_id.empty?
end
pub_args = {
packet_id: packet_id,
payload: msg.body,
dup: false,
qos: 0u8,
retain: false,
topic: "test",
}
@client.send(::MQTT::Protocol::Publish.new(**pub_args))
# MQTT::Protocol::PubAck.from_io(io) if pub_args[:qos].positive? && expect_response
end

def exclusive?
true
end

def cancel
end

def close
end

def closed?
false
end

def flow(active : Bool)
end

getter has_capacity = ::Channel(Bool).new

def ack(sp)
end

def reject(sp, requeue = false)
end

def priority
0
end
end
end
end
7 changes: 3 additions & 4 deletions src/lavinmq/mqtt/connection_factory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ require "log"
require "./client"
require "../vhost"
require "../user"
require "./broker"

module LavinMQ
module MQTT
Expand All @@ -13,13 +14,12 @@ module LavinMQ
end

def start(socket : ::IO, connection_info : ConnectionInfo)

io = MQTT::IO.new(socket)
if packet = MQTT::Packet.from_io(socket).as?(MQTT::Connect)
Log.trace { "recv #{packet.inspect}" }
if user = authenticate(io, packet)
MQTT::Connack.new(false, MQTT::Connack::ReturnCode::Accepted).to_io(io)
io.flush
return LavinMQ::MQTT::Client.new(socket, connection_info, @vhost, user, packet.client_id, packet.clean_session?, packet.will)
return LavinMQ::MQTT::Client.new(socket, connection_info, @vhost, user, MQTT::Broker.new, packet.client_id, packet.clean_session?, packet.will)
end
end
rescue ex : MQTT::Error::Connect
Expand All @@ -28,7 +28,6 @@ module LavinMQ
MQTT::Connack.new(false, MQTT::Connack::ReturnCode.new(ex.return_code)).to_io(io)
end
socket.close

rescue ex
Log.warn { "Recieved the wrong packet" }
socket.close
Expand Down
13 changes: 10 additions & 3 deletions src/lavinmq/mqtt/session.cr
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
module LavinMQ
module MQTT
class Session < Queue
def initialize(@vhost : VHost, @name : String, @exclusive = true, @auto_delete = false, arguments : ::AMQ::Protocol::Table = AMQP::Table.new)
def initialize(@vhost : VHost,
@name : String,
@exclusive = true,
@auto_delete = false,
arguments : ::AMQ::Protocol::Table = AMQP::Table.new)
super
end

#if sub comes in with clean_session, set auto_delete on session
#rm_consumer override for clean_session

#TODO: implement subscribers array and session_present? and send instead of false
def connect(client)
client.send(MQTT::Connack.new(false, MQTT::Connack::ReturnCode::Accepted))
end
end
end
end
2 changes: 1 addition & 1 deletion src/lavinmq/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ module LavinMQ
# closing all channels will move all unacked back into ready queue
# so we are purging all messages from the queue, not only ready
consumers = @consumers_lock.synchronize { @consumers.dup }
consumers.each(&.channel.close)
# consumers.each(&.channel.close)
count = purge
notify_consumers_empty(true)
count.to_u32
Expand Down

0 comments on commit cb77ee8

Please sign in to comment.