Skip to content

Commit

Permalink
feat: actor pattern for forwarder + sink trait (#2141)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Co-authored-by: Sreekanth <[email protected]>
  • Loading branch information
vigith and BulkBeing authored Oct 14, 2024
1 parent bc12925 commit fb32885
Show file tree
Hide file tree
Showing 11 changed files with 541 additions and 227 deletions.
63 changes: 62 additions & 1 deletion rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::PartialEq;
use std::collections::HashMap;

use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
Expand All @@ -7,7 +8,8 @@ use chrono::{DateTime, Utc};
use crate::error::Error;
use crate::shared::utils::{prost_timestamp_from_utc, utc_from_timestamp};
use numaflow_grpc::clients::sink::sink_request::Request;
use numaflow_grpc::clients::sink::SinkRequest;
use numaflow_grpc::clients::sink::Status::{Failure, Fallback, Success};
use numaflow_grpc::clients::sink::{sink_response, SinkRequest, SinkResponse};
use numaflow_grpc::clients::source::{read_response, AckRequest};
use numaflow_grpc::clients::sourcetransformer::SourceTransformRequest;

Expand Down Expand Up @@ -113,3 +115,62 @@ impl From<Message> for SinkRequest {
}
}
}

/// Sink's status for each [Message] written to Sink.
#[derive(PartialEq)]
pub(crate) enum ResponseStatusFromSink {
/// Successfully wrote to the Sink.
Success,
/// Failed with error message.
Failed(String),
/// Write to FallBack Sink.
Fallback,
}

/// Sink will give a response per [Message].
pub(crate) struct ResponseFromSink {
/// Unique id per [Message]. We need to track per [Message] status.
pub(crate) id: String,
/// Status of the "sink" operation per [Message].
pub(crate) status: ResponseStatusFromSink,
}

impl From<ResponseFromSink> for SinkResponse {
fn from(value: ResponseFromSink) -> Self {
let (status, err_msg) = match value.status {
ResponseStatusFromSink::Success => (Success, "".to_string()),
ResponseStatusFromSink::Failed(err) => (Failure, err.to_string()),
ResponseStatusFromSink::Fallback => (Fallback, "".to_string()),
};

Self {
result: Some(sink_response::Result {
id: value.id,
status: status as i32,
err_msg,
}),
handshake: None,
}
}
}

impl TryFrom<SinkResponse> for ResponseFromSink {
type Error = crate::Error;

fn try_from(value: SinkResponse) -> Result<Self, Self::Error> {
let value = value
.result
.ok_or(Error::SinkError("result is empty".to_string()))?;

let status = match value.status() {
Success => ResponseStatusFromSink::Success,
Failure => ResponseStatusFromSink::Failed(value.err_msg),
Fallback => ResponseStatusFromSink::Fallback,
};

Ok(Self {
id: value.id,
status,
})
}
}
97 changes: 37 additions & 60 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
use std::time::Duration;

use tokio::signal;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;
use tracing::info;

use numaflow_grpc::clients::sink::sink_client::SinkClient;
use numaflow_grpc::clients::source::source_client::SourceClient;
use numaflow_grpc::clients::sourcetransformer::source_transform_client::SourceTransformClient;

use crate::config::{config, Settings};
use crate::error;
use crate::reader::LagReader;
use crate::shared::utils;
use crate::shared::utils::create_rpc_channel;
use crate::sink::user_defined::SinkWriter;
use crate::sink::SinkHandle;
use crate::source::generator::{new_generator, GeneratorAck, GeneratorLagReader, GeneratorRead};
use crate::source::user_defined::{
new_source, UserDefinedSourceAck, UserDefinedSourceLagReader, UserDefinedSourceRead,
};
use crate::source::{SourceAcker, SourceReader};
use crate::transformer::user_defined::SourceTransformer;
use crate::source::SourceHandle;
use crate::transformer::user_defined::SourceTransformHandle;
use forwarder::ForwarderBuilder;
use metrics::UserDefinedContainerState;
use numaflow_grpc::clients::sink::sink_client::SinkClient;
use numaflow_grpc::clients::source::source_client::SourceClient;
use numaflow_grpc::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use std::time::Duration;
use tokio::signal;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;
use tracing::info;

