From 0859dc9bdb05cdfec5263238c44d7c40fd0edb4a Mon Sep 17 00:00:00 2001 From: Patrik Wenger Date: Fri, 12 Jul 2024 21:39:47 +0200 Subject: [PATCH] SendReceiveMethods#wait_readable/#wait_writable: honor small timeouts --- CHANGES.md | 5 + lib/cztop/send_receive_methods.rb | 24 ++- spec/cztop/send_receive_methods_spec.rb | 253 +++++++++++++++++------- 3 files changed, 209 insertions(+), 73 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 8e6db76..6fe3157 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,8 @@ +next +----- +* CZTop::Socket#wait_readable: honor small (< 0.5s) timeouts +* CZTop::Socket#wait_writable: honor small (< 0.5s) timeouts + 1.2.6 (7/12/2024) ----- * add missing `require 'io/wait'` to get `IO#wait_readable` diff --git a/lib/cztop/send_receive_methods.rb b/lib/cztop/send_receive_methods.rb index 4a2fa96..73fee10 100644 --- a/lib/cztop/send_receive_methods.rb +++ b/lib/cztop/send_receive_methods.rb @@ -48,15 +48,29 @@ def receive # Because ZMQ sockets are edge-triggered, there's a small chance that we miss an edge (race condition). To avoid - # blocking forever, all waiting on the ZMQ FD is done with this timeout. + # blocking forever, all waiting on the ZMQ FD is done with this timeout or less. + # + # The race condition exists between the calls to {#readable?}/{#writable?} and waiting for the ZMQ FD. If the + # socke becomes readable/writable during that time, waiting for the FD could block forever without a timeout. + # FD_TIMEOUT = 0.5 # @note Only available on Ruby >= 3.2 # - def wait_for_fd_signal + def wait_for_fd_signal(timeout = nil) @fd_io ||= to_io - @fd_io.wait_readable FD_TIMEOUT # NOTE: always wait for readability on ZMQ FD + + if timeout + if timeout > FD_TIMEOUT + timeout = FD_TIMEOUT + end + else + timeout = FD_TIMEOUT + end + + # NOTE: always wait for readability on ZMQ FD + @fd_io.wait_readable timeout end if IO.method_defined?(:wait_readable) @@ -80,7 +94,7 @@ def wait_readable(timeout = read_timeout) while true # p wait_readable: self, timeout: timeout - wait_for_fd_signal + wait_for_fd_signal timeout break if readable? # NOTE: ZMQ FD can't be trusted raise ::IO::TimeoutError if timeout_at && now >= timeout_at @@ -107,7 +121,7 @@ def wait_writable(timeout = write_timeout) while true # p wait_writable: self, timeout: timeout - wait_for_fd_signal + wait_for_fd_signal timeout break if writable? # NOTE: ZMQ FD can't be trusted raise ::IO::TimeoutError if timeout_at && now >= timeout_at diff --git a/spec/cztop/send_receive_methods_spec.rb b/spec/cztop/send_receive_methods_spec.rb index d9f0806..2b234c1 100644 --- a/spec/cztop/send_receive_methods_spec.rb +++ b/spec/cztop/send_receive_methods_spec.rb @@ -40,22 +40,89 @@ end end + + describe '#read_timeout' do + let(:req) { CZTop::Socket::REQ.new } + + describe 'with no rcvtimeout set' do + before do + assert_equal(-1, req.options.rcvtimeo) + end + + it 'returns nil' do + assert_nil req.read_timeout + end + end + + # NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async + describe 'with no rcvtimeout=0' do + before do + req.options.rcvtimeo = 0 + end + + it 'returns nil' do + assert_nil req.read_timeout + end + end + + describe 'with rcvtimeout set' do + before do + req.options.rcvtimeo = 10 # ms + end + + it 'returns timeout in seconds' do + assert_equal 0.01, req.read_timeout + end + end + end + + + describe '#write_timeout' do + let(:req) { CZTop::Socket::REQ.new } + + describe 'with no sndtimeout set' do + before do + assert_equal(-1, req.options.sndtimeo) + end + + it 'returns nil' do + assert_nil req.write_timeout + end + end + + # NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async + describe 'with sndtimeout=0' do + before do + req.options.sndtimeo = 0 + end + + it 'returns nil' do + assert_nil req.write_timeout + end + end + + describe 'with sndtimeout set' do + before do + req.options.sndtimeo = 10 + end + + it 'returns timeout in seconds' do + assert_equal 0.01, req.write_timeout + end + end + end + describe 'Async with Fiber Scheduler' do require 'async' i = 0 let(:endpoint) { "inproc://async_endpoint_socket_spec_reqrep_#{i += 1}" } - let(:req) { CZTop::Socket::REQ.new(endpoint) } - let(:rep) { CZTop::Socket::REP.new(endpoint) } + let!(:req) { CZTop::Socket::REQ.new(endpoint) } + let!(:rep) { CZTop::Socket::REP.new(endpoint) } it 'can send and receive' do Async do |task| - rep - req - - sleep 0.1 - task.async do |task| msg = rep.receive word, = msg.to_a @@ -74,20 +141,47 @@ describe '#wait_readable' do context 'if readable' do - it 'returns true' + around do |ex| + Async do + req << 'foo' + sleep 0.01 until rep.readable? + ex.run + end + end + + it 'returns true' do + expect(rep).not_to receive(:wait_for_fd_signal) + assert_equal true, rep.wait_readable + end end context 'if not readable' do - it 'waits' + it 'waits' do + Async do |task| + expect(rep).to receive(:wait_for_fd_signal).and_call_original - it 'does not trust FD' + task.async do + sleep 0.05 + req << 'bar' + end - context 'when not timed out' do - it 'returns true' + assert rep.wait_readable + end end context 'when timed out' do - it 'raises IO::TimeoutError' + it 'raises IO::TimeoutError' do + Async do |task| + t0 = Time.now + + assert_raises IO::TimeoutError do + rep.wait_readable 0.05 + end + + t1 = Time.now + assert_in_delta 0.05, t1 - t0, 0.02 + end + end end end end @@ -95,88 +189,111 @@ describe '#wait_writable' do context 'if writable' do - it 'returns true' - end - - context 'if not writable' do - it 'waits' - - it 'does not trust FD' - - context 'when not timed out' do - it 'returns true' + around do |ex| + Async do + sleep 0.01 until req.writable? + ex.run + end end - context 'when timed out' do - it 'raises IO::TimeoutError' + it 'returns true' do + expect(rep).not_to receive(:wait_for_fd_signal) + assert_equal true, req.wait_writable end end - end - - describe '#read_timeout' do - describe 'with no rcvtimeout set' do + context 'if not writable' do before do - assert_equal(-1, req.options.rcvtimeo) + refute_operator rep, :writable? end - it 'returns nil' do - assert_nil req.read_timeout - end - end + it 'waits' do + expect(rep).to receive(:wait_for_fd_signal) { fail } - # NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async - describe 'with no rcvtimeout=0' do - before do - req.options.rcvtimeo = 0 + assert_raises StandardError do + rep.wait_writable + end end - it 'returns nil' do - assert_nil req.read_timeout + context 'when not timed out' do + it 'returns true' do + Async do |task| + task.async do + sleep 0.05 + req << 'bar' + end + + task.async do + rep.receive + end + + t0 = Time.now + assert rep.wait_writable + t1 = Time.now + + assert_in_delta 0.05, t1 - t0, 0.02 + end + end end - end - describe 'with rcvtimeout set' do - before do - req.options.rcvtimeo = 10 # ms - end + context 'when timed out' do + it 'raises IO::TimeoutError' do + Async do |task| + t0 = Time.now + + assert_raises IO::TimeoutError do + rep.wait_writable 0.05 + end - it 'returns timeout in seconds' do - assert_equal 0.01, req.read_timeout + t1 = Time.now + assert_in_delta 0.05, t1 - t0, 0.02 + end + end end end end - describe '#write_timeout' do - describe 'with no sndtimeout set' do - before do - assert_equal(-1, req.options.sndtimeo) - end + describe '#wait_for_fd_signal' do + let(:req) { CZTop::Socket::REQ.new } + let(:io) { instance_spy ::IO } - it 'returns nil' do - assert_nil req.write_timeout - end + before do + allow(req).to receive(:to_io) { io } end - # NOTE: 0 would mean non-block (EAGAIN), but that's obsolete with Async - describe 'with sndtimeout=0' do - before do - req.options.sndtimeo = 0 - end + it 'waits for readability on ZMQ FD' do + expect(io).to receive(:wait_readable) + req.wait_for_fd_signal + end - it 'returns nil' do - assert_nil req.write_timeout + it 'memoizes IO object' do + expect(req).to receive(:to_io).once + req.wait_for_fd_signal + req.wait_for_fd_signal + req.wait_for_fd_signal + end + + context 'with small timeout' do + it 'uses that timeout' do + expect(io).to receive(:wait_readable).with(0.05) + req.wait_for_fd_signal 0.05 end end - describe 'with sndtimeout set' do - before do - req.options.sndtimeo = 10 + context 'with large timeout' do + it 'uses reasonably small timeout' do + expect(io).to receive(:wait_readable) do |timeout| + assert timeout < 1.0 + end + req.wait_for_fd_signal 10 end + end - it 'returns timeout in seconds' do - assert_equal 0.01, req.write_timeout + context 'with no timeout' do + it 'still uses a timeout' do + expect(io).to receive(:wait_readable).with(Numeric) + req.wait_for_fd_signal end end end @@ -212,5 +329,5 @@ end end end - end + end if IO.method_defined?(:wait_readable) end