Skip to content

Commit

Permalink
beginning of mqtt-poc, ping
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Aug 27, 2024
1 parent 7db7cda commit ee30279
Show file tree
Hide file tree
Showing 13 changed files with 268 additions and 32 deletions.
4 changes: 4 additions & 0 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ shards:
git: https://github.com/84codes/lz4.cr.git
version: 1.0.0+git.commit.96d714f7593c66ca7425872fd26c7b1286806d3d

mqtt-protocol:
git: https://github.com/84codes/mqtt-protocol.cr.git
version: 0.2.0+git.commit.3f82ee85d029e6d0505cbe261b108e156df4e598

systemd:
git: https://github.com/84codes/systemd.cr.git
version: 2.0.0
Expand Down
2 changes: 2 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies:
github: 84codes/systemd.cr
lz4:
github: 84codes/lz4.cr
mqtt-protocol:
github: 84codes/mqtt-protocol.cr

development_dependencies:
ameba:
Expand Down
51 changes: 51 additions & 0 deletions spec/mqtt_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
require "spec"
require "socket"
require "./spec_helper"
require "mqtt-protocol"
require "../src/lavinmq/mqtt/connection_factory"


def setup_connection(s, pass)
left, right = UNIXSocket.pair
io = MQTT::Protocol::IO.new(left)
s.users.create("usr", "pass", [LavinMQ::Tag::Administrator])
MQTT::Protocol::Connect.new("abc", false, 60u16, "usr", pass.to_slice, nil).to_io(io)
connection_factory = LavinMQ::MQTT::ConnectionFactory.new(right,
LavinMQ::ConnectionInfo.local,
s.users,
s.vhosts["/"])
{ connection_factory.start, io }
end

describe LavinMQ do
src = "127.0.0.1"
dst = "127.0.0.1"

it "MQTT connection should pass authentication" do
with_amqp_server do |s|
client, io = setup_connection(s, "pass")
client.should be_a(LavinMQ::MQTT::Client)
# client.close
MQTT::Protocol::Disconnect.new.to_io(io)
end
end

it "unauthorized MQTT connection should not pass authentication" do
with_amqp_server do |s|
client, io = setup_connection(s, "pa&ss")
client.should_not be_a(LavinMQ::MQTT::Client)
# client.close
MQTT::Protocol::Disconnect.new.to_io(io)
end
end

it "should handle a Ping" do
with_amqp_server do |s|
client, io = setup_connection(s, "pass")
client.should be_a(LavinMQ::MQTT::Client)
MQTT::Protocol::PingReq.new.to_io(io)
MQTT::Protocol::Packet.from_io(io).should be_a(MQTT::Protocol::Connack)
MQTT::Protocol::Packet.from_io(io).should be_a(MQTT::Protocol::PingResp)
end
end
end
14 changes: 12 additions & 2 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ def with_amqp_server(tls = false, replicator = LavinMQ::Clustering::NoopServer.n
ctx = OpenSSL::SSL::Context::Server.new
ctx.certificate_chain = "spec/resources/server_certificate.pem"
ctx.private_key = "spec/resources/server_key.pem"
spawn(name: "amqp tls listen") { s.listen_tls(tcp_server, ctx) }
spawn(name: "amqp tls listen") { s.listen_tls(tcp_server, ctx, "amqp") }
else
spawn(name: "amqp tcp listen") { s.listen(tcp_server) }
spawn(name: "amqp tcp listen") { s.listen(tcp_server, "amqp") }
end
Fiber.yield
yield s
Expand All @@ -89,6 +89,16 @@ def with_amqp_server(tls = false, replicator = LavinMQ::Clustering::NoopServer.n
end
end

#do i need to do this?
# def with_mqtt_server(tls = false, & : LavinMQ::Server -> Nil)
# tcp_server = TCPServer.new("localhost", 0)
# s = LavinMQ::Server.new(LavinMQ::Config.instance.data_dir, replicator)
# begin
# if tls
# end

# end

def with_http_server(&)
with_amqp_server do |s|
h = LavinMQ::HTTP::Server.new(s)
Expand Down
2 changes: 2 additions & 0 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ module LavinMQ
property amqp_bind = "127.0.0.1"
property amqp_port = 5672
property amqps_port = -1
property mqtt_port = 1883
property mqtt_bind = "127.0.0.1"
property unix_path = ""
property unix_proxy_protocol = 1_u8 # PROXY protocol version on unix domain socket connections
property tcp_proxy_protocol = 0_u8 # PROXY protocol version on amqp tcp connections
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/http/handler/websocket.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module LavinMQ
Socket::IPAddress.new("127.0.0.1", 0) # Fake when UNIXAddress
connection_info = ConnectionInfo.new(remote_address, local_address)
io = WebSocketIO.new(ws)
spawn amqp_server.handle_connection(io, connection_info), name: "HandleWSconnection #{remote_address}"
spawn amqp_server.handle_connection(io, connection_info, "amqp"), name: "HandleWSconnection #{remote_address}"
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/http/http_server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ module LavinMQ
StaticController.new,
ApiErrorHandler.new,
AuthHandler.new(@amqp_server),
PrometheusController.new(@amqp_server),
# PrometheusController.new(@amqp_server),
ApiDefaultsHandler.new,
MainController.new(@amqp_server),
DefinitionsController.new(@amqp_server),
Expand Down
11 changes: 8 additions & 3 deletions src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ module LavinMQ

private def listen
if @config.amqp_port > 0
spawn @amqp_server.listen(@config.amqp_bind, @config.amqp_port),
spawn @amqp_server.listen(@config.amqp_bind, @config.amqp_port, :amqp),
name: "AMQP listening on #{@config.amqp_port}"
end

if @config.amqps_port > 0
if ctx = @tls_context
spawn @amqp_server.listen_tls(@config.amqp_bind, @config.amqps_port, ctx),
spawn @amqp_server.listen_tls(@config.amqp_bind, @config.amqps_port, ctx, :amqp),
name: "AMQPS listening on #{@config.amqps_port}"
end
end
Expand All @@ -131,7 +131,7 @@ module LavinMQ
end

unless @config.unix_path.empty?
spawn @amqp_server.listen_unix(@config.unix_path), name: "AMQP listening at #{@config.unix_path}"
spawn @amqp_server.listen_unix(@config.unix_path, :amqp), name: "AMQP listening at #{@config.unix_path}"
end

if @config.http_port > 0
Expand All @@ -150,6 +150,11 @@ module LavinMQ
spawn(name: "HTTP listener") do
@http_server.not_nil!.listen
end

if @config.mqtt_port > 0
spawn @amqp_server.listen(@config.mqtt_bind, @config.mqtt_port, :mqtt),
name: "MQTT listening on #{@config.mqtt_port}"
end
end

private def dump_debug_info
Expand Down
96 changes: 96 additions & 0 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
require "openssl"
require "socket"
require "../client"
require "../error"

module LavinMQ
module MQTT
class Client < LavinMQ::Client
include Stats
include SortableJSON

getter vhost, channels, log, name, user
Log = ::Log.for "MQTT.client"
rate_stats({"send_oct", "recv_oct"})

def initialize(@socket : ::IO,
@connection_info : ConnectionInfo,
@vhost : VHost,
@user : User)
@io = MQTT::IO.new(@socket)
@lock = Mutex.new
@remote_address = @connection_info.src
@local_address = @connection_info.dst
@metadata = ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s})
@log = Logger.new(Log, @metadata)
@channels = Hash(UInt16, Client::Channel).new
@vhost.add_connection(self)
spawn read_loop
connection_name = "#{@remote_address} -> #{@local_address}"
@name = "#{@remote_address} -> #{@local_address}"
end

