Skip to content

Commit

Permalink
fixed single line buffering in lines
Browse files Browse the repository at this point in the history
  • Loading branch information
j50n committed Dec 1, 2023
1 parent a594874 commit 44b54fa
Show file tree
Hide file tree
Showing 31 changed files with 243 additions and 156 deletions.
2 changes: 1 addition & 1 deletion dev/deps.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * as path from "../tests/deps/path.ts";
export * as colors from "https://deno.land/std@0.207.0/fmt/colors.ts";
export * as colors from "https://deno.land/std@0.208.0/fmt/colors.ts";
8 changes: 8 additions & 0 deletions dev/lines/consume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { blue } from "https://deno.land/[email protected]/fmt/colors.ts";
import { run } from "../../mod.ts";

const produce = import.meta.resolve("./produce.ts");

for await (const line of run("deno", "run", produce).lines) {
console.log(`${blue(new Date().toISOString())} ${line}`);
}
19 changes: 19 additions & 0 deletions dev/lines/produce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { sleep } from "../../mod.ts";

let count = 0;

for (;;) {
console.log(`${new Date().toISOString()} ${count++}\nx\ny\nz`);
await sleep(1000 + Math.random() * 10000);
}

// for (let j = 0; j < 10000000; j++) {
// const acc: string[] = [];
// for (let i = 0; i < 1000; i++) {
// acc.push(
// `${j}:${i} dsjkslfsjldjflskjflsdjfljslkdfjsdlkfjskjklsdjflksjdklsjldjfskdfsfdlsjfksl`,
// );
// }
// console.log(acc.join(""));
// await sleep(10000);
// }
4 changes: 2 additions & 2 deletions legacy/deps-test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from "https://deno.land/std@0.207.0/testing/asserts.ts";
export * from "https://deno.land/std@0.208.0/testing/asserts.ts";
export * from "https://deno.land/x/[email protected]/mod.ts";
export * from "https://deno.land/std@0.207.0/fmt/colors.ts";
export * from "https://deno.land/std@0.208.0/fmt/colors.ts";
4 changes: 2 additions & 2 deletions legacy/deps.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export * from "https://deno.land/std@0.207.0/async/mod.ts";
export * from "https://deno.land/std@0.207.0/io/mod.ts";
export * from "https://deno.land/std@0.208.0/async/mod.ts";
export * from "https://deno.land/std@0.208.0/io/mod.ts";
2 changes: 1 addition & 1 deletion legacy/examples/pushiterable/example-of-pushiterable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Answer, Question } from "./common-json-defs.ts";
import * as proc from "../../mod.ts";
import { WritableIterable } from "../../../mod1.ts";
import { asynciter } from "https://deno.land/x/[email protected]/mod.ts";
import { blue, red } from "https://deno.land/std@0.207.0/fmt/colors.ts";
import { blue, red } from "https://deno.land/std@0.208.0/fmt/colors.ts";

