Skip to content

Commit

Permalink
Separate interfaces for bound and connected sockets.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jan 24, 2024
1 parent fa3ade6 commit 4affaa3
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 163 deletions.
86 changes: 86 additions & 0 deletions lib/io/endpoint/bound_endpoint.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2023, by Samuel Williams.

require_relative 'generic'
require_relative 'composite_endpoint'
require_relative 'address_endpoint'

module IO::Endpoint
class BoundEndpoint < Generic
def self.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false)
sockets = endpoint.bind

sockets.each do |server|
# This is somewhat optional. We want to have a generic interface as much as possible so that users of this interface can just call it without knowing a lot of internal details. Therefore, we ignore errors here if it's because the underlying socket does not support the operation.
begin
server.listen(backlog)
rescue Errno::EOPNOTSUPP
# Ignore.
end

server.close_on_exec = close_on_exec
end

return self.new(endpoint, sockets, **endpoint.options)
end

def initialize(endpoint, sockets, **options)
super(**options)

@endpoint = endpoint
@sockets = sockets
end

attr :endpoint
attr :sockets

# A endpoint for the local end of the bound socket.
# @returns [CompositeEndpoint] A composite endpoint for the local end of the bound socket.
def local_address_endpoint(**options)
endpoints = @sockets.map do |socket|
AddressEndpoint.new(socket.to_io.local_address, **options)
end

return CompositeEndpoint.new(endpoints)
end

# A endpoint for the remote end of the bound socket.
# @returns [CompositeEndpoint] A composite endpoint for the remote end of the bound socket.
def remote_address_endpoint(**options)
endpoints = @sockets.map do |wrapper|
AddressEndpoint.new(socket.to_io.remote_address, **options)
end

return CompositeEndpoint.new(endpoints)
end

def close
@sockets.each(&:close)
@sockets.clear
end

def to_s
"\#<#{self.class} #{@sockets.size} bound sockets for #{@endpoint}>"
end

def bind(wrapper = Wrapper.default, &block)
@sockets.each.map do |server|
if block_given?
wrapper.async do
yield server
end
else
server.dup
end
end
end
end

class Generic
def bound(**options)
BoundEndpoint.bound(self, **options)
end
end
end
1 change: 1 addition & 0 deletions lib/io/endpoint/composite_endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require_relative 'generic'

module IO::Endpoint
# A composite endpoint is a collection of endpoints that are used in order.
class CompositeEndpoint < Generic
def initialize(endpoints, **options)
super(**options)
Expand Down
69 changes: 69 additions & 0 deletions lib/io/endpoint/connected_endpoint.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2023, by Samuel Williams.

require_relative 'generic'
require_relative 'composite_endpoint'
require_relative 'socket_endpoint'

require 'openssl'

module IO::Endpoint
class ConnectedEndpoint < Generic
def self.connected(endpoint, close_on_exec: false)
socket = endpoint.connect

socket.close_on_exec = close_on_exec

return self.new(endpoint, socket, **endpoint.options)
end

def initialize(endpoint, socket, **options)
super(**options)

@endpoint = endpoint
@socket = socket
end

attr :endpoint
attr :socket

# A endpoint for the local end of the bound socket.
# @returns [AddressEndpoint] A endpoint for the local end of the connected socket.
def local_address_endpoint(**options)
AddressEndpoint.new(socket.to_io.local_address, **options)
end

# A endpoint for the remote end of the bound socket.
# @returns [AddressEndpoint] A endpoint for the remote end of the connected socket.
def remote_address_endpoint(**options)
AddressEndpoint.new(socket.to_io.remote_address, **options)
end

def connect(wrapper = Wrapper.default, &block)
if block_given?
yield @socket
else
return @socket.dup
end
end

def close
if @socket
@socket.close
@socket = nil
end
end

def to_s
"\#<#{self.class} #{@socket} connected for #{@endpoint}>"
end
end

class Generic
def connected(**options)
ConnectedEndpoint.connected(self, **options)
end
end
end
40 changes: 24 additions & 16 deletions lib/io/endpoint/generic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,32 +54,40 @@ def timeout
@options[:timeout]
end

# @return [Numeric] The default timeout for accepted sockets.
def accept_timeout
@options.fetch(:accepted_timeout)
# Bind a socket to the given address. If a block is given, the socket will be automatically closed when the block exits.
# @parameter wrapper [Wrapper] The wrapper to use for binding.
# @yields {|socket| ...} An optional block which will be passed the socket.
# @parameter socket [Socket] The socket which has been bound.
# @returns [Array(Socket)] the bound socket
def bind(wrapper = Wrapper.default, &block)
raise NotImplementedError
end

# @return [Address] the address to bind to before connecting.
def local_address
@options[:local_address]
# Connects a socket to the given address. If a block is given, the socket will be automatically closed when the block exits.
# @parameter wrapper [Wrapper] The wrapper to use for connecting.
# @return [Socket] the connected socket
def connect(wrapper = Wrapper.default, &block)
raise NotImplementedError
end

# Endpoints sometimes have multiple paths.
# @yield [Endpoint] Enumerate all discrete paths as endpoints.
# Bind and accept connections on the given address.
# @parameter wrapper [Wrapper] The wrapper to use for accepting connections.
# @yields [Socket] The accepted socket.
def accept(wrapper = Wrapper.default, &block)
bind(wrapper) do |server|
wrapper.accept(server, **@options, &block)
end
end

