
feat(sink): implement snowflake sink #15429

Merged
40 commits · merged Apr 10, 2024

Commits
82c03f9
basic structure for snowflake sink
xzhseh Mar 4, 2024
03c5aa0
fix format
xzhseh Mar 6, 2024
4f7cac9
Merge branch 'main' into xzhseh/snowflake-sink
xzhseh Mar 6, 2024
e9da466
update snowflake common
xzhseh Mar 7, 2024
805b6a2
add snowflake_connector.rs
xzhseh Mar 7, 2024
f0657e2
add snowflake inserter (and builder)
xzhseh Mar 8, 2024
b6bdd34
update license
xzhseh Mar 8, 2024
827a1d9
add snowflake http client
xzhseh Mar 8, 2024
e19c3ea
update fmt
xzhseh Mar 8, 2024
53ef2c5
remove redundant import
xzhseh Mar 8, 2024
25d3aef
add jwt_token auto-generation
xzhseh Mar 9, 2024
fb0cade
add SnowflakeS3Client
xzhseh Mar 11, 2024
cd4168c
update SnowflakeSinkWriter
xzhseh Mar 11, 2024
7a9fdf9
set three SinkWriter functions to return Ok
xzhseh Mar 11, 2024
db090a9
add log sinker
xzhseh Mar 11, 2024
95310bf
basic sink functionality with json encoder
xzhseh Mar 12, 2024
e46b51c
add comments && update sink_to_s3
xzhseh Mar 12, 2024
cd6f587
add file num to send_request
xzhseh Mar 12, 2024
4caa11f
fix typo
xzhseh Mar 12, 2024
00d548d
add aws credentials to prevent load_from_env
xzhseh Mar 12, 2024
805c44f
enable basic snowflake sink pipeline
xzhseh Mar 12, 2024
5b26ccd
improve format
xzhseh Mar 12, 2024
8bc0bf4
update comment
xzhseh Mar 12, 2024
ce20818
fix check
xzhseh Mar 12, 2024
b7fa1bc
fix fmt
xzhseh Mar 12, 2024
5c7ac05
update fmt
xzhseh Mar 12, 2024
287fa2c
make max_batch_row_num configurable
xzhseh Mar 18, 2024
4c10695
update fmt
xzhseh Mar 18, 2024
ccfd638
sink payload when checkpoint barrier comes
xzhseh Mar 18, 2024
7ddbdf8
update fmt
xzhseh Mar 18, 2024
3fc56b9
use epoch with file_suffix as the unique identifier for sink file to s3
xzhseh Mar 18, 2024
9ceab04
update validate to ensure append-only
xzhseh Mar 18, 2024
af53821
support s3_path for configuration
xzhseh Mar 18, 2024
d877b14
update fmt
xzhseh Mar 18, 2024
dbc468a
update comments
xzhseh Apr 4, 2024
426cb61
add reference to snowpipe rest api
xzhseh Apr 5, 2024
4b257d5
update error msg & comments
xzhseh Apr 9, 2024
6373fdc
Merge branch 'main' into xzhseh/snowflake-sink
xzhseh Apr 9, 2024
583964a
update with_options_sink accordingly
xzhseh Apr 10, 2024
9e52dc2
use uuid to ensure the global uniqueness of file suffix
xzhseh Apr 10, 2024
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -72,6 +72,7 @@ icelake = { workspace = true }
indexmap = { version = "1.9.3", features = ["serde"] }
itertools = { workspace = true }
jni = { version = "0.21.1", features = ["invocation"] }
jsonwebtoken = "9.2.0"
jst = { package = 'jsonschema-transpiler', git = "https://github.com/mozilla/jsonschema-transpiler", rev = "c1a89d720d118843d8bcca51084deb0ed223e4b4" }
maplit = "1.0.2"
moka = { version = "0.12", features = ["future"] }
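
The new jsonwebtoken dependency backs the key-pair (JWT) authentication that the `snowflake.rsa_public_key_fp` and `snowflake.private_key` options (see snowflake.rs below) feed into. As a rough sketch only, such a token could be assembled along the lines below, following Snowflake's documented iss/sub/iat/exp claim scheme; the helper name, claim formatting, and expiry choice are illustrative assumptions, not the PR's actual snowflake_connector.rs code.

    use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
    use serde::Serialize;

    /// Claims for Snowflake key-pair (JWT) authentication.
    /// `iss` embeds the public key fingerprint; `sub` is `<ACCOUNT>.<USER>`.
    #[derive(Serialize)]
    struct SnowflakeJwtClaims {
        iss: String,
        sub: String,
        iat: u64,
        exp: u64,
    }

    /// Illustrative helper (not from this PR): build a short-lived JWT from the
    /// account identifier, user, public key fingerprint, and unencrypted RSA PEM key.
    fn generate_snowflake_jwt(
        account: &str,           // e.g. "MYORG-MYACCOUNT" (hypothetical; typically uppercase)
        user: &str,              // e.g. "RW_SINK_USER" (hypothetical)
        rsa_public_key_fp: &str, // e.g. "SHA256:..." as configured in `snowflake.rsa_public_key_fp`
        private_key_pem: &str,   // contents of `snowflake.private_key`
    ) -> Result<String, jsonwebtoken::errors::Error> {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock is before the UNIX epoch")
            .as_secs();
        let claims = SnowflakeJwtClaims {
            // Snowflake expects the fingerprint appended to `<ACCOUNT>.<USER>`.
            iss: format!("{account}.{user}.{rsa_public_key_fp}"),
            sub: format!("{account}.{user}"),
            iat: now,
            // Snowflake caps JWT lifetime at one hour.
            exp: now + 3600,
        };
        let key = EncodingKey::from_rsa_pem(private_key_pem.as_bytes())?;
        encode(&Header::new(Algorithm::RS256), &claims, &key)
    }
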
5 changes: 5 additions & 0 deletions src/connector/src/sink/mod.rs
@@ -33,6 +33,8 @@ pub mod nats;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake;
Contributor

Maybe have a separate folder `snowflake` to hold the files related to Snowflake.

Contributor Author

Currently we only have snowflake.rs for the core sinking logic, plus snowflake_connector.rs for the helper client implementations (i.e., the REST API client and the S3 client) - let's keep it simple at present, and move things around when it gets bigger in the future.

pub mod snowflake_connector;
pub mod starrocks;
pub mod test_sink;
pub mod trivial;
@@ -91,6 +93,7 @@ macro_rules! for_all_sinks {
{ HttpJava, $crate::sink::remote::HttpJavaSink },
{ Doris, $crate::sink::doris::DorisSink },
{ Starrocks, $crate::sink::starrocks::StarrocksSink },
{ Snowflake, $crate::sink::snowflake::SnowflakeSink },
{ DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
{ BigQuery, $crate::sink::big_query::BigQuerySink },
{ Test, $crate::sink::test_sink::TestSink },
@@ -538,6 +541,8 @@ pub enum SinkError {
),
#[error("Starrocks error: {0}")]
Starrocks(String),
#[error("Snowflake error: {0}")]
Snowflake(String),
#[error("Pulsar error: {0}")]
Pulsar(
#[source]
337 changes: 337 additions & 0 deletions src/connector/src/sink/snowflake.rs
@@ -0,0 +1,337 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::serde_as;
use uuid::Uuid;
use with_options::WithOptions;

use super::encoder::{
JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
};
use super::snowflake_connector::{SnowflakeHttpClient, SnowflakeS3Client};
use super::writer::LogSinkerOf;
use super::{SinkError, SinkParam};
use crate::sink::writer::SinkWriterExt;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};

pub const SNOWFLAKE_SINK: &str = "snowflake";

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct SnowflakeCommon {
/// The snowflake database used for sinking
#[serde(rename = "snowflake.database")]
pub database: String,

/// The corresponding schema where sink table exists
#[serde(rename = "snowflake.schema")]
pub schema: String,
Contributor

Can't we query the schema from Snowflake?

Contributor Author

I haven't found a way to query the schema from Snowflake, but the schema here is just the schema name, which will be used to identify which pipe to sink to when sending the insertFiles post request.

Contributor

You can access the table schema via https://docs.snowflake.com/en/sql-reference/info-schema/tables by selecting from information_schema.tables, IIUC.

Contributor Author (@xzhseh, Mar 15, 2024)

Do we have the ability to query the schema via an external REST API request? 🤔
This is more like a user-side query, IIUC.

Contributor (@neverchanje, Mar 18, 2024)

You may have to use a Snowflake Rust driver, like https://docs.rs/snowflake-api/latest/snowflake_api/, which is based on the SQL REST API, and issue a query against information_schema.tables:

    let mut api = SnowflakeApi::with_password_auth(
        "ACCOUNT_IDENTIFIER",
        Some("WAREHOUSE"),
        Some("DATABASE"),
        Some("SCHEMA"),
        "USERNAME",
        Some("ROLE"),
        "PASSWORD",
    )?;
    let res = api.exec("select * from information_schema.tables").await?;

It's just an idea. I never tried it.

Member

Not sure whether this (querying the schema via an external REST API request) can be done with the same credentials provided below.

Asking users to provide a schema also LGTM.

Contributor Author

Asking users to provide a schema also LGTM.

Providing an entire schema seems impossible; the schema here is for insertFiles to correctly find the corresponding database.schema.pipe to sink to.


/// The created pipe object, will be used as `insertFiles` target
#[serde(rename = "snowflake.pipe")]
pub pipe: String,

/// The unique, snowflake provided `account_identifier`
/// NOTE: please use the form `<orgname>-<account_name>`
/// For detailed guidance, reference: <https://docs.snowflake.com/en/user-guide/admin-account-identifier>
#[serde(rename = "snowflake.account_identifier")]
pub account_identifier: String,

/// The user that owns the table to be sinked
/// NOTE: the user should've been granted corresponding *role*
/// reference: <https://docs.snowflake.com/en/sql-reference/sql/grant-role>
#[serde(rename = "snowflake.user")]
pub user: String,

/// The public key fingerprint used when generating custom `jwt_token`
/// reference: <https://docs.snowflake.com/en/developer-guide/sql-api/authenticating>
#[serde(rename = "snowflake.rsa_public_key_fp")]
pub rsa_public_key_fp: String,

/// The rsa pem key *without* encryption
#[serde(rename = "snowflake.private_key")]
pub private_key: String,

/// The s3 bucket where intermediate sink files will be stored
#[serde(rename = "snowflake.s3_bucket")]
pub s3_bucket: String,

/// The optional s3 path to be specified
/// the actual file location would be `<s3_bucket>://<s3_path>/<rw_auto_gen_file_name>`
/// if this field is specified by user(s)
/// otherwise it would be `<s3_bucket>://<rw_auto_gen_file_name>`
#[serde(rename = "snowflake.s3_path")]
pub s3_path: Option<String>,

/// s3 credentials
#[serde(rename = "snowflake.aws_access_key_id")]
pub aws_access_key_id: String,

/// s3 credentials
#[serde(rename = "snowflake.aws_secret_access_key")]
pub aws_secret_access_key: String,

/// The s3 region, e.g., us-east-2
#[serde(rename = "snowflake.aws_region")]
pub aws_region: String,

/// The configurable max row(s) to batch,
/// which should be *explicitly* specified by user(s)
#[serde(rename = "snowflake.max_batch_row_num")]
pub max_batch_row_num: String,
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct SnowflakeConfig {
#[serde(flatten)]
pub common: SnowflakeCommon,
}

impl SnowflakeConfig {
pub fn from_hashmap(properties: HashMap<String, String>) -> Result<Self> {
let config =
serde_json::from_value::<SnowflakeConfig>(serde_json::to_value(properties).unwrap())
.map_err(|e| SinkError::Config(anyhow!(e)))?;
Ok(config)
}
}
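
For illustration, a hedged sketch of how the flattened `snowflake.*` properties deserialize into `SnowflakeConfig` through `from_hashmap`. The keys mirror the `serde(rename)` attributes on `SnowflakeCommon`; all values are hypothetical placeholders, and the snippet assumes it lives in this module so `SnowflakeConfig` and `Result` are in scope.

    use std::collections::HashMap;

    // Hypothetical values for illustration only.
    fn build_example_config() -> Result<SnowflakeConfig> {
        let properties: HashMap<String, String> = [
            ("snowflake.database", "EXAMPLE_DB"),
            ("snowflake.schema", "PUBLIC"),
            ("snowflake.pipe", "EXAMPLE_PIPE"),
            ("snowflake.account_identifier", "MYORG-MYACCOUNT"),
            ("snowflake.user", "RW_SINK_USER"),
            ("snowflake.rsa_public_key_fp", "SHA256:example_fingerprint"),
            ("snowflake.private_key", "-----BEGIN PRIVATE KEY-----..."),
            ("snowflake.s3_bucket", "example-bucket"),
            ("snowflake.aws_access_key_id", "EXAMPLE_ACCESS_KEY_ID"),
            ("snowflake.aws_secret_access_key", "EXAMPLE_SECRET"),
            ("snowflake.aws_region", "us-east-2"),
            ("snowflake.max_batch_row_num", "1000"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();

        // `snowflake.s3_path` is optional and omitted here.
        SnowflakeConfig::from_hashmap(properties)
    }
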

#[derive(Debug)]
pub struct SnowflakeSink {
pub config: SnowflakeConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
}

impl Sink for SnowflakeSink {
type Coordinator = DummySinkCommitCoordinator;
type LogSinker = LogSinkerOf<SnowflakeSinkWriter>;

const SINK_NAME: &'static str = SNOWFLAKE_SINK;

async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
Ok(SnowflakeSinkWriter::new(
self.config.clone(),
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
)
.await
.into_log_sinker(writer_param.sink_metrics))
}

async fn validate(&self) -> Result<()> {
if !self.is_append_only {
return Err(SinkError::Config(
anyhow!("SnowflakeSink only supports append-only mode at present, please change the query to append-only, or use `force_append_only = 'true'`")
));
}
Ok(())
Contributor

It looks like only append-only is supported; please check that here. We could also check the schema, connection, and other issues at create time.

Contributor Author

The schema may not be possible to check at present, but let me do some further investigation.

}
}

impl TryFrom<SinkParam> for SnowflakeSink {
type Error = SinkError;

fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
let schema = param.schema();
let config = SnowflakeConfig::from_hashmap(param.properties)?;
Ok(SnowflakeSink {
config,
schema,
pk_indices: param.downstream_pk,
is_append_only: param.sink_type.is_append_only(),
})
}
}

pub struct SnowflakeSinkWriter {
config: SnowflakeConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
/// the client used to send `insertFiles` post request
http_client: SnowflakeHttpClient,
/// the client to insert file to external storage (i.e., s3)
s3_client: SnowflakeS3Client,
row_encoder: JsonEncoder,
row_counter: u32,
payload: String,
/// the threshold for sinking to s3
max_batch_row_num: u32,
/// The current epoch, used in naming the sink files
/// mainly used for debugging purpose
epoch: u64,
}

impl SnowflakeSinkWriter {
pub async fn new(
config: SnowflakeConfig,
schema: Schema,
pk_indices: Vec<usize>,
is_append_only: bool,
) -> Self {
let http_client = SnowflakeHttpClient::new(
config.common.account_identifier.clone(),
config.common.user.clone(),
config.common.database.clone(),
config.common.schema.clone(),
config.common.pipe.clone(),
config.common.rsa_public_key_fp.clone(),
config.common.private_key.clone(),
HashMap::new(),
config.common.s3_path.clone(),
);

let s3_client = SnowflakeS3Client::new(
config.common.s3_bucket.clone(),
config.common.s3_path.clone(),
config.common.aws_access_key_id.clone(),
config.common.aws_secret_access_key.clone(),
config.common.aws_region.clone(),
)
.await;

let max_batch_row_num = config
.common
.max_batch_row_num
.clone()
.parse::<u32>()
.expect("failed to parse `snowflake.max_batch_row_num` as a `u32`");

Self {
config,
schema: schema.clone(),
pk_indices,
is_append_only,
http_client,
s3_client,
row_encoder: JsonEncoder::new(
schema,
None,
super::encoder::DateHandlingMode::String,
TimestampHandlingMode::String,
TimestamptzHandlingMode::UtcString,
TimeHandlingMode::String,
),
row_counter: 0,
payload: String::new(),
max_batch_row_num,
// initial value of `epoch` will start from 0
epoch: 0,
}
}

/// reset the `payload` and `row_counter`.
/// shall *only* be called after a successful sink.
fn reset(&mut self) {
self.payload.clear();
self.row_counter = 0;
}

fn at_sink_threshold(&self) -> bool {
self.row_counter >= self.max_batch_row_num
}

fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
for (op, row) in chunk.rows() {
assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
self.payload.push_str(&row_json_string);
self.row_counter += 1;
}
Ok(())
}

fn update_epoch(&mut self, epoch: u64) {
self.epoch = epoch;
}

/// generate a *global unique* uuid,
/// which is the key to the uniqueness of file suffix.
fn gen_uuid() -> Uuid {
Uuid::new_v4()
}

/// construct the *global unique* file suffix for the sink.
/// note: this is unique even across multiple parallel writer(s).
fn file_suffix(&self) -> String {
// the format of suffix will be <epoch>_<uuid>
format!("{}_{}", self.epoch, Self::gen_uuid())
}

/// sink `payload` to s3, then trigger corresponding `insertFiles` post request
/// to snowflake, to finish the overall sinking pipeline.
async fn sink_payload(&mut self) -> Result<()> {
if self.payload.is_empty() {
return Ok(());
}
// todo: change this to streaming upload
// first sink to the external stage provided by user (i.e., s3)
self.s3_client
Contributor

Can we use streaming upload instead of buffering the data ourselves? We can use the streaming upload of opendal, or the streaming upload we implemented ourselves with the AWS SDK.

Contributor Author

Yep, I think streaming upload is possible - a possible implementation would probably be something like this: https://gist.github.com/ivormetcalf/f2b8e6abfece4328c86ad1ee34363caf

Contributor

Actually there is no need to reimplement it. In our object store crate, we have implemented streaming upload for both the AWS S3 SDK and opendal. For simplicity we can use opendal. You may see the implementation in the following code.

async fn streaming_upload(&self, path: &str) -> ObjectResult<BoxedStreamingUploader> {
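
For reference, a hedged sketch of what the suggested streaming upload could look like using opendal directly; the builder and writer calls follow opendal's S3 service API as of early 2024 and may differ across versions, so treat this as an illustration rather than code from this PR.

    use bytes::Bytes;
    use opendal::services::S3;
    use opendal::Operator;

    // Illustrative only: stream the batched payload to S3 piece by piece
    // instead of buffering everything and issuing a single upload.
    async fn streaming_upload_sketch(
        bucket: &str,
        region: &str,
        access_key_id: &str,
        secret_access_key: &str,
        path: &str,
        chunks: Vec<Bytes>,
    ) -> opendal::Result<()> {
        let mut builder = S3::default();
        builder
            .bucket(bucket)
            .region(region)
            .access_key_id(access_key_id)
            .secret_access_key(secret_access_key);
        let op = Operator::new(builder)?.finish();

        // `writer` gives a streaming writer; each `write` appends more data
        // without holding the whole payload in memory at once.
        let mut writer = op.writer(path).await?;
        for chunk in chunks {
            writer.write(chunk).await?;
        }
        writer.close().await?;
        Ok(())
    }
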

.sink_to_s3(self.payload.clone().into(), self.file_suffix())
.await?;
// then trigger `insertFiles` post request to snowflake
self.http_client.send_request(self.file_suffix()).await?;
// reset `payload` & `row_counter`
self.reset();
Ok(())
}
}

#[async_trait]
impl SinkWriter for SnowflakeSinkWriter {
async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
self.update_epoch(epoch);
Ok(())
}

async fn abort(&mut self) -> Result<()> {
Ok(())
}

async fn update_vnode_bitmap(&mut self, _vnode_bitmap: Arc<Bitmap>) -> Result<()> {
Ok(())
}

async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
if is_checkpoint {
// sink all the row(s) currently batched in `self.payload`
self.sink_payload().await?;
}
Ok(())
Member

Hmmm, what if a checkpoint barrier comes and there are still some rows in the buffered payload? IIUC, you must commit these upon checkpoint.

Contributor Author

Indeed, thanks for pointing this out!

Contributor Author (@xzhseh, Mar 18, 2024)

Since our checkpoint interval is relatively short, the batch may not reach the max_batch_row_num specified by the user, so we may need to mention this explicitly in the documentation.

As an example, the sink may be triggered by a checkpoint every ~100 rows on average, so even if the user sets max_batch_row_num above 100 (e.g., to 1000), the actual number of rows sent to S3 per file may never reach that threshold.

Below is an example figure from the copy history on the Snowflake side.

[Screenshot: CleanShot 2024-03-18 at 16 10 25@2x, showing Snowflake copy history]

Member (@fuyufjh, Mar 19, 2024)

Yeah, that is expected.

Also, this is exactly the case to be solved by the log store, a.k.a. sink decoupling. You may ask @wenym1 for more details.

Contributor

Yes. We can follow a similar implementation to the Iceberg one in #15634.

}

async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
self.append_only(chunk)?;

// When the number of rows exceeds `max_batch_row_num`
if self.at_sink_threshold() {
self.sink_payload().await?;
}

Ok(())
}
}