Skip to content

Commit

Permalink
SendReceiveMethods#wait_readable/#wait_writable: honor small timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
paddor committed Jul 12, 2024
1 parent ac3f4b1 commit 0859dc9
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 73 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
next
-----
* CZTop::Socket#wait_readable: honor small (< 0.5s) timeouts
* CZTop::Socket#wait_writable: honor small (< 0.5s) timeouts

1.2.6 (7/12/2024)
-----
* add missing `require 'io/wait'` to get `IO#wait_readable`
Expand Down
24 changes: 19 additions & 5 deletions lib/cztop/send_receive_methods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,29 @@ def receive


# Because ZMQ sockets are edge-triggered, there's a small chance that we miss an edge (race condition). To avoid
# blocking forever, all waiting on the ZMQ FD is done with this timeout.
# blocking forever, all waiting on the ZMQ FD is done with this timeout or less.
#
# The race condition exists between the calls to {#readable?}/{#writable?} and waiting for the ZMQ FD. If the
# socke becomes readable/writable during that time, waiting for the FD could block forever without a timeout.
#
FD_TIMEOUT = 0.5


# @note Only available on Ruby >= 3.2
#
def wait_for_fd_signal
def wait_for_fd_signal(timeout = nil)
@fd_io ||= to_io
@fd_io.wait_readable FD_TIMEOUT # NOTE: always wait for readability on ZMQ FD

if timeout
if timeout > FD_TIMEOUT
timeout = FD_TIMEOUT
end
else
timeout = FD_TIMEOUT
end

# NOTE: always wait for readability on ZMQ FD
@fd_io.wait_readable timeout
end if IO.method_defined?(:wait_readable)


Expand All @@ -80,7 +94,7 @@ def wait_readable(timeout = read_timeout)
while true
# p wait_readable: self, timeout: timeout

wait_for_fd_signal
wait_for_fd_signal timeout
break if readable? # NOTE: ZMQ FD can't be trusted
raise ::IO::TimeoutError if timeout_at && now >= timeout_at

Expand All @@ -107,7 +121,7 @@ def wait_writable(timeout = write_timeout)
while true
# p wait_writable: self, timeout: timeout

wait_for_fd_signal
wait_for_fd_signal timeout
break if writable? # NOTE: ZMQ FD can't be trusted
raise ::IO::TimeoutError if timeout_at && now >= timeout_at

Expand Down
253 changes: 185 additions & 68 deletions spec/cztop/send_receive_methods_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,89 @@
end
end


describe '#read_timeout' do
let(:req) { CZTop::Socket::REQ.new }

describe 'with no rcvtimeout set' do
before do
assert_equal(-1, req.options.rcvtimeo)
end

it 'returns nil' do
assert_nil req.read_timeout
end
end

# NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async
describe 'with no rcvtimeout=0' do
before do
req.options.rcvtimeo = 0
end

it 'returns nil' do
assert_nil req.read_timeout
end
end

describe 'with rcvtimeout set' do
before do
req.options.rcvtimeo = 10 # ms
end

it 'returns timeout in seconds' do
assert_equal 0.01, req.read_timeout
end
end
end


describe '#write_timeout' do
let(:req) { CZTop::Socket::REQ.new }

describe 'with no sndtimeout set' do
before do
assert_equal(-1, req.options.sndtimeo)
end

it 'returns nil' do
assert_nil req.write_timeout
end
end

# NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async
describe 'with sndtimeout=0' do
before do
req.options.sndtimeo = 0
end

it 'returns nil' do
assert_nil req.write_timeout
end
end

describe 'with sndtimeout set' do
before do
req.options.sndtimeo = 10
end

it 'returns timeout in seconds' do
assert_equal 0.01, req.write_timeout
end
end
end

describe 'Async with Fiber Scheduler' do
require 'async'

i = 0
let(:endpoint) { "inproc://async_endpoint_socket_spec_reqrep_#{i += 1}" }
let(:req) { CZTop::Socket::REQ.new(endpoint) }
let(:rep) { CZTop::Socket::REP.new(endpoint) }
let!(:req) { CZTop::Socket::REQ.new(endpoint) }
let!(:rep) { CZTop::Socket::REP.new(endpoint) }


it 'can send and receive' do
Async do |task|
rep
req

sleep 0.1

task.async do |task|
msg = rep.receive
word, = msg.to_a
Expand All @@ -74,109 +141,159 @@

describe '#wait_readable' do
context 'if readable' do
it 'returns true'
around do |ex|
Async do
req << 'foo'
sleep 0.01 until rep.readable?
ex.run
end
end

