Skip to content

Commit

Permalink
Improve streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
billytrend-cohere committed Apr 17, 2024
1 parent ebbb15c commit c513255
Show file tree
Hide file tree
Showing 3 changed files with 526 additions and 65 deletions.
4 changes: 2 additions & 2 deletions src/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ export class CohereClient {
});
if (_response.ok) {
return new core.Stream({
stream: _response.body,
response: _response,
terminator: "\n",
parse: async (data) => {
return await serializers.StreamedChatResponse.parseOrThrow(data, {
Expand Down Expand Up @@ -248,7 +248,7 @@ export class CohereClient {
});
if (_response.ok) {
return new core.Stream({
stream: _response.body,
response: _response,
terminator: "\n",
parse: async (data) => {
return await serializers.GenerateStreamedResponse.parseOrThrow(data, {
Expand Down
77 changes: 14 additions & 63 deletions src/core/streaming-fetcher/Stream.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,33 @@
import { Readable } from "node:stream";
import { ReadableStream } from "stream/web";
import { Readable } from "stream";
import { SuccessfulResponse } from "../fetcher/APIResponse";
import { StreamUtils } from "./streaming-utils";

export class Stream<T> implements AsyncIterable<T> {
private stream: Readable | ReadableStream;
private response: SuccessfulResponse<Readable>;
private parse: (val: unknown) => Promise<T>;
private terminator: string;

constructor({
stream,
response,
parse,
terminator,
}: {
stream: Readable | ReadableStream;
response: SuccessfulResponse<Readable>;
parse: (val: unknown) => Promise<T>;
terminator: string;
}) {
this.stream = stream;
this.response = response;
this.parse = parse;
this.terminator = terminator;
}

private async *iterMessages(): AsyncGenerator<T, void> {
const decoder = new TextDecoder("utf8");
const stream = readableStreamAsyncIterable<any>(this.stream);
let previous = "";
for await (const chunk of stream) {
let bufferChunk = "";
// Buffer is present in Node.js environment
if (typeof Buffer !== "undefined") {
bufferChunk += Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
}
// TextDecoder is present in Browser environment
else if (typeof TextDecoder !== "undefined") {
bufferChunk += decoder.decode(chunk);
}
previous += bufferChunk;
let terminatorIndex: number;
while ((terminatorIndex = previous.indexOf(this.terminator)) >= 0) {
const line = previous.slice(0, terminatorIndex).trimEnd();
const message = await this.parse(JSON.parse(line));
yield message;
previous = previous.slice(terminatorIndex + 1);
}
const sse = Boolean(this.response?.headers?.get("content-type") === "text/event-stream");
const stream: StreamUtils<{}> = sse ? StreamUtils.fromReadableStream(this.response.body , null as any) : StreamUtils.fromSSEResponse(this.response, null as any);

for await (const obj of stream) {
const message = await this.parse(obj);
yield message;
}
}

Expand All @@ -50,39 +36,4 @@ export class Stream<T> implements AsyncIterable<T> {
yield message;
}
}
}

/**
* Browser polyfill for ReadableStream
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
if (stream[Symbol.asyncIterator]) {
return stream;
}

const reader = stream.getReader();
return {
async next() {
try {
const result = await reader.read();
if (result?.done) {
reader.releaseLock();
} // release lock when stream becomes closed
return result;
} catch (e) {
reader.releaseLock(); // release lock when stream becomes errored
throw e;
}
},
async return() {
const cancelPromise = reader.cancel();
reader.releaseLock();
await cancelPromise;
return { done: true, value: undefined };
},
[Symbol.asyncIterator]() {
return this;
},
};
}
}
Loading

0 comments on commit c513255

Please sign in to comment.