Skip to content

Commit

Permalink
chore: organize rust imports (#2164)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Oct 17, 2024
1 parent 3d6e47f commit 7672de7
Show file tree
Hide file tree
Showing 29 changed files with 158 additions and 145 deletions.
3 changes: 1 addition & 2 deletions rust/backoff/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,8 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use crate::strategy::fixed;

use super::*;
use crate::strategy::fixed;

async fn always_successful() -> Result<u64, ()> {
Ok(42)
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::sync::OnceLock;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use numaflow_models::models::{Backoff, MonoVertex, RetryStrategy};

use crate::Error;
use numaflow_models::models::{Backoff, MonoVertex, RetryStrategy};

const DEFAULT_SOURCE_SOCKET: &str = "/var/run/numaflow/source.sock";
const DEFAULT_SOURCE_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info";
Expand Down
22 changes: 13 additions & 9 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::shared::utils::{prost_timestamp_from_utc, utc_from_timestamp};
use crate::Error;
use crate::Result;
use std::cmp::PartialEq;
use std::collections::HashMap;
use std::sync::OnceLock;
use std::{env, fmt};

use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
Expand All @@ -12,10 +14,10 @@ use numaflow_pb::clients::source::{read_response, AckRequest};
use numaflow_pb::clients::sourcetransformer::SourceTransformRequest;
use prost::Message as ProtoMessage;
use serde::{Deserialize, Serialize};
use std::cmp::PartialEq;
use std::collections::HashMap;
use std::sync::OnceLock;
use std::{env, fmt};

use crate::shared::utils::{prost_timestamp_from_utc, utc_from_timestamp};
use crate::Error;
use crate::Result;

const NUMAFLOW_MONO_VERTEX_NAME: &str = "NUMAFLOW_MONO_VERTEX_NAME";
const NUMAFLOW_VERTEX_NAME: &str = "NUMAFLOW_VERTEX_NAME";
Expand Down Expand Up @@ -296,14 +298,16 @@ impl TryFrom<SinkResponse> for ResponseFromSink {

#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;

use chrono::TimeZone;
use numaflow_pb::clients::sink::sink_response::Result as SinkResult;
use numaflow_pb::clients::source::Offset as SourceOffset;
use numaflow_pb::objects::isb::{
Body, Header, Message as ProtoMessage, MessageId, MessageInfo,
};
use std::collections::HashMap;

use super::*;

#[test]
fn test_offset_display() {
Expand Down
25 changes: 13 additions & 12 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::time::Duration;

use forwarder::ForwarderBuilder;
use metrics::UserDefinedContainerState;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use tokio::signal;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;
use tracing::info;

use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;

use crate::config::{config, Settings};
use crate::error::{self, Error};
use crate::shared::utils;
Expand All @@ -21,8 +22,6 @@ use crate::source::user_defined::{
};
use crate::source::SourceHandle;
use crate::transformer::user_defined::SourceTransformHandle;
use forwarder::ForwarderBuilder;
use metrics::UserDefinedContainerState;

/// [forwarder] orchestrates data movement from the Source to the Sink via the optional SourceTransformer.
/// The forward-a-chunk executes the following in an infinite loop till a shutdown signal is received:
Expand Down Expand Up @@ -292,17 +291,19 @@ async fn start_forwarder_with_source(

#[cfg(test)]
mod tests {
use crate::config::{Settings, UDSinkConfig, UDSourceConfig};
use crate::error;
use crate::monovertex::start_forwarder;
use crate::shared::server_info::ServerInfo;
use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source};
use std::fs::File;
use std::io::Write;

use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source};
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;

use crate::config::{Settings, UDSinkConfig, UDSourceConfig};
use crate::error;
use crate::monovertex::start_forwarder;
use crate::shared::server_info::ServerInfo;

struct SimpleSource;
#[tonic::async_trait]
impl source::Sourcer for SimpleSource {
Expand Down
29 changes: 15 additions & 14 deletions rust/numaflow-core/src/monovertex/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
use std::collections::HashMap;

use chrono::Utc;
use log::warn;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};

use crate::config::{config, OnFailureStrategy};
use crate::error;
use crate::message::{Message, Offset, ResponseStatusFromSink};
Expand All @@ -9,12 +15,6 @@ use crate::sink::SinkHandle;
use crate::Error;
use crate::{source::SourceHandle, transformer::user_defined::SourceTransformHandle};

use chrono::Utc;
use log::warn;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};

/// Forwarder is responsible for reading messages from the source, applying transformation if
/// transformer is present, writing the messages to the sink, and then acknowledging the messages
/// back to the source.
Expand Down Expand Up @@ -527,14 +527,6 @@ impl Forwarder {
mod tests {
use std::collections::HashSet;

use crate::config::config;
use crate::monovertex::forwarder::ForwarderBuilder;
use crate::monovertex::SourceType;
use crate::shared::utils::create_rpc_channel;
use crate::sink::{SinkClientType, SinkHandle};
use crate::source::user_defined::new_source;
use crate::source::SourceHandle;
use crate::transformer::user_defined::SourceTransformHandle;
use chrono::Utc;
use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source, sourcetransform};
Expand All @@ -545,6 +537,15 @@ mod tests {
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;

use crate::config::config;
use crate::monovertex::forwarder::ForwarderBuilder;
use crate::monovertex::SourceType;
use crate::shared::utils::create_rpc_channel;
use crate::sink::{SinkClientType, SinkHandle};
use crate::source::user_defined::new_source;
use crate::source::SourceHandle;
use crate::transformer::user_defined::SourceTransformHandle;

struct SimpleSource {
yet_to_be_acked: std::sync::RwLock<HashSet<String>>,
}
Expand Down
7 changes: 3 additions & 4 deletions rust/numaflow-core/src/monovertex/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use axum::http::{Response, StatusCode};
use axum::response::IntoResponse;
use axum::{routing::get, Router};
use axum_server::tls_rustls::RustlsConfig;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
Expand All @@ -24,10 +27,6 @@ use tonic::transport::Channel;
use tonic::Request;
use tracing::{debug, error, info};

use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;

use crate::config::config;
use crate::source::SourceHandle;
use crate::Error;
Expand Down
19 changes: 11 additions & 8 deletions rust/numaflow-core/src/pipeline/isb/jetstream.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::error::Error;
use crate::message::Message;
use crate::pipeline::isb::jetstream::writer::JetstreamWriter;
use crate::Result;
use async_nats::jetstream::Context;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;

use crate::error::Error;
use crate::message::Message;
use crate::pipeline::isb::jetstream::writer::JetstreamWriter;
use crate::Result;

/// Jetstream Writer is responsible for writing messages to Jetstream ISB.
/// it exposes both sync and async methods to write messages.
pub(super) mod writer;
Expand Down Expand Up @@ -111,16 +112,18 @@ impl WriterHandle {

#[cfg(test)]
mod tests {
use super::*;
use crate::message::{Message, MessageID, Offset};
use std::collections::HashMap;
use std::time::Duration;

use async_nats::jetstream;
use async_nats::jetstream::stream;
use chrono::Utc;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::Instant;

use super::*;
use crate::message::{Message, MessageID, Offset};

#[cfg(feature = "nats-tests")]
#[tokio::test]
async fn test_publish_messages() {
Expand Down
16 changes: 10 additions & 6 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use crate::error::Error;
use crate::Result;
use std::time::Duration;

use async_nats::jetstream::context::PublishAckFuture;
use async_nats::jetstream::publish::PublishAck;
use async_nats::jetstream::Context;
use bytes::Bytes;
use log::warn;
use std::time::Duration;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, oneshot};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::error;

use crate::error::Error;
use crate::Result;

#[derive(Clone, Debug)]
/// Writes to JetStream ISB. Exposes both write and blocking methods to write messages.
/// It accepts a cancellation token to stop infinite retries during shutdown.
Expand Down Expand Up @@ -183,12 +185,14 @@ impl PafResolverActor {

#[cfg(test)]
mod tests {
use super::*;
use crate::message::{Message, MessageID, Offset};
use std::collections::HashMap;

use async_nats::jetstream;
use async_nats::jetstream::stream;
use chrono::Utc;
use std::collections::HashMap;

use super::*;
use crate::message::{Message, MessageID, Offset};

#[cfg(feature = "nats-tests")]
#[tokio::test]
Expand Down
3 changes: 2 additions & 1 deletion rust/numaflow-core/src/shared/server_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,10 @@ mod version {

#[cfg(test)]
mod tests {
use serde_json::json;
use std::io::{Read, Write};
use std::{collections::HashMap, fs::File};

use serde_json::json;
use tempfile::tempdir;

use super::*;
Expand Down
18 changes: 10 additions & 8 deletions rust/numaflow-core/src/shared/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use axum::http::Uri;
use backoff::retry::Retry;
use backoff::strategy::fixed;
use chrono::{DateTime, TimeZone, Timelike, Utc};
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use prost_types::Timestamp;
use tokio::net::UnixStream;
use tokio::task::JoinHandle;
Expand All @@ -24,9 +27,6 @@ use crate::monovertex::metrics::{
use crate::shared::server_info;
use crate::source::SourceHandle;
use crate::Error;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;

pub(crate) async fn check_compatibility(
cln_token: &CancellationToken,
Expand Down Expand Up @@ -208,18 +208,20 @@ pub(crate) async fn connect_with_uds(uds_path: PathBuf) -> Result<Channel, Error

#[cfg(test)]
mod tests {
use super::*;
use crate::shared::server_info::ServerInfo;
use crate::shared::utils::create_rpc_channel;
use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source, sourcetransform};
use std::fs::File;
use std::io::Write;

use numaflow::source::{Message, Offset, SourceReadRequest};
use numaflow::{sink, source, sourcetransform};
use tempfile::tempdir;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio_util::sync::CancellationToken;

use super::*;
use crate::shared::server_info::ServerInfo;
use crate::shared::utils::create_rpc_channel;

async fn write_server_info(file_path: &str, server_info: &ServerInfo) -> error::Result<()> {
let serialized = serde_json::to_string(server_info).unwrap();
let mut file = File::create(file_path).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions rust/numaflow-core/src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use numaflow_pb::clients::sink::sink_client::SinkClient;
use tokio::sync::{mpsc, oneshot};
use tonic::transport::Channel;
use user_defined::UserDefinedSink;

use crate::config::config;
use crate::message::{Message, ResponseFromSink};
use numaflow_pb::clients::sink::sink_client::SinkClient;
use user_defined::UserDefinedSink;

mod log;
/// [User-Defined Sink] extends Numaflow to add custom sources supported outside the builtins.
Expand Down
10 changes: 4 additions & 6 deletions rust/numaflow-core/src/sink/user_defined.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::sink::{Handshake, SinkRequest, SinkResponse, TransmissionStatus};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tonic::{Request, Streaming};

use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::sink::{Handshake, SinkRequest, SinkResponse, TransmissionStatus};

use crate::error;
use crate::message::{Message, ResponseFromSink};
use crate::sink::Sink;
Expand Down Expand Up @@ -111,18 +110,17 @@ impl Sink for UserDefinedSink {

#[cfg(test)]
mod tests {
use super::*;

use chrono::offset::Utc;
use numaflow::sink;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use tokio::sync::mpsc;
use tracing::info;

use super::*;
use crate::error::Result;
use crate::message::{Message, MessageID, Offset};
use crate::shared::utils::create_rpc_channel;
use crate::sink::user_defined::UserDefinedSink;
use numaflow_pb::clients::sink::sink_client::SinkClient;

struct Logger;
#[tonic::async_trait]
Expand Down
Loading

0 comments on commit 7672de7

Please sign in to comment.