Skip to content

Commit

Permalink
refactor and drop dependency on Async::IO
Browse files Browse the repository at this point in the history
  • Loading branch information
paddor committed Jan 5, 2024
1 parent 42579d4 commit e07d99f
Show file tree
Hide file tree
Showing 14 changed files with 283 additions and 349 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
next
-----
* refactor to make code Fiber Scheduler agnostic
* remove Async::IO::CZTopSocket

1.1.1 (1/4/2024)
-----
* speed up Async::IO#wait_readable and #wait_writable
Expand Down
30 changes: 15 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,22 @@ mechanisms (like CURVE).

## Example with Async

See [this example](https://github.com/paddor/cztop/blob/master/examples/async/):
See [this example](https://github.com/paddor/cztop/blob/master/examples/async.rb):

```ruby
#! /usr/bin/env ruby

require 'cztop/async'
require 'cztop'

Async do |task|
task.async do |t|
socket = CZTop::Socket::REP.new("inproc://req_rep_example")
io = Async::IO.try_convert socket

socket.options.rcvtimeo = 50 # ms

loop do
msg = io.receive
msg = socket.receive
puts "<<< #{msg.to_a.inspect}"
io << msg.to_a.map(&:upcase)
socket << msg.to_a.map(&:upcase)
rescue IO::TimeoutError
break
end
Expand All @@ -40,11 +38,10 @@ Async do |task|

task.async do
socket = CZTop::Socket::REQ.new("inproc://req_rep_example")
io = Async::IO.try_convert socket

10.times do |i|
io << "foobar ##{i}"
msg = io.receive
socket << "foobar ##{i}"
msg = socket.receive
puts ">>> #{msg.to_a.inspect}"
end

Expand All @@ -56,9 +53,8 @@ end

Output:
```
$ cd examples/async
$ bundle
$ /bin/time bundle exec ./async.rb
$ cd examples
$ time ./async.rb
<<< ["foobar #0"]
>>> ["FOOBAR #0"]
<<< ["foobar #1"]
Expand All @@ -81,16 +77,20 @@ $ /bin/time bundle exec ./async.rb
>>> ["FOOBAR #9"]
REQ done.
REP done.
0.46user 0.09system 0:00.60elapsed 90%CPU (0avgtext+0avgdata 47296maxresident)k
0inputs+0outputs (0major+13669minor)pagefaults 0swaps
________________________________________________________
Executed in 401.51 millis fish external
usr time 308.44 millis 605.00 micros 307.83 millis
sys time 40.08 millis 278.00 micros 39.81 millis
```

## Overview

### Features

* Ruby idiomatic API
* compatible with [Async](https://github.com/socketry/async) / [Async::IO](https://github.com/socketry/async-io)
* Fiber Scheduler aware
* errors as exceptions
* CURVE security
* supports CZMQ DRAFT API
Expand Down
1 change: 0 additions & 1 deletion cztop.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,5 @@ Gem::Specification.new do |spec|

if RUBY_VERSION >= '3.1'
spec.add_development_dependency "async", ">= 2.0.1"
spec.add_development_dependency "async-io"
end
end
1 change: 0 additions & 1 deletion examples/async/.gitignore

This file was deleted.

1 change: 0 additions & 1 deletion examples/async/README.md

This file was deleted.

44 changes: 0 additions & 44 deletions examples/async/async.rb

This file was deleted.

124 changes: 0 additions & 124 deletions lib/cztop/async.rb

This file was deleted.

4 changes: 4 additions & 0 deletions lib/cztop/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def empty?
# returns with failure. Please report as bug.
#
def send_to(destination)
destination.wait_writable if Fiber.scheduler

rc = Zmsg.send(ffi_delegate, destination)
return if rc.zero?

Expand All @@ -79,6 +81,8 @@ def send_to(destination)
# @raise [SystemCallError] for any other error code set after +zmsg_recv+
# returns with failure. Please report as bug.
def self.receive_from(source)
source.wait_readable if Fiber.scheduler

delegate = Zmsg.recv(source)
return from_ffi_delegate(delegate) unless delegate.null?

Expand Down
75 changes: 75 additions & 0 deletions lib/cztop/send_receive_methods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,80 @@ def receive
Message.receive_from(self)
end


# Waits for socket to become readable.
def wait_readable(timeout = read_timeout)
return true if readable?

@fd_io ||= to_io

if timeout
timeout_at = now + timeout

while true
@fd_io.wait_readable(timeout)
break if readable? # NOTE: ZMQ FD can't be trusted 100%
raise ::IO::TimeoutError if now >= timeout_at
end
else
@fd_io.wait_readable until readable?
end
end


# Waits for socket to become writable.
def wait_writable(timeout = write_timeout)
return true if writable?

@fd_io ||= to_io

if timeout
timeout_at = now + timeout

while true
@fd_io.wait_writable(timeout)
break if writable? # NOTE: ZMQ FD can't be trusted 100%
raise ::IO::TimeoutError if now >= timeout_at
end
else
@fd_io.wait_writable until writable?
end
end


# @return [Float, nil] the timeout in seconds used by {IO#wait_readable}
def read_timeout
timeout = options.rcvtimeo

if timeout <= 0
timeout = nil
else
timeout = timeout.to_f / 1000
end

timeout
end


# @return [Float, nil] the timeout in seconds used by {IO#wait_writable}
def write_timeout
timeout = options.sndtimeo

if timeout <= 0
timeout = nil
else
timeout = timeout.to_f / 1000
end

timeout
end


private


def now
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
end
6 changes: 6 additions & 0 deletions lib/cztop/zsock_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ def fd
end


# @return [IO] IO for FD
def to_io
IO.for_fd fd, autoclose: false
end


# Used to access the options of a {Socket} or {Actor}.
class OptionsAccessor

Expand Down
Loading

0 comments on commit e07d99f

Please sign in to comment.