Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrades and fixes to Realtime and Realtime streams #1549

Merged
merged 11 commits into from
Dec 13, 2024

Conversation

ericallam
Copy link
Member

@ericallam ericallam commented Dec 11, 2024

This PR includes a bunch of fixes and improvements to Realtime & Realtime streams:

  • Fixes an issue in streams where "chunks" could get split across multiple reads
  • Streams will now optimistically be "relayed" directly in memory on the server, with a fallback of Redis streams for future reads. Improves performance.
  • Improved the performance of the Redis realtime streams.
  • Fixed stopping the run subscription after a run is finished, when using useRealtimeRun or useRealtimeRunWithStreams
  • Added an onComplete callback to useRealtimeRun and useRealtimeRunWithStreams
  • Optimized the run subscription to reduce unnecessary updates

Copy link

changeset-bot bot commented Dec 11, 2024

🦋 Changeset detected

Latest commit: b58a646

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 12 packages
Name Type
@trigger.dev/sdk Patch
@trigger.dev/react-hooks Patch
references-nextjs-realtime Patch
@trigger.dev/build Patch
@trigger.dev/core Patch
@trigger.dev/rsc Patch
@trigger.dev/database Patch
@trigger.dev/otlp-importer Patch
trigger.dev Patch
@internal/redis-worker Patch
@internal/zod-worker Patch
@internal/testcontainers Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Copy link
Contributor

coderabbitai bot commented Dec 11, 2024

Warning

Rate limit exceeded

@ericallam has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 16 minutes and 42 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between db4016f and b58a646.

📒 Files selected for processing (2)
  • .changeset/rude-walls-help.md (1 hunks)
  • packages/core/test/runStream.test.ts (6 hunks)

Walkthrough

This pull request introduces significant updates across various components of the project, primarily focusing on enhancing the functionality of the streaming SDK and improving data ingestion methods. Key changes include the introduction of the LineTransformStream class for better line processing, modifications to the ingestData method for individual data handling, and updates to the API for improved metadata management. Additionally, the request handling logic in the Express application has been refined, and various tests have been adapted to accommodate these changes.

Changes

File Change Summary
.changeset/rude-walls-help.md Patch applied to @trigger.dev/sdk for real-time streaming splits.
apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts Modified ingestData method to simplify data ingestion by processing individual entries instead of batches.
packages/core/src/v3/apiClient/runStream.ts Introduced LineTransformStream for line-by-line processing in subscribe method; updated method signatures.
packages/core/src/v3/apiClient/stream.ts Added LineTransformStream class for processing string chunks; updated version header in zodShapeStream.
apps/webapp/server.ts Added middleware to restrict API access based on environment variable ALLOW_ONLY_REALTIME_API.
apps/webapp/test/authorizationRateLimitMiddleware.test.ts Updated test suite to conditionally skip tests in GitHub Actions environments.
packages/core/src/v3/runMetadata/manager.ts Updated StandardMetadataManager with new streamsBaseUrl property; modified stream handling logic.
packages/core/src/v3/runMetadata/metadataStream.ts Transitioned from AsyncIterator to ReadableStream for metadata handling.
references/nextjs-realtime/package.json Added new script for deployment.
references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx Removed experimental_throttleInMs property from hook call.
references/nextjs-realtime/src/trigger/ai.ts Removed streaming of textStream from OpenAI model results; enhanced logging.
apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts Updated parsePacket function to accept filteredKeys for metadata parsing.
apps/webapp/app/presenters/v3/SpanPresenter.server.ts Updated prettyPrintPacket to filter out additional key "$streamsBaseUrl".
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts Simplified Redis stream handling; removed batch processing logic.
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts Introduced RelayRealtimeStreams class for managing ephemeral streaming data.
apps/webapp/app/services/realtime/utils.server.ts Added LineTransformStream class for processing incoming string data.
docker/docker-compose.yml Updated electric service image; commented out otel-collector service.
internal-packages/testcontainers/src/utils.ts Updated Electric SQL container version in createElectricContainer.
packages/core/package.json Updated @electric-sql/client dependency version.
packages/core/src/v3/utils/ioSerialization.ts Introduced ParsePacketOptions type and modified parsePacket function.
apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts Updated references from v1RealtimeStreams to relayRealtimeStreams.

Possibly related PRs

🐰 In the meadow, changes bloom,
Streams now dance, dispelling gloom.
Data flows with gentle grace,
In the code, a lively pace.
With every line, a story spun,
Hopping forward, we have fun! 🌼


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Experiment)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (3)
apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (1)

57-63: Consider batch processing for better performance

The switch from batch processing to individual inserts might impact performance when dealing with high-volume streams. Consider using a buffering strategy to batch inserts while maintaining real-time characteristics.

-await this.options.prisma.realtimeStreamChunk.create({
-  data: {
-    runId,
-    key: streamId,
-    sequence: sequence++,
-    value,
-  },
-});
+const BATCH_SIZE = 100;
+let buffer = [];
+buffer.push({
+  runId,
+  key: streamId,
+  sequence: sequence++,
+  value,
+});
+
+if (buffer.length >= BATCH_SIZE) {
+  await this.options.prisma.realtimeStreamChunk.createMany({
+    data: buffer,
+  });
+  buffer = [];
+}
packages/core/src/v3/apiClient/stream.ts (1)

225-231: Consider reducing debug logging in production

The extensive console logging might impact performance in production. Consider using a debug flag or environment variable to control logging.

-console.log("LineTransformStream", {
-  chunk,
-  lines,
-  fullLines,
-  buffer: this.buffer,
-  streamId,
-});
+if (process.env.DEBUG_STREAMS === 'true') {
+  console.log("LineTransformStream", {
+    chunk,
+    lines,
+    fullLines,
+    buffer: this.buffer,
+    streamId,
+  });
+}
packages/core/src/v3/apiClient/runStream.ts (1)

217-238: Consider optimizing the transform pipeline

The current implementation has multiple transform stages with logging, which might impact performance. Consider:

  1. Combining transforms where possible
  2. Making logging conditional
 return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options)
   .pipeThrough(
     new TransformStream({
       transform(chunk, controller) {
-        console.log("ElectricStreamSubscription chunk.value", chunk.value);
+        if (process.env.DEBUG_STREAMS === 'true') {
+          console.log("ElectricStreamSubscription chunk.value", chunk.value);
+        }
         controller.enqueue(chunk.value);
       },
     })
   )
   .pipeThrough(new LineTransformStream(this.url))
   .pipeThrough(
     new TransformStream({
       transform(chunk, controller) {
         for (const line of chunk) {
-          console.log("ElectricStreamSubscription line", line);
+          if (process.env.DEBUG_STREAMS === 'true') {
+            console.log("ElectricStreamSubscription line", line);
+          }
           controller.enqueue(safeParseJSON(line));
         }
       },
     })
   );
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 30ea5eb and 5a86fe8.

