Skip to content

Commit

Permalink
add Async::IO wrapper (see new example)
Browse files Browse the repository at this point in the history
  • Loading branch information
paddor committed Dec 30, 2023
1 parent f8d3c7d commit 47f3eac
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ More information in the [API documentation](http://www.rubydoc.info/github/paddo

### Features

* compatible with [Async](https://github.com/socketry/async) / [Async::IO](https://github.com/socketry/async-io)
* Ruby-like API
* method names
* sending a message via a socket is done with `Socket#<<`
Expand Down
1 change: 1 addition & 0 deletions examples/async/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/Gemfile.lock
5 changes: 5 additions & 0 deletions examples/async/Gemfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
source 'https://rubygems.org'

gem 'async'
gem 'cztop', path: '../../'
gem 'async-io'
1 change: 1 addition & 0 deletions examples/async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Run: `bundle exec ./async.rb`
40 changes: 40 additions & 0 deletions examples/async/async.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#! /usr/bin/env ruby

require 'cztop/async'

Async do |task|
task.async do |t|
socket = CZTop::Socket::REP.new("ipc:///tmp/req_rep_example")

# Simply echo every message, with every frame String#upcase'd.
socket.options.rcvtimeo = 3
io = Async::IO.try_convert socket

msg = io.receive
puts "<<< #{msg.to_a.inspect}"
io << msg.to_a.map(&:upcase)

puts "REP done."
end

task.async do
socket = CZTop::Socket::REQ.new("ipc:///tmp/req_rep_example")
puts ">>> Socket connected."

io = Async::IO.try_convert socket
# sleep 5
io << "foobar"

socket.options.rcvtimeo = 3
msg = io.receive
puts ">>> #{msg.to_a.inspect}"
puts "REQ done."
end

task.async do
6.times do
sleep 0.5
puts "tick"
end
end
end
84 changes: 84 additions & 0 deletions lib/cztop/async.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# frozen_string_literal: true

require 'cztop'
require 'async/io'

module Async
module IO
class CZTopSocket < Generic
CZTop::Socket::Types.constants.each do |name|
wraps ::CZTop::Socket.const_get(name)
end


def receive
wait_readable
@io.receive
end


def <<(...)
wait_writable
@io.<<(...)
end


def wait_readable(timeout = read_timeout)
puts "Async::IO::CZTopSocket#wait_readable: waiting with timeout=#{timeout}"
@io_fd ||= ::IO.for_fd @io.fd, autoclose: false

if timeout
timeout_at = now + timeout

while true
@io_fd.wait_readable(timeout)
break if @io.readable?
raise TimeoutError if now >= timeout_at
end
else
@io_fd.wait_readable until @io.readable?
end
end


def wait_writable(timeout = write_timeout)
puts "Async::IO::CZTopSocket#wait_writable: waiting with timeout=#{timeout}"
@io_fd ||= ::IO.for_fd @io.fd, autoclose: false

if timeout
timeout_at = now + timeout

while true
@io_fd.wait_writable(timeout)
break if @io.writable?
raise TimeoutError if now >= timeout_at
end
else
@io_fd.wait_writable until @io.writable?
end
end


def read_timeout
timeout = @io.options.rcvtimeo
timeout = nil if timeout <= 0
timeout
end


def write_timeout
timeout = @io.options.sndtimeo
timeout = nil if timeout <= 0
timeout
end


private


def now
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
end
end

0 comments on commit 47f3eac

Please sign in to comment.