Skip to content

Commit

Permalink
rebase main
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Oct 14, 2024
1 parent f1e82bf commit bb4394c
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 93 deletions.
12 changes: 12 additions & 0 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -434,4 +434,16 @@ describe LavinMQ::MQTTExchange do
end
end
end

it "publish messages to queues with it's own publish method" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
s1 = LavinMQ::MQTT::Session.new(vhost, "session 1")
x = LavinMQ::MQTTExchange.new(vhost, "mqtt.default")
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
msg = LavinMQ::Message.new("mqtt.default", "s1", "hej")
x.publish(msg, false)
s1.message_count.should eq 1
end
end
end
6 changes: 4 additions & 2 deletions spec/mqtt/integrations/connect_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ module MqttSpecs
packet_id: 1u16
)
disconnect(io)
sleep 0.1
end
with_client_io(server) do |io|
connack = connect(io, clean_session: true)
Expand All @@ -50,6 +51,7 @@ module MqttSpecs
packet_id: 1u16
)
disconnect(io)
sleep 0.1
end
with_client_io(server) do |io|
connack = connect(io, clean_session: false)
Expand All @@ -69,6 +71,7 @@ module MqttSpecs
packet_id: 1u16
)
disconnect(io)
sleep 0.1
end
with_client_io(server) do |io|
connack = connect(io, clean_session: true)
Expand All @@ -88,6 +91,7 @@ module MqttSpecs
packet_id: 1u16
)
disconnect(io)
sleep 0.1
end
with_client_io(server) do |io|
connack = connect(io, clean_session: false)
Expand All @@ -106,7 +110,6 @@ module MqttSpecs
connack = connect(io)
connack.should be_a(MQTT::Protocol::Connack)
connack = connack.as(MQTT::Protocol::Connack)
pp connack.return_code
connack.return_code.should eq(MQTT::Protocol::Connack::ReturnCode::Accepted)
end
end
Expand Down Expand Up @@ -271,7 +274,6 @@ module MqttSpecs
topics = mk_topic_filters({"a/b", 1})
subscribe(io, topic_filters: topics)
disconnect(io)
pp server.vhosts["/"].queues["amq.mqtt-client_id"].consumers
end
sleep 0.1
server.vhosts["/"].queues["amq.mqtt-client_id"].consumers.should be_empty
Expand Down
12 changes: 8 additions & 4 deletions spec/mqtt/integrations/message_qos_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module MqttSpecs
extend MqttHelpers
extend MqttMatchers
describe "message qos" do
pending "both qos bits can't be set [MQTT-3.3.1-4]" do
it "both qos bits can't be set [MQTT-3.3.1-4]" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
Expand All @@ -19,7 +19,7 @@ module MqttSpecs
end
end

pending "qos is set according to subscription qos [MYRA non-normative]" do
it "qos is set according to subscription qos [LavinMQ non-normative]" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
Expand All @@ -46,13 +46,14 @@ module MqttSpecs
end
end

pending "qos1 messages are stored for offline sessions [MQTT-3.1.2-5]" do
it "qos1 messages are stored for offline sessions [MQTT-3.1.2-5]" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
topic_filters = mk_topic_filters({"a/b", 1u8})
subscribe(io, topic_filters: topic_filters)
disconnect(io)
sleep 0.1
end

with_client_io(server) do |publisher_io|
Expand All @@ -62,6 +63,7 @@ module MqttSpecs
publish(publisher_io, topic: "a/b", qos: 0u8)
end
disconnect(publisher_io)
sleep 0.1
end

with_client_io(server) do |io|
Expand All @@ -78,7 +80,7 @@ module MqttSpecs
end
end

pending "acked qos1 message won't be sent again" do
it "acked qos1 message won't be sent again" do
with_server do |server|
with_client_io(server) do |io|
connect(io)
Expand All @@ -90,6 +92,7 @@ module MqttSpecs
publish(publisher_io, topic: "a/b", payload: "1".to_slice, qos: 0u8)
publish(publisher_io, topic: "a/b", payload: "2".to_slice, qos: 0u8)
disconnect(publisher_io)
sleep 0.1
end

pkt = read_packet(io)
Expand All @@ -98,6 +101,7 @@ module MqttSpecs
puback(io, pub.packet_id)
end
disconnect(io)
sleep 0.1
end

with_client_io(server) do |io|
Expand Down
26 changes: 0 additions & 26 deletions spec/mqtt/integrations/publish_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,5 @@ module MqttSpecs
end
end
end

it "should put the message in a queue" do
with_server do |server|
with_channel(server) do |ch|
x = ch.exchange("amq.topic", "topic")
q = ch.queue("test")
q.bind(x.name, q.name)

with_client_io(server) do |io|
connect(io)

payload = slice = Bytes[1, 254, 200, 197, 123, 4, 87]
ack = publish(io, topic: "test", payload: payload, qos: 1u8)
ack.should_not be_nil

body = q.get(no_ack: true).try do |v|
s = Slice(UInt8).new(payload.size)
v.body_io.read(s)
s
end
body.should eq(payload)
disconnect(io)
end
end
end
end
end
end
1 change: 1 addition & 0 deletions src/lavinmq/exchange/exchange.cr
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ module LavinMQ
queues : Set(Queue) = Set(Queue).new,
exchanges : Set(Exchange) = Set(Exchange).new) : Int32
@publish_in_count += 1
pp "pub"
count = do_publish(msg, immediate, queues, exchanges)
@unroutable_count += 1 if count.zero?
@publish_out_count += count
Expand Down
97 changes: 39 additions & 58 deletions src/lavinmq/exchange/mqtt.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,21 @@ require "../mqtt/subscription_tree"

