
feat: sink from Vector to Risingwave #21308

Open
lmatz opened this issue Sep 18, 2024 · 6 comments
Labels: sink: new, type: feature


@lmatz

lmatz commented Sep 18, 2024

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Use Cases

Users can use Vector (vector.dev) as a source for RisingWave (RW) to re-ingest or replay events into RisingWave on demand.

RisingWave is a streaming database that aims to be as compatible with PostgreSQL as possible.

We have a working example in RW's repo: https://github.com/risingwavelabs/risingwave/tree/main/integration_tests/vector

Attempted Solutions

No response

Proposal

RW maintains a fork of Vector at https://github.com/risingwavelabs/vector.

The sink in that fork inserts data into RW with an "insert into" statement: https://github.com/risingwavelabs/vector/blob/f9c186f01b1a84ac402b6657e48d83e7af01b0c4/src/sinks/risingwave/service.rs#L133

It then issues a flush command, a special command in RW, to make RW commit the data just inserted: https://github.com/risingwavelabs/vector/blob/f9c186f01b1a84ac402b6657e48d83e7af01b0c4/src/sinks/risingwave/service.rs#L145
Once the flush returns successfully, the data is durable: if RW fails afterwards, the data is still present after recovery.

We have verified this in the production environments of RW customers.
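A minimal sketch of that write path, expressed as the SQL statements a sink would send for one batch. This is not the fork's code: the helper names (quote_literal, statements_for_batch), the single text column, and the use of escaped string literals are assumptions for illustration; a real sink should send values as bind parameters.

```rust
/// Escape a value as a SQL string literal by doubling single quotes.
/// Illustration only: a real sink should pass values as bind parameters.
fn quote_literal(value: &str) -> String {
    format!("'{}'", value.replace('\'', "''"))
}

/// Hypothetical helper: build the statements sent to RisingWave for one batch.
/// All rows go into a single multi-row INSERT, followed by FLUSH so that
/// RisingWave commits them before the batch is acknowledged upstream.
fn statements_for_batch(table: &str, column: &str, rows: &[&str]) -> Vec<String> {
    if rows.is_empty() {
        // Nothing to insert, so nothing to flush.
        return Vec::new();
    }
    let values: Vec<String> = rows
        .iter()
        .map(|row| format!("({})", quote_literal(row)))
        .collect();
    vec![
        format!("INSERT INTO {} ({}) VALUES {}", table, column, values.join(", ")),
        // RisingWave-specific: returns once the inserted data is committed.
        "FLUSH".to_string(),
    ]
}
```

The point is the ordering: the batch should only be acknowledged to Vector after FLUSH has returned, since that is when RW guarantees the rows survive a failure.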

References

No response

Version

No response

lmatz added the type: feature label Sep 18, 2024
lmatz changed the title from "feat: sink from vector to Risingwave" to "feat: sink from Vector to Risingwave" Sep 18, 2024
@jorgehermo9
Contributor

jorgehermo9 commented Sep 18, 2024

If RisingWave aims to be Postgres-compatible, can’t this be done with the postgres sink?

I’ve opened a PR for that: #21248

Also, it seems that your sink encodes the payload as Bytes, so it does not support structured data in arbitrary table schemas, right? Using jsonb_populate_record (https://docs.risingwave.com/docs/current/sql-function-json/#jsonb_populate_record), as I did in the postgres sink, would make it easier to support structured events.
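For comparison, a sketch of the structured approach: each event is serialized to a JSON object and passed as a single parameter, and jsonb_populate_record maps its keys onto the table's columns. Assumptions: events are flattened to string-valued fields (real Vector events can be nested), and the helper names and the $1::jsonb placeholder are hypothetical; the exact statement in #21248 may differ.

```rust
use std::collections::BTreeMap;

/// Encode a string as a JSON string literal, escaping quotes,
/// backslashes and control characters.
fn json_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Serialize a flat event (field name -> string value) as a JSON object.
/// BTreeMap keeps the keys sorted, so the output is deterministic.
fn event_to_json(event: &BTreeMap<&str, &str>) -> String {
    let fields: Vec<String> = event
        .iter()
        .map(|(k, v)| format!("{}:{}", json_string(k), json_string(v)))
        .collect();
    format!("{{{}}}", fields.join(","))
}

/// Hypothetical statement: the JSON object is bound as $1 and
/// jsonb_populate_record maps its keys onto the columns of `table`.
fn insert_one(table: &str) -> String {
    format!("INSERT INTO {table} SELECT * FROM jsonb_populate_record(NULL::{table}, $1::jsonb)")
}
```

In PostgreSQL, keys that match no column are ignored and columns with no matching key become NULL, which is what makes arbitrary table schemas workable with a single generic statement.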

@jszwedko, does it make sense to have a completely separate sink if RisingWave seems Postgres-compatible?

@lmatz
Author

lmatz commented Sep 18, 2024

Thank you @jorgehermo9 for the pointer! I agree that almost all the logic in #21248 can and should be reused for RW.

Using the approach from PR #21248, we can do the following in RW:

dev=> CREATE TABLE IF NOT EXISTS t (host text, message text, payload jsonb);
CREATE_TABLE

dev=> insert into t SELECT * FROM jsonb_populate_record(
dev(> null::struct<host varchar, message varchar, payload jsonb>,
dev(> '{"host": "abc", "message": "bcd", "payload": {"d": 4, "e": "zzz"}}');
INSERT 0 1

dev=> select * from t;
 host | message |       payload        
------+---------+----------------------
 abc  | bcd     | {"d": 4, "e": "zzz"}
(1 row)

Unfortunately, jsonb_populate_record(null::{table}, ...) is not supported in RW yet, but we can add support for it in RW if Vector's PostgreSQL sink requires it.
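Until the null::{table} form is supported, one possible workaround is to spell the row type out as a struct, as in the session above. A sketch, assuming the sink already knows the target table's column names and types (for example from its configuration); struct_type and insert_with_struct are hypothetical helpers, and the $1::jsonb placeholder is an assumption.

```rust
/// Build an RW struct type such as `struct<host varchar, payload jsonb>`
/// from (column name, SQL type) pairs.
/// Note: identifiers are neither quoted nor validated in this sketch.
fn struct_type(columns: &[(&str, &str)]) -> String {
    let fields: Vec<String> = columns
        .iter()
        .map(|(name, ty)| format!("{name} {ty}"))
        .collect();
    format!("struct<{}>", fields.join(", "))
}

/// Hypothetical INSERT that casts NULL to the explicit struct type
/// instead of to the table's row type.
fn insert_with_struct(table: &str, columns: &[(&str, &str)]) -> String {
    format!(
        "INSERT INTO {} SELECT * FROM jsonb_populate_record(NULL::{}, $1::jsonb)",
        table,
        struct_type(columns)
    )
}
```

The struct must list the columns in table order, because the result is inserted positionally with SELECT *.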

Another difference between RW and Postgres is that RW needs a flush to make data durable, whereas in PG we only wait for the insert statement to succeed. Without a flush, I suspect we may lose some data on failure, which would break at-least-once delivery semantics and leave us with best-effort delivery only.

Instead of issuing an explicit flush command after every insert, we can also set the session variable RW_IMPLICIT_FLUSH (https://docs.risingwave.com/docs/next/sql-set-rw-implicit-flush/) to make flushing automatic. Maybe we can set it before inserting, in a special code path for RW? (It defaults to false; in this case we want it to be true.)
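The two options can be sketched as two plans: either every INSERT is followed by FLUSH, or RW_IMPLICIT_FLUSH is set once when a connection is opened and each INSERT is then flushed automatically. FlushMode, Plan, and plan are hypothetical names, not part of Vector or RW.

```rust
/// Two ways of making inserts durable in RisingWave.
#[derive(Debug, Clone, Copy)]
enum FlushMode {
    /// Send FLUSH after every INSERT.
    ExplicitPerBatch,
    /// Set RW_IMPLICIT_FLUSH once per session so RW flushes each INSERT itself.
    ImplicitSession,
}

/// Statements to run when a connection opens, and for each batch.
#[derive(Debug, PartialEq)]
struct Plan {
    on_connect: Vec<String>,
    per_batch: Vec<String>,
}

fn plan(mode: FlushMode, insert_sql: &str) -> Plan {
    match mode {
        FlushMode::ExplicitPerBatch => Plan {
            on_connect: Vec::new(),
            per_batch: vec![insert_sql.to_string(), "FLUSH".to_string()],
        },
        FlushMode::ImplicitSession => Plan {
            on_connect: vec!["SET RW_IMPLICIT_FLUSH TO true".to_string()],
            per_batch: vec![insert_sql.to_string()],
        },
    }
}
```

Because RW_IMPLICIT_FLUSH is a session variable, the implicit variant would need to run the SET on every new connection, including each connection in a pool, not just once per sink.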

@lmatz
Author

lmatz commented Sep 22, 2024

risingwavelabs/risingwave#18601

We will properly support PG's jsonb_populate_record syntax.

jszwedko added the sink: new label Sep 23, 2024
@jorgehermo9
Contributor

jorgehermo9 commented Sep 24, 2024

Hi @lmatz, sorry I couldn't answer earlier.

Another difference between RW and Postgres is that RW needs a flush to make data durable while we just wait for the insert statement to succeed in PG. Without flush, I wonder if we may lose some data in case of the failure, and breaking the At-least-once delivery semantics, and achieve best-effort only.

I could do that in my open PR, but maybe it makes sense to merge it without that change (so it isn't bloated with lots of changes) and, once it has stabilized, submit another PR that handles the implicit flush for RW via the sink config. What do you think?

We will support jsonb_populate_record PG's syntax properly

That's very good news! Note that my PR uses jsonb_populate_recordset, because it inserts a batch of events in a single INSERT rather than one INSERT per event. Also note that the PR hasn't been reviewed yet, so I don't really know whether using jsonb_populate_recordset is acceptable. For example, a whole batch of events fails if just one of its records fails (that's Postgres behaviour), although one could configure batch.max_events: 1. I hope I'm explaining it well. At least in my opinion, using jsonb_populate_recordset is fine.
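A sketch of that batching trade-off: events (already serialized as JSON objects) are grouped into arrays of at most max_events, and each array becomes the single parameter of one jsonb_populate_recordset INSERT. Since one statement succeeds or fails as a whole, a smaller max_events limits how many events one bad record takes down. The helper names and the $1::jsonb placeholder are hypothetical, and in Vector the grouping itself is driven by the sink's batch settings rather than by code like this.

```rust
/// Join pre-serialized JSON objects into one JSON array parameter.
fn batch_param(events: &[&str]) -> String {
    format!("[{}]", events.join(","))
}

/// Hypothetical statement: one INSERT per batch; jsonb_populate_recordset
/// expands the JSON array bound as $1 into one row per element.
fn insert_batch(table: &str) -> String {
    format!("INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1::jsonb)")
}

/// Split events into batches of at most `max_events`. Each batch is sent as a
/// single statement, so a failing record only fails its own batch.
fn batches(events: &[&str], max_events: usize) -> Vec<String> {
    // chunks(0) would panic, so treat 0 as 1.
    events
        .chunks(max_events.max(1))
        .map(|chunk| batch_param(chunk))
        .collect()
}
```

With max_events set to 1, every event gets its own statement: failures are isolated, at the cost of one round trip per event.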

If you need help with that, I'm willing to help (on both the Vector and the RW side). Although I'm not familiar with the RW code, I've always wanted to contribute to it, and this seems like a good chance. I may take a look in the coming days if I'm bored enough 😄

@jszwedko
Member

If it's possible to reuse the in-flight PostgreSQL sink, that would be easier for us to maintain (the less code the better 😄). Would there be any downsides to that approach?

@jorgehermo9
Contributor

jorgehermo9 commented Sep 28, 2024

I think once the postgres sink stabilizes, we can add a sink config option such as flavour: postgres (the default) and flavour: risingwave to handle the differences between those services. For example, as @lmatz said, we need that implicit flush statement before inserting into RisingWave. This could also be useful for other flavours that need specific behaviour (I'm thinking of Timescale, etc.).
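One way such a setting could look on the Rust side, assuming a hypothetical flavour option that defaults to postgres and, for RisingWave, only adds a per-connection setup statement. This is a sketch of the idea in this comment, not an existing Vector option.

```rust
use std::str::FromStr;

/// Hypothetical `flavour` setting for the postgres sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum Flavour {
    #[default]
    Postgres,
    Risingwave,
}

impl FromStr for Flavour {
    type Err = String;

    /// Parse the config value; unknown flavours are rejected.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "postgres" => Ok(Flavour::Postgres),
            "risingwave" => Ok(Flavour::Risingwave),
            other => Err(format!("unknown flavour: {other}")),
        }
    }
}

impl Flavour {
    /// Statements to run once on each new connection, before any INSERT.
    fn session_setup(self) -> &'static [&'static str] {
        match self {
            Flavour::Postgres => &[],
            Flavour::Risingwave => &["SET RW_IMPLICIT_FLUSH TO true"],
        }
    }
}
```

Rejecting unknown values at parse time means a typo in the config fails fast instead of silently falling back to Postgres behaviour.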
