Skip to content

Commit

Permalink
Fall back to IO::Stream::Buffered wrapper for compatibility.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Apr 22, 2024
1 parent 6e82516 commit 54c5a2f
Show file tree
Hide file tree
Showing 14 changed files with 44 additions and 51 deletions.
3 changes: 2 additions & 1 deletion async-http.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ Gem::Specification.new do |spec|

spec.add_dependency "async", ">= 1.25"
spec.add_dependency "async-pool", ">= 0.6.1"
spec.add_dependency "io-endpoint", "~> 0.7.2"
spec.add_dependency "io-endpoint", "~> 0.8.1"
spec.add_dependency "io-stream", "~> 0.1.0"
spec.add_dependency "protocol-http", "~> 0.26.0"
spec.add_dependency "protocol-http1", "~> 0.19.0"
spec.add_dependency "protocol-http2", "~> 0.17.0"
Expand Down
2 changes: 1 addition & 1 deletion gems.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

# gem "async", path: "../async"
# gem "io-endpoint", path: "../io-endpoint"
gem "openssl", git: "https://github.com/ruby/openssl.git"
# gem "openssl", git: "https://github.com/ruby/openssl.git"
# gem "traces", path: "../traces"
# gem "sus-fixtures-async-http", path: "../sus-fixtures-async-http"

Expand Down
7 changes: 5 additions & 2 deletions lib/async/http/body/pipe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ def initialize(input, output = Writable.new, task: Task.current)
@input = input
@output = output

@head, @tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)
head, tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM)

@head = ::IO::Stream::Buffered.new(head)
@tail = tail

@reader = nil
@writer = nil
Expand Down Expand Up @@ -62,7 +65,7 @@ def writer(task)

task.annotate "#{self.class} writer."

while chunk = @head.readpartial(1024)
while chunk = @head.read_partial
@output.write(chunk)
end
ensure
Expand Down
12 changes: 6 additions & 6 deletions lib/async/http/protocol/http1.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
require_relative 'http1/client'
require_relative 'http1/server'

require 'io/stream/buffered'

module Async
module HTTP
module Protocol
Expand All @@ -21,17 +23,15 @@ def self.trailer?
end

def self.client(peer)
# We will do our own buffering:
peer.sync = false
stream = ::IO::Stream::Buffered.wrap(peer)

HTTP1::Client.new(peer, VERSION)
return HTTP1::Client.new(stream, VERSION)
end

def self.server(peer)
# We will do our own buffering:
peer.sync = false
stream = ::IO::Stream::Buffered.wrap(peer)

HTTP1::Server.new(peer, VERSION)
return HTTP1::Server.new(stream, VERSION)
end

def self.names
Expand Down
8 changes: 3 additions & 5 deletions lib/async/http/protocol/http1/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,15 @@ def http2?
end

def read_line?
@stream.gets(CRLF, chomp: true)
rescue Errno::ECONNRESET
nil
@stream.read_until(CRLF)
end

def read_line
read_line? or raise EOFError, "Could not read line!"
end

def peer
@stream
@stream.io
end

attr :count
Expand All @@ -64,7 +62,7 @@ def concurrency

# Can we use this connection to make requests?
def viable?
@ready && @stream && !@stream.closed? && @stream.readable?
@ready && @stream&.readable?
end

def reusable?
Expand Down
7 changes: 2 additions & 5 deletions lib/async/http/protocol/http1/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ def fail_request(status)
write_response(@version, status, {})
write_body(@version, nil)
rescue Errno::ECONNRESET, Errno::EPIPE
Console.warn(self, "fail_request failure", $!)
# Handle when connection is already closed
# Nothing we can do...
end

def next_request
Console.debug(self, "Reading request...", persistent: @persistent)

# The default is true.
return unless @persistent

Expand Down Expand Up @@ -59,7 +56,7 @@ def each(task: Task.current)
return
end

begin
task.defer_stop do
# If a response was generated, send it:
if response
trailer = response.headers.trailer!
Expand Down
12 changes: 5 additions & 7 deletions lib/async/http/protocol/http10.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ def self.trailer?
end

def self.client(peer)
# We will do our own buffering:
peer.sync = false
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Client.new(peer, VERSION)
return HTTP1::Client.new(stream, VERSION)
end

