From 3434cdc83a2d70e4cae29c2d4a1f3f7051c1773d Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Tue, 30 Apr 2024 22:33:04 -0700 Subject: [PATCH 01/33] node:zlib: implement brotliCompress and brotliDecompress --- src/brotli.zig | 127 +++- src/bun.js/api/brotli.classes.ts | 63 ++ src/bun.js/api/brotli.zig | 569 ++++++++++++++++++ .../bindings/generated_classes_list.zig | 2 + src/bun.js/event_loop.zig | 13 + src/bun.js/node/node_zlib_binding.zig | 15 + src/bun.js/node/types.zig | 16 +- src/deps/brotli_decoder.zig | 9 +- src/deps/brotli_encoder.zig | 141 +++++ src/http.zig | 3 +- src/js/node/zlib.ts | 25 +- src/jsc.zig | 2 + test/js/node/zlib/zlib.test.js | 37 +- 13 files changed, 990 insertions(+), 32 deletions(-) create mode 100644 src/bun.js/api/brotli.classes.ts create mode 100644 src/bun.js/api/brotli.zig create mode 100644 src/bun.js/node/node_zlib_binding.zig create mode 100644 src/deps/brotli_encoder.zig diff --git a/src/brotli.zig b/src/brotli.zig index 755f8ec794ea48..36d89c73810600 100644 --- a/src/brotli.zig +++ b/src/brotli.zig @@ -1,7 +1,11 @@ const bun = @import("root").bun; const std = @import("std"); -const c = @import("./deps/brotli_decoder.zig"); +const c = struct { + pub usingnamespace @import("./deps/brotli_decoder.zig"); + pub usingnamespace @import("./deps/brotli_encoder.zig"); +}; const BrotliDecoder = c.BrotliDecoder; +const BrotliEncoder = c.BrotliEncoder; const mimalloc = bun.Mimalloc; @@ -15,7 +19,7 @@ const BrotliAllocator = struct { return mimalloc.mi_malloc(len) orelse unreachable; } - pub fn free(_: ?*anyopaque, data: *anyopaque) callconv(.C) void { + pub fn free(_: ?*anyopaque, data: ?*anyopaque) callconv(.C) void { if (comptime bun.is_heap_breakdown_enabled) { const zone = bun.HeapBreakdown.malloc_zone_t.get(BrotliAllocator); zone.malloc_zone_free(data); @@ -26,7 +30,7 @@ const BrotliAllocator = struct { } }; -pub const Options = struct { +pub const DecoderOptions = struct { pub const Params = std.enums.EnumFieldStruct(c.BrotliDecoderParameter, bool, false); params: Params = Params{ @@ -54,7 +58,11 @@ pub const BrotliReaderArrayList = struct { pub usingnamespace bun.New(BrotliReaderArrayList); - pub fn initWithOptions(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: Options) !*BrotliReaderArrayList { + pub fn newWithOptions(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: DecoderOptions) !*BrotliReaderArrayList { + return BrotliReaderArrayList.new(try initWithOptions(input, list, allocator, options)); + } + + pub fn initWithOptions(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: DecoderOptions) !BrotliReaderArrayList { if (!BrotliDecoder.initializeBrotli()) { return error.BrotliFailedToLoad; } @@ -67,15 +75,13 @@ pub const BrotliReaderArrayList = struct { bun.assert(list.items.ptr != input.ptr); - return BrotliReaderArrayList.new( - .{ - .input = input, - .list_ptr = list, - .list = list.*, - .list_allocator = allocator, - .brotli = brotli, - }, - ); + return .{ + .input = input, + .list_ptr = list, + .list = list.*, + .list_allocator = allocator, + .brotli = brotli, + }; } pub fn end(this: *BrotliReaderArrayList) void { @@ -83,9 +89,7 @@ pub const BrotliReaderArrayList = struct { } pub fn readAll(this: *BrotliReaderArrayList, is_done: bool) !void { - defer { - this.list_ptr.* = this.list; - } + defer this.list_ptr.* = this.list; if (this.state == .End or this.state == .Error) { return; @@ -166,3 +170,94 @@ pub const BrotliReaderArrayList = struct { this.destroy(); } }; + +pub const BrotliCompressionStream = struct { + pub const State = enum { + Inflating, + End, + Error, + }; + + brotli: *BrotliEncoder, + state: State = State.Inflating, + total_out: usize = 0, + total_in: usize = 0, + + pub fn init() !BrotliCompressionStream { + const instance = BrotliEncoder.createInstance(&BrotliAllocator.alloc, &BrotliAllocator.free, null) orelse return error.BrotliFailedToCreateInstance; + + return BrotliCompressionStream{ + .brotli = instance, + }; + } + + pub fn writeChunk(this: *BrotliCompressionStream, input: []const u8, last: bool) ![]const u8 { + const result = this.brotli.compressStream(if (last) BrotliEncoder.Operation.finish else .process, input); + + if (!result.success) { + this.state = .Error; + return error.BrotliCompressionError; + } + + return result.output; + } + + pub fn write(this: *BrotliCompressionStream, input: []const u8, last: bool) ![]const u8 { + if (this.state == .End or this.state == .Error) { + return ""; + } + + return this.writeChunk(input, last); + } + + pub fn end(this: *BrotliCompressionStream) ![]const u8 { + defer this.state = .End; + + return try this.write("", true); + } + + pub fn deinit(this: *BrotliCompressionStream) void { + this.brotli.destroyInstance(); + } + + fn NewWriter(comptime InputWriter: type) type { + return struct { + compressor: *BrotliCompressionStream, + input_writer: InputWriter, + + const Self = @This(); + pub const WriteError = error{BrotliCompressionError} || InputWriter.Error; + pub const Writer = std.io.Writer(@This(), WriteError, Self.write); + + pub fn init(compressor: *BrotliCompressionStream, input_writer: InputWriter) Self { + return Self{ + .compressor = compressor, + .input_writer = input_writer, + }; + } + + pub fn write(self: Self, to_compress: []const u8) WriteError!usize { + const decompressed = try self.compressor.write(to_compress, false); + try self.input_writer.writeAll(decompressed); + return to_compress.len; + } + + pub fn end(self: Self) !usize { + const decompressed = try self.compressor.end(); + try self.input_writer.writeAll(decompressed); + } + + pub fn writer(self: Self) Writer { + return Writer{ .context = self }; + } + }; + } + + pub fn writerContext(this: *BrotliCompressionStream, writable: anytype) NewWriter(@TypeOf(writable)) { + return NewWriter(@TypeOf(writable)).init(this, writable); + } + + pub fn writer(this: *BrotliCompressionStream, writable: anytype) NewWriter(@TypeOf(writable)).Writer { + return this.writerContext(writable).writer(); + } +}; diff --git a/src/bun.js/api/brotli.classes.ts b/src/bun.js/api/brotli.classes.ts new file mode 100644 index 00000000000000..2095a2c10433c9 --- /dev/null +++ b/src/bun.js/api/brotli.classes.ts @@ -0,0 +1,63 @@ +import { define } from "../../codegen/class-definitions"; + +export default [ + define({ + name: "BrotliEncoder", + construct: true, + noConstructor: true, + finalize: true, + configurable: false, + hasPendingActivity: true, + klass: {}, + JSType: "0b11101110", + values: ["callback"], + proto: { + encode: { + fn: "encode", + length: 2, + }, + encodeSync: { + fn: "encodeSync", + length: 2, + }, + end: { + fn: "end", + length: 2, + }, + endSync: { + fn: "endSync", + length: 2, + }, + }, + }), + define({ + name: "BrotliDecoder", + construct: true, + noConstructor: true, + finalize: true, + configurable: false, + hasPendingActivity: true, + klass: {}, + JSType: "0b11101110", + values: ["callback"], + + proto: { + decode: { + fn: "decode", + length: 2, + }, + decodeSync: { + fn: "decodeSync", + length: 2, + }, + end: { + fn: "end", + length: 2, + }, + endSync: { + fn: "endSync", + length: 2, + }, + }, + }), +]; diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig new file mode 100644 index 00000000000000..a14e3a21cefd5c --- /dev/null +++ b/src/bun.js/api/brotli.zig @@ -0,0 +1,569 @@ +const bun = @import("root").bun; +const JSC = bun.JSC; +const std = @import("std"); +const brotli = bun.brotli; + +const Queue = std.fifo.LinearFifo(JSC.Node.BlobOrStringOrBuffer, .Dynamic); + +fn ConcurrentByteProcessor(comptime Processor: type) type { + _ = Processor; // autofix + return struct {}; +} + +pub const BrotliEncoder = struct { + pub usingnamespace bun.NewRefCounted(@This(), deinit); + pub usingnamespace JSC.Codegen.JSBrotliEncoder; + + stream: brotli.BrotliCompressionStream, + + freelist: Queue = Queue.init(bun.default_allocator), + freelist_write_lock: bun.Lock = bun.Lock.init(), + + globalThis: *JSC.JSGlobalObject, + + input: Queue = Queue.init(bun.default_allocator), + input_lock: bun.Lock = bun.Lock.init(), + + has_called_end: bool = false, + callback_value: JSC.Strong = .{}, + + output: std.ArrayListUnmanaged(u8) = .{}, + output_lock: bun.Lock = bun.Lock.init(), + + has_pending_activity: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + pending_encode_job_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + ref_count: u32 = 1, + write_failed: bool = false, + poll_ref: bun.Async.KeepAlive = .{}, + + pub fn hasPendingActivity(this: *BrotliEncoder) callconv(.C) bool { + return this.has_pending_activity.load(.Monotonic) > 0; + } + + pub fn constructor(globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) ?*BrotliEncoder { + globalThis.throw("BrotliEncoder is not constructable", .{}); + return null; + } + + pub fn create(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3).slice(); + + if (arguments.len < 3) { + globalThis.throwNotEnoughArguments("BrotliEncoder", 3, arguments.len); + return .zero; + } + + const callback = arguments[2]; + + if (!callback.isCallable(globalThis.vm())) { + globalThis.throwInvalidArguments("BrotliEncoder callback is not callable", .{}); + return .zero; + } + + var this: *BrotliEncoder = BrotliEncoder.new(.{ + .globalThis = globalThis, + .stream = brotli.BrotliCompressionStream.init() catch { + globalThis.throw("Failed to create BrotliEncoder", .{}); + return .zero; + }, + }); + + const out = this.toJS(globalThis); + @This().callbackSetCached(out, globalThis, callback); + this.callback_value.set(globalThis, callback); + + return out; + } + + pub fn finalize(this: *BrotliEncoder) callconv(.C) void { + this.deref(); + } + + pub fn deinit(this: *BrotliEncoder) void { + this.callback_value.deinit(); + this.drainFreelist(); + this.output.deinit(bun.default_allocator); + this.stream.deinit(); + this.input.deinit(); + } + + fn drainFreelist(this: *BrotliEncoder) void { + this.freelist_write_lock.lock(); + defer this.freelist_write_lock.unlock(); + const to_free = this.freelist.readableSlice(0); + for (to_free) |*input| { + input.deinit(); + } + this.freelist.discard(to_free.len); + } + + pub fn runFromJSThread(this: *BrotliEncoder) void { + this.poll_ref.unref(this.globalThis.bunVM()); + + defer this.deref(); + this.drainFreelist(); + + const value = brk: { + this.output_lock.lock(); + defer this.output_lock.unlock(); + + if (this.output.items.len == 0) + return; + + if (this.output.items.len > 16 * 1024) { + defer this.output.items = &.{}; + break :brk JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); + } else { + defer this.output.clearRetainingCapacity(); + break :brk JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items); + } + }; + + const result = this.callback_value.get().?.call(this.globalThis, &.{ + if (this.write_failed) + this.globalThis.createErrorInstance("BrotliError", .{}) + else + JSC.JSValue.null, + value, + }); + + if (result.toError()) |err| { + this.globalThis.bunVM().runErrorHandler(err, null); + } + } + + // We can only run one encode job at a time + // But we don't have an idea of a serial dispatch queue + // So instead, we let you enqueue as many times as you want + // and if one is already running, we just don't do anything + const EncodeJob = struct { + task: JSC.WorkPoolTask = .{ .callback = &runTask }, + encoder: *BrotliEncoder, + + pub usingnamespace bun.New(@This()); + + pub fn run(this: *EncodeJob) void { + defer { + _ = this.encoder.has_pending_activity.fetchSub(1, .Monotonic); + this.encoder.deref(); + this.destroy(); + } + + var any = false; + + if (this.encoder.pending_encode_job_count.fetchAdd(1, .Monotonic) == 0) { + var is_last = false; + while (true) { + const pending: []bun.JSC.Node.BlobOrStringOrBuffer = brk: { + this.encoder.input_lock.lock(); + defer this.encoder.input_lock.unlock(); + is_last = this.encoder.has_called_end; + const readable = this.encoder.input.readableSlice(0); + const out = bun.default_allocator.dupe(std.meta.Child(@TypeOf(readable)), readable) catch bun.outOfMemory(); + this.encoder.input.discard(readable.len); + break :brk out; + }; + defer bun.default_allocator.free(pending); + const Writer = struct { + encoder: *BrotliEncoder, + + pub const Error = error{OutOfMemory}; + pub fn writeAll(writer: @This(), chunk: []const u8) Error!void { + writer.encoder.output_lock.lock(); + defer writer.encoder.output_lock.unlock(); + + try writer.encoder.output.appendSlice(bun.default_allocator, chunk); + } + }; + + defer { + this.encoder.freelist_write_lock.lock(); + this.encoder.freelist.write(pending) catch unreachable; + this.encoder.freelist_write_lock.unlock(); + } + for (pending) |input| { + var writer = this.encoder.stream.writer(Writer{ .encoder = this.encoder }); + writer.writeAll(input.slice()) catch { + _ = this.encoder.pending_encode_job_count.fetchSub(1, .Monotonic); + this.encoder.write_failed = true; + return; + }; + } + + any = any or pending.len > 0; + + if (this.encoder.pending_encode_job_count.fetchSub(1, .Monotonic) == 0) + break; + } + + if (is_last and any) { + var output = &this.encoder.output; + this.encoder.output_lock.lock(); + defer { + this.encoder.output_lock.unlock(); + } + + output.appendSlice(bun.default_allocator, this.encoder.stream.end() catch { + _ = this.encoder.pending_encode_job_count.fetchSub(1, .Monotonic); + this.encoder.write_failed = true; + return; + }) catch { + _ = this.encoder.pending_encode_job_count.fetchSub(1, .Monotonic); + this.encoder.write_failed = true; + return; + }; + } + } + + if (any) { + var vm = this.encoder.globalThis.bunVMConcurrently(); + this.encoder.ref(); + this.encoder.poll_ref.refConcurrently(vm); + vm.enqueueTaskConcurrent(JSC.ConcurrentTask.create(JSC.Task.init(this.encoder))); + } + } + + pub fn runTask(this: *JSC.WorkPoolTask) void { + var job: *EncodeJob = @fieldParentPtr(EncodeJob, "task", this); + job.run(); + } + }; + + pub fn encode(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3); + + if (arguments.len < 2) { + globalThis.throwNotEnoughArguments("BrotliEncoder.encode", 2, arguments.len); + return .zero; + } + + if (this.has_called_end) { + globalThis.throw("BrotliEncoder.encode called after BrotliEncoder.end", .{}); + return .zero; + } + + const input = callframe.argument(0); + const optional_encoding = callframe.argument(1); + const is_last = callframe.argument(2).toBoolean(); + + const input_to_queue = JSC.Node.BlobOrStringOrBuffer.fromJSWithEncodingValueMaybeAsync(globalThis, bun.default_allocator, input, optional_encoding, true) orelse { + globalThis.throwInvalidArgumentType("BrotliEncoder.encode", "input", "Blob, String, or Buffer"); + return .zero; + }; + + _ = this.has_pending_activity.fetchAdd(1, .Monotonic); + if (is_last) + this.has_called_end = true; + + var task = EncodeJob.new(.{ + .encoder = this, + }); + + { + this.input_lock.lock(); + defer this.input_lock.unlock(); + + this.input.writeItem(input_to_queue) catch unreachable; + } + this.ref(); + JSC.WorkPool.schedule(&task.task); + + return .undefined; + } + + pub fn encodeSync(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } + + pub fn end(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } + + pub fn endSync(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } +}; + +pub const BrotliDecoder = struct { + pub usingnamespace bun.NewRefCounted(@This(), deinit); + pub usingnamespace JSC.Codegen.JSBrotliDecoder; + + globalThis: *JSC.JSGlobalObject, + stream: brotli.BrotliReaderArrayList, + + has_pending_activity: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + ref_count: u32 = 1, + poll_ref: bun.Async.KeepAlive = .{}, + write_failed: bool = false, + callback_value: JSC.Strong = .{}, + has_called_end: bool = false, + pending_decode_job_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + + input: Queue = Queue.init(bun.default_allocator), + input_lock: bun.Lock = bun.Lock.init(), + + output: std.ArrayListUnmanaged(u8) = .{}, + output_lock: bun.Lock = bun.Lock.init(), + + freelist: Queue = Queue.init(bun.default_allocator), + freelist_write_lock: bun.Lock = bun.Lock.init(), + + pub fn hasPendingActivity(this: *BrotliDecoder) callconv(.C) bool { + return this.has_pending_activity.load(.Monotonic) > 0; + } + + pub fn deinit(this: *BrotliDecoder) void { + this.callback_value.deinit(); + this.drainFreelist(); + this.output.deinit(bun.default_allocator); + this.stream.brotli.destroyInstance(); + this.input.deinit(); + } + + pub fn constructor(globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) ?*BrotliDecoder { + globalThis.throw("Crypto is not constructable", .{}); + return null; + } + + pub fn create(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3).slice(); + + if (arguments.len < 3) { + globalThis.throwNotEnoughArguments("BrotliDecoder", 3, arguments.len); + return .zero; + } + + const callback = arguments[2]; + + if (!callback.isCallable(globalThis.vm())) { + globalThis.throwInvalidArguments("BrotliDecoder callback is not callable", .{}); + return .zero; + } + + var this: *BrotliDecoder = BrotliDecoder.new(.{ + .globalThis = globalThis, + .stream = undefined, // &this.output needs to be a stable pointer + }); + this.stream = brotli.BrotliReaderArrayList.initWithOptions("", bun.new(std.ArrayListUnmanaged(u8), .{}), bun.default_allocator, .{}) catch { + globalThis.throw("Failed to create BrotliDecoder", .{}); + return .zero; + }; + + const out = this.toJS(globalThis); + @This().callbackSetCached(out, globalThis, callback); + this.callback_value.set(globalThis, callback); + + return out; + } + + pub fn finalize(this: *BrotliDecoder) callconv(.C) void { + this.deref(); + } + + pub fn runFromJSThread(this: *BrotliDecoder) void { + this.poll_ref.unref(this.globalThis.bunVM()); + + defer this.deref(); + this.drainFreelist(); + + const value = brk: { + this.output_lock.lock(); + defer this.output_lock.unlock(); + + if (this.output.items.len == 0) + return; + + if (this.output.items.len > 16 * 1024) { + defer this.output.items = &.{}; + break :brk JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); + } else { + defer this.output.clearRetainingCapacity(); + break :brk JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items); + } + }; + + const result = this.callback_value.get().?.call(this.globalThis, &.{ + if (this.write_failed) + this.globalThis.createErrorInstance("BrotliError", .{}) + else + JSC.JSValue.null, + value, + }); + + if (result.toError()) |err| { + this.globalThis.bunVM().runErrorHandler(err, null); + } + } + + fn drainFreelist(this: *BrotliDecoder) void { + this.freelist_write_lock.lock(); + defer this.freelist_write_lock.unlock(); + const to_free = this.freelist.readableSlice(0); + for (to_free) |*input| { + input.deinit(); + } + this.freelist.discard(to_free.len); + } + + pub fn decode(this: *BrotliDecoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3); + + if (arguments.len < 2) { + globalThis.throwNotEnoughArguments("BrotliEncoder.decode", 2, arguments.len); + return .zero; + } + + if (this.has_called_end) { + globalThis.throw("BrotliEncoder.decode called after BrotliEncoder.end", .{}); + return .zero; + } + + const input = callframe.argument(0); + const optional_encoding = callframe.argument(1); + const is_last = callframe.argument(2).toBoolean(); + + const input_to_queue = JSC.Node.BlobOrStringOrBuffer.fromJSWithEncodingValueMaybeAsync(globalThis, bun.default_allocator, input, optional_encoding, true) orelse { + globalThis.throwInvalidArgumentType("BrotliEncoder.decode", "input", "Blob, String, or Buffer"); + return .zero; + }; + + _ = this.has_pending_activity.fetchAdd(1, .Monotonic); + if (is_last) + this.has_called_end = true; + + var task = DecodeJob.new(.{ + .decoder = this, + }); + + { + this.input_lock.lock(); + defer this.input_lock.unlock(); + + this.input.writeItem(input_to_queue) catch unreachable; + } + this.ref(); + JSC.WorkPool.schedule(&task.task); + + return .undefined; + } + + pub fn decodeSync(this: *BrotliDecoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } + + // We can only run one decode job at a time + // But we don't have an idea of a serial dispatch queue + // So instead, we let you enqueue as many times as you want + // and if one is already running, we just don't do anything + const DecodeJob = struct { + task: JSC.WorkPoolTask = .{ .callback = &runTask }, + decoder: *BrotliDecoder, + + pub usingnamespace bun.New(@This()); + + pub fn runTask(this: *JSC.WorkPoolTask) void { + var job: *DecodeJob = @fieldParentPtr(DecodeJob, "task", this); + job.run(); + } + + pub fn run(this: *DecodeJob) void { + defer { + _ = this.decoder.has_pending_activity.fetchSub(1, .Monotonic); + this.decoder.deref(); + this.destroy(); + } + + var any = false; + + if (this.decoder.pending_decode_job_count.fetchAdd(1, .Monotonic) == 0) { + var is_last = false; + while (true) { + const pending: []bun.JSC.Node.BlobOrStringOrBuffer = brk: { + this.decoder.input_lock.lock(); + defer this.decoder.input_lock.unlock(); + is_last = this.decoder.has_called_end; + const readable = this.decoder.input.readableSlice(0); + const out = bun.default_allocator.dupe(std.meta.Child(@TypeOf(readable)), readable) catch bun.outOfMemory(); + this.decoder.input.discard(readable.len); + break :brk out; + }; + defer bun.default_allocator.free(pending); + + defer { + this.decoder.freelist_write_lock.lock(); + this.decoder.freelist.write(pending) catch unreachable; + this.decoder.freelist_write_lock.unlock(); + } + for (pending) |input| { + this.decoder.output_lock.lock(); + defer this.decoder.output_lock.unlock(); + + this.decoder.stream.input = input.slice(); + this.decoder.stream.readAll(false) catch { + _ = this.decoder.pending_decode_job_count.fetchSub(1, .Monotonic); + this.decoder.write_failed = true; + return; + }; + } + + any = any or pending.len > 0; + + if (this.decoder.pending_decode_job_count.fetchSub(1, .Monotonic) == 0) + break; + } + + if (is_last and any) { + var output = &this.decoder.output; + this.decoder.output_lock.lock(); + defer this.decoder.output_lock.unlock(); + + output.appendSlice(bun.default_allocator, this.decoder.stream.list.items) catch { + _ = this.decoder.pending_decode_job_count.fetchSub(1, .Monotonic); + this.decoder.write_failed = true; + return; + }; + // this.decoder.stream.end(); + } + } + + if (any) { + var vm = this.decoder.globalThis.bunVMConcurrently(); + this.decoder.ref(); + this.decoder.poll_ref.refConcurrently(vm); + vm.enqueueTaskConcurrent(JSC.ConcurrentTask.create(JSC.Task.init(this.decoder))); + } + } + }; + + pub fn end(this: *BrotliDecoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } + + pub fn endSync(this: *BrotliDecoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } +}; diff --git a/src/bun.js/bindings/generated_classes_list.zig b/src/bun.js/bindings/generated_classes_list.zig index f154e6c85cb795..2ff83686802a68 100644 --- a/src/bun.js/bindings/generated_classes_list.zig +++ b/src/bun.js/bindings/generated_classes_list.zig @@ -70,4 +70,6 @@ pub const Classes = struct { pub const FileInternalReadableStreamSource = JSC.WebCore.FileReader.Source; pub const BlobInternalReadableStreamSource = JSC.WebCore.ByteBlobLoader.Source; pub const BytesInternalReadableStreamSource = JSC.WebCore.ByteStream.Source; + pub const BrotliEncoder = JSC.API.BrotliEncoder; + pub const BrotliDecoder = JSC.API.BrotliDecoder; }; diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 4196fd356dc53d..dc12ab3266beaa 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -352,6 +352,9 @@ const Futimes = JSC.Node.Async.futimes; const Lchmod = JSC.Node.Async.lchmod; const Lchown = JSC.Node.Async.lchown; const Unlink = JSC.Node.Async.unlink; +const BrotliDecoder = JSC.API.BrotliDecoder; +const BrotliEncoder = JSC.API.BrotliEncoder; + const ShellGlobTask = bun.shell.interpret.Interpreter.Expansion.ShellGlobTask; const ShellRmTask = bun.shell.Interpreter.Builtin.Rm.ShellRmTask; const ShellRmDirTask = bun.shell.Interpreter.Builtin.Rm.ShellRmTask.DirTask; @@ -430,6 +433,8 @@ pub const Task = TaggedPointerUnion(.{ Lchmod, Lchown, Unlink, + BrotliEncoder, + BrotliDecoder, ShellGlobTask, ShellRmTask, ShellRmDirTask, @@ -1182,6 +1187,14 @@ pub const EventLoop = struct { var any: *Unlink = task.get(Unlink).?; any.runFromJSThread(); }, + @field(Task.Tag, typeBaseName(@typeName(BrotliEncoder))) => { + var any: *BrotliEncoder = task.get(BrotliEncoder).?; + any.runFromJSThread(); + }, + @field(Task.Tag, typeBaseName(@typeName(BrotliDecoder))) => { + var any: *BrotliDecoder = task.get(BrotliDecoder).?; + any.runFromJSThread(); + }, @field(Task.Tag, typeBaseName(@typeName(ProcessWaiterThreadTask))) => { bun.markPosixOnly(); var any: *ProcessWaiterThreadTask = task.get(ProcessWaiterThreadTask).?; diff --git a/src/bun.js/node/node_zlib_binding.zig b/src/bun.js/node/node_zlib_binding.zig new file mode 100644 index 00000000000000..ee511d5167bc65 --- /dev/null +++ b/src/bun.js/node/node_zlib_binding.zig @@ -0,0 +1,15 @@ +const std = @import("std"); +const bun = @import("root").bun; +const Environment = bun.Environment; +const JSC = bun.JSC; +const string = bun.string; +const Output = bun.Output; +const ZigString = JSC.ZigString; + +pub fn createBrotliEncoder(global: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue { + return JSC.JSFunction.create(global, "createBrotliEncoder", bun.JSC.API.BrotliEncoder.create, 3, .{}); +} + +pub fn createBrotliDecoder(global: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue { + return JSC.JSFunction.create(global, "createBrotliDecoder", bun.JSC.API.BrotliDecoder.create, 3, .{}); +} diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index 2c388d4b00ee35..544f3f75000159 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -341,6 +341,10 @@ pub const BlobOrStringOrBuffer = union(enum) { } pub fn fromJSWithEncodingValue(global: *JSC.JSGlobalObject, allocator: std.mem.Allocator, value: JSC.JSValue, encoding_value: JSC.JSValue) ?BlobOrStringOrBuffer { + return fromJSWithEncodingValueMaybeAsync(global, allocator, value, encoding_value, false); + } + + pub fn fromJSWithEncodingValueMaybeAsync(global: *JSC.JSGlobalObject, allocator: std.mem.Allocator, value: JSC.JSValue, encoding_value: JSC.JSValue, is_async: bool) ?BlobOrStringOrBuffer { if (value.as(JSC.WebCore.Blob)) |blob| { if (blob.store) |store| { store.ref(); @@ -349,7 +353,7 @@ pub const BlobOrStringOrBuffer = union(enum) { return .{ .blob = blob.* }; } - return .{ .string_or_buffer = StringOrBuffer.fromJSWithEncodingValue(global, allocator, value, encoding_value) orelse return null }; + return .{ .string_or_buffer = StringOrBuffer.fromJSWithEncodingValueMaybeAsync(global, allocator, value, encoding_value, is_async) orelse return null }; } }; @@ -533,6 +537,16 @@ pub const StringOrBuffer = union(enum) { return fromJSWithEncoding(global, allocator, value, encoding); } + + pub fn fromJSWithEncodingValueMaybeAsync(global: *JSC.JSGlobalObject, allocator: std.mem.Allocator, value: JSC.JSValue, encoding_value: JSC.JSValue, maybe_async: bool) ?StringOrBuffer { + const encoding: Encoding = brk: { + if (!encoding_value.isCell()) + break :brk .utf8; + break :brk Encoding.fromJS(encoding_value, global) orelse .utf8; + }; + + return fromJSWithEncodingMaybeAsync(global, allocator, value, encoding, maybe_async); + } }; pub const ErrorCode = @import("./nodejs_error_code.zig").Code; diff --git a/src/deps/brotli_decoder.zig b/src/deps/brotli_decoder.zig index 58878dfa82ee3a..a3578cb041b00d 100644 --- a/src/deps/brotli_decoder.zig +++ b/src/deps/brotli_decoder.zig @@ -51,14 +51,7 @@ pub const BrotliDecoder = opaque { } pub fn decompressStream(state: *BrotliDecoder, available_in: *usize, next_in: *?[*]const u8, available_out: *usize, next_out: *?[*]u8, total_out: ?*usize) callconv(.C) BrotliDecoderResult { - return BrotliDecoderDecompressStream( - state, - available_in, - next_in, - available_out, - next_out, - total_out, - ); + return BrotliDecoderDecompressStream(state, available_in, next_in, available_out, next_out, total_out); } pub fn hasMoreOutput(state: *const BrotliDecoder) callconv(.C) bool { diff --git a/src/deps/brotli_encoder.zig b/src/deps/brotli_encoder.zig new file mode 100644 index 00000000000000..9bd8614205739e --- /dev/null +++ b/src/deps/brotli_encoder.zig @@ -0,0 +1,141 @@ +const bun = @import("root").bun; +const std = @import("std"); + +pub const brotli_alloc_func = ?*const fn (?*anyopaque, usize) callconv(.C) ?*anyopaque; +pub const brotli_free_func = ?*const fn (?*anyopaque, ?*anyopaque) callconv(.C) void; +pub const struct_BrotliSharedDictionaryStruct = opaque {}; +pub const BrotliSharedDictionary = struct_BrotliSharedDictionaryStruct; +pub const BROTLI_SHARED_DICTIONARY_RAW: c_int = 0; +pub const BROTLI_SHARED_DICTIONARY_SERIALIZED: c_int = 1; +pub const enum_BrotliSharedDictionaryType = c_uint; +pub const BrotliSharedDictionaryType = enum_BrotliSharedDictionaryType; +extern fn BrotliSharedDictionaryCreateInstance(alloc_func: brotli_alloc_func, free_func: brotli_free_func, @"opaque": ?*anyopaque) ?*BrotliSharedDictionary; +extern fn BrotliSharedDictionaryDestroyInstance(dict: ?*BrotliSharedDictionary) void; +extern fn BrotliSharedDictionaryAttach(dict: ?*BrotliSharedDictionary, @"type": BrotliSharedDictionaryType, data_size: usize, data: [*c]const u8) c_int; +pub const BROTLI_MODE_GENERIC: c_int = 0; +pub const BROTLI_MODE_TEXT: c_int = 1; +pub const BROTLI_MODE_FONT: c_int = 2; +pub const BrotliEncoderMode = enum(c_uint) { + generic = 0, + text = 1, + font = 2, +}; +pub const BROTLI_OPERATION_PROCESS: c_int = 0; +pub const BROTLI_OPERATION_FLUSH: c_int = 1; +pub const BROTLI_OPERATION_FINISH: c_int = 2; +pub const BROTLI_OPERATION_EMIT_METADATA: c_int = 3; + +pub const BROTLI_PARAM_MODE: c_int = 0; +pub const BROTLI_PARAM_QUALITY: c_int = 1; +pub const BROTLI_PARAM_LGWIN: c_int = 2; +pub const BROTLI_PARAM_LGBLOCK: c_int = 3; +pub const BROTLI_PARAM_DISABLE_LITERAL_CONTEXT_MODELING: c_int = 4; +pub const BROTLI_PARAM_SIZE_HINT: c_int = 5; +pub const BROTLI_PARAM_LARGE_WINDOW: c_int = 6; +pub const BROTLI_PARAM_NPOSTFIX: c_int = 7; +pub const BROTLI_PARAM_NDIRECT: c_int = 8; +pub const BROTLI_PARAM_STREAM_OFFSET: c_int = 9; +pub const BrotliEncoderParameter = enum(c_uint) { + mode = 0, + quality = 1, + lgwin = 2, + lgblock = 3, + disable_literal_context_modeling = 4, + size_hint = 5, + large_window = 6, + npostfix = 7, + ndirect = 8, + stream_offset = 9, +}; +pub const BrotliEncoder = opaque { + pub const Operation = enum(c_uint) { + process = 0, + flush = 1, + finish = 2, + emit_metadata = 3, + }; + + extern fn BrotliEncoderSetParameter(state: *BrotliEncoder, param: BrotliEncoderParameter, value: u32) c_int; + extern fn BrotliEncoderCreateInstance(alloc_func: brotli_alloc_func, free_func: brotli_free_func, @"opaque": ?*anyopaque) *BrotliEncoder; + extern fn BrotliEncoderDestroyInstance(state: *BrotliEncoder) void; + pub const struct_BrotliEncoderPreparedDictionaryStruct = opaque {}; + pub const BrotliEncoderPreparedDictionary = struct_BrotliEncoderPreparedDictionaryStruct; + extern fn BrotliEncoderPrepareDictionary(@"type": BrotliSharedDictionaryType, data_size: usize, data: [*c]const u8, quality: c_int, alloc_func: brotli_alloc_func, free_func: brotli_free_func, @"opaque": ?*anyopaque) *BrotliEncoderPreparedDictionary; + extern fn BrotliEncoderDestroyPreparedDictionary(dictionary: *BrotliEncoderPreparedDictionary) void; + extern fn BrotliEncoderAttachPreparedDictionary(state: *BrotliEncoder, dictionary: ?*const BrotliEncoderPreparedDictionary) c_int; + extern fn BrotliEncoderMaxCompressedSize(input_size: usize) usize; + extern fn BrotliEncoderCompress(quality: c_int, lgwin: c_int, mode: BrotliEncoderMode, input_size: usize, input_buffer: [*]const u8, encoded_size: *usize, encoded_buffer: [*]u8) c_int; + extern fn BrotliEncoderCompressStream(state: *BrotliEncoder, op: Operation, available_in: *usize, next_in: *?[*]const u8, available_out: *usize, next_out: ?[*]u8, total_out: ?*usize) c_int; + extern fn BrotliEncoderIsFinished(state: *BrotliEncoder) c_int; + extern fn BrotliEncoderHasMoreOutput(state: *BrotliEncoder) c_int; + extern fn BrotliEncoderTakeOutput(state: *BrotliEncoder, size: *usize) ?[*]const u8; + extern fn BrotliEncoderEstimatePeakMemoryUsage(quality: c_int, lgwin: c_int, input_size: usize) usize; + extern fn BrotliEncoderGetPreparedDictionarySize(dictionary: ?*const BrotliEncoderPreparedDictionary) usize; + extern fn BrotliEncoderVersion() u32; + + pub fn createInstance(alloc_func: brotli_alloc_func, free_func: brotli_free_func, @"opaque": ?*anyopaque) callconv(.C) ?*BrotliEncoder { + return BrotliEncoderCreateInstance(alloc_func, free_func, @"opaque"); + } + + pub fn destroyInstance(state: *BrotliEncoder) callconv(.C) void { + return BrotliEncoderDestroyInstance(state); + } + + pub fn hasMoreOutput(state: *BrotliEncoder) callconv(.C) bool { + return BrotliEncoderHasMoreOutput(state) > 0; + } + + pub fn takeOutput(state: *BrotliEncoder) []const u8 { + var size: usize = 0; + if (BrotliEncoderTakeOutput(state, &size)) |ptr| { + return ptr[0..size]; + } + + return ""; + } + + pub const CompressionResult = struct { + success: bool = false, + has_more: bool = false, + output: []const u8 = "", + }; + + // https://github.com/google/brotli/blob/2ad58d8603294f5ee33d23bb725e0e6a17c1de50/go/cbrotli/writer.go#L23-L40 + pub fn compressStream(state: *BrotliEncoder, op: Operation, data: []const u8) CompressionResult { + var available_in = data.len; + var next_in: ?[*]const u8 = data.ptr; + + var available_out: usize = 0; + + var result = CompressionResult{}; + + result.success = BrotliEncoderCompressStream(state, op, &available_in, &next_in, &available_out, null, null) > 0; + + if (result.success) { + result.output = takeOutput(state); + } + + result.has_more = BrotliEncoderHasMoreOutput(state) > 0; + + return result; + } + + pub fn setParameter(state: *BrotliEncoder, param: BrotliEncoderParameter, value: u32) bool { + return BrotliEncoderSetParameter(state, param, value) > 0; + } +}; + +pub const SHARED_BROTLI_MIN_DICTIONARY_WORD_LENGTH = 4; +pub const SHARED_BROTLI_MAX_DICTIONARY_WORD_LENGTH = 31; +pub const SHARED_BROTLI_NUM_DICTIONARY_CONTEXTS = 64; +pub const SHARED_BROTLI_MAX_COMPOUND_DICTS = 15; +pub const BROTLI_MIN_WINDOW_BITS = 10; +pub const BROTLI_MAX_WINDOW_BITS = 24; +pub const BROTLI_LARGE_MAX_WINDOW_BITS = 30; +pub const BROTLI_MIN_INPUT_BLOCK_BITS = 16; +pub const BROTLI_MAX_INPUT_BLOCK_BITS = 24; +pub const BROTLI_MIN_QUALITY = 0; +pub const BROTLI_MAX_QUALITY = 11; +pub const BROTLI_DEFAULT_QUALITY = 11; +pub const BROTLI_DEFAULT_WINDOW = 22; +pub const BROTLI_DEFAULT_MODE = BROTLI_MODE_GENERIC; diff --git a/src/http.zig b/src/http.zig index bc490d03c535de..0024932d4e6399 100644 --- a/src/http.zig +++ b/src/http.zig @@ -1280,14 +1280,13 @@ const Decompressor = union(enum) { }, .brotli => { this.* = .{ - .brotli = try Brotli.BrotliReaderArrayList.initWithOptions( + .brotli = try Brotli.BrotliReaderArrayList.newWithOptions( buffer, &body_out_str.list, body_out_str.allocator, .{}, ), }; - return; }, else => @panic("Invalid encoding. This code should not be reachable"), diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index 84a938709f5d04..81ff8c6d074a53 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -1,4 +1,25 @@ // Hardcoded module "node:zlib" + +function brotliCompress(buffer, opts, callback) { + if (typeof opts === "function") { + callback = opts; + opts = {}; + } + const createBrotliEncoder = $zig("node_zlib_binding.zig", "createBrotliEncoder"); + const encoder = createBrotliEncoder(opts, {}, callback); + encoder.encode(buffer, undefined, true); +} + +function brotliDecompress(buffer, opts, callback) { + if (typeof opts === "function") { + callback = opts; + opts = {}; + } + const createBrotliDecoder = $zig("node_zlib_binding.zig", "createBrotliDecoder"); + const decoder = createBrotliDecoder(opts, {}, callback); + decoder.decode(buffer, undefined, true); +} + // TODO: **use a native binding from Bun for this!!** // This is a very slow module! // It should really be fixed. It will show up in benchmarking. It also loads @@ -4072,9 +4093,7 @@ var require_lib = __commonJS({ for (const method of [ "BrotliCompress", "BrotliDecompress", - "brotliCompress", "brotliCompressSync", - "brotliDecompress", "brotliDecompressSync", "createBrotliCompress", "createBrotliDecompress", @@ -4083,6 +4102,8 @@ var require_lib = __commonJS({ throw new Error(`zlib.${method} is not implemented`); }; } + exports.brotliCompress = brotliCompress; + exports.brotliDecompress = brotliDecompress; function zlibBuffer(engine, buffer, callback) { var buffers = []; diff --git a/src/jsc.zig b/src/jsc.zig index da30bd4960e8e3..57b7fe5db2c3b2 100644 --- a/src/jsc.zig +++ b/src/jsc.zig @@ -49,6 +49,8 @@ pub const API = struct { pub const UDPSocket = @import("./bun.js/api/bun/udp_socket.zig").UDPSocket; pub const Listener = @import("./bun.js/api/bun/socket.zig").Listener; pub const H2FrameParser = @import("./bun.js/api/bun/h2_frame_parser.zig").H2FrameParser; + pub const BrotliEncoder = @import("./bun.js/api/brotli.zig").BrotliEncoder; + pub const BrotliDecoder = @import("./bun.js/api/brotli.zig").BrotliDecoder; }; pub const DNS = @import("./bun.js/api/bun/dns_resolver.zig"); pub const FFI = @import("./bun.js/api/ffi.zig").FFI; diff --git a/test/js/node/zlib/zlib.test.js b/test/js/node/zlib/zlib.test.js index 384be020e23b34..26d1f04d506e64 100644 --- a/test/js/node/zlib/zlib.test.js +++ b/test/js/node/zlib/zlib.test.js @@ -30,6 +30,7 @@ describe("zlib", () => { import * as zlib from "node:zlib"; import * as fs from "node:fs"; import * as buffer from "node:buffer"; +import * as util from "node:util"; describe("zlib.gunzip", () => { it("should be able to unzip a Buffer and return an unzipped Buffer", async () => { @@ -48,14 +49,12 @@ describe("zlib.gunzip", () => { }); }); -describe("zlib.brotli*", () => { +describe("zlib.brotli", () => { it("returns stub", () => { for (const method of [ "BrotliCompress", "BrotliDecompress", - "brotliCompress", "brotliCompressSync", - "brotliDecompress", "brotliDecompressSync", "createBrotliCompress", "createBrotliDecompress", @@ -63,4 +62,36 @@ describe("zlib.brotli*", () => { expect(() => zlib[method]()).toThrow(new Error(`zlib.${method} is not implemented`)); } }); + + const inputString = + "ΩΩLorem ipsum dolor sit amet, consectetur adipiscing eli" + + "t. Morbi faucibus, purus at gravida dictum, libero arcu " + + "convallis lacus, in commodo libero metus eu nisi. Nullam" + + " commodo, neque nec porta placerat, nisi est fermentum a" + + "ugue, vitae gravida tellus sapien sit amet tellus. Aenea" + + "n non diam orci. Proin quis elit turpis. Suspendisse non" + + " diam ipsum. Suspendisse nec ullamcorper odio. Vestibulu" + + "m arcu mi, sodales non suscipit id, ultrices ut massa. S" + + "ed ac sem sit amet arcu malesuada fermentum. Nunc sed. "; + const compressedString = + "G/gBQBwHdky2aHV5KK9Snf05//1pPdmNw/7232fnIm1IB" + + "K1AA8RsN8OB8Nb7Lpgk3UWWUlzQXZyHQeBBbXMTQXC1j7" + + "wg3LJs9LqOGHRH2bj/a2iCTLLx8hBOyTqgoVuD1e+Qqdn" + + "f1rkUNyrWq6LtOhWgxP3QUwdhKGdZm3rJWaDDBV7+pDk1" + + "MIkrmjp4ma2xVi5MsgJScA3tP1I7mXeby6MELozrwoBQD" + + "mVTnEAicZNj4lkGqntJe2qSnGyeMmcFgraK94vCg/4iLu" + + "Tw5RhKhnVY++dZ6niUBmRqIutsjf5TzwF5iAg8a9UkjF5" + + "2eZ0tB2vo6v8SqVfNMkBmmhxr0NT9LkYF69aEjlYzj7IE" + + "KmEUQf1HBogRYhFIt4ymRNEgHAIzOyNEsQM="; + const compressedBuffer = Buffer.from(compressedString, "base64"); + + it("brotliCompress", async () => { + const compressed = await util.promisify(zlib.brotliCompress)(inputString); + expect(compressed.toString()).toEqual(compressedBuffer.toString()); + }); + + it("brotliDecompress", async () => { + const roundtrip = await util.promisify(zlib.brotliDecompress)(compressedBuffer); + expect(roundtrip.toString()).toEqual(inputString); + }); }); From b52beb01db0e2f086c624d9824aed82079d700d2 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Tue, 30 Apr 2024 22:37:24 -0700 Subject: [PATCH 02/33] update docs --- docs/runtime/nodejs-apis.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/runtime/nodejs-apis.md b/docs/runtime/nodejs-apis.md index f09a9797103a06..88ad0ad31034ff 100644 --- a/docs/runtime/nodejs-apis.md +++ b/docs/runtime/nodejs-apis.md @@ -173,7 +173,7 @@ Some methods are not optimized yet. ### [`node:zlib`](https://nodejs.org/api/zlib.html) -🟡 Missing `BrotliCompress` `BrotliDecompress` `brotliCompressSync` `brotliDecompress` `brotliDecompressSync` `createBrotliCompress` `createBrotliDecompress`. Unoptimized. +Missing `BrotliCompress` `BrotliDecompress` `brotliCompress` `brotliCompressSync` `brotliDecompress` `brotliDecompressSync` `createBrotliCompress` `createBrotliDecompress`. Unoptimized. ## Globals From 8ea8bccb5c13ed45f8b7ff51457a3994f56ffd48 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Tue, 30 Apr 2024 22:37:42 -0700 Subject: [PATCH 03/33] update docs --- docs/runtime/nodejs-apis.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/runtime/nodejs-apis.md b/docs/runtime/nodejs-apis.md index 88ad0ad31034ff..f34903dce7388c 100644 --- a/docs/runtime/nodejs-apis.md +++ b/docs/runtime/nodejs-apis.md @@ -173,7 +173,7 @@ Some methods are not optimized yet. ### [`node:zlib`](https://nodejs.org/api/zlib.html) -Missing `BrotliCompress` `BrotliDecompress` `brotliCompress` `brotliCompressSync` `brotliDecompress` `brotliDecompressSync` `createBrotliCompress` `createBrotliDecompress`. Unoptimized. +Missing `BrotliCompress` `BrotliDecompress` `brotliCompressSync` `brotliDecompressSync` `createBrotliCompress` `createBrotliDecompress`. Unoptimized. ## Globals From b7ef27befd08b62c73ba57c8895def2435c748a2 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Wed, 1 May 2024 14:11:40 -0700 Subject: [PATCH 04/33] only load these once --- src/js/node/zlib.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index 81ff8c6d074a53..f26ec6d0539f87 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -1,11 +1,13 @@ // Hardcoded module "node:zlib" +const createBrotliEncoder = $zig("node_zlib_binding.zig", "createBrotliEncoder"); +const createBrotliDecoder = $zig("node_zlib_binding.zig", "createBrotliDecoder"); + function brotliCompress(buffer, opts, callback) { if (typeof opts === "function") { callback = opts; opts = {}; } - const createBrotliEncoder = $zig("node_zlib_binding.zig", "createBrotliEncoder"); const encoder = createBrotliEncoder(opts, {}, callback); encoder.encode(buffer, undefined, true); } @@ -15,7 +17,6 @@ function brotliDecompress(buffer, opts, callback) { callback = opts; opts = {}; } - const createBrotliDecoder = $zig("node_zlib_binding.zig", "createBrotliDecoder"); const decoder = createBrotliDecoder(opts, {}, callback); decoder.decode(buffer, undefined, true); } From 2dc1959ca895b171bf3601164a053746b085b77e Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Wed, 1 May 2024 17:00:38 -0700 Subject: [PATCH 05/33] reduce memory usage --- src/bun.js/api/brotli.zig | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index a14e3a21cefd5c..a8345b532a2eb0 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -356,7 +356,7 @@ pub const BrotliDecoder = struct { .globalThis = globalThis, .stream = undefined, // &this.output needs to be a stable pointer }); - this.stream = brotli.BrotliReaderArrayList.initWithOptions("", bun.new(std.ArrayListUnmanaged(u8), .{}), bun.default_allocator, .{}) catch { + this.stream = brotli.BrotliReaderArrayList.initWithOptions("", &this.output, bun.default_allocator, .{}) catch { globalThis.throw("Failed to create BrotliDecoder", .{}); return .zero; }; @@ -527,19 +527,6 @@ pub const BrotliDecoder = struct { if (this.decoder.pending_decode_job_count.fetchSub(1, .Monotonic) == 0) break; } - - if (is_last and any) { - var output = &this.decoder.output; - this.decoder.output_lock.lock(); - defer this.decoder.output_lock.unlock(); - - output.appendSlice(bun.default_allocator, this.decoder.stream.list.items) catch { - _ = this.decoder.pending_decode_job_count.fetchSub(1, .Monotonic); - this.decoder.write_failed = true; - return; - }; - // this.decoder.stream.end(); - } } if (any) { From 459edfe6e5b2c51fa02d105faf7dd1c13dff83c9 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Wed, 1 May 2024 17:19:01 -0700 Subject: [PATCH 06/33] move collecting the output buffer into a JSValue into a helper --- src/bun.js/api/brotli.zig | 68 +++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index a8345b532a2eb0..63b2b0608bc6e0 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -97,34 +97,34 @@ pub const BrotliEncoder = struct { this.freelist.discard(to_free.len); } + fn collectOutputValue(this: *BrotliEncoder) ?JSC.JSValue { + this.output_lock.lock(); + defer this.output_lock.unlock(); + + if (this.output.items.len == 0) + return null; + + if (this.output.items.len > 16 * 1024) { + defer this.output.clearRetainingCapacity(); + return JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); + } else { + defer this.output.clearRetainingCapacity(); + return JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items); + } + } + pub fn runFromJSThread(this: *BrotliEncoder) void { this.poll_ref.unref(this.globalThis.bunVM()); defer this.deref(); this.drainFreelist(); - const value = brk: { - this.output_lock.lock(); - defer this.output_lock.unlock(); - - if (this.output.items.len == 0) - return; - - if (this.output.items.len > 16 * 1024) { - defer this.output.items = &.{}; - break :brk JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); - } else { - defer this.output.clearRetainingCapacity(); - break :brk JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items); - } - }; - const result = this.callback_value.get().?.call(this.globalThis, &.{ if (this.write_failed) this.globalThis.createErrorInstance("BrotliError", .{}) else JSC.JSValue.null, - value, + this.collectOutputValue() orelse return, }); if (result.toError()) |err| { @@ -372,34 +372,34 @@ pub const BrotliDecoder = struct { this.deref(); } + fn collectOutputValue(this: *BrotliDecoder) ?JSC.JSValue { + this.output_lock.lock(); + defer this.output_lock.unlock(); + + if (this.output.items.len == 0) + return null; + + if (this.output.items.len > 16 * 1024) { + defer this.output.clearRetainingCapacity(); + return JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); + } else { + defer this.output.clearRetainingCapacity(); + return JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items); + } + } + pub fn runFromJSThread(this: *BrotliDecoder) void { this.poll_ref.unref(this.globalThis.bunVM()); defer this.deref(); this.drainFreelist(); - const value = brk: { - this.output_lock.lock(); - defer this.output_lock.unlock(); - - if (this.output.items.len == 0) - return; - - if (this.output.items.len > 16 * 1024) { - defer this.output.items = &.{}; - break :brk JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); - } else { - defer this.output.clearRetainingCapacity(); - break :brk JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items); - } - }; - const result = this.callback_value.get().?.call(this.globalThis, &.{ if (this.write_failed) this.globalThis.createErrorInstance("BrotliError", .{}) else JSC.JSValue.null, - value, + this.collectOutputValue() orelse return, }); if (result.toError()) |err| { From bfe9c1bf9332076711ffa9b75cfad64b3df01e2a Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Wed, 1 May 2024 18:45:26 -0700 Subject: [PATCH 07/33] tidy --- src/bun.js/api/brotli.zig | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index 63b2b0608bc6e0..c7f4e8712a9ae9 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -199,9 +199,7 @@ pub const BrotliEncoder = struct { if (is_last and any) { var output = &this.encoder.output; this.encoder.output_lock.lock(); - defer { - this.encoder.output_lock.unlock(); - } + defer this.encoder.output_lock.unlock(); output.appendSlice(bun.default_allocator, this.encoder.stream.end() catch { _ = this.encoder.pending_encode_job_count.fetchSub(1, .Monotonic); From 56083a91d390cc150d60c203551ef22ac8e8636e Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Wed, 1 May 2024 18:57:51 -0700 Subject: [PATCH 08/33] add brotliCompressSync and brotliDecompressSync --- docs/runtime/nodejs-apis.md | 2 +- src/bun.js/api/brotli.zig | 99 +++++++++++++++++++++++++++------- src/js/node/zlib.ts | 17 +++++- test/js/node/zlib/zlib.test.js | 13 ++++- 4 files changed, 106 insertions(+), 25 deletions(-) diff --git a/docs/runtime/nodejs-apis.md b/docs/runtime/nodejs-apis.md index f34903dce7388c..7ab04faa7dfcae 100644 --- a/docs/runtime/nodejs-apis.md +++ b/docs/runtime/nodejs-apis.md @@ -173,7 +173,7 @@ Some methods are not optimized yet. ### [`node:zlib`](https://nodejs.org/api/zlib.html) -Missing `BrotliCompress` `BrotliDecompress` `brotliCompressSync` `brotliDecompressSync` `createBrotliCompress` `createBrotliDecompress`. Unoptimized. +Missing `BrotliCompress` `BrotliDecompress` `createBrotliCompress` `createBrotliDecompress`. Unoptimized. ## Globals diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index c7f4e8712a9ae9..f208ad01cd9e8f 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -55,11 +55,6 @@ pub const BrotliEncoder = struct { const callback = arguments[2]; - if (!callback.isCallable(globalThis.vm())) { - globalThis.throwInvalidArguments("BrotliEncoder callback is not callable", .{}); - return .zero; - } - var this: *BrotliEncoder = BrotliEncoder.new(.{ .globalThis = globalThis, .stream = brotli.BrotliCompressionStream.init() catch { @@ -139,6 +134,7 @@ pub const BrotliEncoder = struct { const EncodeJob = struct { task: JSC.WorkPoolTask = .{ .callback = &runTask }, encoder: *BrotliEncoder, + is_async: bool, pub usingnamespace bun.New(@This()); @@ -213,7 +209,7 @@ pub const BrotliEncoder = struct { } } - if (any) { + if (this.is_async and any) { var vm = this.encoder.globalThis.bunVMConcurrently(); this.encoder.ref(); this.encoder.poll_ref.refConcurrently(vm); @@ -255,6 +251,7 @@ pub const BrotliEncoder = struct { var task = EncodeJob.new(.{ .encoder = this, + .is_async = true, }); { @@ -270,11 +267,42 @@ pub const BrotliEncoder = struct { } pub fn encodeSync(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { - _ = this; - _ = globalThis; - _ = callframe; + const arguments = callframe.arguments(3); - return .zero; + if (arguments.len < 2) { + globalThis.throwNotEnoughArguments("BrotliEncoder.encode", 2, arguments.len); + return .zero; + } + + if (this.has_called_end) { + globalThis.throw("BrotliEncoder.encode called after BrotliEncoder.end", .{}); + return .zero; + } + + const input = callframe.argument(0); + const optional_encoding = callframe.argument(1); + const is_last = callframe.argument(2).toBoolean(); + + const input_to_queue = JSC.Node.BlobOrStringOrBuffer.fromJSWithEncodingValueMaybeAsync(globalThis, bun.default_allocator, input, optional_encoding, true) orelse { + globalThis.throwInvalidArgumentType("BrotliEncoder.encode", "input", "Blob, String, or Buffer"); + return .zero; + }; + + _ = this.has_pending_activity.fetchAdd(1, .Monotonic); + bun.assert(is_last); + this.has_called_end = true; + + var task = EncodeJob.new(.{ .encoder = this, .is_async = false }); + + { + this.input_lock.lock(); + defer this.input_lock.unlock(); + + this.input.writeItem(input_to_queue) catch unreachable; + } + this.ref(); + task.run(); + return this.collectOutputValue() orelse .undefined; } pub fn end(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { @@ -345,11 +373,6 @@ pub const BrotliDecoder = struct { const callback = arguments[2]; - if (!callback.isCallable(globalThis.vm())) { - globalThis.throwInvalidArguments("BrotliDecoder callback is not callable", .{}); - return .zero; - } - var this: *BrotliDecoder = BrotliDecoder.new(.{ .globalThis = globalThis, .stream = undefined, // &this.output needs to be a stable pointer @@ -443,6 +466,7 @@ pub const BrotliDecoder = struct { var task = DecodeJob.new(.{ .decoder = this, + .is_async = true, }); { @@ -458,11 +482,45 @@ pub const BrotliDecoder = struct { } pub fn decodeSync(this: *BrotliDecoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { - _ = this; - _ = globalThis; - _ = callframe; + const arguments = callframe.arguments(3); - return .zero; + if (arguments.len < 2) { + globalThis.throwNotEnoughArguments("BrotliEncoder.decode", 2, arguments.len); + return .zero; + } + + if (this.has_called_end) { + globalThis.throw("BrotliEncoder.decode called after BrotliEncoder.end", .{}); + return .zero; + } + + const input = callframe.argument(0); + const optional_encoding = callframe.argument(1); + const is_last = callframe.argument(2).toBoolean(); + + const input_to_queue = JSC.Node.BlobOrStringOrBuffer.fromJSWithEncodingValueMaybeAsync(globalThis, bun.default_allocator, input, optional_encoding, true) orelse { + globalThis.throwInvalidArgumentType("BrotliEncoder.decode", "input", "Blob, String, or Buffer"); + return .zero; + }; + + _ = this.has_pending_activity.fetchAdd(1, .Monotonic); + if (is_last) + this.has_called_end = true; + + var task = DecodeJob.new(.{ + .decoder = this, + .is_async = false, + }); + + { + this.input_lock.lock(); + defer this.input_lock.unlock(); + + this.input.writeItem(input_to_queue) catch unreachable; + } + this.ref(); + task.run(); + return this.collectOutputValue() orelse .undefined; } // We can only run one decode job at a time @@ -472,6 +530,7 @@ pub const BrotliDecoder = struct { const DecodeJob = struct { task: JSC.WorkPoolTask = .{ .callback = &runTask }, decoder: *BrotliDecoder, + is_async: bool, pub usingnamespace bun.New(@This()); @@ -527,7 +586,7 @@ pub const BrotliDecoder = struct { } } - if (any) { + if (this.is_async and any) { var vm = this.decoder.globalThis.bunVMConcurrently(); this.decoder.ref(); this.decoder.poll_ref.refConcurrently(vm); diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index f26ec6d0539f87..271e4d72a8725d 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -8,6 +8,7 @@ function brotliCompress(buffer, opts, callback) { callback = opts; opts = {}; } + if (typeof callback !== "function") throw new TypeError("BrotliEncoder callback is not callable"); const encoder = createBrotliEncoder(opts, {}, callback); encoder.encode(buffer, undefined, true); } @@ -17,10 +18,21 @@ function brotliDecompress(buffer, opts, callback) { callback = opts; opts = {}; } + if (typeof callback !== "function") throw new TypeError("BrotliDecoder callback is not callable"); const decoder = createBrotliDecoder(opts, {}, callback); decoder.decode(buffer, undefined, true); } +function brotliCompressSync(buffer, opts) { + const encoder = createBrotliEncoder(opts, {}, null); + return encoder.encodeSync(buffer, undefined, true); +} + +function brotliDecompressSync(buffer, opts) { + const decoder = createBrotliDecoder(opts, {}, null); + return decoder.decodeSync(buffer, undefined, true); +} + // TODO: **use a native binding from Bun for this!!** // This is a very slow module! // It should really be fixed. It will show up in benchmarking. It also loads @@ -4092,10 +4104,9 @@ var require_lib = __commonJS({ // not implemented, stubs for (const method of [ + // prettier-ignore "BrotliCompress", "BrotliDecompress", - "brotliCompressSync", - "brotliDecompressSync", "createBrotliCompress", "createBrotliDecompress", ]) { @@ -4105,6 +4116,8 @@ var require_lib = __commonJS({ } exports.brotliCompress = brotliCompress; exports.brotliDecompress = brotliDecompress; + exports.brotliCompressSync = brotliCompressSync; + exports.brotliDecompressSync = brotliDecompressSync; function zlibBuffer(engine, buffer, callback) { var buffers = []; diff --git a/test/js/node/zlib/zlib.test.js b/test/js/node/zlib/zlib.test.js index 26d1f04d506e64..1b89965b92d01c 100644 --- a/test/js/node/zlib/zlib.test.js +++ b/test/js/node/zlib/zlib.test.js @@ -52,10 +52,9 @@ describe("zlib.gunzip", () => { describe("zlib.brotli", () => { it("returns stub", () => { for (const method of [ + // prettier-ignore "BrotliCompress", "BrotliDecompress", - "brotliCompressSync", - "brotliDecompressSync", "createBrotliCompress", "createBrotliDecompress", ]) { @@ -94,4 +93,14 @@ describe("zlib.brotli", () => { const roundtrip = await util.promisify(zlib.brotliDecompress)(compressedBuffer); expect(roundtrip.toString()).toEqual(inputString); }); + + it("brotliCompressSync", () => { + const compressed = zlib.brotliCompressSync(inputString); + expect(compressed.toString()).toEqual(compressedBuffer.toString()); + }); + + it("brotliDecompressSync", () => { + const roundtrip = zlib.brotliDecompressSync(compressedBuffer); + expect(roundtrip.toString()).toEqual(inputString); + }); }); From 37ee73722e5011dad01f07c6f2b3a1d3a9bce34f Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Thu, 2 May 2024 16:27:11 -0700 Subject: [PATCH 09/33] fix encoder output management --- src/bun.js/api/brotli.zig | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index f208ad01cd9e8f..673a8c907726aa 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -77,7 +77,6 @@ pub const BrotliEncoder = struct { pub fn deinit(this: *BrotliEncoder) void { this.callback_value.deinit(); this.drainFreelist(); - this.output.deinit(bun.default_allocator); this.stream.deinit(); this.input.deinit(); } @@ -100,10 +99,12 @@ pub const BrotliEncoder = struct { return null; if (this.output.items.len > 16 * 1024) { - defer this.output.clearRetainingCapacity(); + defer this.output.items = ""; + defer this.output.deinit(bun.default_allocator); return JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); } else { - defer this.output.clearRetainingCapacity(); + defer this.output.items = ""; + defer this.output.deinit(bun.default_allocator); return JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items); } } From 7537c2acf1dbc480a195ef2cb11938bdef209618 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Thu, 2 May 2024 16:30:02 -0700 Subject: [PATCH 10/33] this assert was invalid --- src/bun.js/api/brotli.zig | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index 673a8c907726aa..ee23e0de64f087 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -290,8 +290,8 @@ pub const BrotliEncoder = struct { }; _ = this.has_pending_activity.fetchAdd(1, .Monotonic); - bun.assert(is_last); - this.has_called_end = true; + if (is_last) + this.has_called_end = true; var task = EncodeJob.new(.{ .encoder = this, .is_async = false }); From 8e5593401a5567008ba15bbc2e3b3dc7840528f3 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Thu, 2 May 2024 16:30:22 -0700 Subject: [PATCH 11/33] destroy in deinit --- src/bun.js/api/brotli.zig | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index ee23e0de64f087..c81ef4e341ca1a 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -79,6 +79,7 @@ pub const BrotliEncoder = struct { this.drainFreelist(); this.stream.deinit(); this.input.deinit(); + this.destroy(); } fn drainFreelist(this: *BrotliEncoder) void { @@ -357,6 +358,7 @@ pub const BrotliDecoder = struct { this.output.deinit(bun.default_allocator); this.stream.brotli.destroyInstance(); this.input.deinit(); + this.destroy(); } pub fn constructor(globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) ?*BrotliDecoder { From 2af09ad1a3854a95500cd6fd31bc0caea999cff4 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Thu, 2 May 2024 16:30:42 -0700 Subject: [PATCH 12/33] add a todo --- src/bun.js/api/brotli.zig | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index c81ef4e341ca1a..d9863c717ee9e3 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -118,6 +118,7 @@ pub const BrotliEncoder = struct { const result = this.callback_value.get().?.call(this.globalThis, &.{ if (this.write_failed) + // TODO: propagate error from brotli this.globalThis.createErrorInstance("BrotliError", .{}) else JSC.JSValue.null, @@ -420,6 +421,7 @@ pub const BrotliDecoder = struct { const result = this.callback_value.get().?.call(this.globalThis, &.{ if (this.write_failed) + // TODO: propagate error from brotli this.globalThis.createErrorInstance("BrotliError", .{}) else JSC.JSValue.null, From 33db4e651f5dc2dabd084747aba2a4e891c7e1dd Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Thu, 2 May 2024 16:31:06 -0700 Subject: [PATCH 13/33] use has_pending_activity instead of ref --- src/bun.js/api/brotli.zig | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index d9863c717ee9e3..c70201c6565ac9 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -11,7 +11,7 @@ fn ConcurrentByteProcessor(comptime Processor: type) type { } pub const BrotliEncoder = struct { - pub usingnamespace bun.NewRefCounted(@This(), deinit); + pub usingnamespace bun.New(@This()); pub usingnamespace JSC.Codegen.JSBrotliEncoder; stream: brotli.BrotliCompressionStream, @@ -71,7 +71,7 @@ pub const BrotliEncoder = struct { } pub fn finalize(this: *BrotliEncoder) callconv(.C) void { - this.deref(); + this.deinit(); } pub fn deinit(this: *BrotliEncoder) void { @@ -113,7 +113,7 @@ pub const BrotliEncoder = struct { pub fn runFromJSThread(this: *BrotliEncoder) void { this.poll_ref.unref(this.globalThis.bunVM()); - defer this.deref(); + defer _ = this.has_pending_activity.fetchSub(1, .Monotonic); this.drainFreelist(); const result = this.callback_value.get().?.call(this.globalThis, &.{ @@ -144,7 +144,7 @@ pub const BrotliEncoder = struct { pub fn run(this: *EncodeJob) void { defer { _ = this.encoder.has_pending_activity.fetchSub(1, .Monotonic); - this.encoder.deref(); + // _ = this.encoder.has_pending_activity.fetchSub(1, .Monotonic); this.destroy(); } @@ -214,7 +214,7 @@ pub const BrotliEncoder = struct { if (this.is_async and any) { var vm = this.encoder.globalThis.bunVMConcurrently(); - this.encoder.ref(); + _ = this.encoder.has_pending_activity.fetchAdd(1, .Monotonic); this.encoder.poll_ref.refConcurrently(vm); vm.enqueueTaskConcurrent(JSC.ConcurrentTask.create(JSC.Task.init(this.encoder))); } @@ -263,7 +263,7 @@ pub const BrotliEncoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } - this.ref(); + // _ = this.has_pending_activity.fetchAdd(1, .Monotonic); JSC.WorkPool.schedule(&task.task); return .undefined; @@ -303,7 +303,7 @@ pub const BrotliEncoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } - this.ref(); + // _ = this.has_pending_activity.fetchAdd(1, .Monotonic); task.run(); return this.collectOutputValue() orelse .undefined; } @@ -326,7 +326,7 @@ pub const BrotliEncoder = struct { }; pub const BrotliDecoder = struct { - pub usingnamespace bun.NewRefCounted(@This(), deinit); + pub usingnamespace bun.New(@This()); pub usingnamespace JSC.Codegen.JSBrotliDecoder; globalThis: *JSC.JSGlobalObject, @@ -394,7 +394,7 @@ pub const BrotliDecoder = struct { } pub fn finalize(this: *BrotliDecoder) callconv(.C) void { - this.deref(); + this.deinit(); } fn collectOutputValue(this: *BrotliDecoder) ?JSC.JSValue { @@ -416,7 +416,7 @@ pub const BrotliDecoder = struct { pub fn runFromJSThread(this: *BrotliDecoder) void { this.poll_ref.unref(this.globalThis.bunVM()); - defer this.deref(); + defer _ = this.has_pending_activity.fetchSub(1, .Monotonic); this.drainFreelist(); const result = this.callback_value.get().?.call(this.globalThis, &.{ @@ -480,7 +480,7 @@ pub const BrotliDecoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } - this.ref(); + // _ = this.has_pending_activity.fetchAdd(1, .Monotonic); JSC.WorkPool.schedule(&task.task); return .undefined; @@ -523,7 +523,7 @@ pub const BrotliDecoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } - this.ref(); + // _ = this.has_pending_activity.fetchAdd(1, .Monotonic); task.run(); return this.collectOutputValue() orelse .undefined; } @@ -547,7 +547,7 @@ pub const BrotliDecoder = struct { pub fn run(this: *DecodeJob) void { defer { _ = this.decoder.has_pending_activity.fetchSub(1, .Monotonic); - this.decoder.deref(); + // _ = this.decoder.has_pending_activity.fetchSub(1, .Monotonic); this.destroy(); } @@ -593,7 +593,7 @@ pub const BrotliDecoder = struct { if (this.is_async and any) { var vm = this.decoder.globalThis.bunVMConcurrently(); - this.decoder.ref(); + _ = this.decoder.has_pending_activity.fetchAdd(1, .Monotonic); this.decoder.poll_ref.refConcurrently(vm); vm.enqueueTaskConcurrent(JSC.ConcurrentTask.create(JSC.Task.init(this.decoder))); } From 3035031a50afc54c23b6e20a861498c3cebcd5fa Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Fri, 3 May 2024 19:23:57 -0700 Subject: [PATCH 14/33] node:stream: add _events props and test --- src/js/node/stream.ts | 39 ++++++++++++++-- test/js/node/stream/node-stream.test.js | 61 ++++++++++++++++++++++++- 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/src/js/node/stream.ts b/src/js/node/stream.ts index 1337871d21435e..6444207d053942 100644 --- a/src/js/node/stream.ts +++ b/src/js/node/stream.ts @@ -2270,6 +2270,15 @@ var require_readable = __commonJS({ function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); const isDuplex = this instanceof require_duplex(); + + this._events ??= { + close: undefined, + error: undefined, + prefinish: undefined, + finish: undefined, + drain: undefined, + }; + this._readableState = new ReadableState(options, this, isDuplex); if (options) { const { read, destroy, construct, signal } = options; @@ -3430,6 +3439,7 @@ var require_readable = __commonJS({ }; }, }); +const Readable = require_readable(); // node_modules/readable-stream/lib/internal/streams/writable.js var errorOrDestroy; @@ -3467,6 +3477,15 @@ var require_writable = __commonJS({ function Writable(options = {}) { const isDuplex = this instanceof require_duplex(); if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) return new Writable(options); + + this._events ??= { + close: undefined, + error: undefined, + prefinish: undefined, + finish: undefined, + drain: undefined, + }; + this._writableState = new WritableState(options, this, isDuplex); if (options) { if (typeof options.write === "function") this._write = options.write; @@ -4071,6 +4090,7 @@ var require_writable = __commonJS({ }; }, }); +const Writable = require_writable(); // node_modules/readable-stream/lib/internal/streams/duplexify.js var require_duplexify = __commonJS({ @@ -4429,6 +4449,18 @@ var require_duplex = __commonJS({ function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); + + this._events ??= { + close: undefined, + error: undefined, + prefinish: undefined, + finish: undefined, + drain: undefined, + data: undefined, + end: undefined, + readable: undefined, + }; + Readable.$call(this, options); Writable.$call(this, options); @@ -4506,6 +4538,7 @@ var require_duplex = __commonJS({ }; }, }); +const Duplex = require_duplex(); // node_modules/readable-stream/lib/internal/streams/transform.js var require_transform = __commonJS({ @@ -4515,6 +4548,7 @@ var require_transform = __commonJS({ var { ERR_METHOD_NOT_IMPLEMENTED } = require_errors().codes; function Transform(options) { if (!(this instanceof Transform)) return new Transform(options); + Duplex.$call(this, options); this._readableState.sync = false; @@ -4523,6 +4557,8 @@ var require_transform = __commonJS({ if (options) { if (typeof options.transform === "function") this._transform = options.transform; if (typeof options.flush === "function") this._flush = options.flush; + } else { + this.allowHalfOpen = true; } this.on("prefinish", prefinish.bind(this)); @@ -5505,9 +5541,6 @@ function getNativeReadableStream(Readable, stream, options) { } /** --- Bun native stream wrapper --- */ -var Readable = require_readable(); -var Writable = require_writable(); -var Duplex = require_duplex(); const _pathOrFdOrSink = Symbol("pathOrFdOrSink"); const { fileSinkSymbol: _fileSink } = require("internal/shared"); diff --git a/test/js/node/stream/node-stream.test.js b/test/js/node/stream/node-stream.test.js index 408e2285150bc0..94422fac2cbb46 100644 --- a/test/js/node/stream/node-stream.test.js +++ b/test/js/node/stream/node-stream.test.js @@ -5,7 +5,6 @@ import { join } from "path"; import { bunExe, bunEnv, tmpdirSync, isWindows } from "harness"; import { tmpdir } from "node:os"; import { writeFileSync, mkdirSync } from "node:fs"; -import { spawn } from "node:child_process"; describe("Readable", () => { it("should be able to be created without _construct method defined", done => { @@ -44,6 +43,7 @@ describe("Readable", () => { readable.pipe(writable); }); + it("should be able to be piped via .pipe, issue #3607", done => { const path = `${tmpdir()}/${Date.now()}.testReadStreamEmptyFile.txt`; writeFileSync(path, ""); @@ -70,6 +70,7 @@ describe("Readable", () => { stream.pipe(writable); }); + it("should be able to be piped via .pipe, issue #3668", done => { const path = `${tmpdir()}/${Date.now()}.testReadStream.txt`; writeFileSync(path, "12345"); @@ -94,6 +95,7 @@ describe("Readable", () => { stream.pipe(writable); }); + it("should be able to be piped via .pipe, both start and end are 0", done => { const path = `${tmpdir()}/${Date.now()}.testReadStream2.txt`; writeFileSync(path, "12345"); @@ -119,6 +121,7 @@ describe("Readable", () => { stream.pipe(writable); }); + it("should be able to be piped via .pipe with a large file", done => { const data = Buffer.allocUnsafe(768 * 1024) .fill("B") @@ -151,6 +154,15 @@ describe("Readable", () => { }); stream.pipe(writable); }); + + it("should have the correct fields in _events", () => { + const s = Readable({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + }); }); describe("createReadStream", () => { @@ -191,6 +203,17 @@ describe("createReadStream", () => { }); }); +describe("Writable", () => { + it("should have the correct fields in _events", () => { + const s = Writable({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + }); +}); + describe("Duplex", () => { it("should allow subclasses to be derived via .call() on class", () => { function Subclass(opts) { @@ -204,6 +227,18 @@ describe("Duplex", () => { const subclass = new Subclass(); expect(subclass instanceof Duplex).toBe(true); }); + + it("should have the correct fields in _events", () => { + const s = Duplex({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + expect(s._events).toHaveProperty("data"); + expect(s._events).toHaveProperty("end"); + expect(s._events).toHaveProperty("readable"); + }); }); describe("Transform", () => { @@ -219,6 +254,18 @@ describe("Transform", () => { const subclass = new Subclass(); expect(subclass instanceof Transform).toBe(true); }); + + it.todo("should have the correct fields in _events", () => { + const s = Transform({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + expect(s._events).toHaveProperty("data"); + expect(s._events).toHaveProperty("end"); + expect(s._events).toHaveProperty("readable"); + }); }); describe("PassThrough", () => { @@ -234,6 +281,18 @@ describe("PassThrough", () => { const subclass = new Subclass(); expect(subclass instanceof PassThrough).toBe(true); }); + + it.todo("should have the correct fields in _events", () => { + const s = PassThrough({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + expect(s._events).toHaveProperty("data"); + expect(s._events).toHaveProperty("end"); + expect(s._events).toHaveProperty("readable"); + }); }); const processStdInTest = ` From 47947362ab70494210d7cfe82dfc3e714f813545 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Fri, 3 May 2024 19:25:35 -0700 Subject: [PATCH 15/33] dead code remove --- src/bun.js/api/brotli.zig | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index c70201c6565ac9..edfb18f4931221 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -5,11 +5,6 @@ const brotli = bun.brotli; const Queue = std.fifo.LinearFifo(JSC.Node.BlobOrStringOrBuffer, .Dynamic); -fn ConcurrentByteProcessor(comptime Processor: type) type { - _ = Processor; // autofix - return struct {}; -} - pub const BrotliEncoder = struct { pub usingnamespace bun.New(@This()); pub usingnamespace JSC.Codegen.JSBrotliEncoder; @@ -144,7 +139,6 @@ pub const BrotliEncoder = struct { pub fn run(this: *EncodeJob) void { defer { _ = this.encoder.has_pending_activity.fetchSub(1, .Monotonic); - // _ = this.encoder.has_pending_activity.fetchSub(1, .Monotonic); this.destroy(); } @@ -263,7 +257,6 @@ pub const BrotliEncoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } - // _ = this.has_pending_activity.fetchAdd(1, .Monotonic); JSC.WorkPool.schedule(&task.task); return .undefined; @@ -303,7 +296,6 @@ pub const BrotliEncoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } - // _ = this.has_pending_activity.fetchAdd(1, .Monotonic); task.run(); return this.collectOutputValue() orelse .undefined; } @@ -480,7 +472,6 @@ pub const BrotliDecoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } - // _ = this.has_pending_activity.fetchAdd(1, .Monotonic); JSC.WorkPool.schedule(&task.task); return .undefined; @@ -523,7 +514,6 @@ pub const BrotliDecoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } - // _ = this.has_pending_activity.fetchAdd(1, .Monotonic); task.run(); return this.collectOutputValue() orelse .undefined; } @@ -547,7 +537,6 @@ pub const BrotliDecoder = struct { pub fn run(this: *DecodeJob) void { defer { _ = this.decoder.has_pending_activity.fetchSub(1, .Monotonic); - // _ = this.decoder.has_pending_activity.fetchSub(1, .Monotonic); this.destroy(); } From 978f1b113cf4e6eb2e901b6c963059a8f19813b0 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Fri, 3 May 2024 19:26:20 -0700 Subject: [PATCH 16/33] add missing constants --- src/js/node/zlib.ts | 85 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index 271e4d72a8725d..da7d8caebf9bfe 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -3580,7 +3580,9 @@ var require_constants = __commonJS({ Z_ERRNO: -1, Z_STREAM_ERROR: -2, Z_DATA_ERROR: -3, + Z_MEM_ERROR: -4, Z_BUF_ERROR: -5, + Z_VERSION_ERROR: -6, Z_NO_COMPRESSION: 0, Z_BEST_SPEED: 1, Z_BEST_COMPRESSION: 9, @@ -3592,8 +3594,91 @@ var require_constants = __commonJS({ Z_DEFAULT_STRATEGY: 0, Z_BINARY: 0, Z_TEXT: 1, + Z_ASCII: 1, Z_UNKNOWN: 2, Z_DEFLATED: 8, + DEFLATE: 1, + INFLATE: 2, + GZIP: 3, + GUNZIP: 4, + DEFLATERAW: 5, + INFLATERAW: 6, + UNZIP: 7, + BROTLI_DECODE: 8, + BROTLI_ENCODE: 9, + Z_MIN_WINDOWBITS: 8, + Z_MAX_WINDOWBITS: 15, + Z_DEFAULT_WINDOWBITS: 15, + Z_MIN_CHUNK: 64, + Z_MAX_CHUNK: Infinity, + Z_DEFAULT_CHUNK: 16384, + Z_MIN_MEMLEVEL: 1, + Z_MAX_MEMLEVEL: 9, + Z_DEFAULT_MEMLEVEL: 8, + Z_MIN_LEVEL: -1, + Z_MAX_LEVEL: 9, + Z_DEFAULT_LEVEL: -1, + BROTLI_OPERATION_PROCESS: 0, + BROTLI_OPERATION_FLUSH: 1, + BROTLI_OPERATION_FINISH: 2, + BROTLI_OPERATION_EMIT_METADATA: 3, + BROTLI_PARAM_MODE: 0, + BROTLI_MODE_GENERIC: 0, + BROTLI_MODE_TEXT: 1, + BROTLI_MODE_FONT: 2, + BROTLI_DEFAULT_MODE: 0, + BROTLI_PARAM_QUALITY: 1, + BROTLI_MIN_QUALITY: 0, + BROTLI_MAX_QUALITY: 11, + BROTLI_DEFAULT_QUALITY: 11, + BROTLI_PARAM_LGWIN: 2, + BROTLI_MIN_WINDOW_BITS: 10, + BROTLI_MAX_WINDOW_BITS: 24, + BROTLI_LARGE_MAX_WINDOW_BITS: 30, + BROTLI_DEFAULT_WINDOW: 22, + BROTLI_PARAM_LGBLOCK: 3, + BROTLI_MIN_INPUT_BLOCK_BITS: 16, + BROTLI_MAX_INPUT_BLOCK_BITS: 24, + BROTLI_PARAM_DISABLE_LITERAL_CONTEXT_MODELING: 4, + BROTLI_PARAM_SIZE_HINT: 5, + BROTLI_PARAM_LARGE_WINDOW: 6, + BROTLI_PARAM_NPOSTFIX: 7, + BROTLI_PARAM_NDIRECT: 8, + BROTLI_DECODER_RESULT_ERROR: 0, + BROTLI_DECODER_RESULT_SUCCESS: 1, + BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT: 2, + BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT: 3, + BROTLI_DECODER_PARAM_DISABLE_RING_BUFFER_REALLOCATION: 0, + BROTLI_DECODER_PARAM_LARGE_WINDOW: 1, + BROTLI_DECODER_NO_ERROR: 0, + BROTLI_DECODER_SUCCESS: 1, + BROTLI_DECODER_NEEDS_MORE_INPUT: 2, + BROTLI_DECODER_NEEDS_MORE_OUTPUT: 3, + BROTLI_DECODER_ERROR_FORMAT_EXUBERANT_NIBBLE: -1, + BROTLI_DECODER_ERROR_FORMAT_RESERVED: -2, + BROTLI_DECODER_ERROR_FORMAT_EXUBERANT_META_NIBBLE: -3, + BROTLI_DECODER_ERROR_FORMAT_SIMPLE_HUFFMAN_ALPHABET: -4, + BROTLI_DECODER_ERROR_FORMAT_SIMPLE_HUFFMAN_SAME: -5, + BROTLI_DECODER_ERROR_FORMAT_CL_SPACE: -6, + BROTLI_DECODER_ERROR_FORMAT_HUFFMAN_SPACE: -7, + BROTLI_DECODER_ERROR_FORMAT_CONTEXT_MAP_REPEAT: -8, + BROTLI_DECODER_ERROR_FORMAT_BLOCK_LENGTH_1: -9, + BROTLI_DECODER_ERROR_FORMAT_BLOCK_LENGTH_2: -10, + BROTLI_DECODER_ERROR_FORMAT_TRANSFORM: -11, + BROTLI_DECODER_ERROR_FORMAT_DICTIONARY: -12, + BROTLI_DECODER_ERROR_FORMAT_WINDOW_BITS: -13, + BROTLI_DECODER_ERROR_FORMAT_PADDING_1: -14, + BROTLI_DECODER_ERROR_FORMAT_PADDING_2: -15, + BROTLI_DECODER_ERROR_FORMAT_DISTANCE: -16, + BROTLI_DECODER_ERROR_DICTIONARY_NOT_SET: -19, + BROTLI_DECODER_ERROR_INVALID_ARGUMENTS: -20, + BROTLI_DECODER_ERROR_ALLOC_CONTEXT_MODES: -21, + BROTLI_DECODER_ERROR_ALLOC_TREE_GROUPS: -22, + BROTLI_DECODER_ERROR_ALLOC_CONTEXT_MAP: -25, + BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_1: -26, + BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_2: -27, + BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES: -30, + BROTLI_DECODER_ERROR_UNREACHABLE: -31, }; }, }); From ed830a57164165e4600e8c7da6366e4cd3a0165f Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Fri, 3 May 2024 19:32:22 -0700 Subject: [PATCH 17/33] implement createBrotliCompress and createBrotliDecompress --- docs/runtime/nodejs-apis.md | 2 +- src/brotli.zig | 6 ++-- src/bun.js/api/brotli.zig | 34 ++++++++++++-------- src/js/node/zlib.ts | 58 +++++++++++++++++++++++++++------- test/js/node/zlib/zlib.test.js | 48 +++++++++++++++++++++------- 5 files changed, 106 insertions(+), 42 deletions(-) diff --git a/docs/runtime/nodejs-apis.md b/docs/runtime/nodejs-apis.md index 7ab04faa7dfcae..57f3f15780ac1f 100644 --- a/docs/runtime/nodejs-apis.md +++ b/docs/runtime/nodejs-apis.md @@ -173,7 +173,7 @@ Some methods are not optimized yet. ### [`node:zlib`](https://nodejs.org/api/zlib.html) -Missing `BrotliCompress` `BrotliDecompress` `createBrotliCompress` `createBrotliDecompress`. Unoptimized. +🟡 Unoptimized. ## Globals diff --git a/src/brotli.zig b/src/brotli.zig index 36d89c73810600..0742f482a29a0a 100644 --- a/src/brotli.zig +++ b/src/brotli.zig @@ -122,17 +122,17 @@ pub const BrotliReaderArrayList = struct { ); const bytes_written = unused_capacity.len -| out_remaining; - const bytes_read = next_in.len -| in_remaining; + // const bytes_read = next_in.len -| in_remaining; this.list.items.len += bytes_written; - this.total_in += bytes_read; + // TODO: calling this in a state where 'this.total_in' is not the full data raises error.ShortRead in many cases; look into this + // this.total_in += bytes_read; switch (result) { .success => { if (comptime bun.Environment.allow_assert) { bun.assert(this.brotli.isFinished()); } - this.end(); return; }, diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index edfb18f4931221..cc40f192340853 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -515,7 +515,7 @@ pub const BrotliDecoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } task.run(); - return this.collectOutputValue() orelse .undefined; + return if (!is_last) .undefined else this.collectOutputValue() orelse .undefined; } // We can only run one decode job at a time @@ -542,30 +542,36 @@ pub const BrotliDecoder = struct { var any = false; - if (this.decoder.pending_decode_job_count.fetchAdd(1, .Monotonic) == 0) { + if (true) { var is_last = false; while (true) { - const pending: []bun.JSC.Node.BlobOrStringOrBuffer = brk: { - this.decoder.input_lock.lock(); - defer this.decoder.input_lock.unlock(); - is_last = this.decoder.has_called_end; - const readable = this.decoder.input.readableSlice(0); - const out = bun.default_allocator.dupe(std.meta.Child(@TypeOf(readable)), readable) catch bun.outOfMemory(); - this.decoder.input.discard(readable.len); - break :brk out; - }; - defer bun.default_allocator.free(pending); + this.decoder.input_lock.lock(); + defer this.decoder.input_lock.unlock(); + is_last = this.decoder.has_called_end; + if (!is_last) break; + const readable = this.decoder.input.readableSlice(0); + const pending = readable; defer { this.decoder.freelist_write_lock.lock(); this.decoder.freelist.write(pending) catch unreachable; this.decoder.freelist_write_lock.unlock(); } - for (pending) |input| { + + var input_list = std.ArrayListUnmanaged(u8){}; + defer input_list.deinit(bun.default_allocator); + if (pending.len > 1) { + for (pending) |input| { + input_list.appendSlice(bun.default_allocator, input.slice()) catch bun.outOfMemory(); + } + } + + { this.decoder.output_lock.lock(); defer this.decoder.output_lock.unlock(); - this.decoder.stream.input = input.slice(); + const input = if (pending.len == 1) pending[0].slice() else input_list.items; + this.decoder.stream.input = input; this.decoder.stream.readAll(false) catch { _ = this.decoder.pending_decode_job_count.fetchSub(1, .Monotonic); this.decoder.write_failed = true; diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index da7d8caebf9bfe..bf7c931c752a38 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -1,5 +1,9 @@ // Hardcoded module "node:zlib" +const stream = require("node:stream"); + +const ObjectSetPrototypeOf = Object.setPrototypeOf; + const createBrotliEncoder = $zig("node_zlib_binding.zig", "createBrotliEncoder"); const createBrotliDecoder = $zig("node_zlib_binding.zig", "createBrotliDecoder"); @@ -33,6 +37,46 @@ function brotliDecompressSync(buffer, opts) { return decoder.decodeSync(buffer, undefined, true); } +function createBrotliCompress(opts) { + return new BrotliCompress(opts); +} + +const kHandle = Symbol("kHandle"); + +function BrotliCompress(opts) { + if (!new.target) return new BrotliCompress(opts); + this[kHandle] = createBrotliEncoder(opts, {}, null); + stream.Transform.$call(this); +} +BrotliCompress.prototype = {}; +ObjectSetPrototypeOf(BrotliCompress.prototype, stream.Transform.prototype); + +BrotliCompress.prototype._transform = function _transform(chunk, encoding, callback) { + callback(undefined, this[kHandle].encodeSync(chunk, encoding, false)); +}; +BrotliCompress.prototype._flush = function _flush(callback) { + callback(undefined, this[kHandle].encodeSync("", undefined, true)); +}; + +function createBrotliDecompress(opts) { + return new BrotliDecompress(opts); +} + +function BrotliDecompress(opts) { + if (!new.target) return new BrotliDecompress(opts); + this[kHandle] = createBrotliDecoder(opts, {}, null); + stream.Transform.$call(this); +} +BrotliDecompress.prototype = {}; +ObjectSetPrototypeOf(BrotliDecompress.prototype, stream.Transform.prototype); + +BrotliDecompress.prototype._transform = function (chunk, encoding, callback) { + callback(undefined, this[kHandle].decodeSync(chunk, encoding, false)); +}; +BrotliDecompress.prototype._flush = function (callback) { + callback(undefined, this[kHandle].decodeSync("", undefined, true)); +}; + // TODO: **use a native binding from Bun for this!!** // This is a very slow module! // It should really be fixed. It will show up in benchmarking. It also loads @@ -4187,22 +4231,12 @@ var require_lib = __commonJS({ return zlibBufferSync(new InflateRaw(opts), buffer); }; - // not implemented, stubs - for (const method of [ - // prettier-ignore - "BrotliCompress", - "BrotliDecompress", - "createBrotliCompress", - "createBrotliDecompress", - ]) { - exports[method] = function (buffer, opts, callback) { - throw new Error(`zlib.${method} is not implemented`); - }; - } exports.brotliCompress = brotliCompress; exports.brotliDecompress = brotliDecompress; exports.brotliCompressSync = brotliCompressSync; exports.brotliDecompressSync = brotliDecompressSync; + exports.createBrotliCompress = createBrotliCompress; + exports.createBrotliDecompress = createBrotliDecompress; function zlibBuffer(engine, buffer, callback) { var buffers = []; diff --git a/test/js/node/zlib/zlib.test.js b/test/js/node/zlib/zlib.test.js index 1b89965b92d01c..0c917489f4afaa 100644 --- a/test/js/node/zlib/zlib.test.js +++ b/test/js/node/zlib/zlib.test.js @@ -32,6 +32,14 @@ import * as fs from "node:fs"; import * as buffer from "node:buffer"; import * as util from "node:util"; +function* window(buffer, size, advance = size) { + let i = 0; + while (i <= buffer.length) { + yield buffer.slice(i, i + size); + i += advance; + } +} + describe("zlib.gunzip", () => { it("should be able to unzip a Buffer and return an unzipped Buffer", async () => { const content = fs.readFileSync(import.meta.dir + "/fixture.html.gz"); @@ -50,18 +58,6 @@ describe("zlib.gunzip", () => { }); describe("zlib.brotli", () => { - it("returns stub", () => { - for (const method of [ - // prettier-ignore - "BrotliCompress", - "BrotliDecompress", - "createBrotliCompress", - "createBrotliDecompress", - ]) { - expect(() => zlib[method]()).toThrow(new Error(`zlib.${method} is not implemented`)); - } - }); - const inputString = "ΩΩLorem ipsum dolor sit amet, consectetur adipiscing eli" + "t. Morbi faucibus, purus at gravida dictum, libero arcu " + @@ -103,4 +99,32 @@ describe("zlib.brotli", () => { const roundtrip = zlib.brotliDecompressSync(compressedBuffer); expect(roundtrip.toString()).toEqual(inputString); }); + + it("can compress streaming", () => { + const encoder = zlib.createBrotliCompress(); + for (const chunk of window(inputString, 55)) { + encoder._transform(chunk, undefined, (err, data) => { + expect(err).toBeUndefined(); + expect(data).toBeUndefined(); + }); + } + encoder._flush((err, data) => { + expect(err).toBeUndefined(); + expect(data).toEqual(compressedBuffer); + }); + }); + + it("can decompress streaming", () => { + const decoder = zlib.createBrotliDecompress(); + for (const chunk of window(compressedBuffer, 10)) { + decoder._transform(chunk, undefined, (err, data) => { + expect(err).toBeUndefined(); + expect(data).toBeUndefined(); + }); + } + decoder._flush((err, data) => { + expect(err).toBeUndefined(); + expect(data).toEqual(Buffer.from(inputString)); + }); + }); }); From 931ce906b1c1ce3d6b26bca74832685843394874 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Fri, 3 May 2024 19:49:48 -0700 Subject: [PATCH 18/33] add another test --- src/bun.js/api/brotli.zig | 25 +++++++++++-------------- test/js/node/zlib/zlib.test.js | 7 +++++++ 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index cc40f192340853..4697ad70fcd5c0 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -87,13 +87,10 @@ pub const BrotliEncoder = struct { this.freelist.discard(to_free.len); } - fn collectOutputValue(this: *BrotliEncoder) ?JSC.JSValue { + fn collectOutputValue(this: *BrotliEncoder) JSC.JSValue { this.output_lock.lock(); defer this.output_lock.unlock(); - if (this.output.items.len == 0) - return null; - if (this.output.items.len > 16 * 1024) { defer this.output.items = ""; defer this.output.deinit(bun.default_allocator); @@ -117,7 +114,7 @@ pub const BrotliEncoder = struct { this.globalThis.createErrorInstance("BrotliError", .{}) else JSC.JSValue.null, - this.collectOutputValue() orelse return, + this.collectOutputValue(), }); if (result.toError()) |err| { @@ -288,7 +285,10 @@ pub const BrotliEncoder = struct { if (is_last) this.has_called_end = true; - var task = EncodeJob.new(.{ .encoder = this, .is_async = false }); + var task = EncodeJob.new(.{ + .encoder = this, + .is_async = false, + }); { this.input_lock.lock(); @@ -297,7 +297,7 @@ pub const BrotliEncoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } task.run(); - return this.collectOutputValue() orelse .undefined; + return if (!is_last) .undefined else this.collectOutputValue(); } pub fn end(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { @@ -389,13 +389,10 @@ pub const BrotliDecoder = struct { this.deinit(); } - fn collectOutputValue(this: *BrotliDecoder) ?JSC.JSValue { + fn collectOutputValue(this: *BrotliDecoder) JSC.JSValue { this.output_lock.lock(); defer this.output_lock.unlock(); - if (this.output.items.len == 0) - return null; - if (this.output.items.len > 16 * 1024) { defer this.output.clearRetainingCapacity(); return JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); @@ -417,7 +414,7 @@ pub const BrotliDecoder = struct { this.globalThis.createErrorInstance("BrotliError", .{}) else JSC.JSValue.null, - this.collectOutputValue() orelse return, + this.collectOutputValue(), }); if (result.toError()) |err| { @@ -515,7 +512,7 @@ pub const BrotliDecoder = struct { this.input.writeItem(input_to_queue) catch unreachable; } task.run(); - return if (!is_last) .undefined else this.collectOutputValue() orelse .undefined; + return if (!is_last) .undefined else this.collectOutputValue(); } // We can only run one decode job at a time @@ -570,7 +567,7 @@ pub const BrotliDecoder = struct { this.decoder.output_lock.lock(); defer this.decoder.output_lock.unlock(); - const input = if (pending.len == 1) pending[0].slice() else input_list.items; + const input = if (pending.len <= 1) pending[0].slice() else input_list.items; this.decoder.stream.input = input; this.decoder.stream.readAll(false) catch { _ = this.decoder.pending_decode_job_count.fetchSub(1, .Monotonic); diff --git a/test/js/node/zlib/zlib.test.js b/test/js/node/zlib/zlib.test.js index 0c917489f4afaa..73f3399c10aa0e 100644 --- a/test/js/node/zlib/zlib.test.js +++ b/test/js/node/zlib/zlib.test.js @@ -127,4 +127,11 @@ describe("zlib.brotli", () => { expect(data).toEqual(Buffer.from(inputString)); }); }); + + it("can roundtrip an empty string", async () => { + const input = ""; + const compressed = await util.promisify(zlib.brotliCompress)(input); + const roundtrip = await util.promisify(zlib.brotliDecompress)(compressed); + expect(roundtrip.toString()).toEqual(input); + }); }); From 1520aface3c6f2f4a4eb6cf74e7b4362f547dfff Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Fri, 3 May 2024 22:15:34 -0700 Subject: [PATCH 19/33] compress stream fix --- src/bun.js/api/brotli.zig | 5 ++--- src/bun.js/node/types.zig | 5 ----- test/js/node/zlib/zlib.test.js | 15 +++++++++++++++ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index 4697ad70fcd5c0..f02c66790098af 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -141,13 +141,12 @@ pub const BrotliEncoder = struct { var any = false; - if (this.encoder.pending_encode_job_count.fetchAdd(1, .Monotonic) == 0) { - var is_last = false; + if (this.encoder.pending_encode_job_count.fetchAdd(1, .Monotonic) >= 0) { + const is_last = this.encoder.has_called_end; while (true) { const pending: []bun.JSC.Node.BlobOrStringOrBuffer = brk: { this.encoder.input_lock.lock(); defer this.encoder.input_lock.unlock(); - is_last = this.encoder.has_called_end; const readable = this.encoder.input.readableSlice(0); const out = bun.default_allocator.dupe(std.meta.Child(@TypeOf(readable)), readable) catch bun.outOfMemory(); this.encoder.input.discard(readable.len); diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index 544f3f75000159..ee2cd1cb036edf 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -333,10 +333,8 @@ pub const BlobOrStringOrBuffer = union(enum) { if (blob.store) |store| { store.ref(); } - return .{ .blob = blob.* }; } - return .{ .string_or_buffer = StringOrBuffer.fromJS(global, allocator, value) orelse return null }; } @@ -349,10 +347,8 @@ pub const BlobOrStringOrBuffer = union(enum) { if (blob.store) |store| { store.ref(); } - return .{ .blob = blob.* }; } - return .{ .string_or_buffer = StringOrBuffer.fromJSWithEncodingValueMaybeAsync(global, allocator, value, encoding_value, is_async) orelse return null }; } }; @@ -544,7 +540,6 @@ pub const StringOrBuffer = union(enum) { break :brk .utf8; break :brk Encoding.fromJS(encoding_value, global) orelse .utf8; }; - return fromJSWithEncodingMaybeAsync(global, allocator, value, encoding, maybe_async); } }; diff --git a/test/js/node/zlib/zlib.test.js b/test/js/node/zlib/zlib.test.js index 73f3399c10aa0e..6cad6e6afb10a2 100644 --- a/test/js/node/zlib/zlib.test.js +++ b/test/js/node/zlib/zlib.test.js @@ -134,4 +134,19 @@ describe("zlib.brotli", () => { const roundtrip = await util.promisify(zlib.brotliDecompress)(compressed); expect(roundtrip.toString()).toEqual(input); }); + + it("can compress streaming big", () => { + const encoder = zlib.createBrotliCompress(); + // prettier-ignore + for (const chunk of window(inputString+inputString+inputString+inputString, 65)) { + encoder._transform(chunk, undefined, (err, data) => { + expect(err).toBeUndefined(); + expect(data).toBeUndefined(); + }); + } + encoder._flush((err, data) => { + expect(err).toBeUndefined(); + expect(data.length).toBeGreaterThan(0); + }); + }); }); From 605cc8018b54b6f49cbb41b932622b57e85217e9 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Fri, 3 May 2024 22:16:49 -0700 Subject: [PATCH 20/33] use the harness here --- .../install/registry/bun-install-registry.test.ts | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/test/cli/install/registry/bun-install-registry.test.ts b/test/cli/install/registry/bun-install-registry.test.ts index 356ed18e9a1440..d89b81bc9a5e6c 100644 --- a/test/cli/install/registry/bun-install-registry.test.ts +++ b/test/cli/install/registry/bun-install-registry.test.ts @@ -1,5 +1,14 @@ import { file, spawn } from "bun"; -import { bunExe, bunEnv as env, isLinux, isWindows, toBeValidBin, toHaveBins, writeShebangScript } from "harness"; +import { + bunExe, + bunEnv as env, + isLinux, + isWindows, + toBeValidBin, + toHaveBins, + writeShebangScript, + tmpdirSync, +} from "harness"; import { join, sep } from "path"; import { mkdtempSync, realpathSync } from "fs"; import { rm, writeFile, mkdir, exists, cp } from "fs/promises"; @@ -40,7 +49,7 @@ afterAll(() => { }); beforeEach(async () => { - packageDir = mkdtempSync(join(realpathSync(tmpdir()), "bun-install-registry-" + testCounter++ + "-")); + packageDir = tmpdirSync("bun-install-registry-" + testCounter++ + "-"); env.BUN_INSTALL_CACHE_DIR = join(packageDir, ".bun-cache"); env.BUN_TMPDIR = env.TMPDIR = env.TEMP = join(packageDir, ".bun-tmp"); await writeFile( From 7670c9c737840abe690b669ffe02c1133c9632da Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Fri, 3 May 2024 22:25:26 -0700 Subject: [PATCH 21/33] add an end-to-end test --- src/bun.js/api/brotli.zig | 3 +-- test/js/node/zlib/zlib.test.js | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index f02c66790098af..153fdc4c3c830c 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -539,11 +539,10 @@ pub const BrotliDecoder = struct { var any = false; if (true) { - var is_last = false; + const is_last = this.decoder.has_called_end; while (true) { this.decoder.input_lock.lock(); defer this.decoder.input_lock.unlock(); - is_last = this.decoder.has_called_end; if (!is_last) break; const readable = this.decoder.input.readableSlice(0); const pending = readable; diff --git a/test/js/node/zlib/zlib.test.js b/test/js/node/zlib/zlib.test.js index 6cad6e6afb10a2..484d8a36230936 100644 --- a/test/js/node/zlib/zlib.test.js +++ b/test/js/node/zlib/zlib.test.js @@ -31,6 +31,8 @@ import * as zlib from "node:zlib"; import * as fs from "node:fs"; import * as buffer from "node:buffer"; import * as util from "node:util"; +import { resolve } from "node:path"; +import { tmpdirSync } from "harness"; function* window(buffer, size, advance = size) { let i = 0; @@ -149,4 +151,34 @@ describe("zlib.brotli", () => { expect(data.length).toBeGreaterThan(0); }); }); + + it("fully works as a stream.Transform", async () => { + const x_dir = tmpdirSync("bun.test."); + const out_path_c = resolve(x_dir, "this.js.br"); + const out_path_d = resolve(x_dir, "this.js"); + + { + const { resolve, promise } = Promise.withResolvers(); + const readStream = fs.createReadStream(import.meta.filename); + const writeStream = fs.createWriteStream(out_path_c); + const brStream = zlib.createBrotliCompress(); + const the_stream = readStream.pipe(brStream).pipe(writeStream); + the_stream.on("finish", resolve); + await promise; + } + { + const { resolve, promise } = Promise.withResolvers(); + const readStream = fs.createReadStream(out_path_c); + const writeStream = fs.createWriteStream(out_path_d); + const brStream = zlib.createBrotliDecompress(); + const the_stream = readStream.pipe(brStream).pipe(writeStream); + the_stream.on("finish", resolve); + await promise; + } + { + const expected = await Bun.file(import.meta.filename).text(); + const actual = await Bun.file(out_path_d).text(); + expect(actual).toEqual(expected); + } + }); }); From 37bec3a9ac5a0276c20bb6bb69274a5cf3c23981 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Sat, 4 May 2024 00:36:58 -0700 Subject: [PATCH 22/33] oops didnt actually need that ig --- src/brotli.zig | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/brotli.zig b/src/brotli.zig index 0742f482a29a0a..3ee2388f344ef6 100644 --- a/src/brotli.zig +++ b/src/brotli.zig @@ -122,11 +122,10 @@ pub const BrotliReaderArrayList = struct { ); const bytes_written = unused_capacity.len -| out_remaining; - // const bytes_read = next_in.len -| in_remaining; + const bytes_read = next_in.len -| in_remaining; this.list.items.len += bytes_written; - // TODO: calling this in a state where 'this.total_in' is not the full data raises error.ShortRead in many cases; look into this - // this.total_in += bytes_read; + this.total_in += bytes_read; switch (result) { .success => { From 5bf78c25f1e9aef70c24439dd5184692d2a02052 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 6 May 2024 14:07:05 -0700 Subject: [PATCH 23/33] avoid this dupe --- src/bun.js/api/brotli.zig | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index 153fdc4c3c830c..507f6cbcbc6224 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -144,15 +144,11 @@ pub const BrotliEncoder = struct { if (this.encoder.pending_encode_job_count.fetchAdd(1, .Monotonic) >= 0) { const is_last = this.encoder.has_called_end; while (true) { - const pending: []bun.JSC.Node.BlobOrStringOrBuffer = brk: { - this.encoder.input_lock.lock(); - defer this.encoder.input_lock.unlock(); - const readable = this.encoder.input.readableSlice(0); - const out = bun.default_allocator.dupe(std.meta.Child(@TypeOf(readable)), readable) catch bun.outOfMemory(); - this.encoder.input.discard(readable.len); - break :brk out; - }; - defer bun.default_allocator.free(pending); + this.encoder.input_lock.lock(); + defer this.encoder.input_lock.unlock(); + const readable = this.encoder.input.readableSlice(0); + defer this.encoder.input.discard(readable.len); + const pending = readable; const Writer = struct { encoder: *BrotliEncoder, From 9cd48eff9497a7e9a8a96502a7458c7c27b8e750 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 6 May 2024 14:07:12 -0700 Subject: [PATCH 24/33] if true bad --- src/bun.js/api/brotli.zig | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index 507f6cbcbc6224..fed9bc27635ff4 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -149,6 +149,7 @@ pub const BrotliEncoder = struct { const readable = this.encoder.input.readableSlice(0); defer this.encoder.input.discard(readable.len); const pending = readable; + const Writer = struct { encoder: *BrotliEncoder, @@ -534,7 +535,7 @@ pub const BrotliDecoder = struct { var any = false; - if (true) { + if (this.decoder.pending_decode_job_count.fetchAdd(1, .Monotonic) >= 0) { const is_last = this.decoder.has_called_end; while (true) { this.decoder.input_lock.lock(); From 68fde6236444be356ea945dd2f1f217c9afb2009 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 6 May 2024 14:07:45 -0700 Subject: [PATCH 25/33] postpone this until EventEmitter is refactored --- src/js/node/stream.ts | 48 ++++++++++++------------- test/js/node/stream/node-stream.test.js | 6 ++-- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/js/node/stream.ts b/src/js/node/stream.ts index 6444207d053942..0a423aaad19425 100644 --- a/src/js/node/stream.ts +++ b/src/js/node/stream.ts @@ -2271,13 +2271,13 @@ var require_readable = __commonJS({ if (!(this instanceof Readable)) return new Readable(options); const isDuplex = this instanceof require_duplex(); - this._events ??= { - close: undefined, - error: undefined, - prefinish: undefined, - finish: undefined, - drain: undefined, - }; + // this._events ??= { + // close: undefined, + // error: undefined, + // prefinish: undefined, + // finish: undefined, + // drain: undefined, + // }; this._readableState = new ReadableState(options, this, isDuplex); if (options) { @@ -3478,13 +3478,13 @@ var require_writable = __commonJS({ const isDuplex = this instanceof require_duplex(); if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) return new Writable(options); - this._events ??= { - close: undefined, - error: undefined, - prefinish: undefined, - finish: undefined, - drain: undefined, - }; + // this._events ??= { + // close: undefined, + // error: undefined, + // prefinish: undefined, + // finish: undefined, + // drain: undefined, + // }; this._writableState = new WritableState(options, this, isDuplex); if (options) { @@ -4450,16 +4450,16 @@ var require_duplex = __commonJS({ function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); - this._events ??= { - close: undefined, - error: undefined, - prefinish: undefined, - finish: undefined, - drain: undefined, - data: undefined, - end: undefined, - readable: undefined, - }; + // this._events ??= { + // close: undefined, + // error: undefined, + // prefinish: undefined, + // finish: undefined, + // drain: undefined, + // data: undefined, + // end: undefined, + // readable: undefined, + // }; Readable.$call(this, options); Writable.$call(this, options); diff --git a/test/js/node/stream/node-stream.test.js b/test/js/node/stream/node-stream.test.js index 94422fac2cbb46..bee68e3b81deb6 100644 --- a/test/js/node/stream/node-stream.test.js +++ b/test/js/node/stream/node-stream.test.js @@ -155,7 +155,7 @@ describe("Readable", () => { stream.pipe(writable); }); - it("should have the correct fields in _events", () => { + it.todo("should have the correct fields in _events", () => { const s = Readable({}); expect(s._events).toHaveProperty("close"); expect(s._events).toHaveProperty("error"); @@ -204,7 +204,7 @@ describe("createReadStream", () => { }); describe("Writable", () => { - it("should have the correct fields in _events", () => { + it.todo("should have the correct fields in _events", () => { const s = Writable({}); expect(s._events).toHaveProperty("close"); expect(s._events).toHaveProperty("error"); @@ -228,7 +228,7 @@ describe("Duplex", () => { expect(subclass instanceof Duplex).toBe(true); }); - it("should have the correct fields in _events", () => { + it.todo("should have the correct fields in _events", () => { const s = Duplex({}); expect(s._events).toHaveProperty("close"); expect(s._events).toHaveProperty("error"); From 7a3f3a83d90b2c8aee32458788a9f7eef1b33a38 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 6 May 2024 18:32:16 -0700 Subject: [PATCH 26/33] we always have brotli now --- src/http.zig | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/http.zig b/src/http.zig index 0024932d4e6399..9b988e66068dd9 100644 --- a/src/http.zig +++ b/src/http.zig @@ -1627,9 +1627,7 @@ const accept_header = picohttp.Header{ .name = "Accept", .value = "*/*" }; const accept_encoding_no_compression = "identity"; const accept_encoding_compression = "gzip, deflate, br"; -const accept_encoding_compression_no_brotli = "gzip, deflate"; const accept_encoding_header_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_compression }; -const accept_encoding_header_compression_no_brotli = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_compression_no_brotli }; const accept_encoding_header_no_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_no_compression }; const accept_encoding_header = if (FeatureFlags.disable_compression_in_http_client) From e9f6775aad1d81d980870c68343bd26730732720 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 6 May 2024 18:32:49 -0700 Subject: [PATCH 27/33] node:zlib: expose BrotliCompress and BrotliDecompress constructors --- src/js/node/zlib.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index bf7c931c752a38..45f53846bc805d 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -4236,7 +4236,9 @@ var require_lib = __commonJS({ exports.brotliCompressSync = brotliCompressSync; exports.brotliDecompressSync = brotliDecompressSync; exports.createBrotliCompress = createBrotliCompress; + exports.BrotliCompress = BrotliCompress; exports.createBrotliDecompress = createBrotliDecompress; + exports.BrotliDecompress = BrotliDecompress; function zlibBuffer(engine, buffer, callback) { var buffers = []; From bcdc345d52625768ccc07042359b76bc4e696857 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 6 May 2024 18:33:10 -0700 Subject: [PATCH 28/33] move these imports to the top of the file --- test/js/node/zlib/zlib.test.js | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/test/js/node/zlib/zlib.test.js b/test/js/node/zlib/zlib.test.js index 484d8a36230936..8d30d140917d38 100644 --- a/test/js/node/zlib/zlib.test.js +++ b/test/js/node/zlib/zlib.test.js @@ -1,5 +1,11 @@ import { describe, it, expect } from "bun:test"; import { gzipSync, deflateSync, inflateSync, gunzipSync } from "bun"; +import * as zlib from "node:zlib"; +import * as fs from "node:fs"; +import * as buffer from "node:buffer"; +import * as util from "node:util"; +import { resolve } from "node:path"; +import { tmpdirSync } from "harness"; describe("zlib", () => { it("should be able to deflate and inflate", () => { @@ -27,13 +33,6 @@ describe("zlib", () => { }); }); -import * as zlib from "node:zlib"; -import * as fs from "node:fs"; -import * as buffer from "node:buffer"; -import * as util from "node:util"; -import { resolve } from "node:path"; -import { tmpdirSync } from "harness"; - function* window(buffer, size, advance = size) { let i = 0; while (i <= buffer.length) { From e41c69ffce450640c0076910f60af6ca32f30d4c Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 6 May 2024 18:33:32 -0700 Subject: [PATCH 29/33] add more brotli e2e tests with node:http --- test/js/node/http/node-http.test.ts | 71 ++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/test/js/node/http/node-http.test.ts b/test/js/node/http/node-http.test.ts index 09902eb77a72f3..bf40cf0ec348da 100644 --- a/test/js/node/http/node-http.test.ts +++ b/test/js/node/http/node-http.test.ts @@ -1,5 +1,5 @@ // @ts-nocheck -import { +import http, { createServer, request, get, @@ -27,6 +27,8 @@ import { PassThrough } from "node:stream"; const { describe, expect, it, beforeAll, afterAll, createDoneDotAll } = createTest(import.meta.path); import { bunExe } from "bun:harness"; import { bunEnv, tmpdirSync } from "harness"; +import * as stream from "node:stream"; +import * as zlib from "node:zlib"; function listen(server: Server, protocol: string = "http"): Promise { return new Promise((resolve, reject) => { @@ -1904,3 +1906,70 @@ it.skipIf(!process.env.TEST_INFO_STRIPE)("should be able to connect to stripe", err = await new Response(stderr).text(); expect(err).toContain(`error: No such charge: '${charge_id}'\n`); }); + +it("can send brotli from Server and receive with fetch", async () => { + try { + var server = createServer((req, res) => { + expect(req.url).toBe("/hello"); + res.writeHead(200); + res.setHeader("content-encoding", "br"); + + const inputStream = new stream.Readable(); + inputStream.push("Hello World"); + inputStream.push(null); + + inputStream.pipe(zlib.createBrotliCompress()).pipe(res); + }); + const url = await listen(server); + const res = await fetch(new URL("/hello", url)); + expect(await res.text()).toBe("Hello World"); + } catch (e) { + throw e; + } finally { + server.close(); + } +}); + +it("can send brotli from Server and receive with Client", async () => { + try { + var server = createServer((req, res) => { + expect(req.url).toBe("/hello"); + res.writeHead(200); + res.setHeader("content-encoding", "br"); + + const inputStream = new stream.Readable(); + inputStream.push("Hello World"); + inputStream.push(null); + + const passthrough = new stream.PassThrough(); + passthrough.on("data", data => res.write(data)); + passthrough.on("end", () => res.end()); + + inputStream.pipe(zlib.createBrotliCompress()).pipe(passthrough); + }); + + const url = await listen(server); + const { resolve, reject, promise } = Promise.withResolvers(); + http.get(new URL("/hello", url), res => { + let rawData = ""; + const passthrough = stream.PassThrough(); + passthrough.on("data", chunk => { + rawData += chunk; + }); + passthrough.on("end", () => { + try { + expect(Buffer.from(rawData)).toEqual(Buffer.from("Hello World")); + resolve(); + } catch (e) { + reject(e); + } + }); + res.pipe(zlib.createBrotliDecompress()).pipe(passthrough); + }); + await promise; + } catch (e) { + throw e; + } finally { + server.close(); + } +}); From ac4c4b62d7a8193663927b7070ce462fa98000b5 Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 6 May 2024 18:45:57 -0700 Subject: [PATCH 30/33] avoid heap allocation in sync case --- src/bun.js/api/brotli.zig | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig index fed9bc27635ff4..cac0e29d16ce7f 100644 --- a/src/bun.js/api/brotli.zig +++ b/src/bun.js/api/brotli.zig @@ -133,10 +133,15 @@ pub const BrotliEncoder = struct { pub usingnamespace bun.New(@This()); + pub fn runTask(this: *JSC.WorkPoolTask) void { + var job: *EncodeJob = @fieldParentPtr(EncodeJob, "task", this); + job.run(); + job.destroy(); + } + pub fn run(this: *EncodeJob) void { defer { _ = this.encoder.has_pending_activity.fetchSub(1, .Monotonic); - this.destroy(); } var any = false; @@ -206,11 +211,6 @@ pub const BrotliEncoder = struct { vm.enqueueTaskConcurrent(JSC.ConcurrentTask.create(JSC.Task.init(this.encoder))); } } - - pub fn runTask(this: *JSC.WorkPoolTask) void { - var job: *EncodeJob = @fieldParentPtr(EncodeJob, "task", this); - job.run(); - } }; pub fn encode(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { @@ -281,10 +281,10 @@ pub const BrotliEncoder = struct { if (is_last) this.has_called_end = true; - var task = EncodeJob.new(.{ + var task: EncodeJob = .{ .encoder = this, .is_async = false, - }); + }; { this.input_lock.lock(); @@ -496,10 +496,10 @@ pub const BrotliDecoder = struct { if (is_last) this.has_called_end = true; - var task = DecodeJob.new(.{ + var task: DecodeJob = .{ .decoder = this, .is_async = false, - }); + }; { this.input_lock.lock(); @@ -525,12 +525,12 @@ pub const BrotliDecoder = struct { pub fn runTask(this: *JSC.WorkPoolTask) void { var job: *DecodeJob = @fieldParentPtr(DecodeJob, "task", this); job.run(); + job.destroy(); } pub fn run(this: *DecodeJob) void { defer { _ = this.decoder.has_pending_activity.fetchSub(1, .Monotonic); - this.destroy(); } var any = false; From 8e920dbf7ac5e3eadcc8c117043389b4855fceae Mon Sep 17 00:00:00 2001 From: Meghan Denny Date: Mon, 6 May 2024 19:58:30 -0700 Subject: [PATCH 31/33] fix constructor check --- src/js/node/zlib.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index 45f53846bc805d..1bcfeadcaa3c74 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -44,7 +44,7 @@ function createBrotliCompress(opts) { const kHandle = Symbol("kHandle"); function BrotliCompress(opts) { - if (!new.target) return new BrotliCompress(opts); + if (!(this instanceof BrotliCompress)) return new BrotliCompress(opts); this[kHandle] = createBrotliEncoder(opts, {}, null); stream.Transform.$call(this); } @@ -63,7 +63,7 @@ function createBrotliDecompress(opts) { } function BrotliDecompress(opts) { - if (!new.target) return new BrotliDecompress(opts); + if (!(this instanceof BrotliDecompress)) return new BrotliDecompress(opts); this[kHandle] = createBrotliDecoder(opts, {}, null); stream.Transform.$call(this); } From a264d59b8ddf795955604779ca5e593a44750203 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Mon, 6 May 2024 20:48:51 -0700 Subject: [PATCH 32/33] Update src/js/node/zlib.ts --- src/js/node/zlib.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index 1bcfeadcaa3c74..c471455492ebb4 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -46,7 +46,7 @@ const kHandle = Symbol("kHandle"); function BrotliCompress(opts) { if (!(this instanceof BrotliCompress)) return new BrotliCompress(opts); this[kHandle] = createBrotliEncoder(opts, {}, null); - stream.Transform.$call(this); + stream.Transform.$apply(this, arguments); } BrotliCompress.prototype = {}; ObjectSetPrototypeOf(BrotliCompress.prototype, stream.Transform.prototype); From 0a5cbe14ba00e7806ccefdadd36a10b9def42541 Mon Sep 17 00:00:00 2001 From: Jarred Sumner Date: Mon, 6 May 2024 20:49:38 -0700 Subject: [PATCH 33/33] Update src/js/node/zlib.ts --- src/js/node/zlib.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index c471455492ebb4..65a57448b967a3 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -65,7 +65,7 @@ function createBrotliDecompress(opts) { function BrotliDecompress(opts) { if (!(this instanceof BrotliDecompress)) return new BrotliDecompress(opts); this[kHandle] = createBrotliDecoder(opts, {}, null); - stream.Transform.$call(this); + stream.Transform.$apply(this, arguments); } BrotliDecompress.prototype = {}; ObjectSetPrototypeOf(BrotliDecompress.prototype, stream.Transform.prototype);