Skip to content

Commit

Permalink
Make polars optional feature in simulations
Browse files Browse the repository at this point in the history
  • Loading branch information
bacv committed Sep 11, 2023
1 parent 428dc06 commit 449e913
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 5 deletions.
7 changes: 5 additions & 2 deletions simulations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ nomos-core = { path = "../nomos-core" }
nomos-consensus = { path = "../nomos-services/consensus" }
once_cell = "1.17"
parking_lot = "0.12"
polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"] }
polars = { version = "0.27", features = ["serde", "object", "json", "csv-file", "parquet", "dtype-struct"], optional = true }
rand = { version = "0.8", features = ["small_rng"] }
rayon = "1.7"
scopeguard = "1"
Expand All @@ -43,4 +43,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "tracing-log"]
getrandom = { version = "0.2", features = ["js"] }

[dev-dependencies]
tempfile = "3.4"
tempfile = "3.4"

[features]
polars = ["dep:polars"]
7 changes: 4 additions & 3 deletions simulations/src/bin/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use simulations::node::carnot::{CarnotRecord, CarnotSettings, CarnotState};
use simulations::node::{NodeId, NodeIdExt};
use simulations::output_processors::Record;
use simulations::runner::{BoxedNode, SimulationRunnerHandle};
use simulations::streaming::{
io::IOSubscriber, naive::NaiveSubscriber, polars::PolarsSubscriber, StreamType,
};
#[cfg(feature = "polars")]
use simulations::streaming::polars::PolarsSubscriber;
use simulations::streaming::{io::IOSubscriber, naive::NaiveSubscriber, StreamType};
// internal
use simulations::{runner::SimulationRunner, settings::SimulationSettings};
mod log;
Expand Down Expand Up @@ -163,6 +163,7 @@ where
let settings = stream_settings.unwrap_io();
runner.simulate_and_subscribe::<IOSubscriber<CarnotRecord>>(settings)?
}
#[cfg(feature = "polars")]
Some(StreamType::Polars) => {
let settings = stream_settings.unwrap_polars();
runner.simulate_and_subscribe::<PolarsSubscriber<CarnotRecord>>(settings)?
Expand Down
1 change: 1 addition & 0 deletions simulations/src/bin/app/overlay_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub fn to_overlay_node<R: Rng>(
simulations::streaming::StreamSettings::IO(_) => {
simulations::streaming::SubscriberFormat::Csv
}
#[cfg(feature = "polars")]
simulations::streaming::StreamSettings::Polars(p) => p.format,
};
match &settings.overlay_settings {
Expand Down
3 changes: 3 additions & 0 deletions simulations/src/node/carnot/timeout.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use consensus_engine::View;
#[cfg(feature = "polars")]
use polars::export::ahash::HashMap;
#[cfg(not(feature = "polars"))]
use std::collections::HashMap;
use std::time::Duration;

pub(crate) struct TimeoutHandler {
Expand Down
5 changes: 5 additions & 0 deletions simulations/src/streaming/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::output_processors::{Record, RecordType, Runtime};

pub mod io;
pub mod naive;
#[cfg(feature = "polars")]
pub mod polars;
pub mod runtime_subscriber;
pub mod settings_subscriber;
Expand Down Expand Up @@ -83,6 +84,7 @@ pub enum StreamType {
#[default]
IO,
Naive,
#[cfg(feature = "polars")]
Polars,
}

Expand All @@ -93,6 +95,7 @@ impl FromStr for StreamType {
match s.trim().to_ascii_lowercase().as_str() {
"io" => Ok(Self::IO),
"naive" => Ok(Self::Naive),
#[cfg(feature = "polars")]
"polars" => Ok(Self::Polars),
tag => Err(format!(
"Invalid {tag} streaming type, only [naive, polars] are supported",
Expand All @@ -116,6 +119,7 @@ impl<'de> serde::Deserialize<'de> for StreamType {
pub enum StreamSettings {
Naive(naive::NaiveSettings),
IO(io::IOStreamSettings),
#[cfg(feature = "polars")]
Polars(polars::PolarsSettings),
}

Expand All @@ -140,6 +144,7 @@ impl StreamSettings {
}
}

#[cfg(feature = "polars")]
pub fn unwrap_polars(self) -> polars::PolarsSettings {
match self {
StreamSettings::Polars(settings) => settings,
Expand Down

0 comments on commit 449e913

Please sign in to comment.