Skip to content

Commit

Permalink
Async::IO::CZTopSocket: spec and fix
Browse files Browse the repository at this point in the history
  • Loading branch information
paddor committed Jan 4, 2024
1 parent bb5d00c commit 2841212
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 10 deletions.
2 changes: 2 additions & 0 deletions cztop.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "pry"
spec.add_development_dependency "yard"
spec.add_development_dependency "rubocop", "~> 1.36.0"
spec.add_development_dependency "async", ">= 2.0.1"
spec.add_development_dependency "async-io"
end
40 changes: 34 additions & 6 deletions lib/cztop/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@

module Async
module IO

# Wrapper for CZTop sockets.
#
# @example
# Async do |task|
# socket = CZTop::Socket::REP.new("ipc:///tmp/req_rep_example")
# socket.options.rcvtimeo = 3
# io = Async::IO.try_convert socket
# msg = io.receive
# io << msg.to_a.map(&:upcase)
# end

class CZTopSocket < Generic
wraps ::CZTop::Socket::REQ
wraps ::CZTop::Socket::REP
Expand All @@ -19,20 +31,22 @@ class CZTopSocket < Generic
wraps ::CZTop::Socket::XSUB


# @see {CZTop::SendReceiveMethods#receive}
def receive
wait_readable
@io.receive
end


# @see {CZTop::SendReceiveMethods#<<}
def <<(...)
wait_writable
@io.<<(...)
end


# Waits for socket to become readable.
def wait_readable(timeout = read_timeout)
puts "Async::IO::CZTopSocket#wait_readable: waiting with timeout=#{timeout}"
@io_fd ||= ::IO.for_fd @io.fd, autoclose: false

if timeout
Expand All @@ -41,16 +55,16 @@ def wait_readable(timeout = read_timeout)
while true
@io_fd.wait_readable(timeout)
break if @io.readable?
raise TimeoutError if now >= timeout_at
raise ::IO::TimeoutError if now >= timeout_at
end
else
@io_fd.wait_readable until @io.readable?
end
end


# Waits for socket to become writable.
def wait_writable(timeout = write_timeout)
puts "Async::IO::CZTopSocket#wait_writable: waiting with timeout=#{timeout}"
@io_fd ||= ::IO.for_fd @io.fd, autoclose: false

if timeout
Expand All @@ -59,24 +73,38 @@ def wait_writable(timeout = write_timeout)
while true
@io_fd.wait_writable(timeout)
break if @io.writable?
raise TimeoutError if now >= timeout_at
raise ::IO::TimeoutError if now >= timeout_at
end
else
@io_fd.wait_writable until @io.writable?
end
end


# @return [Float, nil] the timeout in seconds used by {IO#wait_readable}
def read_timeout
timeout = @io.options.rcvtimeo
timeout = nil if timeout <= 0

if timeout <= 0
timeout = nil
else
timeout = timeout.to_f / 1000
end

timeout
end


# @return [Float, nil] the timeout in seconds used by {IO#wait_writable}
def write_timeout
timeout = @io.options.sndtimeo
timeout = nil if timeout <= 0

if timeout <= 0
timeout = nil
else
timeout = timeout.to_f / 1000
end

timeout
end

Expand Down
12 changes: 8 additions & 4 deletions lib/cztop/zsock_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -264,29 +264,33 @@ def PLAIN_password=(password)

# @!group Send and Receive Timeouts

# @return [Integer] the timeout when receiving a message
# @return [Integer] the timeout in milliseconds when receiving a message
# @see Message.receive_from
# @note -1 means infinite, 0 means nonblocking
def rcvtimeo
Zsock.rcvtimeo(@zocket)
end


# @param timeout [Integer] new timeout
# @param timeout [Integer] new timeout in milliseconds
# @see Message.receive_from
# @note -1 means infinite, 0 means nonblocking
def rcvtimeo=(timeout)
Zsock.set_rcvtimeo(@zocket, timeout)
end


# @return [Integer] the timeout when sending a message
# @return [Integer] the timeout in milliseconds when sending a message
# @see Message#send_to
# @note -1 means infinite, 0 means nonblocking
def sndtimeo
Zsock.sndtimeo(@zocket)
end