module LavinMQ
class MQTTExchange < Exchange
@bindings = Hash(BindingKey, Set(Destination)).new do |h, k|
struct MqttBindingKey
def initialize(routing_key : String, arguments : AMQP::Table? = nil)
@binding_key = BindingKey.new(routing_key, arguments)
end

def inner
@binding_key
end

def hash
@binding_key.routing_key.hash
end
end

@bindings = Hash(MqttBindingKey, Set(Destination)).new do |h, k|
h[k] = Set(Destination).new
end
@tree = MQTT::SubscriptionTree.new
Expand All @@ -12,93 +26,60 @@ module LavinMQ
"mqtt"
end

def bindings_details : Iterator(BindingDetails)
@bindings.each.flat_map do |binding_key, ds|
ds.each.map do |d|
BindingDetails.new(name, vhost.name, binding_key, d)
end
end
end

# TODO: we can probably clean this up a bit
def publish(msg : Message, immediate : Bool,
queues : Set(Queue) = Set(Queue).new,
exchanges : Set(Exchange) = Set(Exchange).new) : Int32
@publish_in_count += 1
headers = msg.properties.headers
find_queues(msg.routing_key, headers, queues, exchanges)
if queues.empty?
@unroutable_count += 1
return 0
end
return 0 if immediate && !queues.any? &.immediate_delivery?

private def _publish(msg : Message, immediate : Bool,
queues : Set(Queue) = Set(Queue).new,
exchanges : Set(Exchange) = Set(Exchange).new) : Int32
count = 0
queues.each do |queue|
qos = 0_u8
bindings_details.each do |binding_detail|
next unless binding_detail.destination == queue
next unless arg = binding_detail.binding_key.arguments
next unless qos_value = arg["x-mqtt-qos"]?
qos = qos_value.try &.as(UInt8)
end
@tree.each_entry(msg.routing_key) do |queue, qos|
msg.properties.delivery_mode = qos

if queue.publish(msg)
@publish_out_count += 1
count += 1
msg.body_io.seek(-msg.bodysize.to_i64, IO::Seek::Current) # rewind
end
end
count
end

def bindings_details : Iterator(BindingDetails)
@bindings.each.flat_map do |binding_key, ds|
ds.each.map do |d|
BindingDetails.new(name, vhost.name, binding_key.inner, d)
end
end
end

# Only here to make superclass happy
protected def bindings(routing_key, headers) : Iterator(Destination)
Iterator(Destination).empty
end

def bind(destination : Destination, routing_key : String, headers = nil) : Bool
raise LavinMQ::Exchange::AccessRefused.new(self) unless destination.is_a?(MQTT::Session)

binding_key = BindingKey.new(routing_key, headers)
return false unless @bindings[binding_key].add? destination

qos = headers.try { |h| h.fetch("x-mqtt-qos", "0").as(UInt8) }
qos = headers.try { |h| h["x-mqtt-qos"]?.try(&.as(UInt8)) } || 0u8
binding_key = MqttBindingKey.new(routing_key, headers)
@bindings[binding_key].add destination
@tree.subscribe(routing_key, destination, qos)

data = BindingDetails.new(name, vhost.name, binding_key, destination)
data = BindingDetails.new(name, vhost.name, binding_key.inner, destination)
notify_observers(ExchangeEvent::Bind, data)
true
end

def unbind(destination : Destination, routing_key, headers = nil) : Bool
raise LavinMQ::Exchange::AccessRefused.new(self) unless destination.is_a?(MQTT::Session)
binding_key = BindingKey.new(routing_key, headers)
binding_key = MqttBindingKey.new(routing_key, headers)
rk_bindings = @bindings[binding_key]
return false unless rk_bindings.delete destination
rk_bindings.delete destination
@bindings.delete binding_key if rk_bindings.empty?

@tree.unsubscribe(routing_key, destination)

data = BindingDetails.new(name, vhost.name, binding_key, destination)
data = BindingDetails.new(name, vhost.name, binding_key.inner, destination)
notify_observers(ExchangeEvent::Unbind, data)

delete if @auto_delete && @bindings.each_value.all?(&.empty?)
true
end

protected def bindings : Iterator(Destination)
@bindings.values.each.flat_map(&.each)
end

protected def bindings(routing_key, headers) : Iterator(Destination)
binding_key = BindingKey.new(routing_key, headers)
matches(binding_key).each
end

private def matches(binding_key : BindingKey) : Iterator(Destination)
@tree.each_entry(binding_key.routing_key) do |session, qos|
end

@bindings.each.select do |binding, destinations|
binding.routing_key == binding_key.routing_key
end.flat_map { |_, v| v.each }
end
end
end
7 changes: 4 additions & 3 deletions src/lavinmq/mqtt/session.cr
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ module LavinMQ
end

private def get(no_ack : Bool, & : Envelope -> Nil) : Bool
#let packet_id be message counter, look at myra for counter
raise ClosedError.new if @closed
loop do # retry if msg expired or deliver limit hit
env = @msg_store_lock.synchronize { @msg_store.shift? } || break

sp = env.segment_position
no_ack = env.message.properties.delivery_mode == 0
if false
if no_ack
pp "no ack"
begin
yield env # deliver the message
rescue ex # requeue failed delivery
Expand All @@ -113,7 +113,8 @@ module LavinMQ
end

def ack(sp : SegmentPosition) : Nil
# TDO: maybe risky to not have locka round this
# TODO: maybe risky to not have lock around this
pp "Acking?"
@unacked.delete sp
super sp
end
Expand Down

0 comments on commit bb4394c

Please sign in to comment.