diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 5c476f088e..064afdb38c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -107,16 +107,19 @@ jobs: - name: Install grcov run: cargo install grcov + - name: Install Protobuf Compiler + run: sudo apt-get install -y protobuf-compiler + - name: Test Rust - working-directory: ./serving + working-directory: ./rust run: | - CARGO_INCREMENTAL=0 RUSTFLAGS='-Cinstrument-coverage' LLVM_PROFILE_FILE='./target/debug/coverage/cargo-test-%p-%m.profraw' cargo test --all-features + CARGO_INCREMENTAL=0 RUSTFLAGS='-Cinstrument-coverage' LLVM_PROFILE_FILE='./target/debug/coverage/cargo-test-%p-%m.profraw' cargo test --all-features --workspace --all grcov . -s ./target/debug/coverage/ --binary-path ./target/debug/ -t lcov --branch --ignore-not-existing -o ./target/debug/coverage/lcov.info - name: Upload coverage reports to Codecov uses: codecov/codecov-action@v4 with: - files: ./test/profile.cov,./serving/target/debug/coverage/lcov.info + files: ./test/profile.cov,./rust/target/debug/coverage/lcov.info env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/Dockerfile b/Dockerfile index 350a9fe4e0..7796a35b81 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,42 +20,44 @@ RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/ca RUN apt-get update RUN apt-get install protobuf-compiler -y -RUN cargo new serve +RUN cargo new numaflow # Create a new empty shell project -WORKDIR /serve -RUN cargo new servesink -COPY ./serving/servesink/Cargo.toml ./servesink/ +WORKDIR /numaflow -RUN cargo new extras/upstreams -COPY ./serving/extras/upstreams/Cargo.toml ./extras/upstreams/ +RUN cargo new servesink +COPY ./rust/servesink/Cargo.toml ./servesink/ RUN cargo new backoff -COPY ./serving/backoff/Cargo.toml ./backoff/ +COPY ./rust/backoff/Cargo.toml ./backoff/ RUN cargo new numaflow-models -COPY ./serving/numaflow-models/Cargo.toml ./numaflow-models/ +COPY ./rust/numaflow-models/Cargo.toml ./numaflow-models/ + +RUN cargo new monovertex +COPY ./rust/monovertex/Cargo.toml ./monovertex/ -RUN cargo new source-sink -COPY ./serving/source-sink/Cargo.toml ./source-sink/ +RUN cargo new serving +COPY ./rust/serving/Cargo.toml ./serving/Cargo.toml # Copy all Cargo.toml and Cargo.lock files for caching dependencies -COPY ./serving/Cargo.toml ./serving/Cargo.lock ./ +COPY ./rust/Cargo.toml ./rust/Cargo.lock ./ -# Build only the dependencies to cache them -RUN cargo build --release +# Build to cache dependencies +RUN mkdir -p src/bin && echo "fn main() {}" > src/bin/main.rs && \ + cargo build --workspace --all --release # Copy the actual source code files of the main project and the subprojects -COPY ./serving/src ./src -COPY ./serving/servesink/src ./servesink/src -COPY ./serving/extras/upstreams/src ./extras/upstreams/src -COPY ./serving/backoff/src ./backoff/src -COPY ./serving/numaflow-models/src ./numaflow-models/src -COPY ./serving/source-sink/src ./source-sink/src -COPY ./serving/source-sink/build.rs ./source-sink/build.rs -COPY ./serving/source-sink/proto ./source-sink/proto +COPY ./rust/src ./src +COPY ./rust/servesink/src ./servesink/src +COPY ./rust/backoff/src ./backoff/src +COPY ./rust/numaflow-models/src ./numaflow-models/src +COPY ./rust/serving/src ./serving/src +COPY ./rust/monovertex/src ./monovertex/src +COPY ./rust/monovertex/build.rs ./monovertex/build.rs +COPY ./rust/monovertex/proto ./monovertex/proto # Build the real binaries -RUN touch src/main.rs servesink/src/main.rs numaflow-models/src/main.rs source-sink/src/main.rs && \ +RUN touch src/bin/main.rs && \ cargo build --workspace --all --release #################################################################################################### @@ -70,10 +72,8 @@ RUN apt-get update && apt-get install -y libssl3 COPY --from=base /bin/numaflow /bin/numaflow COPY ui/build /ui/build -COPY --from=extension-base /serve/target/release/serve /bin/serve -COPY --from=extension-base /serve/target/release/sourcer-sinker /bin/sourcer-sinker - -COPY ./serving/config config +COPY --from=extension-base /numaflow/target/release/numaflow /bin/numaflow-rs +COPY ./rust/serving/config config ENTRYPOINT [ "/bin/numaflow" ] diff --git a/Makefile b/Makefile index 1c11b01583..2c6e30d68c 100644 --- a/Makefile +++ b/Makefile @@ -195,7 +195,7 @@ codegen: $(MAKE) manifests rm -rf ./vendor go mod tidy - $(MAKE) --directory serving/numaflow-models generate + $(MAKE) --directory rust/numaflow-models generate clean: -rm -rf ${CURRENT_DIR}/dist diff --git a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go index 7dd39fd2e8..b4a372ae45 100644 --- a/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/mono_vertex_types.go @@ -403,7 +403,7 @@ func (mvspec MonoVertexSpec) DeepCopyWithoutReplicas() MonoVertexSpec { func (mvspec MonoVertexSpec) buildContainers(req getContainerReq) []corev1.Container { mainContainer := containerBuilder{}. - init(req).command(MonoVertexBinary).build() + init(req).command(NumaflowRustBinary).args("--monovertex").build() containers := []corev1.Container{mainContainer} if mvspec.Source.UDSource != nil { // Only support UDSource for now. diff --git a/pkg/apis/numaflow/v1alpha1/vertex_types.go b/pkg/apis/numaflow/v1alpha1/vertex_types.go index 71cd9c7171..b4a26f4815 100644 --- a/pkg/apis/numaflow/v1alpha1/vertex_types.go +++ b/pkg/apis/numaflow/v1alpha1/vertex_types.go @@ -50,8 +50,7 @@ const ( VertexTypeReduceUDF VertexType = "ReduceUDF" ) -const ServingBinary = "/bin/serve" -const MonoVertexBinary = "/bin/sourcer-sinker" +const NumaflowRustBinary = "/bin/numaflow-rs" // +genclient // +kubebuilder:object:root=true @@ -350,8 +349,9 @@ func (v Vertex) getServingContainer(req GetVertexPodSpecReq) (corev1.Container, Env: req.Env, Image: req.Image, ImagePullPolicy: req.PullPolicy, - Command: []string{ServingBinary}, // we use the same image, but we execute the extension binary + Command: []string{NumaflowRustBinary}, // we use the same image, but we execute the extension binary Resources: req.DefaultResources, + Args: []string{"--serving"}, } // set the common envs diff --git a/serving/.dockerignore b/rust/.dockerignore similarity index 100% rename from serving/.dockerignore rename to rust/.dockerignore diff --git a/serving/.rustfmt.toml b/rust/.rustfmt.toml similarity index 100% rename from serving/.rustfmt.toml rename to rust/.rustfmt.toml diff --git a/serving/Cargo.lock b/rust/Cargo.lock similarity index 99% rename from serving/Cargo.lock rename to rust/Cargo.lock index 30fd1a14db..db2f0404bd 100644 --- a/serving/Cargo.lock +++ b/rust/Cargo.lock @@ -1555,6 +1555,44 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" +[[package]] +name = "monovertex" +version = "0.1.0" +dependencies = [ + "axum", + "axum-server", + "backoff", + "base64 0.22.1", + "bytes", + "chrono", + "hyper-util", + "metrics", + "metrics-exporter-prometheus", + "numaflow 0.1.0 (git+https://github.com/numaproj/numaflow-rs.git?branch=main)", + "numaflow-models", + "once_cell", + "pep440_rs", + "prost", + "prost-types", + "rcgen", + "rustls", + "semver", + "serde", + "serde_json", + "tempfile", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "tonic", + "tonic-build", + "tower", + "tracing", + "tracing-subscriber", + "trait-variant", + "uuid", +] + [[package]] name = "multimap" version = "0.10.0" @@ -1666,6 +1704,18 @@ dependencies = [ "libc", ] +[[package]] +name = "numaflow" +version = "0.1.0" +dependencies = [ + "backoff", + "monovertex", + "servesink", + "serving", + "tokio", + "tracing", +] + [[package]] name = "numaflow" version = "0.1.0" @@ -2601,7 +2651,19 @@ dependencies = [ ] [[package]] -name = "serve" +name = "servesink" +version = "0.1.0" +dependencies = [ + "numaflow 0.1.0 (git+https://github.com/numaproj/numaflow-rs.git?branch=main)", + "reqwest 0.12.5", + "tokio", + "tonic", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "serving" version = "0.1.0" dependencies = [ "async-nats", @@ -2627,18 +2689,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "servesink" -version = "0.1.0" -dependencies = [ - "numaflow", - "reqwest 0.12.5", - "tokio", - "tonic", - "tracing", - "tracing-subscriber", -] - [[package]] name = "sha1" version = "0.10.6" @@ -2744,43 +2794,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "sourcer-sinker" -version = "0.1.0" -dependencies = [ - "axum", - "axum-server", - "base64 0.22.1", - "bytes", - "chrono", - "hyper-util", - "metrics", - "metrics-exporter-prometheus", - "numaflow", - "numaflow-models", - "once_cell", - "pep440_rs", - "prost", - "prost-types", - "rcgen", - "rustls", - "semver", - "serde", - "serde_json", - "tempfile", - "thiserror", - "tokio", - "tokio-stream", - "tokio-util", - "tonic", - "tonic-build", - "tower", - "tracing", - "tracing-subscriber", - "trait-variant", - "uuid", -] - [[package]] name = "spin" version = "0.9.8" @@ -3321,22 +3334,6 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" -[[package]] -name = "upstreams" -version = "0.1.0" -dependencies = [ - "axum", - "axum-macros", - "http-body-util", - "serde", - "serde_json", - "tokio", - "tower", - "tower-http", - "tracing", - "tracing-subscriber", -] - [[package]] name = "url" version = "2.5.2" diff --git a/rust/Cargo.toml b/rust/Cargo.toml new file mode 100644 index 0000000000..45a630b732 --- /dev/null +++ b/rust/Cargo.toml @@ -0,0 +1,19 @@ +workspace = { members = ["backoff", "numaflow-models", "servesink", "serving", "monovertex"] } + +[[bin]] +name = "numaflow" +path = "src/bin/main.rs" + +[package] +name = "numaflow" +version = "0.1.0" +edition = "2021" + + +[dependencies] +tokio = "1.39.2" +backoff = { path = "backoff" } +servesink = { path = "servesink" } +serving = { path = "serving" } +monovertex = { path = "monovertex" } +tracing = "0.1.40" diff --git a/serving/Dockerfile b/rust/Dockerfile similarity index 58% rename from serving/Dockerfile rename to rust/Dockerfile index 697cfd6f27..e680b85eac 100644 --- a/serving/Dockerfile +++ b/rust/Dockerfile @@ -7,42 +7,44 @@ RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/ca RUN apt-get update RUN apt-get install protobuf-compiler -y -RUN cargo new serve +RUN cargo new numaflow # Create a new empty shell project -WORKDIR /serve -RUN cargo new servesink -COPY ./servesink/Cargo.toml ./servesink/ +WORKDIR /numaflow -RUN cargo new extras/upstreams -COPY ./extras/upstreams/Cargo.toml ./extras/upstreams/ +RUN cargo new servesink +COPY ./servesink/Cargo.toml ./servesink/Cargo.toml RUN cargo new backoff COPY ./backoff/Cargo.toml ./backoff/Cargo.toml RUN cargo new numaflow-models -COPY ./numaflow-models/Cargo.toml ./numaflow-models/ +COPY ./numaflow-models/Cargo.toml ./numaflow-models/Cargo.toml -RUN cargo new source-sink -COPY ./source-sink/Cargo.toml ./source-sink/Cargo.toml +RUN cargo new monovertex +COPY monovertex/Cargo.toml ./monovertex/Cargo.toml + +RUN cargo new serving +COPY ./serving/Cargo.toml ./serving/Cargo.toml # Copy all Cargo.toml and Cargo.lock files for caching dependencies COPY ./Cargo.toml ./Cargo.lock ./ -# Build only the dependencies to cache them -RUN cargo build --release +# Build to cache dependencies +RUN mkdir -p src/bin && echo "fn main() {}" > src/bin/main.rs && \ + cargo build --workspace --all --release # Copy the actual source code files of the main project and the subprojects COPY ./src ./src COPY ./servesink/src ./servesink/src -COPY ./extras/upstreams/src ./extras/upstreams/src COPY ./backoff/src ./backoff/src COPY ./numaflow-models/src ./numaflow-models/src -COPY ./source-sink/src ./source-sink/src -COPY ./source-sink/build.rs ./source-sink/build.rs -COPY ./source-sink/proto ./source-sink/proto +COPY ./serving/src ./serving/src +COPY monovertex/src ./monovertex/src +COPY monovertex/build.rs ./monovertex/build.rs +COPY monovertex/proto ./monovertex/proto # Build the real binaries -RUN touch src/main.rs servesink/src/main.rs numaflow-models/src/main.rs source-sink/src/main.rs && \ +RUN touch src/bin/main.rs && \ cargo build --workspace --all --release # Use a lightweight image for the runtime @@ -50,7 +52,7 @@ FROM debian:bookworm as numaflow-ext RUN apt-get update && apt-get install -y libssl3 -COPY --from=builder /serve/target/release/ . -COPY ./config config +COPY --from=builder /numaflow/target/release/ . +COPY serving/config config -ENTRYPOINT ["./serve"] \ No newline at end of file +ENTRYPOINT ["./numaflow"] \ No newline at end of file diff --git a/serving/Makefile b/rust/Makefile similarity index 100% rename from serving/Makefile rename to rust/Makefile diff --git a/serving/README.md b/rust/README.md similarity index 100% rename from serving/README.md rename to rust/README.md diff --git a/serving/backoff/Cargo.toml b/rust/backoff/Cargo.toml similarity index 100% rename from serving/backoff/Cargo.toml rename to rust/backoff/Cargo.toml diff --git a/serving/backoff/src/lib.rs b/rust/backoff/src/lib.rs similarity index 100% rename from serving/backoff/src/lib.rs rename to rust/backoff/src/lib.rs diff --git a/serving/backoff/src/retry.rs b/rust/backoff/src/retry.rs similarity index 100% rename from serving/backoff/src/retry.rs rename to rust/backoff/src/retry.rs diff --git a/serving/backoff/src/strategy.rs b/rust/backoff/src/strategy.rs similarity index 100% rename from serving/backoff/src/strategy.rs rename to rust/backoff/src/strategy.rs diff --git a/serving/backoff/src/strategy/fixed.rs b/rust/backoff/src/strategy/fixed.rs similarity index 100% rename from serving/backoff/src/strategy/fixed.rs rename to rust/backoff/src/strategy/fixed.rs diff --git a/serving/source-sink/Cargo.toml b/rust/monovertex/Cargo.toml similarity index 95% rename from serving/source-sink/Cargo.toml rename to rust/monovertex/Cargo.toml index 9813d52b1e..3e98b10d69 100644 --- a/serving/source-sink/Cargo.toml +++ b/rust/monovertex/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "sourcer-sinker" +name = "monovertex" version = "0.1.0" edition = "2021" @@ -32,6 +32,7 @@ rustls = { version = "0.23.12", features = ["aws_lc_rs"] } serde = { version = "1.0.204", features = ["derive"] } semver = "1.0" pep440_rs = "0.6.6" +backoff = { path = "../backoff" } [dev-dependencies] tower = "0.4.13" diff --git a/serving/source-sink/build.rs b/rust/monovertex/build.rs similarity index 100% rename from serving/source-sink/build.rs rename to rust/monovertex/build.rs diff --git a/serving/source-sink/proto/sink.proto b/rust/monovertex/proto/sink.proto similarity index 100% rename from serving/source-sink/proto/sink.proto rename to rust/monovertex/proto/sink.proto diff --git a/serving/source-sink/proto/source.proto b/rust/monovertex/proto/source.proto similarity index 100% rename from serving/source-sink/proto/source.proto rename to rust/monovertex/proto/source.proto diff --git a/serving/source-sink/proto/sourcetransform.proto b/rust/monovertex/proto/sourcetransform.proto similarity index 100% rename from serving/source-sink/proto/sourcetransform.proto rename to rust/monovertex/proto/sourcetransform.proto diff --git a/serving/source-sink/src/config.rs b/rust/monovertex/src/config.rs similarity index 100% rename from serving/source-sink/src/config.rs rename to rust/monovertex/src/config.rs diff --git a/serving/source-sink/src/error.rs b/rust/monovertex/src/error.rs similarity index 100% rename from serving/source-sink/src/error.rs rename to rust/monovertex/src/error.rs diff --git a/serving/source-sink/src/forwarder.rs b/rust/monovertex/src/forwarder.rs similarity index 100% rename from serving/source-sink/src/forwarder.rs rename to rust/monovertex/src/forwarder.rs diff --git a/serving/source-sink/src/lib.rs b/rust/monovertex/src/lib.rs similarity index 79% rename from serving/source-sink/src/lib.rs rename to rust/monovertex/src/lib.rs index 15b59e537f..88f7bb9c19 100644 --- a/serving/source-sink/src/lib.rs +++ b/rust/monovertex/src/lib.rs @@ -1,6 +1,4 @@ -use std::net::SocketAddr; -use std::time::Duration; - +pub(crate) use self::error::Result; use crate::config::config; pub(crate) use crate::error::Error; use crate::forwarder::Forwarder; @@ -8,11 +6,15 @@ use crate::metrics::{start_metrics_https_server, MetricsState}; use crate::sink::{SinkClient, SinkConfig}; use crate::source::{SourceClient, SourceConfig}; use crate::transformer::{TransformerClient, TransformerConfig}; +use std::net::SocketAddr; +use std::time::Duration; +use tokio::signal; +use tokio::task::JoinHandle; use tokio::time::sleep; use tokio_util::sync::CancellationToken; +use tracing::level_filters::LevelFilter; use tracing::{error, info, warn}; - -pub(crate) use self::error::Result; +use tracing_subscriber::EnvFilter; /// SourcerSinker orchestrates data movement from the Source to the Sink via the optional SourceTransformer. /// The forward-a-chunk executes the following in an infinite loop till a shutdown signal is received: @@ -38,6 +40,82 @@ pub mod message; mod server_info; pub(crate) mod shared; +pub async fn mono_vertex() { + // Initialize the logger + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .parse_lossy(&config().log_level), + ) + .with_target(false) + .init(); + + // Initialize the source, sink and transformer configurations + // We are using the default configurations for now. + let source_config = SourceConfig { + max_message_size: config().grpc_max_message_size, + ..Default::default() + }; + + let sink_config = SinkConfig { + max_message_size: config().grpc_max_message_size, + ..Default::default() + }; + + let transformer_config = if config().is_transformer_enabled { + Some(TransformerConfig { + max_message_size: config().grpc_max_message_size, + ..Default::default() + }) + } else { + None + }; + + let cln_token = CancellationToken::new(); + let shutdown_cln_token = cln_token.clone(); + // wait for SIG{INT,TERM} and invoke cancellation token. + let shutdown_handle: JoinHandle> = tokio::spawn(async move { + shutdown_signal().await; + shutdown_cln_token.cancel(); + Ok(()) + }); + + // Run the forwarder with cancellation token. + if let Err(e) = init(source_config, sink_config, transformer_config, cln_token).await { + error!("Application error: {:?}", e); + + // abort the task since we have an error + if !shutdown_handle.is_finished() { + shutdown_handle.abort(); + } + } + + info!("Gracefully Exiting..."); +} + +async fn shutdown_signal() { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + info!("Received Ctrl+C signal"); + }; + + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + info!("Received terminate signal"); + }; + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } +} + /// forwards a chunk of data from the source to the sink via an optional transformer. /// It takes an optional custom_shutdown_rx for shutting down the forwarder, useful for testing. pub async fn init( diff --git a/serving/source-sink/src/message.rs b/rust/monovertex/src/message.rs similarity index 100% rename from serving/source-sink/src/message.rs rename to rust/monovertex/src/message.rs diff --git a/serving/source-sink/src/metrics.rs b/rust/monovertex/src/metrics.rs similarity index 100% rename from serving/source-sink/src/metrics.rs rename to rust/monovertex/src/metrics.rs diff --git a/serving/source-sink/src/server_info.rs b/rust/monovertex/src/server_info.rs similarity index 100% rename from serving/source-sink/src/server_info.rs rename to rust/monovertex/src/server_info.rs diff --git a/serving/source-sink/src/shared.rs b/rust/monovertex/src/shared.rs similarity index 100% rename from serving/source-sink/src/shared.rs rename to rust/monovertex/src/shared.rs diff --git a/serving/source-sink/src/sink.rs b/rust/monovertex/src/sink.rs similarity index 91% rename from serving/source-sink/src/sink.rs rename to rust/monovertex/src/sink.rs index ab9a0f49c2..167524050c 100644 --- a/serving/source-sink/src/sink.rs +++ b/rust/monovertex/src/sink.rs @@ -1,14 +1,17 @@ -use tonic::transport::Channel; -use tonic::Request; - -use crate::error::Result; +use crate::error::{Error, Result}; use crate::message::Message; use crate::shared::connect_with_uds; +use backoff::retry::Retry; +use backoff::strategy::fixed; +use tonic::transport::Channel; +use tonic::Request; pub mod proto { tonic::include_proto!("sink.v1"); } +const RECONNECT_INTERVAL: u64 = 1000; +const MAX_RECONNECT_ATTEMPTS: usize = 5; const SINK_SOCKET: &str = "/var/run/numaflow/sink.sock"; const SINK_SERVER_INFO_FILE: &str = "/var/run/numaflow/sinker-server-info"; @@ -38,7 +41,16 @@ pub struct SinkClient { impl SinkClient { pub(crate) async fn connect(config: SinkConfig) -> Result { - let channel = connect_with_uds(config.socket_path.into()).await?; + let interval = + fixed::Interval::from_millis(RECONNECT_INTERVAL).take(MAX_RECONNECT_ATTEMPTS); + + let channel = Retry::retry( + interval, + || async { connect_with_uds(config.socket_path.clone().into()).await }, + |_: &Error| true, + ) + .await?; + let client = proto::sink_client::SinkClient::new(channel) .max_decoding_message_size(config.max_message_size) .max_encoding_message_size(config.max_message_size); @@ -65,6 +77,7 @@ impl SinkClient { .sink_fn(tokio_stream::wrappers::ReceiverStream::new(rx)) .await? .into_inner(); + Ok(response) } diff --git a/serving/source-sink/src/source.rs b/rust/monovertex/src/source.rs similarity index 94% rename from serving/source-sink/src/source.rs rename to rust/monovertex/src/source.rs index 3e4ec30d40..a58922d5c7 100644 --- a/serving/source-sink/src/source.rs +++ b/rust/monovertex/src/source.rs @@ -1,17 +1,19 @@ +use crate::error::{Error, Result}; +use crate::message::{Message, Offset}; +use crate::shared::connect_with_uds; +use backoff::retry::Retry; +use backoff::strategy::fixed; use base64::prelude::BASE64_STANDARD; use base64::Engine; use tokio_stream::StreamExt; use tonic::transport::Channel; use tonic::Request; -use crate::error::{Error, Result}; -use crate::message::{Message, Offset}; -use crate::shared::connect_with_uds; - pub mod proto { tonic::include_proto!("source.v1"); } - +const RECONNECT_INTERVAL: u64 = 1000; +const MAX_RECONNECT_ATTEMPTS: usize = 5; const SOURCE_SOCKET: &str = "/var/run/numaflow/source.sock"; const SOURCE_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info"; @@ -41,7 +43,16 @@ pub(crate) struct SourceClient { impl SourceClient { pub(crate) async fn connect(config: SourceConfig) -> Result { - let channel = connect_with_uds(config.socket_path.into()).await?; + let interval = + fixed::Interval::from_millis(RECONNECT_INTERVAL).take(MAX_RECONNECT_ATTEMPTS); + + let channel = Retry::retry( + interval, + || async { connect_with_uds(config.socket_path.clone().into()).await }, + |_: &Error| true, + ) + .await?; + let client = proto::source_client::SourceClient::new(channel) .max_encoding_message_size(config.max_message_size) .max_decoding_message_size(config.max_message_size); diff --git a/serving/source-sink/src/transformer.rs b/rust/monovertex/src/transformer.rs similarity index 91% rename from serving/source-sink/src/transformer.rs rename to rust/monovertex/src/transformer.rs index 5a3f70f73f..eeabffe6fc 100644 --- a/serving/source-sink/src/transformer.rs +++ b/rust/monovertex/src/transformer.rs @@ -1,15 +1,18 @@ -use tonic::transport::Channel; -use tonic::Request; - -use crate::error::Result; +use crate::error::{Error, Result}; use crate::message::Message; use crate::shared::{connect_with_uds, utc_from_timestamp}; use crate::transformer::proto::SourceTransformRequest; +use backoff::retry::Retry; +use backoff::strategy::fixed; +use tonic::transport::Channel; +use tonic::Request; pub mod proto { tonic::include_proto!("sourcetransformer.v1"); } +const RECONNECT_INTERVAL: u64 = 1000; +const MAX_RECONNECT_ATTEMPTS: usize = 5; const TRANSFORMER_SOCKET: &str = "/var/run/numaflow/sourcetransform.sock"; const TRANSFORMER_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcetransformer-server-info"; @@ -39,7 +42,16 @@ pub struct TransformerClient { impl TransformerClient { pub(crate) async fn connect(config: TransformerConfig) -> Result { - let channel = connect_with_uds(config.socket_path.into()).await?; + let interval = + fixed::Interval::from_millis(RECONNECT_INTERVAL).take(MAX_RECONNECT_ATTEMPTS); + + let channel = Retry::retry( + interval, + || async { connect_with_uds(config.socket_path.clone().into()).await }, + |_: &Error| true, + ) + .await?; + let client = proto::source_transform_client::SourceTransformClient::new(channel) .max_decoding_message_size(config.max_message_size) .max_encoding_message_size(config.max_message_size); diff --git a/serving/numaflow-models/Cargo.toml b/rust/numaflow-models/Cargo.toml similarity index 100% rename from serving/numaflow-models/Cargo.toml rename to rust/numaflow-models/Cargo.toml diff --git a/serving/numaflow-models/Makefile b/rust/numaflow-models/Makefile similarity index 100% rename from serving/numaflow-models/Makefile rename to rust/numaflow-models/Makefile diff --git a/serving/numaflow-models/hack/swaggerfilter.py b/rust/numaflow-models/hack/swaggerfilter.py similarity index 100% rename from serving/numaflow-models/hack/swaggerfilter.py rename to rust/numaflow-models/hack/swaggerfilter.py diff --git a/serving/numaflow-models/src/apis/configuration.rs b/rust/numaflow-models/src/apis/configuration.rs similarity index 100% rename from serving/numaflow-models/src/apis/configuration.rs rename to rust/numaflow-models/src/apis/configuration.rs diff --git a/serving/numaflow-models/src/apis/mod.rs b/rust/numaflow-models/src/apis/mod.rs similarity index 100% rename from serving/numaflow-models/src/apis/mod.rs rename to rust/numaflow-models/src/apis/mod.rs diff --git a/serving/numaflow-models/src/lib.rs b/rust/numaflow-models/src/lib.rs similarity index 100% rename from serving/numaflow-models/src/lib.rs rename to rust/numaflow-models/src/lib.rs diff --git a/serving/numaflow-models/src/models/abstract_pod_template.rs b/rust/numaflow-models/src/models/abstract_pod_template.rs similarity index 100% rename from serving/numaflow-models/src/models/abstract_pod_template.rs rename to rust/numaflow-models/src/models/abstract_pod_template.rs diff --git a/serving/numaflow-models/src/models/abstract_sink.rs b/rust/numaflow-models/src/models/abstract_sink.rs similarity index 100% rename from serving/numaflow-models/src/models/abstract_sink.rs rename to rust/numaflow-models/src/models/abstract_sink.rs diff --git a/serving/numaflow-models/src/models/abstract_vertex.rs b/rust/numaflow-models/src/models/abstract_vertex.rs similarity index 100% rename from serving/numaflow-models/src/models/abstract_vertex.rs rename to rust/numaflow-models/src/models/abstract_vertex.rs diff --git a/serving/numaflow-models/src/models/authorization.rs b/rust/numaflow-models/src/models/authorization.rs similarity index 100% rename from serving/numaflow-models/src/models/authorization.rs rename to rust/numaflow-models/src/models/authorization.rs diff --git a/serving/numaflow-models/src/models/basic_auth.rs b/rust/numaflow-models/src/models/basic_auth.rs similarity index 100% rename from serving/numaflow-models/src/models/basic_auth.rs rename to rust/numaflow-models/src/models/basic_auth.rs diff --git a/serving/numaflow-models/src/models/blackhole.rs b/rust/numaflow-models/src/models/blackhole.rs similarity index 100% rename from serving/numaflow-models/src/models/blackhole.rs rename to rust/numaflow-models/src/models/blackhole.rs diff --git a/serving/numaflow-models/src/models/buffer_service_config.rs b/rust/numaflow-models/src/models/buffer_service_config.rs similarity index 100% rename from serving/numaflow-models/src/models/buffer_service_config.rs rename to rust/numaflow-models/src/models/buffer_service_config.rs diff --git a/serving/numaflow-models/src/models/combined_edge.rs b/rust/numaflow-models/src/models/combined_edge.rs similarity index 100% rename from serving/numaflow-models/src/models/combined_edge.rs rename to rust/numaflow-models/src/models/combined_edge.rs diff --git a/serving/numaflow-models/src/models/container.rs b/rust/numaflow-models/src/models/container.rs similarity index 100% rename from serving/numaflow-models/src/models/container.rs rename to rust/numaflow-models/src/models/container.rs diff --git a/serving/numaflow-models/src/models/container_builder.rs b/rust/numaflow-models/src/models/container_builder.rs similarity index 100% rename from serving/numaflow-models/src/models/container_builder.rs rename to rust/numaflow-models/src/models/container_builder.rs diff --git a/serving/numaflow-models/src/models/container_template.rs b/rust/numaflow-models/src/models/container_template.rs similarity index 100% rename from serving/numaflow-models/src/models/container_template.rs rename to rust/numaflow-models/src/models/container_template.rs diff --git a/serving/numaflow-models/src/models/daemon_template.rs b/rust/numaflow-models/src/models/daemon_template.rs similarity index 100% rename from serving/numaflow-models/src/models/daemon_template.rs rename to rust/numaflow-models/src/models/daemon_template.rs diff --git a/serving/numaflow-models/src/models/edge.rs b/rust/numaflow-models/src/models/edge.rs similarity index 100% rename from serving/numaflow-models/src/models/edge.rs rename to rust/numaflow-models/src/models/edge.rs diff --git a/serving/numaflow-models/src/models/fixed_window.rs b/rust/numaflow-models/src/models/fixed_window.rs similarity index 100% rename from serving/numaflow-models/src/models/fixed_window.rs rename to rust/numaflow-models/src/models/fixed_window.rs diff --git a/serving/numaflow-models/src/models/forward_conditions.rs b/rust/numaflow-models/src/models/forward_conditions.rs similarity index 100% rename from serving/numaflow-models/src/models/forward_conditions.rs rename to rust/numaflow-models/src/models/forward_conditions.rs diff --git a/serving/numaflow-models/src/models/function.rs b/rust/numaflow-models/src/models/function.rs similarity index 100% rename from serving/numaflow-models/src/models/function.rs rename to rust/numaflow-models/src/models/function.rs diff --git a/serving/numaflow-models/src/models/generator_source.rs b/rust/numaflow-models/src/models/generator_source.rs similarity index 100% rename from serving/numaflow-models/src/models/generator_source.rs rename to rust/numaflow-models/src/models/generator_source.rs diff --git a/serving/numaflow-models/src/models/get_container_req.rs b/rust/numaflow-models/src/models/get_container_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_container_req.rs rename to rust/numaflow-models/src/models/get_container_req.rs diff --git a/serving/numaflow-models/src/models/get_daemon_deployment_req.rs b/rust/numaflow-models/src/models/get_daemon_deployment_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_daemon_deployment_req.rs rename to rust/numaflow-models/src/models/get_daemon_deployment_req.rs diff --git a/serving/numaflow-models/src/models/get_jet_stream_service_spec_req.rs b/rust/numaflow-models/src/models/get_jet_stream_service_spec_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_jet_stream_service_spec_req.rs rename to rust/numaflow-models/src/models/get_jet_stream_service_spec_req.rs diff --git a/serving/numaflow-models/src/models/get_jet_stream_stateful_set_spec_req.rs b/rust/numaflow-models/src/models/get_jet_stream_stateful_set_spec_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_jet_stream_stateful_set_spec_req.rs rename to rust/numaflow-models/src/models/get_jet_stream_stateful_set_spec_req.rs diff --git a/serving/numaflow-models/src/models/get_mono_vertex_daemon_deployment_req.rs b/rust/numaflow-models/src/models/get_mono_vertex_daemon_deployment_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_mono_vertex_daemon_deployment_req.rs rename to rust/numaflow-models/src/models/get_mono_vertex_daemon_deployment_req.rs diff --git a/serving/numaflow-models/src/models/get_mono_vertex_pod_spec_req.rs b/rust/numaflow-models/src/models/get_mono_vertex_pod_spec_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_mono_vertex_pod_spec_req.rs rename to rust/numaflow-models/src/models/get_mono_vertex_pod_spec_req.rs diff --git a/serving/numaflow-models/src/models/get_redis_service_spec_req.rs b/rust/numaflow-models/src/models/get_redis_service_spec_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_redis_service_spec_req.rs rename to rust/numaflow-models/src/models/get_redis_service_spec_req.rs diff --git a/serving/numaflow-models/src/models/get_redis_stateful_set_spec_req.rs b/rust/numaflow-models/src/models/get_redis_stateful_set_spec_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_redis_stateful_set_spec_req.rs rename to rust/numaflow-models/src/models/get_redis_stateful_set_spec_req.rs diff --git a/serving/numaflow-models/src/models/get_side_input_deployment_req.rs b/rust/numaflow-models/src/models/get_side_input_deployment_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_side_input_deployment_req.rs rename to rust/numaflow-models/src/models/get_side_input_deployment_req.rs diff --git a/serving/numaflow-models/src/models/get_vertex_pod_spec_req.rs b/rust/numaflow-models/src/models/get_vertex_pod_spec_req.rs similarity index 100% rename from serving/numaflow-models/src/models/get_vertex_pod_spec_req.rs rename to rust/numaflow-models/src/models/get_vertex_pod_spec_req.rs diff --git a/serving/numaflow-models/src/models/group_by.rs b/rust/numaflow-models/src/models/group_by.rs similarity index 100% rename from serving/numaflow-models/src/models/group_by.rs rename to rust/numaflow-models/src/models/group_by.rs diff --git a/serving/numaflow-models/src/models/gssapi.rs b/rust/numaflow-models/src/models/gssapi.rs similarity index 100% rename from serving/numaflow-models/src/models/gssapi.rs rename to rust/numaflow-models/src/models/gssapi.rs diff --git a/serving/numaflow-models/src/models/http_source.rs b/rust/numaflow-models/src/models/http_source.rs similarity index 100% rename from serving/numaflow-models/src/models/http_source.rs rename to rust/numaflow-models/src/models/http_source.rs diff --git a/serving/numaflow-models/src/models/idle_source.rs b/rust/numaflow-models/src/models/idle_source.rs similarity index 100% rename from serving/numaflow-models/src/models/idle_source.rs rename to rust/numaflow-models/src/models/idle_source.rs diff --git a/serving/numaflow-models/src/models/inter_step_buffer_service.rs b/rust/numaflow-models/src/models/inter_step_buffer_service.rs similarity index 100% rename from serving/numaflow-models/src/models/inter_step_buffer_service.rs rename to rust/numaflow-models/src/models/inter_step_buffer_service.rs diff --git a/serving/numaflow-models/src/models/inter_step_buffer_service_list.rs b/rust/numaflow-models/src/models/inter_step_buffer_service_list.rs similarity index 100% rename from serving/numaflow-models/src/models/inter_step_buffer_service_list.rs rename to rust/numaflow-models/src/models/inter_step_buffer_service_list.rs diff --git a/serving/numaflow-models/src/models/inter_step_buffer_service_spec.rs b/rust/numaflow-models/src/models/inter_step_buffer_service_spec.rs similarity index 100% rename from serving/numaflow-models/src/models/inter_step_buffer_service_spec.rs rename to rust/numaflow-models/src/models/inter_step_buffer_service_spec.rs diff --git a/serving/numaflow-models/src/models/inter_step_buffer_service_status.rs b/rust/numaflow-models/src/models/inter_step_buffer_service_status.rs similarity index 100% rename from serving/numaflow-models/src/models/inter_step_buffer_service_status.rs rename to rust/numaflow-models/src/models/inter_step_buffer_service_status.rs diff --git a/serving/numaflow-models/src/models/jet_stream_buffer_service.rs b/rust/numaflow-models/src/models/jet_stream_buffer_service.rs similarity index 100% rename from serving/numaflow-models/src/models/jet_stream_buffer_service.rs rename to rust/numaflow-models/src/models/jet_stream_buffer_service.rs diff --git a/serving/numaflow-models/src/models/jet_stream_config.rs b/rust/numaflow-models/src/models/jet_stream_config.rs similarity index 100% rename from serving/numaflow-models/src/models/jet_stream_config.rs rename to rust/numaflow-models/src/models/jet_stream_config.rs diff --git a/serving/numaflow-models/src/models/jet_stream_source.rs b/rust/numaflow-models/src/models/jet_stream_source.rs similarity index 100% rename from serving/numaflow-models/src/models/jet_stream_source.rs rename to rust/numaflow-models/src/models/jet_stream_source.rs diff --git a/serving/numaflow-models/src/models/job_template.rs b/rust/numaflow-models/src/models/job_template.rs similarity index 100% rename from serving/numaflow-models/src/models/job_template.rs rename to rust/numaflow-models/src/models/job_template.rs diff --git a/serving/numaflow-models/src/models/kafka_sink.rs b/rust/numaflow-models/src/models/kafka_sink.rs similarity index 100% rename from serving/numaflow-models/src/models/kafka_sink.rs rename to rust/numaflow-models/src/models/kafka_sink.rs diff --git a/serving/numaflow-models/src/models/kafka_source.rs b/rust/numaflow-models/src/models/kafka_source.rs similarity index 100% rename from serving/numaflow-models/src/models/kafka_source.rs rename to rust/numaflow-models/src/models/kafka_source.rs diff --git a/serving/numaflow-models/src/models/lifecycle.rs b/rust/numaflow-models/src/models/lifecycle.rs similarity index 100% rename from serving/numaflow-models/src/models/lifecycle.rs rename to rust/numaflow-models/src/models/lifecycle.rs diff --git a/serving/numaflow-models/src/models/log.rs b/rust/numaflow-models/src/models/log.rs similarity index 100% rename from serving/numaflow-models/src/models/log.rs rename to rust/numaflow-models/src/models/log.rs diff --git a/serving/numaflow-models/src/models/metadata.rs b/rust/numaflow-models/src/models/metadata.rs similarity index 100% rename from serving/numaflow-models/src/models/metadata.rs rename to rust/numaflow-models/src/models/metadata.rs diff --git a/serving/numaflow-models/src/models/mod.rs b/rust/numaflow-models/src/models/mod.rs similarity index 100% rename from serving/numaflow-models/src/models/mod.rs rename to rust/numaflow-models/src/models/mod.rs diff --git a/serving/numaflow-models/src/models/mono_vertex.rs b/rust/numaflow-models/src/models/mono_vertex.rs similarity index 100% rename from serving/numaflow-models/src/models/mono_vertex.rs rename to rust/numaflow-models/src/models/mono_vertex.rs diff --git a/serving/numaflow-models/src/models/mono_vertex_limits.rs b/rust/numaflow-models/src/models/mono_vertex_limits.rs similarity index 100% rename from serving/numaflow-models/src/models/mono_vertex_limits.rs rename to rust/numaflow-models/src/models/mono_vertex_limits.rs diff --git a/serving/numaflow-models/src/models/mono_vertex_list.rs b/rust/numaflow-models/src/models/mono_vertex_list.rs similarity index 100% rename from serving/numaflow-models/src/models/mono_vertex_list.rs rename to rust/numaflow-models/src/models/mono_vertex_list.rs diff --git a/serving/numaflow-models/src/models/mono_vertex_spec.rs b/rust/numaflow-models/src/models/mono_vertex_spec.rs similarity index 100% rename from serving/numaflow-models/src/models/mono_vertex_spec.rs rename to rust/numaflow-models/src/models/mono_vertex_spec.rs diff --git a/serving/numaflow-models/src/models/mono_vertex_status.rs b/rust/numaflow-models/src/models/mono_vertex_status.rs similarity index 100% rename from serving/numaflow-models/src/models/mono_vertex_status.rs rename to rust/numaflow-models/src/models/mono_vertex_status.rs diff --git a/serving/numaflow-models/src/models/native_redis.rs b/rust/numaflow-models/src/models/native_redis.rs similarity index 100% rename from serving/numaflow-models/src/models/native_redis.rs rename to rust/numaflow-models/src/models/native_redis.rs diff --git a/serving/numaflow-models/src/models/nats_auth.rs b/rust/numaflow-models/src/models/nats_auth.rs similarity index 100% rename from serving/numaflow-models/src/models/nats_auth.rs rename to rust/numaflow-models/src/models/nats_auth.rs diff --git a/serving/numaflow-models/src/models/nats_source.rs b/rust/numaflow-models/src/models/nats_source.rs similarity index 100% rename from serving/numaflow-models/src/models/nats_source.rs rename to rust/numaflow-models/src/models/nats_source.rs diff --git a/serving/numaflow-models/src/models/no_store.rs b/rust/numaflow-models/src/models/no_store.rs similarity index 100% rename from serving/numaflow-models/src/models/no_store.rs rename to rust/numaflow-models/src/models/no_store.rs diff --git a/serving/numaflow-models/src/models/pbq_storage.rs b/rust/numaflow-models/src/models/pbq_storage.rs similarity index 100% rename from serving/numaflow-models/src/models/pbq_storage.rs rename to rust/numaflow-models/src/models/pbq_storage.rs diff --git a/serving/numaflow-models/src/models/persistence_strategy.rs b/rust/numaflow-models/src/models/persistence_strategy.rs similarity index 100% rename from serving/numaflow-models/src/models/persistence_strategy.rs rename to rust/numaflow-models/src/models/persistence_strategy.rs diff --git a/serving/numaflow-models/src/models/pipeline.rs b/rust/numaflow-models/src/models/pipeline.rs similarity index 100% rename from serving/numaflow-models/src/models/pipeline.rs rename to rust/numaflow-models/src/models/pipeline.rs diff --git a/serving/numaflow-models/src/models/pipeline_limits.rs b/rust/numaflow-models/src/models/pipeline_limits.rs similarity index 100% rename from serving/numaflow-models/src/models/pipeline_limits.rs rename to rust/numaflow-models/src/models/pipeline_limits.rs diff --git a/serving/numaflow-models/src/models/pipeline_list.rs b/rust/numaflow-models/src/models/pipeline_list.rs similarity index 100% rename from serving/numaflow-models/src/models/pipeline_list.rs rename to rust/numaflow-models/src/models/pipeline_list.rs diff --git a/serving/numaflow-models/src/models/pipeline_spec.rs b/rust/numaflow-models/src/models/pipeline_spec.rs similarity index 100% rename from serving/numaflow-models/src/models/pipeline_spec.rs rename to rust/numaflow-models/src/models/pipeline_spec.rs diff --git a/serving/numaflow-models/src/models/pipeline_status.rs b/rust/numaflow-models/src/models/pipeline_status.rs similarity index 100% rename from serving/numaflow-models/src/models/pipeline_status.rs rename to rust/numaflow-models/src/models/pipeline_status.rs diff --git a/serving/numaflow-models/src/models/redis_buffer_service.rs b/rust/numaflow-models/src/models/redis_buffer_service.rs similarity index 100% rename from serving/numaflow-models/src/models/redis_buffer_service.rs rename to rust/numaflow-models/src/models/redis_buffer_service.rs diff --git a/serving/numaflow-models/src/models/redis_config.rs b/rust/numaflow-models/src/models/redis_config.rs similarity index 100% rename from serving/numaflow-models/src/models/redis_config.rs rename to rust/numaflow-models/src/models/redis_config.rs diff --git a/serving/numaflow-models/src/models/redis_settings.rs b/rust/numaflow-models/src/models/redis_settings.rs similarity index 100% rename from serving/numaflow-models/src/models/redis_settings.rs rename to rust/numaflow-models/src/models/redis_settings.rs diff --git a/serving/numaflow-models/src/models/sasl.rs b/rust/numaflow-models/src/models/sasl.rs similarity index 100% rename from serving/numaflow-models/src/models/sasl.rs rename to rust/numaflow-models/src/models/sasl.rs diff --git a/serving/numaflow-models/src/models/sasl_plain.rs b/rust/numaflow-models/src/models/sasl_plain.rs similarity index 100% rename from serving/numaflow-models/src/models/sasl_plain.rs rename to rust/numaflow-models/src/models/sasl_plain.rs diff --git a/serving/numaflow-models/src/models/scale.rs b/rust/numaflow-models/src/models/scale.rs similarity index 100% rename from serving/numaflow-models/src/models/scale.rs rename to rust/numaflow-models/src/models/scale.rs diff --git a/serving/numaflow-models/src/models/serving_source.rs b/rust/numaflow-models/src/models/serving_source.rs similarity index 100% rename from serving/numaflow-models/src/models/serving_source.rs rename to rust/numaflow-models/src/models/serving_source.rs diff --git a/serving/numaflow-models/src/models/serving_store.rs b/rust/numaflow-models/src/models/serving_store.rs similarity index 100% rename from serving/numaflow-models/src/models/serving_store.rs rename to rust/numaflow-models/src/models/serving_store.rs diff --git a/serving/numaflow-models/src/models/session_window.rs b/rust/numaflow-models/src/models/session_window.rs similarity index 100% rename from serving/numaflow-models/src/models/session_window.rs rename to rust/numaflow-models/src/models/session_window.rs diff --git a/serving/numaflow-models/src/models/side_input.rs b/rust/numaflow-models/src/models/side_input.rs similarity index 100% rename from serving/numaflow-models/src/models/side_input.rs rename to rust/numaflow-models/src/models/side_input.rs diff --git a/serving/numaflow-models/src/models/side_input_trigger.rs b/rust/numaflow-models/src/models/side_input_trigger.rs similarity index 100% rename from serving/numaflow-models/src/models/side_input_trigger.rs rename to rust/numaflow-models/src/models/side_input_trigger.rs diff --git a/serving/numaflow-models/src/models/side_inputs_manager_template.rs b/rust/numaflow-models/src/models/side_inputs_manager_template.rs similarity index 100% rename from serving/numaflow-models/src/models/side_inputs_manager_template.rs rename to rust/numaflow-models/src/models/side_inputs_manager_template.rs diff --git a/serving/numaflow-models/src/models/sink.rs b/rust/numaflow-models/src/models/sink.rs similarity index 100% rename from serving/numaflow-models/src/models/sink.rs rename to rust/numaflow-models/src/models/sink.rs diff --git a/serving/numaflow-models/src/models/sliding_window.rs b/rust/numaflow-models/src/models/sliding_window.rs similarity index 100% rename from serving/numaflow-models/src/models/sliding_window.rs rename to rust/numaflow-models/src/models/sliding_window.rs diff --git a/serving/numaflow-models/src/models/source.rs b/rust/numaflow-models/src/models/source.rs similarity index 100% rename from serving/numaflow-models/src/models/source.rs rename to rust/numaflow-models/src/models/source.rs diff --git a/serving/numaflow-models/src/models/status.rs b/rust/numaflow-models/src/models/status.rs similarity index 100% rename from serving/numaflow-models/src/models/status.rs rename to rust/numaflow-models/src/models/status.rs diff --git a/serving/numaflow-models/src/models/tag_conditions.rs b/rust/numaflow-models/src/models/tag_conditions.rs similarity index 100% rename from serving/numaflow-models/src/models/tag_conditions.rs rename to rust/numaflow-models/src/models/tag_conditions.rs diff --git a/serving/numaflow-models/src/models/templates.rs b/rust/numaflow-models/src/models/templates.rs similarity index 100% rename from serving/numaflow-models/src/models/templates.rs rename to rust/numaflow-models/src/models/templates.rs diff --git a/serving/numaflow-models/src/models/tls.rs b/rust/numaflow-models/src/models/tls.rs similarity index 100% rename from serving/numaflow-models/src/models/tls.rs rename to rust/numaflow-models/src/models/tls.rs diff --git a/serving/numaflow-models/src/models/transformer.rs b/rust/numaflow-models/src/models/transformer.rs similarity index 100% rename from serving/numaflow-models/src/models/transformer.rs rename to rust/numaflow-models/src/models/transformer.rs diff --git a/serving/numaflow-models/src/models/ud_sink.rs b/rust/numaflow-models/src/models/ud_sink.rs similarity index 100% rename from serving/numaflow-models/src/models/ud_sink.rs rename to rust/numaflow-models/src/models/ud_sink.rs diff --git a/serving/numaflow-models/src/models/ud_source.rs b/rust/numaflow-models/src/models/ud_source.rs similarity index 100% rename from serving/numaflow-models/src/models/ud_source.rs rename to rust/numaflow-models/src/models/ud_source.rs diff --git a/serving/numaflow-models/src/models/ud_transformer.rs b/rust/numaflow-models/src/models/ud_transformer.rs similarity index 100% rename from serving/numaflow-models/src/models/ud_transformer.rs rename to rust/numaflow-models/src/models/ud_transformer.rs diff --git a/serving/numaflow-models/src/models/udf.rs b/rust/numaflow-models/src/models/udf.rs similarity index 100% rename from serving/numaflow-models/src/models/udf.rs rename to rust/numaflow-models/src/models/udf.rs diff --git a/serving/numaflow-models/src/models/vertex.rs b/rust/numaflow-models/src/models/vertex.rs similarity index 100% rename from serving/numaflow-models/src/models/vertex.rs rename to rust/numaflow-models/src/models/vertex.rs diff --git a/serving/numaflow-models/src/models/vertex_instance.rs b/rust/numaflow-models/src/models/vertex_instance.rs similarity index 100% rename from serving/numaflow-models/src/models/vertex_instance.rs rename to rust/numaflow-models/src/models/vertex_instance.rs diff --git a/serving/numaflow-models/src/models/vertex_limits.rs b/rust/numaflow-models/src/models/vertex_limits.rs similarity index 100% rename from serving/numaflow-models/src/models/vertex_limits.rs rename to rust/numaflow-models/src/models/vertex_limits.rs diff --git a/serving/numaflow-models/src/models/vertex_list.rs b/rust/numaflow-models/src/models/vertex_list.rs similarity index 100% rename from serving/numaflow-models/src/models/vertex_list.rs rename to rust/numaflow-models/src/models/vertex_list.rs diff --git a/serving/numaflow-models/src/models/vertex_spec.rs b/rust/numaflow-models/src/models/vertex_spec.rs similarity index 100% rename from serving/numaflow-models/src/models/vertex_spec.rs rename to rust/numaflow-models/src/models/vertex_spec.rs diff --git a/serving/numaflow-models/src/models/vertex_status.rs b/rust/numaflow-models/src/models/vertex_status.rs similarity index 100% rename from serving/numaflow-models/src/models/vertex_status.rs rename to rust/numaflow-models/src/models/vertex_status.rs diff --git a/serving/numaflow-models/src/models/vertex_template.rs b/rust/numaflow-models/src/models/vertex_template.rs similarity index 100% rename from serving/numaflow-models/src/models/vertex_template.rs rename to rust/numaflow-models/src/models/vertex_template.rs diff --git a/serving/numaflow-models/src/models/watermark.rs b/rust/numaflow-models/src/models/watermark.rs similarity index 100% rename from serving/numaflow-models/src/models/watermark.rs rename to rust/numaflow-models/src/models/watermark.rs diff --git a/serving/numaflow-models/src/models/window.rs b/rust/numaflow-models/src/models/window.rs similarity index 100% rename from serving/numaflow-models/src/models/window.rs rename to rust/numaflow-models/src/models/window.rs diff --git a/serving/numaflow-models/templates/Cargo.mustache b/rust/numaflow-models/templates/Cargo.mustache similarity index 100% rename from serving/numaflow-models/templates/Cargo.mustache rename to rust/numaflow-models/templates/Cargo.mustache diff --git a/serving/numaflow-models/templates/partial_header.mustache b/rust/numaflow-models/templates/partial_header.mustache similarity index 100% rename from serving/numaflow-models/templates/partial_header.mustache rename to rust/numaflow-models/templates/partial_header.mustache diff --git a/serving/servesink/.dockerignore b/rust/servesink/.dockerignore similarity index 100% rename from serving/servesink/.dockerignore rename to rust/servesink/.dockerignore diff --git a/serving/servesink/Cargo.toml b/rust/servesink/Cargo.toml similarity index 88% rename from serving/servesink/Cargo.toml rename to rust/servesink/Cargo.toml index 7534a32008..70fa8e55f5 100644 --- a/serving/servesink/Cargo.toml +++ b/rust/servesink/Cargo.toml @@ -3,10 +3,6 @@ name = "servesink" version = "0.1.0" edition = "2021" -[[bin]] -name = "servesink" -path = "src/main.rs" - [dependencies] tonic = "0.12.0" tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } diff --git a/serving/servesink/Dockerfile b/rust/servesink/Dockerfile similarity index 100% rename from serving/servesink/Dockerfile rename to rust/servesink/Dockerfile diff --git a/serving/servesink/src/main.rs b/rust/servesink/src/lib.rs similarity index 96% rename from serving/servesink/src/main.rs rename to rust/servesink/src/lib.rs index aa089f60d1..5663b61b07 100644 --- a/serving/servesink/src/main.rs +++ b/rust/servesink/src/lib.rs @@ -5,8 +5,7 @@ use reqwest::Client; use tracing::{error, warn}; use tracing_subscriber::prelude::*; -#[tokio::main] -async fn main() -> Result<(), Box> { +pub async fn servesink() -> Result<(), Box> { tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() diff --git a/serving/Cargo.toml b/rust/serving/Cargo.toml similarity index 86% rename from serving/Cargo.toml rename to rust/serving/Cargo.toml index 58525ad62b..635bb4f208 100644 --- a/serving/Cargo.toml +++ b/rust/serving/Cargo.toml @@ -1,6 +1,5 @@ -workspace = { members = ["backoff", "extras/upstreams", "numaflow-models", "servesink", "source-sink"] } [package] -name = "serve" +name = "serving" version = "0.1.0" edition = "2021" @@ -29,7 +28,6 @@ redis = { version = "0.26.0", features = ["tokio-comp", "aio", "connection-manag config = "0.14.0" trait-variant = "0.1.2" chrono = { version = "0.4", features = ["serde"] } -# intern -backoff = { path = "backoff" } +backoff = { path = "../backoff" } base64 = "0.22.1" diff --git a/rust/serving/README.md b/rust/serving/README.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/serving/config/default.toml b/rust/serving/config/default.toml similarity index 100% rename from serving/config/default.toml rename to rust/serving/config/default.toml diff --git a/serving/config/jetstream.conf b/rust/serving/config/jetstream.conf similarity index 100% rename from serving/config/jetstream.conf rename to rust/serving/config/jetstream.conf diff --git a/serving/config/pipeline_spec.json b/rust/serving/config/pipeline_spec.json similarity index 100% rename from serving/config/pipeline_spec.json rename to rust/serving/config/pipeline_spec.json diff --git a/serving/src/app.rs b/rust/serving/src/app.rs similarity index 100% rename from serving/src/app.rs rename to rust/serving/src/app.rs diff --git a/serving/src/app/callback.rs b/rust/serving/src/app/callback.rs similarity index 100% rename from serving/src/app/callback.rs rename to rust/serving/src/app/callback.rs diff --git a/serving/src/app/callback/state.rs b/rust/serving/src/app/callback/state.rs similarity index 100% rename from serving/src/app/callback/state.rs rename to rust/serving/src/app/callback/state.rs diff --git a/serving/src/app/callback/store.rs b/rust/serving/src/app/callback/store.rs similarity index 100% rename from serving/src/app/callback/store.rs rename to rust/serving/src/app/callback/store.rs diff --git a/serving/src/app/callback/store/memstore.rs b/rust/serving/src/app/callback/store/memstore.rs similarity index 100% rename from serving/src/app/callback/store/memstore.rs rename to rust/serving/src/app/callback/store/memstore.rs diff --git a/serving/src/app/callback/store/redisstore.rs b/rust/serving/src/app/callback/store/redisstore.rs similarity index 100% rename from serving/src/app/callback/store/redisstore.rs rename to rust/serving/src/app/callback/store/redisstore.rs diff --git a/serving/src/app/direct_proxy.rs b/rust/serving/src/app/direct_proxy.rs similarity index 100% rename from serving/src/app/direct_proxy.rs rename to rust/serving/src/app/direct_proxy.rs diff --git a/serving/src/app/jetstream_proxy.rs b/rust/serving/src/app/jetstream_proxy.rs similarity index 98% rename from serving/src/app/jetstream_proxy.rs rename to rust/serving/src/app/jetstream_proxy.rs index b243a030b6..6a56266f72 100644 --- a/serving/src/app/jetstream_proxy.rs +++ b/rust/serving/src/app/jetstream_proxy.rs @@ -117,8 +117,8 @@ async fn sync_publish_serve( } }; - // The reponse can be a binary array of elements as single chunk. For the user to process the blob, we return the array len and - // length of each element in the array. This will help the user to decomponse the binary response chunk into individual + // The response can be a binary array of elements as single chunk. For the user to process the blob, we return the array len and + // length of each element in the array. This will help the user to decompose the binary response chunk into individual // elements. let mut header_map = HeaderMap::new(); let response_arr_len: String = result diff --git a/serving/src/app/message_path.rs b/rust/serving/src/app/message_path.rs similarity index 100% rename from serving/src/app/message_path.rs rename to rust/serving/src/app/message_path.rs diff --git a/serving/src/app/response.rs b/rust/serving/src/app/response.rs similarity index 100% rename from serving/src/app/response.rs rename to rust/serving/src/app/response.rs diff --git a/serving/src/app/tracker.rs b/rust/serving/src/app/tracker.rs similarity index 100% rename from serving/src/app/tracker.rs rename to rust/serving/src/app/tracker.rs diff --git a/serving/src/config.rs b/rust/serving/src/config.rs similarity index 100% rename from serving/src/config.rs rename to rust/serving/src/config.rs diff --git a/serving/src/consts.rs b/rust/serving/src/consts.rs similarity index 100% rename from serving/src/consts.rs rename to rust/serving/src/consts.rs diff --git a/serving/src/error.rs b/rust/serving/src/error.rs similarity index 100% rename from serving/src/error.rs rename to rust/serving/src/error.rs diff --git a/serving/src/main.rs b/rust/serving/src/lib.rs similarity index 87% rename from serving/src/main.rs rename to rust/serving/src/lib.rs index 6e3beea02d..929a28f050 100644 --- a/serving/src/main.rs +++ b/rust/serving/src/lib.rs @@ -1,12 +1,11 @@ use tracing::{error, info}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -pub use config::config; - -use crate::pipeline::pipeline_spec; -use crate::{app::start_main_server, metrics::start_metrics_server}; - +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use crate::app::start_main_server; +use crate::config::config; pub use self::error::{Error, Result}; +use crate::metrics::start_metrics_server; +use crate::pipeline::pipeline_spec; mod app; mod config; @@ -15,8 +14,7 @@ mod error; mod metrics; mod pipeline; -#[tokio::main] -async fn main() { +pub async fn serve() { tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() @@ -50,4 +48,4 @@ async fn flatten(handle: tokio::task::JoinHandle>) -> Result { Ok(Err(err)) => Err(err), Err(err) => Err(Error::Other(format!("Spawning the server: {err:?}"))), } -} \ No newline at end of file +} diff --git a/serving/src/metrics.rs b/rust/serving/src/metrics.rs similarity index 100% rename from serving/src/metrics.rs rename to rust/serving/src/metrics.rs diff --git a/serving/src/pipeline.rs b/rust/serving/src/pipeline.rs similarity index 100% rename from serving/src/pipeline.rs rename to rust/serving/src/pipeline.rs diff --git a/rust/src/bin/main.rs b/rust/src/bin/main.rs new file mode 100644 index 0000000000..ec715b0b34 --- /dev/null +++ b/rust/src/bin/main.rs @@ -0,0 +1,20 @@ +use std::env; +use tracing::{error, info}; + +#[tokio::main] +async fn main() { + let args: Vec = env::args().collect(); + + // Based on the argument, run the appropriate component. + if args.contains(&"--serving".to_string()) { + serving::serve().await; + } else if args.contains(&"--servesink".to_string()) { + if let Err(e) = servesink::servesink().await { + info!("Error running servesink: {}", e); + } + } else if args.contains(&"--monovertex".to_string()) { + monovertex::mono_vertex().await; + } else { + error!("Invalid argument. Use --serve, --servesink, or --monovertex."); + } +} \ No newline at end of file diff --git a/serving/extras/upstreams/Cargo.toml b/serving/extras/upstreams/Cargo.toml deleted file mode 100644 index 58340d4891..0000000000 --- a/serving/extras/upstreams/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "upstreams" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -axum = "0.7.5" -axum-macros = "0.4.1" -serde = { version = "1.0.197", features = ["derive"] } -serde_json = "1.0.114" -tokio = { version = "1.36.0", features = ["full"] } -tower-http = { version = "0.5.2", features = ["trace"] } -tracing = "0.1.40" -tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } - -[dev-dependencies] -http-body-util = "0.1.1" -tower = "0.4.13" diff --git a/serving/extras/upstreams/src/bin/simple_proxy.rs b/serving/extras/upstreams/src/bin/simple_proxy.rs deleted file mode 100644 index d09f5167a9..0000000000 --- a/serving/extras/upstreams/src/bin/simple_proxy.rs +++ /dev/null @@ -1,94 +0,0 @@ -use std::time::Duration; - -use axum::{response::IntoResponse, routing::get, Router}; -use axum_macros::debug_handler; -use tokio::{net::TcpListener, time::sleep}; -use tower_http::trace::TraceLayer; -use tracing::{debug, info}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -#[tokio::main] -async fn main() { - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "simple_proxy=debug,tower_http=debug".into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); - - let router = app(); - let router = router.layer(TraceLayer::new_for_http()); - - let listener = TcpListener::bind("localhost:8888").await.unwrap(); - info!("listening on {}", listener.local_addr().unwrap()); - axum::serve(listener, router).await.unwrap(); -} - -fn app() -> Router { - Router::new() - .route("/fast", get(|| async {})) - .route( - "/slow", - get(|| async { - debug!("sleeping"); - sleep(Duration::from_secs(1)).await - }), - ) - .route("/", get(root_handler)) -} - -#[debug_handler] -async fn root_handler() -> impl IntoResponse { - "ok" -} - -#[cfg(test)] -mod tests { - - // inspired from: https://github.com/tokio-rs/axum/blob/main/examples/testing/src/main.rs - - use axum::{ - body::Body, - http::{Request, StatusCode}, - }; - use http_body_util::BodyExt; - use tower::ServiceExt; - - use super::*; - - #[tokio::test] - async fn fast() { - let app = app(); - - let request = Request::builder().uri("/fast").body(Body::empty()).unwrap(); - - let response = app.oneshot(request).await.unwrap(); - - assert_eq!(response.status(), StatusCode::OK); - } - - #[tokio::test] - async fn slow() { - let app = app(); - - let request = Request::builder().uri("/slow").body(Body::empty()).unwrap(); - - let response = app.oneshot(request).await.unwrap(); - - assert_eq!(response.status(), StatusCode::OK); - } - - #[tokio::test] - async fn root() { - let app = app(); - - let request = Request::builder().uri("/").body(Body::empty()).unwrap(); - - let response = app.oneshot(request).await.unwrap(); - assert_eq!(response.status(), StatusCode::OK); - - let body = response.into_body().collect().await.unwrap().to_bytes(); - assert_eq!(&body[..], b"ok"); - } -} diff --git a/serving/source-sink/Dockerfile b/serving/source-sink/Dockerfile deleted file mode 100644 index 4ed8bb62f7..0000000000 --- a/serving/source-sink/Dockerfile +++ /dev/null @@ -1,19 +0,0 @@ -FROM rust:1.76-bookworm AS build - -RUN apt-get update -RUN apt-get install protobuf-compiler -y - -WORKDIR /source-sink -COPY ./ ./ - -# build for release -RUN cargo build --release - -# our final base -FROM debian:bookworm AS simple-source - -# copy the build artifact from the build stage -COPY --from=build /source-sink/target/release/source-sink /bin/serve - -# set the startup command to run your binary -CMD ["/bin/serve"] diff --git a/serving/source-sink/src/main.rs b/serving/source-sink/src/main.rs deleted file mode 100644 index e3cfb7e6a6..0000000000 --- a/serving/source-sink/src/main.rs +++ /dev/null @@ -1,89 +0,0 @@ -use tokio::signal; -use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; -use tracing::level_filters::LevelFilter; -use tracing::{error, info}; -use tracing_subscriber::EnvFilter; - -use sourcer_sinker::config::config; -use sourcer_sinker::init; -use sourcer_sinker::sink::SinkConfig; -use sourcer_sinker::source::SourceConfig; -use sourcer_sinker::transformer::TransformerConfig; - -#[tokio::main] -async fn main() { - // Initialize the logger - tracing_subscriber::fmt() - .with_env_filter( - EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .parse_lossy(&config().log_level), - ) - .with_target(false) - .init(); - - // Initialize the source, sink and transformer configurations - // We are using the default configurations for now. - let source_config = SourceConfig { - max_message_size: config().grpc_max_message_size, - ..Default::default() - }; - - let sink_config = SinkConfig { - max_message_size: config().grpc_max_message_size, - ..Default::default() - }; - - let transformer_config = if config().is_transformer_enabled { - Some(TransformerConfig { - max_message_size: config().grpc_max_message_size, - ..Default::default() - }) - } else { - None - }; - - let cln_token = CancellationToken::new(); - let shutdown_cln_token = cln_token.clone(); - // wait for SIG{INT,TERM} and invoke cancellation token. - let shutdown_handle: JoinHandle> = tokio::spawn(async move { - shutdown_signal().await; - shutdown_cln_token.cancel(); - Ok(()) - }); - - // Run the forwarder with cancellation token. - if let Err(e) = init(source_config, sink_config, transformer_config, cln_token).await { - error!("Application error: {:?}", e); - - // abort the task since we have an error - if !shutdown_handle.is_finished() { - shutdown_handle.abort(); - } - } - - info!("Gracefully Exiting..."); -} - -async fn shutdown_signal() { - let ctrl_c = async { - signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); - info!("Received Ctrl+C signal"); - }; - - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install signal handler") - .recv() - .await; - info!("Received terminate signal"); - }; - - tokio::select! { - _ = ctrl_c => {}, - _ = terminate => {}, - } -}