Skip to content

Commit

Permalink
feat: Log sink implementation for Monovertex (#2150)
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
BulkBeing and vigith authored Oct 14, 2024
1 parent 6fb36ac commit 1ea4d2e
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 121 deletions.
8 changes: 5 additions & 3 deletions rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ pub struct Settings {
pub sink_default_retry_strategy: RetryStrategy,
pub transformer_config: Option<TransformerConfig>,
pub udsource_config: Option<UDSourceConfig>,
pub udsink_config: UDSinkConfig,
pub udsink_config: Option<UDSinkConfig>,
pub logsink_config: Option<()>,
pub fallback_config: Option<UDSinkConfig>,
pub generator_config: Option<GeneratorConfig>,
}
Expand Down Expand Up @@ -200,6 +201,7 @@ impl Default for Settings {
transformer_config: None,
udsource_config: None,
udsink_config: Default::default(),
logsink_config: None,
fallback_config: None,
generator_config: None,
}
Expand Down Expand Up @@ -274,8 +276,8 @@ impl Settings {
.ok_or(Error::ConfigError("Sink not found".to_string()))?
.udsink
{
Some(_) => UDSinkConfig::default(),
_ => UDSinkConfig::default(),
Some(_) => Some(UDSinkConfig::default()),
_ => None,
};

settings.fallback_config = match mono_vertex_obj
Expand Down
151 changes: 98 additions & 53 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use numaflow_grpc::clients::source::source_client::SourceClient;
use numaflow_grpc::clients::sourcetransformer::source_transform_client::SourceTransformClient;

use crate::config::{config, Settings};
use crate::error;
use crate::error::{self, Error};
use crate::shared::utils;
use crate::shared::utils::create_rpc_channel;
use crate::sink::SinkHandle;
use crate::sink::{SinkClientType, SinkHandle};
use crate::source::generator::{new_generator, GeneratorAck, GeneratorLagReader, GeneratorRead};
use crate::source::user_defined::{
new_source, UserDefinedSourceAck, UserDefinedSourceLagReader, UserDefinedSourceRead,
Expand Down Expand Up @@ -46,7 +46,7 @@ pub async fn mono_vertex() -> error::Result<()> {

// Run the forwarder with cancellation token.
if let Err(e) = start_forwarder(cln_token, config()).await {
error!("Application error: {:?}", e);
tracing::error!("Application error: {:?}", e);

// abort the signal handler task since we have an error and we are shutting down
if !shutdown_handle.is_finished() {
Expand Down Expand Up @@ -97,7 +97,10 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err
.udsource_config
.as_ref()
.map(|source_config| source_config.server_info_path.clone().into()),
config.udsink_config.server_info_path.clone().into(),
config
.udsink_config
.as_ref()
.map(|sink_config| sink_config.server_info_path.clone().into()),
config
.transformer_config
.as_ref()
Expand All @@ -119,19 +122,12 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err
None
};

let mut sink_grpc_client =
SinkClient::new(create_rpc_channel(config.udsink_config.socket_path.clone().into()).await?)
.max_encoding_message_size(config.udsink_config.grpc_max_message_size)
.max_encoding_message_size(config.udsink_config.grpc_max_message_size);

let mut transformer_grpc_client = if let Some(transformer_config) = &config.transformer_config {
let transformer_grpc_client = SourceTransformClient::new(
create_rpc_channel(transformer_config.socket_path.clone().into()).await?,
let mut sink_grpc_client = if let Some(udsink_config) = &config.udsink_config {
Some(
SinkClient::new(create_rpc_channel(udsink_config.socket_path.clone().into()).await?)
.max_encoding_message_size(udsink_config.grpc_max_message_size)
.max_encoding_message_size(udsink_config.grpc_max_message_size),
)
.max_encoding_message_size(transformer_config.grpc_max_message_size)
.max_encoding_message_size(transformer_config.grpc_max_message_size);

Some(transformer_grpc_client.clone())
} else {
None
};
Expand All @@ -147,6 +143,18 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err
None
};

let mut transformer_grpc_client = if let Some(transformer_config) = &config.transformer_config {
let transformer_grpc_client = SourceTransformClient::new(
create_rpc_channel(transformer_config.socket_path.clone().into()).await?,
)
.max_encoding_message_size(transformer_config.grpc_max_message_size)
.max_encoding_message_size(transformer_config.grpc_max_message_size);

Some(transformer_grpc_client.clone())
} else {
None
};

// readiness check for all the ud containers
utils::wait_until_ready(
cln_token.clone(),
Expand All @@ -158,6 +166,12 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err
.await?;

let source_type = fetch_source(config, &mut source_grpc_client).await?;
let (sink, fb_sink) = fetch_sink(
config,
sink_grpc_client.clone(),
fb_sink_grpc_client.clone(),
)
.await?;

// Start the metrics server in a separate background async spawn,
// This should be running throughout the lifetime of the application, hence the handle is not
Expand All @@ -174,62 +188,89 @@ async fn start_forwarder(cln_token: CancellationToken, config: &Settings) -> err
utils::start_metrics_server(metrics_state).await;

let source = SourceHandle::new(source_type);
start_forwarder_with_source(
source,
sink_grpc_client,
transformer_grpc_client,
fb_sink_grpc_client,
cln_token,
)
.await?;
start_forwarder_with_source(source, sink, transformer_grpc_client, fb_sink, cln_token).await?;

info!("Forwarder stopped gracefully");
Ok(())
}

pub(crate) async fn fetch_source(
// fetch right the source.
// source_grpc_client can be optional because it is valid only for user-defined source.
async fn fetch_source(
config: &Settings,
source_grpc_client: &mut Option<SourceClient<Channel>>,
) -> crate::Result<SourceType> {
let source_type = if let Some(source_grpc_client) = source_grpc_client.clone() {
// check whether the source grpc client is provided, this happens only of the source is a
// user defined source
if let Some(source_grpc_client) = source_grpc_client.clone() {
let (source_read, source_ack, lag_reader) = new_source(
source_grpc_client,
config.batch_size as usize,
config.timeout_in_ms as u16,
)
.await?;
SourceType::UserDefinedSource(source_read, source_ack, lag_reader)
} else if let Some(generator_config) = &config.generator_config {
return Ok(SourceType::UserDefinedSource(
source_read,
source_ack,
lag_reader,
));
}

// now that we know it is not a user-defined source, it has to be a built-in
if let Some(generator_config) = &config.generator_config {
let (source_read, source_ack, lag_reader) = new_generator(
generator_config.content.clone(),
generator_config.rpu,
config.batch_size as usize,
Duration::from_millis(generator_config.duration as u64),
)?;
SourceType::Generator(source_read, source_ack, lag_reader)
Ok(SourceType::Generator(source_read, source_ack, lag_reader))
} else {
return Err(error::Error::ConfigError(
Err(Error::ConfigError(
"No valid source configuration found".into(),
));
))
}
}

// fetch the actor handle for the sink.
// sink_grpc_client can be optional because it is valid only for user-defined sink.
async fn fetch_sink(
settings: &Settings,
sink_grpc_client: Option<SinkClient<Channel>>,
fallback_sink_grpc_client: Option<SinkClient<Channel>>,
) -> crate::Result<(SinkHandle, Option<SinkHandle>)> {
let fb_sink = match fallback_sink_grpc_client {
Some(fallback_sink) => {
Some(SinkHandle::new(SinkClientType::UserDefined(fallback_sink)).await?)
}
None => None,
};
Ok(source_type)

if let Some(sink_client) = sink_grpc_client {
let sink = SinkHandle::new(SinkClientType::UserDefined(sink_client)).await?;
return Ok((sink, fb_sink));
}
if settings.logsink_config.is_some() {
let log = SinkHandle::new(SinkClientType::Log).await?;
return Ok((log, fb_sink));
}
Err(Error::ConfigError(
"No valid Sink configuration found".to_string(),
))
}

async fn start_forwarder_with_source(
source: SourceHandle,
sink_grpc_client: SinkClient<tonic::transport::Channel>,
sink: SinkHandle,
transformer_client: Option<SourceTransformClient<tonic::transport::Channel>>,
fallback_sink_client: Option<SinkClient<tonic::transport::Channel>>,
fallback_sink: Option<SinkHandle>,
cln_token: CancellationToken,
) -> error::Result<()> {
// start the pending reader to publish pending metrics
let mut pending_reader = utils::create_pending_reader(source.clone()).await;
pending_reader.start().await;

// build the forwarder
let sink_writer = SinkHandle::new(sink_grpc_client).await?;
let pending_reader = utils::create_pending_reader(source.clone()).await;
let _pending_reader_handle = pending_reader.start().await;

let mut forwarder_builder = ForwarderBuilder::new(source, sink_writer, cln_token);
let mut forwarder_builder = ForwarderBuilder::new(source, sink, cln_token);

// add transformer if exists
if let Some(transformer_client) = transformer_client {
Expand All @@ -238,9 +279,8 @@ async fn start_forwarder_with_source(
}

// add fallback sink if exists
if let Some(fallback_sink_client) = fallback_sink_client {
let fallback_writer = SinkHandle::new(fallback_sink_client).await?;
forwarder_builder = forwarder_builder.fallback_sink_writer(fallback_writer);
if let Some(fallback_sink) = fallback_sink {
forwarder_builder = forwarder_builder.fallback_sink_writer(fallback_sink);
}
// build the final forwarder
let mut forwarder = forwarder_builder.build();
Expand All @@ -254,7 +294,7 @@ async fn start_forwarder_with_source(

#[cfg(test)]
mod tests {
use crate::config::{Settings, UDSourceConfig};
use crate::config::{Settings, UDSinkConfig, UDSourceConfig};
use crate::error;
use crate::monovertex::start_forwarder;
use crate::shared::server_info::ServerInfo;
Expand Down Expand Up @@ -363,17 +403,22 @@ mod tests {
token_clone.cancel();
});

let mut config = Settings::default();
config.udsink_config.socket_path = sink_sock_file.to_str().unwrap().to_string();
config.udsink_config.server_info_path = sink_server_info.to_str().unwrap().to_string();

config.udsource_config = Some(UDSourceConfig {
socket_path: src_sock_file.to_str().unwrap().to_string(),
server_info_path: src_info_file.to_str().unwrap().to_string(),
grpc_max_message_size: 1024,
});
let config = Settings {
udsink_config: Some(UDSinkConfig {
socket_path: sink_sock_file.to_str().unwrap().to_string(),
server_info_path: sink_server_info.to_str().unwrap().to_string(),
grpc_max_message_size: 1024,
}),
udsource_config: Some(UDSourceConfig {
socket_path: src_sock_file.to_str().unwrap().to_string(),
server_info_path: src_info_file.to_str().unwrap().to_string(),
grpc_max_message_size: 1024,
}),
..Default::default()
};

let result = start_forwarder(cln_token.clone(), &config).await;
dbg!(&result);
assert!(result.is_ok());

// stop the source and sink servers
Expand Down
38 changes: 17 additions & 21 deletions rust/numaflow-core/src/monovertex/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ mod tests {
use crate::monovertex::forwarder::ForwarderBuilder;
use crate::monovertex::SourceType;
use crate::shared::utils::create_rpc_channel;
use crate::sink::SinkHandle;
use crate::sink::{SinkClientType, SinkHandle};
use crate::source::user_defined::new_source;
use crate::source::SourceHandle;
use crate::transformer::user_defined::SourceTransformHandle;
Expand Down Expand Up @@ -738,11 +738,10 @@ mod tests {
source_lag_reader,
));

let sink_writer = SinkHandle::new(SinkClient::new(
create_rpc_channel(sink_sock_file).await.unwrap(),
))
.await
.expect("failed to connect to sink server");
let sink_grpc_client = SinkClient::new(create_rpc_channel(sink_sock_file).await.unwrap());
let sink_writer = SinkHandle::new(SinkClientType::UserDefined(sink_grpc_client))
.await
.expect("failed to connect to sink server");

let transformer_client = SourceTransformHandle::new(SourceTransformClient::new(
create_rpc_channel(transformer_sock_file).await.unwrap(),
Expand Down Expand Up @@ -868,11 +867,10 @@ mod tests {
lag_reader,
));

let sink_writer = SinkHandle::new(SinkClient::new(
create_rpc_channel(sink_sock_file).await.unwrap(),
))
.await
.expect("failed to connect to sink server");
let sink_client = SinkClient::new(create_rpc_channel(sink_sock_file).await.unwrap());
let sink_writer = SinkHandle::new(SinkClientType::UserDefined(sink_client))
.await
.expect("failed to connect to sink server");

let mut forwarder =
ForwarderBuilder::new(source_reader, sink_writer, cln_token.clone()).build();
Expand Down Expand Up @@ -991,17 +989,15 @@ mod tests {
source_lag_reader,
));

let sink_writer = SinkHandle::new(SinkClient::new(
create_rpc_channel(sink_sock_file).await.unwrap(),
))
.await
.expect("failed to connect to sink server");
let sink_client = SinkClient::new(create_rpc_channel(sink_sock_file).await.unwrap());
let sink_writer = SinkHandle::new(SinkClientType::UserDefined(sink_client))
.await
.expect("failed to connect to sink server");

let fb_sink_writer = SinkHandle::new(SinkClient::new(
create_rpc_channel(fb_sink_sock_file).await.unwrap(),
))
.await
.expect("failed to connect to fb sink server");
let fb_sink_writer = SinkClient::new(create_rpc_channel(fb_sink_sock_file).await.unwrap());
let fb_sink_writer = SinkHandle::new(SinkClientType::UserDefined(fb_sink_writer))
.await
.expect("failed to connect to fb sink server");

let mut forwarder = ForwarderBuilder::new(source, sink_writer, cln_token.clone())
.fallback_sink_writer(fb_sink_writer)
Expand Down
Loading

0 comments on commit 1ea4d2e

Please sign in to comment.