From 4affaa3953ee67b0a67a47da02d74fdc397a4021 Mon Sep 17 00:00:00 2001 From: Samuel Williams <samuel.williams@oriontransfer.co.nz> Date: Wed, 24 Jan 2024 22:51:51 +1300 Subject: [PATCH] Separate interfaces for bound and connected sockets. --- lib/io/endpoint/bound_endpoint.rb | 86 +++++++++++ lib/io/endpoint/composite_endpoint.rb | 1 + lib/io/endpoint/connected_endpoint.rb | 69 +++++++++ lib/io/endpoint/generic.rb | 40 +++--- lib/io/endpoint/shared_endpoint.rb | 135 +----------------- lib/io/endpoint/wrapper.rb | 8 +- .../{shared_endpoint.rb => bound_endpoint.rb} | 41 ++++-- 7 files changed, 217 insertions(+), 163 deletions(-) create mode 100644 lib/io/endpoint/bound_endpoint.rb create mode 100644 lib/io/endpoint/connected_endpoint.rb rename test/io/endpoint/{shared_endpoint.rb => bound_endpoint.rb} (58%) diff --git a/lib/io/endpoint/bound_endpoint.rb b/lib/io/endpoint/bound_endpoint.rb new file mode 100644 index 0000000..2a91539 --- /dev/null +++ b/lib/io/endpoint/bound_endpoint.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2023, by Samuel Williams. + +require_relative 'generic' +require_relative 'composite_endpoint' +require_relative 'address_endpoint' + +module IO::Endpoint + class BoundEndpoint < Generic + def self.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false) + sockets = endpoint.bind + + sockets.each do |server| + # This is somewhat optional. We want to have a generic interface as much as possible so that users of this interface can just call it without knowing a lot of internal details. Therefore, we ignore errors here if it's because the underlying socket does not support the operation. + begin + server.listen(backlog) + rescue Errno::EOPNOTSUPP + # Ignore. + end + + server.close_on_exec = close_on_exec + end + + return self.new(endpoint, sockets, **endpoint.options) + end + + def initialize(endpoint, sockets, **options) + super(**options) + + @endpoint = endpoint + @sockets = sockets + end + + attr :endpoint + attr :sockets + + # A endpoint for the local end of the bound socket. + # @returns [CompositeEndpoint] A composite endpoint for the local end of the bound socket. + def local_address_endpoint(**options) + endpoints = @sockets.map do |socket| + AddressEndpoint.new(socket.to_io.local_address, **options) + end + + return CompositeEndpoint.new(endpoints) + end + + # A endpoint for the remote end of the bound socket. + # @returns [CompositeEndpoint] A composite endpoint for the remote end of the bound socket. + def remote_address_endpoint(**options) + endpoints = @sockets.map do |wrapper| + AddressEndpoint.new(socket.to_io.remote_address, **options) + end + + return CompositeEndpoint.new(endpoints) + end + + def close + @sockets.each(&:close) + @sockets.clear + end + + def to_s + "\#<#{self.class} #{@sockets.size} bound sockets for #{@endpoint}>" + end + + def bind(wrapper = Wrapper.default, &block) + @sockets.each.map do |server| + if block_given? + wrapper.async do + yield server + end + else + server.dup + end + end + end + end + + class Generic + def bound(**options) + BoundEndpoint.bound(self, **options) + end + end +end diff --git a/lib/io/endpoint/composite_endpoint.rb b/lib/io/endpoint/composite_endpoint.rb index 2c72252..55aba06 100644 --- a/lib/io/endpoint/composite_endpoint.rb +++ b/lib/io/endpoint/composite_endpoint.rb @@ -6,6 +6,7 @@ require_relative 'generic' module IO::Endpoint + # A composite endpoint is a collection of endpoints that are used in order. class CompositeEndpoint < Generic def initialize(endpoints, **options) super(**options) diff --git a/lib/io/endpoint/connected_endpoint.rb b/lib/io/endpoint/connected_endpoint.rb new file mode 100644 index 0000000..11f48ff --- /dev/null +++ b/lib/io/endpoint/connected_endpoint.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2023, by Samuel Williams. + +require_relative 'generic' +require_relative 'composite_endpoint' +require_relative 'socket_endpoint' + +require 'openssl' + +module IO::Endpoint + class ConnectedEndpoint < Generic + def self.connected(endpoint, close_on_exec: false) + socket = endpoint.connect + + socket.close_on_exec = close_on_exec + + return self.new(endpoint, socket, **endpoint.options) + end + + def initialize(endpoint, socket, **options) + super(**options) + + @endpoint = endpoint + @socket = socket + end + + attr :endpoint + attr :socket + + # A endpoint for the local end of the bound socket. + # @returns [AddressEndpoint] A endpoint for the local end of the connected socket. + def local_address_endpoint(**options) + AddressEndpoint.new(socket.to_io.local_address, **options) + end + + # A endpoint for the remote end of the bound socket. + # @returns [AddressEndpoint] A endpoint for the remote end of the connected socket. + def remote_address_endpoint(**options) + AddressEndpoint.new(socket.to_io.remote_address, **options) + end + + def connect(wrapper = Wrapper.default, &block) + if block_given? + yield @socket + else + return @socket.dup + end + end + + def close + if @socket + @socket.close + @socket = nil + end + end + + def to_s + "\#<#{self.class} #{@socket} connected for #{@endpoint}>" + end + end + + class Generic + def connected(**options) + ConnectedEndpoint.connected(self, **options) + end + end +end diff --git a/lib/io/endpoint/generic.rb b/lib/io/endpoint/generic.rb index 953e2de..7d626fd 100644 --- a/lib/io/endpoint/generic.rb +++ b/lib/io/endpoint/generic.rb @@ -54,32 +54,40 @@ def timeout @options[:timeout] end - # @return [Numeric] The default timeout for accepted sockets. - def accept_timeout - @options.fetch(:accepted_timeout) + # Bind a socket to the given address. If a block is given, the socket will be automatically closed when the block exits. + # @parameter wrapper [Wrapper] The wrapper to use for binding. + # @yields {|socket| ...} An optional block which will be passed the socket. + # @parameter socket [Socket] The socket which has been bound. + # @returns [Array(Socket)] the bound socket + def bind(wrapper = Wrapper.default, &block) + raise NotImplementedError end - # @return [Address] the address to bind to before connecting. - def local_address - @options[:local_address] + # Connects a socket to the given address. If a block is given, the socket will be automatically closed when the block exits. + # @parameter wrapper [Wrapper] The wrapper to use for connecting. + # @return [Socket] the connected socket + def connect(wrapper = Wrapper.default, &block) + raise NotImplementedError end - # Endpoints sometimes have multiple paths. - # @yield [Endpoint] Enumerate all discrete paths as endpoints. + # Bind and accept connections on the given address. + # @parameter wrapper [Wrapper] The wrapper to use for accepting connections. + # @yields [Socket] The accepted socket. + def accept(wrapper = Wrapper.default, &block) + bind(wrapper) do |server| + wrapper.accept(server, **@options, &block) + end + end + + # Enumerate all discrete paths as endpoints. + # @yields {|endpoint| ...} A block which will be passed each endpoint. + # @parameter endpoint [Endpoint] The endpoint. def each return to_enum unless block_given? yield self end - # Accept connections from the specified endpoint. - # @param backlog [Integer] the number of connections to listen for. - def accept(wrapper = Wrapper.default, *arguments, **options, &block) - bind(wrapper, *arguments, **options) do |server| - wrapper.accept(server, **options, &block) - end - end - # Create an Endpoint instance by URI scheme. The host and port of the URI will be passed to the Endpoint factory method, along with any options. # # You should not use untrusted input as it may execute arbitrary code. diff --git a/lib/io/endpoint/shared_endpoint.rb b/lib/io/endpoint/shared_endpoint.rb index c8cdd6d..a412689 100644 --- a/lib/io/endpoint/shared_endpoint.rb +++ b/lib/io/endpoint/shared_endpoint.rb @@ -1,136 +1,7 @@ # frozen_string_literal: true # Released under the MIT License. -# Copyright, 2023, by Samuel Williams. +# Copyright, 2024, by Samuel Williams. -require_relative 'generic' -require_relative 'composite_endpoint' -require_relative 'socket_endpoint' - -require 'openssl' - -module IO::Endpoint - # Pre-connect and pre-bind sockets so that it can be used between processes. - class SharedEndpoint < Generic - # Create a new `SharedEndpoint` by binding to the given endpoint. - def self.bound(endpoint, backlog: Socket::SOMAXCONN, close_on_exec: false, **options) - sockets = endpoint.bind(**options) - - sockets.each do |server| - # This is somewhat optional. We want to have a generic interface as much as possible so that users of this interface can just call it without knowing a lot of internal details. Therefore, we ignore errors here if it's because the underlying socket does not support the operation. - begin - server.listen(backlog) - rescue Errno::EOPNOTSUPP - # Ignore. - end - - server.close_on_exec = close_on_exec - end - - return self.new(endpoint, sockets) - end - - # Create a new `SharedEndpoint` by connecting to the given endpoint. - def self.connected(endpoint, close_on_exec: false) - socket = endpoint.connect - - socket.close_on_exec = close_on_exec - - return self.new(endpoint, [socket]) - end - - def initialize(endpoint, sockets, **options) - super(**options) - - raise TypeError, "sockets must be an Array" unless sockets.is_a?(Array) - - @endpoint = endpoint - @sockets = sockets - end - - attr :endpoint - attr :sockets - - def local_address_endpoint(**options) - endpoints = @sockets.map do |wrapper| - AddressEndpoint.new(wrapper.to_io.local_address, **options) - end - - return CompositeEndpoint.new(endpoints) - end - - def remote_address_endpoint(**options) - endpoints = @sockets.map do |wrapper| - AddressEndpoint.new(wrapper.to_io.remote_address, **options) - end - - return CompositeEndpoint.new(endpoints) - end - - # Close all the internal sockets. - def close - @sockets.each(&:close) - @sockets.clear - end - - def each(&block) - return to_enum unless block_given? - - @sockets.each do |socket| - yield SocketEndpoint.new(socket.dup) - end - end - - def bind(wrapper = Wrapper.default, &block) - @sockets.each.map do |server| - server = server.dup - - if block_given? - wrapper.async do - begin - yield server - ensure - server.close - end - end - else - server - end - end - end - - def connect(wrapper = Wrapper.default, &block) - @sockets.each do |socket| - socket = socket.dup - - return socket unless block_given? - - begin - return yield(socket) - ensure - socket.close - end - end - end - - def accept(wrapper = Wrapper.default, &block) - bind(wrapper) do |server| - wrapper.accept(server, &block) - end - end - - def to_s - "\#<#{self.class} #{@sockets.size} descriptors for #{@endpoint}>" - end - end - - class Generic - def bound(**options) - SharedEndpoint.bound(self, **options) - end - - def connected(**options) - SharedEndpoint.connected(self, **options) - end - end -end +require_relative 'bound_endpoint' +require_relative 'connected_endpoint' diff --git a/lib/io/endpoint/wrapper.rb b/lib/io/endpoint/wrapper.rb index 9b5d72e..8d96a6e 100644 --- a/lib/io/endpoint/wrapper.rb +++ b/lib/io/endpoint/wrapper.rb @@ -43,7 +43,7 @@ def async # @option reuse_address [Boolean] Allow this port to be bound in multiple processes. # @option linger [Boolean] Wait for data to be sent before closing the socket. # @option buffered [Boolean] Enable or disable Nagle's algorithm for TCP sockets. - def build(*arguments, timeout: nil, reuse_address: true, reuse_port: nil, linger: nil, buffered: false) + def build(*arguments, reuse_address: true, reuse_port: nil, linger: nil, buffered: false, **options) socket = ::Socket.new(*arguments) if reuse_address @@ -75,8 +75,12 @@ def build(*arguments, timeout: nil, reuse_address: true, reuse_port: nil, linger # socket = Async::IO::Socket.connect(Async::IO::Address.tcp("8.8.8.8", 53)) # @param remote_address [Address] The remote address to connect to. # @option local_address [Address] The local address to bind to before connecting. - def connect(remote_address, local_address: nil, **options) + def connect(remote_address, local_address: nil, timeout: nil, **options) socket = build(remote_address.afamily, remote_address.socktype, remote_address.protocol, **options) do |socket| + if timeout + set_timeout(socket, timeout) + end + if local_address if defined?(IP_BIND_ADDRESS_NO_PORT) # Inform the kernel (Linux 4.2+) to not reserve an ephemeral port when using bind(2) with a port number of 0. The port will later be automatically chosen at connect(2) time, in a way that allows sharing a source port as long as the 4-tuple is unique. diff --git a/test/io/endpoint/shared_endpoint.rb b/test/io/endpoint/bound_endpoint.rb similarity index 58% rename from test/io/endpoint/shared_endpoint.rb rename to test/io/endpoint/bound_endpoint.rb index 5eefd7a..b25dd1e 100644 --- a/test/io/endpoint/shared_endpoint.rb +++ b/test/io/endpoint/bound_endpoint.rb @@ -3,11 +3,11 @@ # Released under the MIT License. # Copyright, 2023, by Samuel Williams. -require 'io/endpoint/shared_endpoint' +require 'io/endpoint/bound_endpoint' require 'io/endpoint/unix_endpoint' require 'with_temporary_directory' -describe IO::Endpoint::SharedEndpoint do +describe IO::Endpoint::BoundEndpoint do include WithTemporaryDirectory let(:path) {File.join(temporary_directory, "test.ipc")} @@ -32,7 +32,7 @@ peer.close end - endpoint = subject.connected(internal_endpoint) + endpoint = internal_endpoint.connected endpoint.connect do |socket| expect(socket).to be_a(Socket) @@ -43,23 +43,37 @@ socket.close end ensure + thread&.kill sockets&.each(&:close) - thread&.join end with "timeouts" do - let(:timeout) {nil} - let(:accepted_timeout) {1.0} + let(:timeout) {1.0} - let(:internal_endpoint) {IO::Endpoint::UNIXEndpoint.new(path, timeout: timeout, accepted_timeout: accepted_timeout)} + let(:internal_endpoint) {IO::Endpoint::UNIXEndpoint.new(path, timeout: timeout)} it "can accept with distinct timeouts" do - internal_endpoint.accept + expect(internal_endpoint.timeout).to be == timeout - endpoint = subject.connected(internal_endpoint) + bound_endpoint = internal_endpoint.bound + expect(bound_endpoint.timeout).to be == timeout - endpoint.connect do |socket| - expect(socket).to be_a(Socket) + expect(bound_endpoint.sockets).not.to be(:empty?) + bound_endpoint.sockets.each do |socket| + expect(socket.timeout).to be_nil + end + + thread = Thread.new do + bound_endpoint.accept do |peer, address| + expect(peer.timeout).to be == timeout + peer.close + end + end + + connected_endpoint = internal_endpoint.connected + + connected_endpoint.connect do |socket| + expect(socket.timeout).to be == timeout # Wait for the connection to be closed. socket.wait_readable @@ -67,8 +81,9 @@ socket.close end ensure - sockets&.each(&:close) - thread&.join + thread&.kill + bound_endpoint&.close + connected_endpoint&.close end end end