it 'returns true' do
expect(rep).not_to receive(:wait_for_fd_signal)
assert_equal true, rep.wait_readable
end
end

context 'if not readable' do
it 'waits'
it 'waits' do
Async do |task|
expect(rep).to receive(:wait_for_fd_signal).and_call_original

it 'does not trust FD'
task.async do
sleep 0.05
req << 'bar'
end

context 'when not timed out' do
it 'returns true'
assert rep.wait_readable
end
end

context 'when timed out' do
it 'raises IO::TimeoutError'
it 'raises IO::TimeoutError' do
Async do |task|
t0 = Time.now

assert_raises IO::TimeoutError do
rep.wait_readable 0.05
end

t1 = Time.now
assert_in_delta 0.05, t1 - t0, 0.02
end
end
end
end
end


describe '#wait_writable' do
context 'if writable' do
it 'returns true'
end

context 'if not writable' do
it 'waits'

it 'does not trust FD'

context 'when not timed out' do
it 'returns true'
around do |ex|
Async do
sleep 0.01 until req.writable?
ex.run
end
end

context 'when timed out' do
it 'raises IO::TimeoutError'
it 'returns true' do
expect(rep).not_to receive(:wait_for_fd_signal)
assert_equal true, req.wait_writable
end
end
end


describe '#read_timeout' do
describe 'with no rcvtimeout set' do
context 'if not writable' do
before do
assert_equal(-1, req.options.rcvtimeo)
refute_operator rep, :writable?
end

it 'returns nil' do
assert_nil req.read_timeout
end
end
it 'waits' do
expect(rep).to receive(:wait_for_fd_signal) { fail }

# NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async
describe 'with no rcvtimeout=0' do
before do
req.options.rcvtimeo = 0
assert_raises StandardError do
rep.wait_writable
end
end

it 'returns nil' do
assert_nil req.read_timeout
context 'when not timed out' do
it 'returns true' do
Async do |task|
task.async do
sleep 0.05
req << 'bar'
end

task.async do
rep.receive
end

t0 = Time.now
assert rep.wait_writable
t1 = Time.now

assert_in_delta 0.05, t1 - t0, 0.02
end
end
end
end

describe 'with rcvtimeout set' do
before do
req.options.rcvtimeo = 10 # ms
end
context 'when timed out' do
it 'raises IO::TimeoutError' do
Async do |task|
t0 = Time.now

assert_raises IO::TimeoutError do
rep.wait_writable 0.05
end

it 'returns timeout in seconds' do
assert_equal 0.01, req.read_timeout
t1 = Time.now
assert_in_delta 0.05, t1 - t0, 0.02
end
end
end
end
end


describe '#write_timeout' do
describe 'with no sndtimeout set' do
before do
assert_equal(-1, req.options.sndtimeo)
end
describe '#wait_for_fd_signal' do
let(:req) { CZTop::Socket::REQ.new }
let(:io) { instance_spy ::IO }

it 'returns nil' do
assert_nil req.write_timeout
end
before do
allow(req).to receive(:to_io) { io }
end

# NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async
describe 'with sndtimeout=0' do
before do
req.options.sndtimeo = 0
end
it 'waits for readability on ZMQ FD' do
expect(io).to receive(:wait_readable)
req.wait_for_fd_signal
end

it 'returns nil' do
assert_nil req.write_timeout
it 'memoizes IO object' do
expect(req).to receive(:to_io).once
req.wait_for_fd_signal
req.wait_for_fd_signal
req.wait_for_fd_signal
end

context 'with small timeout' do
it 'uses that timeout' do
expect(io).to receive(:wait_readable).with(0.05)
req.wait_for_fd_signal 0.05
end
end

describe 'with sndtimeout set' do
before do
req.options.sndtimeo = 10
context 'with large timeout' do
it 'uses reasonably small timeout' do
expect(io).to receive(:wait_readable) do |timeout|
assert timeout < 1.0
end
req.wait_for_fd_signal 10
end
end

it 'returns timeout in seconds' do
assert_equal 0.01, req.write_timeout
context 'with no timeout' do
it 'still uses a timeout' do
expect(io).to receive(:wait_readable).with(Numeric)
req.wait_for_fd_signal
end
end
end
Expand Down Expand Up @@ -212,5 +329,5 @@
end
end
end
end
end if IO.method_defined?(:wait_readable)
end

0 comments on commit 0859dc9

Please sign in to comment.