Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hang in bun install #6192

Merged
merged 1 commit into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/bun.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1906,3 +1906,35 @@ pub inline fn serializableInto(comptime T: type, init: anytype) T {

return result.*;
}

/// Like std.fs.Dir.makePath except instead of infinite looping on dangling
/// symlink, it deletes the symlink and tries again.
pub fn makePath(dir: std.fs.Dir, sub_path: []const u8) !void {
var it = try std.fs.path.componentIterator(sub_path);
var component = it.last() orelse return;
while (true) {
dir.makeDir(component.path) catch |err| switch (err) {
error.PathAlreadyExists => {
var path_buf2: [MAX_PATH_BYTES * 2]u8 = undefined;
copy(u8, &path_buf2, component.path);

path_buf2[component.path.len] = 0;
var path_to_use = path_buf2[0..component.path.len :0];
const result = sys.lstat(path_to_use);
try result.throw();
const is_dir = std.os.S.ISDIR(result.result.mode);
// dangling symlink
if (!is_dir) {
dir.deleteTree(component.path) catch {};
continue;
}
},
error.FileNotFound => |e| {
component = it.previous() orelse return e;
continue;
},
else => |e| return e,
};
component = it.next() orelse return;
}
}
51 changes: 36 additions & 15 deletions src/http_client_async.zig
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ pub const Signals = struct {
aborted: ?*std.atomic.Atomic(bool) = null,
cert_errors: ?*std.atomic.Atomic(bool) = null,

pub fn isEmpty(this: *const Signals) bool {
return this.aborted == null and this.body_streaming == null and this.header_progress == null and this.cert_errors == null;
}

pub const Store = struct {
header_progress: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
body_streaming: std.atomic.Atomic(bool) = std.atomic.Atomic(bool).init(false),
Expand Down Expand Up @@ -1339,7 +1343,7 @@ pub const InternalState = struct {
return this.transfer_encoding == Encoding.chunked;
}

pub fn reset(this: *InternalState, allocator: std.mem.Allocator) void {
pub fn reset(this: *InternalState, buffering: bool, allocator: std.mem.Allocator) void {
this.compressed_body.deinit();
this.response_message_buffer.deinit();

Expand All @@ -1350,9 +1354,12 @@ pub const InternalState = struct {
reader.deinit();
}

// if we are holding a cloned_metadata we need to deinit it
// this should never happen because we should always return the metadata to the user
std.debug.assert(this.cloned_metadata == null);
if (!buffering) {
// if we are holding a cloned_metadata we need to deinit it
// this should never happen because we should always return the metadata to the user
std.debug.assert(this.cloned_metadata == null);
}

// just in case we check and free to avoid leaks
if (this.cloned_metadata != null) {
this.cloned_metadata.?.deinit(allocator);
Expand Down Expand Up @@ -2179,7 +2186,7 @@ pub fn doRedirect(this: *HTTPClient) void {
this.fail(error.TooManyRedirects);
return;
}
this.state.reset(this.allocator);
this.state.reset(this.signals.isEmpty(), this.allocator);
// also reset proxy to redirect
this.proxy_tunneling = false;
if (this.proxy_tunnel != null) {
Expand Down Expand Up @@ -2673,7 +2680,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
}

var deferred_redirect: ?*URLBufferPool.Node = null;
const can_continue = this.handleResponseMetadata(
const should_continue = this.handleResponseMetadata(
&response,
// If there are multiple consecutive redirects
// and the redirect differs in hostname
Expand Down Expand Up @@ -2720,8 +2727,7 @@ pub fn onData(this: *HTTPClient, comptime is_ssl: bool, incoming_data: []const u
this.state.pending_response = response;
}

if (!can_continue) {
log("onData: can_continue is false", .{});
if (should_continue == .finished) {
// this means that the request ended
// clone metadata and return the progress at this point
this.cloneMetadata();
Expand Down Expand Up @@ -2901,7 +2907,7 @@ fn fail(this: *HTTPClient, err: anyerror) void {
_ = socket_async_http_abort_tracker.swapRemove(this.async_http_id);
}

this.state.reset(this.allocator);
this.state.reset(this.signals.isEmpty(), this.allocator);
this.proxy_tunneling = false;

this.state.request_stage = .fail;
Expand Down Expand Up @@ -2987,7 +2993,7 @@ pub fn progressUpdate(this: *HTTPClient, comptime is_ssl: bool, ctx: *NewHTTPCon
socket.close(0, null);
}

this.state.reset(this.allocator);
this.state.reset(this.signals.isEmpty(), this.allocator);
this.state.response_stage = .done;
this.state.request_stage = .done;
this.state.stage = .done;
Expand Down Expand Up @@ -3355,13 +3361,19 @@ fn handleResponseBodyChunkedEncodingFromSinglePacket(
unreachable;
}

const ShouldContinue = enum {
continue_streaming,
finished,
};

pub fn handleResponseMetadata(
this: *HTTPClient,
response: *picohttp.Response,
deferred_redirect: *?*URLBufferPool.Node,
) !bool {
) !ShouldContinue {
var location: string = "";
var pretend_304 = false;
var is_server_sent_events = false;
for (response.headers, 0..) |header, header_i| {
switch (hashHeaderName(header.name)) {
hashHeaderConst("Content-Length") => {
Expand All @@ -3373,6 +3385,11 @@ pub fn handleResponseMetadata(
this.state.content_length = 0;
}
},
hashHeaderConst("Content-Type") => {
if (strings.contains(header.value, "text/event-stream")) {
is_server_sent_events = true;
}
},
hashHeaderConst("Content-Encoding") => {
if (!this.disable_decompression) {
if (strings.eqlComptime(header.value, "gzip")) {
Expand Down Expand Up @@ -3429,8 +3446,8 @@ pub fn handleResponseMetadata(

if (this.proxy_tunneling and this.proxy_tunnel == null) {
if (response.status_code == 200) {
//signal to continue the proxing
return true;
// signal to continue the proxing
return ShouldContinue.finished;
}

//proxy denied connection so return proxy result (407, 403 etc)
Expand Down Expand Up @@ -3623,6 +3640,10 @@ pub fn handleResponseMetadata(
} else {
log("handleResponseMetadata: content_length is null and transfer_encoding {}", .{this.state.transfer_encoding});
}
// if no body is expected we should stop processing
return this.method.hasBody() and (content_length == null or content_length.? > 0 or this.state.transfer_encoding == .chunked);

if (this.method.hasBody() and ((content_length != null and content_length.? > 0) or !this.state.allow_keepalive or this.state.transfer_encoding == .chunked or is_server_sent_events)) {
return ShouldContinue.continue_streaming;
} else {
return ShouldContinue.finished;
}
}
34 changes: 29 additions & 5 deletions src/install/install.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1764,7 +1764,7 @@ pub const PackageManager = struct {

pub fn sleep(this: *PackageManager) void {
if (this.wait_count.swap(0, .Monotonic) > 0) return;
bun.Mimalloc.mi_collect(false);
Output.flush();
_ = this.waiter.wait() catch 0;
}

Expand Down Expand Up @@ -1829,7 +1829,7 @@ pub const PackageManager = struct {
switch (this.options.log_level) {
inline else => |log_level| {
if (log_level.showProgress()) this.startProgressBarIfNone();
while (this.pending_tasks > 0) : (this.sleep()) {
while (this.pending_tasks > 0) {
this.runTasks(
void,
{},
Expand All @@ -1843,6 +1843,13 @@ pub const PackageManager = struct {
) catch |err| {
return .{ .failure = err };
};

if (PackageManager.verbose_install and this.pending_tasks > 0) {
Output.prettyErrorln("<d>[PackageManager]<r> waiting for {d} tasks\n", .{this.pending_tasks});
}

if (this.pending_tasks > 0)
this.sleep();
}
},
}
Expand Down Expand Up @@ -7297,7 +7304,10 @@ pub const PackageManager = struct {
// We use this file descriptor to know where to put it.
installer.node_modules_folder = cwd.openIterableDir(node_modules.relative_path, .{}) catch brk: {
// Avoid extra mkdir() syscall
try cwd.makePath(bun.span(node_modules.relative_path));
//
// note: this will recursively delete any dangling symlinks
// in the next.js repo, it encounters a dangling symlink in node_modules/@next/codemod/node_modules/cheerio
try bun.makePath(cwd, bun.span(node_modules.relative_path));
break :brk try cwd.openIterableDir(node_modules.relative_path, .{});
};

Expand Down Expand Up @@ -7351,7 +7361,7 @@ pub const PackageManager = struct {
if (!installer.options.do.install_packages) return error.InstallFailed;
}

while (this.pending_tasks > 0 and installer.options.do.install_packages) : (this.sleep()) {
while (this.pending_tasks > 0 and installer.options.do.install_packages) {
try this.runTasks(
*PackageInstaller,
&installer,
Expand All @@ -7363,6 +7373,13 @@ pub const PackageManager = struct {
},
log_level,
);

if (PackageManager.verbose_install and this.pending_tasks > 0) {
Output.prettyErrorln("<d>[PackageManager]<r> waiting for {d} tasks\n", .{this.pending_tasks});
}

if (this.pending_tasks > 0)
this.sleep();
}

if (!installer.options.do.install_packages) return error.InstallFailed;
Expand Down Expand Up @@ -7733,7 +7750,7 @@ pub const PackageManager = struct {
Output.flush();
}

while (manager.pending_tasks > 0) : (manager.sleep()) {
while (manager.pending_tasks > 0) {
try manager.runTasks(
*PackageManager,
manager,
Expand All @@ -7746,6 +7763,13 @@ pub const PackageManager = struct {
},
log_level,
);

if (PackageManager.verbose_install and manager.pending_tasks > 0) {
Output.prettyErrorln("<d>[PackageManager]<r> waiting for {d} tasks\n", .{manager.pending_tasks});
}

if (manager.pending_tasks > 0)
manager.sleep();
}

if (comptime log_level.showProgress()) {
Expand Down
4 changes: 2 additions & 2 deletions src/libarchive/libarchive.zig
Original file line number Diff line number Diff line change
Expand Up @@ -543,13 +543,13 @@ pub const Archive = struct {
if (comptime Environment.isWindows) {
std.os.mkdirat(dir_fd, pathname, @as(u32, @intCast(mode))) catch |err| {
if (err == error.PathAlreadyExists or err == error.NotDir) break;
try dir.makePath(std.fs.path.dirname(slice) orelse return err);
try bun.makePath(dir, std.fs.path.dirname(slice) orelse return err);
try std.os.mkdirat(dir_fd, pathname, 0o777);
};
} else {
std.os.mkdiratZ(dir_fd, pathname, @as(u32, @intCast(mode))) catch |err| {
if (err == error.PathAlreadyExists or err == error.NotDir) break;
try dir.makePath(std.fs.path.dirname(slice) orelse return err);
try bun.makePath(dir, std.fs.path.dirname(slice) orelse return err);
try std.os.mkdiratZ(dir_fd, pathname, 0o777);
};
}
Expand Down
13 changes: 13 additions & 0 deletions src/sys.zig
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,19 @@ pub fn readlink(in: [:0]const u8, buf: []u8) Maybe(usize) {
unreachable;
}

pub fn readlinkat(fd: bun.FileDescriptor, in: [:0]const u8, buf: []u8) Maybe(usize) {
while (true) {
const rc = sys.readlinkat(fd, in, buf.ptr, buf.len);

if (Maybe(usize).errnoSys(rc, .readlink)) |err| {
if (err.getErrno() == .INTR) continue;
return err;
}
return Maybe(usize){ .result = @as(usize, @intCast(rc)) };
}
unreachable;
}

pub fn ftruncate(fd: fd_t, size: isize) Maybe(void) {
if (comptime Environment.isWindows) {
if (kernel32.SetFileValidData(bun.fdcast(fd), size) == 0) {
Expand Down
92 changes: 92 additions & 0 deletions test/js/web/fetch/fetch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1702,3 +1702,95 @@ it("should throw RedirectURLTooLong when location is too long", async () => {
expect(err.code).toStrictEqual("RedirectURLTooLong");
server.stop(true);
});

it("304 not modified with missing content-length does not cause a request timeout", async () => {
const server = await Bun.listen({
socket: {
open(socket) {
socket.write("HTTP/1.1 304 Not Modified\r\n\r\n");
socket.flush();
setTimeout(() => {
socket.end();
}, 9999).unref();
},
data() {},
close() {},
},
port: 0,
hostname: "localhost",
});

const response = await fetch(`http://${server.hostname}:${server.port}/`);
expect(response.status).toBe(304);
expect(await response.arrayBuffer()).toHaveLength(0);
server.stop(true);
});

it("304 not modified with missing content-length and connection close does not cause a request timeout", async () => {
const server = await Bun.listen({
socket: {
open(socket) {
socket.write("HTTP/1.1 304 Not Modified\r\nConnection: close\r\n\r\n");
socket.flush();
setTimeout(() => {
socket.end();
}, 9999).unref();
},
data() {},
close() {},
},
port: 0,
hostname: "localhost",
});

const response = await fetch(`http://${server.hostname}:${server.port}/`);
expect(response.status).toBe(304);
expect(await response.arrayBuffer()).toHaveLength(0);
server.stop(true);
});

it("304 not modified with content-length 0 and connection close does not cause a request timeout", async () => {
const server = await Bun.listen({
socket: {
open(socket) {
socket.write("HTTP/1.1 304 Not Modified\r\nConnection: close\r\nContent-Length: 0\r\n\r\n");
socket.flush();
setTimeout(() => {
socket.end();
}, 9999).unref();
},
data() {},
close() {},
},
port: 0,
hostname: "localhost",
});

const response = await fetch(`http://${server.hostname}:${server.port}/`);
expect(response.status).toBe(304);
expect(await response.arrayBuffer()).toHaveLength(0);
server.stop(true);
});

it("304 not modified with 0 content-length does not cause a request timeout", async () => {
const server = await Bun.listen({
socket: {
open(socket) {
socket.write("HTTP/1.1 304 Not Modified\r\nContent-Length: 0\r\n\r\n");
socket.flush();
setTimeout(() => {
socket.end();
}, 9999).unref();
},
data() {},
close() {},
},
port: 0,
hostname: "localhost",
});

const response = await fetch(`http://${server.hostname}:${server.port}/`);
expect(response.status).toBe(304);
expect(await response.arrayBuffer()).toHaveLength(0);
server.stop(true);
});
Loading