def self.server(peer)
# We will do our own buffering:
peer.sync = false

return HTTP1::Server.new(peer, VERSION)
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Server.new(stream, VERSION)
end

def self.names
Expand Down
10 changes: 4 additions & 6 deletions lib/async/http/protocol/http11.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ def self.trailer?
end

def self.client(peer)
# We will do our own buffering:
peer.sync = false
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Client.new(peer, VERSION)
return HTTP1::Client.new(stream, VERSION)
end

def self.server(peer)
# We will do our own buffering:
peer.sync = false
stream = ::IO::Stream::Buffered.wrap(peer)

return HTTP1::Server.new(peer, VERSION)
return HTTP1::Server.new(stream, VERSION)
end

def self.names
Expand Down
14 changes: 6 additions & 8 deletions lib/async/http/protocol/http2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
require_relative 'http2/client'
require_relative 'http2/server'

require 'io/stream/buffered'

module Async
module HTTP
module Protocol
Expand Down Expand Up @@ -35,10 +37,8 @@ def self.trailer?
}

def self.client(peer, settings = CLIENT_SETTINGS)
# We will do our own buffering:
peer.sync = false

client = Client.new(peer)
stream = IO::Stream::Buffered.wrap(peer)
client = Client.new(stream)

client.send_connection_preface(settings)
client.start_connection
Expand All @@ -47,10 +47,8 @@ def self.client(peer, settings = CLIENT_SETTINGS)
end

def self.server(peer, settings = SERVER_SETTINGS)
# We will do our own buffering:
peer.sync = false

server = Server.new(peer)
stream = ::IO::Stream::Buffered.wrap(peer)
server = Server.new(stream)

server.read_connection_preface(settings)
server.start_connection
Expand Down
4 changes: 2 additions & 2 deletions lib/async/http/protocol/http2/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def read_in_background(parent: Task.current)
attr :promises

def peer
@stream
@stream.io
end

attr :count
Expand All @@ -124,7 +124,7 @@ def concurrency

# Can we use this connection to make requests?
def viable?
@stream && !@stream.closed? && @stream.readable?
@stream&.readable?
end

def reusable?
Expand Down
2 changes: 1 addition & 1 deletion lib/async/http/protocol/http2/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def each(task: Task.current)

@count += 1

begin
task.defer_stop do
response = yield(request)
rescue
# We need to close the stream if the user code blows up while generating a response:
Expand Down
1 change: 0 additions & 1 deletion test/async/http/body.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
output.close
end

Console.debug(self, "Responding with body: #{output}")
Protocol::HTTP::Response[200, [], output]
end
end
Expand Down
5 changes: 3 additions & 2 deletions test/async/http/protocol/http2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

expect(connection.as_json).to be =~ /#<Async::HTTP::Protocol::HTTP2::Client 1 requests, \d+ active streams>/
ensure
response&.close
response&.finish
end

it "generates a JSON string" do
Expand All @@ -28,7 +28,8 @@

expect(JSON.dump(connection)).to be == connection.to_json
ensure
response&.close
# Using close here causes failures.
response&.finish
end
end

Expand Down
8 changes: 4 additions & 4 deletions test/async/http/proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
expect(request.path).to be == "localhost:1"

Async::HTTP::Body::Hijack.response(request, 200, {}) do |stream|
while chunk = stream.readpartial(1024)
while chunk = stream.read_partial
stream.write(chunk)
stream.flush
end
Expand Down Expand Up @@ -129,13 +129,13 @@
Console.logger.debug(self) {"Making connection to #{endpoint}..."}

Async::HTTP::Body::Hijack.response(request, 200, {}) do |stream|
upstream = endpoint.connect
upstream = ::IO::Stream::Buffered.wrap(endpoint.connect)
Console.logger.debug(self) {"Connected to #{upstream}..."}

reader = Async do |task|
task.annotate "Upstream reader."

while chunk = upstream.readpartial(1024)
while chunk = upstream.read_partial
stream.write(chunk)
stream.flush
end
Expand All @@ -147,7 +147,7 @@
writer = Async do |task|
task.annotate "Upstream writer."

while chunk = stream.readpartial(1024)
while chunk = stream.read_partial
upstream.write(chunk)
upstream.flush
end
Expand Down

0 comments on commit 54c5a2f

Please sign in to comment.