From c634dc725c5154523ce0da1735930008b4224354 Mon Sep 17 00:00:00 2001 From: Jarred Sumner <709451+Jarred-Sumner@users.noreply.github.com> Date: Fri, 29 Sep 2023 19:33:34 -0700 Subject: [PATCH] Fix hang in `bun install` --- src/bun.zig | 32 ++++++++++++ src/http_client_async.zig | 51 ++++++++++++------ src/install/install.zig | 34 ++++++++++-- src/libarchive/libarchive.zig | 4 +- src/sys.zig | 13 +++++ test/js/web/fetch/fetch.test.ts | 92 +++++++++++++++++++++++++++++++++ 6 files changed, 204 insertions(+), 22 deletions(-) diff --git a/src/bun.zig b/src/bun.zig index 3f09277bbd7cce..a93b3c5ae7b0e8 100644 --- a/src/bun.zig +++ b/src/bun.zig @@ -1906,3 +1906,35 @@ pub inline fn serializableInto(comptime T: type, init: anytype) T { return result.*; } + +/// Like std.fs.Dir.makePath except instead of infinite looping on dangling +/// symlink, it deletes the symlink and tries again. +pub fn makePath(dir: std.fs.Dir, sub_path: []const u8) !void { + var it = try std.fs.path.componentIterator(sub_path); + var component = it.last() orelse return; + while (true) { + dir.makeDir(component.path) catch |err| switch (err) { + error.PathAlreadyExists => { + var path_buf2: [MAX_PATH_BYTES * 2]u8 = undefined; + copy(u8, &path_buf2, component.path); + + path_buf2[component.path.len] = 0; + var path_to_use = path_buf2[0..component.path.len :0]; + const result = sys.lstat(path_to_use); + try result.throw(); + const is_dir = std.os.S.ISDIR(result.result.mode); + // dangling symlink + if (!is_dir) { + dir.deleteTree(component.path) catch {}; + continue; + } + }, + error.FileNotFound => |e| { + component = it.previous() orelse return e; + continue; + }, + else => |e| return e, + }; + component = it.next() orelse return; + } +} diff --git a/src/http_client_async.zig b/src/http_client_async.zig index 15e29f34521ca6..65ffbee6200f7f 100644 --- a/src/http_client_async.zig +++ b/src/http_client_async.zig @@ -68,6 +68,10 @@ pub const Signals = struct { aborted: ?*std.atomic.Atomic(bool) = null, cert_errors: ?*std.atomic.Atomic(bool) = null, + pub fn isEmpty(this: *const Signals) bool { + return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null; + } + pub const Store = struct { header_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), body_streaming: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false), @@ -1339,7 +1343,7 @@ pub const InternalState = struct { return this.transfer_encoding == Encoding.chunked; } - pub fn reset(this: *InternalState, allocator: std.mem.Allocator) void { + pub fn reset(this: *InternalState, buffering: bool, allocator: std.mem.Allocator) void { this.compressed_body.deinit(); this.response_message_buffer.deinit(); @@ -1350,9 +1354,12 @@ pub const InternalState = struct { reader.deinit(); } - // if we are holding a cloned_metadata we need to deinit it - // this should never happen because we should always return the metadata to the user - std.debug.assert(this.cloned_metadata == null); + if (!buffering) { + // if we are holding a cloned_metadata we need to deinit it + // this should never happen because we should always return the metadata to the user + std.debug.assert(this.cloned_metadata == null); + } + // just in case we check and free to avoid leaks if (this.cloned_metadata != null) { this.cloned_metadata.?.deinit(allocator); @@ -2179,7 +2186,7 @@ pub fn doRedirect(this: *HTTPClient) void { this.fail(error.TooManyRedirects); return; } - this.state.reset(this.allocator); + this.state.reset(this.signals.isEmpty(), this.allocator); // also reset proxy to redirect this.proxy_tunneling = false; if (this.proxy_tunnel != null) { @@ -2673,7 +2680,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u } var deferred_redirect: ?*URLBufferPool.Node = null; - const can_continue = this.handleResponseMetadata( + const should_continue = this.handleResponseMetadata( &response, // If there are multiple consecutive redirects // and the redirect differs in hostname @@ -2720,8 +2727,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u this.state.pending_response = response; } - if (!can_continue) { - log("onData: can_continue is false", .{}); + if (should_continue == .finished) { // this means that the request ended // clone metadata and return the progress at this point this.cloneMetadata(); @@ -2901,7 +2907,7 @@ fn fail(this: *HTTPClient, err: anyerror) void { _ = socket_async_http_abort_tracker.swapRemove(this.async_http_id); } - this.state.reset(this.allocator); + this.state.reset(this.signals.isEmpty(), this.allocator); this.proxy_tunneling = false; this.state.request_stage = .fail; @@ -2987,7 +2993,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon socket.close(0, null); } - this.state.reset(this.allocator); + this.state.reset(this.signals.isEmpty(), this.allocator); this.state.response_stage = .done; this.state.request_stage = .done; this.state.stage = .done; @@ -3355,13 +3361,19 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket( unreachable; } +const ShouldContinue = enum { + continue_streaming, + finished, +}; + pub fn handleResponseMetadata( this: *HTTPClient, response: *picohttp.Response, deferred_redirect: *?*URLBufferPool.Node, -) !bool { +) !ShouldContinue { var location: string = ""; var pretend_304 = false; + var is_server_sent_events = false; for (response.headers, 0..) |header, header_i| { switch (hashHeaderName(header.name)) { hashHeaderConst("Content-Length") => { @@ -3373,6 +3385,11 @@ pub fn handleResponseMetadata( this.state.content_length = 0; } }, + hashHeaderConst("Content-Type") => { + if (strings.contains(header.value, "text/event-stream")) { + is_server_sent_events = true; + } + }, hashHeaderConst("Content-Encoding") => { if (!this.disable_decompression) { if (strings.eqlComptime(header.value, "gzip")) { @@ -3429,8 +3446,8 @@ pub fn handleResponseMetadata( if (this.proxy_tunneling and this.proxy_tunnel == null) { if (response.status_code == 200) { - //signal to continue the proxing - return true; + // signal to continue the proxing + return ShouldContinue.finished; } //proxy denied connection so return proxy result (407, 403 etc) @@ -3623,6 +3640,10 @@ pub fn handleResponseMetadata( } else { log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding}); } - // if no body is expected we should stop processing - return this.method.hasBody() and (content_length == null or content_length.? > 0 or this.state.transfer_encoding == .chunked); + + if (this.method.hasBody() and ((content_length != null and content_length.? > 0) or !this.state.allow_keepalive or this.state.transfer_encoding == .chunked or is_server_sent_events)) { + return ShouldContinue.continue_streaming; + } else { + return ShouldContinue.finished; + } } diff --git a/src/install/install.zig b/src/install/install.zig index 131adb00e03c0c..6689b05acac4c8 100644 --- a/src/install/install.zig +++ b/src/install/install.zig @@ -1764,7 +1764,7 @@ pub const PackageManager = struct { pub fn sleep(this: *PackageManager) void { if (this.wait_count.swap(0, .Monotonic) > 0) return; - bun.Mimalloc.mi_collect(false); + Output.flush(); _ = this.waiter.wait() catch 0; } @@ -1829,7 +1829,7 @@ pub const PackageManager = struct { switch (this.options.log_level) { inline else => |log_level| { if (log_level.showProgress()) this.startProgressBarIfNone(); - while (this.pending_tasks > 0) : (this.sleep()) { + while (this.pending_tasks > 0) { this.runTasks( void, {}, @@ -1843,6 +1843,13 @@ pub const PackageManager = struct { ) catch |err| { return .{ .failure = err }; }; + + if (PackageManager.verbose_install and this.pending_tasks > 0) { + Output.prettyErrorln("[PackageManager] waiting for {d} tasks\n", .{this.pending_tasks}); + } + + if (this.pending_tasks > 0) + this.sleep(); } }, } @@ -7297,7 +7304,10 @@ pub const PackageManager = struct { // We use this file descriptor to know where to put it. installer.node_modules_folder = cwd.openIterableDir(node_modules.relative_path, .{}) catch brk: { // Avoid extra mkdir() syscall - try cwd.makePath(bun.span(node_modules.relative_path)); + // + // note: this will recursively delete any dangling symlinks + // in the next.js repo, it encounters a dangling symlink in node_modules/@next/codemod/node_modules/cheerio + try bun.makePath(cwd, bun.span(node_modules.relative_path)); break :brk try cwd.openIterableDir(node_modules.relative_path, .{}); }; @@ -7351,7 +7361,7 @@ pub const PackageManager = struct { if (!installer.options.do.install_packages) return error.InstallFailed; } - while (this.pending_tasks > 0 and installer.options.do.install_packages) : (this.sleep()) { + while (this.pending_tasks > 0 and installer.options.do.install_packages) { try this.runTasks( *PackageInstaller, &installer, @@ -7363,6 +7373,13 @@ pub const PackageManager = struct { }, log_level, ); + + if (PackageManager.verbose_install and this.pending_tasks > 0) { + Output.prettyErrorln("[PackageManager] waiting for {d} tasks\n", .{this.pending_tasks}); + } + + if (this.pending_tasks > 0) + this.sleep(); } if (!installer.options.do.install_packages) return error.InstallFailed; @@ -7733,7 +7750,7 @@ pub const PackageManager = struct { Output.flush(); } - while (manager.pending_tasks > 0) : (manager.sleep()) { + while (manager.pending_tasks > 0) { try manager.runTasks( *PackageManager, manager, @@ -7746,6 +7763,13 @@ pub const PackageManager = struct { }, log_level, ); + + if (PackageManager.verbose_install and manager.pending_tasks > 0) { + Output.prettyErrorln("[PackageManager] waiting for {d} tasks\n", .{manager.pending_tasks}); + } + + if (manager.pending_tasks > 0) + manager.sleep(); } if (comptime log_level.showProgress()) { diff --git a/src/libarchive/libarchive.zig b/src/libarchive/libarchive.zig index c6276b1cc94ffc..ff514d72ddedc6 100644 --- a/src/libarchive/libarchive.zig +++ b/src/libarchive/libarchive.zig @@ -543,13 +543,13 @@ pub const Archive = struct { if (comptime Environment.isWindows) { std.os.mkdirat(dir_fd, pathname, @as(u32, @intCast(mode))) catch |err| { if (err == error.PathAlreadyExists or err == error.NotDir) break; - try dir.makePath(std.fs.path.dirname(slice) orelse return err); + try bun.makePath(dir, std.fs.path.dirname(slice) orelse return err); try std.os.mkdirat(dir_fd, pathname, 0o777); }; } else { std.os.mkdiratZ(dir_fd, pathname, @as(u32, @intCast(mode))) catch |err| { if (err == error.PathAlreadyExists or err == error.NotDir) break; - try dir.makePath(std.fs.path.dirname(slice) orelse return err); + try bun.makePath(dir, std.fs.path.dirname(slice) orelse return err); try std.os.mkdiratZ(dir_fd, pathname, 0o777); }; } diff --git a/src/sys.zig b/src/sys.zig index 14bb9ddf9d8348..b7542b44f3c5a2 100644 --- a/src/sys.zig +++ b/src/sys.zig @@ -819,6 +819,19 @@ pub fn readlink(in: [:0]const u8, buf: []u8) Maybe(usize) { unreachable; } +pub fn readlinkat(fd: bun.FileDescriptor, in: [:0]const u8, buf: []u8) Maybe(usize) { + while (true) { + const rc = sys.readlinkat(fd, in, buf.ptr, buf.len); + + if (Maybe(usize).errnoSys(rc, .readlink)) |err| { + if (err.getErrno() == .INTR) continue; + return err; + } + return Maybe(usize){ .result = @as(usize, @intCast(rc)) }; + } + unreachable; +} + pub fn ftruncate(fd: fd_t, size: isize) Maybe(void) { if (comptime Environment.isWindows) { if (kernel32.SetFileValidData(bun.fdcast(fd), size) == 0) { diff --git a/test/js/web/fetch/fetch.test.ts b/test/js/web/fetch/fetch.test.ts index 4f7b20d3c99cdd..8c5e27f884deeb 100644 --- a/test/js/web/fetch/fetch.test.ts +++ b/test/js/web/fetch/fetch.test.ts @@ -1702,3 +1702,95 @@ it("should throw RedirectURLTooLong when location is too long", async () => { expect(err.code).toStrictEqual("RedirectURLTooLong"); server.stop(true); }); + +it("304 not modified with missing content-length does not cause a request timeout", async () => { + const server = await Bun.listen({ + socket: { + open(socket) { + socket.write("HTTP/1.1 304 Not Modified\r\n\r\n"); + socket.flush(); + setTimeout(() => { + socket.end(); + }, 9999).unref(); + }, + data() {}, + close() {}, + }, + port: 0, + hostname: "localhost", + }); + + const response = await fetch(`http://${server.hostname}:${server.port}/`); + expect(response.status).toBe(304); + expect(await response.arrayBuffer()).toHaveLength(0); + server.stop(true); +}); + +it("304 not modified with missing content-length and connection close does not cause a request timeout", async () => { + const server = await Bun.listen({ + socket: { + open(socket) { + socket.write("HTTP/1.1 304 Not Modified\r\nConnection: close\r\n\r\n"); + socket.flush(); + setTimeout(() => { + socket.end(); + }, 9999).unref(); + }, + data() {}, + close() {}, + }, + port: 0, + hostname: "localhost", + }); + + const response = await fetch(`http://${server.hostname}:${server.port}/`); + expect(response.status).toBe(304); + expect(await response.arrayBuffer()).toHaveLength(0); + server.stop(true); +}); + +it("304 not modified with content-length 0 and connection close does not cause a request timeout", async () => { + const server = await Bun.listen({ + socket: { + open(socket) { + socket.write("HTTP/1.1 304 Not Modified\r\nConnection: close\r\nContent-Length: 0\r\n\r\n"); + socket.flush(); + setTimeout(() => { + socket.end(); + }, 9999).unref(); + }, + data() {}, + close() {}, + }, + port: 0, + hostname: "localhost", + }); + + const response = await fetch(`http://${server.hostname}:${server.port}/`); + expect(response.status).toBe(304); + expect(await response.arrayBuffer()).toHaveLength(0); + server.stop(true); +}); + +it("304 not modified with 0 content-length does not cause a request timeout", async () => { + const server = await Bun.listen({ + socket: { + open(socket) { + socket.write("HTTP/1.1 304 Not Modified\r\nContent-Length: 0\r\n\r\n"); + socket.flush(); + setTimeout(() => { + socket.end(); + }, 9999).unref(); + }, + data() {}, + close() {}, + }, + port: 0, + hostname: "localhost", + }); + + const response = await fetch(`http://${server.hostname}:${server.port}/`); + expect(response.status).toBe(304); + expect(await response.arrayBuffer()).toHaveLength(0); + server.stop(true); +});