/// [forwarder] orchestrates data movement from the Source to the Sink via the optional SourceTransformer.
/// The forward-a-chunk executes the following in an infinite loop till a shutdown signal is received:
Expand Down Expand Up @@ -78,8 +80,8 @@ async fn shutdown_signal() {
}
}

enum SourceType {
UdSource(
pub(crate) enum SourceType {
UserDefinedSource(
UserDefinedSourceRead,
UserDefinedSourceAck,
UserDefinedSourceLagReader,
Expand Down Expand Up @@ -155,7 +157,7 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err
)
.await?;

let source_type = fetch_source(&config, &mut source_grpc_client).await?;
let source_type = fetch_source(config, &mut source_grpc_client).await?;

// Start the metrics server in a separate background async spawn,
// This should be running throughout the lifetime of the application, hence the handle is not
Expand All @@ -171,38 +173,21 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err
// FIXME: what to do with the handle
utils::start_metrics_server(metrics_state).await;

match source_type {
SourceType::UdSource(udsource_reader, udsource_acker, udsource_lag_reader) => {
start_forwarder_with_source(
udsource_reader,
udsource_acker,
udsource_lag_reader,
sink_grpc_client,
transformer_grpc_client,
fb_sink_grpc_client,
cln_token,
)
.await?;
}
SourceType::Generator(generator_reader, generator_acker, generator_lag_reader) => {
start_forwarder_with_source(
generator_reader,
generator_acker,
generator_lag_reader,
sink_grpc_client,
transformer_grpc_client,
fb_sink_grpc_client,
cln_token,
)
.await?;
}
}
let source = SourceHandle::new(source_type);
start_forwarder_with_source(
source,
sink_grpc_client,
transformer_grpc_client,
fb_sink_grpc_client,
cln_token,
)
.await?;

info!("Forwarder stopped gracefully");
Ok(())
}

async fn fetch_source(
pub(crate) async fn fetch_source(
config: &Settings,
source_grpc_client: &mut Option<SourceClient<Channel>>,
) -> crate::Result<SourceType> {
Expand All @@ -213,7 +198,7 @@ async fn fetch_source(
config.timeout_in_ms as u16,
)
.await?;
SourceType::UdSource(source_read, source_ack, lag_reader)
SourceType::UserDefinedSource(source_read, source_ack, lag_reader)
} else if let Some(generator_config) = &config.generator_config {
let (source_read, source_ack, lag_reader) = new_generator(
generator_config.content.clone(),
Expand All @@ -230,39 +215,31 @@ async fn fetch_source(
Ok(source_type)
}

async fn start_forwarder_with_source<R, A, L>(
source_reader: R,
source_acker: A,
source_lag_reader: L,
async fn start_forwarder_with_source(
source: SourceHandle,
sink_grpc_client: SinkClient<tonic::transport::Channel>,
transformer_client: Option<SourceTransformClient<tonic::transport::Channel>>,
fallback_sink_client: Option<SinkClient<tonic::transport::Channel>>,
cln_token: CancellationToken,
) -> error::Result<()>
where
R: SourceReader,
A: SourceAcker,
L: LagReader + Clone + 'static,
{
) -> error::Result<()> {
// start the pending reader to publish pending metrics
let mut pending_reader = utils::create_pending_reader(source_lag_reader).await;
let mut pending_reader = utils::create_pending_reader(source.clone()).await;
pending_reader.start().await;

// build the forwarder
let sink_writer = SinkWriter::new(sink_grpc_client).await?;
let sink_writer = SinkHandle::new(sink_grpc_client).await?;

let mut forwarder_builder =
ForwarderBuilder::new(source_reader, source_acker, sink_writer, cln_token);
let mut forwarder_builder = ForwarderBuilder::new(source, sink_writer, cln_token);

// add transformer if exists
if let Some(transformer_client) = transformer_client {
let transformer = SourceTransformer::new(transformer_client).await?;
let transformer = SourceTransformHandle::new(transformer_client).await?;
forwarder_builder = forwarder_builder.source_transformer(transformer);
}

// add fallback sink if exists
if let Some(fallback_sink_client) = fallback_sink_client {
let fallback_writer = SinkWriter::new(fallback_sink_client).await?;
let fallback_writer = SinkHandle::new(fallback_sink_client).await?;
forwarder_builder = forwarder_builder.fallback_sink_writer(fallback_writer);
}
// build the final forwarder
Expand Down
Loading

0 comments on commit fb32885

Please sign in to comment.