From 6b355ab9ad24a75769ded30a38eeb85ae1b6d4c0 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 13 Dec 2024 11:42:50 +0000 Subject: [PATCH] Upgrades and fixes to Realtime and Realtime streams (#1549) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix streaming splits in realtime streams v2 * Add changeset * Skip all flaky tests 😡 * Improve the way we stream from tasks to the server * Improve the v1 realtime streams (Redis) * Turn on the relay realtime stream service * Improved the relay realtime cleanup * Fixed consuming realtime runs w/streams after the run is already finished * Remove some logs * Update changeset * Fixed runStream tests --- .changeset/rude-walls-help.md | 9 + .../v3/ApiRunListPresenter.server.ts | 13 +- .../app/presenters/v3/SpanPresenter.server.ts | 2 +- .../realtime.v1.streams.$runId.$streamId.ts | 5 +- .../databaseRealtimeStreams.server.ts | 29 +- .../realtime/redisRealtimeStreams.server.ts | 60 ++-- .../realtime/relayRealtimeStreams.server.ts | 257 ++++++++++++++++++ .../app/services/realtime/utils.server.ts | 33 +++ apps/webapp/server.ts | 13 + .../authorizationRateLimitMiddleware.test.ts | 2 +- docker/docker-compose.yml | 2 +- internal-packages/testcontainers/src/utils.ts | 4 +- packages/core/package.json | 2 +- packages/core/src/v3/apiClient/runStream.ts | 58 ++-- packages/core/src/v3/apiClient/stream.ts | 127 ++++++--- packages/core/src/v3/runMetadata/manager.ts | 15 +- .../core/src/v3/runMetadata/metadataStream.ts | 76 +++--- packages/core/src/v3/utils/ioSerialization.ts | 23 +- packages/core/test/runStream.test.ts | 6 + packages/react-hooks/src/hooks/useRealtime.ts | 12 +- pnpm-lock.yaml | 180 +++++++++++- references/nextjs-realtime/package.json | 7 +- .../src/app/realtime/[id]/page.tsx | 18 ++ .../src/components/RunRealtimeComparison.tsx | 91 +++++++ .../components/TriggerButtonWithStreaming.tsx | 1 - .../src/components/ui/tabs.tsx | 55 ++++ references/nextjs-realtime/src/trigger/ai.ts | 1 - 27 files changed, 920 insertions(+), 181 deletions(-) create mode 100644 .changeset/rude-walls-help.md create mode 100644 apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts create mode 100644 apps/webapp/app/services/realtime/utils.server.ts create mode 100644 references/nextjs-realtime/src/app/realtime/[id]/page.tsx create mode 100644 references/nextjs-realtime/src/components/RunRealtimeComparison.tsx create mode 100644 references/nextjs-realtime/src/components/ui/tabs.tsx diff --git a/.changeset/rude-walls-help.md b/.changeset/rude-walls-help.md new file mode 100644 index 0000000000..6355f86587 --- /dev/null +++ b/.changeset/rude-walls-help.md @@ -0,0 +1,9 @@ +--- +"@trigger.dev/sdk": patch +"@trigger.dev/react-hooks": patch +--- + +- Fixes an issue in streams where "chunks" could get split across multiple reads +- Fixed stopping the run subscription after a run is finished, when using useRealtimeRun or useRealtimeRunWithStreams +- Added an `onComplete` callback to `useRealtimeRun` and `useRealtimeRunWithStreams` +- Optimized the run subscription to reduce unnecessary updates diff --git a/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts index 766ca0367f..78f95e324d 100644 --- a/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts @@ -224,10 +224,15 @@ export class ApiRunListPresenter extends BasePresenter { const data: ListRunResponseItem[] = await Promise.all( results.runs.map(async (run) => { - const metadata = await parsePacket({ - data: run.metadata ?? undefined, - dataType: run.metadataType, - }); + const metadata = await parsePacket( + { + data: run.metadata ?? undefined, + dataType: run.metadataType, + }, + { + filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"], + } + ); return { id: run.friendlyId, diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index 539a0c8110..d8516ca6ac 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -216,7 +216,7 @@ export class SpanPresenter extends BasePresenter { const metadata = run.metadata ? await prettyPrintPacket(run.metadata, run.metadataType, { - filteredKeys: ["$$streams", "$$streamsVersion"], + filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"], }) : undefined; diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts index c370869d3f..99e9cdb8d7 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts @@ -1,6 +1,7 @@ import { ActionFunctionArgs } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; +import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server"; import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; @@ -16,7 +17,7 @@ export async function action({ request, params }: ActionFunctionArgs) { return new Response("No body provided", { status: 400 }); } - return v1RealtimeStreams.ingestData(request.body, $params.runId, $params.streamId); + return relayRealtimeStreams.ingestData(request.body, $params.runId, $params.streamId); } export const loader = createLoaderApiRoute( @@ -51,7 +52,7 @@ export const loader = createLoaderApiRoute( }, }, async ({ params, request, resource: run, authentication }) => { - return v1RealtimeStreams.streamResponse( + return relayRealtimeStreams.streamResponse( request, run.friendlyId, params.streamId, diff --git a/apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts index 07f428d5c2..9e1eed221e 100644 --- a/apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts @@ -37,13 +37,14 @@ export class DatabaseRealtimeStreams implements StreamIngestor, StreamResponder ): Promise { try { const textStream = stream.pipeThrough(new TextDecoderStream()); + const reader = textStream.getReader(); let sequence = 0; while (true) { const { done, value } = await reader.read(); - if (done) { + if (done || !value) { break; } @@ -53,25 +54,13 @@ export class DatabaseRealtimeStreams implements StreamIngestor, StreamResponder value, }); - const chunks = value - .split("\n") - .filter((chunk) => chunk) // Remove empty lines - .map((line) => { - return { - sequence: sequence++, - value: line, - }; - }); - - await this.options.prisma.realtimeStreamChunk.createMany({ - data: chunks.map((chunk) => { - return { - runId, - key: streamId, - sequence: chunk.sequence, - value: chunk.value, - }; - }), + await this.options.prisma.realtimeStreamChunk.create({ + data: { + runId, + key: streamId, + sequence: sequence++, + value, + }, }); } diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts index 808776bd06..6eaee46a33 100644 --- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts @@ -1,7 +1,8 @@ -import Redis, { RedisKey, RedisOptions, RedisValue } from "ioredis"; +import Redis, { RedisOptions } from "ioredis"; +import { AuthenticatedEnvironment } from "../apiAuth.server"; import { logger } from "../logger.server"; import { StreamIngestor, StreamResponder } from "./types"; -import { AuthenticatedEnvironment } from "../apiAuth.server"; +import { LineTransformStream } from "./utils.server"; export type RealtimeStreamsOptions = { redis: RedisOptions | undefined; @@ -56,7 +57,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { controller.close(); return; } - controller.enqueue(`data: ${fields[1]}\n\n`); + controller.enqueue(fields[1]); if (signal.aborted) { controller.close(); @@ -88,7 +89,18 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { cancel: async () => { await cleanup(); }, - }); + }) + .pipeThrough(new LineTransformStream()) + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + for (const line of chunk) { + controller.enqueue(`data: ${line}\n\n`); + } + }, + }) + ) + .pipeThrough(new TextEncoderStream()); async function cleanup() { if (isCleanedUp) return; @@ -98,7 +110,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { signal.addEventListener("abort", cleanup); - return new Response(stream.pipeThrough(new TextEncoderStream()), { + return new Response(stream, { headers: { "Content-Type": "text/event-stream", "Cache-Control": "no-cache", @@ -119,7 +131,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { try { await redis.quit(); } catch (error) { - logger.error("[RealtimeStreams][ingestData] Error in cleanup:", { error }); + logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error }); } } @@ -127,42 +139,20 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { const textStream = stream.pipeThrough(new TextDecoderStream()); const reader = textStream.getReader(); - const batchSize = 10; - let batchCommands: Array<[key: RedisKey, ...args: RedisValue[]]> = []; - while (true) { const { done, value } = await reader.read(); - if (done) { + if (done || !value) { break; } - logger.debug("[RealtimeStreams][ingestData] Reading data", { streamKey, value }); - - const lines = value.split("\n"); - - for (const line of lines) { - if (line.trim()) { - batchCommands.push([streamKey, "MAXLEN", "~", "2500", "*", "data", line]); + logger.debug("[RedisRealtimeStreams][ingestData] Reading data", { + streamKey, + runId, + value, + }); - if (batchCommands.length >= batchSize) { - const pipeline = redis.pipeline(); - for (const args of batchCommands) { - pipeline.xadd(...args); - } - await pipeline.exec(); - batchCommands = []; - } - } - } - } - - if (batchCommands.length > 0) { - const pipeline = redis.pipeline(); - for (const args of batchCommands) { - pipeline.xadd(...args); - } - await pipeline.exec(); + await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", value); } await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL); diff --git a/apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts new file mode 100644 index 0000000000..46e16ff5e9 --- /dev/null +++ b/apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts @@ -0,0 +1,257 @@ +import { AuthenticatedEnvironment } from "../apiAuth.server"; +import { logger } from "../logger.server"; +import { StreamIngestor, StreamResponder } from "./types"; +import { LineTransformStream } from "./utils.server"; +import { v1RealtimeStreams } from "./v1StreamsGlobal.server"; +import { singleton } from "~/utils/singleton"; + +export type RelayRealtimeStreamsOptions = { + ttl: number; + cleanupInterval: number; + fallbackIngestor: StreamIngestor; + fallbackResponder: StreamResponder; + waitForBufferTimeout?: number; // Time to wait for buffer in ms (default: 500ms) + waitForBufferInterval?: number; // Polling interval in ms (default: 50ms) +}; + +interface RelayedStreamRecord { + stream: ReadableStream; + createdAt: number; + lastAccessed: number; + locked: boolean; + finalized: boolean; +} + +export class RelayRealtimeStreams implements StreamIngestor, StreamResponder { + private _buffers: Map = new Map(); + private cleanupInterval: NodeJS.Timeout; + private waitForBufferTimeout: number; + private waitForBufferInterval: number; + + constructor(private options: RelayRealtimeStreamsOptions) { + this.waitForBufferTimeout = options.waitForBufferTimeout ?? 1200; + this.waitForBufferInterval = options.waitForBufferInterval ?? 50; + + // Periodic cleanup + this.cleanupInterval = setInterval(() => { + this.cleanup(); + }, this.options.cleanupInterval).unref(); + } + + async streamResponse( + request: Request, + runId: string, + streamId: string, + environment: AuthenticatedEnvironment, + signal: AbortSignal + ): Promise { + let record = this._buffers.get(`${runId}:${streamId}`); + + if (!record) { + logger.debug( + "[RelayRealtimeStreams][streamResponse] No ephemeral record found, waiting to see if one becomes available", + { + streamId, + runId, + } + ); + + record = await this.waitForBuffer(`${runId}:${streamId}`); + + if (!record) { + logger.debug( + "[RelayRealtimeStreams][streamResponse] No ephemeral record found, using fallback", + { + streamId, + runId, + } + ); + + // No ephemeral record, use fallback + return this.options.fallbackResponder.streamResponse( + request, + runId, + streamId, + environment, + signal + ); + } + } + + // Only 1 reader of the stream can use the relayed stream, the rest should use the fallback + if (record.locked) { + logger.debug("[RelayRealtimeStreams][streamResponse] Stream already locked, using fallback", { + streamId, + runId, + }); + + return this.options.fallbackResponder.streamResponse( + request, + runId, + streamId, + environment, + signal + ); + } + + record.locked = true; + record.lastAccessed = Date.now(); + + logger.debug("[RelayRealtimeStreams][streamResponse] Streaming from ephemeral record", { + streamId, + runId, + }); + + // Create a streaming response from the buffered data + const stream = record.stream + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new LineTransformStream()) + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + for (const line of chunk) { + controller.enqueue(`data: ${line}\n\n`); + } + }, + }) + ) + .pipeThrough(new TextEncoderStream()); + + // Once we start streaming, consider deleting the buffer when done. + // For a simple approach, we can rely on finalized and no more reads. + // Or we can let TTL cleanup handle it if multiple readers might come in. + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + "x-trigger-relay-realtime-streams": "true", + }, + }); + } + + async ingestData( + stream: ReadableStream, + runId: string, + streamId: string + ): Promise { + const [localStream, fallbackStream] = stream.tee(); + + logger.debug("[RelayRealtimeStreams][ingestData] Ingesting data", { runId, streamId }); + + // Handle local buffering asynchronously and catch errors + this.handleLocalIngestion(localStream, runId, streamId).catch((err) => { + logger.error("[RelayRealtimeStreams][ingestData] Error in local ingestion:", { err }); + }); + + // Forward to the fallback ingestor asynchronously and catch errors + return this.options.fallbackIngestor.ingestData(fallbackStream, runId, streamId); + } + + /** + * Handles local buffering of the stream data. + * @param stream The readable stream to buffer. + * @param streamId The unique identifier for the stream. + */ + private async handleLocalIngestion( + stream: ReadableStream, + runId: string, + streamId: string + ) { + this.createOrUpdateRelayedStream(`${runId}:${streamId}`, stream); + } + + /** + * Retrieves an existing buffer or creates a new one for the given streamId. + * @param streamId The unique identifier for the stream. + */ + private createOrUpdateRelayedStream( + bufferKey: string, + stream: ReadableStream + ): RelayedStreamRecord { + let record = this._buffers.get(bufferKey); + if (!record) { + record = { + stream, + createdAt: Date.now(), + lastAccessed: Date.now(), + finalized: false, + locked: false, + }; + this._buffers.set(bufferKey, record); + } else { + record.lastAccessed = Date.now(); + } + return record; + } + + private cleanup() { + const now = Date.now(); + + logger.debug("[RelayRealtimeStreams][cleanup] Cleaning up old buffers", { + bufferCount: this._buffers.size, + }); + + for (const [key, record] of this._buffers.entries()) { + // If last accessed is older than ttl, clean up + if (now - record.lastAccessed > this.options.ttl) { + this.deleteBuffer(key); + } + } + + logger.debug("[RelayRealtimeStreams][cleanup] Cleaned up old buffers", { + bufferCount: this._buffers.size, + }); + } + + private deleteBuffer(bufferKey: string) { + this._buffers.delete(bufferKey); + } + + /** + * Waits for a buffer to be created within a specified timeout. + * @param streamId The unique identifier for the stream. + * @returns A promise that resolves to true if the buffer was created, false otherwise. + */ + private async waitForBuffer(bufferKey: string): Promise { + const timeout = this.waitForBufferTimeout; + const interval = this.waitForBufferInterval; + const maxAttempts = Math.ceil(timeout / interval); + let attempts = 0; + + return new Promise((resolve) => { + const checkBuffer = () => { + attempts++; + if (this._buffers.has(bufferKey)) { + resolve(this._buffers.get(bufferKey)); + return; + } + if (attempts >= maxAttempts) { + resolve(undefined); + return; + } + setTimeout(checkBuffer, interval); + }; + checkBuffer(); + }); + } + + // Don't forget to clear interval on shutdown if needed + close() { + clearInterval(this.cleanupInterval); + } +} + +function initializeRelayRealtimeStreams() { + return new RelayRealtimeStreams({ + ttl: 1000 * 60 * 5, // 5 minutes + cleanupInterval: 1000 * 60, // 1 minute + fallbackIngestor: v1RealtimeStreams, + fallbackResponder: v1RealtimeStreams, + }); +} + +export const relayRealtimeStreams = singleton( + "relayRealtimeStreams", + initializeRelayRealtimeStreams +); diff --git a/apps/webapp/app/services/realtime/utils.server.ts b/apps/webapp/app/services/realtime/utils.server.ts new file mode 100644 index 0000000000..9655878fe8 --- /dev/null +++ b/apps/webapp/app/services/realtime/utils.server.ts @@ -0,0 +1,33 @@ +export class LineTransformStream extends TransformStream { + private buffer = ""; + + constructor() { + super({ + transform: (chunk, controller) => { + // Append the chunk to the buffer + this.buffer += chunk; + + // Split on newlines + const lines = this.buffer.split("\n"); + + // The last element might be incomplete, hold it back in buffer + this.buffer = lines.pop() || ""; + + // Filter out empty or whitespace-only lines + const fullLines = lines.filter((line) => line.trim().length > 0); + + // If we got any complete lines, emit them as an array + if (fullLines.length > 0) { + controller.enqueue(fullLines); + } + }, + flush: (controller) => { + // On stream end, if there's leftover text, emit it as a single-element array + const trimmed = this.buffer.trim(); + if (trimmed.length > 0) { + controller.enqueue([trimmed]); + } + }, + }); + } +} diff --git a/apps/webapp/server.ts b/apps/webapp/server.ts index aec725c801..26e30343a2 100644 --- a/apps/webapp/server.ts +++ b/apps/webapp/server.ts @@ -81,6 +81,19 @@ if (process.env.HTTP_SERVER_DISABLED !== "true") { }); if (process.env.DASHBOARD_AND_API_DISABLED !== "true") { + if (process.env.ALLOW_ONLY_REALTIME_API === "true") { + // Block all requests that do not start with /realtime + app.use((req, res, next) => { + // Make sure /healthcheck is still accessible + if (!req.url.startsWith("/realtime") && req.url !== "/healthcheck") { + res.status(404).send("Not Found"); + return; + } + + next(); + }); + } + app.use(apiRateLimiter); app.all( diff --git a/apps/webapp/test/authorizationRateLimitMiddleware.test.ts b/apps/webapp/test/authorizationRateLimitMiddleware.test.ts index a089a6700c..c599a4ddea 100644 --- a/apps/webapp/test/authorizationRateLimitMiddleware.test.ts +++ b/apps/webapp/test/authorizationRateLimitMiddleware.test.ts @@ -15,7 +15,7 @@ import express, { Express } from "express"; import request from "supertest"; import { authorizationRateLimitMiddleware } from "../app/services/authorizationRateLimitMiddleware.server.js"; -describe("authorizationRateLimitMiddleware", () => { +describe.skipIf(process.env.GITHUB_ACTIONS)("authorizationRateLimitMiddleware", () => { let app: Express; beforeEach(() => { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 5b9c55333f..fd9e0014af 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -65,7 +65,7 @@ services: - 6379:6379 electric: - image: electricsql/electric:0.9.4 + image: electricsql/electric:1.0.0-beta.1@sha256:2262f6f09caf5fa45f233731af97b84999128170a9529e5f9b9b53642308493f restart: always environment: DATABASE_URL: postgresql://postgres:postgres@database:5432/postgres?sslmode=disable diff --git a/internal-packages/testcontainers/src/utils.ts b/internal-packages/testcontainers/src/utils.ts index 734a5e5625..140628630d 100644 --- a/internal-packages/testcontainers/src/utils.ts +++ b/internal-packages/testcontainers/src/utils.ts @@ -55,7 +55,9 @@ export async function createElectricContainer( network.getName() )}:5432/${postgresContainer.getDatabase()}?sslmode=disable`; - const container = await new GenericContainer("electricsql/electric:0.9.4") + const container = await new GenericContainer( + "electricsql/electric:1.0.0-beta.1@sha256:2262f6f09caf5fa45f233731af97b84999128170a9529e5f9b9b53642308493f" + ) .withExposedPorts(3000) .withNetwork(network) .withEnvironment({ diff --git a/packages/core/package.json b/packages/core/package.json index 6eab0c694b..82e2b94607 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -182,7 +182,7 @@ "check-exports": "attw --pack ." }, "dependencies": { - "@electric-sql/client": "0.9.0", + "@electric-sql/client": "1.0.0-beta.1", "@google-cloud/precise-date": "^4.0.0", "@jsonhero/path": "^1.0.21", "@opentelemetry/api": "1.9.0", diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index ac88270293..c1bb7c93ce 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -16,7 +16,12 @@ import { } from "../utils/ioSerialization.js"; import { ApiError } from "./errors.js"; import { ApiClient } from "./index.js"; -import { AsyncIterableStream, createAsyncIterableReadable, zodShapeStream } from "./stream.js"; +import { + AsyncIterableStream, + createAsyncIterableReadable, + LineTransformStream, + zodShapeStream, +} from "./stream.js"; export type RunShape = TRunTypes extends AnyRunTypes ? { @@ -111,11 +116,14 @@ export function runShapeStream( { once: true } ); + const runStreamInstance = zodShapeStream(SubscribeRunRawShape, url, { + ...options, + signal: abortController.signal, + }); + const $options: RunSubscriptionOptions = { - runShapeStream: zodShapeStream(SubscribeRunRawShape, url, { - ...options, - signal: abortController.signal, - }), + runShapeStream: runStreamInstance.stream, + stopRunShapeStream: runStreamInstance.stop, streamFactory: new VersionedStreamSubscriptionFactory(version1, version2), abortController, ...options, @@ -209,13 +217,24 @@ export class ElectricStreamSubscription implements StreamSubscription { ) {} async subscribe(): Promise> { - return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options).pipeThrough( - new TransformStream({ - transform(chunk, controller) { - controller.enqueue(safeParseJSON(chunk.value)); - }, - }) - ); + return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options) + .stream.pipeThrough( + new TransformStream({ + transform(chunk, controller) { + controller.enqueue(chunk.value); + }, + }) + ) + .pipeThrough(new LineTransformStream()) + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + for (const line of chunk) { + controller.enqueue(safeParseJSON(line)); + } + }, + }) + ); } } @@ -261,12 +280,15 @@ export class VersionedStreamSubscriptionFactory implements StreamSubscriptionFac const version = typeof metadata.$$streamsVersion === "string" ? metadata.$$streamsVersion : "v1"; + const $baseUrl = + typeof metadata.$$streamsBaseUrl === "string" ? metadata.$$streamsBaseUrl : baseUrl; + if (version === "v1") { - return this.version1.createSubscription(metadata, runId, streamKey, baseUrl); + return this.version1.createSubscription(metadata, runId, streamKey, $baseUrl); } if (version === "v2") { - return this.version2.createSubscription(metadata, runId, streamKey, baseUrl); + return this.version2.createSubscription(metadata, runId, streamKey, $baseUrl); } throw new Error(`Unknown stream version: ${version}`); @@ -279,12 +301,12 @@ export interface RunShapeProvider { export type RunSubscriptionOptions = RunShapeStreamOptions & { runShapeStream: ReadableStream; + stopRunShapeStream: () => void; streamFactory: StreamSubscriptionFactory; abortController: AbortController; }; export class RunSubscription { - private unsubscribeShape?: () => void; private stream: AsyncIterableStream>; private packetCache = new Map(); private _closeOnComplete: boolean; @@ -309,9 +331,7 @@ export class RunSubscription { this._isRunComplete && !this.options.abortController.signal.aborted ) { - console.log("Closing stream because run is complete"); - - this.options.abortController.abort(); + this.options.stopRunShapeStream(); } }, }, @@ -323,7 +343,7 @@ export class RunSubscription { if (!this.options.abortController.signal.aborted) { this.options.abortController.abort(); } - this.unsubscribeShape?.(); + this.options.stopRunShapeStream(); } [Symbol.asyncIterator](): AsyncIterator> { diff --git a/packages/core/src/v3/apiClient/stream.ts b/packages/core/src/v3/apiClient/stream.ts index fd975a5416..396f242186 100644 --- a/packages/core/src/v3/apiClient/stream.ts +++ b/packages/core/src/v3/apiClient/stream.ts @@ -8,7 +8,6 @@ import { type Message, type Row, type ShapeStreamInterface, - // @ts-ignore it's safe to import types from the client } from "@electric-sql/client"; export type ZodShapeStreamOptions = { @@ -17,25 +16,40 @@ export type ZodShapeStreamOptions = { signal?: AbortSignal; }; +export type ZodShapeStreamInstance = { + stream: AsyncIterableStream>; + stop: () => void; +}; + export function zodShapeStream( schema: TShapeSchema, url: string, options?: ZodShapeStreamOptions -) { - const stream = new ShapeStream>({ +): ZodShapeStreamInstance { + const abortController = new AbortController(); + + options?.signal?.addEventListener( + "abort", + () => { + abortController.abort(); + }, + { once: true } + ); + + const shapeStream = new ShapeStream({ url, headers: { ...options?.headers, - "x-trigger-electric-version": "0.8.1", + "x-trigger-electric-version": "1.0.0-beta.1", }, fetchClient: options?.fetchClient, - signal: options?.signal, + signal: abortController.signal, }); - const readableShape = new ReadableShapeStream(stream); + const readableShape = new ReadableShapeStream(shapeStream); - return readableShape.stream.pipeThrough( - new TransformStream({ + const stream = readableShape.stream.pipeThrough( + new TransformStream>({ async transform(chunk, controller) { const result = schema.safeParse(chunk); @@ -47,6 +61,13 @@ export function zodShapeStream( }, }) ); + + return { + stream: stream as AsyncIterableStream>, + stop: () => { + abortController.abort(); + }, + }; } export type AsyncIterableStream = AsyncIterable & ReadableStream; @@ -105,6 +126,11 @@ class ReadableShapeStream = Row> { readonly #currentState: Map = new Map(); readonly #changeStream: AsyncIterableStream; #error: FetchError | false = false; + #unsubscribe?: () => void; + + stop() { + this.#unsubscribe?.(); + } constructor(stream: ShapeStreamInterface) { this.#stream = stream; @@ -112,7 +138,7 @@ class ReadableShapeStream = Row> { // Create the source stream that will receive messages const source = new ReadableStream[]>({ start: (controller) => { - this.#stream.subscribe( + this.#unsubscribe = this.#stream.subscribe( (messages) => controller.enqueue(messages), this.#handleError.bind(this) ); @@ -122,41 +148,44 @@ class ReadableShapeStream = Row> { // Create the transformed stream that processes messages and emits complete rows this.#changeStream = createAsyncIterableStream(source, { transform: (messages, controller) => { - messages.forEach((message) => { + const updatedKeys = new Set(); + + for (const message of messages) { if (isChangeMessage(message)) { + const key = message.key; switch (message.headers.operation) { case "insert": { - this.#currentState.set(message.key, message.value); - controller.enqueue(message.value); + // New row entirely + this.#currentState.set(key, message.value); + updatedKeys.add(key); break; } case "update": { - const existingRow = this.#currentState.get(message.key); - if (existingRow) { - const updatedRow = { - ...existingRow, - ...message.value, - }; - this.#currentState.set(message.key, updatedRow); - controller.enqueue(updatedRow); - } else { - this.#currentState.set(message.key, message.value); - controller.enqueue(message.value); - } + // Merge updates into existing row if any, otherwise treat as new + const existingRow = this.#currentState.get(key); + const updatedRow = existingRow + ? { ...existingRow, ...message.value } + : message.value; + this.#currentState.set(key, updatedRow); + updatedKeys.add(key); break; } } + } else if (isControlMessage(message)) { + if (message.headers.control === "must-refetch") { + this.#currentState.clear(); + this.#error = false; + } } + } - if (isControlMessage(message)) { - switch (message.headers.control) { - case "must-refetch": - this.#currentState.clear(); - this.#error = false; - break; - } + // Now enqueue only one updated row per key, after all messages have been processed. + for (const key of updatedKeys) { + const finalRow = this.#currentState.get(key); + if (finalRow) { + controller.enqueue(finalRow); } - }); + } }, }); } @@ -203,3 +232,37 @@ class ReadableShapeStream = Row> { } } } + +export class LineTransformStream extends TransformStream { + private buffer = ""; + + constructor() { + super({ + transform: (chunk, controller) => { + // Append the chunk to the buffer + this.buffer += chunk; + + // Split on newlines + const lines = this.buffer.split("\n"); + + // The last element might be incomplete, hold it back in buffer + this.buffer = lines.pop() || ""; + + // Filter out empty or whitespace-only lines + const fullLines = lines.filter((line) => line.trim().length > 0); + + // If we got any complete lines, emit them as an array + if (fullLines.length > 0) { + controller.enqueue(fullLines); + } + }, + flush: (controller) => { + // On stream end, if there's leftover text, emit it as a single-element array + const trimmed = this.buffer.trim(); + if (trimmed.length > 0) { + controller.enqueue([trimmed]); + } + }, + }); + } +} diff --git a/packages/core/src/v3/runMetadata/manager.ts b/packages/core/src/v3/runMetadata/manager.ts index ea04d3692b..470640fbed 100644 --- a/packages/core/src/v3/runMetadata/manager.ts +++ b/packages/core/src/v3/runMetadata/manager.ts @@ -230,16 +230,10 @@ export class StandardMetadataManager implements RunMetadataManager { } try { - // Add the key to the special stream metadata object - this.appendKey(`$$streams`, key); - this.setKey("$$streamsVersion", this.streamsVersion); - - await this.flush(); - const streamInstance = new MetadataStream({ key, runId: this.runId, - iterator: $value[Symbol.asyncIterator](), + source: $value, baseUrl: this.streamsBaseUrl, headers: this.apiClient.getHeaders(), signal, @@ -251,6 +245,13 @@ export class StandardMetadataManager implements RunMetadataManager { // Clean up when stream completes streamInstance.wait().finally(() => this.activeStreams.delete(key)); + // Add the key to the special stream metadata object + this.appendKey(`$$streams`, key); + this.setKey("$$streamsVersion", this.streamsVersion); + this.setKey("$$streamsBaseUrl", this.streamsBaseUrl); + + await this.flush(); + return streamInstance; } catch (error) { // Clean up metadata key if stream creation fails diff --git a/packages/core/src/v3/runMetadata/metadataStream.ts b/packages/core/src/v3/runMetadata/metadataStream.ts index dfd9965c07..1d6143f5cf 100644 --- a/packages/core/src/v3/runMetadata/metadataStream.ts +++ b/packages/core/src/v3/runMetadata/metadataStream.ts @@ -2,7 +2,7 @@ export type MetadataOptions = { baseUrl: string; runId: string; key: string; - iterator: AsyncIterator; + source: AsyncIterable; headers?: Record; signal?: AbortSignal; version?: "v1" | "v2"; @@ -10,57 +10,40 @@ export type MetadataOptions = { export class MetadataStream { private controller = new AbortController(); - private serverQueue: Array>> = []; - private consumerQueue: Array>> = []; - private serverIterator: AsyncIterator; - private consumerIterator: AsyncIterator; + private serverStream: ReadableStream; + private consumerStream: ReadableStream; private streamPromise: Promise; constructor(private options: MetadataOptions) { - const { serverIterator, consumerIterator } = this.createTeeIterators(); - this.serverIterator = serverIterator; - this.consumerIterator = consumerIterator; + const [serverStream, consumerStream] = this.createTeeStreams(); + this.serverStream = serverStream; + this.consumerStream = consumerStream; this.streamPromise = this.initializeServerStream(); } - private createTeeIterators() { - const teeIterator = (queue: Array>>): AsyncIterator => ({ - next: () => { - if (queue.length === 0) { - const result = this.options.iterator.next(); - this.serverQueue.push(result); - this.consumerQueue.push(result); + private createTeeStreams() { + const readableSource = new ReadableStream({ + start: async (controller) => { + for await (const value of this.options.source) { + controller.enqueue(value); } - return queue.shift()!; + + controller.close(); }, }); - return { - serverIterator: teeIterator(this.serverQueue), - consumerIterator: teeIterator(this.consumerQueue), - }; + return readableSource.tee(); } - private initializeServerStream(): Promise { - const serverIterator = this.serverIterator; - - const serverStream = new ReadableStream({ - async pull(controller) { - try { - const { value, done } = await serverIterator.next(); - if (done) { - controller.close(); - return; - } - - controller.enqueue(JSON.stringify(value) + "\n"); - } catch (err) { - controller.error(err); - } - }, - cancel: () => this.controller.abort(), - }); + private initializeServerStream(): Promise { + const serverStream = this.serverStream.pipeThrough( + new TransformStream({ + async transform(chunk, controller) { + controller.enqueue(JSON.stringify(chunk) + "\n"); + }, + }) + ); return fetch( `${this.options.baseUrl}/realtime/${this.options.version ?? "v1"}/streams/${ @@ -82,6 +65,19 @@ export class MetadataStream { } public [Symbol.asyncIterator]() { - return this.consumerIterator; + return streamToAsyncIterator(this.consumerStream); + } +} + +async function* streamToAsyncIterator(stream: ReadableStream): AsyncIterableIterator { + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) return; + yield value; + } + } finally { + reader.releaseLock(); } } diff --git a/packages/core/src/v3/utils/ioSerialization.ts b/packages/core/src/v3/utils/ioSerialization.ts index b369d02ffe..8fbfa2e537 100644 --- a/packages/core/src/v3/utils/ioSerialization.ts +++ b/packages/core/src/v3/utils/ioSerialization.ts @@ -14,14 +14,18 @@ export type IOPacket = { dataType: string; }; -export async function parsePacket(value: IOPacket): Promise { +export type ParsePacketOptions = { + filteredKeys?: string[]; +}; + +export async function parsePacket(value: IOPacket, options?: ParsePacketOptions): Promise { if (!value.data) { return undefined; } switch (value.dataType) { case "application/json": - return JSON.parse(value.data); + return JSON.parse(value.data, makeSafeReviver(options)); case "application/super+json": const { parse } = await loadSuperJSON(); @@ -400,6 +404,21 @@ function makeSafeReplacer(options?: ReplacerOptions) { }; } +function makeSafeReviver(options?: ReplacerOptions) { + if (!options) { + return undefined; + } + + return function reviver(key: string, value: any) { + // Check if the key should be filtered out + if (options?.filteredKeys?.includes(key)) { + return undefined; + } + + return value; + }; +} + function getPacketExtension(outputType: string): string { switch (outputType) { case "application/json": diff --git a/packages/core/test/runStream.test.ts b/packages/core/test/runStream.test.ts index 3775246311..6924d07681 100644 --- a/packages/core/test/runStream.test.ts +++ b/packages/core/test/runStream.test.ts @@ -103,6 +103,7 @@ describe("RunSubscription", () => { const subscription = new RunSubscription({ runShapeStream: createTestShapeStream(shapes), + stopRunShapeStream: () => {}, streamFactory: new TestStreamSubscriptionFactory(), closeOnComplete: true, abortController: new AbortController(), @@ -143,6 +144,7 @@ describe("RunSubscription", () => { const subscription = new RunSubscription({ runShapeStream: createTestShapeStream(shapes), + stopRunShapeStream: () => {}, streamFactory: new TestStreamSubscriptionFactory(), closeOnComplete: true, abortController: new AbortController(), @@ -196,6 +198,7 @@ describe("RunSubscription", () => { const subscription = new RunSubscription({ runShapeStream: createDelayedTestShapeStream(shapes), + stopRunShapeStream: () => {}, streamFactory: new TestStreamSubscriptionFactory(), closeOnComplete: false, abortController: new AbortController(), @@ -249,6 +252,7 @@ describe("RunSubscription", () => { const subscription = new RunSubscription({ runShapeStream: createTestShapeStream(shapes), + stopRunShapeStream: () => {}, streamFactory, abortController: new AbortController(), }); @@ -339,6 +343,7 @@ describe("RunSubscription", () => { const subscription = new RunSubscription({ runShapeStream: createTestShapeStream(shapes), + stopRunShapeStream: () => {}, streamFactory, abortController: new AbortController(), }); @@ -419,6 +424,7 @@ describe("RunSubscription", () => { const subscription = new RunSubscription({ runShapeStream: createTestShapeStream(shapes), + stopRunShapeStream: () => {}, streamFactory, abortController: new AbortController(), }); diff --git a/packages/react-hooks/src/hooks/useRealtime.ts b/packages/react-hooks/src/hooks/useRealtime.ts index 9481125cff..8c370666a5 100644 --- a/packages/react-hooks/src/hooks/useRealtime.ts +++ b/packages/react-hooks/src/hooks/useRealtime.ts @@ -109,10 +109,13 @@ export function useRealtimeRun( } }, [runId, mutateRun, abortControllerRef, apiClient, setError]); + const hasCalledOnCompleteRef = useRef(false); + // Effect to handle onComplete callback useEffect(() => { - if (isComplete && options?.onComplete && run) { + if (isComplete && run && options?.onComplete && !hasCalledOnCompleteRef.current) { options.onComplete(run, error); + hasCalledOnCompleteRef.current = true; } }, [isComplete, run, error, options?.onComplete]); @@ -261,10 +264,13 @@ export function useRealtimeRunWithStreams< } }, [runId, mutateRun, mutateStreams, streamsRef, abortControllerRef, apiClient, setError]); + const hasCalledOnCompleteRef = useRef(false); + // Effect to handle onComplete callback useEffect(() => { - if (isComplete && options?.onComplete && run) { + if (isComplete && run && options?.onComplete && !hasCalledOnCompleteRef.current) { options.onComplete(run, error); + hasCalledOnCompleteRef.current = true; } }, [isComplete, run, error, options?.onComplete]); @@ -593,7 +599,7 @@ async function processRealtimeRunWithStreams< nextStreamData[type] = [...(existingDataRef.current[type] || []), ...chunks]; } - await mutateStreamData(nextStreamData); + mutateStreamData(nextStreamData); }, throttleInMs); for await (const part of subscription.withStreams()) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6369c25fc0..a35e106532 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1263,8 +1263,8 @@ importers: packages/core: dependencies: '@electric-sql/client': - specifier: 0.9.0 - version: 0.9.0 + specifier: 1.0.0-beta.1 + version: 1.0.0-beta.1 '@google-cloud/precise-date': specifier: ^4.0.0 version: 4.0.0 @@ -1619,8 +1619,11 @@ importers: '@radix-ui/react-slot': specifier: ^1.1.0 version: 1.1.0(@types/react@18.3.1)(react@18.3.1) + '@radix-ui/react-tabs': + specifier: ^1.0.3 + version: 1.0.3(react-dom@18.2.0)(react@18.3.1) '@trigger.dev/react-hooks': - specifier: workspace:^3 + specifier: workspace:^ version: link:../../packages/react-hooks '@trigger.dev/sdk': specifier: workspace:^3 @@ -1637,6 +1640,9 @@ importers: clsx: specifier: ^2.1.1 version: 2.1.1 + date-fns: + specifier: ^4.1.0 + version: 4.1.0 lucide-react: specifier: ^0.451.0 version: 0.451.0(react@18.3.1) @@ -5112,8 +5118,8 @@ packages: '@rollup/rollup-darwin-arm64': 4.21.3 dev: false - /@electric-sql/client@0.9.0: - resolution: {integrity: sha512-UL2Gep9wPdGMTE0oEWVi0HA8R293R2OzFfHeAsN2LABYYl/boXss7nseNEiIV5+RjHPH7Tm8NsjH9iJW2rZkrQ==} + /@electric-sql/client@1.0.0-beta.1: + resolution: {integrity: sha512-Ei9jN3pDoGzc+a/bGqnB5ajb52IvSv7/n2btuyzUlcOHIR2kM9fqtYTJXPwZYKLkGZlHWlpHgWyRtrinkP2nHg==} optionalDependencies: '@rollup/rollup-darwin-arm64': 4.21.3 dev: false @@ -8971,6 +8977,21 @@ packages: react-dom: 18.2.0(react@18.2.0) dev: false + /@radix-ui/react-collection@1.0.2(react-dom@18.2.0)(react@18.3.1): + resolution: {integrity: sha512-s8WdQQ6wNXpaxdZ308KSr8fEWGrg4un8i4r/w7fhiS4ElRNjk5rRcl0/C6TANG2LvLOGIxtzo/jAg6Qf73TEBw==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + react-dom: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + '@radix-ui/react-compose-refs': 1.0.0(react@18.3.1) + '@radix-ui/react-context': 1.0.0(react@18.3.1) + '@radix-ui/react-primitive': 1.0.2(react-dom@18.2.0)(react@18.3.1) + '@radix-ui/react-slot': 1.0.1(react@18.3.1) + react: 18.3.1 + react-dom: 18.2.0(react@18.3.1) + dev: false + /@radix-ui/react-collection@1.0.3(@types/react-dom@18.2.7)(@types/react@18.2.69)(react-dom@18.2.0)(react@18.2.0): resolution: {integrity: sha512-3SzW+0PW7yBBoQlT8wNcGtaxaD0XSu0uLUFgrtHY08Acx05TaHaOmVLR73c0j/cqpDy53KBMO7s0dx2wmOIDIA==} peerDependencies: @@ -9028,6 +9049,15 @@ packages: react: 18.2.0 dev: false + /@radix-ui/react-compose-refs@1.0.0(react@18.3.1): + resolution: {integrity: sha512-0KaSv6sx787/hK3eF53iOkiSLwAGlFMx5lotrqD2pTjB18KbybKoEIgkNZTKC60YECDQTKGTRcDBILwZVqVKvA==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + react: 18.3.1 + dev: false + /@radix-ui/react-compose-refs@1.0.1(@types/react@18.2.69)(react@18.2.0): resolution: {integrity: sha512-fDSBgd44FKHa1FRMU59qBMPFcl2PZE+2nmqunj+BWFyYYjnhIDWL2ItDs3rrbJDQOtzt5nIebLCQc4QRfz6LJw==} peerDependencies: @@ -9105,6 +9135,15 @@ packages: react: 18.2.0 dev: false + /@radix-ui/react-context@1.0.0(react@18.3.1): + resolution: {integrity: sha512-1pVM9RfOQ+n/N5PJK33kRSKsr1glNxomxONs5c49MliinBY6Yw2Q995qfBUUo0/Mbg05B/sGA0gkgPI7kmSHBg==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + react: 18.3.1 + dev: false + /@radix-ui/react-context@1.0.1(@types/react@18.2.69)(react@18.2.0): resolution: {integrity: sha512-ebbrdFoYTcuZ0v4wG5tedGnp9tzcV8awzsxYph7gXUyvnNLuTIcCk1q17JEbnVhXAKG9oX3KtchwiMIAYp9NLg==} peerDependencies: @@ -9250,6 +9289,15 @@ packages: react: 18.2.0 dev: false + /@radix-ui/react-direction@1.0.0(react@18.3.1): + resolution: {integrity: sha512-2HV05lGUgYcA6xgLQ4BKPDmtL+QbIZYH5fCOTAOOcJ5O0QbWS3i9lKaurLzliYUDhORI2Qr3pyjhJh44lKA3rQ==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + react: 18.3.1 + dev: false + /@radix-ui/react-direction@1.0.1(@types/react@18.2.69)(react@18.2.0): resolution: {integrity: sha512-RXcvnXgyvYvBEOhCBuddKecVkoMiI10Jcm5cTI7abJRAHYfFxeu+FBQs/DvdxSYucxR5mna0dNsL6QFlds5TMA==} peerDependencies: @@ -9520,6 +9568,16 @@ packages: react: 18.2.0 dev: false + /@radix-ui/react-id@1.0.0(react@18.3.1): + resolution: {integrity: sha512-Q6iAB/U7Tq3NTolBBQbHTgclPmGWE3OlktGGqrClPozSw4vkQ1DfQAOtzgRPecKsMdJINE05iaoDUG8tRzCBjw==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + '@radix-ui/react-use-layout-effect': 1.0.0(react@18.3.1) + react: 18.3.1 + dev: false + /@radix-ui/react-id@1.0.1(@types/react@18.2.69)(react@18.2.0): resolution: {integrity: sha512-tI7sT/kqYp8p96yGWY1OAnLHrqDgzHefRBKQ2YAkBS5ja7QLcZ9Z/uY7bEjPUatf8RomoXM8/1sMj1IJaE5UzQ==} peerDependencies: @@ -9796,6 +9854,19 @@ packages: react-dom: 18.2.0(react@18.2.0) dev: false + /@radix-ui/react-presence@1.0.0(react-dom@18.2.0)(react@18.3.1): + resolution: {integrity: sha512-A+6XEvN01NfVWiKu38ybawfHsBjWum42MRPnEuqPsBZ4eV7e/7K321B5VgYMPv3Xx5An6o1/l9ZuDBgmcmWK3w==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + react-dom: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + '@radix-ui/react-compose-refs': 1.0.0(react@18.3.1) + '@radix-ui/react-use-layout-effect': 1.0.0(react@18.3.1) + react: 18.3.1 + react-dom: 18.2.0(react@18.3.1) + dev: false + /@radix-ui/react-presence@1.0.1(@types/react-dom@18.2.7)(@types/react@18.2.69)(react-dom@18.2.0)(react@18.2.0): resolution: {integrity: sha512-UXLW4UAbIY5ZjcvzjfRFo5gxva8QirC9hF7wRE4U5gz+TP0DbRk+//qyuAQ1McDxBt1xNMBTaciFGvEmJvAZCg==} peerDependencies: @@ -9873,6 +9944,18 @@ packages: react-dom: 18.2.0(react@18.2.0) dev: false + /@radix-ui/react-primitive@1.0.2(react-dom@18.2.0)(react@18.3.1): + resolution: {integrity: sha512-zY6G5Qq4R8diFPNwtyoLRZBxzu1Z+SXMlfYpChN7Dv8gvmx9X3qhDqiLWvKseKVJMuedFeU/Sa0Sy/Ia+t06Dw==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + react-dom: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + '@radix-ui/react-slot': 1.0.1(react@18.3.1) + react: 18.3.1 + react-dom: 18.2.0(react@18.3.1) + dev: false + /@radix-ui/react-primitive@1.0.3(@types/react-dom@18.2.7)(@types/react@18.2.69)(react-dom@18.2.0)(react@18.2.0): resolution: {integrity: sha512-yi58uVyoAcK/Nq1inRY56ZSjKypBNKTa/1mcL8qdl6oJeEaDbOldlzrGn7P6Q3Id5d+SYNGc5AJgc4vGhjs5+g==} peerDependencies: @@ -9985,6 +10068,26 @@ packages: react-dom: 18.2.0(react@18.2.0) dev: false + /@radix-ui/react-roving-focus@1.0.3(react-dom@18.2.0)(react@18.3.1): + resolution: {integrity: sha512-stjCkIoMe6h+1fWtXlA6cRfikdBzCLp3SnVk7c48cv/uy3DTGoXhN76YaOYUJuy3aEDvDIKwKR5KSmvrtPvQPQ==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + react-dom: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + '@radix-ui/primitive': 1.0.0 + '@radix-ui/react-collection': 1.0.2(react-dom@18.2.0)(react@18.3.1) + '@radix-ui/react-compose-refs': 1.0.0(react@18.3.1) + '@radix-ui/react-context': 1.0.0(react@18.3.1) + '@radix-ui/react-direction': 1.0.0(react@18.3.1) + '@radix-ui/react-id': 1.0.0(react@18.3.1) + '@radix-ui/react-primitive': 1.0.2(react-dom@18.2.0)(react@18.3.1) + '@radix-ui/react-use-callback-ref': 1.0.0(react@18.3.1) + '@radix-ui/react-use-controllable-state': 1.0.0(react@18.3.1) + react: 18.3.1 + react-dom: 18.2.0(react@18.3.1) + dev: false + /@radix-ui/react-roving-focus@1.0.4(@types/react-dom@18.2.7)(@types/react@18.2.69)(react-dom@18.2.0)(react@18.2.0): resolution: {integrity: sha512-2mUg5Mgcu001VkGy+FfzZyzbmuUWzgWkj3rvv4yu+mLw03+mTzbxZHvfcGyFp2b8EkQeMkpRQ5FiA2Vr2O6TeQ==} peerDependencies: @@ -10146,6 +10249,16 @@ packages: react: 18.2.0 dev: false + /@radix-ui/react-slot@1.0.1(react@18.3.1): + resolution: {integrity: sha512-avutXAFL1ehGvAXtPquu0YK5oz6ctS474iM3vNGQIkswrVhdrS52e3uoMQBzZhNRAIE0jBnUyXWNmSjGHhCFcw==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + '@radix-ui/react-compose-refs': 1.0.0(react@18.3.1) + react: 18.3.1 + dev: false + /@radix-ui/react-slot@1.0.2(@types/react@18.2.69)(react@18.2.0): resolution: {integrity: sha512-YeTpuq4deV+6DusvVUW4ivBgnkHwECUu0BiN43L5UCDFgdhsRUWAghhTF5MbvNTPzmiFOx90asDSUjWuCNapwg==} peerDependencies: @@ -10251,6 +10364,25 @@ packages: react-dom: 18.2.0(react@18.2.0) dev: false + /@radix-ui/react-tabs@1.0.3(react-dom@18.2.0)(react@18.3.1): + resolution: {integrity: sha512-4CkF/Rx1GcrusI/JZ1Rvyx4okGUs6wEenWA0RG/N+CwkRhTy7t54y7BLsWUXrAz/GRbBfHQg/Odfs/RoW0CiRA==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + react-dom: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + '@radix-ui/primitive': 1.0.0 + '@radix-ui/react-context': 1.0.0(react@18.3.1) + '@radix-ui/react-direction': 1.0.0(react@18.3.1) + '@radix-ui/react-id': 1.0.0(react@18.3.1) + '@radix-ui/react-presence': 1.0.0(react-dom@18.2.0)(react@18.3.1) + '@radix-ui/react-primitive': 1.0.2(react-dom@18.2.0)(react@18.3.1) + '@radix-ui/react-roving-focus': 1.0.3(react-dom@18.2.0)(react@18.3.1) + '@radix-ui/react-use-controllable-state': 1.0.0(react@18.3.1) + react: 18.3.1 + react-dom: 18.2.0(react@18.3.1) + dev: false + /@radix-ui/react-toggle-group@1.0.4(@types/react-dom@18.2.7)(@types/react@18.3.1)(react-dom@18.2.0)(react@18.3.1): resolution: {integrity: sha512-Uaj/M/cMyiyT9Bx6fOZO0SAG4Cls0GptBWiBmBxofmDbNVnYYoyRWj/2M/6VCi/7qcXFWnHhRUfdfZFvvkuu8A==} peerDependencies: @@ -10367,6 +10499,15 @@ packages: react: 18.2.0 dev: false + /@radix-ui/react-use-callback-ref@1.0.0(react@18.3.1): + resolution: {integrity: sha512-GZtyzoHz95Rhs6S63D2t/eqvdFCm7I+yHMLVQheKM7nBD8mbZIt+ct1jz4536MDnaOGKIxynJ8eHTkVGVVkoTg==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + react: 18.3.1 + dev: false + /@radix-ui/react-use-callback-ref@1.0.1(@types/react@18.2.69)(react@18.2.0): resolution: {integrity: sha512-D94LjX4Sp0xJFVaoQOd3OO9k7tpBYNOXdVhkltUbGv2Qb9OXdrg/CpsjlZv7ia14Sylv398LswWBVVu5nqKzAQ==} peerDependencies: @@ -10418,6 +10559,16 @@ packages: react: 18.2.0 dev: false + /@radix-ui/react-use-controllable-state@1.0.0(react@18.3.1): + resolution: {integrity: sha512-FohDoZvk3mEXh9AWAVyRTYR4Sq7/gavuofglmiXB2g1aKyboUD4YtgWxKj8O5n+Uak52gXQ4wKz5IFST4vtJHg==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + '@radix-ui/react-use-callback-ref': 1.0.0(react@18.3.1) + react: 18.3.1 + dev: false + /@radix-ui/react-use-controllable-state@1.0.1(@types/react@18.2.69)(react@18.2.0): resolution: {integrity: sha512-Svl5GY5FQeN758fWKrjM6Qb7asvXeiZltlT4U2gVfl8Gx5UAv2sMR0LWo8yhsIZh2oQ0eFdZ59aoOOMV7b47VA==} peerDependencies: @@ -10497,6 +10648,15 @@ packages: react: 18.2.0 dev: false + /@radix-ui/react-use-layout-effect@1.0.0(react@18.3.1): + resolution: {integrity: sha512-6Tpkq+R6LOlmQb1R5NNETLG0B4YP0wc+klfXafpUCj6JGyaUc8il7/kUZ7m59rGbXGczE9Bs+iz2qloqsZBduQ==} + peerDependencies: + react: ^16.8 || ^17.0 || ^18.0 + dependencies: + '@babel/runtime': 7.24.5 + react: 18.3.1 + dev: false + /@radix-ui/react-use-layout-effect@1.0.1(@types/react@18.2.69)(react@18.2.0): resolution: {integrity: sha512-v/5RegiJWYdoCvMnITBkNNx6bCj20fiaJnWtRkU18yITptraXjffz5Qbn05uOiQnOvi+dbkznkoaMltz1GnszQ==} peerDependencies: @@ -17367,7 +17527,7 @@ packages: resolution: {integrity: sha512-Cg7TFGpIr01vOQNODXOOaGz2NpCU5gl8x1qJFbb6hbZxR7XrcE2vtbAsTAbJ7/xwJtUuJEw8K8Zr/AE0LHlesg==} engines: {node: '>=10', npm: '>=6'} dependencies: - '@babel/runtime': 7.22.5 + '@babel/runtime': 7.24.5 cosmiconfig: 7.1.0 resolve: 1.22.8 dev: true @@ -18889,6 +19049,10 @@ packages: resolution: {integrity: sha512-fRHTG8g/Gif+kSh50gaGEdToemgfj74aRX3swtiouboip5JDLAyDE9F11nHMIcvOaXeOC6D7SpNhi7uFyB7Uww==} dev: false + /date-fns@4.1.0: + resolution: {integrity: sha512-Ukq0owbQXxa/U3EGtsdVBkR1w7KOQ5gIBqdH2hkvknzZPYvBxb/aa6E8L7tmjFtkwZBu3UXBbjIgPo/Ez4xaNg==} + dev: false + /dayjs@1.11.13: resolution: {integrity: sha512-oaMBel6gjolK862uaPQOVTA7q3TZhuSvuMQAAglQDOWYO9A91IrAOUJEyKVlqJlHE0vq5p5UXxzdPfMH/x6xNg==} dev: false @@ -20268,7 +20432,7 @@ packages: peerDependencies: eslint: ^6.8.0 || ^7.0.0 || ^8.0.0 dependencies: - '@babel/runtime': 7.22.5 + '@babel/runtime': 7.24.5 '@testing-library/dom': 8.19.1 eslint: 8.31.0 requireindex: 1.2.0 @@ -20301,7 +20465,7 @@ packages: peerDependencies: eslint: ^3 || ^4 || ^5 || ^6 || ^7 || ^8 dependencies: - '@babel/runtime': 7.22.5 + '@babel/runtime': 7.24.5 aria-query: 5.3.0 array-includes: 3.1.6 array.prototype.flatmap: 1.3.1 diff --git a/references/nextjs-realtime/package.json b/references/nextjs-realtime/package.json index 7a859d9dce..8985fc2a32 100644 --- a/references/nextjs-realtime/package.json +++ b/references/nextjs-realtime/package.json @@ -7,7 +7,8 @@ "build": "next build", "start": "next start", "lint": "next lint", - "dev:trigger": "trigger dev" + "dev:trigger": "trigger dev", + "deploy": "trigger deploy" }, "dependencies": { "@ai-sdk/openai": "^1.0.1", @@ -16,12 +17,14 @@ "@radix-ui/react-icons": "^1.3.0", "@radix-ui/react-scroll-area": "^1.2.0", "@radix-ui/react-slot": "^1.1.0", - "@trigger.dev/react-hooks": "workspace:^3", + "@radix-ui/react-tabs": "^1.0.3", + "@trigger.dev/react-hooks": "workspace:^", "@trigger.dev/sdk": "workspace:^3", "@uploadthing/react": "^7.0.3", "ai": "^4.0.0", "class-variance-authority": "^0.7.0", "clsx": "^2.1.1", + "date-fns": "^4.1.0", "lucide-react": "^0.451.0", "next": "14.2.15", "openai": "^4.68.4", diff --git a/references/nextjs-realtime/src/app/realtime/[id]/page.tsx b/references/nextjs-realtime/src/app/realtime/[id]/page.tsx new file mode 100644 index 0000000000..cd178367d6 --- /dev/null +++ b/references/nextjs-realtime/src/app/realtime/[id]/page.tsx @@ -0,0 +1,18 @@ +import RunRealtimeComparison from "@/components/RunRealtimeComparison"; +import { auth } from "@trigger.dev/sdk/v3"; + +export default async function RunRealtimeComparisonPage({ params }: { params: { id: string } }) { + const accessToken = await auth.createPublicToken({ + scopes: { + read: { + runs: params.id, + }, + }, + }); + + return ( +
+ +
+ ); +} diff --git a/references/nextjs-realtime/src/components/RunRealtimeComparison.tsx b/references/nextjs-realtime/src/components/RunRealtimeComparison.tsx new file mode 100644 index 0000000000..d102b1af5e --- /dev/null +++ b/references/nextjs-realtime/src/components/RunRealtimeComparison.tsx @@ -0,0 +1,91 @@ +"use client"; + +import { Button } from "@/components/ui/button"; +import type { STREAMS, openaiStreaming } from "@/trigger/ai"; +import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks"; + +export default function RealtimeComparison({ + accessToken, + runId, +}: { + accessToken: string; + runId: string; +}) { + const { streams, stop, run } = useRealtimeRunWithStreams(runId, { + accessToken: accessToken, + baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL, + onComplete: (...args) => { + console.log("Run completed!", args); + }, + }); + + console.log("run", run); + + return ( +
+
+ + + {run && ( + + )} +
+
+
+ + + + + + + + + {(streams.openai ?? []).map((part, i) => ( + + + + + ))} + +
IDData
{i + 1} +
+ {JSON.stringify(part)} +
+
+
+
+ + + + + + + + + {(streams.openaiText ?? []).map((text, i) => ( + + + + + ))} + +
IDData
{i + 1} +
{text}
+
+
+
+
+ ); +} diff --git a/references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx b/references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx index 07895daf90..38ee631287 100644 --- a/references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx +++ b/references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx @@ -23,7 +23,6 @@ export default function TriggerButton({ accessToken }: { accessToken: string }) >("openai-streaming", { accessToken, baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL, - experimental_throttleInMs: 100, }); const openWeatherReport = useCallback(() => { diff --git a/references/nextjs-realtime/src/components/ui/tabs.tsx b/references/nextjs-realtime/src/components/ui/tabs.tsx new file mode 100644 index 0000000000..0f4caebb11 --- /dev/null +++ b/references/nextjs-realtime/src/components/ui/tabs.tsx @@ -0,0 +1,55 @@ +"use client" + +import * as React from "react" +import * as TabsPrimitive from "@radix-ui/react-tabs" + +import { cn } from "@/lib/utils" + +const Tabs = TabsPrimitive.Root + +const TabsList = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +TabsList.displayName = TabsPrimitive.List.displayName + +const TabsTrigger = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +TabsTrigger.displayName = TabsPrimitive.Trigger.displayName + +const TabsContent = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, ...props }, ref) => ( + +)) +TabsContent.displayName = TabsPrimitive.Content.displayName + +export { Tabs, TabsList, TabsTrigger, TabsContent } diff --git a/references/nextjs-realtime/src/trigger/ai.ts b/references/nextjs-realtime/src/trigger/ai.ts index 70cd498fe9..26d4c63a9f 100644 --- a/references/nextjs-realtime/src/trigger/ai.ts +++ b/references/nextjs-realtime/src/trigger/ai.ts @@ -108,7 +108,6 @@ export const openaiStreaming = schemaTask({ }); const stream = await metadata.stream("openai", result.fullStream); - await metadata.stream("openaiText", result.textStream); }, });