From 300340afee1e2c203928c4ae7a9dbef67c049544 Mon Sep 17 00:00:00 2001 From: nameexhaustion Date: Thu, 5 Dec 2024 21:26:03 +1100 Subject: [PATCH] refactor(rust): Move new-streaming parquet and CSV sources to under `io_sources/` (#20160) --- .../src/nodes/{csv_source.rs => io_sources/csv.rs} | 4 ++-- crates/polars-stream/src/nodes/io_sources/mod.rs | 3 +++ .../src/nodes/{parquet_source => io_sources/parquet}/init.rs | 0 .../parquet}/mem_prefetch_funcs.rs | 0 .../{parquet_source => io_sources/parquet}/metadata_fetch.rs | 0 .../{parquet_source => io_sources/parquet}/metadata_utils.rs | 0 .../src/nodes/{parquet_source => io_sources/parquet}/mod.rs | 4 ++-- .../parquet}/row_group_data_fetch.rs | 0 .../parquet}/row_group_decode.rs | 0 crates/polars-stream/src/nodes/mod.rs | 3 --- crates/polars-stream/src/physical_plan/to_graph.rs | 4 ++-- 11 files changed, 9 insertions(+), 9 deletions(-) rename crates/polars-stream/src/nodes/{csv_source.rs => io_sources/csv.rs} (99%) rename crates/polars-stream/src/nodes/{parquet_source => io_sources/parquet}/init.rs (100%) rename crates/polars-stream/src/nodes/{parquet_source => io_sources/parquet}/mem_prefetch_funcs.rs (100%) rename crates/polars-stream/src/nodes/{parquet_source => io_sources/parquet}/metadata_fetch.rs (100%) rename crates/polars-stream/src/nodes/{parquet_source => io_sources/parquet}/metadata_utils.rs (100%) rename crates/polars-stream/src/nodes/{parquet_source => io_sources/parquet}/mod.rs (99%) rename crates/polars-stream/src/nodes/{parquet_source => io_sources/parquet}/row_group_data_fetch.rs (100%) rename crates/polars-stream/src/nodes/{parquet_source => io_sources/parquet}/row_group_decode.rs (100%) diff --git a/crates/polars-stream/src/nodes/csv_source.rs b/crates/polars-stream/src/nodes/io_sources/csv.rs similarity index 99% rename from crates/polars-stream/src/nodes/csv_source.rs rename to crates/polars-stream/src/nodes/io_sources/csv.rs index 6000a49191f7..43023991e97c 100644 --- a/crates/polars-stream/src/nodes/csv_source.rs +++ b/crates/polars-stream/src/nodes/io_sources/csv.rs @@ -24,12 +24,12 @@ use polars_utils::mmap::MemSlice; use polars_utils::pl_str::PlSmallStr; use polars_utils::IdxSize; -use super::compute_node_prelude::*; -use super::{MorselSeq, TaskPriority}; use crate::async_executor; use crate::async_primitives::connector::connector; use crate::async_primitives::wait_group::{IndexedWaitGroup, WaitToken}; use crate::morsel::SourceToken; +use crate::nodes::compute_node_prelude::*; +use crate::nodes::{MorselSeq, TaskPriority}; struct LineBatch { bytes: MemSlice, diff --git a/crates/polars-stream/src/nodes/io_sources/mod.rs b/crates/polars-stream/src/nodes/io_sources/mod.rs index ce14ad3b0f7a..100f2f2be8af 100644 --- a/crates/polars-stream/src/nodes/io_sources/mod.rs +++ b/crates/polars-stream/src/nodes/io_sources/mod.rs @@ -1 +1,4 @@ +pub mod csv; pub mod ipc; +#[cfg(feature = "parquet")] +pub mod parquet; diff --git a/crates/polars-stream/src/nodes/parquet_source/init.rs b/crates/polars-stream/src/nodes/io_sources/parquet/init.rs similarity index 100% rename from crates/polars-stream/src/nodes/parquet_source/init.rs rename to crates/polars-stream/src/nodes/io_sources/parquet/init.rs diff --git a/crates/polars-stream/src/nodes/parquet_source/mem_prefetch_funcs.rs b/crates/polars-stream/src/nodes/io_sources/parquet/mem_prefetch_funcs.rs similarity index 100% rename from crates/polars-stream/src/nodes/parquet_source/mem_prefetch_funcs.rs rename to crates/polars-stream/src/nodes/io_sources/parquet/mem_prefetch_funcs.rs diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/io_sources/parquet/metadata_fetch.rs similarity index 100% rename from crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs rename to crates/polars-stream/src/nodes/io_sources/parquet/metadata_fetch.rs diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs b/crates/polars-stream/src/nodes/io_sources/parquet/metadata_utils.rs similarity index 100% rename from crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs rename to crates/polars-stream/src/nodes/io_sources/parquet/metadata_utils.rs diff --git a/crates/polars-stream/src/nodes/parquet_source/mod.rs b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs similarity index 99% rename from crates/polars-stream/src/nodes/parquet_source/mod.rs rename to crates/polars-stream/src/nodes/io_sources/parquet/mod.rs index eb4bf3cd330a..098a0fd90b04 100644 --- a/crates/polars-stream/src/nodes/parquet_source/mod.rs +++ b/crates/polars-stream/src/nodes/io_sources/parquet/mod.rs @@ -17,10 +17,10 @@ use polars_plan::prelude::FileScanOptions; use polars_utils::index::AtomicIdxSize; use polars_utils::pl_str::PlSmallStr; -use super::compute_node_prelude::*; -use super::{MorselSeq, TaskPriority}; use crate::async_primitives::wait_group::WaitToken; use crate::morsel::SourceToken; +use crate::nodes::compute_node_prelude::*; +use crate::nodes::{MorselSeq, TaskPriority}; use crate::utils::task_handles_ext; mod init; diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs similarity index 100% rename from crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs rename to crates/polars-stream/src/nodes/io_sources/parquet/row_group_data_fetch.rs diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs b/crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs similarity index 100% rename from crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs rename to crates/polars-stream/src/nodes/io_sources/parquet/row_group_decode.rs diff --git a/crates/polars-stream/src/nodes/mod.rs b/crates/polars-stream/src/nodes/mod.rs index 16ac5dab7a98..f51b5676ce2d 100644 --- a/crates/polars-stream/src/nodes/mod.rs +++ b/crates/polars-stream/src/nodes/mod.rs @@ -1,4 +1,3 @@ -pub mod csv_source; pub mod filter; pub mod group_by; pub mod in_memory_map; @@ -11,8 +10,6 @@ pub mod joins; pub mod map; pub mod multiplexer; pub mod ordered_union; -#[cfg(feature = "parquet")] -pub mod parquet_source; pub mod reduce; pub mod select; pub mod simple_projection; diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index 66bb1f4180a8..6a942f427af1 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -352,7 +352,7 @@ fn to_graph_rec<'a>( } => { if std::env::var("POLARS_DISABLE_PARQUET_SOURCE").as_deref() != Ok("1") { ctx.graph.add_node( - nodes::parquet_source::ParquetSourceNode::new( + nodes::io_sources::parquet::ParquetSourceNode::new( scan_sources, file_info, hive_parts, @@ -395,7 +395,7 @@ fn to_graph_rec<'a>( } ctx.graph.add_node( - nodes::csv_source::CsvSourceNode::new( + nodes::io_sources::csv::CsvSourceNode::new( scan_sources, file_info, file_options,