/**
* This demonstrates sending objects to and receiving objects from a child process
Expand Down
2 changes: 1 addition & 1 deletion legacy/runners/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { isWindows } from "https://deno.land/std@0.207.0/path/_os.ts";
import { isWindows } from "https://deno.land/std@0.208.0/path/_os.ts";

export const LINESEP: string = (() => {
if (isWindows) {
Expand Down
2 changes: 1 addition & 1 deletion legacy/runners/handlers/bytes.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assertEquals } from "https://deno.land/std@0.207.0/testing/asserts.ts";
import { assertEquals } from "https://deno.land/std@0.208.0/testing/asserts.ts";
import * as proc from "../../mod.ts";

Deno.test({
Expand Down
2 changes: 1 addition & 1 deletion legacy/runners/utility.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BufReader, BufWriter } from "../deps.ts";
import * as path from "https://deno.land/std@0.207.0/path/mod.ts";
import * as path from "https://deno.land/std@0.208.0/path/mod.ts";

export const DEFAULT_BUFFER_SIZE = 4096;

Expand Down
2 changes: 1 addition & 1 deletion legacy/tests/line-split.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assertEquals } from "https://deno.land/std@0.207.0/testing/asserts.ts";
import { assertEquals } from "https://deno.land/std@0.208.0/testing/asserts.ts";
import * as proc from "../mod.ts";

Deno.test({
Expand Down
2 changes: 1 addition & 1 deletion legacy/tests/piped.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { assertEquals } from "https://deno.land/std@0.207.0/testing/asserts.ts";
import { assertEquals } from "https://deno.land/std@0.208.0/testing/asserts.ts";
import * as proc from "../mod.ts";

Deno.test({
Expand Down
2 changes: 1 addition & 1 deletion site/scripts/deps/path.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/path/posix.ts";
export * from "https://deno.land/std@0.208.0/path/posix.ts";
2 changes: 1 addition & 1 deletion site/scripts/process/output.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { run } from "https://deno.land/x/[email protected].3/mod.ts";
import { run } from "https://deno.land/x/[email protected].5/mod.ts";

await run("echo", "Hello, world.").forEach((it) => console.dir(it));

Expand Down
2 changes: 1 addition & 1 deletion src/deps/colors.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/fmt/colors.ts";
export * from "https://deno.land/std@0.208.0/fmt/colors.ts";
2 changes: 1 addition & 1 deletion src/deps/path.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/path/mod.ts";
export * from "https://deno.land/std@0.208.0/path/mod.ts";
2 changes: 1 addition & 1 deletion src/deps/retry.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/async/retry.ts";
export * from "https://deno.land/std@0.208.0/async/retry.ts";
2 changes: 1 addition & 1 deletion src/deps/streams.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/streams/mod.ts";
export * from "https://deno.land/std@0.208.0/streams/mod.ts";
2 changes: 1 addition & 1 deletion src/deps/tee.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/async/tee.ts";
export * from "https://deno.land/std@0.208.0/async/tee.ts";
2 changes: 1 addition & 1 deletion src/enumerable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ export class Enumerable<T> implements AsyncIterable<T> {
* to improve performance with larger data.
*/
get lines(): Lines<T> {
return enumerate(toLines(this as Enumerable<Uint8Array>)) as Lines<T>;
return enumerate(toLines(this.iter as Enumerable<Uint8Array>)) as Lines<T>;
}

/**
Expand Down
4 changes: 1 addition & 3 deletions src/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,7 @@ export class Process<S> implements Deno.Closer {
yield* process.stdout;
} finally {
status = await process.status;
if (ser != null) {
await ser;
}
await ser;
}

const cause = passError();
Expand Down
192 changes: 124 additions & 68 deletions src/transformers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,98 @@ export async function* toChunkedLines(
}
}

// /**
// * Convert an `AsyncIterable<Uint8Array>` into an `AsyncIterable<Uint8Array[]>`
// * (an array of lines chunked together based on buffer size)
// * split on `lf` and also suppressing trailing `cr`. `lf` and trailing `cr`
// * is removed from the returned lines. As this is line-oriented data, if the
// * last line is empty (the last byte was a line feed, splitting into one extra line),
// * it is suppressed.
// *
// * @param buffs The iterable bytes.
// */
// export async function* toByteLines2(
// buffs: AsyncIterable<Uint8Array>,
// ): AsyncIterable<Uint8Array[]> {
// /*
// * Using subarray since that is just a view. No copy operation. Faster.
// *
// * Iterating and testing byte-wise rather than using `find()`, which requires a
// * call to a function for each byte. Should be pretty close to byte-at-a-time
// * C-style scanning. Not as fast as a SIMD operation, but that isn't an option
// * here.
// */
// let currentLine: Uint8Array[] = [];
// let lastline: undefined | Uint8Array;

// function bufferLine(): Uint8Array | undefined {
// function createLine(): Uint8Array {
// const line = concat(currentLine);

// if (line.length > 0 && line[line.length - 1] === 13) {
// /* Strip the carriage return. */
// return line.subarray(0, line.length - 1);
// } else {
// return line;
// }
// }

// const temp = lastline;
// lastline = createLine();
// return temp;
// }

// try {
// for await (const buff of buffs) {
// const length = buff.length;

// const chunk: Uint8Array[] = [];

// let start = 0;
// for (let pos = 0; pos < length; pos++) {
// if (buff[pos] === 10) {
// if (pos) {
// currentLine.push(buff.subarray(start, pos));
// }

// const b = bufferLine();
// if (b) {
// chunk.push(b);
// }

