diff --git a/integration_tests/big-query-sink/create_sink.sql b/integration_tests/big-query-sink/create_sink.sql
index 3b8ed3b3ef9d8..94e33fc4bafc2 100644
--- a/integration_tests/big-query-sink/create_sink.sql
+++ b/integration_tests/big-query-sink/create_sink.sql
@@ -4,10 +4,10 @@ FROM bhv_mv WITH (
     connector = 'bigquery',
     type = 'append-only',
-    bigquery.local.path= '/gcp-rwctest.json',
-    bigquery.project= 'rwctest',
-    bigquery.dataset= 'bqtest',
-    bigquery.table= 'bq_sink',
+    bigquery.local.path= '/home/xxhx/winter-dynamics-383822-9690ac19ce78.json',
+    bigquery.project= 'winter-dynamics-383822',
+    bigquery.dataset= 'test_bigquery_sink',
+    bigquery.table= 'tabl31',
     force_append_only='true'
 );
diff --git a/integration_tests/big-query-sink/create_source.sql b/integration_tests/big-query-sink/create_source.sql
index bfc49aee69ce4..39ecc4591e2ad 100644
--- a/integration_tests/big-query-sink/create_source.sql
+++ b/integration_tests/big-query-sink/create_source.sql
@@ -11,6 +11,6 @@ CREATE table user_behaviors (
     connector = 'datagen',
     fields.user_id.kind = 'sequence',
     fields.user_id.start = '1',
-    fields.user_id.end = '100',
-    datagen.rows.per.second = '10'
+    fields.user_id.end = '10000',
+    datagen.rows.per.second = '1000'
 ) FORMAT PLAIN ENCODE JSON;
diff --git a/src/connector/src/sink/big_query.rs b/src/connector/src/sink/big_query.rs
index f3933ff860294..2317ef4461087 100644
--- a/src/connector/src/sink/big_query.rs
+++ b/src/connector/src/sink/big_query.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::mem;
 use core::time::Duration;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -32,7 +33,7 @@ use google_cloud_googleapis::cloud::bigquery::storage::v1::{
 };
 use google_cloud_pubsub::client::google_cloud_auth;
 use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
-use prost_reflect::MessageDescriptor;
+use prost_reflect::{FieldDescriptor, MessageDescriptor};
 use prost_types::{
     field_descriptor_proto, DescriptorProto, FieldDescriptorProto, FileDescriptorProto,
     FileDescriptorSet,
@@ -42,7 +43,7 @@ use risingwave_common::buffer::Bitmap;
 use risingwave_common::catalog::Schema;
 use risingwave_common::types::DataType;
 use serde_derive::Deserialize;
-use serde_with::serde_as;
+use serde_with::{serde_as, DisplayFromStr};
 use url::Url;
 use uuid::Uuid;
 use with_options::WithOptions;
@@ -77,6 +78,9 @@ pub struct BigQueryCommon {
     pub dataset: String,
     #[serde(rename = "bigquery.table")]
     pub table: String,
+    #[serde(rename = "bigquery.max_batch_rows", default = "default_max_batch_rows")]
+    #[serde_as(as = "DisplayFromStr")]
+    pub max_batch_rows: usize,
 }
 
 fn default_max_batch_rows() -> usize {
@@ -311,6 +315,9 @@ pub struct BigQuerySinkWriter {
     writer_pb_schema: ProtoSchema,
     message_descriptor: MessageDescriptor,
     write_stream: String,
+    proto_field: Option<FieldDescriptor>,
+    write_rows: Vec<AppendRowsRequestRows>,
+    write_rows_count: usize,
 }
 
 impl TryFrom<SinkParam> for BigQuerySink {
@@ -366,6 +373,14 @@ impl BigQuerySinkWriter {
                     &config.common.table
                 ))
             })?;
+        let proto_field = if !is_append_only {
+            let proto_field = message_descriptor
+                .get_field_by_name(CHANGE_TYPE)
+                .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE)))?;
+            Some(proto_field)
+        } else {
+            None
+        };
         let row_encoder = ProtoEncoder::new(
             schema.clone(),
             None,
@@ -384,13 +399,16 @@ impl BigQuerySinkWriter {
             is_append_only,
             row_encoder,
             message_descriptor,
+            proto_field,
             writer_pb_schema: ProtoSchema {
                 proto_descriptor: Some(descriptor_proto),
             },
+            write_rows: vec![],
+            write_rows_count: 0,
         })
     }
 
-    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
+    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
         let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
         for (op, row) in chunk.rows() {
             if op != Op::Insert {
@@ -399,38 +417,28 @@ impl BigQuerySinkWriter {
             serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
         }
 
-        let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
-            writer_schema: Some(self.writer_pb_schema.clone()),
-            rows: Some(ProtoRows { serialized_rows }),
-        });
-        self.client
-            .append_rows(vec![rows], self.write_stream.clone())
-            .await?;
-        Ok(())
+        Ok(serialized_rows)
     }
 
-    async fn upsert(&mut self, chunk: StreamChunk) -> Result<()> {
+    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
         let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
         for (op, row) in chunk.rows() {
+            if op == Op::UpdateDelete {
+                continue;
+            }
             let mut pb_row = self.row_encoder.encode(row)?;
-            let proto_field = self
-                .message_descriptor
-                .get_field_by_name(CHANGE_TYPE)
-                .ok_or_else(|| {
-                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
-                })?;
             match op {
                 Op::Insert => pb_row
                     .message
                     .try_set_field(
-                        &proto_field,
+                        &self.proto_field.as_ref().unwrap(),
                         prost_reflect::Value::String("INSERT".to_string()),
                     )
                     .map_err(|e| SinkError::BigQuery(e.into()))?,
                 Op::Delete => pb_row
                     .message
                     .try_set_field(
-                        &proto_field,
+                        &self.proto_field.as_ref().unwrap(),
                         prost_reflect::Value::String("DELETE".to_string()),
                     )
                     .map_err(|e| SinkError::BigQuery(e.into()))?,
@@ -438,7 +446,7 @@ impl BigQuerySinkWriter {
                 Op::UpdateInsert => pb_row
                     .message
                     .try_set_field(
-                        &proto_field,
+                        &self.proto_field.as_ref().unwrap(),
                         prost_reflect::Value::String("UPSERT".to_string()),
                     )
                     .map_err(|e| SinkError::BigQuery(e.into()))?,
@@ -446,12 +454,14 @@ impl BigQuerySinkWriter {
             serialized_rows.push(pb_row.ser_to()?)
         }
 
-        let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
-            writer_schema: Some(self.writer_pb_schema.clone()),
-            rows: Some(ProtoRows { serialized_rows }),
-        });
+        Ok(serialized_rows)
+    }
+
+    async fn write_rows(&mut self) -> Result<()> {
+        let rows = mem::take(&mut self.write_rows);
+        self.write_rows_count = 0;
         self.client
-            .append_rows(vec![rows], self.write_stream.clone())
+            .append_rows(rows, self.write_stream.clone())
             .await?;
         Ok(())
     }
@@ -460,14 +470,27 @@
 #[async_trait]
 impl SinkWriter for BigQuerySinkWriter {
     async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
-        if self.is_append_only {
-            self.append_only(chunk).await
+        let serialized_rows = if self.is_append_only {
+            self.append_only(chunk)?
         } else {
-            self.upsert(chunk).await
+            self.upsert(chunk)?
+        };
+        self.write_rows_count += serialized_rows.len();
+        let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
+            writer_schema: Some(self.writer_pb_schema.clone()),
+            rows: Some(ProtoRows { serialized_rows }),
+        });
+        self.write_rows.push(rows);
+
+        if self.write_rows_count >= self.config.common.max_batch_rows {
+            self.write_rows().await?;
         }
+
+        Ok(())
     }
 
     async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
+        self.write_rows().await?;
         Ok(())
     }
 
@@ -521,27 +544,28 @@ impl StorageWriterClient {
         rows: Vec<AppendRowsRequestRows>,
         write_stream: String,
     ) -> Result<()> {
-        let trace_id = Uuid::new_v4().hyphenated().to_string();
         let append_req: Vec<AppendRowsRequest> = rows
             .into_iter()
-            .map(|row| AppendRowsRequest {
+            .map(|row|
+                AppendRowsRequest {
                 write_stream: write_stream.clone(),
                 offset: None,
-                trace_id: trace_id.clone(),
+                trace_id: Uuid::new_v4().hyphenated().to_string(),
                 missing_value_interpretations: HashMap::default(),
                 rows: Some(row),
             })
             .collect();
-        let resp = self
+        let mut resp = self
             .client
             .append_rows(Request::new(tokio_stream::iter(append_req)))
             .await
             .map_err(|e| SinkError::BigQuery(e.into()))?
-            .into_inner()
+            .into_inner();
+        while let Some(i) = resp
             .message()
             .await
-            .map_err(|e| SinkError::BigQuery(e.into()))?;
-        if let Some(i) = resp {
+            .map_err(|e| SinkError::BigQuery(e.into()))?
+        {
             if !i.row_errors.is_empty() {
                 return Err(SinkError::BigQuery(anyhow::anyhow!(
                     "Insert error {:?}",
diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml
index 9a4dcc25a0bcb..b287bcd6aa4b4 100644
--- a/src/connector/with_options_sink.yaml
+++ b/src/connector/with_options_sink.yaml
@@ -17,6 +17,10 @@ BigQueryConfig:
   - name: bigquery.table
     field_type: String
     required: true
+  - name: bigquery.max_batch_rows
+    field_type: usize
+    required: false
+    default: '1024'
   - name: region
     field_type: String
     required: false
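Usage sketch (not part of the diff): the new `bigquery.max_batch_rows` option can be set in the sink's `WITH` clause; encoded rows are buffered in `write_rows` and flushed to the Storage Write API once the buffered row count reaches this threshold, and any remainder is flushed on the next `begin_epoch`. The option is parsed via `DisplayFromStr`, so it is passed as a quoted string, and it defaults to 1024 when omitted. The credential path, project, dataset, and table names below are placeholders, not values from this change.

CREATE SINK bhv_big_query_sink
FROM bhv_mv WITH (
    connector = 'bigquery',
    type = 'append-only',
    bigquery.local.path = '/path/to/service-account.json', -- placeholder credentials file
    bigquery.project = 'my_project',                       -- placeholder project
    bigquery.dataset = 'my_dataset',                       -- placeholder dataset
    bigquery.table = 'my_table',                           -- placeholder table
    bigquery.max_batch_rows = '2048',                      -- optional; flush threshold, defaults to 1024
    force_append_only = 'true'
);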