From 5a5f3d6b30a112ad2277ce4952540c5976c6a84c Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Tue, 23 Jul 2024 01:13:43 -0700 Subject: [PATCH] fix(http) timeout (#12728) Co-authored-by: cirospaciari Co-authored-by: Jarred Sumner --- misctools/fetch.zig | 1 - misctools/http_bench.zig | 7 -- src/bun.js/webcore/response.zig | 3 - src/cli/create_command.zig | 5 - src/cli/upgrade_command.zig | 4 - src/compile_target.zig | 3 - src/http.zig | 115 +++++++++++---------- src/install/install.zig | 4 +- test/js/web/fetch/fetch-preconnect.test.ts | 86 +++++++++------ 9 files changed, 117 insertions(+), 111 deletions(-) diff --git a/misctools/fetch.zig b/misctools/fetch.zig index 7c91b6d9ec3e0b..5d0b9f1e528a5d 100644 --- a/misctools/fetch.zig +++ b/misctools/fetch.zig @@ -195,7 +195,6 @@ pub fn main() anyerror!void { args.headers_buf, response_body_string, args.body, - 0, HTTP.FetchRedirect.follow, ), }; diff --git a/misctools/http_bench.zig b/misctools/http_bench.zig index b736eab9d33a81..3cd147d2eaa81c 100644 --- a/misctools/http_bench.zig +++ b/misctools/http_bench.zig @@ -31,7 +31,6 @@ const params = [_]clap.Param(clap.Help){ clap.parseParam("-b, --body HTTP request body as a string") catch unreachable, clap.parseParam("-f, --file File path to load as body") catch unreachable, clap.parseParam("-n, --count How many runs? Default 10") catch unreachable, - clap.parseParam("-t, --timeout Max duration per request") catch unreachable, clap.parseParam("-r, --retry Max retry count") catch unreachable, clap.parseParam("--no-gzip Disable gzip") catch unreachable, clap.parseParam("--no-deflate Disable deflate") catch unreachable, @@ -75,7 +74,6 @@ pub const Arguments = struct { body: string = "", turbo: bool = false, count: usize = 10, - timeout: usize = 0, repeat: usize = 0, concurrency: u16 = 32, @@ -165,10 +163,6 @@ pub const Arguments = struct { // .keep_alive = !args.flag("--no-keep-alive"), .concurrency = std.fmt.parseInt(u16, args.option("--max-concurrency") orelse "32", 10) catch 32, .turbo = args.flag("--turbo"), - .timeout = std.fmt.parseInt(usize, args.option("--timeout") orelse "0", 10) catch |err| { - Output.prettyErrorln("{s} parsing timeout", .{@errorName(err)}); - Global.exit(1); - }, .count = std.fmt.parseInt(usize, args.option("--count") orelse "10", 10) catch |err| { Output.prettyErrorln("{s} parsing count", .{@errorName(err)}); Global.exit(1); @@ -225,7 +219,6 @@ pub fn main() anyerror!void { args.headers_buf, response_body, "", - args.timeout, ), }; ctx.http.client.verbose = args.verbose; diff --git a/src/bun.js/webcore/response.zig b/src/bun.js/webcore/response.zig index 4abe1d3662c3de..a97124aa0bccd4 100644 --- a/src/bun.js/webcore/response.zig +++ b/src/bun.js/webcore/response.zig @@ -1623,7 +1623,6 @@ pub const Fetch = struct { fetch_options.headers.buf.items, &fetch_tasklet.response_buffer, fetch_tasklet.request_body.slice(), - fetch_options.timeout, http.HTTPClientResult.Callback.New( *FetchTasklet, FetchTasklet.callback, @@ -1681,7 +1680,6 @@ pub const Fetch = struct { method: Method, headers: Headers, body: HTTPRequestBody, - timeout: usize, disable_timeout: bool, disable_keepalive: bool, disable_decompression: bool, @@ -2799,7 +2797,6 @@ pub const Fetch = struct { .allocator = allocator, }, .body = http_body, - .timeout = std.time.ns_per_hour, .disable_keepalive = disable_keepalive, .disable_timeout = disable_timeout, .disable_decompression = disable_decompression, diff --git a/src/cli/create_command.zig b/src/cli/create_command.zig index aaf2f44cf4cba2..870360f506c580 100644 --- a/src/cli/create_command.zig +++ b/src/cli/create_command.zig @@ -1811,7 +1811,6 @@ pub const Example = struct { const examples_url: string = "https://registry.npmjs.org/bun-examples-all/latest"; var url: URL = undefined; - pub const timeout: u32 = 6000; var app_name_buf: [512]u8 = undefined; pub fn print(examples: []const Example, default_app_name: ?string) void { @@ -1977,7 +1976,6 @@ pub const Example = struct { headers_buf, mutable, "", - 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow, @@ -2055,7 +2053,6 @@ pub const Example = struct { "", mutable, "", - 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow, @@ -2145,7 +2142,6 @@ pub const Example = struct { "", mutable, "", - 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow, @@ -2188,7 +2184,6 @@ pub const Example = struct { "", mutable, "", - 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow, diff --git a/src/cli/upgrade_command.zig b/src/cli/upgrade_command.zig index 2f731d6701314f..0c5ba5c28e0bb5 100644 --- a/src/cli/upgrade_command.zig +++ b/src/cli/upgrade_command.zig @@ -163,7 +163,6 @@ pub const UpgradeCheckerThread = struct { }; pub const UpgradeCommand = struct { - pub const timeout: u32 = 30000; const default_github_headers: string = "Acceptapplication/vnd.github.v3+json"; var github_repository_url_buf: bun.PathBuffer = undefined; var current_executable_buf: bun.PathBuffer = undefined; @@ -245,7 +244,6 @@ pub const UpgradeCommand = struct { headers_buf, &metadata_body, "", - 60 * std.time.ns_per_min, http_proxy, null, HTTP.FetchRedirect.follow, @@ -528,12 +526,10 @@ pub const UpgradeCommand = struct { "", zip_file_buffer, "", - timeout, http_proxy, null, HTTP.FetchRedirect.follow, ); - async_http.client.timeout = timeout; async_http.client.progress_node = progress; async_http.client.reject_unauthorized = env_loader.getTLSRejectUnauthorized(); diff --git a/src/compile_target.zig b/src/compile_target.zig index 0a8ec3ff79ba4a..c840c018dd187c 100644 --- a/src/compile_target.zig +++ b/src/compile_target.zig @@ -153,7 +153,6 @@ pub fn downloadToPath(this: *const CompileTarget, env: *bun.DotEnv.Loader, alloc { var progress = refresher.start("Downloading", 0); defer progress.end(); - const timeout = 30000; const http_proxy: ?bun.URL = env.getHttpProxy(url); async_http.* = HTTP.AsyncHTTP.initSync( @@ -164,12 +163,10 @@ pub fn downloadToPath(this: *const CompileTarget, env: *bun.DotEnv.Loader, alloc "", compressed_archive_bytes, "", - timeout, http_proxy, null, HTTP.FetchRedirect.follow, ); - async_http.client.timeout = timeout; async_http.client.progress_node = progress; async_http.client.reject_unauthorized = env.getTLSRejectUnauthorized(); diff --git a/src/http.zig b/src/http.zig index d1ff0f9c84295a..99635e0451afc6 100644 --- a/src/http.zig +++ b/src/http.zig @@ -350,6 +350,11 @@ fn NewHTTPContext(comptime ssl: bool) type { socket.close(.failure); } + fn closeSocket(socket: HTTPSocket) void { + markSocketAsDead(socket); + socket.close(.normal); + } + fn getTagged(ptr: *anyopaque) ActiveSocket { return ActiveSocket.from(bun.cast(**anyopaque, ptr).*); } @@ -472,8 +477,7 @@ fn NewHTTPContext(comptime ssl: bool) type { } } - markSocketAsDead(socket); - socket.close(.normal); + closeSocket(socket); } pub const Handler = struct { @@ -487,7 +491,7 @@ fn NewHTTPContext(comptime ssl: bool) type { } if (active.get(PooledSocket)) |pooled| { - assert(context().pending_sockets.put(pooled)); + addMemoryBackToPool(pooled); return; } @@ -523,33 +527,43 @@ fn NewHTTPContext(comptime ssl: bool) type { if (!client.checkServerIdentity(comptime ssl, socket, handshake_error)) { client.did_have_handshaking_error = true; + if (!socket.isClosed()) terminateSocket(socket); return; } return client.firstCall(comptime ssl, socket); } else { // if authorized it self is false, this means that the connection was rejected - markSocketAsDead(socket); + terminateSocket(socket); if (client.state.stage != .done and client.state.stage != .fail) client.fail(error.ConnectionRefused); return; } } - // we can reach here if we are aborted - if (!socket.isClosed()) { + if (socket.isClosed()) { + markSocketAsDead(socket); if (active.get(PooledSocket)) |pooled| { - assert(context().pending_sockets.put(pooled)); - return; + addMemoryBackToPool(pooled); } - terminateSocket(socket); - } else { - if (active.get(PooledSocket)) |pooled| { - assert(context().pending_sockets.put(pooled)); + return; + } + + if (authorized) { + if (active.is(PooledSocket)) { + // Allow pooled sockets to be reused if the handshake was successful. + socket.setTimeout(0); + socket.setTimeoutMinutes(5); return; } } + + if (active.get(PooledSocket)) |pooled| { + addMemoryBackToPool(pooled); + } + + terminateSocket(socket); } pub fn onClose( ptr: *anyopaque, @@ -565,11 +579,16 @@ fn NewHTTPContext(comptime ssl: bool) type { } if (tagged.get(PooledSocket)) |pooled| { - assert(context().pending_sockets.put(pooled)); + addMemoryBackToPool(pooled); } return; } + + fn addMemoryBackToPool(pooled: *PooledSocket) void { + assert(context().pending_sockets.put(pooled)); + } + pub fn onData( ptr: *anyopaque, socket: HTTPSocket, @@ -620,14 +639,12 @@ fn NewHTTPContext(comptime ssl: bool) type { socket: HTTPSocket, ) void { const tagged = getTagged(ptr); - if (tagged.get(HTTPClient)) |client| { - return client.onTimeout( - comptime ssl, - socket, - ); + return client.onTimeout(comptime ssl, socket); } else if (tagged.get(PooledSocket)) |pooled| { - assert(context().pending_sockets.put(pooled)); + // If a socket has been sitting around for 5 minutes + // Let's close it and remove it from the pool. + addMemoryBackToPool(pooled); } terminateSocket(socket); @@ -640,23 +657,23 @@ fn NewHTTPContext(comptime ssl: bool) type { const tagged = getTagged(ptr); markSocketAsDead(socket); if (tagged.get(HTTPClient)) |client| { - return client.onConnectError( - comptime ssl, - socket, - ); + client.onConnectError(); } else if (tagged.get(PooledSocket)) |pooled| { - assert(context().pending_sockets.put(pooled)); + addMemoryBackToPool(pooled); } - - if (comptime Environment.isDebug) - // caller should already have closed it. - bun.debugAssert(socket.isClosed()); + // us_connecting_socket_close is always called internally by uSockets } pub fn onEnd( _: *anyopaque, socket: HTTPSocket, ) void { - // TCP fin gets closed immediately. + // TCP fin must be closed, but we must keep the original tagged + // pointer so that their onClose callback is called. + // + // Three possible states: + // 1. HTTP Keep-Alive socket: it must be removed from the pool + // 2. HTTP Client socket: it might need to be retried + // 3. Dead socket: it is already marked as dead socket.close(.failure); } }; @@ -1095,6 +1112,13 @@ pub fn firstCall( comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ) void { + if (comptime FeatureFlags.is_fetch_preconnect_supported) { + if (client.is_preconnect_only) { + client.onPreconnect(is_ssl, socket); + return; + } + } + if (client.state.request_stage == .pending) { client.onWritable(true, comptime is_ssl, socket); } @@ -1144,17 +1168,17 @@ pub fn onTimeout( comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket, ) void { - _ = socket; + if (client.disable_timeout) return; log("Timeout {s}\n", .{client.url.href}); + defer NewHTTPContext(is_ssl).terminateSocket(socket); + if (client.state.stage != .done and client.state.stage != .fail) { client.fail(error.Timeout); } } pub fn onConnectError( client: *HTTPClient, - comptime is_ssl: bool, - _: NewHTTPContext(is_ssl).HTTPSocket, ) void { log("onConnectError {s}\n", .{client.url.href}); if (client.state.stage != .done and client.state.stage != .fail) @@ -1567,7 +1591,6 @@ remaining_redirect_count: i8 = default_redirect_count, allow_retry: bool = false, redirect_type: FetchRedirect = FetchRedirect.follow, redirect: []u8 = &.{}, -timeout: usize = 0, progress_node: ?*Progress.Node = null, disable_timeout: bool = false, disable_keepalive: bool = false, @@ -1745,8 +1768,6 @@ pub const AsyncHTTP = struct { task: ThreadPool.Task = ThreadPool.Task{ .callback = &startAsyncHTTP }, result_callback: HTTPClientResult.Callback = undefined, - /// Timeout in nanoseconds - timeout: usize = 0, redirected: bool = false, response_encoding: Encoding = Encoding.identity, @@ -1873,7 +1894,7 @@ pub const AsyncHTTP = struct { .is_url_owned = is_url_owned, }); - this.async_http = AsyncHTTP.init(bun.default_allocator, .GET, url, .{}, "", &this.response_buffer, "", 0, HTTPClientResult.Callback.New(*Preconnect, Preconnect.onResult).init(this), .manual, .{}); + this.async_http = AsyncHTTP.init(bun.default_allocator, .GET, url, .{}, "", &this.response_buffer, "", HTTPClientResult.Callback.New(*Preconnect, Preconnect.onResult).init(this), .manual, .{}); this.async_http.client.is_preconnect_only = true; http_thread.schedule(Batch.from(&this.async_http.task)); @@ -1887,7 +1908,6 @@ pub const AsyncHTTP = struct { headers_buf: string, response_buffer: *MutableString, request_body: []const u8, - timeout: usize, callback: HTTPClientResult.Callback, redirect_type: FetchRedirect, options: Options, @@ -1904,7 +1924,6 @@ pub const AsyncHTTP = struct { .http_proxy = options.http_proxy, .signals = options.signals orelse .{}, .async_http_id = if (options.signals != null and options.signals.?.aborted != null) async_http_id.fetchAdd(1, .monotonic) else 0, - .timeout = timeout, }; this.client = .{ @@ -1916,7 +1935,6 @@ pub const AsyncHTTP = struct { .hostname = options.hostname, .signals = options.signals orelse this.signals, .async_http_id = this.async_http_id, - .timeout = timeout, .http_proxy = this.http_proxy, .redirect_type = redirect_type, }; @@ -2002,20 +2020,17 @@ pub const AsyncHTTP = struct { return this; } - pub fn initSync(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, timeout: usize, http_proxy: ?URL, hostname: ?[]u8, redirect_type: FetchRedirect) AsyncHTTP { - return @This().init(allocator, method, url, headers, headers_buf, response_buffer, request_body, timeout, undefined, redirect_type, .{ + pub fn initSync(allocator: std.mem.Allocator, method: Method, url: URL, headers: Headers.Entries, headers_buf: string, response_buffer: *MutableString, request_body: []const u8, http_proxy: ?URL, hostname: ?[]u8, redirect_type: FetchRedirect) AsyncHTTP { + return @This().init(allocator, method, url, headers, headers_buf, response_buffer, request_body, undefined, redirect_type, .{ .http_proxy = http_proxy, .hostname = hostname, }); } fn reset(this: *AsyncHTTP) !void { - const timeout = this.timeout; const aborted = this.client.aborted; this.client = try HTTPClient.init(this.allocator, this.method, this.client.url, this.client.header_entries, this.client.header_buf, aborted); - this.client.timeout = timeout; this.client.http_proxy = this.http_proxy; - this.timeout = timeout; if (this.http_proxy) |proxy| { //TODO: need to understand how is possible to reuse Proxy with TSL, so disable keepalive if url is HTTPS @@ -2298,7 +2313,6 @@ pub fn doRedirect( this.state.response_message_buffer.deinit(); // we need to clean the client reference before closing the socket because we are going to reuse the same ref in a another request - socket.ext(**anyopaque).* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); if (this.isKeepAlivePossible()) { assert(this.connected_url.hostname.len > 0); ctx.releaseSocket( @@ -2308,8 +2322,7 @@ pub fn doRedirect( this.connected_url.getPortAuto(), ); } else { - NewHTTPContext(is_ssl).markSocketAsDead(socket); - socket.close(.normal); + NewHTTPContext(is_ssl).closeSocket(socket); } this.connected_url = URL{}; @@ -2383,6 +2396,7 @@ fn start_(this: *HTTPClient, comptime is_ssl: bool) void { }; if (socket.isClosed() and (this.state.response_stage != .done and this.state.response_stage != .fail)) { + NewHTTPContext(is_ssl).markSocketAsDead(socket); this.fail(error.ConnectionClosed); assert(this.state.fail != null); return; @@ -2429,7 +2443,6 @@ pub fn onPreconnect(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCon log("onPreconnect({})", .{this.url}); _ = socket_async_http_abort_tracker.swapRemove(this.async_http_id); const ctx = if (comptime is_ssl) &http_thread.https_context else &http_thread.http_context; - ctx.releaseSocket( socket, this.did_have_handshaking_error and !this.reject_unauthorized, @@ -2740,8 +2753,7 @@ pub fn closeAndFail(this: *HTTPClient, err: anyerror, comptime is_ssl: bool, soc if (this.state.stage != .fail and this.state.stage != .done) { log("closeAndFail: {s}", .{@errorName(err)}); if (!socket.isClosed()) { - socket.ext(**anyopaque).* = bun.cast(**anyopaque, NewHTTPContext(is_ssl).ActiveSocket.init(&dead_socket).ptr()); - socket.close(.failure); + NewHTTPContext(is_ssl).terminateSocket(socket); } this.fail(err); } @@ -3158,8 +3170,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon this.connected_url.getPortAuto(), ); } else if (!socket.isClosed()) { - NewHTTPContext(is_ssl).markSocketAsDead(socket); - socket.close(.normal); + NewHTTPContext(is_ssl).closeSocket(socket); } this.state.reset(this.allocator); diff --git a/src/install/install.zig b/src/install/install.zig index 506d3c4995327f..89d94fc801b2a2 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -426,7 +426,7 @@ const NetworkTask = struct { this.allocator = allocator; const url = URL.parse(this.url_buf); - this.http = AsyncHTTP.init(allocator, .GET, url, header_builder.entries, header_builder.content.ptr.?[0..header_builder.content.len], &this.response_buffer, "", 0, this.getCompletionCallback(), HTTP.FetchRedirect.follow, .{ + this.http = AsyncHTTP.init(allocator, .GET, url, header_builder.entries, header_builder.content.ptr.?[0..header_builder.content.len], &this.response_buffer, "", this.getCompletionCallback(), HTTP.FetchRedirect.follow, .{ .http_proxy = this.package_manager.httpProxy(url), }); this.http.client.reject_unauthorized = this.package_manager.tlsRejectUnauthorized(); @@ -510,7 +510,7 @@ const NetworkTask = struct { const url = URL.parse(this.url_buf); - this.http = AsyncHTTP.init(allocator, .GET, url, header_builder.entries, header_buf, &this.response_buffer, "", 0, this.getCompletionCallback(), HTTP.FetchRedirect.follow, .{ + this.http = AsyncHTTP.init(allocator, .GET, url, header_builder.entries, header_buf, &this.response_buffer, "", this.getCompletionCallback(), HTTP.FetchRedirect.follow, .{ .http_proxy = this.package_manager.httpProxy(url), }); this.http.client.reject_unauthorized = this.package_manager.tlsRejectUnauthorized(); diff --git a/test/js/web/fetch/fetch-preconnect.test.ts b/test/js/web/fetch/fetch-preconnect.test.ts index 88a842e5cc4ca1..1259b0cd45b123 100644 --- a/test/js/web/fetch/fetch-preconnect.test.ts +++ b/test/js/web/fetch/fetch-preconnect.test.ts @@ -29,46 +29,65 @@ describe.todoIf(isWindows)("fetch.preconnect", () => { expect(response.status).toBe(200); }); - describe("closing the connection doesn't break the request", () => { - for (let at of ["before", "after"]) { - it(at, async () => { - let { promise, resolve } = Promise.withResolvers(); - using listener = Bun.listen({ - port: 0, - hostname: "localhost", - socket: { - open(socket) { - resolve(socket); - }, - data() {}, - close() {}, - }, - }); - fetch.preconnect(`http://localhost:${listener.port}`); - let socket = await promise; - ({ promise, resolve } = Promise.withResolvers()); - if (at === "before") { - await Bun.sleep(16); - socket.end(); - } - const fetchPromise = fetch(`http://localhost:${listener.port}`); - if (at === "after") { - await Bun.sleep(16); - socket.end(); - } - socket = await promise; - socket.write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"); - socket.end(); + describe("doesn't break the request when", () => { + for (let endOrTerminate of ["end", "terminate", "shutdown"]) { + describe(endOrTerminate, () => { + for (let at of ["before", "middle", "after"]) { + it(at, async () => { + let { promise, resolve } = Promise.withResolvers(); + using listener = Bun.listen({ + port: 0, + hostname: "localhost", + socket: { + open(socket) { + resolve(socket); + }, + data() {}, + close() {}, + }, + }); + fetch.preconnect(`http://localhost:${listener.port}`); + let socket = await promise; + ({ promise, resolve } = Promise.withResolvers()); + if (at === "before") { + await Bun.sleep(16); + socket[endOrTerminate](); + if (endOrTerminate === "shutdown") { + await Bun.sleep(0); + socket.end(); + } + } + const fetchPromise = fetch(`http://localhost:${listener.port}`); + if (at === "middle") { + socket[endOrTerminate](); + if (endOrTerminate === "shutdown") { + socket.end(); + } + await Bun.sleep(16); + } - const response = await fetchPromise; - expect(response.status).toBe(200); + if (at === "after") { + await Bun.sleep(16); + socket[endOrTerminate](); + if (endOrTerminate === "shutdown") { + socket.end(); + } + } + socket = await promise; + socket.write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"); + socket.end(); + + const response = await fetchPromise; + expect(response.status).toBe(200); + }); + } }); } }); it("--fetch-preconnect works", async () => { const { promise, resolve } = Promise.withResolvers(); - const listener = Bun.listen({ + using listener = Bun.listen({ port: 0, hostname: "localhost", socket: { @@ -86,7 +105,6 @@ describe.todoIf(isWindows)("fetch.preconnect", () => { expect([`--fetch-preconnect=http://localhost:${listener.port}`, "--eval", "Bun.sleep(64)"]).toRun(); await promise; - listener.stop(true); }); it("fetch.preconnect validates the URL", async () => {