Skip to content

Commit

Permalink
feat(sink): support async for pubsub and nats (#17358)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Jun 26, 2024
1 parent 7cbb3a9 commit 994e48f
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 44 deletions.
76 changes: 55 additions & 21 deletions src/connector/src/sink/google_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
use std::collections::BTreeMap;

use anyhow::{anyhow, Context};
use futures::future::try_join_all;
use futures::prelude::future::FutureExt;
use futures::prelude::TryFuture;
use futures::TryFutureExt;
use google_cloud_gax::conn::Environment;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::apiv1;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use google_cloud_pubsub::client::google_cloud_auth::project;
use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::Publisher;
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
Expand All @@ -42,6 +46,20 @@ use super::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkParam, Sink
use crate::dispatch_sink_formatter_str_key_impl;

pub const PUBSUB_SINK: &str = "google_pubsub";
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;

/// Folds the per-message publish awaiters into a single delivery future.
///
/// Each awaiter's result is mapped to `()` on success or to
/// `SinkError::GooglePubSub` (with the "Google Pub/Sub sink error" context) on
/// failure. The combined future is built with `try_join_all`, so it yields
/// `Ok(())` only if every awaiter succeeded, and is boxed before being returned.
fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
    let pending_acks = awaiter.into_iter().map(|pending| {
        pending.get().map(|ack| {
            ack.map(|_| ())
                .context("Google Pub/Sub sink error")
                .map_err(SinkError::GooglePubSub)
        })
    });

    // The collected `Vec<()>` carries no information; collapse it to unit.
    try_join_all(pending_acks)
        .map_ok(|_: Vec<()>| ())
        .boxed()
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
Expand Down Expand Up @@ -130,7 +148,7 @@ impl Sink for GooglePubSubSink {
self.sink_from_name.clone(),
)
.await?
.into_log_sinker(usize::MAX))
.into_log_sinker(PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE))
}
}

Expand All @@ -157,10 +175,15 @@ impl TryFrom<SinkParam> for GooglePubSubSink {
}
}

// NOTE(review): two `struct GooglePubSubPayloadWriter` headers and two `publisher` fields
// share a single closing brace here; this looks like the removed and added versions of the
// struct from a rendered diff shown together — confirm against the real file.
//
// In the lifetime-parameterised form the writer borrows the publisher mutably, collects
// messages in `message_vec`, and holds the `add_future` handle used to register delivery
// futures.
struct GooglePubSubPayloadWriter {
publisher: Publisher,
struct GooglePubSubPayloadWriter<'w> {
publisher: &'w mut Publisher,
message_vec: Vec<PubsubMessage>,
add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}

/// Delivery future type used by the Google Pub/Sub sink.
///
/// Opaque `impl Trait` alias: a `TryFuture` that resolves to `()` on success or a
/// `SinkError` on failure, and is `Unpin + 'static`.
pub type GooglePubSubSinkDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl GooglePubSubSinkWriter {
pub async fn new(
config: GooglePubSubConfig,
Expand Down Expand Up @@ -231,35 +254,51 @@ impl GooglePubSubSinkWriter {
.await?;

let publisher = topic.new_publisher(None);
let payload_writer = GooglePubSubPayloadWriter { publisher };

Ok(Self {
payload_writer,
formatter,
publisher,
})
}
}

// NOTE(review): both `payload_writer` and `publisher` are listed below; this looks like the
// removed and added fields of a rendered diff shown together — confirm against the real file.
/// Sink writer for Google Pub/Sub, holding the row formatter alongside the publishing state.
pub struct GooglePubSubSinkWriter {
payload_writer: GooglePubSubPayloadWriter,
formatter: SinkFormatterImpl,
publisher: Publisher,
}

// NOTE(review): this span contains two parameter lines for the delivery-future handle
// (`_add_future` and `add_future`) and two bodies for `write_chunk`; it looks like the
// removed and added versions from a rendered diff shown together — confirm against the
// real file.
impl AsyncTruncateSinkWriter for GooglePubSubSinkWriter {
type DeliveryFuture = GooglePubSubSinkDeliveryFuture;

// Formats `chunk` through the configured formatter and hands the resulting messages to a
// payload writer.
async fn write_chunk<'a>(
&'a mut self,
chunk: StreamChunk,
_add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
) -> Result<()> {
dispatch_sink_formatter_str_key_impl!(
&self.formatter,
formatter,
self.payload_writer.write_chunk(chunk, formatter).await
)
// Build a payload writer for this call: it borrows the publisher, pre-sizes the message
// buffer to the chunk's cardinality, and takes ownership of the `add_future` handle.
let mut payload_writer = GooglePubSubPayloadWriter {
publisher: &mut self.publisher,
message_vec: Vec::with_capacity(chunk.cardinality()),
add_future,
};
// Formatting only buffers messages into `message_vec`; errors are propagated with `?`.
dispatch_sink_formatter_str_key_impl!(&self.formatter, formatter, {
payload_writer.write_chunk(chunk, formatter).await
})?;
// Publish the buffered messages in bulk and register their delivery future.
payload_writer.finish().await
}
}

