Skip to content

Commit

Permalink
bunch of fixes after review
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Jan 28, 2025
1 parent 1fd6b3a commit 76917ad
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ module LavinMQ
property? tcp_nodelay = false # bool
property segment_size : Int32 = 8 * 1024**2 # bytes
property max_inflight_messages : UInt16 = 65_535 # mqtt messages
property default_mqtt_vhost = "/"
property? raise_gc_warn : Bool = false
property? data_dir_lock : Bool = true
property tcp_keepalive : Tuple(Int32, Int32, Int32)? = {60, 10, 3} # idle, interval, probes/count
Expand Down
6 changes: 3 additions & 3 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ module LavinMQ
@log.warn(exception: ex) { "Packet decode error" }
publish_will
rescue ex : ::IO::TimeoutError
@log.warn { "Keepalive timeout: #{ex.message}" }
@log.warn { "Keepalive timeout (keepalive:#{@keepalive}): #{ex.message}" }
publish_will
rescue ex : ::IO::Error
@log.error { "Client unexpectedly closed connection: #{ex.message}" } unless @closed
Expand Down Expand Up @@ -184,10 +184,10 @@ module LavinMQ
class Consumer < LavinMQ::Client::Channel::Consumer
getter unacked = 0_u32
getter tag : String = "mqtt"
getter has_capacity = ::Channel(Bool).new
property prefetch_count = 1

def initialize(@client : Client, @session : MQTT::Session)
@has_capacity.try_send? true
end

def details_tuple
Expand Down Expand Up @@ -243,7 +243,7 @@ module LavinMQ
def flow(active : Bool)
end

getter has_capacity = ::Channel(Bool).new


def ack(sp)
end
Expand Down
10 changes: 5 additions & 5 deletions src/lavinmq/mqtt/connection_factory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module LavinMQ
Log = LavinMQ::Log.for "mqtt.connection_factory"

def initialize(@users : UserStore,
@brokers : Brokers)
@brokers : Brokers, @config : Config)
end

def start(socket : ::IO, connection_info : ConnectionInfo)
Expand Down Expand Up @@ -54,18 +54,18 @@ module LavinMQ
def authenticate(io, packet)
return unless (username = packet.username) && (password = packet.password)

vhost = "/"

if split_pos = username.index(':')
vhost = username[0, split_pos]
@config.default_mqtt_vhost = username[0, split_pos]
username = username[split_pos + 1..]
end

user = @users[username]?
return unless user
return unless user.password && user.password.try(&.verify(String.new(password)))
has_vhost_permissions = user.try &.permissions.has_key?(vhost)
has_vhost_permissions = user.try &.permissions.has_key?(@config.default_mqtt_vhost)
return unless has_vhost_permissions
broker = @brokers[vhost]?
broker = @brokers[@config.default_mqtt_vhost]?
return unless broker

{user, broker}
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/mqtt/sessions.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ require "../vhost"

module LavinMQ
module MQTT
struct Sessions
class Sessions
@queues : Hash(String, Queue)

def initialize(@vhost : VHost)
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ module LavinMQ
@parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator)
@connection_factories = {
Protocol::AMQP => AMQP::ConnectionFactory.new(@users, @vhosts),
Protocol::MQTT => MQTT::ConnectionFactory.new(@users, mqtt_brokers),
Protocol::MQTT => MQTT::ConnectionFactory.new(@users, mqtt_brokers, @config),
}
apply_parameter
spawn stats_loop, name: "Server#stats_loop"
Expand Down

0 comments on commit 76917ad

Please sign in to comment.