Skip to content

Sync streams #307

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft

Sync streams #307

wants to merge 7 commits into from

Conversation

simolus3
Copy link
Contributor

This is a work-in-progress PR for sync streams.

Sync streams essentially consist of two features:

  1. A new way to define sync rules in a way that only requires a single definition instead of parameter and data queries (the internal "Subqueries / New Sync Streams Syntax" document).
  2. The ability for clients to explicitly subscribe to sync streams (the internal "Sync Streams" document).

This PR implements a small subset of feature 1 by desugaring the unified query into the parameter / data queries we have today. That seemed like the easiest way to get started on this, but I think it's not a good approach to handle subqueries (which are not implemented in this PR). I will open a follow-up PR to implement the new syntax in a clean structure able to handle that better.

Instead, most of this PR is related to protocol changes enabling the second feature. While most of the subscription management happens on the client, there are two fundamental changes:

  1. Clients need to know why they're receiving a bucket: We want clients to be able to track progress (and also provide the usual hasSynced / lastSyncedAt fields) for individual stream subscriptions. Since progress is ultimately attached to buckets, clients need enough information to associate buckets with stream subscriptions.
  2. Bucket queriers can yield the same bucket multiple times: Since clients can subscribe to the same stream with different parameters, it's possible to subscribe in a way that causes streams to have overlapping results. Consider for instance a stream defined as SELECT * FROM assets WHERE id IN (request.args() -> 'asset_ids') and a client subscribing once with {'asset_id': [1]} and another time with {'asset_id': [1, 2]}. Here, both subscriptions yield the bucket assets[id=1]. We obviously don't want the bucket to be included in the checkpoint multiple times, but clients need to know that progress on bucket assets[id=1] contributes to both stream subscriptions.

Most of the logic related to the first requirement is implemented in the sync-rules package:

  1. In SqlSyncRules.getBucketParameterQuerier, we take an additional array of explicit stream subscriptions. When we find a matching stream, we evaluate parameter queries for each of those subscriptions. Otherwise, we evaluate the default subscription if the stream has one.
  2. Each stream subscription has an opaque id, which is internal to a sync stream (so, as far as the service is concerned, stateless), and generated by the client. When a bucket is found as part of a querier, we either:
  • remember that the bucket was generated by a legacy sync rule or a default stream
  • or otherwise, include the opaque id of the stream that resulted in that bucket.

In service-core, we:

  1. Forward stream subscriptions from the client to sync-rules.
  2. Merge multiple buckets created from different stream subscriptions into one if they're duplicate.
  • TODO: Check if it makes sense to do this in sync-rules directly - it's a bit easier to do it when building checksums because we need access to all buckets (static and dynamic) to do this.
  1. Apply subscription options, like a custom bucket priority for buckets within a subscription.

This is still missing tests, but I wanted to get it out for an initial discussion.

Copy link

changeset-bot bot commented Jul 21, 2025

⚠️ No Changeset found

Latest commit: 81d2ddd

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant