diff --git a/README.md b/README.md index fb5134a..fb8de3d 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ More information in the [API documentation](http://www.rubydoc.info/github/paddo ### Features +* compatible with [Async](https://github.com/socketry/async) / [Async::IO](https://github.com/socketry/async-io) * Ruby-like API * method names * sending a message via a socket is done with `Socket#<<` diff --git a/examples/async/.gitignore b/examples/async/.gitignore new file mode 100644 index 0000000..66f8ed3 --- /dev/null +++ b/examples/async/.gitignore @@ -0,0 +1 @@ +/Gemfile.lock diff --git a/examples/async/Gemfile b/examples/async/Gemfile new file mode 100644 index 0000000..77c6e0c --- /dev/null +++ b/examples/async/Gemfile @@ -0,0 +1,5 @@ +source 'https://rubygems.org' + +gem 'async' +gem 'cztop', path: '../../' +gem 'async-io' diff --git a/examples/async/README.md b/examples/async/README.md new file mode 100644 index 0000000..d824508 --- /dev/null +++ b/examples/async/README.md @@ -0,0 +1 @@ +Run: `bundle exec ./async.rb` diff --git a/examples/async/async.rb b/examples/async/async.rb new file mode 100755 index 0000000..24e43d5 --- /dev/null +++ b/examples/async/async.rb @@ -0,0 +1,40 @@ +#! /usr/bin/env ruby + +require 'cztop/async' + +Async do |task| + task.async do |t| + socket = CZTop::Socket::REP.new("ipc:///tmp/req_rep_example") + + # Simply echo every message, with every frame String#upcase'd. + socket.options.rcvtimeo = 3 + io = Async::IO.try_convert socket + + msg = io.receive + puts "<<< #{msg.to_a.inspect}" + io << msg.to_a.map(&:upcase) + + puts "REP done." + end + + task.async do + socket = CZTop::Socket::REQ.new("ipc:///tmp/req_rep_example") + puts ">>> Socket connected." + + io = Async::IO.try_convert socket + # sleep 5 + io << "foobar" + + socket.options.rcvtimeo = 3 + msg = io.receive + puts ">>> #{msg.to_a.inspect}" + puts "REQ done." + end + + task.async do + 6.times do + sleep 0.5 + puts "tick" + end + end +end diff --git a/lib/cztop/async.rb b/lib/cztop/async.rb new file mode 100644 index 0000000..1b26f77 --- /dev/null +++ b/lib/cztop/async.rb @@ -0,0 +1,84 @@ +# frozen_string_literal: true + +require 'cztop' +require 'async/io' + +module Async + module IO + class CZTopSocket < Generic + CZTop::Socket::Types.constants.each do |name| + wraps ::CZTop::Socket.const_get(name) + end + + + def receive + wait_readable + @io.receive + end + + + def <<(...) + wait_writable + @io.<<(...) + end + + + def wait_readable(timeout = read_timeout) + puts "Async::IO::CZTopSocket#wait_readable: waiting with timeout=#{timeout}" + @io_fd ||= ::IO.for_fd @io.fd, autoclose: false + + if timeout + timeout_at = now + timeout + + while true + @io_fd.wait_readable(timeout) + break if @io.readable? + raise TimeoutError if now >= timeout_at + end + else + @io_fd.wait_readable until @io.readable? + end + end + + + def wait_writable(timeout = write_timeout) + puts "Async::IO::CZTopSocket#wait_writable: waiting with timeout=#{timeout}" + @io_fd ||= ::IO.for_fd @io.fd, autoclose: false + + if timeout + timeout_at = now + timeout + + while true + @io_fd.wait_writable(timeout) + break if @io.writable? + raise TimeoutError if now >= timeout_at + end + else + @io_fd.wait_writable until @io.writable? + end + end + + + def read_timeout + timeout = @io.options.rcvtimeo + timeout = nil if timeout <= 0 + timeout + end + + + def write_timeout + timeout = @io.options.sndtimeo + timeout = nil if timeout <= 0 + timeout + end + + + private + + + def now + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + end + end +end