// currentLine = [];
// start = pos + 1;
// }
// }

// if (chunk.length > 0) {
// yield chunk;
// }

// if (start < length) {
// currentLine.push(buff.subarray(start));
// }
// }
// } finally {
// const chunk: Uint8Array[] = [];

// if (currentLine.length > 0) {
// const b = bufferLine();
// if (b) {
// chunk.push(b);
// }
// }

// if (lastline?.length) {
// chunk.push(lastline);
// }

// if (chunk.length > 0) {
// yield chunk;
// }
// }
// }

/**
* Convert an `AsyncIterable<Uint8Array>` into an `AsyncIterable<Uint8Array[]>`
* (an array of lines chunked together based on buffer size)
Expand All @@ -59,83 +151,47 @@ export async function* toChunkedLines(
export async function* toByteLines(
buffs: AsyncIterable<Uint8Array>,
): AsyncIterable<Uint8Array[]> {
/*
* Using subarray since that is just a view. No copy operation. Faster.
*
* Iterating and testing byte-wise rather than using `find()`, which requires a
* call to a function for each byte. Should be pretty close to byte-at-a-time
* C-style scanning. Not as fast as a SIMD operation, but that isn't an option
* here.
*/
let currentLine: Uint8Array[] = [];
let lastline: undefined | Uint8Array;

function bufferLine(): Uint8Array | undefined {
function createLine(): Uint8Array {
const line = concat(currentLine);

if (line.length > 0 && line[line.length - 1] === 13) {
/* Strip the carriage return. */
return line.subarray(0, line.length - 1);
} else {
return line;
}
}

const temp = lastline;
lastline = createLine();
return temp;
}

try {
for await (const buff of buffs) {
const length = buff.length;

const chunk: Uint8Array[] = [];

let start = 0;
for (let pos = 0; pos < length; pos++) {
if (buff[pos] === 10) {
if (pos) {
currentLine.push(buff.subarray(start, pos));
}

const b = bufferLine();
if (b) {
chunk.push(b);
}

currentLine = [];
start = pos + 1;
}
}
const completeLines: Uint8Array[] = [];
const currentLine: Uint8Array[] = [];

if (chunk.length > 0) {
yield chunk;
}
function makeCurrentLineComplete() {
const line = concat(currentLine);
currentLine.length = 0;

if (start < length) {
currentLine.push(buff.subarray(start));
}
const lineLen = line.length;
if (lineLen > 0 && line[lineLen - 1] === 13) {
completeLines.push(line.subarray(0, lineLen - 1));
} else {
completeLines.push(line);
}
} finally {
const chunk: Uint8Array[] = [];
}

if (currentLine.length > 0) {
const b = bufferLine();
if (b) {
chunk.push(b);
for await (const buff of buffs) {
const buffLen = buff.length;
let lastPos = 0;
for (let pos = 0; pos < buffLen; pos++) {
if (buff[pos] === 10) {
currentLine.push(buff.subarray(lastPos, pos));
makeCurrentLineComplete();
lastPos = pos + 1;
}
}

if (lastline?.length) {
chunk.push(lastline);
if (lastPos < buffLen) {
currentLine.push(buff.subarray(lastPos));
}

if (chunk.length > 0) {
yield chunk;
if (completeLines.length > 0) {
yield completeLines;
completeLines.length = 0;
}
}

if (currentLine.length > 0) {
makeCurrentLineComplete();
}

if (completeLines.length > 0) {
yield completeLines;
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion tests/deps/asserts.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/assert/mod.ts";
export * from "https://deno.land/std@0.208.0/assert/mod.ts";
2 changes: 1 addition & 1 deletion tests/deps/colors.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/fmt/colors.ts";
export * from "https://deno.land/std@0.208.0/fmt/colors.ts";
2 changes: 1 addition & 1 deletion tests/deps/path.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/path/mod.ts";
export * from "https://deno.land/std@0.208.0/path/mod.ts";
2 changes: 1 addition & 1 deletion tests/deps/streams.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export * from "https://deno.land/std@0.207.0/streams/mod.ts";
export * from "https://deno.land/std@0.208.0/streams/mod.ts";
Loading

0 comments on commit 44b54fa

Please sign in to comment.