impl FormattedSink for GooglePubSubPayloadWriter {
impl<'w> GooglePubSubPayloadWriter<'w> {
    /// Publishes every buffered message in one bulk call and registers the
    /// combined delivery future with the delivery-future manager.
    ///
    /// The buffer is drained with `std::mem::take`, leaving `message_vec` empty.
    ///
    /// # Errors
    /// Propagates any error returned by `add_future_may_await`.
    pub async fn finish(&mut self) -> Result<()> {
        let buffered = std::mem::take(&mut self.message_vec);
        let awaiters = self.publisher.publish_bulk(buffered).await;
        let delivery = may_delivery_future(awaiters);
        self.add_future.add_future_may_await(delivery).await?;
        Ok(())
    }
}

impl<'w> FormattedSink for GooglePubSubPayloadWriter<'w> {
type K = String;
type V = Vec<u8>;

Expand All @@ -272,13 +311,8 @@ impl FormattedSink for GooglePubSubPayloadWriter {
ordering_key,
..Default::default()
};
let awaiter = self.publisher.publish(msg).await;
awaiter
.get()
.await
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
self.message_vec.push(msg);
Ok(())
}
None => Err(SinkError::GooglePubSub(anyhow!(
"Google Pub/Sub sink error: missing value to publish"
Expand Down
57 changes: 34 additions & 23 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use core::fmt::Debug;
use core::future::IntoFuture;
use std::collections::BTreeMap;

use anyhow::{anyhow, Context as _};
use async_nats::jetstream::context::Context;
use futures::prelude::TryFuture;
use futures::FutureExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::session_config::sink_decouple::SinkDecouple;
Expand All @@ -38,6 +41,7 @@ use crate::sink::writer::{
use crate::sink::{Result, Sink, SinkError, SinkParam, SINK_TYPE_APPEND_ONLY};

pub const NATS_SINK: &str = "nats";
const NATS_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
Expand All @@ -64,6 +68,8 @@ pub struct NatsSinkWriter {
json_encoder: JsonEncoder,
}

/// Delivery future type used by the NATS sink: an opaque `TryFuture` resolving to `()` on
/// success or a `SinkError` on failure, and `Unpin + 'static`.
pub type NatsSinkDeliveryFuture = impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

/// Basic data types for use with the nats interface
impl NatsConfig {
pub fn from_btreemap(values: BTreeMap<String, String>) -> Result<Self> {
Expand Down Expand Up @@ -122,7 +128,7 @@ impl Sink for NatsSink {
Ok(
NatsSinkWriter::new(self.config.clone(), self.schema.clone())
.await?
.into_log_sinker(usize::MAX),
.into_log_sinker(NATS_SEND_FUTURE_BUFFER_MAX_SIZE),
)
}
}
Expand All @@ -148,34 +154,39 @@ impl NatsSinkWriter {
),
})
}

/// Encodes `chunk` to JSON and publishes each item to the configured subject, with the
/// whole encode-and-publish pass wrapped in `Retry::spawn`.
///
/// # Errors
/// Returns `SinkError::Nats` when a publish call fails and the retry wrapper returns an error.
// NOTE(review): `chunk_to_json(..).unwrap()` panics on an encoding failure instead of
// returning an error.
// NOTE(review): the retried closure re-encodes the chunk and loops over every item again, so
// a failure part-way through would presumably republish items already sent — confirm.
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
Retry::spawn(
// Retry strategy: exponential backoff built from 100 ms, jittered, limited by `take(3)`.
ExponentialBackoff::from_millis(100).map(jitter).take(3),
|| async {
// The chunk is cloned because the closure may be invoked more than once.
let data = chunk_to_json(chunk.clone(), &self.json_encoder).unwrap();
for item in data {
self.context
.publish(self.config.common.subject.clone(), item.into())
.await
.context("nats sink error")
.map_err(SinkError::Nats)?;
}
Ok::<_, SinkError>(())
},
)
.await
.context("nats sink error")
.map_err(SinkError::Nats)
}
}

impl AsyncTruncateSinkWriter for NatsSinkWriter {
    type DeliveryFuture = NatsSinkDeliveryFuture;

    /// Publishes every row of `chunk` to the configured subject as a JSON message.
    ///
    /// For each encoded item the publish call is wrapped in `Retry::spawn` with a
    /// jittered exponential backoff (100 ms base, limited by `take(3)`). Only the
    /// publish request itself is retried; the acknowledgement future it returns is
    /// handed to `add_future` so it can be awaited without blocking later items.
    ///
    /// # Errors
    /// Returns an error if the chunk cannot be encoded to JSON, `SinkError::Nats` if
    /// a publish still fails after retrying, or whatever error
    /// `add_future_may_await` reports.
    async fn write_chunk<'a>(
        &'a mut self,
        chunk: StreamChunk,
        mut add_future: DeliveryFutureManagerAddFuture<'a, Self::DeliveryFuture>,
    ) -> Result<()> {
        // Propagate encoding failures to the caller instead of panicking in the sink task.
        let data = chunk_to_json(chunk, &self.json_encoder)?;
        for item in &data {
            let publish_ack_future = Retry::spawn(
                ExponentialBackoff::from_millis(100).map(jitter).take(3),
                || async {
                    // `item` is cloned because the closure may run once per retry attempt.
                    self.context
                        .publish(self.config.common.subject.clone(), item.clone().into())
                        .await
                        .context("nats sink error")
                        .map_err(SinkError::Nats)
                },
            )
            .await
            .context("nats sink error")
            .map_err(SinkError::Nats)?;
            // Map the server acknowledgement into the sink's delivery-future shape.
            let future = publish_ack_future.into_future().map(|result| {
                result
                    .context("Nats sink error")
                    .map_err(SinkError::Nats)
                    .map(|_| ())
            });
            add_future.add_future_may_await(future).await?;
        }
        Ok(())
    }
}

0 comments on commit 994e48f

Please sign in to comment.