From 994e48ffe41bd8aa99f3ece37afe68f104165ff3 Mon Sep 17 00:00:00 2001 From: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> Date: Wed, 26 Jun 2024 15:27:14 +0800 Subject: [PATCH] feat(sink): support async for pubsub and nats (#17358) --- src/connector/src/sink/google_pubsub.rs | 76 ++++++++++++++++++------- src/connector/src/sink/nats.rs | 57 +++++++++++-------- 2 files changed, 89 insertions(+), 44 deletions(-) diff --git a/src/connector/src/sink/google_pubsub.rs b/src/connector/src/sink/google_pubsub.rs index ad039ad02070..a01daa59c127 100644 --- a/src/connector/src/sink/google_pubsub.rs +++ b/src/connector/src/sink/google_pubsub.rs @@ -15,6 +15,10 @@ use std::collections::BTreeMap; use anyhow::{anyhow, Context}; +use futures::future::try_join_all; +use futures::prelude::future::FutureExt; +use futures::prelude::TryFuture; +use futures::TryFutureExt; use google_cloud_gax::conn::Environment; use google_cloud_googleapis::pubsub::v1::PubsubMessage; use google_cloud_pubsub::apiv1; @@ -22,7 +26,7 @@ use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile use google_cloud_pubsub::client::google_cloud_auth::project; use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider; use google_cloud_pubsub::client::{Client, ClientConfig}; -use google_cloud_pubsub::publisher::Publisher; +use google_cloud_pubsub::publisher::{Awaiter, Publisher}; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_common::session_config::sink_decouple::SinkDecouple; @@ -42,6 +46,20 @@ use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, Sink use crate::dispatch_sink_formatter_str_key_impl; pub const PUBSUB_SINK: &str = "google_pubsub"; +const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536; + +fn may_delivery_future(awaiter: Vec) -> GooglePubSubSinkDeliveryFuture { + try_join_all(awaiter.into_iter().map(|awaiter| { + awaiter.get().map(|result| { + result + .context("Google Pub/Sub sink error") + .map_err(SinkError::GooglePubSub) + .map(|_| ()) + }) + })) + .map_ok(|_: Vec<()>| ()) + .boxed() +} #[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] @@ -130,7 +148,7 @@ impl Sink for GooglePubSubSink { self.sink_from_name.clone(), ) .await? - .into_log_sinker(usize::MAX)) + .into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE)) } } @@ -157,10 +175,15 @@ impl TryFrom for GooglePubSubSink { } } -struct GooglePubSubPayloadWriter { - publisher: Publisher, +struct GooglePubSubPayloadWriter<'w> { + publisher: &'w mut Publisher, + message_vec: Vec, + add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>, } +pub type GooglePubSubSinkDeliveryFuture = + impl TryFuture + Unpin + 'static; + impl GooglePubSubSinkWriter { pub async fn new( config: GooglePubSubConfig, @@ -231,35 +254,51 @@ impl GooglePubSubSinkWriter { .await?; let publisher = topic.new_publisher(None); - let payload_writer = GooglePubSubPayloadWriter { publisher }; Ok(Self { - payload_writer, formatter, + publisher, }) } } pub struct GooglePubSubSinkWriter { - payload_writer: GooglePubSubPayloadWriter, formatter: SinkFormatterImpl, + publisher: Publisher, } impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter { + type DeliveryFuture = GooglePubSubSinkDeliveryFuture; + async fn write_chunk<'a>( &'a mut self, chunk: StreamChunk, - _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - dispatch_sink_formatter_str_key_impl!( - &self.formatter, - formatter, - self.payload_writer.write_chunk(chunk, formatter).await - ) + let mut payload_writer = GooglePubSubPayloadWriter { + publisher: &mut self.publisher, + message_vec: Vec::with_capacity(chunk.cardinality()), + add_future, + }; + dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, { + payload_writer.write_chunk(chunk, formatter).await + })?; + payload_writer.finish().await } } -impl FormattedSink for GooglePubSubPayloadWriter { +impl<'w> GooglePubSubPayloadWriter<'w> { + pub async fn finish(&mut self) -> Result<()> { + let message_vec = std::mem::take(&mut self.message_vec); + let awaiters = self.publisher.publish_bulk(message_vec).await; + self.add_future + .add_future_may_await(may_delivery_future(awaiters)) + .await?; + Ok(()) + } +} + +impl<'w> FormattedSink for GooglePubSubPayloadWriter<'w> { type K = String; type V = Vec; @@ -272,13 +311,8 @@ impl FormattedSink for GooglePubSubPayloadWriter { ordering_key, ..Default::default() }; - let awaiter = self.publisher.publish(msg).await; - awaiter - .get() - .await - .context("Google Pub/Sub sink error") - .map_err(SinkError::GooglePubSub) - .map(|_| ()) + self.message_vec.push(msg); + Ok(()) } None => Err(SinkError::GooglePubSub(anyhow!( "Google Pub/Sub sink error: missing value to publish" diff --git a/src/connector/src/sink/nats.rs b/src/connector/src/sink/nats.rs index 162aca3c4d2e..471ce6129841 100644 --- a/src/connector/src/sink/nats.rs +++ b/src/connector/src/sink/nats.rs @@ -12,10 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. use core::fmt::Debug; +use core::future::IntoFuture; use std::collections::BTreeMap; use anyhow::{anyhow, Context as _}; use async_nats::jetstream::context::Context; +use futures::prelude::TryFuture; +use futures::FutureExt; use risingwave_common::array::StreamChunk; use risingwave_common::catalog::Schema; use risingwave_common::session_config::sink_decouple::SinkDecouple; @@ -38,6 +41,7 @@ use crate::sink::writer::{ use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY}; pub const NATS_SINK: &str = "nats"; +const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536; #[serde_as] #[derive(Clone, Debug, Deserialize, WithOptions)] @@ -64,6 +68,8 @@ pub struct NatsSinkWriter { json_encoder: JsonEncoder, } +pub type NatsSinkDeliveryFuture = impl TryFuture + Unpin + 'static; + /// Basic data types for use with the nats interface impl NatsConfig { pub fn from_btreemap(values: BTreeMap) -> Result { @@ -122,7 +128,7 @@ impl Sink for NatsSink { Ok( NatsSinkWriter::new(self.config.clone(), self.schema.clone()) .await? - .into_log_sinker(usize::MAX), + .into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE), ) } } @@ -148,34 +154,39 @@ impl NatsSinkWriter { ), }) } - - async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> { - Retry::spawn( - ExponentialBackoff::from_millis(100).map(jitter).take(3), - || async { - let data = chunk_to_json(chunk.clone(), &self.json_encoder).unwrap(); - for item in data { - self.context - .publish(self.config.common.subject.clone(), item.into()) - .await - .context("nats sink error") - .map_err(SinkError::Nats)?; - } - Ok::<_, SinkError>(()) - }, - ) - .await - .context("nats sink error") - .map_err(SinkError::Nats) - } } impl AsyncTruncateSinkWriter for NatsSinkWriter { + type DeliveryFuture = NatsSinkDeliveryFuture; + async fn write_chunk<'a>( &'a mut self, chunk: StreamChunk, - _add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, + mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>, ) -> Result<()> { - self.append_only(chunk).await + let mut data = chunk_to_json(chunk, &self.json_encoder).unwrap(); + for item in &mut data { + let publish_ack_future = Retry::spawn( + ExponentialBackoff::from_millis(100).map(jitter).take(3), + || async { + self.context + .publish(self.config.common.subject.clone(), item.clone().into()) + .await + .context("nats sink error") + .map_err(SinkError::Nats) + }, + ) + .await + .context("nats sink error") + .map_err(SinkError::Nats)?; + let future = publish_ack_future.into_future().map(|result| { + result + .context("Nats sink error") + .map_err(SinkError::Nats) + .map(|_| ()) + }); + add_future.add_future_may_await(future).await?; + } + Ok(()) } }