# Enumerate all discrete paths as endpoints.
# @yields {|endpoint| ...} A block which will be passed each endpoint.
# @parameter endpoint [Endpoint] The endpoint.
def each
return to_enum unless block_given?

yield self
end

# Accept connections from the specified endpoint.
# @param backlog [Integer] the number of connections to listen for.
def accept(wrapper = Wrapper.default, *arguments, **options, &block)
bind(wrapper, *arguments, **options) do |server|
wrapper.accept(server, **options, &block)
end
end

# Create an Endpoint instance by URI scheme. The host and port of the URI will be passed to the Endpoint factory method, along with any options.
#
# You should not use untrusted input as it may execute arbitrary code.
Expand Down
135 changes: 3 additions & 132 deletions lib/io/endpoint/shared_endpoint.rb
Original file line number Diff line number Diff line change
@@ -1,136 +1,7 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2023, by Samuel Williams.
# Copyright, 2024, by Samuel Williams.

require_relative 'generic'
require_relative 'composite_endpoint'
require_relative 'socket_endpoint'

require 'openssl'

module IO::Endpoint
# Pre-connect and pre-bind sockets so that it can be used between processes.
class SharedEndpoint < Generic
# Create a new `SharedEndpoint` by binding to the given endpoint.
def self.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false, **options)
sockets = endpoint.bind(**options)

sockets.each do |server|
# This is somewhat optional. We want to have a generic interface as much as possible so that users of this interface can just call it without knowing a lot of internal details. Therefore, we ignore errors here if it's because the underlying socket does not support the operation.
begin
server.listen(backlog)
rescue Errno::EOPNOTSUPP
# Ignore.
end

server.close_on_exec = close_on_exec
end

return self.new(endpoint, sockets)
end

# Create a new `SharedEndpoint` by connecting to the given endpoint.
def self.connected(endpoint, close_on_exec: false)
socket = endpoint.connect

socket.close_on_exec = close_on_exec

return self.new(endpoint, [socket])
end

def initialize(endpoint, sockets, **options)
super(**options)

raise TypeError, "sockets must be an Array" unless sockets.is_a?(Array)

@endpoint = endpoint
@sockets = sockets
end

attr :endpoint
attr :sockets

def local_address_endpoint(**options)
endpoints = @sockets.map do |wrapper|
AddressEndpoint.new(wrapper.to_io.local_address, **options)
end

return CompositeEndpoint.new(endpoints)
end

def remote_address_endpoint(**options)
endpoints = @sockets.map do |wrapper|
AddressEndpoint.new(wrapper.to_io.remote_address, **options)
end

return CompositeEndpoint.new(endpoints)
end

# Close all the internal sockets.
def close
@sockets.each(&:close)
@sockets.clear
end

def each(&block)
return to_enum unless block_given?

@sockets.each do |socket|
yield SocketEndpoint.new(socket.dup)
end
end

def bind(wrapper = Wrapper.default, &block)
@sockets.each.map do |server|
server = server.dup

if block_given?
wrapper.async do
begin
yield server
ensure
server.close
end
end
else
server
end
end
end

def connect(wrapper = Wrapper.default, &block)
@sockets.each do |socket|
socket = socket.dup

return socket unless block_given?

begin
return yield(socket)
ensure
socket.close
end
end
end

def accept(wrapper = Wrapper.default, &block)
bind(wrapper) do |server|
wrapper.accept(server, &block)
end
end

def to_s
"\#<#{self.class} #{@sockets.size} descriptors for #{@endpoint}>"
end
end

class Generic
def bound(**options)
SharedEndpoint.bound(self, **options)
end

def connected(**options)
SharedEndpoint.connected(self, **options)
end
end
end
require_relative 'bound_endpoint'
require_relative 'connected_endpoint'
8 changes: 6 additions & 2 deletions lib/io/endpoint/wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def async
# @option reuse_address [Boolean] Allow this port to be bound in multiple processes.
# @option linger [Boolean] Wait for data to be sent before closing the socket.
# @option buffered [Boolean] Enable or disable Nagle's algorithm for TCP sockets.
def build(*arguments, timeout: nil, reuse_address: true, reuse_port: nil, linger: nil, buffered: false)
def build(*arguments, reuse_address: true, reuse_port: nil, linger: nil, buffered: false, **options)
socket = ::Socket.new(*arguments)

if reuse_address
Expand Down Expand Up @@ -75,8 +75,12 @@ def build(*arguments, timeout: nil, reuse_address: true, reuse_port: nil, linger
# socket = Async::IO::Socket.connect(Async::IO::Address.tcp("8.8.8.8", 53))
# @param remote_address [Address] The remote address to connect to.
# @option local_address [Address] The local address to bind to before connecting.
def connect(remote_address, local_address: nil, **options)
def connect(remote_address, local_address: nil, timeout: nil, **options)
socket = build(remote_address.afamily, remote_address.socktype, remote_address.protocol, **options) do |socket|
if timeout
set_timeout(socket, timeout)
end

if local_address
if defined?(IP_BIND_ADDRESS_NO_PORT)
# Inform the kernel (Linux 4.2+) to not reserve an ephemeral port when using bind(2) with a port number of 0. The port will later be automatically chosen at connect(2) time, in a way that allows sharing a source port as long as the 4-tuple is unique.
Expand Down
Loading

0 comments on commit 4affaa3

Please sign in to comment.