refactor(rust!): Remove dedicated sink_(parquet/ipc)_cloud functions (
nameexhaustion authored Dec 5, 2024
1 parent 588a22b commit bdf4512
Showing 16 changed files with 42 additions and 209 deletions.
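
For downstream users, the practical effect of this refactor is that the dedicated `*_cloud` sinks disappear and the regular sinks take the destination path (which may be a cloud URI) plus an optional `CloudOptions`. A minimal migration sketch, assuming the `polars` crate with the `lazy`, `parquet`, and `cloud` features enabled; the S3 destination, the `None` options, and the prelude re-exports are illustrative assumptions, not part of this commit:

use polars::prelude::*;

fn sink_to_cloud(lf: LazyFrame) -> PolarsResult<()> {
    // Removed in this commit:
    // lf.sink_parquet_cloud(
    //     "s3://my-bucket/out.parquet".to_string(), // uri: String
    //     None,                                     // Option<CloudOptions>
    //     ParquetWriteOptions::default(),
    // )?;

    // After this commit the regular sink takes the destination plus optional
    // cloud options; cloud vs. local is decided from the path itself.
    let dest = "s3://my-bucket/out.parquet";
    lf.sink_parquet(
        &dest, // path argument; the borrow satisfies either AsRef form shown in the hunk below
        ParquetWriteOptions::default(),
        None, // Option<polars_io::cloud::CloudOptions>, e.g. credentials or region
    )?;
    Ok(())
}
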
1 change: 0 additions & 1 deletion crates/Makefile
@@ -137,7 +137,6 @@ check-wasm: ## Check wasm build without supported features
--exclude-features aws \
--exclude-features azure \
--exclude-features cloud \
--exclude-features cloud_write \
--exclude-features decompress \
--exclude-features decompress-fast \
--exclude-features default \
2 changes: 0 additions & 2 deletions crates/polars-lazy/Cargo.toml
@@ -64,7 +64,6 @@ cloud = [
"polars-mem-engine/cloud",
"polars-stream?/cloud",
]
cloud_write = ["cloud"]
ipc = ["polars-io/ipc", "polars-plan/ipc", "polars-pipe?/ipc", "polars-mem-engine/ipc", "polars-stream?/ipc"]
json = [
"polars-io/json",
@@ -331,7 +330,6 @@ features = [
"bigidx",
"binary_encoding",
"cloud",
"cloud_write",
"coalesce",
"concat_str",
"cov",
55 changes: 1 addition & 54 deletions crates/polars-lazy/src/frame/mod.rs
@@ -755,7 +755,7 @@ impl LazyFrame {
#[cfg(feature = "parquet")]
pub fn sink_parquet(
self,
path: impl AsRef<Path>,
path: &dyn AsRef<Path>,
options: ParquetWriteOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
) -> PolarsResult<()> {
@@ -769,27 +769,6 @@
)
}

/// Stream a query result into a parquet file on an ObjectStore-compatible cloud service. This is useful if the final result doesn't fit
/// into memory, and where you do not want to write to a local file but to a location in the cloud.
/// This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(all(feature = "cloud_write", feature = "parquet"))]
pub fn sink_parquet_cloud(
self,
uri: String,
cloud_options: Option<polars_io::cloud::CloudOptions>,
parquet_options: ParquetWriteOptions,
) -> PolarsResult<()> {
self.sink(
SinkType::Cloud {
uri: Arc::new(uri),
cloud_options,
file_type: FileType::Parquet(parquet_options),
},
"collect().write_parquet()",
)
}

/// Stream a query result into an ipc/arrow file. This is useful if the final result doesn't fit
/// into memory. This methods will return an error if the query cannot be completely done in a
/// streaming fashion.
Expand All @@ -810,37 +789,6 @@ impl LazyFrame {
)
}

/// Stream a query result into an ipc/arrow file on an ObjectStore-compatible cloud service.
/// This is useful if the final result doesn't fit
/// into memory, and where you do not want to write to a local file but to a location in the cloud.
/// This method will return an error if the query cannot be completely done in a
/// streaming fashion.
#[cfg(all(feature = "cloud_write", feature = "ipc"))]
pub fn sink_ipc_cloud(
mut self,
uri: String,
cloud_options: Option<polars_io::cloud::CloudOptions>,
ipc_options: IpcWriterOptions,
) -> PolarsResult<()> {
self.opt_state |= OptFlags::STREAMING;
self.logical_plan = DslPlan::Sink {
input: Arc::new(self.logical_plan),
payload: SinkType::Cloud {
uri: Arc::new(uri),
cloud_options,
file_type: FileType::Ipc(ipc_options),
},
};
let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?;
polars_ensure!(
is_streaming,
ComputeError: "cannot run the whole query in a streaming order; \
use `collect().write_ipc()` instead"
);
let _ = physical_plan.execute(&mut state)?;
Ok(())
}

/// Stream a query result into an csv file. This is useful if the final result doesn't fit
/// into memory. This methods will return an error if the query cannot be completely done in a
/// streaming fashion.
@@ -937,7 +885,6 @@ impl LazyFrame {
#[cfg(any(
feature = "ipc",
feature = "parquet",
feature = "cloud_write",
feature = "csv",
feature = "json",
))]
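
The removed `sink_ipc_cloud` follows the same pattern. A hedged sketch of the replacement call, assuming `sink_ipc` mirrors the updated `sink_parquet` parameter order (path, `IpcWriterOptions`, optional `CloudOptions`), which this excerpt only shows in part; the destination is illustrative:

use polars::prelude::*;

fn sink_ipc_to_cloud(lf: LazyFrame) -> PolarsResult<()> {
    // Previously (removed): lf.sink_ipc_cloud(uri, cloud_options, ipc_options)
    let dest = "s3://my-bucket/out.arrow"; // illustrative cloud destination
    lf.sink_ipc(&dest, IpcWriterOptions::default(), None)?;
    Ok(())
}
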
4 changes: 0 additions & 4 deletions crates/polars-mem-engine/src/planner/lp.rs
@@ -213,10 +213,6 @@ fn create_physical_plan_impl(
"sink_{file_type:?} not yet supported in standard engine. Use 'collect().write_{file_type:?}()'"
)
},
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => {
polars_bail!(InvalidOperation: "cloud sink not supported in standard engine.")
},
},
Union { inputs, options } => {
let inputs = inputs
38 changes: 0 additions & 38 deletions crates/polars-pipe/src/executors/sinks/output/ipc.rs
@@ -43,44 +43,6 @@ impl IpcSink {
}
}

#[cfg(feature = "cloud")]
pub struct IpcCloudSink {}
#[cfg(feature = "cloud")]
impl IpcCloudSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(
uri: &str,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
ipc_options: IpcWriterOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = IpcWriter::new(cloud_writer)
.with_compression(ipc_options.compression)
.batched(schema)?;

let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
let (sender, receiver) = bounded(backpressure);

let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
ipc_options.maintain_order,
morsels_per_sink,
)));

Ok(FilesSink {
sender,
io_thread_handle,
})
})
}
}

impl<W: std::io::Write> SinkWriter for polars_io::ipc::BatchedWriter<W> {
fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
self.write_batch(df)
46 changes: 1 addition & 45 deletions crates/polars-pipe/src/executors/sinks/output/parquet.rs
@@ -10,7 +10,7 @@ use polars_io::parquet::write::{
};
use polars_io::utils::file::try_get_writeable;

use crate::executors::sinks::output::file_sink::{init_writer_thread, FilesSink, SinkWriter};
use crate::executors::sinks::output::file_sink::SinkWriter;
use crate::operators::{DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult};
use crate::pipeline::morsels_per_sink;

@@ -148,50 +148,6 @@ impl Sink for ParquetSink {
}
}

#[cfg(feature = "cloud")]
pub struct ParquetCloudSink {}
#[cfg(feature = "cloud")]
impl ParquetCloudSink {
#[allow(clippy::new_ret_no_self)]
pub fn new(
uri: &str,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
parquet_options: ParquetWriteOptions,
schema: &Schema,
) -> PolarsResult<FilesSink> {
polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
let cloud_writer = polars_io::cloud::CloudWriter::new(uri, cloud_options).await?;
let writer = ParquetWriter::new(cloud_writer)
.with_compression(parquet_options.compression)
.with_data_page_size(parquet_options.data_page_size)
.with_statistics(parquet_options.statistics)
.with_row_group_size(parquet_options.row_group_size)
// This is important! Otherwise we will deadlock
// See: #7074
.set_parallel(false)
.batched(schema)?;

let writer = Box::new(writer) as Box<dyn SinkWriter + Send>;

let morsels_per_sink = morsels_per_sink();
let backpressure = morsels_per_sink * 2;
let (sender, receiver) = bounded(backpressure);

let io_thread_handle = Arc::new(Some(init_writer_thread(
receiver,
writer,
true,
morsels_per_sink,
)));

Ok(FilesSink {
sender,
io_thread_handle,
})
})
}
}

impl<W: std::io::Write> SinkWriter for polars_io::parquet::write::BatchedWriter<W> {
fn _write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
self.write_batch(df)
68 changes: 25 additions & 43 deletions crates/polars-pipe/src/pipeline/convert.rs
@@ -189,62 +189,44 @@
},
#[allow(unused_variables)]
SinkType::File {
path, file_type, cloud_options
} => {
let path = path.as_ref().as_path();
match &file_type {
#[cfg(feature = "parquet")]
FileType::Parquet(options) => {
Box::new(ParquetSink::new(path, *options, input_schema.as_ref(), cloud_options.as_ref())?)
as Box<dyn SinkTrait>
},
#[cfg(feature = "ipc")]
FileType::Ipc(options) => {
Box::new(IpcSink::new(path, *options, input_schema.as_ref(), cloud_options.as_ref())?)
as Box<dyn SinkTrait>
},
#[cfg(feature = "csv")]
FileType::Csv(options) => {
Box::new(CsvSink::new(path, options.clone(), input_schema.as_ref(), cloud_options.as_ref())?)
as Box<dyn SinkTrait>
},
#[cfg(feature = "json")]
FileType::Json(options) => {
Box::new(JsonSink::new(path, *options, input_schema.as_ref(), cloud_options.as_ref())?)
as Box<dyn SinkTrait>
},
#[allow(unreachable_patterns)]
_ => unreachable!(),
}
},
#[cfg(feature = "cloud")]
SinkType::Cloud {
#[cfg(any(feature = "parquet", feature = "ipc"))]
uri,
path,
file_type,
#[cfg(any(feature = "parquet", feature = "ipc"))]
cloud_options,
..
} => {
let path = path.as_ref().as_path();
match &file_type {
#[cfg(feature = "parquet")]
FileType::Parquet(parquet_options) => Box::new(ParquetCloudSink::new(
uri.as_ref().as_str(),
FileType::Parquet(options) => Box::new(ParquetSink::new(
path,
*options,
input_schema.as_ref(),
cloud_options.as_ref(),
*parquet_options,
lp_arena.get(*input).schema(lp_arena).as_ref(),
)?)
as Box<dyn SinkTrait>,
#[cfg(feature = "ipc")]
FileType::Ipc(ipc_options) => Box::new(IpcCloudSink::new(
uri.as_ref().as_str(),
FileType::Ipc(options) => Box::new(IpcSink::new(
path,
*options,
input_schema.as_ref(),
cloud_options.as_ref(),
)?) as Box<dyn SinkTrait>,
#[cfg(feature = "csv")]
FileType::Csv(options) => Box::new(CsvSink::new(
path,
options.clone(),
input_schema.as_ref(),
cloud_options.as_ref(),
)?) as Box<dyn SinkTrait>,
#[cfg(feature = "json")]
FileType::Json(options) => Box::new(JsonSink::new(
path,
*options,
input_schema.as_ref(),
cloud_options.as_ref(),
*ipc_options,
lp_arena.get(*input).schema(lp_arena).as_ref(),
)?)
as Box<dyn SinkTrait>,
#[allow(unreachable_patterns)]
other_file_type => todo!("Cloud-sinking of the file type {other_file_type:?} is not (yet) supported."),
_ => unreachable!(),
}
},
}
3 changes: 1 addition & 2 deletions crates/polars-plan/src/client/check.rs
@@ -1,7 +1,6 @@
use polars_core::error::{polars_err, PolarsResult};
use polars_io::path_utils::is_cloud_url;

use crate::plans::options::SinkType;
use crate::plans::{DslPlan, FileScan, ScanSources};

/// Assert that the given [`DslPlan`] is eligible to be executed on Polars Cloud.
@@ -35,7 +34,7 @@ pub(super) fn assert_cloud_eligible(dsl: &DslPlan) -> PolarsResult<()> {
}
},
DslPlan::Sink { payload, .. } => {
if !matches!(payload, SinkType::Cloud { .. }) {
if !payload.is_cloud_destination() {
return ineligible_error("contains sink to non-cloud location");
}
},
2 changes: 0 additions & 2 deletions crates/polars-plan/src/plans/ir/dot.rs
@@ -319,8 +319,6 @@ impl<'a> IRDotDisplay<'a> {
f.write_str(match payload {
SinkType::Memory => "SINK (MEMORY)",
SinkType::File { .. } => "SINK (FILE)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (CLOUD)",
})
})?;
},
2 changes: 0 additions & 2 deletions crates/polars-plan/src/plans/ir/format.rs
@@ -372,8 +372,6 @@ impl<'a> IRDisplay<'a> {
let name = match payload {
SinkType::Memory => "SINK (memory)",
SinkType::File { .. } => "SINK (file)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (cloud)",
};
write!(f, "{:indent$}{name}", "")?;
self.with_root(*input)._format(f, sub_indent)
2 changes: 0 additions & 2 deletions crates/polars-plan/src/plans/ir/schema.rs
@@ -39,8 +39,6 @@ impl IR {
Sink { payload, .. } => match payload {
SinkType::Memory => "sink (memory)",
SinkType::File { .. } => "sink (file)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "sink (cloud)",
},
SimpleProjection { .. } => "simple_projection",
Invalid => "invalid",
2 changes: 0 additions & 2 deletions crates/polars-plan/src/plans/ir/tree_format.rs
@@ -356,8 +356,6 @@ impl<'a> TreeFmtNode<'a> {
match payload {
SinkType::Memory => "SINK (memory)",
SinkType::File { .. } => "SINK (file)",
#[cfg(feature = "cloud")]
SinkType::Cloud { .. } => "SINK (cloud)",
},
),
vec![self.lp_node(None, *input)],
20 changes: 13 additions & 7 deletions crates/polars-plan/src/plans/options.rs
@@ -13,7 +13,7 @@ use polars_io::ipc::IpcWriterOptions;
use polars_io::json::JsonWriterOptions;
#[cfg(feature = "parquet")]
use polars_io::parquet::write::ParquetWriteOptions;
use polars_io::{HiveOptions, RowIndex};
use polars_io::{is_cloud_url, HiveOptions, RowIndex};
#[cfg(feature = "dynamic_group_by")]
use polars_time::{DynamicGroupOptions, RollingGroupOptions};
#[cfg(feature = "serde")]
Expand Down Expand Up @@ -297,12 +297,18 @@ pub enum SinkType {
file_type: FileType,
cloud_options: Option<polars_io::cloud::CloudOptions>,
},
#[cfg(feature = "cloud")]
Cloud {
uri: Arc<String>,
file_type: FileType,
cloud_options: Option<polars_io::cloud::CloudOptions>,
},
}

impl SinkType {
pub(crate) fn is_cloud_destination(&self) -> bool {
if let Self::File { path, .. } = self {
if is_cloud_url(path.as_ref()) {
return true;
}
}

false
}
}

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
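
A small sketch of the path-based detection that the new `SinkType::is_cloud_destination` helper relies on: `is_cloud_url` (the same function imported from `polars-io` in the hunk above) classifies the sink path, so no separate `Cloud` variant is needed. The paths below are illustrative, and a dependency on `polars-io` is assumed:

use polars_io::path_utils::is_cloud_url;

fn main() {
    // Object-store style URIs are treated as cloud destinations...
    assert!(is_cloud_url("s3://my-bucket/out.parquet"));
    // ...while plain filesystem paths are not, so a SinkType::File pointing at
    // a local path is not considered a cloud sink.
    assert!(!is_cloud_url("/tmp/out.parquet"));
}
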