Skip to content

Commit

Permalink
Tested callbacks with monovertex
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 7, 2025
1 parent 8d8340c commit b5e3e85
Show file tree
Hide file tree
Showing 25 changed files with 389 additions and 16 deletions.
24 changes: 24 additions & 0 deletions rust/numaflow-core/src/config/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ use crate::config::monovertex::sink::SinkType;
use crate::error::Error;
use crate::Result;

use super::pipeline::ServingCallbackConfig;

const DEFAULT_BATCH_SIZE: u64 = 500;
const DEFAULT_TIMEOUT_IN_MS: u32 = 1000;
const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;

const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED"; //FIXME: duplicates
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct MonovertexConfig {
pub(crate) name: String,
Expand All @@ -33,6 +39,7 @@ pub(crate) struct MonovertexConfig {
pub(crate) transformer_config: Option<TransformerConfig>,
pub(crate) fb_sink_config: Option<SinkConfig>,
pub(crate) metrics_config: MetricsConfig,
pub(crate) callback_config: Option<ServingCallbackConfig>,
}

impl Default for MonovertexConfig {
Expand All @@ -53,6 +60,7 @@ impl Default for MonovertexConfig {
transformer_config: None,
fb_sink_config: None,
metrics_config: MetricsConfig::default(),
callback_config: None,
}
}
}
Expand Down Expand Up @@ -143,6 +151,21 @@ impl MonovertexConfig {
.and_then(|scale| scale.lookback_seconds.map(|x| x as u16))
.unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS);

let mut callback_config = None;
if let Ok(_) = env::var(ENV_CALLBACK_ENABLED) {
let callback_concurrency: usize = env::var(ENV_CALLBACK_CONCURRENCY)
.unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}"))
.parse()
.map_err(|e| {
Error::Config(format!(
"Parsing value of {ENV_CALLBACK_CONCURRENCY}: {e:?}"
))
})?;
callback_config = Some(ServingCallbackConfig {
callback_concurrency,
});
}

Ok(MonovertexConfig {
name: mono_vertex_name,
replica: *get_vertex_replica(),
Expand All @@ -153,6 +176,7 @@ impl MonovertexConfig {
sink_config,
transformer_config,
fb_sink_config,
callback_config,
})
}
}
Expand Down
34 changes: 31 additions & 3 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;
const ENV_NUMAFLOW_SERVING_JETSTREAM_URL: &str = "NUMAFLOW_ISBSVC_JETSTREAM_URL";
const ENV_NUMAFLOW_SERVING_JETSTREAM_USER: &str = "NUMAFLOW_ISBSVC_JETSTREAM_USER";
const ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD: &str = "NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD";
const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED";
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;
const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB
const DEFAULT_MAP_SOCKET: &str = "/var/run/numaflow/map.sock";
pub(crate) const DEFAULT_BATCH_MAP_SOCKET: &str = "/var/run/numaflow/batchmap.sock";
Expand All @@ -47,6 +50,12 @@ pub(crate) struct PipelineConfig {
pub(crate) to_vertex_config: Vec<ToVertexConfig>,
pub(crate) vertex_config: VertexType,
pub(crate) metrics_config: MetricsConfig,
pub(crate) callback_config: Option<ServingCallbackConfig>,
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ServingCallbackConfig {
pub(crate) callback_concurrency: usize,
}

impl Default for PipelineConfig {
Expand All @@ -66,6 +75,7 @@ impl Default for PipelineConfig {
transformer_config: None,
}),
metrics_config: Default::default(),
callback_config: None,
}
}
}
Expand Down Expand Up @@ -373,6 +383,21 @@ impl PipelineConfig {
.and_then(|scale| scale.lookback_seconds.map(|x| x as u16))
.unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS);

let mut callback_config = None;
if let Ok(_) = get_var(ENV_CALLBACK_ENABLED) {
let callback_concurrency: usize = get_var(ENV_CALLBACK_CONCURRENCY)
.unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}"))
.parse()
.map_err(|e| {
Error::Config(format!(
"Parsing value of {ENV_CALLBACK_CONCURRENCY}: {e:?}"
))
})?;
callback_config = Some(ServingCallbackConfig {
callback_concurrency,
});
}

Ok(PipelineConfig {
batch_size: batch_size as usize,
paf_concurrency: env::var("PAF_BATCH_SIZE")
Expand All @@ -388,6 +413,7 @@ impl PipelineConfig {
to_vertex_config,
vertex_config: vertex,
metrics_config: MetricsConfig::with_lookback_window_in_secs(look_back_window),
callback_config,
})
}
}
Expand Down Expand Up @@ -419,6 +445,7 @@ mod tests {
transformer_config: None,
}),
metrics_config: Default::default(),
callback_config: None,
};

let config = PipelineConfig::default();
Expand Down Expand Up @@ -485,6 +512,7 @@ mod tests {
lag_refresh_interval_in_secs: 3,
lookback_window_in_secs: 120,
},
..Default::default()
};
assert_eq!(pipeline_config, expected);
}
Expand Down Expand Up @@ -536,7 +564,7 @@ mod tests {
},
transformer_config: None,
}),
metrics_config: Default::default(),
..Default::default()
};