📒 Files selected for processing (4)
  • .changeset/rude-walls-help.md (1 hunks)
  • apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (2 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (2 hunks)
  • packages/core/src/v3/apiClient/stream.ts (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • .changeset/rude-walls-help.md
🔇 Additional comments (2)
apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (1)

47-49: LGTM: Stream termination condition is now more explicit

The simplified condition if (done || !value) properly handles both stream completion and empty chunks.

packages/core/src/v3/apiClient/stream.ts (1)

238-244: LGTM: Proper handling of incomplete data

The flush implementation correctly handles any remaining data in the buffer when the stream ends.

for (const line of chunk) {
console.log("ElectricStreamSubscription line", line);

controller.enqueue(safeParseJSON(line));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider handling JSON parse errors explicitly

The safeParseJSON function silently returns the raw string on parse errors. Consider adding explicit error handling for malformed JSON.

-controller.enqueue(safeParseJSON(line));
+try {
+  const parsed = safeParseJSON(line);
+  if (typeof parsed === 'string' && line !== parsed) {
+    console.warn(`Failed to parse JSON for line: ${line}`);
+  }
+  controller.enqueue(parsed);
+} catch (error) {
+  console.error(`Error processing line: ${line}`, error);
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
controller.enqueue(safeParseJSON(line));
try {
const parsed = safeParseJSON(line);
if (typeof parsed === 'string' && line !== parsed) {
console.warn(`Failed to parse JSON for line: ${line}`);
}
controller.enqueue(parsed);
} catch (error) {
console.error(`Error processing line: ${line}`, error);
}

Copy link

pkg-pr-new bot commented Dec 11, 2024

@trigger.dev/build

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/build@1549

@trigger.dev/react-hooks

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/react-hooks@1549

@trigger.dev/sdk

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/sdk@1549

@trigger.dev/rsc

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/rsc@1549

trigger.dev

npm i https://pkg.pr.new/triggerdotdev/trigger.dev@1549

@trigger.dev/core

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/core@1549

commit: b58a646

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (7)
apps/webapp/server.ts (1)

84-95: Consider adding request logging for blocked endpoints.

The middleware implementation correctly restricts access to realtime endpoints while maintaining the healthcheck endpoint. However, adding logging for blocked requests would help with monitoring and debugging.

Consider adding logging like this:

 if (!req.url.startsWith("/realtime") && req.url !== "/healthcheck") {
+  console.log(`Blocked request to: ${req.method} ${req.url}`);
   res.status(404).send("Not Found");
   return;
 }
apps/webapp/test/authorizationRateLimitMiddleware.test.ts (2)

Line range hint 3-5: Consider documenting test timeout rationale

The 30-second timeout is set globally for all tests. Consider:

  1. Adding a comment explaining why 30 seconds was chosen
  2. Using per-test timeouts for long-running tests
  3. Documenting which tests might approach this timeout

Example documentation:

+ // Set a longer timeout for rate limiting tests as they include
+ // multiple sleep operations to test time-based behaviors
vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout

Line range hint 171-315: Consider adding tests for concurrent scenarios

While the test suite is comprehensive, consider adding these scenarios:

  1. Concurrent requests hitting the rate limiter simultaneously
  2. Redis connection errors/timeouts
  3. Race conditions in token bucket refill

Example test structure:

redisTest("should handle concurrent requests correctly", async ({ redis }) => {
  // Setup rate limiter with small window
  // Fire multiple requests simultaneously using Promise.all
  // Verify correct number of requests succeeded/failed
});

redisTest("should handle Redis errors gracefully", async ({ redis }) => {
  // Setup rate limiter
  // Simulate Redis connection issues
  // Verify fallback behavior
});
packages/core/src/v3/runMetadata/metadataStream.ts (2)

68-69: Handle errors in async iterator returned by [Symbol.asyncIterator].

Errors occurring in the consumerStream may not be propagated to the consumer of the iterator.

Ensure that errors from the stream are properly caught and forwarded.


72-81: Manage cancellation in streamToAsyncIterator.

When the consumer stops iteration early, the underlying stream reader should be canceled to prevent resource leaks.

Modify the function to handle cancellation:

 async function* streamToAsyncIterator<T>(stream: ReadableStream<T>): AsyncIterableIterator<T> {
   const reader = stream.getReader();
   try {
     while (true) {
       const { done, value } = await reader.read();
       if (done) return;
       yield value;
     }
   } catch (error) {
     // Optionally handle errors from reader
     throw error;
   } finally {
+    await reader.cancel();
     reader.releaseLock();
   }
 }
packages/core/src/v3/runMetadata/manager.ts (2)

Line range hint 232-243: Inconsistent parameter naming in stream method.

The parameter value was renamed to source in the method signature but value is still used within the method. This inconsistency can lead to errors.

Apply this diff to ensure consistent naming:

 public async stream<T>(
     key: string,
-    value: AsyncIterable<T> | ReadableStream<T>,
+    source: AsyncIterable<T> | ReadableStream<T>,
     signal?: AbortSignal
 ): Promise<AsyncIterable<T>> {
-    const $value = value as AsyncIterable<T>;
+    const $source = source as AsyncIterable<T>;

     if (!this.runId) {
-        return $value;
+        return $source;
     }

     // ... existing code ...

         const streamInstance = new MetadataStream({
             key,
             runId: this.runId,
-            source: $value,
+            source: $source,
             baseUrl: this.streamsBaseUrl,
             headers: this.apiClient.getHeaders(),
             signal,
             version: this.streamsVersion,
         });

236-236: Redundant setting of $$streamsBaseUrl in metadata.

Setting $$streamsBaseUrl every time a new stream is created might be unnecessary if it doesn't change between streams.

Consider setting $$streamsBaseUrl once during initialization rather than on every stream creation.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 6a3a013 and d4f533f.

📒 Files selected for processing (7)
  • apps/webapp/server.ts (1 hunks)
  • apps/webapp/test/authorizationRateLimitMiddleware.test.ts (1 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (1 hunks)
  • packages/core/src/v3/runMetadata/metadataStream.ts (2 hunks)
  • references/nextjs-realtime/package.json (1 hunks)
  • references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx (0 hunks)
  • references/nextjs-realtime/src/trigger/ai.ts (0 hunks)
💤 Files with no reviewable changes (2)
  • references/nextjs-realtime/src/trigger/ai.ts
  • references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/v3/runMetadata/metadataStream.ts

[error] 15-15: void is confusing inside a union type.

Unsafe fix: Use undefined instead.

(lint/suspicious/noConfusingVoidType)

🔇 Additional comments (2)
references/nextjs-realtime/package.json (1)

10-11: LGTM! Deployment script added correctly.

The new deploy script is properly added and aligns with the existing trigger.dev CLI usage pattern.

Let's verify the trigger.dev CLI installation and version compatibility:

✅ Verification successful

Trigger.dev CLI and SDK versions are properly aligned

The verification confirms that both the CLI (trigger.dev) and SDK (@trigger.dev/sdk) packages are using the same workspace version ^3, ensuring compatibility for the deployment script to work correctly.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify trigger.dev CLI installation and version compatibility
# Expected: The CLI package should be listed in devDependencies with a matching version

# Check if trigger.dev CLI package exists in package.json
jq '.devDependencies["trigger.dev"]' references/nextjs-realtime/package.json

# Check for any potential version mismatches between SDK and CLI
jq '[.dependencies["@trigger.dev/sdk"], .devDependencies["trigger.dev"]]' references/nextjs-realtime/package.json

Length of output: 248

apps/webapp/test/authorizationRateLimitMiddleware.test.ts (1)

18-18: Consider alternatives to skipping tests in CI

Skipping these tests in GitHub Actions could lead to reduced test coverage in CI. Consider these alternatives:

  1. Set up Redis in GitHub Actions using service containers
  2. Mock Redis for CI environment
  3. Create a separate test suite for Redis-dependent tests

Let's check if other test files have similar patterns:

✅ Verification successful

Tests are intentionally skipped in CI due to Redis dependency

The skip pattern is consistent across Redis-dependent test files. The test file shows comprehensive Redis integration tests that:

  1. Use @internal/testcontainers for Redis setup
  2. Test various rate limiting scenarios including token bucket, fixed window, and sliding window
  3. Verify edge cases and caching behavior

Since these tests require a Redis instance and are properly tested in a containerized environment locally, skipping them in CI is a valid approach. No changes needed.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other test files that might be skipped in CI
rg -l "skipIf.*GITHUB_ACTIONS" "test/"

# Check if Redis is used in other test files
rg -l "redisTest" "test/"

Length of output: 154


Script:

#!/bin/bash
# Search for test files with skipIf and GITHUB_ACTIONS from repository root
fd -e test.ts -e test.js | xargs rg -l "skipIf.*GITHUB_ACTIONS"

# Check Redis usage in test files
fd -e test.ts -e test.js | xargs rg -l "redis"

# Look at the actual test file to understand Redis setup
fd "authorizationRateLimitMiddleware.test.ts" -x cat

Length of output: 13980

apps/webapp/server.ts Show resolved Hide resolved
Comment on lines +39 to +46
private initializeServerStream(): Promise<Response> {
const serverStream = this.serverStream.pipeThrough(
new TransformStream<T, string>({
async transform(chunk, controller) {
controller.enqueue(JSON.stringify(chunk) + "\n");
},
})
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ensure proper error handling in initializeServerStream.

The fetch request may fail or throw an exception, which isn't currently handled. This could lead to unhandled promise rejections.

Consider wrapping the fetch call in a try-catch block or handling errors appropriately.

+try {
   return fetch(
     `${this.options.baseUrl}/realtime/${this.options.version ?? "v1"}/streams/${
       this.options.runId
     }/${this.options.key}`,
     {
       method: "POST",
       headers: this.options.headers ?? {},
       body: serverStream,
       // @ts-expect-error
       duplex: "half",
       signal: this.controller.signal,
     }
   );
+} catch (error) {
+  // Handle error, possibly by rejecting the promise or logging
+  console.error('Failed to initialize server stream:', error);
+  throw error;
+}

Committable suggestion skipped: line range outside the PR's diff.

private serverIterator: AsyncIterator<T>;
private consumerIterator: AsyncIterator<T>;
private serverStream: ReadableStream<T>;
private consumerStream: ReadableStream<T>;
private streamPromise: Promise<void | Response>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid using 'void' in union types for clarity.

Using void in a union type can be confusing. It is better to use undefined instead to represent the absence of a value.

Apply this diff to improve type clarity:

-private streamPromise: Promise<void | Response>;
+private streamPromise: Promise<Response | undefined>;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private streamPromise: Promise<void | Response>;
private streamPromise: Promise<Response | undefined>;
🧰 Tools
🪛 Biome (1.9.4)

[error] 15-15: void is confusing inside a union type.

Unsafe fix: Use undefined instead.

(lint/suspicious/noConfusingVoidType)

Comment on lines +25 to +36
private createTeeStreams() {
const readableSource = new ReadableStream<T>({
start: async (controller) => {
for await (const value of this.options.source) {
controller.enqueue(value);
}
return queue.shift()!;

controller.close();
},
});

return {
serverIterator: teeIterator(this.serverQueue),
consumerIterator: teeIterator(this.consumerQueue),
};
return readableSource.tee();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Handle errors during stream creation in createTeeStreams.

If an error occurs while iterating over this.options.source, it may not be properly propagated, leading to unhandled exceptions. Consider adding error handling to manage exceptions during iteration.

You can modify the start method to catch and forward errors:

 const readableSource = new ReadableStream<T>({
   start: async (controller) => {
+    try {
       for await (const value of this.options.source) {
         controller.enqueue(value);
       }
-      
       controller.close();
+    } catch (error) {
+      controller.error(error);
+    } finally {
+      controller.close();
+    }
   },
 });

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (2)

186-207: Consider Using Promises for Buffer Waiting Mechanism

Instead of using setTimeout with polling in the waitForBuffer method, consider utilizing Promises or async/await with event-based notification to wait for the buffer's availability. This can improve efficiency by reducing unnecessary polling and can make the code more readable.


215-221: Explicitly Set Default Values for Buffer Waiting Options

In the initializeRelayRealtimeStreams function, the waitForBufferTimeout and waitForBufferInterval options are not specified. Explicitly setting these default values can improve clarity and maintainability.

Apply this diff to specify the default options:

 function initializeRelayRealtimeStreams() {
   return new RelayRealtimeStreams({
     ttl: 1000 * 60 * 5, // 5 minutes
     fallbackIngestor: v1RealtimeStreams,
     fallbackResponder: v1RealtimeStreams,
+    waitForBufferTimeout: 5000, // Default timeout
+    waitForBufferInterval: 50,   // Default interval
   });
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d4f533f and c04b552.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (12)
  • apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1 hunks)
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (5 hunks)
  • apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1 hunks)
  • apps/webapp/app/services/realtime/utils.server.ts (1 hunks)
  • docker/docker-compose.yml (1 hunks)
  • internal-packages/testcontainers/src/utils.ts (1 hunks)
  • packages/core/package.json (1 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (3 hunks)
  • packages/core/src/v3/apiClient/stream.ts (2 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (2 hunks)
  • packages/core/src/v3/utils/ioSerialization.ts (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/core/src/v3/runMetadata/manager.ts
👮 Files not reviewed due to content moderation or server errors (3)
  • apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
  • packages/core/src/v3/utils/ioSerialization.ts
  • packages/core/src/v3/apiClient/runStream.ts
🔇 Additional comments (10)
packages/core/package.json (1)

185-185: Verify beta version stability and breaking changes

Upgrading to a beta version (1.0.0-beta.1) of @electric-sql/client may introduce breaking changes. Please ensure:

  1. This is the latest beta version available
  2. Breaking changes are documented and addressed
  3. The beta version is stable enough for production use
docker/docker-compose.yml (2)

68-68: LGTM! Good security practice with digest pinning

The Electric SQL version upgrade matches the package dependency, and using a SHA256 digest ensures image immutability and security.


Line range hint 77-89: Verify impact of removing otel-collector service

The OpenTelemetry collector service has been commented out. Please verify that this won't impact observability requirements.

✅ Verification successful

Let me gather more information about the OpenTelemetry configuration and usage.


Let me check the actual OpenTelemetry configuration and related code to understand the setup better.


Let me check how the application is actually consuming the OpenTelemetry data to understand if removing the collector impacts the system.


Based on the gathered information, I can now provide a final response:

The OpenTelemetry collector service can be safely commented out

The collector is configured as a pass-through component that forwards telemetry data to http://host.docker.internal:3030/otel. The application already has a direct OTLP exporter (otlpExporter.server.ts) that can handle telemetry data without the collector. The commented-out configuration appears to be an optional intermediate collector that's not critical to the system's observability:

  • The application uses direct OTLP export capabilities through @trigger.dev/otlp-importer
  • The collector was only acting as a proxy to forward data to the host machine
  • No services are explicitly dependent on the collector's ports (4317/4318)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for OpenTelemetry dependencies and usage
rg -l "opentelemetry" --type ts --type js

Length of output: 2542


Script:

#!/bin/bash
# Check for OTLP endpoint configurations and usage
rg "otlp" -A 3 --type ts --type js --type yaml --type yml

# Check for any other telemetry collector configurations
fd -e yaml -e yml -x grep -l "otel-collector"

Length of output: 159


Script:

#!/bin/bash
# Check the OpenTelemetry configuration file
cat docker/otel-collector-config.yaml

# Look for OTLP endpoint configurations in code
rg "otlp|opentelemetry" -A 3 apps/webapp/app/v3/otlpExporter.server.ts

# Check for environment variables or configuration related to telemetry
rg "OTEL_" -A 3 --type ts --type js --type env

Length of output: 1043


Script:

#!/bin/bash
# Check how the application handles telemetry data
rg -A 5 "endpoint.*otel" apps/webapp

# Check for direct OTLP endpoint usage in core packages
rg -A 5 "endpoint.*otel" packages/core

# Look for alternative telemetry configurations
rg -A 5 "telemetry" packages/core/src/v3/config.ts

Length of output: 413

internal-packages/testcontainers/src/utils.ts (1)

58-60: LGTM! Consistent version upgrade

The Electric SQL container version and digest match the docker-compose.yml configuration, maintaining consistency across the codebase.

apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)

39-111: streamResponse Method Implementation is Sound

The streamResponse method correctly handles the retrieval of buffered streams and falls back to the fallbackResponder when necessary. The use of LineTransformStream for processing streamed data enhances readability and efficiency.

apps/webapp/app/services/realtime/utils.server.ts (1)

1-33: LineTransformStream Implementation is Correct and Efficient

The LineTransformStream class effectively handles incoming string data, buffering incomplete lines and emitting arrays of complete lines. This ensures proper processing of streamed text data and improves the reliability of data handling in real-time applications.

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2)

93-103: Enhanced Stream Processing with LineTransformStream

The addition of LineTransformStream improves the handling of incoming data by properly processing and formatting each line before sending it in the response. This enhances the reliability and readability of the streamed data.


145-146: ⚠️ Potential issue

Correct the Condition to Handle Empty String Values

The condition if (done || !value) may inadvertently skip over empty string values (""), which could be valid data. Modify the condition to ensure all values are correctly processed.

Apply this diff to fix the condition:

 if (done || !value) {
   break;
 }
+if (done) {
+  break;
+}

Likely invalid or redundant comment.

packages/core/src/v3/apiClient/stream.ts (2)

28-28: Updated Version Header to 1.0.0-beta.1

The x-trigger-electric-version header has been updated to 1.0.0-beta.1, ensuring compatibility with the expected API version.


206-238: LineTransformStream Implementation is Effective

The LineTransformStream class effectively processes incoming string data into arrays of complete lines, improving data handling in streaming operations. Its inclusion enhances the robustness of the API client.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)

137-141: Unnecessary async keyword in handleLocalIngestion method

The method handleLocalIngestion is marked as async but does not contain any await statements or return a Promise. Consider removing the async keyword unless you plan to add asynchronous operations within this method.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c04b552 and 4dd27b4.

📒 Files selected for processing (2)
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (3 hunks)
  • apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1 hunks)
🔇 Additional comments (5)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (2)

159-160: Unused finalized property in RelayedStreamRecord

The finalized property is defined in RelayedStreamRecord but is not used elsewhere in the code. Verify if this property is necessary or if additional logic should be implemented to utilize it.


86-99: Stream transformation pipeline appears correct

The stream transformation pipeline effectively decodes, processes lines, and encodes the stream for the response. This implementation aligns with the expected functionality for handling real-time data streams.

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (3)

4-4: Import statement updated appropriately

The import of relayRealtimeStreams is correctly added to replace v1RealtimeStreams for the updated streaming functionality.


20-20: Switch to relayRealtimeStreams in action function

The action function now uses relayRealtimeStreams.ingestData, which aligns with the new streaming implementation.


Line range hint 55-59: Update loader function to use relayRealtimeStreams

The loader function correctly utilizes relayRealtimeStreams.streamResponse for handling stream responses.

@ericallam ericallam force-pushed the fix-streaming-splits-v2 branch from cab69db to bb54d09 Compare December 12, 2024 21:07
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (1)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)

178-178: Remove unused finalized property or implement its logic

The finalized property in RelayedStreamRecord is initialized to false (line 178) but is never updated or utilized elsewhere in the code. Additionally, the comment at lines 121-123 references relying on finalized, but there is no implementation that sets this property to true when the stream is completed.

Consider implementing logic to set finalized to true when the stream has finished processing. If the property is not needed, remove it to avoid confusion.

Also applies to: 121-123

🛑 Comments failed to post (2)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (2)

81-95: ⚠️ Potential issue

Potential race condition when locking the stream record

In the streamResponse method, the check and update of record.locked are not atomic operations. If multiple requests access the same record simultaneously, there's a possibility that both see record.locked as false and proceed to set it to true, leading to multiple readers accessing the stream.

Consider introducing a synchronization mechanism to ensure that only one reader can lock and access the stream at a time. For example, using a mutex or a promise-based lock to prevent race conditions.


207-209: ⚠️ Potential issue

Ensure streams are properly closed when deleting buffers

In the deleteBuffer method, when a buffer is deleted, the associated stream is not being canceled or closed. This can lead to resource leaks since the stream may remain open even after the buffer is removed from the _buffers map.

Apply this diff to fix the issue:

private deleteBuffer(bufferKey: string) {
+   const record = this._buffers.get(bufferKey);
+   if (record) {
+     // Cancel the stream to release resources
+     record.stream.cancel();
+   }
    this._buffers.delete(bufferKey);
}

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (4)
packages/core/src/v3/runMetadata/metadataStream.ts (1)

18-22: Add error handling in the constructor.

While the initialization is clean, consider adding error handling to catch potential stream creation failures.

 constructor(private options: MetadataOptions<T>) {
+  try {
     const [serverStream, consumerStream] = this.createTeeStreams();
     this.serverStream = serverStream;
     this.consumerStream = consumerStream;
 
     this.streamPromise = this.initializeServerStream();
+  } catch (error) {
+    throw new Error(`Failed to initialize streams: ${error.message}`);
+  }
 }
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)

145-155: Consider implementing batch processing for better performance.

While the current implementation is clean, consider batching multiple values before writing to Redis to improve performance under high load.

+ const batchSize = 100;
+ const batch: string[] = [];
  while (true) {
    const { done, value } = await reader.read();
    
    if (done || !value) {
      break;
    }
    
+   batch.push(value);
+   if (batch.length >= batchSize) {
+     const pipeline = redis.pipeline();
+     for (const item of batch) {
+       pipeline.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", item);
+     }
+     await pipeline.exec();
+     batch.length = 0;
+   }
-   await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", value);
  }
+ if (batch.length > 0) {
+   const pipeline = redis.pipeline();
+   for (const item of batch) {
+     pipeline.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", item);
+   }
+   await pipeline.exec();
+ }
packages/core/src/v3/apiClient/runStream.ts (2)

217-234: Well-structured stream processing pipeline

The stream transformation pipeline is well-organized with clear separation of concerns:

  1. Shape validation using zodShapeStream
  2. Value extraction
  3. Line-by-line processing
  4. JSON parsing

However, consider adding error handling for the pipeline to gracefully handle stream processing failures.

 async subscribe(): Promise<ReadableStream<unknown>> {
   return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options)
     .pipeThrough(
       new TransformStream({
         transform(chunk, controller) {
           controller.enqueue(chunk.value);
+        },
+        catch(error) {
+          console.error("Error transforming chunk:", error);
         },
       })
     )
     .pipeThrough(new LineTransformStream())
     .pipeThrough(
       new TransformStream({
         transform(chunk, controller) {
           for (const line of chunk) {
             controller.enqueue(safeParseJSON(line));
           }
+        },
+        catch(error) {
+          console.error("Error processing lines:", error);
         },
       })
     );
 }

280-282: Consider using type guards for metadata fields

The string type checking for metadata fields could be more type-safe using custom type guards.

+interface StreamMetadata extends Record<string, unknown> {
+  $$streamsVersion?: string;
+  $$streamsBaseUrl?: string;
+}
+
+function isStreamMetadata(metadata: Record<string, unknown>): metadata is StreamMetadata {
+  return true;
+}
+
 const $baseUrl =
-  typeof metadata.$$streamsBaseUrl === "string" ? metadata.$$streamsBaseUrl : baseUrl;
+  isStreamMetadata(metadata) && metadata.$$streamsBaseUrl ? metadata.$$streamsBaseUrl : baseUrl;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cab69db and bb54d09.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (21)
  • .changeset/rude-walls-help.md (1 hunks)
  • apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1 hunks)
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (3 hunks)
  • apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (2 hunks)
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (5 hunks)
  • apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1 hunks)
  • apps/webapp/app/services/realtime/utils.server.ts (1 hunks)
  • apps/webapp/server.ts (1 hunks)
  • apps/webapp/test/authorizationRateLimitMiddleware.test.ts (1 hunks)
  • docker/docker-compose.yml (1 hunks)
  • internal-packages/testcontainers/src/utils.ts (1 hunks)
  • packages/core/package.json (1 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (3 hunks)
  • packages/core/src/v3/apiClient/stream.ts (2 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (2 hunks)
  • packages/core/src/v3/runMetadata/metadataStream.ts (2 hunks)
  • packages/core/src/v3/utils/ioSerialization.ts (2 hunks)
  • references/nextjs-realtime/package.json (1 hunks)
  • references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx (0 hunks)
  • references/nextjs-realtime/src/trigger/ai.ts (0 hunks)
💤 Files with no reviewable changes (2)
  • references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx
  • references/nextjs-realtime/src/trigger/ai.ts
🚧 Files skipped from review as they are similar to previous changes (15)
  • .changeset/rude-walls-help.md
  • apps/webapp/server.ts
  • apps/webapp/app/services/realtime/utils.server.ts
  • apps/webapp/test/authorizationRateLimitMiddleware.test.ts
  • packages/core/package.json
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
  • references/nextjs-realtime/package.json
  • packages/core/src/v3/utils/ioSerialization.ts
  • packages/core/src/v3/apiClient/stream.ts
  • internal-packages/testcontainers/src/utils.ts
  • apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts
  • docker/docker-compose.yml
  • apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
  • apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/v3/runMetadata/metadataStream.ts

[error] 15-15: void is confusing inside a union type.

Unsafe fix: Use undefined instead.

(lint/suspicious/noConfusingVoidType)

🔇 Additional comments (7)
packages/core/src/v3/runMetadata/metadataStream.ts (3)

5-5: LGTM: Improved type definitions for better stream handling.

The change from AsyncIterator to AsyncIterable and the use of ReadableStream properties align better with modern streaming patterns and provide more flexibility in handling different stream types.

Also applies to: 13-14


72-82: LGTM: Well-implemented stream to iterator conversion.

The implementation properly handles stream reading and cleanup, with appropriate error handling and reader lock release.


25-36: 🛠️ Refactor suggestion

Enhance error handling in stream creation.

The stream creation logic should handle potential errors during iteration.

 private createTeeStreams() {
   const readableSource = new ReadableStream<T>({
     start: async (controller) => {
+      try {
         for await (const value of this.options.source) {
           controller.enqueue(value);
         }
 
         controller.close();
+      } catch (error) {
+        controller.error(error);
+      }
     },
   });
 
   return readableSource.tee();
 }

Likely invalid or redundant comment.

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)

92-103: LGTM: Well-structured stream transformation pipeline.

The stream processing pipeline is well-organized with clear separation of concerns:

  1. Line transformation
  2. SSE format conversion
  3. Text encoding
packages/core/src/v3/runMetadata/manager.ts (1)

Line range hint 236-254: LGTM: Comprehensive stream metadata management.

The implementation properly:

  1. Handles stream initialization
  2. Manages metadata
  3. Includes proper cleanup
  4. Sets all required stream-related metadata
packages/core/src/v3/apiClient/runStream.ts (2)

19-24: LGTM: Clean import organization

The new imports are well-structured and appropriately scoped for the streaming functionality.


217-234: Consider handling JSON parse errors explicitly

The safeParseJSON function silently returns the raw string on parse errors. Consider adding explicit error handling for malformed JSON.

@ericallam ericallam force-pushed the fix-streaming-splits-v2 branch from bb54d09 to 7014057 Compare December 13, 2024 11:19
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (10)
packages/core/src/v3/utils/ioSerialization.ts (1)

407-420: Consider performance optimizations for key filtering

The implementation is secure and correctly handles the filtering logic. However, for better performance with large datasets or frequent parsing operations, consider these optimizations:

  1. Use Set for more efficient key lookups
  2. Implement memoization when the same options are used frequently
 function makeSafeReviver(options?: ReplacerOptions) {
   if (!options) {
     return undefined;
   }
 
+  // Convert array to Set for O(1) lookups
+  const filteredKeysSet = new Set(options.filteredKeys);
+
   return function reviver(key: string, value: any) {
     // Check if the key should be filtered out
-    if (options?.filteredKeys?.includes(key)) {
+    if (filteredKeysSet.has(key)) {
       return undefined;
     }
 
     return value;
   };
 }
references/nextjs-realtime/src/app/realtime/[id]/page.tsx (1)

13-17: Consider error boundary for RunRealtimeComparison

While the implementation is correct, consider wrapping the RunRealtimeComparison component with an error boundary to gracefully handle potential streaming failures.

 return (
   <main className="flex min-h-screen items-center justify-center p-4 bg-gray-900">
+    <ErrorBoundary fallback={<div>Something went wrong with the stream</div>}>
       <RunRealtimeComparison accessToken={accessToken} runId={params.id} />
+    </ErrorBoundary>
   </main>
 );
references/nextjs-realtime/src/components/RunRealtimeComparison.tsx (2)

17-22: Remove console.log statements

Remove development console.log statements before production deployment.

-    onComplete: (...args) => {
-      console.log("Run completed!", args);
-    },
+    onComplete: (...args) => {
+      // Handle completion if needed
+    },
   });

-  console.log("run", run);

27-33: Clarify disabled debug button purpose

The debug button is permanently disabled. If it's not needed, consider removing it. If it's for future use, add a TODO comment explaining its purpose.

packages/react-hooks/src/hooks/useRealtime.ts (1)

267-273: Consider extracting the onComplete logic into a custom hook

The onComplete handling logic is duplicated between useRealtimeRun and useRealtimeRunWithStreams. Consider extracting this into a reusable custom hook to follow DRY principles.

+function useOnComplete<TTask extends AnyTask>(
+  isComplete: boolean,
+  run: RealtimeRun<TTask> | undefined,
+  error: Error | undefined,
+  onComplete?: (run: RealtimeRun<TTask>, err?: Error) => void
+) {
+  const hasCalledOnCompleteRef = useRef(false);
+
+  useEffect(() => {
+    if (isComplete && run && onComplete && !hasCalledOnCompleteRef.current) {
+      onComplete(run, error);
+      hasCalledOnCompleteRef.current = true;
+    }
+  }, [isComplete, run, error, onComplete]);
+}

 export function useRealtimeRun<TTask extends AnyTask>(
   runId?: string,
   options?: UseRealtimeSingleRunOptions<TTask>
 ): UseRealtimeRunInstance<TTask> {
   // ... existing code ...
-  const hasCalledOnCompleteRef = useRef(false);
-
-  useEffect(() => {
-    if (isComplete && run && options?.onComplete && !hasCalledOnCompleteRef.current) {
-      options.onComplete(run, error);
-      hasCalledOnCompleteRef.current = true;
-    }
-  }, [isComplete, run, error, options?.onComplete]);
+  useOnComplete(isComplete, run, error, options?.onComplete);

   // ... rest of the code ...
 }

 export function useRealtimeRunWithStreams<
   TTask extends AnyTask = AnyTask,
   TStreams extends Record<string, any> = Record<string, any>
 >(
   // ... existing code ...
 ) {
   // ... existing code ...
-  const hasCalledOnCompleteRef = useRef(false);
-
-  useEffect(() => {
-    if (isComplete && run && options?.onComplete && !hasCalledOnCompleteRef.current) {
-      options.onComplete(run, error);
-      hasCalledOnCompleteRef.current = true;
-    }
-  }, [isComplete, run, error, options?.onComplete]);
+  useOnComplete(isComplete, run, error, options?.onComplete);

   // ... rest of the code ...
 }
packages/core/src/v3/apiClient/runStream.ts (1)

284-285: Handle undefined $$streamsBaseUrl in metadata

When accessing metadata.$$streamsBaseUrl, ensure that it is safely handled in case it's undefined or not a string to prevent potential runtime errors.

packages/core/src/v3/runMetadata/metadataStream.ts (2)

68-69: Avoid using console logs in production code

The use of console.log in the stop() method may not be appropriate for production environments. Consider using a proper logging mechanism or removing the log statement altogether.

Apply this diff to remove the console log:

stop: () => {
-   console.log("Stopping zodShapeStream with abortController.abort()");
    abortController.abort();
},

72-81: Consider using existing utilities for stream-to-iterator conversion

The streamToAsyncIterator function converts a ReadableStream into an AsyncIterableIterator. Check if similar functionality already exists in standard libraries or utility packages to avoid duplicating code.

packages/core/src/v3/apiClient/stream.ts (2)

19-23: Add documentation for the new ZodShapeStreamInstance type

The introduction of ZodShapeStreamInstance is significant. Adding documentation comments would help other developers understand its purpose and usage.


237-269: Handle Windows-style line endings in LineTransformStream

The LineTransformStream class splits chunks by \n, which may not correctly handle Windows-style line endings (\r\n). Consider updating the splitting logic to handle both \n and \r\n to ensure compatibility across different platforms.

Apply this diff to improve line splitting:

- const lines = this.buffer.split("\n");
+ const lines = this.buffer.split(/\r?\n/);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bb54d09 and 7014057.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (25)
  • .changeset/rude-walls-help.md (1 hunks)
  • apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts (1 hunks)
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts (1 hunks)
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (3 hunks)
  • apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (2 hunks)
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (5 hunks)
  • apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1 hunks)
  • apps/webapp/app/services/realtime/utils.server.ts (1 hunks)
  • apps/webapp/server.ts (1 hunks)
  • apps/webapp/test/authorizationRateLimitMiddleware.test.ts (1 hunks)
  • docker/docker-compose.yml (1 hunks)
  • internal-packages/testcontainers/src/utils.ts (1 hunks)
  • packages/core/package.json (1 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (7 hunks)
  • packages/core/src/v3/apiClient/stream.ts (5 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (2 hunks)
  • packages/core/src/v3/runMetadata/metadataStream.ts (2 hunks)
  • packages/core/src/v3/utils/ioSerialization.ts (2 hunks)
  • packages/react-hooks/src/hooks/useRealtime.ts (3 hunks)
  • references/nextjs-realtime/package.json (2 hunks)
  • references/nextjs-realtime/src/app/realtime/[id]/page.tsx (1 hunks)
  • references/nextjs-realtime/src/components/RunRealtimeComparison.tsx (1 hunks)
  • references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx (0 hunks)
  • references/nextjs-realtime/src/components/ui/tabs.tsx (1 hunks)
  • references/nextjs-realtime/src/trigger/ai.ts (0 hunks)
💤 Files with no reviewable changes (2)
  • references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx
  • references/nextjs-realtime/src/trigger/ai.ts
🚧 Files skipped from review as they are similar to previous changes (14)
  • .changeset/rude-walls-help.md
  • apps/webapp/server.ts
  • internal-packages/testcontainers/src/utils.ts
  • apps/webapp/app/services/realtime/utils.server.ts
  • apps/webapp/test/authorizationRateLimitMiddleware.test.ts
  • apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
  • apps/webapp/app/presenters/v3/SpanPresenter.server.ts
  • docker/docker-compose.yml
  • apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
  • apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
  • packages/core/package.json
  • references/nextjs-realtime/package.json
  • apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/v3/runMetadata/metadataStream.ts

[error] 15-15: void is confusing inside a union type.

Unsafe fix: Use undefined instead.

(lint/suspicious/noConfusingVoidType)

🔇 Additional comments (14)
packages/core/src/v3/utils/ioSerialization.ts (4)

17-19: LGTM: Clean type definition

The ParsePacketOptions type is well-defined and maintains backward compatibility with its optional property.


21-21: LGTM: Backward-compatible signature update

The function signature change properly introduces the optional options parameter while maintaining backward compatibility.


28-28: LGTM: Safe JSON parsing with optional filtering

The modification correctly integrates the filtering capability while maintaining the default parsing behavior when no options are provided.


17-28: Verify impact on existing parsePacket usage

The new filtering capability is well-implemented, but let's verify its impact on existing code.

✅ Verification successful

Based on the search results, I can now provide a final response regarding the impact of the new filtering capability in parsePacket:

No breaking changes detected in parsePacket usage patterns

The codebase scan shows that all existing uses of parsePacket are simple invocations without any options parameter, making the new filtering capability a fully backward-compatible addition. The key observations:

  • All 16 current usages of parsePacket() are basic calls without options
  • No existing code relies on specific data being present in the parsed output
  • The filtering is opt-in through the new options parameter
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for parsePacket usage patterns to ensure compatibility
echo "Searching for parsePacket usage patterns..."
rg --type typescript "parsePacket\(" -A 2 -B 2

# Look for potential sensitive data handling patterns
echo "Checking for potential sensitive data patterns..."
rg --type typescript -e "password" -e "secret" -e "token" -e "key" --glob "**/*.ts" -A 2 -B 2

Length of output: 412


Script:

#!/bin/bash
# Search for parsePacket usage patterns without typescript type
echo "Searching for parsePacket usage patterns..."
rg "parsePacket\(" -A 2 -B 2

# Look for potential sensitive data handling patterns
echo "Checking for potential sensitive data patterns..."
rg -e "password" -e "secret" -e "token" -e "key" --glob "*.ts" -A 2 -B 2

Length of output: 98300

references/nextjs-realtime/src/app/realtime/[id]/page.tsx (1)

5-11: LGTM: Properly scoped authentication token

The token generation is well-implemented with appropriate scope restrictions, limiting access to only the specific run.

references/nextjs-realtime/src/components/ui/tabs.tsx (2)

10-13: LGTM: Well-typed component implementations

The components are properly typed with React.ElementRef and ComponentPropsWithoutRef, ensuring type safety and proper prop handling.

Also applies to: 25-28, 40-43


31-33: LGTM: Accessible and well-styled components

The implementation maintains accessibility through Radix primitives and proper focus management. The styling is well-organized using the cn utility.

Also applies to: 46-48

packages/react-hooks/src/hooks/useRealtime.ts (2)

112-118: LGTM: Good fix to prevent multiple onComplete calls

The addition of hasCalledOnCompleteRef ensures that the onComplete callback is called exactly once when the subscription completes. This prevents potential issues that could arise from multiple invocations of the callback.


602-602: LGTM: Verify no race conditions with non-awaited stream updates

Removing await from mutateStreamData can improve performance by not blocking the event loop. The throttle queue and ref-based state management should handle concurrent updates correctly.

✅ Verification successful

Let me gather more context about the mutateStreamData usage and implementation to better understand the potential impact.


Let me check the broader context of the streaming implementation to understand the data flow and state management.


Removing await from mutateStreamData is safe in this context

The implementation is well-protected against race conditions because:

  1. Updates are processed through a throttled queue that batches stream updates
  2. State is managed via a ref (existingDataRef) that always contains the latest state
  3. Updates are merged using spread operator, preserving all data
  4. The throttling mechanism (createThrottledQueue) ensures ordered processing of batches
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any race condition handling or ordering requirements in the codebase

# Look for comments or documentation about stream ordering requirements
rg -i "stream.*order|order.*stream" --type ts

# Look for any explicit stream synchronization or sequential processing
ast-grep --pattern 'await $_.stream'

# Look for any race condition related comments
rg -i "race.*condition|concurrent.*update" --type ts

Length of output: 3358


Script:

#!/bin/bash
# Check the implementation and usage of mutateStreamData
ast-grep --pattern 'function mutateStreamData'

# Look for other usages of mutateStreamData
rg 'mutateStreamData' -A 3 --type ts

# Check the implementation of processRealtimeRunWithStreams
ast-grep --pattern 'function processRealtimeRunWithStreams'

Length of output: 865


Script:

#!/bin/bash
# Check the implementation of withStreams and stream data handling
ast-grep --pattern 'withStreams<$_>($$$)'

# Look for stream state management patterns
rg 'StreamResults|nextStreamData' -B 2 -A 2 --type ts

# Check for any throttling or batching logic
rg 'throttle.*stream|batch.*stream' -B 2 -A 2 --type ts

Length of output: 4112

packages/core/src/v3/apiClient/runStream.ts (2)

119-123: Ensure proper cleanup of resources when aborting streams

When creating runStreamInstance through zodShapeStream, you should ensure that resources are properly cleaned up when the stream is aborted. Verify that aborting the abortController in the runShapeStream method correctly stops the underlying stream without any resource leaks.


223-236: ⚠️ Potential issue

Handle JSON parse errors explicitly in stream transformation

In the ElectricStreamSubscription's subscribe method, the safeParseJSON function may return the raw string if parsing fails. This could silently propagate malformed data downstream. Consider adding explicit error handling to manage JSON parse errors.

This comment is similar to a previous one.

packages/core/src/v3/runMetadata/metadataStream.ts (2)

40-46: 🛠️ Refactor suggestion

Ensure proper error handling in initializeServerStream

The fetch request may fail or throw an exception, which isn't currently handled. This could lead to unhandled promise rejections. Consider wrapping the fetch call in a try-catch block or handling errors appropriately.

This comment is similar to a previous one.


27-33: ⚠️ Potential issue

Add error handling when reading from the source iterable

If an error occurs while iterating over this.options.source, it may not be properly propagated, leading to unhandled exceptions. Consider wrapping the for await loop in a try-catch block to handle any errors during iteration.

Apply this diff to handle errors:

start: async (controller) => {
+  try {
    for await (const value of this.options.source) {
      controller.enqueue(value);
    }
+  } catch (error) {
+    controller.error(error);
+  } finally {
+    controller.close();
+  }
},

Likely invalid or redundant comment.

packages/core/src/v3/runMetadata/manager.ts (1)

249-252: ⚠️ Potential issue

Verify potential security implications of including $$streamsBaseUrl in metadata

Storing the $$streamsBaseUrl in metadata may expose internal URLs or sensitive information if the metadata is accessible externally. Ensure that including $$streamsBaseUrl does not pose any security risks.

Comment on lines +14 to +20
const { streams, stop, run } = useRealtimeRunWithStreams<typeof openaiStreaming, STREAMS>(runId, {
accessToken: accessToken,
baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
onComplete: (...args) => {
console.log("Run completed!", args);
},
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add cleanup for stream subscription

Ensure the stream subscription is properly cleaned up when the component unmounts.

+import { useEffect } from 'react';
+
 export default function RealtimeComparison({
   accessToken,
   runId,
 }) {
   const { streams, stop, run } = useRealtimeRunWithStreams<typeof openaiStreaming, STREAMS>(runId, {
     accessToken: accessToken,
     baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
     onComplete: (...args) => {
-      console.log("Run completed!", args);
     },
   });
+
+  useEffect(() => {
+    return () => {
+      // Cleanup subscription on unmount
+      stop();
+    };
+  }, [stop]);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const { streams, stop, run } = useRealtimeRunWithStreams<typeof openaiStreaming, STREAMS>(runId, {
accessToken: accessToken,
baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
onComplete: (...args) => {
console.log("Run completed!", args);
},
});
import { useEffect } from 'react';
const { streams, stop, run } = useRealtimeRunWithStreams<typeof openaiStreaming, STREAMS>(runId, {
accessToken: accessToken,
baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
onComplete: (...args) => {
},
});
useEffect(() => {
return () => {
// Cleanup subscription on unmount
stop();
};
}, [stop]);

Comment on lines +45 to +88
<div className="flex-grow flex overflow-hidden">
<div className="w-1/2 border-r border-gray-700 overflow-auto">
<table className="w-full table-fixed">
<thead>
<tr className="bg-gray-800">
<th className="w-16 p-2 text-left">ID</th>
<th className="p-2 text-left">Data</th>
</tr>
</thead>
<tbody>
{(streams.openai ?? []).map((part, i) => (
<tr key={i} className="border-b border-gray-700">
<td className="w-16 p-2 truncate">{i + 1}</td>
<td className="p-2">
<div className="font-mono whitespace-nowrap overflow-x-auto">
{JSON.stringify(part)}
</div>
</td>
</tr>
))}
</tbody>
</table>
</div>
<div className="w-1/2 overflow-auto">
<table className="w-full table-fixed">
<thead>
<tr className="bg-gray-800">
<th className="w-16 p-2 text-left">ID</th>
<th className="p-2 text-left">Data</th>
</tr>
</thead>
<tbody>
{(streams.openaiText ?? []).map((text, i) => (
<tr key={i} className="border-b border-gray-700">
<td className="w-16 p-2 truncate">{i + 1}</td>
<td className="p-2">
<div className="font-mono whitespace-nowrap overflow-x-auto">{text}</div>
</td>
</tr>
))}
</tbody>
</table>
</div>
</div>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add loading and error states, implement virtualization

Several improvements needed for production readiness:

  1. Add loading state while streams are initializing
  2. Add error handling for stream failures
  3. Implement virtualization for large stream arrays to prevent memory issues

Consider using a virtualized table component like react-window for better performance with large streams:

+import { FixedSizeList } from 'react-window';
+
 <div className="flex-grow flex overflow-hidden">
   <div className="w-1/2 border-r border-gray-700 overflow-auto">
+    {streams.openai === undefined ? (
+      <div className="p-4">Loading...</div>
+    ) : streams.openai.length === 0 ? (
+      <div className="p-4">No data yet</div>
+    ) : (
     <table className="w-full table-fixed">
       {/* ... table header ... */}
-      <tbody>
-        {(streams.openai ?? []).map((part, i) => (
+      <FixedSizeList
+        height={400}
+        width="100%"
+        itemCount={streams.openai.length}
+        itemSize={35}
+      >
+        {({ index, style }) => (
           <tr key={i} className="border-b border-gray-700">
             {/* ... row content ... */}
           </tr>
-        ))}
-      </tbody>
+        )}
+      </FixedSizeList>
     </table>
+    )}
   </div>
   {/* Similar changes for the second table */}
 </div>

Committable suggestion skipped: line range outside the PR's diff.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
packages/core/src/v3/apiClient/runStream.ts (1)

283-291: Consider adding URL validation

While the baseUrl handling is good, consider validating the URL format to prevent potential issues.

+    const isValidUrl = (url: string) => {
+      try {
+        new URL(url);
+        return true;
+      } catch {
+        return false;
+      }
+    };
+
     const $baseUrl =
-      typeof metadata.$$streamsBaseUrl === "string" ? metadata.$$streamsBaseUrl : baseUrl;
+      typeof metadata.$$streamsBaseUrl === "string" && isValidUrl(metadata.$$streamsBaseUrl)
+        ? metadata.$$streamsBaseUrl
+        : baseUrl;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7014057 and db4016f.

📒 Files selected for processing (2)
  • packages/core/src/v3/apiClient/runStream.ts (7 hunks)
  • packages/core/src/v3/apiClient/stream.ts (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/core/src/v3/apiClient/stream.ts
🔇 Additional comments (4)
packages/core/src/v3/apiClient/runStream.ts (4)

119-126: LGTM: Improved stream lifecycle management

The addition of stopRunShapeStream ensures proper cleanup of stream resources.


304-304: LGTM: Proper stream cleanup implementation

The implementation ensures proper cleanup of stream resources both on completion and unsubscription.

Also applies to: 334-334, 346-346


220-237: ⚠️ Potential issue

Consider handling JSON parse errors explicitly

The JSON parsing implementation could be improved with explicit error handling.

Apply this diff to improve error handling:

-              controller.enqueue(safeParseJSON(line));
+              try {
+                const parsed = safeParseJSON(line);
+                if (typeof parsed === 'string' && line !== parsed) {
+                  console.warn(`Failed to parse JSON for line: ${line}`);
+                }
+                controller.enqueue(parsed);
+              } catch (error) {
+                console.error(`Error processing line: ${line}`, error);
+              }

19-24: Verify stream utility imports

The new imports enhance stream processing capabilities. Let's verify their implementation.

✅ Verification successful

Stream utility imports are correctly implemented

All imported utilities are properly defined in packages/core/src/v3/apiClient/stream.ts:

  • AsyncIterableStream is defined as a type combining AsyncIterable and ReadableStream
  • createAsyncIterableReadable is implemented as a function that creates an AsyncIterableStream
  • LineTransformStream is implemented as a class extending TransformStream
  • zodShapeStream is implemented as a function for schema-based stream processing
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the implementation of stream utilities
# Check if the stream.js file exists and contains the expected exports
rg -A 5 "export.*(?:AsyncIterableStream|createAsyncIterableReadable|LineTransformStream|zodShapeStream)"

Length of output: 2561

@ericallam ericallam changed the title Fix streaming splits in realtime streams v2 Upgrades and fixes to Realtime and Realtime streams Dec 13, 2024
@ericallam ericallam merged commit 6b355ab into main Dec 13, 2024
9 checks passed
@ericallam ericallam deleted the fix-streaming-splits-v2 branch December 13, 2024 11:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant