From 875b55e53b9d51b8b228a51347f7318826d84fac Mon Sep 17 00:00:00 2001
From: Thomas Bouldin
Date: Mon, 3 Feb 2025 15:58:20 -0800
Subject: [PATCH] chore(js): Rename streamingCallback to sendChunk (#1809)

---
 docs/flows.md                                 |  9 ++--
 js/doc-snippets/src/flows/index.ts            | 14 +++--
 js/genkit/src/client/client.ts                |  8 +--
 js/genkit/tests/helpers.ts                    | 18 +++----
 js/plugins/express/README.md                  | 19 +++----
 js/plugins/googleai/src/gemini.ts             |  6 +--
 .../vertexai/src/modelgarden/anthropic.ts     |  6 +--
 .../vertexai/src/modelgarden/mistral.ts       |  6 +--
 .../src/modelgarden/openai_compatibility.ts   |  6 +--
 js/pnpm-lock.yaml                             | 46 ----------------
 js/testapps/dev-ui-gallery/src/main/flows.ts  | 24 ++++-----
 js/testapps/express/src/index.ts              |  4 +-
 js/testapps/flow-sample1/src/index.ts         | 24 ++++-----
 js/testapps/flow-simple-ai/src/index.ts       | 52 +++++++------------
 js/testapps/multimodal/src/pdf.ts             |  4 +-
 js/testapps/multimodal/src/video.ts           |  4 +-
 js/testapps/prompt-file/src/index.ts          | 21 +++-----
 js/testapps/rag/src/pdf-rag.ts                |  4 +-
 .../js-angular/server/src/jsonStreaming.ts    | 10 ++--
 samples/js-chatbot/server/src/index.ts        |  4 +-
 tests/test_js_app/src/index.ts                | 10 ++--
 21 files changed, 107 insertions(+), 192 deletions(-)

diff --git a/docs/flows.md b/docs/flows.md
index 6bc982eea..2a88b22c2 100644
--- a/docs/flows.md
+++ b/docs/flows.md
@@ -109,12 +109,11 @@ Here's an example of a flow that supports streaming:
 * The `streamSchema` option specifies the type of values your flow streams.
   This does not necessarily need to be the same type as the `outputSchema`,
   which is the type of the flow's complete output.
-* `streamingCallback` is a callback function that takes a single parameter, of
+* The second parameter to your flow definition is called "sideChannel". It provides
+  multiple useful features, such as request context and the `sendChunk` callback.
+  The `sendChunk` callback takes a single parameter, of
   the type specified by `streamSchema`. Whenever data becomes available within
-  your flow, send the data to the output stream by calling this function. Note
-  that `streamingCallback` is only defined if the caller of your flow
-  requested streaming output, so you need to check that it's defined before
-  calling it.
+  your flow, send the data to the output stream by calling this function.
 
 In the above example, the values streamed by the flow are directly coupled to
 the values streamed by the `generate()` call inside the flow. Although this is
diff --git a/js/doc-snippets/src/flows/index.ts b/js/doc-snippets/src/flows/index.ts
index 938a82528..219f6ce37 100644
--- a/js/doc-snippets/src/flows/index.ts
+++ b/js/doc-snippets/src/flows/index.ts
@@ -92,19 +92,17 @@ export const menuSuggestionStreamingFlow = ai.defineFlow(
     streamSchema: z.string(),
     outputSchema: z.object({ theme: z.string(), menuItem: z.string() }),
   },
-  async (restaurantTheme, streamingCallback) => {
+  async (restaurantTheme, { sendChunk }) => {
     const response = await ai.generateStream({
       model: gemini15Flash,
       prompt: `Invent a menu item for a ${restaurantTheme} themed restaurant.`,
     });
 
-    if (streamingCallback) {
-      for await (const chunk of response.stream) {
-        // Here, you could process the chunk in some way before sending it to
-        // the output stream via streamingCallback(). In this example, we output
-        // the text of the chunk, unmodified.
-        streamingCallback(chunk.text);
-      }
+    for await (const chunk of response.stream) {
+      // Here, you could process the chunk in some way before sending it to
+      // the output stream via sendChunk(). In this example, we output
+      // the text of the chunk, unmodified.
+      sendChunk(chunk.text);
     }
 
     return {
diff --git a/js/genkit/src/client/client.ts b/js/genkit/src/client/client.ts
index 05e5d3256..dd42cabb8 100644
--- a/js/genkit/src/client/client.ts
+++ b/js/genkit/src/client/client.ts
@@ -53,7 +53,7 @@ export function streamFlow({
   const operationPromise = __flowRunEnvelope({
     url,
     input,
-    streamingCallback: (c) => channel.send(c),
+    sendChunk: (c) => channel.send(c),
     headers,
   });
   operationPromise.then(
@@ -70,12 +70,12 @@
 async function __flowRunEnvelope({
   url,
   input,
-  streamingCallback,
+  sendChunk,
   headers,
 }: {
   url: string;
   input: any;
-  streamingCallback: (chunk: any) => void;
+  sendChunk: (chunk: any) => void;
   headers?: Record<string, string>;
 }) {
   const response = await fetch(url, {
@@ -115,7 +115,7 @@ async function __flowRunEnvelope({
           .substring('data: '.length)
       );
       if (chunk.hasOwnProperty('message')) {
-        streamingCallback(chunk.message);
+        sendChunk(chunk.message);
       } else if (chunk.hasOwnProperty('result')) {
         return chunk.result;
       } else if (chunk.hasOwnProperty('error')) {
diff --git a/js/genkit/tests/helpers.ts b/js/genkit/tests/helpers.ts
index e5dcddcd1..32df0ae39 100644
--- a/js/genkit/tests/helpers.ts
+++ b/js/genkit/tests/helpers.ts
@@ -31,12 +31,12 @@ export function defineEchoModel(ai: Genkit): ModelAction {
     {
       name: 'echoModel',
     },
-    async (request, streamingCallback) => {
+    async (request, sendChunk) => {
       (model as any).__test__lastRequest = request;
-      (model as any).__test__lastStreamingCallback = streamingCallback;
-      if (streamingCallback) {
+      (model as any).__test__lastStreamingCallback = sendChunk;
+      if (sendChunk) {
         await runAsync(() => {
-          streamingCallback({
+          sendChunk({
             content: [
               {
                 text: '3',
@@ -45,7 +45,7 @@ export function defineEchoModel(ai: Genkit): ModelAction {
           });
         });
         await runAsync(() => {
-          streamingCallback({
+          sendChunk({
             content: [
               {
                 text: '2',
@@ -54,7 +54,7 @@ export function defineEchoModel(ai: Genkit): ModelAction {
           });
         });
         await runAsync(() => {
-          streamingCallback({
+          sendChunk({
             content: [
               {
                 text: '1',
@@ -148,7 +148,7 @@ export function defineStaticResponseModel(
 export type ProgrammableModel = ModelAction & {
   handleResponse: (
     req: GenerateRequest,
-    streamingCallback?: StreamingCallback<GenerateResponseChunkData>
+    sendChunk?: StreamingCallback<GenerateResponseChunkData>
   ) => Promise<GenerateResponseData>;
 
   lastRequest?: GenerateRequest;
@@ -162,9 +162,9 @@ export function defineProgrammableModel(ai: Genkit): ProgrammableModel {
         tools: true,
       },
     },
-    async (request, streamingCallback) => {
+    async (request, sendChunk) => {
       pm.lastRequest = JSON.parse(JSON.stringify(request));
-      return pm.handleResponse(request, streamingCallback);
+      return pm.handleResponse(request, sendChunk);
     }
   ) as ProgrammableModel;
diff --git a/js/plugins/express/README.md b/js/plugins/express/README.md
index b7b610f6a..95c1a2ada 100644
--- a/js/plugins/express/README.md
+++ b/js/plugins/express/README.md
@@ -6,17 +6,14 @@ This plugin provides utilities for conveniently exposing Genkit flows and action
 import { expressHandler } from '@genkit-ai/express';
 import express from 'express';
 
-const simpleFlow = ai.defineFlow(
-  'simpleFlow',
-  async (input, streamingCallback) => {
-    const { text } = await ai.generate({
-      model: gemini15Flash,
-      prompt: input,
-      streamingCallback,
-    });
-    return text;
-  }
-);
+const simpleFlow = ai.defineFlow('simpleFlow', async (input, { sendChunk }) => {
+  const { text } = await ai.generate({
+    model: gemini15Flash,
+    prompt: input,
+    onChunk: (c) => sendChunk(c.text),
+  });
+  return text;
+});
 
 const app = express();
 app.use(express.json());
diff --git a/js/plugins/googleai/src/gemini.ts b/js/plugins/googleai/src/gemini.ts
index cace611fe..92a2e0632 100644
--- a/js/plugins/googleai/src/gemini.ts
+++ b/js/plugins/googleai/src/gemini.ts
@@ -646,7 +646,7 @@ export function defineGoogleAIModel(
       configSchema: model.configSchema,
       use: middleware,
     },
-    async (request, streamingCallback) => {
+    async (request, sendChunk) => {
      const options: RequestOptions = { apiClient: GENKIT_CLIENT_HEADER };
      if (apiVersion) {
        options.apiVersion = apiVersion;
@@ -780,14 +780,14 @@ export function defineGoogleAIModel(
         );
       }
 
-      if (streamingCallback) {
+      if (sendChunk) {
         const result = await genModel
           .startChat(updatedChatRequest)
           .sendMessageStream(msg.parts, options);
         for await (const item of result.stream) {
           (item as GenerateContentResponse).candidates?.forEach((candidate) => {
             const c = fromJSONModeScopedGeminiCandidate(candidate);
-            streamingCallback({
+            sendChunk({
               index: c.index,
               content: c.message.content,
             });
diff --git a/js/plugins/vertexai/src/modelgarden/anthropic.ts b/js/plugins/vertexai/src/modelgarden/anthropic.ts
index 850f393a5..9fac5c5fd 100644
--- a/js/plugins/vertexai/src/modelgarden/anthropic.ts
+++ b/js/plugins/vertexai/src/modelgarden/anthropic.ts
@@ -409,9 +409,9 @@ export function anthropicModel(
       supports: model.info?.supports,
       versions: model.info?.versions,
     },
-    async (input, streamingCallback) => {
+    async (input, sendChunk) => {
       const client = clientFactory(input.config?.location || region);
-      if (!streamingCallback) {
+      if (!sendChunk) {
         const response = await client.messages.create({
           ...toAnthropicRequest(input.config?.version ?? modelName, input),
           stream: false,
@@ -423,7 +423,7 @@ export function anthropicModel(
       );
       for await (const event of stream) {
         if (event.type === 'content_block_delta') {
-          streamingCallback({
+          sendChunk({
             index: 0,
             content: [
               {
diff --git a/js/plugins/vertexai/src/modelgarden/mistral.ts b/js/plugins/vertexai/src/modelgarden/mistral.ts
index b2aac4dde..88ec9d24f 100644
--- a/js/plugins/vertexai/src/modelgarden/mistral.ts
+++ b/js/plugins/vertexai/src/modelgarden/mistral.ts
@@ -341,13 +341,13 @@ export function mistralModel(
       supports: model.info?.supports,
       versions: model.info?.versions,
     },
-    async (input, streamingCallback) => {
+    async (input, sendChunk) => {
       const client = getClient(input.config?.location || region);
 
       const versionedModel =
        input.config?.version ?? model.info?.versions?.[0] ?? model.name;
 
-      if (!streamingCallback) {
+      if (!sendChunk) {
         const mistralRequest = toMistralRequest(versionedModel, input);
 
         const response = await client.chat.complete(mistralRequest, {
@@ -372,7 +372,7 @@ export function mistralModel(
       for await (const event of stream) {
         const parts = fromMistralCompletionChunk(event.data);
         if (parts.length > 0) {
-          streamingCallback({
+          sendChunk({
             content: parts,
           });
         }
diff --git a/js/plugins/vertexai/src/modelgarden/openai_compatibility.ts b/js/plugins/vertexai/src/modelgarden/openai_compatibility.ts
index 2de914f57..36b023a37 100644
--- a/js/plugins/vertexai/src/modelgarden/openai_compatibility.ts
+++ b/js/plugins/vertexai/src/modelgarden/openai_compatibility.ts
@@ -311,12 +311,12 @@ export function openaiCompatibleModel(
     },
     async (
       request: GenerateRequest,
-      streamingCallback?: StreamingCallback<GenerateResponseChunkData>
+      sendChunk?: StreamingCallback<GenerateResponseChunkData>
     ): Promise<GenerateResponseData> => {
       let response: ChatCompletion;
       const client = await clientFactory(request);
       const body = toRequestBody(model, request);
-      if (streamingCallback) {
+      if (sendChunk) {
         const stream = client.beta.chat.completions.stream({
           ...body,
           stream: true,
@@ -324,7 +324,7 @@ export function openaiCompatibleModel(
       for await (const chunk of stream) {
         chunk.choices?.forEach((chunk) => {
           const c = fromOpenAiChunkChoice(chunk);
-          streamingCallback({
+          sendChunk({
             index: c.index,
             content: c.message.content,
           });
diff --git a/js/pnpm-lock.yaml b/js/pnpm-lock.yaml
index 412999447..a2d56992b 100644
--- a/js/pnpm-lock.yaml
+++ b/js/pnpm-lock.yaml
@@ -1496,52 +1496,6 @@ importers:
         specifier: ^5.6.2
         version: 5.6.3
 
-  testapps/personal-testing:
-    dependencies:
-      '@genkit-ai/firebase':
-        specifier: workspace:*
-        version: link:../../plugins/firebase
-      '@genkit-ai/google-cloud':
-        specifier: workspace:*
-        version: link:../../plugins/google-cloud
-      '@genkit-ai/googleai':
-        specifier: workspace:*
-        version: link:../../plugins/googleai
-      '@genkit-ai/vertexai':
-        specifier: workspace:*
-        version: link:../../plugins/vertexai
-      '@google/generative-ai':
-        specifier: ^0.15.0
-        version: 0.15.0
-      '@opentelemetry/sdk-trace-base':
-        specifier: ^1.25.0
-        version: 1.26.0(@opentelemetry/api@1.9.0)
-      body-parser:
-        specifier: ^1.20.3
-        version: 1.20.3
-      express:
-        specifier: ^4.21.0
-        version: 4.21.1
-      firebase-admin:
-        specifier: '>=12.2'
-        version: 12.3.1(encoding@0.1.13)
-      genkit:
-        specifier: workspace:*
-        version: link:../../genkit
-      partial-json:
-        specifier: ^0.1.7
-        version: 0.1.7
-    devDependencies:
-      rimraf:
-        specifier: ^6.0.1
-        version: 6.0.1
-      tsx:
-        specifier: ^4.19.2
-        version: 4.19.2
-      typescript:
-        specifier: ^5.3.3
-        version: 5.6.3
-
   testapps/prompt-file:
     dependencies:
       '@genkit-ai/googleai':
diff --git a/js/testapps/dev-ui-gallery/src/main/flows.ts b/js/testapps/dev-ui-gallery/src/main/flows.ts
index fa9bc346c..01ca8239a 100644
--- a/js/testapps/dev-ui-gallery/src/main/flows.ts
+++ b/js/testapps/dev-ui-gallery/src/main/flows.ts
@@ -83,13 +83,11 @@ ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: z.number(),
   },
-  async (count, streamingCallback) => {
+  async (count, { sendChunk }) => {
     let i = 1;
-    if (streamingCallback) {
-      for (; i <= count; i++) {
-        await new Promise((r) => setTimeout(r, 500));
-        streamingCallback(i);
-      }
+    for (; i <= count; i++) {
+      await new Promise((r) => setTimeout(r, 500));
+      sendChunk(i);
     }
     return `done: ${count}, streamed: ${i - 1} times`;
   }
@@ -168,16 +166,14 @@ ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: z.number(),
   },
-  async (count, streamingCallback) => {
+  async (count, { sendChunk }) => {
     let i = 1;
-    if (streamingCallback) {
-      for (; i <= count; i++) {
-        if (i == 3) {
-          throw new Error('I cannot count that high!');
-        }
-        await new Promise((r) => setTimeout(r, 500));
-        streamingCallback(i);
+    for (; i <= count; i++) {
+      if (i == 3) {
+        throw new Error('I cannot count that high!');
       }
+      await new Promise((r) => setTimeout(r, 500));
+      sendChunk(i);
     }
     if (count) {
       throw new Error('I cannot count that low!');
diff --git a/js/testapps/express/src/index.ts b/js/testapps/express/src/index.ts
index 8e6cc0019..4c4ae6ed1 100644
--- a/js/testapps/express/src/index.ts
+++ b/js/testapps/express/src/index.ts
@@ -49,7 +49,7 @@ const ai = genkit({
 
 export const jokeFlow = ai.defineFlow(
   { name: 'jokeFlow', inputSchema: z.string(), outputSchema: z.string() },
-  async (subject, streamingCallback) => {
+  async (subject, { sendChunk }) => {
     return await ai.run('call-llm', async () => {
       const llmResponse = await ai.generate({
         prompt: `tell me long joke about ${subject}`,
@@ -57,7 +57,7 @@ export const jokeFlow = ai.defineFlow(
       config: {
         temperature: 1,
       },
-      streamingCallback,
+      onChunk: (c) => sendChunk(c.text),
     });
 
     return llmResponse.text;
diff --git a/js/testapps/flow-sample1/src/index.ts b/js/testapps/flow-sample1/src/index.ts
index d75927e32..d2ac1a21e 100644
--- a/js/testapps/flow-sample1/src/index.ts
+++ b/js/testapps/flow-sample1/src/index.ts
@@ -70,13 +70,11 @@ export const streamy = ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: z.object({ count: z.number() }),
   },
-  async (count, streamingCallback) => {
+  async (count, { sendChunk }) => {
     let i = 0;
-    if (streamingCallback) {
-      for (; i < count; i++) {
-        await new Promise((r) => setTimeout(r, 1000));
-        streamingCallback({ count: i });
-      }
+    for (; i < count; i++) {
+      await new Promise((r) => setTimeout(r, 1000));
+      sendChunk({ count: i });
     }
     return `done: ${count}, streamed: ${i} times`;
   }
@@ -90,16 +88,14 @@ export const streamyThrowy = ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: z.object({ count: z.number() }),
   },
-  async (count, streamingCallback) => {
+  async (count, { sendChunk }) => {
     let i = 0;
-    if (streamingCallback) {
-      for (; i < count; i++) {
-        if (i == 3) {
-          throw new Error('whoops');
-        }
-        await new Promise((r) => setTimeout(r, 1000));
-        streamingCallback({ count: i });
+    for (; i < count; i++) {
+      if (i == 3) {
+        throw new Error('whoops');
       }
+      await new Promise((r) => setTimeout(r, 1000));
+      sendChunk({ count: i });
     }
     return `done: ${count}, streamed: ${i} times`;
   }
diff --git a/js/testapps/flow-simple-ai/src/index.ts b/js/testapps/flow-simple-ai/src/index.ts
index a73afda44..79da26e11 100644
--- a/js/testapps/flow-simple-ai/src/index.ts
+++ b/js/testapps/flow-simple-ai/src/index.ts
@@ -129,16 +129,14 @@ export const streamFlow = ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: z.string(),
   },
-  async (prompt, streamingCallback) => {
-    const { response, stream } = await ai.generateStream({
+  async (prompt, { sendChunk }) => {
+    const { response, stream } = ai.generateStream({
       model: gemini15Flash,
       prompt,
     });
 
-    if (streamingCallback) {
-      for await (const chunk of stream) {
-        streamingCallback(chunk.content[0].text!);
-      }
+    for await (const chunk of stream) {
+      sendChunk(chunk.content[0].text!);
     }
 
     return (await response).text;
@@ -167,12 +165,8 @@ export const streamJsonFlow = ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: GameCharactersSchema,
   },
-  async (count, streamingCallback) => {
-    if (!streamingCallback) {
-      throw new Error('this flow only works in streaming mode');
-    }
-
-    const { response, stream } = await ai.generateStream({
+  async (count, { sendChunk }) => {
+    const { response, stream } = ai.generateStream({
       model: gemini15Flash,
       output: {
         schema: GameCharactersSchema,
@@ -184,7 +178,7 @@ export const streamJsonFlow = ai.defineFlow(
     for await (const chunk of stream) {
       buffer += chunk.content[0].text!;
       if (buffer.length > 10) {
-        streamingCallback(parse(maybeStripMarkdown(buffer), Allow.ALL));
+        sendChunk(parse(maybeStripMarkdown(buffer), Allow.ALL));
       }
     }
@@ -270,12 +264,12 @@ export const vertexStreamer = ai.defineFlow(
     inputSchema: z.string(),
     outputSchema: z.string(),
   },
-  async (input, streamingCallback) => {
+  async (input, { sendChunk }) => {
     return await ai.run('call-llm', async () => {
       const llmResponse = await ai.generate({
         model: gemini15Flash,
         prompt: `Tell me a very long joke about ${input}.`,
-        streamingCallback,
+        onChunk: (c) => sendChunk(c.text),
       });
 
       return llmResponse.text;
@@ -407,12 +401,8 @@ export const toolCaller = ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: z.any(),
   },
-  async (_, streamingCallback) => {
-    if (!streamingCallback) {
-      throw new Error('this flow only works in streaming mode');
-    }
-
-    const { response, stream } = await ai.generateStream({
+  async (_, { sendChunk }) => {
+    const { response, stream } = ai.generateStream({
       model: gemini15Flash,
       config: {
         temperature: 1,
@@ -422,7 +412,7 @@ export const toolCaller = ai.defineFlow(
     });
 
     for await (const chunk of stream) {
-      streamingCallback(chunk);
+      sendChunk(chunk);
     }
 
     return (await response).text;
@@ -449,12 +439,8 @@ export const forcedToolCaller = ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: z.any(),
   },
-  async (input, streamingCallback) => {
-    if (!streamingCallback) {
-      throw new Error('this flow only works in streaming mode');
-    }
-
-    const { response, stream } = await ai.generateStream({
+  async (input, { sendChunk }) => {
+    const { response, stream } = ai.generateStream({
       model: gemini15Flash,
       config: {
         temperature: 1,
@@ -465,7 +451,7 @@ export const forcedToolCaller = ai.defineFlow(
     });
 
     for await (const chunk of stream) {
-      streamingCallback(chunk);
+      sendChunk(chunk);
     }
 
     return (await response).text;
@@ -573,9 +559,9 @@ export const arrayStreamTester = ai.defineFlow(
     outputSchema: z.any(),
     streamSchema: z.any(),
   },
-  async (input, streamingCallback) => {
+  async (input, { sendChunk }) => {
     try {
-      const { stream, response } = await ai.generateStream({
+      const { stream, response } = ai.generateStream({
         model: gemini15Flash,
         config: {
           safetySettings: [
@@ -612,7 +598,7 @@ export const arrayStreamTester = ai.defineFlow(
       });
 
       for await (const { output, text } of stream) {
-        streamingCallback?.({ text, output });
+        sendChunk({ text, output });
       }
 
       const result = await response;
@@ -645,7 +631,7 @@ ai.defineModel(
   {
     name: 'hiModel',
   },
-  async (request, streamingCallback) => {
+  async () => {
     return {
       finishReason: 'stop',
       message: { role: 'model', content: [{ text: 'hi' }] },
diff --git a/js/testapps/multimodal/src/pdf.ts b/js/testapps/multimodal/src/pdf.ts
index 2b4e878b9..c53ea4913 100644
--- a/js/testapps/multimodal/src/pdf.ts
+++ b/js/testapps/multimodal/src/pdf.ts
@@ -43,7 +43,7 @@ export const multimodalPdfQAFlow = ai.defineFlow(
     inputSchema: z.string(),
     outputSchema: z.string(),
   },
-  async (query: any, streamingCallback: any) => {
+  async (query: string, { sendChunk }) => {
     const docs = (await ai.retrieve({
       retriever: pdfMultimodalRetriever,
       query,
@@ -74,7 +74,7 @@ export const multimodalPdfQAFlow = ai.defineFlow(
         }),
       },
       {
-        streamingCallback,
+        onChunk: (c) => sendChunk(c.text),
       }
     ).then((r) => r.text);
   }
diff --git a/js/testapps/multimodal/src/video.ts b/js/testapps/multimodal/src/video.ts
index fe50d42cb..9902e9c8e 100644
--- a/js/testapps/multimodal/src/video.ts
+++ b/js/testapps/multimodal/src/video.ts
@@ -138,7 +138,7 @@ export const VideoQAFlow = ai.defineFlow(
     inputSchema: z.string(),
     outputSchema: z.string(),
   },
-  async (query: any, streamingCallback: any) => {
+  async (query, { sendChunk }) => {
     const docs = (await ai.retrieve({
       retriever: videoRetriever,
       query,
@@ -166,7 +166,7 @@ export const VideoQAFlow = ai.defineFlow(
       })[0],
     },
     {
-      streamingCallback,
+      onChunk: (c) => sendChunk(c.text),
     }
   ).then((r) => r.text);
 }
diff --git a/js/testapps/prompt-file/src/index.ts b/js/testapps/prompt-file/src/index.ts
index f1596d8cc..6a1060d07 100644
--- a/js/testapps/prompt-file/src/index.ts
+++ b/js/testapps/prompt-file/src/index.ts
@@ -87,20 +87,15 @@ ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: z.string(),
   },
-  async ({ subject, personality }, streamingCallback) => {
+  async ({ subject, personality }, { sendChunk }) => {
     const storyPrompt = ai.prompt('story');
-    if (streamingCallback) {
-      const { response, stream } = await storyPrompt.stream({
-        subject,
-        personality,
-      });
-      for await (const chunk of stream) {
-        streamingCallback(chunk.content[0]?.text!);
-      }
-      return (await response).text;
-    } else {
-      const response = await storyPrompt({ subject });
-      return response.text;
+    const { response, stream } = storyPrompt.stream({
+      subject,
+      personality,
+    });
+    for await (const chunk of stream) {
+      sendChunk(chunk.content[0]?.text!);
     }
+    return (await response).text;
   }
 );
diff --git a/js/testapps/rag/src/pdf-rag.ts b/js/testapps/rag/src/pdf-rag.ts
index e3b33ca5f..63a9fcf0f 100644
--- a/js/testapps/rag/src/pdf-rag.ts
+++ b/js/testapps/rag/src/pdf-rag.ts
@@ -38,7 +38,7 @@ export const pdfQA = ai.defineFlow(
     inputSchema: z.string(),
     outputSchema: z.string(),
   },
-  async (query, streamingCallback) => {
+  async (query, { sendChunk }) => {
     const docs = await ai.retrieve({
       retriever: pdfChatRetriever,
       query,
@@ -51,7 +51,7 @@ export const pdfQA = ai.defineFlow(
       context: docs.map((d) => d.text),
     },
     {
-      streamingCallback,
+      onChunk: (c) => sendChunk(c.text),
     }
   ).then((r) => r.text);
 }
diff --git a/samples/js-angular/server/src/jsonStreaming.ts b/samples/js-angular/server/src/jsonStreaming.ts
index df0611100..ef07b7c62 100644
--- a/samples/js-angular/server/src/jsonStreaming.ts
+++ b/samples/js-angular/server/src/jsonStreaming.ts
@@ -34,18 +34,14 @@ const GameCharactersSchema = z.object({
     .describe('Characters'),
 });
 
-export const streamCharacters = ai.defineStreamingFlow(
+export const streamCharacters = ai.defineFlow(
   {
     name: 'streamCharacters',
     inputSchema: z.number(),
     outputSchema: z.string(),
     streamSchema: GameCharactersSchema,
   },
-  async (count, streamingCallback) => {
-    if (!streamingCallback) {
-      throw new Error('this flow only works in streaming mode');
-    }
-
+  async (count, { sendChunk }) => {
     const { response, stream } = await ai.generateStream({
       model: gemini15Flash,
       output: {
@@ -62,7 +58,7 @@ export const streamCharacters = ai.defineFlow(
     for await (const chunk of stream) {
       buffer += chunk.content[0].text!;
       if (buffer.length > 10) {
-        streamingCallback(parse(maybeStripMarkdown(buffer), Allow.ALL));
+        sendChunk(parse(maybeStripMarkdown(buffer), Allow.ALL));
       }
     }
diff --git a/samples/js-chatbot/server/src/index.ts b/samples/js-chatbot/server/src/index.ts
index 283b368a2..ddd4c4454 100644
--- a/samples/js-chatbot/server/src/index.ts
+++ b/samples/js-chatbot/server/src/index.ts
@@ -63,7 +63,7 @@ export const chatbotFlow = ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: GenerateResponseChunkSchema,
   },
-  async (request, streamingCallback) => {
+  async (request, { sendChunk }) => {
     // Retrieve conversation history.
     const history = await ai.run(
       'retrieve-history',
@@ -78,7 +78,7 @@ export const chatbotFlow = ai.defineFlow(
       prompt: request.prompt,
       messages: history,
       model: llms[request.llmIndex],
-      streamingCallback,
+      onChunk: (c) => sendChunk(c.text),
     });
 
     // Save history.
diff --git a/tests/test_js_app/src/index.ts b/tests/test_js_app/src/index.ts
index 72b288645..2919e4fdc 100644
--- a/tests/test_js_app/src/index.ts
+++ b/tests/test_js_app/src/index.ts
@@ -69,13 +69,11 @@ export const streamy = ai.defineFlow(
     outputSchema: z.string(),
     streamSchema: z.object({ count: z.number() }),
   },
-  async (count, streamingCallback) => {
+  async (count, { sendChunk }) => {
     let i = 0;
-    if (streamingCallback) {
-      for (; i < count; i++) {
-        await new Promise((r) => setTimeout(r, 1000));
-        streamingCallback({ count: i });
-      }
+    for (; i < count; i++) {
+      await new Promise((r) => setTimeout(r, 1000));
+      sendChunk({ count: i });
     }
     return `done: ${count}, streamed: ${i} times`;
   }
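
For readers migrating to the renamed API, here is a minimal sketch of a streaming flow in the post-rename style. It is illustrative only and not part of the patch: the `countdownFlow` name and the countdown logic are invented for the example. The key difference the patch establishes is that the flow's second parameter is a side channel from which `sendChunk` is destructured, and `sendChunk` is always defined, so the `if (streamingCallback)` guards removed throughout this patch are no longer needed.

```ts
import { genkit, z } from 'genkit';

const ai = genkit({});

// Hypothetical streaming flow using the renamed API: `sendChunk` is
// destructured from the second parameter instead of receiving an optional
// `streamingCallback` argument.
export const countdownFlow = ai.defineFlow(
  {
    name: 'countdownFlow',
    inputSchema: z.number(),
    outputSchema: z.string(),
    streamSchema: z.number(), // type of each streamed chunk
  },
  async (from, { sendChunk }) => {
    for (let i = from; i > 0; i--) {
      sendChunk(i); // always safe to call; no definedness guard required
      await new Promise((r) => setTimeout(r, 100));
    }
    return `counted down from ${from}`;
  }
);
```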
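Flows that forward model output follow the same migration pattern the patch applies in the express README, the test apps, and the samples: instead of threading the raw callback into `generate()`, they pass an `onChunk` handler that forwards each chunk's text via `sendChunk`. A hedged sketch under the assumption that the `@genkit-ai/googleai` plugin is configured; `jokeStreamFlow` is a hypothetical name:

```ts
import { genkit, z } from 'genkit';
import { gemini15Flash, googleAI } from '@genkit-ai/googleai';

const ai = genkit({ plugins: [googleAI()] });

export const jokeStreamFlow = ai.defineFlow(
  {
    name: 'jokeStreamFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
    streamSchema: z.string(),
  },
  async (subject, { sendChunk }) => {
    const { text } = await ai.generate({
      model: gemini15Flash,
      prompt: `Tell me a joke about ${subject}.`,
      // Forward each model chunk's text into the flow's output stream,
      // mirroring the `onChunk: (c) => sendChunk(c.text)` pattern above.
      onChunk: (c) => sendChunk(c.text),
    });
    return text;
  }
);
```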