chore(js): Rename streamingCallback to sendChunk (#1809)
inlined authored Feb 3, 2025
1 parent 56ce1c6 commit 875b55e
Showing 21 changed files with 107 additions and 192 deletions.
9 changes: 4 additions & 5 deletions docs/flows.md
@@ -109,12 +109,11 @@ Here's an example of a flow that supports streaming:
* The `streamSchema` option specifies the type of values your flow streams.
This does not necessarily need to be the same type as the `outputSchema`,
which is the type of the flow's complete output.
* `streamingCallback` is a callback function that takes a single parameter, of
* The second parameter to your flow definition is called "sideChannel". It provides
multiple useful features, such as request context and the `sendChunk` callback.
The `sendChunk` callback takes a single parameter, of
the type specified by `streamSchema`. Whenever data becomes available within
your flow, send the data to the output stream by calling this function. Note
that `streamingCallback` is only defined if the caller of your flow
requested streaming output, so you need to check that it's defined before
calling it.
your flow, send the data to the output stream by calling this function.

In the above example, the values streamed by the flow are directly coupled to
the values streamed by the `generate()` call inside the flow. Although this is
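To ground the updated wording, here is a minimal sketch of a streaming flow written against the renamed API. It assumes a configured `ai` Genkit instance; the flow name is illustrative, and the `context` property is taken from the doc's mention of request context, so its exact shape may vary by version.

```ts
import { genkit, z } from 'genkit';

const ai = genkit({});

export const countdownFlow = ai.defineFlow(
  {
    name: 'countdownFlow',
    inputSchema: z.number(),
    streamSchema: z.number(),
    outputSchema: z.string(),
  },
  // The second argument is the "side channel": request context plus the sendChunk callback.
  async (from, { sendChunk, context }) => {
    for (let i = from; i > 0; i--) {
      // No definedness check is needed anymore; emit data whenever it becomes available.
      sendChunk(i);
    }
    return `counted down from ${from} (context: ${JSON.stringify(context ?? {})})`;
  }
);
```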
14 changes: 6 additions & 8 deletions js/doc-snippets/src/flows/index.ts
@@ -92,19 +92,17 @@ export const menuSuggestionStreamingFlow = ai.defineFlow(
streamSchema: z.string(),
outputSchema: z.object({ theme: z.string(), menuItem: z.string() }),
},
async (restaurantTheme, streamingCallback) => {
async (restaurantTheme, { sendChunk }) => {
const response = await ai.generateStream({
model: gemini15Flash,
prompt: `Invent a menu item for a ${restaurantTheme} themed restaurant.`,
});

if (streamingCallback) {
for await (const chunk of response.stream) {
// Here, you could process the chunk in some way before sending it to
// the output stream via streamingCallback(). In this example, we output
// the text of the chunk, unmodified.
streamingCallback(chunk.text);
}
for await (const chunk of response.stream) {
// Here, you could process the chunk in some way before sending it to
// the output stream via streamingCallback(). In this example, we output
// the text of the chunk, unmodified.
sendChunk(chunk.text);
}

return {
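As a usage note, the stream produced by this snippet could be consumed roughly as follows. This is a hedged sketch: it assumes the flow object returned by `ai.defineFlow` exposes a `stream()` method yielding `{ stream, output }`, as in recent Genkit releases, and that `menuSuggestionStreamingFlow` is imported from the snippet above.

```ts
async function printMenuSuggestion() {
  const { stream, output } = menuSuggestionStreamingFlow.stream('pirate');

  for await (const chunk of stream) {
    // Each chunk matches streamSchema (a string here) -- exactly what sendChunk received.
    process.stdout.write(chunk);
  }

  // The awaited output matches outputSchema.
  const { theme, menuItem } = await output;
  console.log(`\n${theme}: ${menuItem}`);
}
```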
8 changes: 4 additions & 4 deletions js/genkit/src/client/client.ts
@@ -53,7 +53,7 @@ export function streamFlow<O = any, S = any>({
const operationPromise = __flowRunEnvelope({
url,
input,
streamingCallback: (c) => channel.send(c),
sendChunk: (c) => channel.send(c),
headers,
});
operationPromise.then(
@@ -70,12 +70,12 @@
async function __flowRunEnvelope({
url,
input,
streamingCallback,
sendChunk,
headers,
}: {
url: string;
input: any;
streamingCallback: (chunk: any) => void;
sendChunk: (chunk: any) => void;
headers?: Record<string, string>;
}) {
const response = await fetch(url, {
@@ -115,7 +115,7 @@ async function __flowRunEnvelope({
.substring('data: '.length)
);
if (chunk.hasOwnProperty('message')) {
streamingCallback(chunk.message);
sendChunk(chunk.message);
} else if (chunk.hasOwnProperty('result')) {
return chunk.result;
} else if (chunk.hasOwnProperty('error')) {
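The client helper above posts the flow input and forwards every SSE `message` chunk through `sendChunk`. A hedged sketch of calling it from application code — the import path and the `{ stream, output }` return shape are assumptions based on recent Genkit releases, and the URL is illustrative:

```ts
import { streamFlow } from 'genkit/beta/client';

async function callRemoteFlow() {
  const result = streamFlow<string, string>({
    url: 'http://localhost:3400/menuSuggestionStreamingFlow', // hypothetical flow endpoint
    input: 'pirate',
  });

  for await (const chunk of result.stream) {
    console.log('chunk:', chunk);
  }

  console.log('final:', await result.output);
}
```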
18 changes: 9 additions & 9 deletions js/genkit/tests/helpers.ts
@@ -31,12 +31,12 @@ export function defineEchoModel(ai: Genkit): ModelAction {
{
name: 'echoModel',
},
async (request, streamingCallback) => {
async (request, sendChunk) => {
(model as any).__test__lastRequest = request;
(model as any).__test__lastStreamingCallback = streamingCallback;
if (streamingCallback) {
(model as any).__test__lastStreamingCallback = sendChunk;
if (sendChunk) {
await runAsync(() => {
streamingCallback({
sendChunk({
content: [
{
text: '3',
@@ -45,7 +45,7 @@
});
});
await runAsync(() => {
streamingCallback({
sendChunk({
content: [
{
text: '2',
@@ -54,7 +54,7 @@
});
});
await runAsync(() => {
streamingCallback({
sendChunk({
content: [
{
text: '1',
@@ -148,7 +148,7 @@ export function defineStaticResponseModel(
export type ProgrammableModel = ModelAction & {
handleResponse: (
req: GenerateRequest,
streamingCallback?: StreamingCallback<GenerateResponseChunkData>
sendChunk?: StreamingCallback<GenerateResponseChunkData>
) => Promise<GenerateResponseData>;

lastRequest?: GenerateRequest;
@@ -162,9 +162,9 @@ export function defineProgrammableModel(ai: Genkit): ProgrammableModel {
tools: true,
},
},
async (request, streamingCallback) => {
async (request, sendChunk) => {
pm.lastRequest = JSON.parse(JSON.stringify(request));
return pm.handleResponse(request, streamingCallback);
return pm.handleResponse(request, sendChunk);
}
) as ProgrammableModel;

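These helpers illustrate the same convention for models: the second argument of the model function is the chunk callback, and it is only defined when the caller requested streaming. A minimal sketch of a custom model along those lines — the response shape (`message` plus `finishReason`) is assumed from recent Genkit versions, and the model name is made up:

```ts
import { genkit } from 'genkit';

const ai = genkit({});

// Toy model that echoes the last message back, streaming it word by word when asked to.
const echoWords = ai.defineModel(
  { name: 'custom/echo-words' },
  async (request, sendChunk) => {
    const last = request.messages.at(-1);
    const text = (last?.content ?? []).map((part) => part.text ?? '').join('');

    if (sendChunk) {
      // Streaming was requested: emit one chunk per word.
      for (const word of text.split(/\s+/).filter(Boolean)) {
        sendChunk({ content: [{ text: word + ' ' }] });
      }
    }

    return {
      message: { role: 'model', content: [{ text }] },
      finishReason: 'stop',
    };
  }
);
```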
19 changes: 8 additions & 11 deletions js/plugins/express/README.md
@@ -6,17 +6,14 @@ This plugin provides utilities for conveniently exposing Genkit flows and action
import { expressHandler } from '@genkit-ai/express';
import express from 'express';

const simpleFlow = ai.defineFlow(
'simpleFlow',
async (input, streamingCallback) => {
const { text } = await ai.generate({
model: gemini15Flash,
prompt: input,
streamingCallback,
});
return text;
}
);
const simpleFlow = ai.defineFlow('simpleFlow', async (input, { sendChunk }) => {
const { text } = await ai.generate({
model: gemini15Flash,
prompt: input,
onChunk: (c) => sendChunk(c.text),
});
return text;
});

const app = express();
app.use(express.json());
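For completeness, the rest of the README wires a flow like this into an Express app roughly as sketched below. The route path and curl commands are illustrative assumptions; the `{"data": ...}` envelope follows Genkit's HTTP convention for flow inputs.

```ts
import express from 'express';
import { expressHandler } from '@genkit-ai/express';

const app = express();
app.use(express.json());

// Expose the flow defined above at POST /simpleFlow.
app.post('/simpleFlow', expressHandler(simpleFlow));

app.listen(8080);

// Non-streaming call (sketch):
//   curl -X POST http://localhost:8080/simpleFlow \
//     -H "Content-Type: application/json" -d '{"data": "banana"}'
// Streaming call: add -H "Accept: text/event-stream" to receive chunks as server-sent events.
```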
6 changes: 3 additions & 3 deletions js/plugins/googleai/src/gemini.ts
@@ -646,7 +646,7 @@ export function defineGoogleAIModel(
configSchema: model.configSchema,
use: middleware,
},
async (request, streamingCallback) => {
async (request, sendChunk) => {
const options: RequestOptions = { apiClient: GENKIT_CLIENT_HEADER };
if (apiVersion) {
options.apiVersion = apiVersion;
@@ -780,14 +780,14 @@
);
}

if (streamingCallback) {
if (sendChunk) {
const result = await genModel
.startChat(updatedChatRequest)
.sendMessageStream(msg.parts, options);
for await (const item of result.stream) {
(item as GenerateContentResponse).candidates?.forEach((candidate) => {
const c = fromJSONModeScopedGeminiCandidate(candidate);
streamingCallback({
sendChunk({
index: c.index,
content: c.message.content,
});
6 changes: 3 additions & 3 deletions js/plugins/vertexai/src/modelgarden/anthropic.ts
@@ -409,9 +409,9 @@ export function anthropicModel(
supports: model.info?.supports,
versions: model.info?.versions,
},
async (input, streamingCallback) => {
async (input, sendChunk) => {
const client = clientFactory(input.config?.location || region);
if (!streamingCallback) {
if (!sendChunk) {
const response = await client.messages.create({
...toAnthropicRequest(input.config?.version ?? modelName, input),
stream: false,
@@ -423,7 +423,7 @@
);
for await (const event of stream) {
if (event.type === 'content_block_delta') {
streamingCallback({
sendChunk({
index: 0,
content: [
{
6 changes: 3 additions & 3 deletions js/plugins/vertexai/src/modelgarden/mistral.ts
@@ -341,13 +341,13 @@ export function mistralModel(
supports: model.info?.supports,
versions: model.info?.versions,
},
async (input, streamingCallback) => {
async (input, sendChunk) => {
const client = getClient(input.config?.location || region);

const versionedModel =
input.config?.version ?? model.info?.versions?.[0] ?? model.name;

if (!streamingCallback) {
if (!sendChunk) {
const mistralRequest = toMistralRequest(versionedModel, input);

const response = await client.chat.complete(mistralRequest, {
@@ -372,7 +372,7 @@
for await (const event of stream) {
const parts = fromMistralCompletionChunk(event.data);
if (parts.length > 0) {
streamingCallback({
sendChunk({
content: parts,
});
}
6 changes: 3 additions & 3 deletions js/plugins/vertexai/src/modelgarden/openai_compatibility.ts
@@ -311,20 +311,20 @@ export function openaiCompatibleModel<C extends typeof OpenAIConfigSchema>(
},
async (
request: GenerateRequest<C>,
streamingCallback?: StreamingCallback<GenerateResponseChunkData>
sendChunk?: StreamingCallback<GenerateResponseChunkData>
): Promise<GenerateResponseData> => {
let response: ChatCompletion;
const client = await clientFactory(request);
const body = toRequestBody(model, request);
if (streamingCallback) {
if (sendChunk) {
const stream = client.beta.chat.completions.stream({
...body,
stream: true,
});
for await (const chunk of stream) {
chunk.choices?.forEach((chunk) => {
const c = fromOpenAiChunkChoice(chunk);
streamingCallback({
sendChunk({
index: c.index,
content: c.message.content,
});
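The Anthropic, Mistral, and OpenAI-compatible hunks above all reduce to the same shape after the rename: branch on whether `sendChunk` was supplied, stream through it when it was, and return the complete response either way. A self-contained sketch of that pattern with simplified stand-in types (not the plugins' real ones):

```ts
type Chunk = { content: { text: string }[] };
type SendChunk = (chunk: Chunk) => void;

async function runModel(prompt: string, sendChunk?: SendChunk): Promise<string> {
  if (!sendChunk) {
    // Non-streaming path: a single request, a single full response.
    return `echo: ${prompt}`;
  }

  // Streaming path: forward each piece as it becomes available, then return the whole.
  const pieces = ['echo', ': ', prompt];
  for (const piece of pieces) {
    sendChunk({ content: [{ text: piece }] });
  }
  return pieces.join('');
}
```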
46 changes: 0 additions & 46 deletions js/pnpm-lock.yaml


24 changes: 10 additions & 14 deletions js/testapps/dev-ui-gallery/src/main/flows.ts
@@ -83,13 +83,11 @@ ai.defineFlow(
outputSchema: z.string(),
streamSchema: z.number(),
},
async (count, streamingCallback) => {
async (count, { sendChunk }) => {
let i = 1;
if (streamingCallback) {
for (; i <= count; i++) {
await new Promise((r) => setTimeout(r, 500));
streamingCallback(i);
}
for (; i <= count; i++) {
await new Promise((r) => setTimeout(r, 500));
sendChunk(i);
}
return `done: ${count}, streamed: ${i - 1} times`;
}
@@ -168,16 +166,14 @@ ai.defineFlow(
outputSchema: z.string(),
streamSchema: z.number(),
},
async (count, streamingCallback) => {
async (count, { sendChunk }) => {
let i = 1;
if (streamingCallback) {
for (; i <= count; i++) {
if (i == 3) {
throw new Error('I cannot count that high!');
}
await new Promise((r) => setTimeout(r, 500));
streamingCallback(i);
for (; i <= count; i++) {
if (i == 3) {
throw new Error('I cannot count that high!');
}
await new Promise((r) => setTimeout(r, 500));
sendChunk(i);
}
if (count) {
throw new Error('I cannot count that low!');
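Since this gallery flow deliberately throws partway through, its consumer has to be prepared for a rejection after some chunks have already arrived. A hedged sketch of handling that, reusing the assumed `stream()` method from earlier; `throwyCounterFlow` is a hypothetical handle for the flow above:

```ts
async function consumeThrowyCounter(count: number) {
  const { stream, output } = throwyCounterFlow.stream(count); // hypothetical flow handle
  const received: number[] = [];

  try {
    for await (const chunk of stream) {
      received.push(chunk);
    }
    console.log('done:', await output);
  } catch (err) {
    // Chunks emitted before the failure were already delivered; the error surfaces here.
    console.error(`failed after ${received.length} chunks:`, err);
  }
}
```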
4 changes: 2 additions & 2 deletions js/testapps/express/src/index.ts
@@ -49,15 +49,15 @@ const ai = genkit({

export const jokeFlow = ai.defineFlow(
{ name: 'jokeFlow', inputSchema: z.string(), outputSchema: z.string() },
async (subject, streamingCallback) => {
async (subject, { sendChunk }) => {
return await ai.run('call-llm', async () => {
const llmResponse = await ai.generate({
prompt: `tell me long joke about ${subject}`,
model: gemini15Flash,
config: {
temperature: 1,
},
streamingCallback,
onChunk: (c) => sendChunk(c.text),
});

return llmResponse.text;
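This test app also shows the generate-side counterpart of the rename: instead of passing the flow's callback straight through as `streamingCallback`, the call now forwards chunk text with the `onChunk` option. A minimal sketch of that pairing in isolation (flow name and prompt are illustrative):

```ts
import { genkit, z } from 'genkit';
import { gemini15Flash, googleAI } from '@genkit-ai/googleai';

const ai = genkit({ plugins: [googleAI()] });

export const storyFlow = ai.defineFlow(
  {
    name: 'storyFlow',
    inputSchema: z.string(),
    streamSchema: z.string(),
    outputSchema: z.string(),
  },
  async (topic, { sendChunk }) => {
    const { text } = await ai.generate({
      model: gemini15Flash,
      prompt: `Tell a short story about ${topic}.`,
      // Pipe each model chunk's text into the flow's output stream.
      onChunk: (chunk) => sendChunk(chunk.text),
    });
    return text;
  }
);
```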
