Skip to content

Commit

Permalink
Use WebWorker (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
ianthomas23 authored Aug 13, 2024
1 parent 4c2daa7 commit 9d57a01
Show file tree
Hide file tree
Showing 45 changed files with 1,565 additions and 1,006 deletions.
690 changes: 456 additions & 234 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
},
"dependencies": {
"@jupyterlab/services": "^7.2.4",
"@jupyterlite/contents": "^0.4.0"
"@jupyterlite/contents": "^0.4.0",
"comlink": "^4.4.1"
},
"devDependencies": {
"@types/json-schema": "^7.0.15",
Expand Down
198 changes: 198 additions & 0 deletions src/buffered_stdin.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/**
* Classes to deal with buffered stdin. Both main and webworkers have access to the same
* SharedArrayBuffer and use that to pass stdin characters from the UI (main worker) to the shell
* (webworker). This is necessary when the shell is running a WASM command that is synchronous and
* blocking, as the usual async message passing from main to webworker does not work as the received
* messages would only be processed when the command has finished.
*/

// Indexes into SharedArrayBuffer.
const MAIN = 0;
const WORKER = 1;
const LENGTH = 2;
const START_CHAR = 3;

abstract class BufferedStdin {
constructor(sharedArrayBuffer?: SharedArrayBuffer) {
if (sharedArrayBuffer === undefined) {
const length = (this._maxChars + 3) * Int32Array.BYTES_PER_ELEMENT;
this._sharedArrayBuffer = new SharedArrayBuffer(length);
} else {
this._sharedArrayBuffer = sharedArrayBuffer;
}

this._intArray = new Int32Array(this._sharedArrayBuffer);
if (sharedArrayBuffer === undefined) {
this._intArray[MAIN] = 0;
this._intArray[WORKER] = 0;
}
}

async disable(): Promise<void> {
this._enabled = false;
this._clear();
}

async enable(): Promise<void> {
this._enabled = true;
}

get enabled(): boolean {
return this._enabled;
}

protected _clear() {
this._intArray[MAIN] = 0;
this._intArray[WORKER] = 0;
this._readCount = 0;
}

/**
* Load the character from the shared array buffer and return it.
*/
protected _loadFromSharedArrayBuffer(): number[] {
const len = Atomics.load(this._intArray, LENGTH);
const ret: number[] = [];
for (let i = 0; i < len; i++) {
ret.push(Atomics.load(this._intArray, START_CHAR + i));
}
return ret;
}

protected _enabled: boolean = false;
protected _maxChars: number = 8; // Max number of actual characters in a token.
protected _sharedArrayBuffer: SharedArrayBuffer;
protected _intArray: Int32Array;
protected _readCount: number = 0;
}

export namespace MainBufferedStdin {
export interface ISendStdinNow {
(output: string): Promise<void>;
}
}

/**
* Main worker buffers characters locally, and stores just one character at a time in the
* SharedArrayBuffer so that the web worker can read it.
*/
export class MainBufferedStdin extends BufferedStdin {
constructor() {
super();
}

override async disable(): Promise<void> {
// Send all remaining buffered characters as soon as possible via the supplied sendFunction.
this._disabling = true;
if (this._storedCount !== this._readCount) {
const codes = this._loadFromSharedArrayBuffer();
let text = '';
for (const code of codes) {
text += String.fromCharCode(code);
}
await this._sendStdinNow!(text);
}
while (this._buffer.length > 0) {
await this._sendStdinNow!(this._buffer.shift()!);
}
this._disabling = false;

super.disable();
}

get sharedArrayBuffer(): SharedArrayBuffer {
return this._sharedArrayBuffer;
}

/**
* Push a character to the buffer.
* It may or may not be stored in the SharedArrayBuffer immediately.
*/
async push(char: string) {
// May be multiple characters if ANSI control sequence.
this._buffer.push(char);
this._bufferCount++;

if (char.length > this._maxChars) {
// Too big, log this and do not pass it on?
console.log(`String '${char}' is too long to buffer`);
}

if (!this._disabling && this._readCount === this._storedCount) {
this._storeInSharedArrayBuffer();
}
}

registerSendStdinNow(sendStdinNow: MainBufferedStdin.ISendStdinNow) {
this._sendStdinNow = sendStdinNow;
}

/**
* After a successful read by the worker, main checks if another character can be stored in the
* SharedArrayBuffer.
*/
private _afterRead() {
this._readCount = Atomics.load(this._intArray, 1);
if (this._readCount !== this._storedCount) {
throw new Error('Should not happen');
}

if (this._bufferCount > this._storedCount) {
this._storeInSharedArrayBuffer();
}
}

protected override _clear() {
super._clear();
this._buffer = [];
this._bufferCount = 0;
this._storedCount = 0;
}

private _storeInSharedArrayBuffer() {
const char: string = this._buffer.shift()!;
this._storedCount++;

// Store character in SharedArrayBuffer.
const len = char.length;
Atomics.store(this._intArray, LENGTH, len);
for (let i = 0; i < len; i++) {
Atomics.store(this._intArray, START_CHAR + i, char.charCodeAt(i));
}

// Notify web worker that a new character is available.
Atomics.store(this._intArray, MAIN, this._storedCount);
Atomics.notify(this._intArray, MAIN, 1);

// Async wait for web worker to read this character.
const { async, value } = Atomics.waitAsync(this._intArray, WORKER, this._readCount);
if (async) {
value.then(() => this._afterRead());
}
}

private _buffer: string[] = [];
private _bufferCount: number = 0;
private _disabling: boolean = false;
private _storedCount: number = 0;
private _sendStdinNow?: MainBufferedStdin.ISendStdinNow;
}

export class WorkerBufferedStdin extends BufferedStdin {
constructor(sharedArrayBuffer: SharedArrayBuffer) {
super(sharedArrayBuffer);
}

get(): number[] {
// Wait for main worker to store a new character.
Atomics.wait(this._intArray, MAIN, this._readCount);
const ret = this._loadFromSharedArrayBuffer();
this._readCount++;

// Notify main worker that character has been read and a new one can be stored.
Atomics.store(this._intArray, WORKER, this._readCount);
Atomics.notify(this._intArray, WORKER, 1);

return ret;
}
}
4 changes: 2 additions & 2 deletions src/builtin/alias_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export class AliasCommand extends BuiltinCommand {
const index = name.indexOf('=');
if (index === -1) {
// Print alias.
await stdout.write(`${name}='${aliases.get(name)}'\n`);
stdout.write(`${name}='${aliases.get(name)}'\n`);
} else {
// Set alias.
aliases.set(name.slice(0, index), name.slice(index + 1));
Expand All @@ -31,7 +31,7 @@ export class AliasCommand extends BuiltinCommand {
} else {
// Write all aliases.
for (const [key, value] of aliases.entries()) {
await stdout.write(`${key}='${value}'\n`);
stdout.write(`${key}='${value}'\n`);
}
}
return ExitCode.SUCCESS;
Expand Down
2 changes: 1 addition & 1 deletion src/builtin/clear_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export class ClearCommand extends BuiltinCommand {
protected async _run(context: Context): Promise<number> {
const { stdout } = context;
if (stdout.supportsAnsiEscapes()) {
await stdout.write(ansi.eraseScreen + ansi.eraseSavedLines + ansi.cursorHome);
stdout.write(ansi.eraseScreen + ansi.eraseSavedLines + ansi.cursorHome);
}
return ExitCode.SUCCESS;
}
Expand Down
4 changes: 2 additions & 2 deletions src/builtin/history_command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ export class HistoryCommand extends BuiltinCommand {
const options = Options.fromArgs(args, HistoryOptions);

if (options.help.isSet) {
await options.writeHelp(stdout);
options.writeHelp(stdout);
} else if (options.clear.isSet) {
history.clear();
} else {
await history.write(stdout);
history.write(stdout);
}
return ExitCode.SUCCESS;
}
Expand Down
4 changes: 2 additions & 2 deletions src/builtin/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ export abstract class Options {
return options;
}

async writeHelp(output: IOutput): Promise<void> {
writeHelp(output: IOutput): void {
for (const line of this._help()) {
await output.write(`${line}\n`);
output.write(`${line}\n`);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/callback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export interface IOutputCallback {
}

/**
* Enable/disable buffered stdin in the terminal.
* Enable/disable buffered stdin.
*/
export interface IEnableBufferedStdinCallback {
(enable: boolean): void;
Expand Down
57 changes: 57 additions & 0 deletions src/defs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { IEnableBufferedStdinCallback, IOutputCallback, IStdinCallback } from './callback';

import { ProxyMarked, Remote } from 'comlink';

interface IOptionsCommon {
color?: boolean;
mountpoint?: string;
driveFsBaseUrl?: string;
// Initial directories and files to create, for testing purposes.
initialDirectories?: string[];
initialFiles?: IShell.IFiles;
}

export interface IShell {
input(char: string): Promise<void>;
setSize(rows: number, columns: number): Promise<void>;
start(): Promise<void>;
}

export namespace IShell {
export interface IOptions extends IOptionsCommon {
outputCallback: IOutputCallback;
}

export type IFiles = { [key: string]: string };
}

export interface IShellWorker extends IShell {
// Handle any lazy initialization activities.
// Callback proxies need to be separate arguments, they cannot be in IOptions.
initialize(
options: IShellWorker.IOptions,
outputCallback: IShellWorker.IProxyOutputCallback,
enableBufferedStdinCallback: IShellWorker.IProxyEnableBufferedStdinCallback
): void;
}

export namespace IShellWorker {
export interface IProxyOutputCallback extends IOutputCallback, ProxyMarked {}
export interface IProxyEnableBufferedStdinCallback
extends IEnableBufferedStdinCallback,
ProxyMarked {}

export interface IOptions extends IOptionsCommon {
sharedArrayBuffer: SharedArrayBuffer;
}
}

export type IRemoteShell = Remote<IShellWorker>;

export namespace IShellImpl {
export interface IOptions extends IOptionsCommon {
outputCallback: IOutputCallback;
enableBufferedStdinCallback: IEnableBufferedStdinCallback;
stdinCallback: IStdinCallback;
}
}
10 changes: 7 additions & 3 deletions src/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import { ansi } from './ansi';
* commands.
*/
export class Environment extends Map<string, string> {
constructor() {
constructor(color: boolean) {
super();
this.set('PS1', ansi.styleBoldGreen + 'js-shell:' + ansi.styleReset + ' ');
this.set('TERM', 'xterm-256color');
if (color) {
this.set('PS1', ansi.styleBoldGreen + 'js-shell:' + ansi.styleReset + ' ');
this.set('TERM', 'xterm-256color');
} else {
this.set('PS1', 'js-shell: ');
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class History {
async write(output: IOutput): Promise<void> {
for (let i = 0; i < this._history.length; i++) {
const index = String(i).padStart(5, ' ');
await output.write(`${index} ${this._history[i]}\n`);
output.write(`${index} ${this._history[i]}\n`);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ export { Aliases } from './aliases';
export { IOutputCallback, IEnableBufferedStdinCallback, IStdinCallback } from './callback';
export { CommandRegistry } from './command_registry';
export { Context } from './context';
export { IShell } from './defs';
export * from './exit_code';
export { IFileSystem } from './file_system';
export * from './io';
export { parse } from './parse';
export * from './shell';
export { Shell } from './shell';
export { tokenize, Token } from './tokenize';
2 changes: 1 addition & 1 deletion src/io/buffered_output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export abstract class BufferedOutput implements IOutput {
return false;
}

async write(text: string): Promise<void> {
write(text: string): void {
this.data.push(text);
}

Expand Down
2 changes: 1 addition & 1 deletion src/io/console_output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class ConsoleOutput implements IOutput {
return false;
}

async write(text: string): Promise<void> {
write(text: string): void {
console.log(text);
}
}
2 changes: 1 addition & 1 deletion src/io/output.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface IOutput {
flush(): Promise<void>;
supportsAnsiEscapes(): boolean;
write(text: string): Promise<void>;
write(text: string): void;
}
Loading

0 comments on commit 9d57a01

Please sign in to comment.