assert_eq!(pipeline_config, expected);
Expand Down Expand Up @@ -588,7 +616,7 @@ mod tests {
},
transformer_config: None,
}),
metrics_config: Default::default(),
..Default::default()
};

assert_eq!(pipeline_config, expected);
Expand Down Expand Up @@ -704,7 +732,7 @@ mod tests {
}),
map_mode: MapMode::Unary,
}),
metrics_config: MetricsConfig::default(),
..Default::default()
};

assert_eq!(pipeline_config, expected);
Expand Down
9 changes: 9 additions & 0 deletions rust/numaflow-core/src/mapper/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (output_tx, mut output_rx) = mpsc::channel(10);
Expand Down Expand Up @@ -646,6 +647,7 @@ mod tests {
index: i,
},
headers: Default::default(),
metadata: None,
};
input_tx.send(message).await.unwrap();
}
Expand Down Expand Up @@ -735,6 +737,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

input_tx.send(message).await.unwrap();
Expand Down Expand Up @@ -829,6 +832,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
},
Message {
keys: Arc::from(vec!["second".into()]),
Expand All @@ -842,6 +846,7 @@ mod tests {
index: 1,
},
headers: Default::default(),
metadata: None,
},
];

Expand Down Expand Up @@ -939,6 +944,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
},
Message {
keys: Arc::from(vec!["second".into()]),
Expand All @@ -952,6 +958,7 @@ mod tests {
index: 1,
},
headers: Default::default(),
metadata: None,
},
];

Expand Down Expand Up @@ -1049,6 +1056,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (input_tx, input_rx) = mpsc::channel(10);
Expand Down Expand Up @@ -1145,6 +1153,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (input_tx, input_rx) = mpsc::channel(10);
Expand Down
6 changes: 6 additions & 0 deletions rust/numaflow-core/src/mapper/map/user_defined.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ async fn process_response(sender_map: &ResponseSenderMap, resp: MapResponse) {
offset: Some(msg_info.offset.clone()),
event_time: msg_info.event_time,
headers: msg_info.headers.clone(),
metadata: None,
};
response_messages.push(message);
}
Expand Down Expand Up @@ -387,6 +388,7 @@ impl UserDefinedStreamMap {
offset: None,
event_time: message_info.event_time,
headers: message_info.headers.clone(),
metadata: None,
};
response_sender
.send(Ok(message))
Expand Down Expand Up @@ -496,6 +498,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (tx, rx) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -586,6 +589,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
},
crate::message::Message {
keys: Arc::from(vec!["second".into()]),
Expand All @@ -602,6 +606,7 @@ mod tests {
index: 1,
},
headers: Default::default(),
metadata: None,
},
];

Expand Down Expand Up @@ -701,6 +706,7 @@ mod tests {
index: 0,
},
headers: Default::default(),
metadata: None,
};

let (tx, mut rx) = tokio::sync::mpsc::channel(3);
Expand Down
9 changes: 9 additions & 0 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub(crate) struct Message {
pub(crate) id: MessageID,
/// headers of the message
pub(crate) headers: HashMap<String, String>,
pub(crate) metadata: Option<Metadata>,
}

#[derive(Debug, Clone)]
pub(crate) struct Metadata {
// name of the previous vertex.
pub(crate) previous_vertex: String,
}

/// Offset of the message which will be used to acknowledge the message.
Expand Down Expand Up @@ -212,6 +219,7 @@ impl TryFrom<Bytes> for Message {
event_time: utc_from_timestamp(message_info.event_time),
id: id.into(),
headers: header.headers,
metadata: None,
})
}
}
Expand Down Expand Up @@ -263,6 +271,7 @@ mod tests {
index: 0,
},
headers: HashMap::new(),
metadata: None,
};

let result: Result<BytesMut> = message.clone().try_into();
Expand Down
23 changes: 21 additions & 2 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use forwarder::ForwarderBuilder;
use serving::callback::CallbackHandler;
use tokio_util::sync::CancellationToken;
use tracing::info;

Expand Down Expand Up @@ -68,7 +69,24 @@ pub(crate) async fn start_forwarder(
// FIXME: what to do with the handle
shared::metrics::start_metrics_server(config.metrics_config.clone(), metrics_state).await;

start(config.clone(), source, sink_writer, transformer, cln_token).await?;
let callback_handler = match config.callback_config {
Some(ref cb_cfg) => Some(CallbackHandler::new(
config.name.clone(),
config.name.clone(),
cb_cfg.callback_concurrency,
)),
None => None,
};

start(
config.clone(),
source,
sink_writer,
transformer,
cln_token,
callback_handler,
)
.await?;

Ok(())
}
Expand All @@ -79,13 +97,14 @@ async fn start(
sink: SinkWriter,
transformer: Option<Transformer>,
cln_token: CancellationToken,
callback_handler: Option<CallbackHandler>,
) -> error::Result<()> {
// start the pending reader to publish pending metrics
let pending_reader =
shared::metrics::create_pending_reader(&mvtx_config.metrics_config, source.clone()).await;
let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await;

let mut forwarder_builder = ForwarderBuilder::new(source, sink, cln_token);
let mut forwarder_builder = ForwarderBuilder::new(source, sink, cln_token, callback_handler);

// add transformer if exists
if let Some(transformer_client) = transformer {
Expand Down
Loading

0 comments on commit b5e3e85

Please sign in to comment.