Skip to content

Commit

Permalink
implement node:cluster (#11492)
Browse files Browse the repository at this point in the history
Co-authored-by: Jarred Sumner <[email protected]>
Co-authored-by: nektro <[email protected]>
Co-authored-by: cirospaciari <[email protected]>
  • Loading branch information
4 people authored Aug 18, 2024
1 parent a53db00 commit fd75ca7
Show file tree
Hide file tree
Showing 93 changed files with 5,255 additions and 294 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@ if(NOT NO_CODEGEN)
"${BUN_SRC}/js/thirdparty/*.ts"
"${BUN_SRC}/js/internal/*.js"
"${BUN_SRC}/js/internal/*.ts"
"${BUN_SRC}/js/internal/cluster/*.ts"
"${BUN_SRC}/js/internal/util/*.js"
"${BUN_SRC}/js/internal/fs/*.ts"
"${BUN_SRC}/js/node/*.js"
Expand Down
4 changes: 2 additions & 2 deletions docs/runtime/nodejs-apis.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ This page is updated regularly to reflect compatibility status of the latest ver

### [`node:cluster`](https://nodejs.org/api/cluster.html)

🔴 Not implemented.
🟡 Handles and file descriptors cannot be passed between workers, which means load-balancing HTTP requests across processes is only supported on Linux at this time (via `SO_REUSEPORT`). Otherwise, implemented but not battle-tested.

### [`node:console`](https://nodejs.org/api/console.html)

Expand Down Expand Up @@ -341,7 +341,7 @@ The table below lists all globals implemented by Node.js and Bun's current compa

### [`process`](https://nodejs.org/api/process.html)

🟡 Missing `domain` `initgroups` `setegid` `seteuid` `setgid` `setgroups` `setuid` `allowedNodeEnvironmentFlags` `getActiveResourcesInfo` `setActiveResourcesInfo` `moduleLoadList` `setSourceMapsEnabled` `channel`. `process.binding` is partially implemented.
🟡 Missing `domain` `initgroups` `setegid` `seteuid` `setgid` `setgroups` `setuid` `allowedNodeEnvironmentFlags` `getActiveResourcesInfo` `setActiveResourcesInfo` `moduleLoadList` `setSourceMapsEnabled`. `process.binding` is partially implemented.

### [`queueMicrotask()`](https://developer.mozilla.org/en-US/docs/Web/API/queueMicrotask)

Expand Down
6 changes: 4 additions & 2 deletions docs/runtime/web-apis.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ The following Web APIs are partially or completely supported.
[`self.postMessage`](https://developer.mozilla.org/en-US/docs/Web/API/DedicatedWorkerGlobalScope/postMessage)
[`structuredClone`](https://developer.mozilla.org/en-US/docs/Web/API/structuredClone)
[`MessagePort`](https://developer.mozilla.org/en-US/docs/Web/API/MessagePort)
[`MessageChannel`](https://developer.mozilla.org/en-US/docs/Web/API/MessageChannel), [`BroadcastChannel`](https://developer.mozilla.org/en-US/docs/Web/API/BroadcastChannel).
[`MessageChannel`](https://developer.mozilla.org/en-US/docs/Web/API/MessageChannel)
[`BroadcastChannel`](https://developer.mozilla.org/en-US/docs/Web/API/BroadcastChannel).

---

Expand Down Expand Up @@ -70,7 +71,8 @@ The following Web APIs are partially or completely supported.
---

- Intervals
- [`setInterval`](https://developer.mozilla.org/en-US/docs/Web/API/setInterval)[`clearInterval`](https://developer.mozilla.org/en-US/docs/Web/API/clearInterval)
- [`setInterval`](https://developer.mozilla.org/en-US/docs/Web/API/setInterval)
[`clearInterval`](https://developer.mozilla.org/en-US/docs/Web/API/clearInterval)

---

Expand Down
13 changes: 10 additions & 3 deletions scripts/runner.node.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
} from "node:fs";
import { spawn, spawnSync } from "node:child_process";
import { tmpdir, hostname, userInfo, homedir } from "node:os";
import { join, basename, dirname, relative } from "node:path";
import { join, basename, dirname, relative, sep } from "node:path";
import { normalize as normalizeWindows } from "node:path/win32";
import { isIP } from "node:net";
import { parseArgs } from "node:util";
Expand Down Expand Up @@ -529,10 +529,11 @@ async function spawnBun(execPath, { args, cwd, timeout, env, stdout, stderr }) {
async function spawnBunTest(execPath, testPath) {
const timeout = getTestTimeout(testPath);
const perTestTimeout = Math.ceil(timeout / 2);
const isReallyTest = isTestStrict(testPath);
const { ok, error, stdout } = await spawnBun(execPath, {
args: ["test", `--timeout=${perTestTimeout}`, testPath],
args: isReallyTest ? ["test", `--timeout=${perTestTimeout}`, testPath] : [testPath],
cwd: cwd,
timeout,
timeout: isReallyTest ? timeout : 30_000,
env: {
GITHUB_ACTIONS: "true", // always true so annotations are parsed
},
Expand Down Expand Up @@ -811,6 +812,12 @@ function isJavaScript(path) {
* @returns {boolean}
*/
function isTest(path) {
if (path.replaceAll(sep, "/").includes("/test-cluster-") && path.endsWith(".js")) return true;
if (path.replaceAll(sep, "/").startsWith("js/node/cluster/test-") && path.endsWith(".ts")) return true;
return isTestStrict(path);
}

function isTestStrict(path) {
return isJavaScript(path) && /\.test|spec\./.test(basename(path));
}

Expand Down
19 changes: 11 additions & 8 deletions src/bun.js/api/BunObject.classes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ export default [
getter: "getStdout",
cache: true,
},
stderr: {
getter: "getStderr",
cache: true,
},
writable: {
getter: "getStdin",
cache: "stdin",
Expand All @@ -70,11 +74,6 @@ export default [
getter: "getStdout",
cache: "stdout",
},
stderr: {
getter: "getStderr",
cache: true,
},

ref: {
fn: "doRef",
length: 0,
Expand All @@ -91,20 +90,24 @@ export default [
fn: "doSend",
length: 1,
},

kill: {
fn: "kill",
length: 1,
},
disconnect: {
fn: "disconnect",
length: 0,
},
connected: {
getter: "getConnected",
},
"@@asyncDispose": {
fn: "asyncDispose",
length: 1,
},

killed: {
getter: "getKilled",
},

exitCode: {
getter: "getExitCode",
},
Expand Down
3 changes: 2 additions & 1 deletion src/bun.js/api/bun/socket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ pub const Listener = struct {
}

pub fn finalize(this: *Listener) callconv(.C) void {
log("Finalize", .{});
log("finalize", .{});
if (this.listener) |listener| {
this.listener = null;
listener.close(this.ssl);
Expand All @@ -878,6 +878,7 @@ pub const Listener = struct {
}

pub fn deinit(this: *Listener) void {
log("deinit", .{});
this.strong_self.deinit();
this.strong_data.deinit();
this.poll_ref.unref(this.handlers.vm);
Expand Down
65 changes: 52 additions & 13 deletions src/bun.js/api/bun/subprocess.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const windows = bun.windows;
const uv = windows.libuv;
const LifecycleScriptSubprocess = bun.install.LifecycleScriptSubprocess;
const Body = JSC.WebCore.Body;
const IPClog = Output.scoped(.IPC, false);

const PosixSpawn = bun.posix.spawn;
const Rusage = bun.posix.spawn.Rusage;
Expand Down Expand Up @@ -128,10 +129,7 @@ pub const ResourceUsage = struct {
};

pub fn appendEnvpFromJS(globalThis: *JSC.JSGlobalObject, object: JSC.JSValue, envp: *std.ArrayList(?[*:0]const u8), PATH: *[]const u8) !void {
var object_iter = JSC.JSPropertyIterator(.{
.skip_empty_name = false,
.include_value = true,
}).init(globalThis, object);
var object_iter = JSC.JSPropertyIterator(.{ .skip_empty_name = false, .include_value = true }).init(globalThis, object);
defer object_iter.deinit();
try envp.ensureTotalCapacityPrecise(object_iter.len +
// +1 incase there's IPC
Expand Down Expand Up @@ -185,6 +183,7 @@ pub const Subprocess = struct {

exit_promise: JSC.Strong = .{},
on_exit_callback: JSC.Strong = .{},
on_disconnect_callback: JSC.Strong = .{},

globalThis: *JSC.JSGlobalObject,
observable_getters: std.enums.EnumSet(enum {
Expand Down Expand Up @@ -715,6 +714,7 @@ pub const Subprocess = struct {
}

pub fn doSend(this: *Subprocess, global: *JSC.JSGlobalObject, callFrame: *JSC.CallFrame) JSValue {
IPClog("Subprocess#doSend", .{});
const ipc_data = &(this.ipc_data orelse {
if (this.hasExited()) {
global.throw("Subprocess.send() cannot be used after the process has exited.", .{});
Expand All @@ -737,10 +737,18 @@ pub const Subprocess = struct {
return .undefined;
}

pub fn disconnect(this: *Subprocess) void {
const ipc_data = this.ipc_data orelse return;
ipc_data.socket.close(.normal);
pub fn disconnect(this: *Subprocess, globalThis: *JSGlobalObject, callframe: *JSC.CallFrame) JSValue {
_ = globalThis;
_ = callframe;
const ipc_data = this.ipc_maybe() orelse return .undefined;
ipc_data.close();
this.ipc_data = null;
return .undefined;
}

pub fn getConnected(this: *Subprocess, globalThis: *JSGlobalObject) JSValue {
_ = globalThis;
return JSValue.jsBoolean(this.ipc_maybe() != null);
}

pub fn pid(this: *const Subprocess) i32 {
Expand Down Expand Up @@ -1546,6 +1554,7 @@ pub const Subprocess = struct {

this.exit_promise.deinit();
this.on_exit_callback.deinit();
this.on_disconnect_callback.deinit();
}

pub fn finalize(this: *Subprocess) callconv(.C) void {
Expand Down Expand Up @@ -1635,10 +1644,7 @@ pub const Subprocess = struct {
var allocator = arena.allocator();

var override_env = false;
var env_array = std.ArrayListUnmanaged(?[*:0]const u8){
.items = &.{},
.capacity = 0,
};
var env_array = std.ArrayListUnmanaged(?[*:0]const u8){};
var jsc_vm = globalThis.bunVM();

var cwd = jsc_vm.bundler.fs.top_level_dir;
Expand All @@ -1655,6 +1661,7 @@ pub const Subprocess = struct {
}
var lazy = false;
var on_exit_callback = JSValue.zero;
var on_disconnect_callback = JSValue.zero;
var PATH = jsc_vm.bundler.env.get("PATH") orelse "";
var argv = std.ArrayList(?[*:0]const u8).init(allocator);
var cmd_value = JSValue.zero;
Expand Down Expand Up @@ -1772,7 +1779,6 @@ pub const Subprocess = struct {
}

if (args != .zero and args.isObject()) {

// This must run before the stdio parsing happens
if (!is_sync) {
if (args.getTruthy(globalThis, "ipc")) |val| {
Expand Down Expand Up @@ -1801,6 +1807,18 @@ pub const Subprocess = struct {
}
}

if (args.getTruthy(globalThis, "onDisconnect")) |onDisconnect_| {
if (!onDisconnect_.isCell() or !onDisconnect_.isCallable(globalThis.vm())) {
globalThis.throwInvalidArguments("onDisconnect must be a function or undefined", .{});
return .zero;
}

on_disconnect_callback = if (comptime is_sync)
onDisconnect_
else
onDisconnect_.withAsyncContextIfNeeded(globalThis);
}

if (args.getTruthy(globalThis, "cwd")) |cwd_| {
const cwd_str = cwd_.getZigString(globalThis);
if (cwd_str.len > 0) {
Expand Down Expand Up @@ -2120,6 +2138,7 @@ pub const Subprocess = struct {
),
.stdio_pipes = spawned.extra_pipes.moveToUnmanaged(),
.on_exit_callback = if (on_exit_callback != .zero) JSC.Strong.create(on_exit_callback, globalThis) else .{},
.on_disconnect_callback = if (on_disconnect_callback != .zero) JSC.Strong.create(on_disconnect_callback, globalThis) else .{},
.ipc_data = if (!is_sync)
if (maybe_ipc_mode) |ipc_mode|
if (Environment.isWindows) .{
Expand Down Expand Up @@ -2266,10 +2285,13 @@ pub const Subprocess = struct {
return sync_value;
}

const node_cluster_binding = @import("./../../node/node_cluster_binding.zig");

pub fn handleIPCMessage(
this: *Subprocess,
message: IPC.DecodedIPCMessage,
) void {
IPClog("Subprocess#handleIPCMessage", .{});
switch (message) {
// In future versions we can read this in order to detect version mismatches,
// or disable future optimizations if the subprocess is old.
Expand All @@ -2287,17 +2309,34 @@ pub const Subprocess = struct {
);
}
},
.internal => |data| {
IPC.log("Received IPC internal message from child", .{});
node_cluster_binding.handleInternalMessagePrimary(this.globalThis, this, data);
},
}
}

pub fn handleIPCClose(this: *Subprocess) void {
this.ipc_data = null;
IPClog("Subprocess#handleIPCClose", .{});
this.updateHasPendingActivity();
const ok = this.ipc_data != null;
if (ok) this.ipc().internal_msg_queue.deinit();
this.ipc_data = null;

const this_jsvalue = this.this_jsvalue;
this_jsvalue.ensureStillAlive();
if (this.on_disconnect_callback.trySwap()) |callback| {
this.globalThis.bunVM().eventLoop().runCallback(callback, this.globalThis, this_jsvalue, &.{JSValue.jsBoolean(ok)});
}
}

pub fn ipc(this: *Subprocess) *IPC.IPCData {
return &this.ipc_data.?;
}

pub fn ipc_maybe(this: *Subprocess) ?*IPC.IPCData {
return &(this.ipc_data orelse return null);
}

pub const IPCHandler = IPC.NewIPCHandler(Subprocess);
};
Loading

0 comments on commit fd75ca7

Please sign in to comment.