Upgrades and fixes to Realtime and Realtime streams (#1549)
* Fix streaming splits in realtime streams v2

* Add changeset

* Skip all flaky tests 😡

* Improve the way we stream from tasks to the server

* Improve the v1 realtime streams (Redis)

* Turn on the relay realtime stream service

* Improved the relay realtime cleanup

* Fixed consuming realtime runs w/streams after the run is already finished

* Remove some logs

* Update changeset

* Fixed runStream tests
ericallam authored Dec 13, 2024
1 parent 68d7139 commit 6b355ab
Showing 27 changed files with 920 additions and 181 deletions.
9 changes: 9 additions & 0 deletions .changeset/rude-walls-help.md
@@ -0,0 +1,9 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/react-hooks": patch
---

- Fixes an issue in streams where "chunks" could get split across multiple reads
- Fixed stopping the run subscription after a run is finished, when using useRealtimeRun or useRealtimeRunWithStreams
- Added an `onComplete` callback to `useRealtimeRun` and `useRealtimeRunWithStreams`
- Optimized the run subscription to reduce unnecessary updates
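
For context only (not part of this commit's diff), a minimal sketch of how the new `onComplete` callback mentioned in the changeset might be used from a component. The option and return names other than `onComplete` follow the existing hook shape; the exact callback signature is an assumption here.

```tsx
import { useRealtimeRun } from "@trigger.dev/react-hooks";

// Hypothetical usage sketch; the callback argument shape is assumed.
export function RunStatus({ runId, accessToken }: { runId: string; accessToken: string }) {
  const { run, error } = useRealtimeRun(runId, {
    accessToken,
    // Fires once the run reaches a terminal state; after that the hook can
    // stop the subscription instead of continuing to receive updates.
    onComplete: (completedRun) => {
      console.log("Run finished with status", completedRun.status);
    },
  });

  if (error) return <p>Error: {error.message}</p>;

  return <p>{run ? `Run ${run.id}: ${run.status}` : "Connecting..."}</p>;
}
```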
13 changes: 9 additions & 4 deletions apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
@@ -224,10 +224,15 @@ export class ApiRunListPresenter extends BasePresenter {

const data: ListRunResponseItem[] = await Promise.all(
results.runs.map(async (run) => {
-const metadata = await parsePacket({
-data: run.metadata ?? undefined,
-dataType: run.metadataType,
-});
+const metadata = await parsePacket(
+{
+data: run.metadata ?? undefined,
+dataType: run.metadataType,
+},
+{
+filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"],
+}
+);

return {
id: run.friendlyId,
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
@@ -216,7 +216,7 @@ export class SpanPresenter extends BasePresenter {

const metadata = run.metadata
? await prettyPrintPacket(run.metadata, run.metadataType, {
-filteredKeys: ["$$streams", "$$streamsVersion"],
+filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"],
})
: undefined;

@@ -1,6 +1,7 @@
import { ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
+import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

@@ -16,7 +17,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
return new Response("No body provided", { status: 400 });
}

-return v1RealtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
+return relayRealtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
}

export const loader = createLoaderApiRoute(
@@ -51,7 +52,7 @@ export const loader = createLoaderApiRoute(
},
},
async ({ params, request, resource: run, authentication }) => {
-return v1RealtimeStreams.streamResponse(
+return relayRealtimeStreams.streamResponse(
request,
run.friendlyId,
params.streamId,
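
The relayRealtimeStreams service itself is not shown in these hunks. A hypothetical sketch of the general idea, assuming the relay satisfies the same ingest contract as v1RealtimeStreams and keeps a local copy of the body for consumers connected to the same instance while forwarding ingestion to the Redis-backed v1 service; every name and detail below is illustrative, not the code in this commit.

```ts
import { StreamIngestor, StreamResponder } from "./types";

// Illustrative only: a relay fronting the Redis-backed v1 streams.
export class HypotheticalRelayStreams {
  private local = new Map<string, ReadableStream<Uint8Array>>();

  constructor(private fallback: StreamIngestor & StreamResponder) {}

  async ingestData(
    stream: ReadableStream<Uint8Array>,
    runId: string,
    streamId: string
  ): Promise<Response> {
    // Tee the request body: hold one branch for readers on this instance,
    // forward the other to the fallback ingestor unchanged.
    const [localBranch, forwarded] = stream.tee();
    this.local.set(`${runId}:${streamId}`, localBranch);
    return this.fallback.ingestData(forwarded, runId, streamId);
  }

  // A real relay also needs cleanup of finished streams (see the
  // "Improved the relay realtime cleanup" commit message above) and a
  // streamResponse that falls back to this.fallback when no local copy exists.
}
```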
@@ -37,13 +37,14 @@ export class DatabaseRealtimeStreams implements StreamIngestor, StreamResponder
): Promise<Response> {
try {
const textStream = stream.pipeThrough(new TextDecoderStream());

const reader = textStream.getReader();
let sequence = 0;

while (true) {
const { done, value } = await reader.read();

-if (done) {
+if (done || !value) {
break;
}

@@ -53,25 +54,13 @@
value,
});

-const chunks = value
-.split("\n")
-.filter((chunk) => chunk) // Remove empty lines
-.map((line) => {
-return {
-sequence: sequence++,
-value: line,
-};
-});
-
-await this.options.prisma.realtimeStreamChunk.createMany({
-data: chunks.map((chunk) => {
-return {
-runId,
-key: streamId,
-sequence: chunk.sequence,
-value: chunk.value,
-};
-}),
+await this.options.prisma.realtimeStreamChunk.create({
+data: {
+runId,
+key: streamId,
+sequence: sequence++,
+value,
+},
});
}

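
The change above writes each decoded read() chunk as a single row with an incrementing sequence instead of splitting it into lines at ingest time, so a JSON line that straddles two reads is no longer broken apart. The read path is not part of this hunk; a hypothetical sketch of how the stored chunks could be reassembled, assuming only the model fields used in the write above:

```ts
import { PrismaClient } from "@prisma/client";

// Hypothetical helper (not part of this diff): rebuild the stream text from
// stored chunks and defer line-splitting until after concatenation.
async function readStoredStream(prisma: PrismaClient, runId: string, streamId: string) {
  const chunks = await prisma.realtimeStreamChunk.findMany({
    where: { runId, key: streamId },
    orderBy: { sequence: "asc" },
  });

  // Each row holds a raw chunk, not a guaranteed-complete line, so line
  // boundaries only become reliable once the chunks are joined in order.
  const text = chunks.map((chunk) => chunk.value).join("");
  return text.split("\n").filter((line) => line.trim().length > 0);
}
```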
60 changes: 25 additions & 35 deletions apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
@@ -1,7 +1,8 @@
-import Redis, { RedisKey, RedisOptions, RedisValue } from "ioredis";
+import Redis, { RedisOptions } from "ioredis";
+import { AuthenticatedEnvironment } from "../apiAuth.server";
import { logger } from "../logger.server";
import { StreamIngestor, StreamResponder } from "./types";
-import { AuthenticatedEnvironment } from "../apiAuth.server";
+import { LineTransformStream } from "./utils.server";

export type RealtimeStreamsOptions = {
redis: RedisOptions | undefined;
@@ -56,7 +57,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
controller.close();
return;
}
-controller.enqueue(`data: ${fields[1]}\n\n`);
+controller.enqueue(fields[1]);

if (signal.aborted) {
controller.close();
@@ -88,7 +89,18 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
cancel: async () => {
await cleanup();
},
-});
+})
+.pipeThrough(new LineTransformStream())
+.pipeThrough(
+new TransformStream({
+transform(chunk, controller) {
+for (const line of chunk) {
+controller.enqueue(`data: ${line}\n\n`);
+}
+},
+})
+)
+.pipeThrough(new TextEncoderStream());

async function cleanup() {
if (isCleanedUp) return;
@@ -98,7 +110,7 @@

signal.addEventListener("abort", cleanup);

-return new Response(stream.pipeThrough(new TextEncoderStream()), {
+return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
@@ -119,50 +131,28 @@
try {
await redis.quit();
} catch (error) {
logger.error("[RealtimeStreams][ingestData] Error in cleanup:", { error });
logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error });
}
}

try {
const textStream = stream.pipeThrough(new TextDecoderStream());
const reader = textStream.getReader();

-const batchSize = 10;
-let batchCommands: Array<[key: RedisKey, ...args: RedisValue[]]> = [];
-
while (true) {
const { done, value } = await reader.read();

-if (done) {
+if (done || !value) {
break;
}

logger.debug("[RealtimeStreams][ingestData] Reading data", { streamKey, value });

const lines = value.split("\n");

for (const line of lines) {
if (line.trim()) {
batchCommands.push([streamKey, "MAXLEN", "~", "2500", "*", "data", line]);
logger.debug("[RedisRealtimeStreams][ingestData] Reading data", {
streamKey,
runId,
value,
});

-if (batchCommands.length >= batchSize) {
-const pipeline = redis.pipeline();
-for (const args of batchCommands) {
-pipeline.xadd(...args);
-}
-await pipeline.exec();
-batchCommands = [];
-}
-}
-}
-}
-
-if (batchCommands.length > 0) {
-const pipeline = redis.pipeline();
-for (const args of batchCommands) {
-pipeline.xadd(...args);
-}
-await pipeline.exec();
+await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", value);
}

await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL);
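
The LineTransformStream used above is imported from ./utils.server and is not included in these hunks. A minimal sketch of what such a transform typically looks like, assuming it buffers partial lines across Redis entries and emits arrays of complete lines (matching the downstream transform, which iterates over each chunk with for (const line of chunk)):

```ts
// Sketch only: buffer any incomplete trailing line and emit arrays of complete
// lines, so the SSE framing step never sees a line split across chunks.
export class LineTransformStream extends TransformStream<string, string[]> {
  constructor() {
    let buffer = "";

    super({
      transform(chunk, controller) {
        buffer += chunk;
        const lines = buffer.split("\n");
        // The last element may be a partial line; keep it for the next chunk.
        buffer = lines.pop() ?? "";
        const complete = lines.filter((line) => line.trim().length > 0);
        if (complete.length > 0) {
          controller.enqueue(complete);
        }
      },
      flush(controller) {
        // Flush whatever is left when the source stream ends.
        if (buffer.trim().length > 0) {
          controller.enqueue([buffer]);
        }
      },
    });
  }
}
```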