diff --git a/cztop.gemspec b/cztop.gemspec index 98f0839..06287e3 100644 --- a/cztop.gemspec +++ b/cztop.gemspec @@ -32,4 +32,6 @@ Gem::Specification.new do |spec| spec.add_development_dependency "pry" spec.add_development_dependency "yard" spec.add_development_dependency "rubocop", "~> 1.36.0" + spec.add_development_dependency "async", ">= 2.0.1" + spec.add_development_dependency "async-io" end diff --git a/lib/cztop/async.rb b/lib/cztop/async.rb index c3c133a..1138a00 100644 --- a/lib/cztop/async.rb +++ b/lib/cztop/async.rb @@ -5,6 +5,18 @@ module Async module IO + + # Wrapper for CZTop sockets. + # + # @example + # Async do |task| + # socket = CZTop::Socket::REP.new("ipc:///tmp/req_rep_example") + # socket.options.rcvtimeo = 3 + # io = Async::IO.try_convert socket + # msg = io.receive + # io << msg.to_a.map(&:upcase) + # end + class CZTopSocket < Generic wraps ::CZTop::Socket::REQ wraps ::CZTop::Socket::REP @@ -19,20 +31,22 @@ class CZTopSocket < Generic wraps ::CZTop::Socket::XSUB + # @see {CZTop::SendReceiveMethods#receive} def receive wait_readable @io.receive end + # @see {CZTop::SendReceiveMethods#<<} def <<(...) wait_writable @io.<<(...) end + # Waits for socket to become readable. def wait_readable(timeout = read_timeout) - puts "Async::IO::CZTopSocket#wait_readable: waiting with timeout=#{timeout}" @io_fd ||= ::IO.for_fd @io.fd, autoclose: false if timeout @@ -41,7 +55,7 @@ def wait_readable(timeout = read_timeout) while true @io_fd.wait_readable(timeout) break if @io.readable? - raise TimeoutError if now >= timeout_at + raise ::IO::TimeoutError if now >= timeout_at end else @io_fd.wait_readable until @io.readable? @@ -49,8 +63,8 @@ def wait_readable(timeout = read_timeout) end + # Waits for socket to become writable. def wait_writable(timeout = write_timeout) - puts "Async::IO::CZTopSocket#wait_writable: waiting with timeout=#{timeout}" @io_fd ||= ::IO.for_fd @io.fd, autoclose: false if timeout @@ -59,7 +73,7 @@ def wait_writable(timeout = write_timeout) while true @io_fd.wait_writable(timeout) break if @io.writable? - raise TimeoutError if now >= timeout_at + raise ::IO::TimeoutError if now >= timeout_at end else @io_fd.wait_writable until @io.writable? @@ -67,16 +81,30 @@ def wait_writable(timeout = write_timeout) end + # @return [Float, nil] the timeout in seconds used by {IO#wait_readable} def read_timeout timeout = @io.options.rcvtimeo - timeout = nil if timeout <= 0 + + if timeout <= 0 + timeout = nil + else + timeout = timeout.to_f / 1000 + end + timeout end + # @return [Float, nil] the timeout in seconds used by {IO#wait_writable} def write_timeout timeout = @io.options.sndtimeo - timeout = nil if timeout <= 0 + + if timeout <= 0 + timeout = nil + else + timeout = timeout.to_f / 1000 + end + timeout end diff --git a/lib/cztop/zsock_options.rb b/lib/cztop/zsock_options.rb index 16f005c..c5fcf6b 100644 --- a/lib/cztop/zsock_options.rb +++ b/lib/cztop/zsock_options.rb @@ -264,29 +264,33 @@ def PLAIN_password=(password) # @!group Send and Receive Timeouts - # @return [Integer] the timeout when receiving a message + # @return [Integer] the timeout in milliseconds when receiving a message # @see Message.receive_from + # @note -1 means infinite, 0 means nonblocking def rcvtimeo Zsock.rcvtimeo(@zocket) end - # @param timeout [Integer] new timeout + # @param timeout [Integer] new timeout in milliseconds # @see Message.receive_from + # @note -1 means infinite, 0 means nonblocking def rcvtimeo=(timeout) Zsock.set_rcvtimeo(@zocket, timeout) end - # @return [Integer] the timeout when sending a message + # @return [Integer] the timeout in milliseconds when sending a message # @see Message#send_to + # @note -1 means infinite, 0 means nonblocking def sndtimeo Zsock.sndtimeo(@zocket) end - # @param timeout [Integer] new timeout + # @param timeout [Integer] new timeout in milliseconds # @see Message#send_to + # @note -1 means infinite, 0 means nonblocking def sndtimeo=(timeout) Zsock.set_sndtimeo(@zocket, timeout) end diff --git a/spec/cztop/async_spec.rb b/spec/cztop/async_spec.rb new file mode 100644 index 0000000..fb315a5 --- /dev/null +++ b/spec/cztop/async_spec.rb @@ -0,0 +1,160 @@ +# frozen_string_literal: true + +require_relative 'spec_helper' +require_relative '../../lib/cztop/async' + +describe Async::IO::CZTopSocket do + i = 0 + let(:endpoint) { "inproc://async_endpoint_socket_spec_reqrep_#{i += 1}" } + let(:req_socket) { CZTop::Socket::REQ.new(endpoint) } + let(:rep_socket) { CZTop::Socket::REP.new(endpoint) } + let(:req_io) { Async::IO.try_convert req_socket } + let(:rep_io) { Async::IO.try_convert rep_socket } + + + it 'can be converted to Async::IO' do + assert_kind_of Async::IO::CZTopSocket, req_io + assert_kind_of Async::IO::CZTopSocket, rep_io + end + + it 'can send and receive' do + Async do |task| + rep_socket + req_socket + + sleep 0.1 + req_io = Async::IO.try_convert req_socket + rep_io = Async::IO.try_convert rep_socket + + task.async do |task| + msg = rep_io.receive + word, = msg.to_a + rep_io << word.upcase + end + + task.async do |task| + req_io << 'hello' + response, = req_io.receive.to_a + # p response: response + assert_equal 'HELLO', response + end + end + end + + + describe '#read_timeout' do + describe 'with no rcvtimeout set' do + before do + assert_equal -1, req_socket.options.rcvtimeo + end + + it 'returns nil' do + assert_nil req_io.read_timeout + end + end + + # NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async + describe 'with no rcvtimeout=0' do + before do + req_socket.options.rcvtimeo = 0 + end + + it 'returns nil' do + assert_nil req_io.read_timeout + end + end + + describe 'with rcvtimeout set' do + before do + req_socket.options.rcvtimeo = 10 # ms + end + + it 'returns timeout in seconds' do + assert_equal 0.01, req_io.read_timeout + end + end + end + + + describe '#write_timeout' do + describe 'with no sndtimeout set' do + before do + assert_equal -1, req_socket.options.sndtimeo + end + + it 'returns nil' do + assert_nil req_io.write_timeout + end + end + + # NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async + describe 'with sndtimeout=0' do + before do + req_socket.options.sndtimeo = 0 + end + + it 'returns nil' do + assert_nil req_io.write_timeout + end + end + + describe 'with sndtimeout set' do + before do + req_socket.options.sndtimeo = 10 + end + + it 'returns timeout in seconds' do + assert_equal 0.01, req_io.write_timeout + end + end + end + + + context 'with rcvtimeo' do + before do + req_socket.options.rcvtimeo = 30 + assert_equal 30, req_socket.options.rcvtimeo + end + + it 'will raise TimeoutError' do + Async do + assert_raises ::IO::TimeoutError do + req_io.receive + end + end + end + end + + + context 'with sndtimeo' do + before do + rep_socket.options.sndtimeo = 30 + assert_equal 30, rep_socket.options.sndtimeo + end + + it 'will raise TimeoutError' do + Async do + assert_raises ::IO::TimeoutError do + rep_io << ['foo'] + end + end + end + end + + + describe 'thread-safe sockets', if: has_czmq_drafts? do + let(:endpoint) { "inproc://async_endpoint_socket_spec_serverclient_#{i += 1}" } + let(:server_socket) { CZTop::Socket::SERVER.new(endpoint) } + let(:client_socket) { CZTop::Socket::CLIENT.new(endpoint) } + + it 'does not convert thread-safe sockets' do + assert_raises ArgumentError do + Async::IO.try_convert server_socket + end + + assert_raises ArgumentError do + Async::IO.try_convert client_socket + end + end + end +end diff --git a/spec/cztop/zsock_options_spec.rb b/spec/cztop/zsock_options_spec.rb index 2ae5eee..9971b5a 100644 --- a/spec/cztop/zsock_options_spec.rb +++ b/spec/cztop/zsock_options_spec.rb @@ -245,8 +245,12 @@ describe '#sndtimeo' do it 'sets and gets send timeout' do assert_equal(-1, options.sndtimeo) + options.sndtimeo = 7 assert_equal 7, options.sndtimeo + + options.sndtimeo = 0 + assert_equal 0, options.sndtimeo end end