# @param timeout [Integer] new timeout
# @param timeout [Integer] new timeout in milliseconds
# @see Message#send_to
# @note -1 means infinite, 0 means nonblocking
def sndtimeo=(timeout)
Zsock.set_sndtimeo(@zocket, timeout)
end
Expand Down
160 changes: 160 additions & 0 deletions spec/cztop/async_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# frozen_string_literal: true

require_relative 'spec_helper'
require_relative '../../lib/cztop/async'

describe Async::IO::CZTopSocket do
i = 0
let(:endpoint) { "inproc://async_endpoint_socket_spec_reqrep_#{i += 1}" }
let(:req_socket) { CZTop::Socket::REQ.new(endpoint) }
let(:rep_socket) { CZTop::Socket::REP.new(endpoint) }
let(:req_io) { Async::IO.try_convert req_socket }
let(:rep_io) { Async::IO.try_convert rep_socket }


it 'can be converted to Async::IO' do
assert_kind_of Async::IO::CZTopSocket, req_io
assert_kind_of Async::IO::CZTopSocket, rep_io
end

it 'can send and receive' do
Async do |task|
rep_socket
req_socket

sleep 0.1
req_io = Async::IO.try_convert req_socket
rep_io = Async::IO.try_convert rep_socket

task.async do |task|
msg = rep_io.receive
word, = msg.to_a
rep_io << word.upcase
end

task.async do |task|
req_io << 'hello'
response, = req_io.receive.to_a
# p response: response
assert_equal 'HELLO', response
end
end
end


describe '#read_timeout' do
describe 'with no rcvtimeout set' do
before do
assert_equal -1, req_socket.options.rcvtimeo
end

it 'returns nil' do
assert_nil req_io.read_timeout
end
end

# NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async
describe 'with no rcvtimeout=0' do
before do
req_socket.options.rcvtimeo = 0
end

it 'returns nil' do
assert_nil req_io.read_timeout
end
end

describe 'with rcvtimeout set' do
before do
req_socket.options.rcvtimeo = 10 # ms
end

it 'returns timeout in seconds' do
assert_equal 0.01, req_io.read_timeout
end
end
end


describe '#write_timeout' do
describe 'with no sndtimeout set' do
before do
assert_equal -1, req_socket.options.sndtimeo
end

it 'returns nil' do
assert_nil req_io.write_timeout
end
end

# NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async
describe 'with sndtimeout=0' do
before do
req_socket.options.sndtimeo = 0
end

it 'returns nil' do
assert_nil req_io.write_timeout
end
end

describe 'with sndtimeout set' do
before do
req_socket.options.sndtimeo = 10
end

it 'returns timeout in seconds' do
assert_equal 0.01, req_io.write_timeout
end
end
end


context 'with rcvtimeo' do
before do
req_socket.options.rcvtimeo = 30
assert_equal 30, req_socket.options.rcvtimeo
end

it 'will raise TimeoutError' do
Async do
assert_raises ::IO::TimeoutError do
req_io.receive
end
end
end
end


context 'with sndtimeo' do
before do
rep_socket.options.sndtimeo = 30
assert_equal 30, rep_socket.options.sndtimeo
end

it 'will raise TimeoutError' do
Async do
assert_raises ::IO::TimeoutError do
rep_io << ['foo']
end
end
end
end


describe 'thread-safe sockets', if: has_czmq_drafts? do
let(:endpoint) { "inproc://async_endpoint_socket_spec_serverclient_#{i += 1}" }
let(:server_socket) { CZTop::Socket::SERVER.new(endpoint) }
let(:client_socket) { CZTop::Socket::CLIENT.new(endpoint) }

it 'does not convert thread-safe sockets' do
assert_raises ArgumentError do
Async::IO.try_convert server_socket
end

assert_raises ArgumentError do
Async::IO.try_convert client_socket
end
end
end
end
4 changes: 4 additions & 0 deletions spec/cztop/zsock_options_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,12 @@
describe '#sndtimeo' do
it 'sets and gets send timeout' do
assert_equal(-1, options.sndtimeo)

options.sndtimeo = 7
assert_equal 7, options.sndtimeo

options.sndtimeo = 0
assert_equal 0, options.sndtimeo
end
end

Expand Down

0 comments on commit 2841212

Please sign in to comment.