diff --git a/docs/runtime/nodejs-apis.md b/docs/runtime/nodejs-apis.md index f09a9797103a06..57f3f15780ac1f 100644 --- a/docs/runtime/nodejs-apis.md +++ b/docs/runtime/nodejs-apis.md @@ -173,7 +173,7 @@ Some methods are not optimized yet. ### [`node:zlib`](https://nodejs.org/api/zlib.html) -🟡 Missing `BrotliCompress` `BrotliDecompress` `brotliCompressSync` `brotliDecompress` `brotliDecompressSync` `createBrotliCompress` `createBrotliDecompress`. Unoptimized. +🟡 Unoptimized. ## Globals diff --git a/src/brotli.zig b/src/brotli.zig index 755f8ec794ea48..3ee2388f344ef6 100644 --- a/src/brotli.zig +++ b/src/brotli.zig @@ -1,7 +1,11 @@ const bun = @import("root").bun; const std = @import("std"); -const c = @import("./deps/brotli_decoder.zig"); +const c = struct { + pub usingnamespace @import("./deps/brotli_decoder.zig"); + pub usingnamespace @import("./deps/brotli_encoder.zig"); +}; const BrotliDecoder = c.BrotliDecoder; +const BrotliEncoder = c.BrotliEncoder; const mimalloc = bun.Mimalloc; @@ -15,7 +19,7 @@ const BrotliAllocator = struct { return mimalloc.mi_malloc(len) orelse unreachable; } - pub fn free(_: ?*anyopaque, data: *anyopaque) callconv(.C) void { + pub fn free(_: ?*anyopaque, data: ?*anyopaque) callconv(.C) void { if (comptime bun.is_heap_breakdown_enabled) { const zone = bun.HeapBreakdown.malloc_zone_t.get(BrotliAllocator); zone.malloc_zone_free(data); @@ -26,7 +30,7 @@ const BrotliAllocator = struct { } }; -pub const Options = struct { +pub const DecoderOptions = struct { pub const Params = std.enums.EnumFieldStruct(c.BrotliDecoderParameter, bool, false); params: Params = Params{ @@ -54,7 +58,11 @@ pub const BrotliReaderArrayList = struct { pub usingnamespace bun.New(BrotliReaderArrayList); - pub fn initWithOptions(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: Options) !*BrotliReaderArrayList { + pub fn newWithOptions(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: DecoderOptions) !*BrotliReaderArrayList { + return BrotliReaderArrayList.new(try initWithOptions(input, list, allocator, options)); + } + + pub fn initWithOptions(input: []const u8, list: *std.ArrayListUnmanaged(u8), allocator: std.mem.Allocator, options: DecoderOptions) !BrotliReaderArrayList { if (!BrotliDecoder.initializeBrotli()) { return error.BrotliFailedToLoad; } @@ -67,15 +75,13 @@ pub const BrotliReaderArrayList = struct { bun.assert(list.items.ptr != input.ptr); - return BrotliReaderArrayList.new( - .{ - .input = input, - .list_ptr = list, - .list = list.*, - .list_allocator = allocator, - .brotli = brotli, - }, - ); + return .{ + .input = input, + .list_ptr = list, + .list = list.*, + .list_allocator = allocator, + .brotli = brotli, + }; } pub fn end(this: *BrotliReaderArrayList) void { @@ -83,9 +89,7 @@ pub const BrotliReaderArrayList = struct { } pub fn readAll(this: *BrotliReaderArrayList, is_done: bool) !void { - defer { - this.list_ptr.* = this.list; - } + defer this.list_ptr.* = this.list; if (this.state == .End or this.state == .Error) { return; @@ -128,7 +132,6 @@ pub const BrotliReaderArrayList = struct { if (comptime bun.Environment.allow_assert) { bun.assert(this.brotli.isFinished()); } - this.end(); return; }, @@ -166,3 +169,94 @@ pub const BrotliReaderArrayList = struct { this.destroy(); } }; + +pub const BrotliCompressionStream = struct { + pub const State = enum { + Inflating, + End, + Error, + }; + + brotli: *BrotliEncoder, + state: State = State.Inflating, + total_out: usize = 0, + total_in: usize = 0, + + pub fn init() !BrotliCompressionStream { + const instance = BrotliEncoder.createInstance(&BrotliAllocator.alloc, &BrotliAllocator.free, null) orelse return error.BrotliFailedToCreateInstance; + + return BrotliCompressionStream{ + .brotli = instance, + }; + } + + pub fn writeChunk(this: *BrotliCompressionStream, input: []const u8, last: bool) ![]const u8 { + const result = this.brotli.compressStream(if (last) BrotliEncoder.Operation.finish else .process, input); + + if (!result.success) { + this.state = .Error; + return error.BrotliCompressionError; + } + + return result.output; + } + + pub fn write(this: *BrotliCompressionStream, input: []const u8, last: bool) ![]const u8 { + if (this.state == .End or this.state == .Error) { + return ""; + } + + return this.writeChunk(input, last); + } + + pub fn end(this: *BrotliCompressionStream) ![]const u8 { + defer this.state = .End; + + return try this.write("", true); + } + + pub fn deinit(this: *BrotliCompressionStream) void { + this.brotli.destroyInstance(); + } + + fn NewWriter(comptime InputWriter: type) type { + return struct { + compressor: *BrotliCompressionStream, + input_writer: InputWriter, + + const Self = @This(); + pub const WriteError = error{BrotliCompressionError} || InputWriter.Error; + pub const Writer = std.io.Writer(@This(), WriteError, Self.write); + + pub fn init(compressor: *BrotliCompressionStream, input_writer: InputWriter) Self { + return Self{ + .compressor = compressor, + .input_writer = input_writer, + }; + } + + pub fn write(self: Self, to_compress: []const u8) WriteError!usize { + const decompressed = try self.compressor.write(to_compress, false); + try self.input_writer.writeAll(decompressed); + return to_compress.len; + } + + pub fn end(self: Self) !usize { + const decompressed = try self.compressor.end(); + try self.input_writer.writeAll(decompressed); + } + + pub fn writer(self: Self) Writer { + return Writer{ .context = self }; + } + }; + } + + pub fn writerContext(this: *BrotliCompressionStream, writable: anytype) NewWriter(@TypeOf(writable)) { + return NewWriter(@TypeOf(writable)).init(this, writable); + } + + pub fn writer(this: *BrotliCompressionStream, writable: anytype) NewWriter(@TypeOf(writable)).Writer { + return this.writerContext(writable).writer(); + } +}; diff --git a/src/bun.js/api/brotli.classes.ts b/src/bun.js/api/brotli.classes.ts new file mode 100644 index 00000000000000..2095a2c10433c9 --- /dev/null +++ b/src/bun.js/api/brotli.classes.ts @@ -0,0 +1,63 @@ +import { define } from "../../codegen/class-definitions"; + +export default [ + define({ + name: "BrotliEncoder", + construct: true, + noConstructor: true, + finalize: true, + configurable: false, + hasPendingActivity: true, + klass: {}, + JSType: "0b11101110", + values: ["callback"], + proto: { + encode: { + fn: "encode", + length: 2, + }, + encodeSync: { + fn: "encodeSync", + length: 2, + }, + end: { + fn: "end", + length: 2, + }, + endSync: { + fn: "endSync", + length: 2, + }, + }, + }), + define({ + name: "BrotliDecoder", + construct: true, + noConstructor: true, + finalize: true, + configurable: false, + hasPendingActivity: true, + klass: {}, + JSType: "0b11101110", + values: ["callback"], + + proto: { + decode: { + fn: "decode", + length: 2, + }, + decodeSync: { + fn: "decodeSync", + length: 2, + }, + end: { + fn: "end", + length: 2, + }, + endSync: { + fn: "endSync", + length: 2, + }, + }, + }), +]; diff --git a/src/bun.js/api/brotli.zig b/src/bun.js/api/brotli.zig new file mode 100644 index 00000000000000..cac0e29d16ce7f --- /dev/null +++ b/src/bun.js/api/brotli.zig @@ -0,0 +1,605 @@ +const bun = @import("root").bun; +const JSC = bun.JSC; +const std = @import("std"); +const brotli = bun.brotli; + +const Queue = std.fifo.LinearFifo(JSC.Node.BlobOrStringOrBuffer, .Dynamic); + +pub const BrotliEncoder = struct { + pub usingnamespace bun.New(@This()); + pub usingnamespace JSC.Codegen.JSBrotliEncoder; + + stream: brotli.BrotliCompressionStream, + + freelist: Queue = Queue.init(bun.default_allocator), + freelist_write_lock: bun.Lock = bun.Lock.init(), + + globalThis: *JSC.JSGlobalObject, + + input: Queue = Queue.init(bun.default_allocator), + input_lock: bun.Lock = bun.Lock.init(), + + has_called_end: bool = false, + callback_value: JSC.Strong = .{}, + + output: std.ArrayListUnmanaged(u8) = .{}, + output_lock: bun.Lock = bun.Lock.init(), + + has_pending_activity: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + pending_encode_job_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + ref_count: u32 = 1, + write_failed: bool = false, + poll_ref: bun.Async.KeepAlive = .{}, + + pub fn hasPendingActivity(this: *BrotliEncoder) callconv(.C) bool { + return this.has_pending_activity.load(.Monotonic) > 0; + } + + pub fn constructor(globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) ?*BrotliEncoder { + globalThis.throw("BrotliEncoder is not constructable", .{}); + return null; + } + + pub fn create(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3).slice(); + + if (arguments.len < 3) { + globalThis.throwNotEnoughArguments("BrotliEncoder", 3, arguments.len); + return .zero; + } + + const callback = arguments[2]; + + var this: *BrotliEncoder = BrotliEncoder.new(.{ + .globalThis = globalThis, + .stream = brotli.BrotliCompressionStream.init() catch { + globalThis.throw("Failed to create BrotliEncoder", .{}); + return .zero; + }, + }); + + const out = this.toJS(globalThis); + @This().callbackSetCached(out, globalThis, callback); + this.callback_value.set(globalThis, callback); + + return out; + } + + pub fn finalize(this: *BrotliEncoder) callconv(.C) void { + this.deinit(); + } + + pub fn deinit(this: *BrotliEncoder) void { + this.callback_value.deinit(); + this.drainFreelist(); + this.stream.deinit(); + this.input.deinit(); + this.destroy(); + } + + fn drainFreelist(this: *BrotliEncoder) void { + this.freelist_write_lock.lock(); + defer this.freelist_write_lock.unlock(); + const to_free = this.freelist.readableSlice(0); + for (to_free) |*input| { + input.deinit(); + } + this.freelist.discard(to_free.len); + } + + fn collectOutputValue(this: *BrotliEncoder) JSC.JSValue { + this.output_lock.lock(); + defer this.output_lock.unlock(); + + if (this.output.items.len > 16 * 1024) { + defer this.output.items = ""; + defer this.output.deinit(bun.default_allocator); + return JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); + } else { + defer this.output.items = ""; + defer this.output.deinit(bun.default_allocator); + return JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items); + } + } + + pub fn runFromJSThread(this: *BrotliEncoder) void { + this.poll_ref.unref(this.globalThis.bunVM()); + + defer _ = this.has_pending_activity.fetchSub(1, .Monotonic); + this.drainFreelist(); + + const result = this.callback_value.get().?.call(this.globalThis, &.{ + if (this.write_failed) + // TODO: propagate error from brotli + this.globalThis.createErrorInstance("BrotliError", .{}) + else + JSC.JSValue.null, + this.collectOutputValue(), + }); + + if (result.toError()) |err| { + this.globalThis.bunVM().runErrorHandler(err, null); + } + } + + // We can only run one encode job at a time + // But we don't have an idea of a serial dispatch queue + // So instead, we let you enqueue as many times as you want + // and if one is already running, we just don't do anything + const EncodeJob = struct { + task: JSC.WorkPoolTask = .{ .callback = &runTask }, + encoder: *BrotliEncoder, + is_async: bool, + + pub usingnamespace bun.New(@This()); + + pub fn runTask(this: *JSC.WorkPoolTask) void { + var job: *EncodeJob = @fieldParentPtr(EncodeJob, "task", this); + job.run(); + job.destroy(); + } + + pub fn run(this: *EncodeJob) void { + defer { + _ = this.encoder.has_pending_activity.fetchSub(1, .Monotonic); + } + + var any = false; + + if (this.encoder.pending_encode_job_count.fetchAdd(1, .Monotonic) >= 0) { + const is_last = this.encoder.has_called_end; + while (true) { + this.encoder.input_lock.lock(); + defer this.encoder.input_lock.unlock(); + const readable = this.encoder.input.readableSlice(0); + defer this.encoder.input.discard(readable.len); + const pending = readable; + + const Writer = struct { + encoder: *BrotliEncoder, + + pub const Error = error{OutOfMemory}; + pub fn writeAll(writer: @This(), chunk: []const u8) Error!void { + writer.encoder.output_lock.lock(); + defer writer.encoder.output_lock.unlock(); + + try writer.encoder.output.appendSlice(bun.default_allocator, chunk); + } + }; + + defer { + this.encoder.freelist_write_lock.lock(); + this.encoder.freelist.write(pending) catch unreachable; + this.encoder.freelist_write_lock.unlock(); + } + for (pending) |input| { + var writer = this.encoder.stream.writer(Writer{ .encoder = this.encoder }); + writer.writeAll(input.slice()) catch { + _ = this.encoder.pending_encode_job_count.fetchSub(1, .Monotonic); + this.encoder.write_failed = true; + return; + }; + } + + any = any or pending.len > 0; + + if (this.encoder.pending_encode_job_count.fetchSub(1, .Monotonic) == 0) + break; + } + + if (is_last and any) { + var output = &this.encoder.output; + this.encoder.output_lock.lock(); + defer this.encoder.output_lock.unlock(); + + output.appendSlice(bun.default_allocator, this.encoder.stream.end() catch { + _ = this.encoder.pending_encode_job_count.fetchSub(1, .Monotonic); + this.encoder.write_failed = true; + return; + }) catch { + _ = this.encoder.pending_encode_job_count.fetchSub(1, .Monotonic); + this.encoder.write_failed = true; + return; + }; + } + } + + if (this.is_async and any) { + var vm = this.encoder.globalThis.bunVMConcurrently(); + _ = this.encoder.has_pending_activity.fetchAdd(1, .Monotonic); + this.encoder.poll_ref.refConcurrently(vm); + vm.enqueueTaskConcurrent(JSC.ConcurrentTask.create(JSC.Task.init(this.encoder))); + } + } + }; + + pub fn encode(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3); + + if (arguments.len < 2) { + globalThis.throwNotEnoughArguments("BrotliEncoder.encode", 2, arguments.len); + return .zero; + } + + if (this.has_called_end) { + globalThis.throw("BrotliEncoder.encode called after BrotliEncoder.end", .{}); + return .zero; + } + + const input = callframe.argument(0); + const optional_encoding = callframe.argument(1); + const is_last = callframe.argument(2).toBoolean(); + + const input_to_queue = JSC.Node.BlobOrStringOrBuffer.fromJSWithEncodingValueMaybeAsync(globalThis, bun.default_allocator, input, optional_encoding, true) orelse { + globalThis.throwInvalidArgumentType("BrotliEncoder.encode", "input", "Blob, String, or Buffer"); + return .zero; + }; + + _ = this.has_pending_activity.fetchAdd(1, .Monotonic); + if (is_last) + this.has_called_end = true; + + var task = EncodeJob.new(.{ + .encoder = this, + .is_async = true, + }); + + { + this.input_lock.lock(); + defer this.input_lock.unlock(); + + this.input.writeItem(input_to_queue) catch unreachable; + } + JSC.WorkPool.schedule(&task.task); + + return .undefined; + } + + pub fn encodeSync(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3); + + if (arguments.len < 2) { + globalThis.throwNotEnoughArguments("BrotliEncoder.encode", 2, arguments.len); + return .zero; + } + + if (this.has_called_end) { + globalThis.throw("BrotliEncoder.encode called after BrotliEncoder.end", .{}); + return .zero; + } + + const input = callframe.argument(0); + const optional_encoding = callframe.argument(1); + const is_last = callframe.argument(2).toBoolean(); + + const input_to_queue = JSC.Node.BlobOrStringOrBuffer.fromJSWithEncodingValueMaybeAsync(globalThis, bun.default_allocator, input, optional_encoding, true) orelse { + globalThis.throwInvalidArgumentType("BrotliEncoder.encode", "input", "Blob, String, or Buffer"); + return .zero; + }; + + _ = this.has_pending_activity.fetchAdd(1, .Monotonic); + if (is_last) + this.has_called_end = true; + + var task: EncodeJob = .{ + .encoder = this, + .is_async = false, + }; + + { + this.input_lock.lock(); + defer this.input_lock.unlock(); + + this.input.writeItem(input_to_queue) catch unreachable; + } + task.run(); + return if (!is_last) .undefined else this.collectOutputValue(); + } + + pub fn end(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } + + pub fn endSync(this: *BrotliEncoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } +}; + +pub const BrotliDecoder = struct { + pub usingnamespace bun.New(@This()); + pub usingnamespace JSC.Codegen.JSBrotliDecoder; + + globalThis: *JSC.JSGlobalObject, + stream: brotli.BrotliReaderArrayList, + + has_pending_activity: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + ref_count: u32 = 1, + poll_ref: bun.Async.KeepAlive = .{}, + write_failed: bool = false, + callback_value: JSC.Strong = .{}, + has_called_end: bool = false, + pending_decode_job_count: std.atomic.Value(u32) = std.atomic.Value(u32).init(0), + + input: Queue = Queue.init(bun.default_allocator), + input_lock: bun.Lock = bun.Lock.init(), + + output: std.ArrayListUnmanaged(u8) = .{}, + output_lock: bun.Lock = bun.Lock.init(), + + freelist: Queue = Queue.init(bun.default_allocator), + freelist_write_lock: bun.Lock = bun.Lock.init(), + + pub fn hasPendingActivity(this: *BrotliDecoder) callconv(.C) bool { + return this.has_pending_activity.load(.Monotonic) > 0; + } + + pub fn deinit(this: *BrotliDecoder) void { + this.callback_value.deinit(); + this.drainFreelist(); + this.output.deinit(bun.default_allocator); + this.stream.brotli.destroyInstance(); + this.input.deinit(); + this.destroy(); + } + + pub fn constructor(globalThis: *JSC.JSGlobalObject, _: *JSC.CallFrame) callconv(.C) ?*BrotliDecoder { + globalThis.throw("Crypto is not constructable", .{}); + return null; + } + + pub fn create(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3).slice(); + + if (arguments.len < 3) { + globalThis.throwNotEnoughArguments("BrotliDecoder", 3, arguments.len); + return .zero; + } + + const callback = arguments[2]; + + var this: *BrotliDecoder = BrotliDecoder.new(.{ + .globalThis = globalThis, + .stream = undefined, // &this.output needs to be a stable pointer + }); + this.stream = brotli.BrotliReaderArrayList.initWithOptions("", &this.output, bun.default_allocator, .{}) catch { + globalThis.throw("Failed to create BrotliDecoder", .{}); + return .zero; + }; + + const out = this.toJS(globalThis); + @This().callbackSetCached(out, globalThis, callback); + this.callback_value.set(globalThis, callback); + + return out; + } + + pub fn finalize(this: *BrotliDecoder) callconv(.C) void { + this.deinit(); + } + + fn collectOutputValue(this: *BrotliDecoder) JSC.JSValue { + this.output_lock.lock(); + defer this.output_lock.unlock(); + + if (this.output.items.len > 16 * 1024) { + defer this.output.clearRetainingCapacity(); + return JSC.JSValue.createBuffer(this.globalThis, this.output.items, bun.default_allocator); + } else { + defer this.output.clearRetainingCapacity(); + return JSC.ArrayBuffer.createBuffer(this.globalThis, this.output.items); + } + } + + pub fn runFromJSThread(this: *BrotliDecoder) void { + this.poll_ref.unref(this.globalThis.bunVM()); + + defer _ = this.has_pending_activity.fetchSub(1, .Monotonic); + this.drainFreelist(); + + const result = this.callback_value.get().?.call(this.globalThis, &.{ + if (this.write_failed) + // TODO: propagate error from brotli + this.globalThis.createErrorInstance("BrotliError", .{}) + else + JSC.JSValue.null, + this.collectOutputValue(), + }); + + if (result.toError()) |err| { + this.globalThis.bunVM().runErrorHandler(err, null); + } + } + + fn drainFreelist(this: *BrotliDecoder) void { + this.freelist_write_lock.lock(); + defer this.freelist_write_lock.unlock(); + const to_free = this.freelist.readableSlice(0); + for (to_free) |*input| { + input.deinit(); + } + this.freelist.discard(to_free.len); + } + + pub fn decode(this: *BrotliDecoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3); + + if (arguments.len < 2) { + globalThis.throwNotEnoughArguments("BrotliEncoder.decode", 2, arguments.len); + return .zero; + } + + if (this.has_called_end) { + globalThis.throw("BrotliEncoder.decode called after BrotliEncoder.end", .{}); + return .zero; + } + + const input = callframe.argument(0); + const optional_encoding = callframe.argument(1); + const is_last = callframe.argument(2).toBoolean(); + + const input_to_queue = JSC.Node.BlobOrStringOrBuffer.fromJSWithEncodingValueMaybeAsync(globalThis, bun.default_allocator, input, optional_encoding, true) orelse { + globalThis.throwInvalidArgumentType("BrotliEncoder.decode", "input", "Blob, String, or Buffer"); + return .zero; + }; + + _ = this.has_pending_activity.fetchAdd(1, .Monotonic); + if (is_last) + this.has_called_end = true; + + var task = DecodeJob.new(.{ + .decoder = this, + .is_async = true, + }); + + { + this.input_lock.lock(); + defer this.input_lock.unlock(); + + this.input.writeItem(input_to_queue) catch unreachable; + } + JSC.WorkPool.schedule(&task.task); + + return .undefined; + } + + pub fn decodeSync(this: *BrotliDecoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + const arguments = callframe.arguments(3); + + if (arguments.len < 2) { + globalThis.throwNotEnoughArguments("BrotliEncoder.decode", 2, arguments.len); + return .zero; + } + + if (this.has_called_end) { + globalThis.throw("BrotliEncoder.decode called after BrotliEncoder.end", .{}); + return .zero; + } + + const input = callframe.argument(0); + const optional_encoding = callframe.argument(1); + const is_last = callframe.argument(2).toBoolean(); + + const input_to_queue = JSC.Node.BlobOrStringOrBuffer.fromJSWithEncodingValueMaybeAsync(globalThis, bun.default_allocator, input, optional_encoding, true) orelse { + globalThis.throwInvalidArgumentType("BrotliEncoder.decode", "input", "Blob, String, or Buffer"); + return .zero; + }; + + _ = this.has_pending_activity.fetchAdd(1, .Monotonic); + if (is_last) + this.has_called_end = true; + + var task: DecodeJob = .{ + .decoder = this, + .is_async = false, + }; + + { + this.input_lock.lock(); + defer this.input_lock.unlock(); + + this.input.writeItem(input_to_queue) catch unreachable; + } + task.run(); + return if (!is_last) .undefined else this.collectOutputValue(); + } + + // We can only run one decode job at a time + // But we don't have an idea of a serial dispatch queue + // So instead, we let you enqueue as many times as you want + // and if one is already running, we just don't do anything + const DecodeJob = struct { + task: JSC.WorkPoolTask = .{ .callback = &runTask }, + decoder: *BrotliDecoder, + is_async: bool, + + pub usingnamespace bun.New(@This()); + + pub fn runTask(this: *JSC.WorkPoolTask) void { + var job: *DecodeJob = @fieldParentPtr(DecodeJob, "task", this); + job.run(); + job.destroy(); + } + + pub fn run(this: *DecodeJob) void { + defer { + _ = this.decoder.has_pending_activity.fetchSub(1, .Monotonic); + } + + var any = false; + + if (this.decoder.pending_decode_job_count.fetchAdd(1, .Monotonic) >= 0) { + const is_last = this.decoder.has_called_end; + while (true) { + this.decoder.input_lock.lock(); + defer this.decoder.input_lock.unlock(); + if (!is_last) break; + const readable = this.decoder.input.readableSlice(0); + const pending = readable; + + defer { + this.decoder.freelist_write_lock.lock(); + this.decoder.freelist.write(pending) catch unreachable; + this.decoder.freelist_write_lock.unlock(); + } + + var input_list = std.ArrayListUnmanaged(u8){}; + defer input_list.deinit(bun.default_allocator); + if (pending.len > 1) { + for (pending) |input| { + input_list.appendSlice(bun.default_allocator, input.slice()) catch bun.outOfMemory(); + } + } + + { + this.decoder.output_lock.lock(); + defer this.decoder.output_lock.unlock(); + + const input = if (pending.len <= 1) pending[0].slice() else input_list.items; + this.decoder.stream.input = input; + this.decoder.stream.readAll(false) catch { + _ = this.decoder.pending_decode_job_count.fetchSub(1, .Monotonic); + this.decoder.write_failed = true; + return; + }; + } + + any = any or pending.len > 0; + + if (this.decoder.pending_decode_job_count.fetchSub(1, .Monotonic) == 0) + break; + } + } + + if (this.is_async and any) { + var vm = this.decoder.globalThis.bunVMConcurrently(); + _ = this.decoder.has_pending_activity.fetchAdd(1, .Monotonic); + this.decoder.poll_ref.refConcurrently(vm); + vm.enqueueTaskConcurrent(JSC.ConcurrentTask.create(JSC.Task.init(this.decoder))); + } + } + }; + + pub fn end(this: *BrotliDecoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } + + pub fn endSync(this: *BrotliDecoder, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) callconv(.C) JSC.JSValue { + _ = this; + _ = globalThis; + _ = callframe; + + return .zero; + } +}; diff --git a/src/bun.js/bindings/generated_classes_list.zig b/src/bun.js/bindings/generated_classes_list.zig index f154e6c85cb795..2ff83686802a68 100644 --- a/src/bun.js/bindings/generated_classes_list.zig +++ b/src/bun.js/bindings/generated_classes_list.zig @@ -70,4 +70,6 @@ pub const Classes = struct { pub const FileInternalReadableStreamSource = JSC.WebCore.FileReader.Source; pub const BlobInternalReadableStreamSource = JSC.WebCore.ByteBlobLoader.Source; pub const BytesInternalReadableStreamSource = JSC.WebCore.ByteStream.Source; + pub const BrotliEncoder = JSC.API.BrotliEncoder; + pub const BrotliDecoder = JSC.API.BrotliDecoder; }; diff --git a/src/bun.js/event_loop.zig b/src/bun.js/event_loop.zig index 4196fd356dc53d..dc12ab3266beaa 100644 --- a/src/bun.js/event_loop.zig +++ b/src/bun.js/event_loop.zig @@ -352,6 +352,9 @@ const Futimes = JSC.Node.Async.futimes; const Lchmod = JSC.Node.Async.lchmod; const Lchown = JSC.Node.Async.lchown; const Unlink = JSC.Node.Async.unlink; +const BrotliDecoder = JSC.API.BrotliDecoder; +const BrotliEncoder = JSC.API.BrotliEncoder; + const ShellGlobTask = bun.shell.interpret.Interpreter.Expansion.ShellGlobTask; const ShellRmTask = bun.shell.Interpreter.Builtin.Rm.ShellRmTask; const ShellRmDirTask = bun.shell.Interpreter.Builtin.Rm.ShellRmTask.DirTask; @@ -430,6 +433,8 @@ pub const Task = TaggedPointerUnion(.{ Lchmod, Lchown, Unlink, + BrotliEncoder, + BrotliDecoder, ShellGlobTask, ShellRmTask, ShellRmDirTask, @@ -1182,6 +1187,14 @@ pub const EventLoop = struct { var any: *Unlink = task.get(Unlink).?; any.runFromJSThread(); }, + @field(Task.Tag, typeBaseName(@typeName(BrotliEncoder))) => { + var any: *BrotliEncoder = task.get(BrotliEncoder).?; + any.runFromJSThread(); + }, + @field(Task.Tag, typeBaseName(@typeName(BrotliDecoder))) => { + var any: *BrotliDecoder = task.get(BrotliDecoder).?; + any.runFromJSThread(); + }, @field(Task.Tag, typeBaseName(@typeName(ProcessWaiterThreadTask))) => { bun.markPosixOnly(); var any: *ProcessWaiterThreadTask = task.get(ProcessWaiterThreadTask).?; diff --git a/src/bun.js/node/node_zlib_binding.zig b/src/bun.js/node/node_zlib_binding.zig new file mode 100644 index 00000000000000..ee511d5167bc65 --- /dev/null +++ b/src/bun.js/node/node_zlib_binding.zig @@ -0,0 +1,15 @@ +const std = @import("std"); +const bun = @import("root").bun; +const Environment = bun.Environment; +const JSC = bun.JSC; +const string = bun.string; +const Output = bun.Output; +const ZigString = JSC.ZigString; + +pub fn createBrotliEncoder(global: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue { + return JSC.JSFunction.create(global, "createBrotliEncoder", bun.JSC.API.BrotliEncoder.create, 3, .{}); +} + +pub fn createBrotliDecoder(global: *JSC.JSGlobalObject) callconv(.C) JSC.JSValue { + return JSC.JSFunction.create(global, "createBrotliDecoder", bun.JSC.API.BrotliDecoder.create, 3, .{}); +} diff --git a/src/bun.js/node/types.zig b/src/bun.js/node/types.zig index 2c388d4b00ee35..ee2cd1cb036edf 100644 --- a/src/bun.js/node/types.zig +++ b/src/bun.js/node/types.zig @@ -333,23 +333,23 @@ pub const BlobOrStringOrBuffer = union(enum) { if (blob.store) |store| { store.ref(); } - return .{ .blob = blob.* }; } - return .{ .string_or_buffer = StringOrBuffer.fromJS(global, allocator, value) orelse return null }; } pub fn fromJSWithEncodingValue(global: *JSC.JSGlobalObject, allocator: std.mem.Allocator, value: JSC.JSValue, encoding_value: JSC.JSValue) ?BlobOrStringOrBuffer { + return fromJSWithEncodingValueMaybeAsync(global, allocator, value, encoding_value, false); + } + + pub fn fromJSWithEncodingValueMaybeAsync(global: *JSC.JSGlobalObject, allocator: std.mem.Allocator, value: JSC.JSValue, encoding_value: JSC.JSValue, is_async: bool) ?BlobOrStringOrBuffer { if (value.as(JSC.WebCore.Blob)) |blob| { if (blob.store) |store| { store.ref(); } - return .{ .blob = blob.* }; } - - return .{ .string_or_buffer = StringOrBuffer.fromJSWithEncodingValue(global, allocator, value, encoding_value) orelse return null }; + return .{ .string_or_buffer = StringOrBuffer.fromJSWithEncodingValueMaybeAsync(global, allocator, value, encoding_value, is_async) orelse return null }; } }; @@ -533,6 +533,15 @@ pub const StringOrBuffer = union(enum) { return fromJSWithEncoding(global, allocator, value, encoding); } + + pub fn fromJSWithEncodingValueMaybeAsync(global: *JSC.JSGlobalObject, allocator: std.mem.Allocator, value: JSC.JSValue, encoding_value: JSC.JSValue, maybe_async: bool) ?StringOrBuffer { + const encoding: Encoding = brk: { + if (!encoding_value.isCell()) + break :brk .utf8; + break :brk Encoding.fromJS(encoding_value, global) orelse .utf8; + }; + return fromJSWithEncodingMaybeAsync(global, allocator, value, encoding, maybe_async); + } }; pub const ErrorCode = @import("./nodejs_error_code.zig").Code; diff --git a/src/deps/brotli_decoder.zig b/src/deps/brotli_decoder.zig index 58878dfa82ee3a..a3578cb041b00d 100644 --- a/src/deps/brotli_decoder.zig +++ b/src/deps/brotli_decoder.zig @@ -51,14 +51,7 @@ pub const BrotliDecoder = opaque { } pub fn decompressStream(state: *BrotliDecoder, available_in: *usize, next_in: *?[*]const u8, available_out: *usize, next_out: *?[*]u8, total_out: ?*usize) callconv(.C) BrotliDecoderResult { - return BrotliDecoderDecompressStream( - state, - available_in, - next_in, - available_out, - next_out, - total_out, - ); + return BrotliDecoderDecompressStream(state, available_in, next_in, available_out, next_out, total_out); } pub fn hasMoreOutput(state: *const BrotliDecoder) callconv(.C) bool { diff --git a/src/deps/brotli_encoder.zig b/src/deps/brotli_encoder.zig new file mode 100644 index 00000000000000..9bd8614205739e --- /dev/null +++ b/src/deps/brotli_encoder.zig @@ -0,0 +1,141 @@ +const bun = @import("root").bun; +const std = @import("std"); + +pub const brotli_alloc_func = ?*const fn (?*anyopaque, usize) callconv(.C) ?*anyopaque; +pub const brotli_free_func = ?*const fn (?*anyopaque, ?*anyopaque) callconv(.C) void; +pub const struct_BrotliSharedDictionaryStruct = opaque {}; +pub const BrotliSharedDictionary = struct_BrotliSharedDictionaryStruct; +pub const BROTLI_SHARED_DICTIONARY_RAW: c_int = 0; +pub const BROTLI_SHARED_DICTIONARY_SERIALIZED: c_int = 1; +pub const enum_BrotliSharedDictionaryType = c_uint; +pub const BrotliSharedDictionaryType = enum_BrotliSharedDictionaryType; +extern fn BrotliSharedDictionaryCreateInstance(alloc_func: brotli_alloc_func, free_func: brotli_free_func, @"opaque": ?*anyopaque) ?*BrotliSharedDictionary; +extern fn BrotliSharedDictionaryDestroyInstance(dict: ?*BrotliSharedDictionary) void; +extern fn BrotliSharedDictionaryAttach(dict: ?*BrotliSharedDictionary, @"type": BrotliSharedDictionaryType, data_size: usize, data: [*c]const u8) c_int; +pub const BROTLI_MODE_GENERIC: c_int = 0; +pub const BROTLI_MODE_TEXT: c_int = 1; +pub const BROTLI_MODE_FONT: c_int = 2; +pub const BrotliEncoderMode = enum(c_uint) { + generic = 0, + text = 1, + font = 2, +}; +pub const BROTLI_OPERATION_PROCESS: c_int = 0; +pub const BROTLI_OPERATION_FLUSH: c_int = 1; +pub const BROTLI_OPERATION_FINISH: c_int = 2; +pub const BROTLI_OPERATION_EMIT_METADATA: c_int = 3; + +pub const BROTLI_PARAM_MODE: c_int = 0; +pub const BROTLI_PARAM_QUALITY: c_int = 1; +pub const BROTLI_PARAM_LGWIN: c_int = 2; +pub const BROTLI_PARAM_LGBLOCK: c_int = 3; +pub const BROTLI_PARAM_DISABLE_LITERAL_CONTEXT_MODELING: c_int = 4; +pub const BROTLI_PARAM_SIZE_HINT: c_int = 5; +pub const BROTLI_PARAM_LARGE_WINDOW: c_int = 6; +pub const BROTLI_PARAM_NPOSTFIX: c_int = 7; +pub const BROTLI_PARAM_NDIRECT: c_int = 8; +pub const BROTLI_PARAM_STREAM_OFFSET: c_int = 9; +pub const BrotliEncoderParameter = enum(c_uint) { + mode = 0, + quality = 1, + lgwin = 2, + lgblock = 3, + disable_literal_context_modeling = 4, + size_hint = 5, + large_window = 6, + npostfix = 7, + ndirect = 8, + stream_offset = 9, +}; +pub const BrotliEncoder = opaque { + pub const Operation = enum(c_uint) { + process = 0, + flush = 1, + finish = 2, + emit_metadata = 3, + }; + + extern fn BrotliEncoderSetParameter(state: *BrotliEncoder, param: BrotliEncoderParameter, value: u32) c_int; + extern fn BrotliEncoderCreateInstance(alloc_func: brotli_alloc_func, free_func: brotli_free_func, @"opaque": ?*anyopaque) *BrotliEncoder; + extern fn BrotliEncoderDestroyInstance(state: *BrotliEncoder) void; + pub const struct_BrotliEncoderPreparedDictionaryStruct = opaque {}; + pub const BrotliEncoderPreparedDictionary = struct_BrotliEncoderPreparedDictionaryStruct; + extern fn BrotliEncoderPrepareDictionary(@"type": BrotliSharedDictionaryType, data_size: usize, data: [*c]const u8, quality: c_int, alloc_func: brotli_alloc_func, free_func: brotli_free_func, @"opaque": ?*anyopaque) *BrotliEncoderPreparedDictionary; + extern fn BrotliEncoderDestroyPreparedDictionary(dictionary: *BrotliEncoderPreparedDictionary) void; + extern fn BrotliEncoderAttachPreparedDictionary(state: *BrotliEncoder, dictionary: ?*const BrotliEncoderPreparedDictionary) c_int; + extern fn BrotliEncoderMaxCompressedSize(input_size: usize) usize; + extern fn BrotliEncoderCompress(quality: c_int, lgwin: c_int, mode: BrotliEncoderMode, input_size: usize, input_buffer: [*]const u8, encoded_size: *usize, encoded_buffer: [*]u8) c_int; + extern fn BrotliEncoderCompressStream(state: *BrotliEncoder, op: Operation, available_in: *usize, next_in: *?[*]const u8, available_out: *usize, next_out: ?[*]u8, total_out: ?*usize) c_int; + extern fn BrotliEncoderIsFinished(state: *BrotliEncoder) c_int; + extern fn BrotliEncoderHasMoreOutput(state: *BrotliEncoder) c_int; + extern fn BrotliEncoderTakeOutput(state: *BrotliEncoder, size: *usize) ?[*]const u8; + extern fn BrotliEncoderEstimatePeakMemoryUsage(quality: c_int, lgwin: c_int, input_size: usize) usize; + extern fn BrotliEncoderGetPreparedDictionarySize(dictionary: ?*const BrotliEncoderPreparedDictionary) usize; + extern fn BrotliEncoderVersion() u32; + + pub fn createInstance(alloc_func: brotli_alloc_func, free_func: brotli_free_func, @"opaque": ?*anyopaque) callconv(.C) ?*BrotliEncoder { + return BrotliEncoderCreateInstance(alloc_func, free_func, @"opaque"); + } + + pub fn destroyInstance(state: *BrotliEncoder) callconv(.C) void { + return BrotliEncoderDestroyInstance(state); + } + + pub fn hasMoreOutput(state: *BrotliEncoder) callconv(.C) bool { + return BrotliEncoderHasMoreOutput(state) > 0; + } + + pub fn takeOutput(state: *BrotliEncoder) []const u8 { + var size: usize = 0; + if (BrotliEncoderTakeOutput(state, &size)) |ptr| { + return ptr[0..size]; + } + + return ""; + } + + pub const CompressionResult = struct { + success: bool = false, + has_more: bool = false, + output: []const u8 = "", + }; + + // https://github.com/google/brotli/blob/2ad58d8603294f5ee33d23bb725e0e6a17c1de50/go/cbrotli/writer.go#L23-L40 + pub fn compressStream(state: *BrotliEncoder, op: Operation, data: []const u8) CompressionResult { + var available_in = data.len; + var next_in: ?[*]const u8 = data.ptr; + + var available_out: usize = 0; + + var result = CompressionResult{}; + + result.success = BrotliEncoderCompressStream(state, op, &available_in, &next_in, &available_out, null, null) > 0; + + if (result.success) { + result.output = takeOutput(state); + } + + result.has_more = BrotliEncoderHasMoreOutput(state) > 0; + + return result; + } + + pub fn setParameter(state: *BrotliEncoder, param: BrotliEncoderParameter, value: u32) bool { + return BrotliEncoderSetParameter(state, param, value) > 0; + } +}; + +pub const SHARED_BROTLI_MIN_DICTIONARY_WORD_LENGTH = 4; +pub const SHARED_BROTLI_MAX_DICTIONARY_WORD_LENGTH = 31; +pub const SHARED_BROTLI_NUM_DICTIONARY_CONTEXTS = 64; +pub const SHARED_BROTLI_MAX_COMPOUND_DICTS = 15; +pub const BROTLI_MIN_WINDOW_BITS = 10; +pub const BROTLI_MAX_WINDOW_BITS = 24; +pub const BROTLI_LARGE_MAX_WINDOW_BITS = 30; +pub const BROTLI_MIN_INPUT_BLOCK_BITS = 16; +pub const BROTLI_MAX_INPUT_BLOCK_BITS = 24; +pub const BROTLI_MIN_QUALITY = 0; +pub const BROTLI_MAX_QUALITY = 11; +pub const BROTLI_DEFAULT_QUALITY = 11; +pub const BROTLI_DEFAULT_WINDOW = 22; +pub const BROTLI_DEFAULT_MODE = BROTLI_MODE_GENERIC; diff --git a/src/http.zig b/src/http.zig index bc490d03c535de..9b988e66068dd9 100644 --- a/src/http.zig +++ b/src/http.zig @@ -1280,14 +1280,13 @@ const Decompressor = union(enum) { }, .brotli => { this.* = .{ - .brotli = try Brotli.BrotliReaderArrayList.initWithOptions( + .brotli = try Brotli.BrotliReaderArrayList.newWithOptions( buffer, &body_out_str.list, body_out_str.allocator, .{}, ), }; - return; }, else => @panic("Invalid encoding. This code should not be reachable"), @@ -1628,9 +1627,7 @@ const accept_header = picohttp.Header{ .name = "Accept", .value = "*/*" }; const accept_encoding_no_compression = "identity"; const accept_encoding_compression = "gzip, deflate, br"; -const accept_encoding_compression_no_brotli = "gzip, deflate"; const accept_encoding_header_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_compression }; -const accept_encoding_header_compression_no_brotli = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_compression_no_brotli }; const accept_encoding_header_no_compression = picohttp.Header{ .name = "Accept-Encoding", .value = accept_encoding_no_compression }; const accept_encoding_header = if (FeatureFlags.disable_compression_in_http_client) diff --git a/src/js/node/stream.ts b/src/js/node/stream.ts index 1337871d21435e..0a423aaad19425 100644 --- a/src/js/node/stream.ts +++ b/src/js/node/stream.ts @@ -2270,6 +2270,15 @@ var require_readable = __commonJS({ function Readable(options) { if (!(this instanceof Readable)) return new Readable(options); const isDuplex = this instanceof require_duplex(); + + // this._events ??= { + // close: undefined, + // error: undefined, + // prefinish: undefined, + // finish: undefined, + // drain: undefined, + // }; + this._readableState = new ReadableState(options, this, isDuplex); if (options) { const { read, destroy, construct, signal } = options; @@ -3430,6 +3439,7 @@ var require_readable = __commonJS({ }; }, }); +const Readable = require_readable(); // node_modules/readable-stream/lib/internal/streams/writable.js var errorOrDestroy; @@ -3467,6 +3477,15 @@ var require_writable = __commonJS({ function Writable(options = {}) { const isDuplex = this instanceof require_duplex(); if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) return new Writable(options); + + // this._events ??= { + // close: undefined, + // error: undefined, + // prefinish: undefined, + // finish: undefined, + // drain: undefined, + // }; + this._writableState = new WritableState(options, this, isDuplex); if (options) { if (typeof options.write === "function") this._write = options.write; @@ -4071,6 +4090,7 @@ var require_writable = __commonJS({ }; }, }); +const Writable = require_writable(); // node_modules/readable-stream/lib/internal/streams/duplexify.js var require_duplexify = __commonJS({ @@ -4429,6 +4449,18 @@ var require_duplex = __commonJS({ function Duplex(options) { if (!(this instanceof Duplex)) return new Duplex(options); + + // this._events ??= { + // close: undefined, + // error: undefined, + // prefinish: undefined, + // finish: undefined, + // drain: undefined, + // data: undefined, + // end: undefined, + // readable: undefined, + // }; + Readable.$call(this, options); Writable.$call(this, options); @@ -4506,6 +4538,7 @@ var require_duplex = __commonJS({ }; }, }); +const Duplex = require_duplex(); // node_modules/readable-stream/lib/internal/streams/transform.js var require_transform = __commonJS({ @@ -4515,6 +4548,7 @@ var require_transform = __commonJS({ var { ERR_METHOD_NOT_IMPLEMENTED } = require_errors().codes; function Transform(options) { if (!(this instanceof Transform)) return new Transform(options); + Duplex.$call(this, options); this._readableState.sync = false; @@ -4523,6 +4557,8 @@ var require_transform = __commonJS({ if (options) { if (typeof options.transform === "function") this._transform = options.transform; if (typeof options.flush === "function") this._flush = options.flush; + } else { + this.allowHalfOpen = true; } this.on("prefinish", prefinish.bind(this)); @@ -5505,9 +5541,6 @@ function getNativeReadableStream(Readable, stream, options) { } /** --- Bun native stream wrapper --- */ -var Readable = require_readable(); -var Writable = require_writable(); -var Duplex = require_duplex(); const _pathOrFdOrSink = Symbol("pathOrFdOrSink"); const { fileSinkSymbol: _fileSink } = require("internal/shared"); diff --git a/src/js/node/zlib.ts b/src/js/node/zlib.ts index 84a938709f5d04..65a57448b967a3 100644 --- a/src/js/node/zlib.ts +++ b/src/js/node/zlib.ts @@ -1,4 +1,82 @@ // Hardcoded module "node:zlib" + +const stream = require("node:stream"); + +const ObjectSetPrototypeOf = Object.setPrototypeOf; + +const createBrotliEncoder = $zig("node_zlib_binding.zig", "createBrotliEncoder"); +const createBrotliDecoder = $zig("node_zlib_binding.zig", "createBrotliDecoder"); + +function brotliCompress(buffer, opts, callback) { + if (typeof opts === "function") { + callback = opts; + opts = {}; + } + if (typeof callback !== "function") throw new TypeError("BrotliEncoder callback is not callable"); + const encoder = createBrotliEncoder(opts, {}, callback); + encoder.encode(buffer, undefined, true); +} + +function brotliDecompress(buffer, opts, callback) { + if (typeof opts === "function") { + callback = opts; + opts = {}; + } + if (typeof callback !== "function") throw new TypeError("BrotliDecoder callback is not callable"); + const decoder = createBrotliDecoder(opts, {}, callback); + decoder.decode(buffer, undefined, true); +} + +function brotliCompressSync(buffer, opts) { + const encoder = createBrotliEncoder(opts, {}, null); + return encoder.encodeSync(buffer, undefined, true); +} + +function brotliDecompressSync(buffer, opts) { + const decoder = createBrotliDecoder(opts, {}, null); + return decoder.decodeSync(buffer, undefined, true); +} + +function createBrotliCompress(opts) { + return new BrotliCompress(opts); +} + +const kHandle = Symbol("kHandle"); + +function BrotliCompress(opts) { + if (!(this instanceof BrotliCompress)) return new BrotliCompress(opts); + this[kHandle] = createBrotliEncoder(opts, {}, null); + stream.Transform.$apply(this, arguments); +} +BrotliCompress.prototype = {}; +ObjectSetPrototypeOf(BrotliCompress.prototype, stream.Transform.prototype); + +BrotliCompress.prototype._transform = function _transform(chunk, encoding, callback) { + callback(undefined, this[kHandle].encodeSync(chunk, encoding, false)); +}; +BrotliCompress.prototype._flush = function _flush(callback) { + callback(undefined, this[kHandle].encodeSync("", undefined, true)); +}; + +function createBrotliDecompress(opts) { + return new BrotliDecompress(opts); +} + +function BrotliDecompress(opts) { + if (!(this instanceof BrotliDecompress)) return new BrotliDecompress(opts); + this[kHandle] = createBrotliDecoder(opts, {}, null); + stream.Transform.$apply(this, arguments); +} +BrotliDecompress.prototype = {}; +ObjectSetPrototypeOf(BrotliDecompress.prototype, stream.Transform.prototype); + +BrotliDecompress.prototype._transform = function (chunk, encoding, callback) { + callback(undefined, this[kHandle].decodeSync(chunk, encoding, false)); +}; +BrotliDecompress.prototype._flush = function (callback) { + callback(undefined, this[kHandle].decodeSync("", undefined, true)); +}; + // TODO: **use a native binding from Bun for this!!** // This is a very slow module! // It should really be fixed. It will show up in benchmarking. It also loads @@ -3546,7 +3624,9 @@ var require_constants = __commonJS({ Z_ERRNO: -1, Z_STREAM_ERROR: -2, Z_DATA_ERROR: -3, + Z_MEM_ERROR: -4, Z_BUF_ERROR: -5, + Z_VERSION_ERROR: -6, Z_NO_COMPRESSION: 0, Z_BEST_SPEED: 1, Z_BEST_COMPRESSION: 9, @@ -3558,8 +3638,91 @@ var require_constants = __commonJS({ Z_DEFAULT_STRATEGY: 0, Z_BINARY: 0, Z_TEXT: 1, + Z_ASCII: 1, Z_UNKNOWN: 2, Z_DEFLATED: 8, + DEFLATE: 1, + INFLATE: 2, + GZIP: 3, + GUNZIP: 4, + DEFLATERAW: 5, + INFLATERAW: 6, + UNZIP: 7, + BROTLI_DECODE: 8, + BROTLI_ENCODE: 9, + Z_MIN_WINDOWBITS: 8, + Z_MAX_WINDOWBITS: 15, + Z_DEFAULT_WINDOWBITS: 15, + Z_MIN_CHUNK: 64, + Z_MAX_CHUNK: Infinity, + Z_DEFAULT_CHUNK: 16384, + Z_MIN_MEMLEVEL: 1, + Z_MAX_MEMLEVEL: 9, + Z_DEFAULT_MEMLEVEL: 8, + Z_MIN_LEVEL: -1, + Z_MAX_LEVEL: 9, + Z_DEFAULT_LEVEL: -1, + BROTLI_OPERATION_PROCESS: 0, + BROTLI_OPERATION_FLUSH: 1, + BROTLI_OPERATION_FINISH: 2, + BROTLI_OPERATION_EMIT_METADATA: 3, + BROTLI_PARAM_MODE: 0, + BROTLI_MODE_GENERIC: 0, + BROTLI_MODE_TEXT: 1, + BROTLI_MODE_FONT: 2, + BROTLI_DEFAULT_MODE: 0, + BROTLI_PARAM_QUALITY: 1, + BROTLI_MIN_QUALITY: 0, + BROTLI_MAX_QUALITY: 11, + BROTLI_DEFAULT_QUALITY: 11, + BROTLI_PARAM_LGWIN: 2, + BROTLI_MIN_WINDOW_BITS: 10, + BROTLI_MAX_WINDOW_BITS: 24, + BROTLI_LARGE_MAX_WINDOW_BITS: 30, + BROTLI_DEFAULT_WINDOW: 22, + BROTLI_PARAM_LGBLOCK: 3, + BROTLI_MIN_INPUT_BLOCK_BITS: 16, + BROTLI_MAX_INPUT_BLOCK_BITS: 24, + BROTLI_PARAM_DISABLE_LITERAL_CONTEXT_MODELING: 4, + BROTLI_PARAM_SIZE_HINT: 5, + BROTLI_PARAM_LARGE_WINDOW: 6, + BROTLI_PARAM_NPOSTFIX: 7, + BROTLI_PARAM_NDIRECT: 8, + BROTLI_DECODER_RESULT_ERROR: 0, + BROTLI_DECODER_RESULT_SUCCESS: 1, + BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT: 2, + BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT: 3, + BROTLI_DECODER_PARAM_DISABLE_RING_BUFFER_REALLOCATION: 0, + BROTLI_DECODER_PARAM_LARGE_WINDOW: 1, + BROTLI_DECODER_NO_ERROR: 0, + BROTLI_DECODER_SUCCESS: 1, + BROTLI_DECODER_NEEDS_MORE_INPUT: 2, + BROTLI_DECODER_NEEDS_MORE_OUTPUT: 3, + BROTLI_DECODER_ERROR_FORMAT_EXUBERANT_NIBBLE: -1, + BROTLI_DECODER_ERROR_FORMAT_RESERVED: -2, + BROTLI_DECODER_ERROR_FORMAT_EXUBERANT_META_NIBBLE: -3, + BROTLI_DECODER_ERROR_FORMAT_SIMPLE_HUFFMAN_ALPHABET: -4, + BROTLI_DECODER_ERROR_FORMAT_SIMPLE_HUFFMAN_SAME: -5, + BROTLI_DECODER_ERROR_FORMAT_CL_SPACE: -6, + BROTLI_DECODER_ERROR_FORMAT_HUFFMAN_SPACE: -7, + BROTLI_DECODER_ERROR_FORMAT_CONTEXT_MAP_REPEAT: -8, + BROTLI_DECODER_ERROR_FORMAT_BLOCK_LENGTH_1: -9, + BROTLI_DECODER_ERROR_FORMAT_BLOCK_LENGTH_2: -10, + BROTLI_DECODER_ERROR_FORMAT_TRANSFORM: -11, + BROTLI_DECODER_ERROR_FORMAT_DICTIONARY: -12, + BROTLI_DECODER_ERROR_FORMAT_WINDOW_BITS: -13, + BROTLI_DECODER_ERROR_FORMAT_PADDING_1: -14, + BROTLI_DECODER_ERROR_FORMAT_PADDING_2: -15, + BROTLI_DECODER_ERROR_FORMAT_DISTANCE: -16, + BROTLI_DECODER_ERROR_DICTIONARY_NOT_SET: -19, + BROTLI_DECODER_ERROR_INVALID_ARGUMENTS: -20, + BROTLI_DECODER_ERROR_ALLOC_CONTEXT_MODES: -21, + BROTLI_DECODER_ERROR_ALLOC_TREE_GROUPS: -22, + BROTLI_DECODER_ERROR_ALLOC_CONTEXT_MAP: -25, + BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_1: -26, + BROTLI_DECODER_ERROR_ALLOC_RING_BUFFER_2: -27, + BROTLI_DECODER_ERROR_ALLOC_BLOCK_TYPE_TREES: -30, + BROTLI_DECODER_ERROR_UNREACHABLE: -31, }; }, }); @@ -4068,21 +4231,14 @@ var require_lib = __commonJS({ return zlibBufferSync(new InflateRaw(opts), buffer); }; - // not implemented, stubs - for (const method of [ - "BrotliCompress", - "BrotliDecompress", - "brotliCompress", - "brotliCompressSync", - "brotliDecompress", - "brotliDecompressSync", - "createBrotliCompress", - "createBrotliDecompress", - ]) { - exports[method] = function (buffer, opts, callback) { - throw new Error(`zlib.${method} is not implemented`); - }; - } + exports.brotliCompress = brotliCompress; + exports.brotliDecompress = brotliDecompress; + exports.brotliCompressSync = brotliCompressSync; + exports.brotliDecompressSync = brotliDecompressSync; + exports.createBrotliCompress = createBrotliCompress; + exports.BrotliCompress = BrotliCompress; + exports.createBrotliDecompress = createBrotliDecompress; + exports.BrotliDecompress = BrotliDecompress; function zlibBuffer(engine, buffer, callback) { var buffers = []; diff --git a/src/jsc.zig b/src/jsc.zig index da30bd4960e8e3..57b7fe5db2c3b2 100644 --- a/src/jsc.zig +++ b/src/jsc.zig @@ -49,6 +49,8 @@ pub const API = struct { pub const UDPSocket = @import("./bun.js/api/bun/udp_socket.zig").UDPSocket; pub const Listener = @import("./bun.js/api/bun/socket.zig").Listener; pub const H2FrameParser = @import("./bun.js/api/bun/h2_frame_parser.zig").H2FrameParser; + pub const BrotliEncoder = @import("./bun.js/api/brotli.zig").BrotliEncoder; + pub const BrotliDecoder = @import("./bun.js/api/brotli.zig").BrotliDecoder; }; pub const DNS = @import("./bun.js/api/bun/dns_resolver.zig"); pub const FFI = @import("./bun.js/api/ffi.zig").FFI; diff --git a/test/cli/install/registry/bun-install-registry.test.ts b/test/cli/install/registry/bun-install-registry.test.ts index 356ed18e9a1440..d89b81bc9a5e6c 100644 --- a/test/cli/install/registry/bun-install-registry.test.ts +++ b/test/cli/install/registry/bun-install-registry.test.ts @@ -1,5 +1,14 @@ import { file, spawn } from "bun"; -import { bunExe, bunEnv as env, isLinux, isWindows, toBeValidBin, toHaveBins, writeShebangScript } from "harness"; +import { + bunExe, + bunEnv as env, + isLinux, + isWindows, + toBeValidBin, + toHaveBins, + writeShebangScript, + tmpdirSync, +} from "harness"; import { join, sep } from "path"; import { mkdtempSync, realpathSync } from "fs"; import { rm, writeFile, mkdir, exists, cp } from "fs/promises"; @@ -40,7 +49,7 @@ afterAll(() => { }); beforeEach(async () => { - packageDir = mkdtempSync(join(realpathSync(tmpdir()), "bun-install-registry-" + testCounter++ + "-")); + packageDir = tmpdirSync("bun-install-registry-" + testCounter++ + "-"); env.BUN_INSTALL_CACHE_DIR = join(packageDir, ".bun-cache"); env.BUN_TMPDIR = env.TMPDIR = env.TEMP = join(packageDir, ".bun-tmp"); await writeFile( diff --git a/test/js/node/http/node-http.test.ts b/test/js/node/http/node-http.test.ts index 03cf85dc09bcbd..0ea5cd557740a9 100644 --- a/test/js/node/http/node-http.test.ts +++ b/test/js/node/http/node-http.test.ts @@ -1,5 +1,5 @@ // @ts-nocheck -import { +import http, { createServer, request, get, @@ -27,6 +27,8 @@ import { PassThrough } from "node:stream"; const { describe, expect, it, beforeAll, afterAll, createDoneDotAll } = createTest(import.meta.path); import { bunExe } from "bun:harness"; import { bunEnv, tmpdirSync } from "harness"; +import * as stream from "node:stream"; +import * as zlib from "node:zlib"; function listen(server: Server, protocol: string = "http"): Promise { return new Promise((resolve, reject) => { @@ -1944,3 +1946,70 @@ it("destroy should end download", async () => { server.stop(true); } }); + +it("can send brotli from Server and receive with fetch", async () => { + try { + var server = createServer((req, res) => { + expect(req.url).toBe("/hello"); + res.writeHead(200); + res.setHeader("content-encoding", "br"); + + const inputStream = new stream.Readable(); + inputStream.push("Hello World"); + inputStream.push(null); + + inputStream.pipe(zlib.createBrotliCompress()).pipe(res); + }); + const url = await listen(server); + const res = await fetch(new URL("/hello", url)); + expect(await res.text()).toBe("Hello World"); + } catch (e) { + throw e; + } finally { + server.close(); + } +}); + +it("can send brotli from Server and receive with Client", async () => { + try { + var server = createServer((req, res) => { + expect(req.url).toBe("/hello"); + res.writeHead(200); + res.setHeader("content-encoding", "br"); + + const inputStream = new stream.Readable(); + inputStream.push("Hello World"); + inputStream.push(null); + + const passthrough = new stream.PassThrough(); + passthrough.on("data", data => res.write(data)); + passthrough.on("end", () => res.end()); + + inputStream.pipe(zlib.createBrotliCompress()).pipe(passthrough); + }); + + const url = await listen(server); + const { resolve, reject, promise } = Promise.withResolvers(); + http.get(new URL("/hello", url), res => { + let rawData = ""; + const passthrough = stream.PassThrough(); + passthrough.on("data", chunk => { + rawData += chunk; + }); + passthrough.on("end", () => { + try { + expect(Buffer.from(rawData)).toEqual(Buffer.from("Hello World")); + resolve(); + } catch (e) { + reject(e); + } + }); + res.pipe(zlib.createBrotliDecompress()).pipe(passthrough); + }); + await promise; + } catch (e) { + throw e; + } finally { + server.close(); + } +}); diff --git a/test/js/node/stream/node-stream.test.js b/test/js/node/stream/node-stream.test.js index 408e2285150bc0..bee68e3b81deb6 100644 --- a/test/js/node/stream/node-stream.test.js +++ b/test/js/node/stream/node-stream.test.js @@ -5,7 +5,6 @@ import { join } from "path"; import { bunExe, bunEnv, tmpdirSync, isWindows } from "harness"; import { tmpdir } from "node:os"; import { writeFileSync, mkdirSync } from "node:fs"; -import { spawn } from "node:child_process"; describe("Readable", () => { it("should be able to be created without _construct method defined", done => { @@ -44,6 +43,7 @@ describe("Readable", () => { readable.pipe(writable); }); + it("should be able to be piped via .pipe, issue #3607", done => { const path = `${tmpdir()}/${Date.now()}.testReadStreamEmptyFile.txt`; writeFileSync(path, ""); @@ -70,6 +70,7 @@ describe("Readable", () => { stream.pipe(writable); }); + it("should be able to be piped via .pipe, issue #3668", done => { const path = `${tmpdir()}/${Date.now()}.testReadStream.txt`; writeFileSync(path, "12345"); @@ -94,6 +95,7 @@ describe("Readable", () => { stream.pipe(writable); }); + it("should be able to be piped via .pipe, both start and end are 0", done => { const path = `${tmpdir()}/${Date.now()}.testReadStream2.txt`; writeFileSync(path, "12345"); @@ -119,6 +121,7 @@ describe("Readable", () => { stream.pipe(writable); }); + it("should be able to be piped via .pipe with a large file", done => { const data = Buffer.allocUnsafe(768 * 1024) .fill("B") @@ -151,6 +154,15 @@ describe("Readable", () => { }); stream.pipe(writable); }); + + it.todo("should have the correct fields in _events", () => { + const s = Readable({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + }); }); describe("createReadStream", () => { @@ -191,6 +203,17 @@ describe("createReadStream", () => { }); }); +describe("Writable", () => { + it.todo("should have the correct fields in _events", () => { + const s = Writable({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + }); +}); + describe("Duplex", () => { it("should allow subclasses to be derived via .call() on class", () => { function Subclass(opts) { @@ -204,6 +227,18 @@ describe("Duplex", () => { const subclass = new Subclass(); expect(subclass instanceof Duplex).toBe(true); }); + + it.todo("should have the correct fields in _events", () => { + const s = Duplex({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + expect(s._events).toHaveProperty("data"); + expect(s._events).toHaveProperty("end"); + expect(s._events).toHaveProperty("readable"); + }); }); describe("Transform", () => { @@ -219,6 +254,18 @@ describe("Transform", () => { const subclass = new Subclass(); expect(subclass instanceof Transform).toBe(true); }); + + it.todo("should have the correct fields in _events", () => { + const s = Transform({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + expect(s._events).toHaveProperty("data"); + expect(s._events).toHaveProperty("end"); + expect(s._events).toHaveProperty("readable"); + }); }); describe("PassThrough", () => { @@ -234,6 +281,18 @@ describe("PassThrough", () => { const subclass = new Subclass(); expect(subclass instanceof PassThrough).toBe(true); }); + + it.todo("should have the correct fields in _events", () => { + const s = PassThrough({}); + expect(s._events).toHaveProperty("close"); + expect(s._events).toHaveProperty("error"); + expect(s._events).toHaveProperty("prefinish"); + expect(s._events).toHaveProperty("finish"); + expect(s._events).toHaveProperty("drain"); + expect(s._events).toHaveProperty("data"); + expect(s._events).toHaveProperty("end"); + expect(s._events).toHaveProperty("readable"); + }); }); const processStdInTest = ` diff --git a/test/js/node/zlib/zlib.test.js b/test/js/node/zlib/zlib.test.js index 384be020e23b34..8d30d140917d38 100644 --- a/test/js/node/zlib/zlib.test.js +++ b/test/js/node/zlib/zlib.test.js @@ -1,5 +1,11 @@ import { describe, it, expect } from "bun:test"; import { gzipSync, deflateSync, inflateSync, gunzipSync } from "bun"; +import * as zlib from "node:zlib"; +import * as fs from "node:fs"; +import * as buffer from "node:buffer"; +import * as util from "node:util"; +import { resolve } from "node:path"; +import { tmpdirSync } from "harness"; describe("zlib", () => { it("should be able to deflate and inflate", () => { @@ -27,9 +33,13 @@ describe("zlib", () => { }); }); -import * as zlib from "node:zlib"; -import * as fs from "node:fs"; -import * as buffer from "node:buffer"; +function* window(buffer, size, advance = size) { + let i = 0; + while (i <= buffer.length) { + yield buffer.slice(i, i + size); + i += advance; + } +} describe("zlib.gunzip", () => { it("should be able to unzip a Buffer and return an unzipped Buffer", async () => { @@ -48,19 +58,126 @@ describe("zlib.gunzip", () => { }); }); -describe("zlib.brotli*", () => { - it("returns stub", () => { - for (const method of [ - "BrotliCompress", - "BrotliDecompress", - "brotliCompress", - "brotliCompressSync", - "brotliDecompress", - "brotliDecompressSync", - "createBrotliCompress", - "createBrotliDecompress", - ]) { - expect(() => zlib[method]()).toThrow(new Error(`zlib.${method} is not implemented`)); +describe("zlib.brotli", () => { + const inputString = + "ΩΩLorem ipsum dolor sit amet, consectetur adipiscing eli" + + "t. Morbi faucibus, purus at gravida dictum, libero arcu " + + "convallis lacus, in commodo libero metus eu nisi. Nullam" + + " commodo, neque nec porta placerat, nisi est fermentum a" + + "ugue, vitae gravida tellus sapien sit amet tellus. Aenea" + + "n non diam orci. Proin quis elit turpis. Suspendisse non" + + " diam ipsum. Suspendisse nec ullamcorper odio. Vestibulu" + + "m arcu mi, sodales non suscipit id, ultrices ut massa. S" + + "ed ac sem sit amet arcu malesuada fermentum. Nunc sed. "; + const compressedString = + "G/gBQBwHdky2aHV5KK9Snf05//1pPdmNw/7232fnIm1IB" + + "K1AA8RsN8OB8Nb7Lpgk3UWWUlzQXZyHQeBBbXMTQXC1j7" + + "wg3LJs9LqOGHRH2bj/a2iCTLLx8hBOyTqgoVuD1e+Qqdn" + + "f1rkUNyrWq6LtOhWgxP3QUwdhKGdZm3rJWaDDBV7+pDk1" + + "MIkrmjp4ma2xVi5MsgJScA3tP1I7mXeby6MELozrwoBQD" + + "mVTnEAicZNj4lkGqntJe2qSnGyeMmcFgraK94vCg/4iLu" + + "Tw5RhKhnVY++dZ6niUBmRqIutsjf5TzwF5iAg8a9UkjF5" + + "2eZ0tB2vo6v8SqVfNMkBmmhxr0NT9LkYF69aEjlYzj7IE" + + "KmEUQf1HBogRYhFIt4ymRNEgHAIzOyNEsQM="; + const compressedBuffer = Buffer.from(compressedString, "base64"); + + it("brotliCompress", async () => { + const compressed = await util.promisify(zlib.brotliCompress)(inputString); + expect(compressed.toString()).toEqual(compressedBuffer.toString()); + }); + + it("brotliDecompress", async () => { + const roundtrip = await util.promisify(zlib.brotliDecompress)(compressedBuffer); + expect(roundtrip.toString()).toEqual(inputString); + }); + + it("brotliCompressSync", () => { + const compressed = zlib.brotliCompressSync(inputString); + expect(compressed.toString()).toEqual(compressedBuffer.toString()); + }); + + it("brotliDecompressSync", () => { + const roundtrip = zlib.brotliDecompressSync(compressedBuffer); + expect(roundtrip.toString()).toEqual(inputString); + }); + + it("can compress streaming", () => { + const encoder = zlib.createBrotliCompress(); + for (const chunk of window(inputString, 55)) { + encoder._transform(chunk, undefined, (err, data) => { + expect(err).toBeUndefined(); + expect(data).toBeUndefined(); + }); + } + encoder._flush((err, data) => { + expect(err).toBeUndefined(); + expect(data).toEqual(compressedBuffer); + }); + }); + + it("can decompress streaming", () => { + const decoder = zlib.createBrotliDecompress(); + for (const chunk of window(compressedBuffer, 10)) { + decoder._transform(chunk, undefined, (err, data) => { + expect(err).toBeUndefined(); + expect(data).toBeUndefined(); + }); + } + decoder._flush((err, data) => { + expect(err).toBeUndefined(); + expect(data).toEqual(Buffer.from(inputString)); + }); + }); + + it("can roundtrip an empty string", async () => { + const input = ""; + const compressed = await util.promisify(zlib.brotliCompress)(input); + const roundtrip = await util.promisify(zlib.brotliDecompress)(compressed); + expect(roundtrip.toString()).toEqual(input); + }); + + it("can compress streaming big", () => { + const encoder = zlib.createBrotliCompress(); + // prettier-ignore + for (const chunk of window(inputString+inputString+inputString+inputString, 65)) { + encoder._transform(chunk, undefined, (err, data) => { + expect(err).toBeUndefined(); + expect(data).toBeUndefined(); + }); + } + encoder._flush((err, data) => { + expect(err).toBeUndefined(); + expect(data.length).toBeGreaterThan(0); + }); + }); + + it("fully works as a stream.Transform", async () => { + const x_dir = tmpdirSync("bun.test."); + const out_path_c = resolve(x_dir, "this.js.br"); + const out_path_d = resolve(x_dir, "this.js"); + + { + const { resolve, promise } = Promise.withResolvers(); + const readStream = fs.createReadStream(import.meta.filename); + const writeStream = fs.createWriteStream(out_path_c); + const brStream = zlib.createBrotliCompress(); + const the_stream = readStream.pipe(brStream).pipe(writeStream); + the_stream.on("finish", resolve); + await promise; + } + { + const { resolve, promise } = Promise.withResolvers(); + const readStream = fs.createReadStream(out_path_c); + const writeStream = fs.createWriteStream(out_path_d); + const brStream = zlib.createBrotliDecompress(); + const the_stream = readStream.pipe(brStream).pipe(writeStream); + the_stream.on("finish", resolve); + await promise; + } + { + const expected = await Bun.file(import.meta.filename).text(); + const actual = await Bun.file(out_path_d).text(); + expect(actual).toEqual(expected); } }); });