From b8afd1c6960c85fcd7136c47f402d3a328010cab Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Tue, 26 Nov 2024 20:49:28 +0530 Subject: [PATCH 1/2] feat: create and run stream support --- src/apis/threads.ts | 59 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/src/apis/threads.ts b/src/apis/threads.ts index bedc7c5..c975f09 100644 --- a/src/apis/threads.ts +++ b/src/apis/threads.ts @@ -1,7 +1,7 @@ import { ApiClientInterface } from '../_types/generalTypes'; import { ApiResource } from '../apiResource'; import { RequestOptions } from '../baseClient'; - +import { EventEmitter } from 'events'; import { finalResponse, initOpenAIClient, overrideConfig } from '../utils'; import { createHeaders } from './createHeaders'; @@ -113,6 +113,8 @@ export class Threads extends ApiResource { opts?: RequestOptions ): Promise { const body: ThreadCreateAndRunParams = _body; + const { stream } = body; + if (params) { const config = overrideConfig(this.client.config, params.config); this.client.customHeaders = { @@ -123,11 +125,24 @@ export class Threads extends ApiResource { const OAIclient = initOpenAIClient(this.client); - const result = await OAIclient.beta.threads - .createAndRun(body, opts) - .withResponse(); - - return finalResponse(result); + if(stream === true) { + const eventEmitter = new EventEmitter(); + + (async ()=> { + const streamResponse = await OAIclient.beta.threads.createAndRun(body, opts) + for await (const chunk of streamResponse as AsyncIterable) { + eventEmitter.emit('data', chunk); + } + eventEmitter.emit('end'); + })(); + return eventEmitter; + } else { + const result = await OAIclient.beta.threads + .createAndRun(body, opts) + .withResponse(); + + return finalResponse(result); + } } async createAndRunPoll( @@ -284,8 +299,9 @@ export class Runs extends ApiResource { _body: RunCreateParams, params?: ApiClientInterface, opts?: RequestOptions - ): Promise { + ): Promise{ const body: RunCreateParams = _body; + const { stream } = body; if (params) { const config = overrideConfig(this.client.config, params.config); this.client.customHeaders = { @@ -293,14 +309,27 @@ export class Runs extends ApiResource { ...createHeaders({ ...params, config }), }; } - + const OAIclient = initOpenAIClient(this.client); - - const result = await OAIclient.beta.threads.runs - .create(threadId, body, opts) - .withResponse(); - - return finalResponse(result); + + if(stream === true) { + const eventEmitter = new EventEmitter(); + + (async ()=> { + const streamResponse = await OAIclient.beta.threads.runs.create(threadId, body, opts) + for await (const chunk of streamResponse as AsyncIterable) { + eventEmitter.emit('data', chunk); + } + eventEmitter.emit('end'); + })(); + return eventEmitter; + } else { + const result = await OAIclient.beta.threads.runs + .create(threadId, body, opts) + .withResponse(); + + return finalResponse(result); + } } async list( @@ -685,6 +714,7 @@ export interface RunCreateParams { metadata?: unknown | null; model?: string | null; tools?: Array | null; + stream?: boolean | null; } export interface RunCreateParamsNonStreaming extends RunCreateParams { @@ -698,6 +728,7 @@ export interface ThreadCreateAndRunParams { model?: string | null; thread?: any; tools?: Array | null; + stream?: boolean | null; } export interface ThreadCreateAndRunParamsNonStreaming From ec8fcd5f85556f30a611fe15c822372fd578320b Mon Sep 17 00:00:00 2001 From: csgulati09 Date: Tue, 26 Nov 2024 21:55:43 +0530 Subject: [PATCH 2/2] fix: stream case + linting --- src/apis/threads.ts | 48 ++++++++++++++++++--------------------------- src/baseClient.ts | 2 +- src/utils.ts | 3 ++- 3 files changed, 22 insertions(+), 31 deletions(-) diff --git a/src/apis/threads.ts b/src/apis/threads.ts index c975f09..768551d 100644 --- a/src/apis/threads.ts +++ b/src/apis/threads.ts @@ -1,7 +1,6 @@ import { ApiClientInterface } from '../_types/generalTypes'; import { ApiResource } from '../apiResource'; import { RequestOptions } from '../baseClient'; -import { EventEmitter } from 'events'; import { finalResponse, initOpenAIClient, overrideConfig } from '../utils'; import { createHeaders } from './createHeaders'; @@ -114,7 +113,7 @@ export class Threads extends ApiResource { ): Promise { const body: ThreadCreateAndRunParams = _body; const { stream } = body; - + if (params) { const config = overrideConfig(this.client.config, params.config); this.client.customHeaders = { @@ -125,22 +124,17 @@ export class Threads extends ApiResource { const OAIclient = initOpenAIClient(this.client); - if(stream === true) { - const eventEmitter = new EventEmitter(); - - (async ()=> { - const streamResponse = await OAIclient.beta.threads.createAndRun(body, opts) - for await (const chunk of streamResponse as AsyncIterable) { - eventEmitter.emit('data', chunk); - } - eventEmitter.emit('end'); - })(); - return eventEmitter; + if (stream === true) { + const streamResponse = await OAIclient.beta.threads.createAndRunStream( + body as any, + opts + ); + return streamResponse; } else { const result = await OAIclient.beta.threads .createAndRun(body, opts) .withResponse(); - + return finalResponse(result); } } @@ -299,7 +293,7 @@ export class Runs extends ApiResource { _body: RunCreateParams, params?: ApiClientInterface, opts?: RequestOptions - ): Promise{ + ): Promise { const body: RunCreateParams = _body; const { stream } = body; if (params) { @@ -309,25 +303,21 @@ export class Runs extends ApiResource { ...createHeaders({ ...params, config }), }; } - + const OAIclient = initOpenAIClient(this.client); - - if(stream === true) { - const eventEmitter = new EventEmitter(); - - (async ()=> { - const streamResponse = await OAIclient.beta.threads.runs.create(threadId, body, opts) - for await (const chunk of streamResponse as AsyncIterable) { - eventEmitter.emit('data', chunk); - } - eventEmitter.emit('end'); - })(); - return eventEmitter; + + if (stream === true) { + const streamResponse = await OAIclient.beta.threads.runs.stream( + threadId, + body as any, + opts + ); + return streamResponse; } else { const result = await OAIclient.beta.threads.runs .create(threadId, body, opts) .withResponse(); - + return finalResponse(result); } } diff --git a/src/baseClient.ts b/src/baseClient.ts index f90f64d..dd67916 100644 --- a/src/baseClient.ts +++ b/src/baseClient.ts @@ -57,7 +57,7 @@ async function defaultParseResponse(props: APIResponseProps): Promise { if (contentType?.includes('application/json')) { const headers = defaultParseHeaders(props); const json = { - ...(await response.json() as any), + ...((await response.json()) as any), getHeaders: () => headers, }; diff --git a/src/utils.ts b/src/utils.ts index 033bc26..20f8c4c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -163,7 +163,8 @@ export function toQueryParams( | VirtualKeysListParams | ApiKeysListParams | CongfigsListParams - | LogsExportListParams|any + | LogsExportListParams + | any ): string { if (!params) { return '';