Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Socket flush fiber #856

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
32 changes: 27 additions & 5 deletions src/lavinmq/amqp/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,25 @@ module LavinMQ
@vhost.add_connection(self)
@log.info { "Connection established for user=#{@user.name}" }
spawn read_loop, name: "Client#read_loop #{@remote_address}"
spawn flush_loop, name: "Client#flush_loop #{@remote_address}"
end

@flush_buffer = ::Channel(Bool).new(128)

private def flush_loop
flush_buffer = @flush_buffer
while flush_buffer.receive? && @running
while flush_buffer.try_receive?
end
@write_lock.synchronize do
@socket.flush
end
end
rescue e : IO::Error
end

private def flush
@flush_buffer.send(true)
end

# Returns client provided connection name if set, else server generated name
Expand Down Expand Up @@ -190,8 +209,8 @@ module LavinMQ
@write_lock.synchronize do
s = @socket
s.write_bytes frame, IO::ByteFormat::NetworkEndian
s.flush
end
flush
@last_sent_frame = RoughTime.monotonic
@send_oct_count += 8_u64 + frame.bytesize
if frame.is_a?(AMQP::Frame::Connection::CloseOk)
Expand Down Expand Up @@ -224,9 +243,9 @@ module LavinMQ

def deliver(frame, msg)
return false if closed?
socket = @socket
websocket = socket.is_a? WebSocketIO
@write_lock.synchronize do
socket = @socket
websocket = socket.is_a? WebSocketIO
{% unless flag?(:release) %}
@log.trace { "Send #{frame.inspect}" }
{% end %}
Expand Down Expand Up @@ -257,9 +276,9 @@ module LavinMQ
@send_oct_count += 8_u64 + body.bytesize
pos += length
end
socket.flush unless websocket # Websockets need to send one frame per WS frame
@last_sent_frame = RoughTime.monotonic
end
flush unless websocket # Websockets need to send one frame per WS frame
true
rescue ex : IO::Error | OpenSSL::SSL::Error
@log.debug { "Lost connection, while sending message (#{ex.inspect})" } unless closed?
Expand Down Expand Up @@ -412,7 +431,10 @@ module LavinMQ

private def close_socket
@running = false
@socket.close
@write_lock.synchronize do
@flush_buffer.close
@socket.close
end
rescue ex
@log.debug { "#{ex.inspect} when closing socket" }
end
Expand Down
Loading