Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jan 22, 2024
1 parent b929ac0 commit 279b013
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 25 deletions.
38 changes: 23 additions & 15 deletions fixtures/async/http/a_protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -544,33 +544,41 @@ def around
Console.logger.level = current
end

def timeout = nil

it "doesn't cancel all requests" do
tasks = []
task = Async::Task.current
tasks = []
stopped = []

10.times do
tasks << task.async {
begin
loop do
client.get('http://127.0.0.1:8080/a').finish
end
task.async do |child|
tasks << child

loop do
response = client.get('/a')
response.finish
ensure
stopped << 'a'
response&.close
end
}
ensure
stopped << 'a'
end
end

10.times do
tasks << task.async {
begin
loop do
client.get('http://127.0.0.1:8080/b').finish
end
task.async do |child|
tasks << child

loop do
response = client.get('/b')
response.finish
ensure
stopped << 'b'
response&.close
end
}
ensure
stopped << 'b'
end
end

tasks.each do |child|
Expand Down
2 changes: 1 addition & 1 deletion lib/async/http/body/pipe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def writer(task)

task.annotate "#{self.class} writer."

while chunk = @head.read_partial
while chunk = @head.readpartial(1024)
@output.write(chunk)
end
ensure
Expand Down
5 changes: 3 additions & 2 deletions lib/async/http/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def call(request)

# This signals that the ensure block below should not try to release the connection, because it's bound into the response which will be returned:
connection = nil

return response
rescue Protocol::RequestFailed
# This is a specific case where the entire request wasn't sent before a failure occurred. So, we can even resend non-idempotent requests.
Expand All @@ -119,7 +118,9 @@ def call(request)
raise
end
ensure
@pool.release(connection) if connection
if connection
@pool.release(connection)
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/async/http/protocol/http1/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def call(request, task: Task.current)
@ready = true

return response
rescue
rescue => error
# This will ensure that #reusable? returns false.
@stream.close

Expand Down
2 changes: 1 addition & 1 deletion lib/async/http/protocol/http1/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def read_line
end

def peer
@stream.io
@stream
end

attr :count
Expand Down
4 changes: 2 additions & 2 deletions lib/async/http/protocol/http2/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def read_in_background(parent: Task.current)
Console.debug(self, @framer) {"Reading frame..."}
self.read_frame
end
rescue SocketError, IOError, EOFError, Errno::ECONNRESET, Errno::EPIPE, Async::Wrapper::Cancelled => error
rescue SocketError, EOFError, Errno::ECONNRESET, Errno::EPIPE, Async::Wrapper::Cancelled => error
# Ignore.
Console.warn(self, error)
rescue ::Protocol::HTTP2::GoawayError => error
Expand All @@ -119,7 +119,7 @@ def read_in_background(parent: Task.current)
attr :promises

def peer
@stream.io
@stream
end

attr :count
Expand Down
6 changes: 3 additions & 3 deletions test/async/http/proxy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
expect(request.path).to be == "localhost:1"

Async::HTTP::Body::Hijack.response(request, 200, {}) do |stream|
while chunk = stream.read_partial(1024)
while chunk = stream.readpartial(1024)
stream.write(chunk)
stream.flush
end
Expand Down Expand Up @@ -135,7 +135,7 @@
reader = Async do |task|
task.annotate "Upstream reader."

while chunk = upstream.read_partial
while chunk = upstream.readpartial(1024)
stream.write(chunk)
stream.flush
end
Expand All @@ -147,7 +147,7 @@
writer = Async do |task|
task.annotate "Upstream writer."

while chunk = stream.read_partial
while chunk = stream.readpartial(1024)
upstream.write(chunk)
upstream.flush
end
Expand Down

0 comments on commit 279b013

Please sign in to comment.