Skip to content

Commit

Permalink
feat: Asynchronous Map Implementation for Pipeline (#2295)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
yhl25 and vigith authored Dec 24, 2024
1 parent dfae989 commit 8128476
Show file tree
Hide file tree
Showing 28 changed files with 3,051 additions and 162 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func (in UDF) getContainers(req getContainerReq) ([]corev1.Container, []corev1.C

func (in UDF) getMainContainer(req getContainerReq) corev1.Container {
if in.GroupBy == nil {
if req.executeRustBinary {
return containerBuilder{}.init(req).command(NumaflowRustBinary).args("processor", "--type="+string(VertexTypeMapUDF), "--isbsvc-type="+string(req.isbSvcType), "--rust").build()
}
args := []string{"processor", "--type=" + string(VertexTypeMapUDF), "--isbsvc-type=" + string(req.isbSvcType)}
return containerBuilder{}.
init(req).args(args...).build()
Expand Down
24 changes: 23 additions & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/numaflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async-nats = "0.38.0"

[dev-dependencies]
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "ddd879588e11455921f1ca958ea2b3c076689293" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "9ca9362ad511084501520e5a37d40cdcd0cdc9d9" }
pulsar = { version = "6.3.0", default-features = false, features = ["tokio-rustls-runtime"] }

[build-dependencies]
221 changes: 216 additions & 5 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use crate::config::components::source::SourceConfig;
use crate::config::components::transformer::{TransformerConfig, TransformerType};
use crate::config::get_vertex_replica;
use crate::config::pipeline::isb::{BufferReaderConfig, BufferWriterConfig};
use crate::config::pipeline::map::MapMode;
use crate::config::pipeline::map::MapVtxConfig;
use crate::error::Error;
use crate::Result;

Expand All @@ -23,6 +25,11 @@ const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;
const ENV_NUMAFLOW_SERVING_JETSTREAM_URL: &str = "NUMAFLOW_ISBSVC_JETSTREAM_URL";
const ENV_NUMAFLOW_SERVING_JETSTREAM_USER: &str = "NUMAFLOW_ISBSVC_JETSTREAM_USER";
const ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD: &str = "NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD";
const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB
const DEFAULT_MAP_SOCKET: &str = "/var/run/numaflow/map.sock";
pub(crate) const DEFAULT_BATCH_MAP_SOCKET: &str = "/var/run/numaflow/batchmap.sock";
pub(crate) const DEFAULT_STREAM_MAP_SOCKET: &str = "/var/run/numaflow/mapstream.sock";
const DEFAULT_MAP_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";

pub(crate) mod isb;

Expand Down Expand Up @@ -69,6 +76,84 @@ pub(crate) struct SourceVtxConfig {
pub(crate) transformer_config: Option<TransformerConfig>,
}

pub(crate) mod map {
use std::collections::HashMap;

use numaflow_models::models::Udf;

use crate::config::pipeline::{
DEFAULT_GRPC_MAX_MESSAGE_SIZE, DEFAULT_MAP_SERVER_INFO_FILE, DEFAULT_MAP_SOCKET,
};
use crate::error::Error;

/// A map can be run in different modes.
#[derive(Debug, Clone, PartialEq)]
pub enum MapMode {
Unary,
Batch,
Stream,
}

impl MapMode {
pub(crate) fn from_str(s: &str) -> Option<MapMode> {
match s {
"unary-map" => Some(MapMode::Unary),
"stream-map" => Some(MapMode::Stream),
"batch-map" => Some(MapMode::Batch),
_ => None,
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct MapVtxConfig {
pub(crate) concurrency: usize,
pub(crate) map_type: MapType,
pub(crate) map_mode: MapMode,
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) enum MapType {
UserDefined(UserDefinedConfig),
Builtin(BuiltinConfig),
}

impl TryFrom<Box<Udf>> for MapType {
type Error = Error;
fn try_from(udf: Box<Udf>) -> std::result::Result<Self, Self::Error> {
if let Some(builtin) = udf.builtin {
Ok(MapType::Builtin(BuiltinConfig {
name: builtin.name,
kwargs: builtin.kwargs,
args: builtin.args,
}))
} else if let Some(_container) = udf.container {
Ok(MapType::UserDefined(UserDefinedConfig {
grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE,
socket_path: DEFAULT_MAP_SOCKET.to_string(),
server_info_path: DEFAULT_MAP_SERVER_INFO_FILE.to_string(),
}))
} else {
Err(Error::Config("Invalid UDF".to_string()))
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct UserDefinedConfig {
pub grpc_max_message_size: usize,
pub socket_path: String,
pub server_info_path: String,
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct BuiltinConfig {
pub(crate) name: String,
pub(crate) kwargs: Option<HashMap<String, String>>,
pub(crate) args: Option<Vec<String>>,
}
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct SinkVtxConfig {
pub(crate) sink_config: SinkConfig,
Expand All @@ -79,13 +164,15 @@ pub(crate) struct SinkVtxConfig {
pub(crate) enum VertexType {
Source(SourceVtxConfig),
Sink(SinkVtxConfig),
Map(MapVtxConfig),
}

impl std::fmt::Display for VertexType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
match self {
VertexType::Source(_) => write!(f, "Source"),
VertexType::Sink(_) => write!(f, "Sink"),
VertexType::Map(_) => write!(f, "Map"),
}
}
}
Expand Down Expand Up @@ -182,6 +269,12 @@ impl PipelineConfig {
},
fb_sink_config,
})
} else if let Some(map) = vertex_obj.spec.udf {
VertexType::Map(MapVtxConfig {
concurrency: batch_size as usize,
map_type: map.try_into()?,
map_mode: MapMode::Unary,
})
} else {
return Err(Error::Config(
"Only source and sink are supported ATM".to_string(),
Expand Down Expand Up @@ -283,7 +376,7 @@ impl PipelineConfig {
Ok(PipelineConfig {
batch_size: batch_size as usize,
paf_concurrency: env::var("PAF_BATCH_SIZE")
.unwrap_or("30000".to_string())
.unwrap_or((DEFAULT_BATCH_SIZE * 2).to_string())
.parse()
.unwrap(),
read_timeout: Duration::from_millis(timeout_in_ms as u64),
Expand All @@ -301,11 +394,13 @@ impl PipelineConfig {

#[cfg(test)]
mod tests {
use numaflow_models::models::{Container, Function, Udf};
use numaflow_pulsar::source::PulsarSourceConfig;

use super::*;
use crate::config::components::sink::{BlackholeConfig, LogConfig, SinkType};
use crate::config::components::source::{GeneratorConfig, SourceType};
use crate::config::pipeline::map::{MapType, UserDefinedConfig};

#[test]
fn test_default_pipeline_config() {
Expand Down Expand Up @@ -360,7 +455,7 @@ mod tests {
vertex_name: "out".to_string(),
replica: 0,
batch_size: 500,
paf_concurrency: 30000,
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
Expand All @@ -371,7 +466,7 @@ mod tests {
name: "in".to_string(),
reader_config: BufferReaderConfig {
partitions: 1,
streams: vec![("default-simple-pipeline-out-0".into(), 0)],
streams: vec![("default-simple-pipeline-out-0", 0)],
wip_ack_interval: Duration::from_secs(1),
},
partitions: 0,
Expand Down Expand Up @@ -407,7 +502,7 @@ mod tests {
vertex_name: "in".to_string(),
replica: 0,
batch_size: 1000,
paf_concurrency: 30000,
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
Expand Down Expand Up @@ -460,7 +555,7 @@ mod tests {
vertex_name: "in".to_string(),
replica: 0,
batch_size: 50,
paf_concurrency: 30000,
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
Expand Down Expand Up @@ -498,4 +593,120 @@ mod tests {

assert_eq!(pipeline_config, expected);
}

#[test]
fn test_map_vertex_config_user_defined() {
let udf = Udf {
builtin: None,
container: Some(Box::from(Container {
args: None,
command: None,
env: None,
env_from: None,
image: None,
image_pull_policy: None,
liveness_probe: None,
ports: None,
readiness_probe: None,
resources: None,
security_context: None,
volume_mounts: None,
})),
group_by: None,
};

let map_type = MapType::try_from(Box::new(udf)).unwrap();
assert!(matches!(map_type, MapType::UserDefined(_)));

let map_vtx_config = MapVtxConfig {
concurrency: 10,
map_type,
map_mode: MapMode::Unary,
};

assert_eq!(map_vtx_config.concurrency, 10);
if let MapType::UserDefined(config) = map_vtx_config.map_type {
assert_eq!(config.grpc_max_message_size, DEFAULT_GRPC_MAX_MESSAGE_SIZE);
assert_eq!(config.socket_path, DEFAULT_MAP_SOCKET);
assert_eq!(config.server_info_path, DEFAULT_MAP_SERVER_INFO_FILE);
} else {
panic!("Expected UserDefined map type");
}
}

#[test]
fn test_map_vertex_config_builtin() {
let udf = Udf {
builtin: Some(Box::from(Function {
args: None,
kwargs: None,
name: "cat".to_string(),
})),
container: None,
group_by: None,
};

let map_type = MapType::try_from(Box::new(udf)).unwrap();
assert!(matches!(map_type, MapType::Builtin(_)));

let map_vtx_config = MapVtxConfig {
concurrency: 5,
map_type,
map_mode: MapMode::Unary,
};

assert_eq!(map_vtx_config.concurrency, 5);
if let MapType::Builtin(config) = map_vtx_config.map_type {
assert_eq!(config.name, "cat");
assert!(config.kwargs.is_none());
assert!(config.args.is_none());
} else {
panic!("Expected Builtin map type");
}
}

#[test]
fn test_pipeline_config_load_map_vertex() {
let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLW1hcCIsIm5hbWVzcGFjZSI6ImRlZmF1bHQiLCJjcmVhdGlvblRpbWVzdGFtcCI6bnVsbH0sInNwZWMiOnsibmFtZSI6Im1hcCIsInVkZiI6eyJjb250YWluZXIiOnsidGVtcGxhdGUiOiJkZWZhdWx0In19LCJsaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJzY2FsZSI6eyJtaW4iOjF9LCJwaXBlbGluZU5hbWUiOiJzaW1wbGUtcGlwZWxpbmUiLCJpbnRlclN0ZXBCdWZmZXJTZXJ2aWNlTmFtZSI6IiIsInJlcGxpY2FzIjowLCJmcm9tRWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoibWFwIiwiY29uZGl0aW9ucyI6bnVsbCwiZnJvbVZlcnRleFR5cGUiOiJTb3VyY2UiLCJmcm9tVmVydGV4UGFydGl0aW9uQ291bnQiOjEsImZyb21WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJ0b1ZlcnRleFR5cGUiOiJNYXAiLCJ0b1ZlcnRleFBhcnRpdGlvbkNvdW50IjoxLCJ0b1ZlcnRleExpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjMwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4MH19XSwid2F0ZXJtYXJrIjp7Im1heERlbGF5IjoiMHMifX0sInN0YXR1cyI6eyJwaGFzZSI6IiIsInJlcGxpY2FzIjowLCJkZXNpcmVkUmVwbGljYXMiOjAsImxhc3RTY2FsZWRBdCI6bnVsbH19";

let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")];
let pipeline_config =
PipelineConfig::load(pipeline_cfg_base64.to_string(), env_vars).unwrap();

let expected = PipelineConfig {
pipeline_name: "simple-pipeline".to_string(),
vertex_name: "map".to_string(),
replica: 0,
batch_size: 500,
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
user: None,
password: None,
},
from_vertex_config: vec![FromVertexConfig {
name: "in".to_string(),
reader_config: BufferReaderConfig {
partitions: 1,
streams: vec![("default-simple-pipeline-map-0", 0)],
wip_ack_interval: Duration::from_secs(1),
},
partitions: 0,
}],
to_vertex_config: vec![],
vertex_config: VertexType::Map(MapVtxConfig {
concurrency: 500,
map_type: MapType::UserDefined(UserDefinedConfig {
grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE,
socket_path: DEFAULT_MAP_SOCKET.to_string(),
server_info_path: DEFAULT_MAP_SERVER_INFO_FILE.to_string(),
}),
map_mode: MapMode::Unary,
}),
metrics_config: MetricsConfig::default(),
};

assert_eq!(pipeline_config, expected);
}
}
3 changes: 3 additions & 0 deletions rust/numaflow-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub enum Error {
#[error("Transformer Error - {0}")]
Transformer(String),

#[error("Mapper Error - {0}")]
Mapper(String),

#[error("Forwarder Error - {0}")]
Forwarder(String),

Expand Down
3 changes: 3 additions & 0 deletions rust/numaflow-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ mod pipeline;
/// Tracker to track the completeness of message processing.
mod tracker;

/// Map is a feature that allows users to execute custom code to transform their data.
mod mapper;

pub async fn run() -> Result<()> {
let cln_token = CancellationToken::new();
let shutdown_cln_token = cln_token.clone();
Expand Down
Loading

0 comments on commit 8128476

Please sign in to comment.