private def read_loop
loop do
Log.trace { "waiting for packet" }
packet = read_and_handle_packet
# The disconnect packet has been handled and the socket has been closed.
# If we dont breakt the loop here we'll get a IO/Error on next read.
break if packet.is_a?(MQTT::Disconnect)
end
rescue ex : MQTT::Error::Connect
Log.warn { "Connect error #{ex.inspect}" }
ensure
@socket.close
@vhost.rm_connection(self)
end

def read_and_handle_packet
packet : MQTT::Packet = MQTT::Packet.from_io(@io)
Log.info { "recv #{packet.inspect}" }

case packet
when MQTT::Publish then pp "publish"
when MQTT::PubAck then pp "puback"
when MQTT::Subscribe then pp "subscribe"
when MQTT::Unsubscribe then pp "unsubscribe"
when MQTT::PingReq then receive_pingreq(packet)
when MQTT::Disconnect then return packet
else raise "invalid packet type for client to send"
end
packet
end

private def send(packet)
@lock.synchronize do
packet.to_io(@io)
@socket.flush
end
# @broker.increment_bytes_sent(packet.bytesize)
# @broker.increment_messages_sent
# @broker.increment_publish_sent if packet.is_a?(MQTT::Protocol::Publish)
end

def receive_pingreq(packet : MQTT::PingReq)
send(MQTT::PingResp.new)
end

def details_tuple
{
vhost: @vhost.name,
user: @user.name,
protocol: "MQTT",
}.merge(stats_details)
end

def update_rates
end

def close(reason)
end

def force_close
end
end
end
end
47 changes: 47 additions & 0 deletions src/lavinmq/mqtt/connection_factory.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
require "socket"
require "./protocol"
require "log"
require "./client"
require "../vhost"
require "../user"

module LavinMQ
module MQTT
class ConnectionFactory
def initialize(@socket : ::IO,
@connection_info : ConnectionInfo,
@users : UserStore,
@vhost : VHost)
end

def start
io = ::MQTT::Protocol::IO.new(@socket)
if packet = MQTT::Packet.from_io(@socket).as?(MQTT::Connect)
Log.trace { "recv #{packet.inspect}" }
if user = authenticate(io, packet, @users)
::MQTT::Protocol::Connack.new(false, ::MQTT::Protocol::Connack::ReturnCode::Accepted).to_io(io)
io.flush
return LavinMQ::MQTT::Client.new(@socket, @connection_info, @vhost, user)
end
end
rescue ex
Log.warn { "Recieved the wrong packet" }
@socket.close
end

def authenticate(io, packet, users)
return nil unless (username = packet.username) && (password = packet.password)
user = users[username]?
return user if user && user.password && user.password.not_nil!.verify(String.new(password))
#probably not good to differentiate between user not found and wrong password
if user.nil?
Log.warn { "User \"#{username}\" not found" }
else
Log.warn { "Authentication failure for user \"#{username}\"" }
end
::MQTT::Protocol::Connack.new(false, ::MQTT::Protocol::Connack::ReturnCode::NotAuthorized).to_io(io)
nil
end
end
end
end
7 changes: 7 additions & 0 deletions src/lavinmq/mqtt/protocol.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
require "mqtt-protocol"

module LavinMQ
module MQTT
include ::MQTT::Protocol
end
end
Loading

0 comments on commit ee30279

Please sign in to comment.