refactor: create a rust based PartitionSet #3515

Merged on Dec 9, 2024 (8 commits)
2 changes: 1 addition & 1 deletion Cargo.lock

(Generated file; diff not rendered by default.)

1 change: 0 additions & 1 deletion src/daft-connect/Cargo.toml
@@ -12,7 +12,6 @@ daft-scan = {workspace = true}
daft-schema = {workspace = true}
daft-table = {workspace = true}
dashmap = "6.1.0"
derive_more = {workspace = true}
eyre = "0.6.12"
futures = "0.3.31"
itertools = {workspace = true}
19 changes: 12 additions & 7 deletions src/daft-connect/src/translation/logical_plan.rs
@@ -1,8 +1,5 @@
use std::{collections::HashMap, sync::Arc};

use daft_logical_plan::LogicalPlanBuilder;
use daft_micropartition::MicroPartition;
use derive_more::Constructor;
use daft_micropartition::partitioning::InMemoryPartitionSet;
use eyre::{bail, Context};
use spark_connect::{relation::RelType, Limit, Relation};
use tracing::warn;
@@ -19,17 +16,25 @@
mod to_df;
mod with_columns;

#[derive(Constructor)]
pub struct Plan {
pub builder: LogicalPlanBuilder,
pub psets: HashMap<String, Vec<Arc<MicroPartition>>>,
pub psets: InMemoryPartitionSet,
}

impl Plan {
pub fn new(builder: LogicalPlanBuilder) -> Self {
Self {
builder,
psets: InMemoryPartitionSet::default(),
}
}

[Codecov warning: added lines 25-30 in src/daft-connect/src/translation/logical_plan.rs are not covered by tests]
}

impl From<LogicalPlanBuilder> for Plan {
fn from(builder: LogicalPlanBuilder) -> Self {
Self {
builder,
psets: HashMap::new(),
psets: InMemoryPartitionSet::default(),
}
}
}
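
To illustrate how the reworked `Plan` is meant to be built (a minimal sketch, not part of this diff; `plan_with_cached_partitions` is a hypothetical helper assumed to sit next to `Plan` in the `translation` module): `Plan::new(builder)` and the `From<LogicalPlanBuilder>` impl both start from an empty `InMemoryPartitionSet`, while callers that already hold materialized micropartitions, such as `local_relation` below, construct the partition set explicitly.

```rust
use std::{collections::HashMap, sync::Arc};

use daft_logical_plan::LogicalPlanBuilder;
use daft_micropartition::{partitioning::InMemoryPartitionSet, MicroPartition};

// Hypothetical helper: seed a Plan with already-materialized micropartitions,
// keyed by their cache key. `Plan::new(builder)` (or `builder.into()`) would
// produce the same Plan with an empty partition set instead.
fn plan_with_cached_partitions(
    builder: LogicalPlanBuilder,
    cached: HashMap<String, Vec<Arc<MicroPartition>>>,
) -> Plan {
    Plan {
        builder,
        psets: InMemoryPartitionSet::new(cached),
    }
}
```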
@@ -9,6 +9,7 @@ use daft_logical_plan::{
logical_plan::Source, InMemoryInfo, LogicalPlan, LogicalPlanBuilder, PyLogicalPlanBuilder,
SourceInfo,
};
use daft_micropartition::partitioning::InMemoryPartitionSet;
use daft_schema::dtype::DaftDataType;
use daft_table::Table;
use eyre::{bail, ensure, WrapErr};
@@ -180,7 +181,10 @@ pub fn local_relation(plan: spark_connect::LocalRelation) -> eyre::Result<Plan>
let mut psets = HashMap::new();
psets.insert(cache_key, vec![micro_partition]);

let plan = Plan::new(plan, psets);
let plan = Plan {
builder: plan,
psets: InMemoryPartitionSet::new(psets),
};

Ok(plan)
}
14 changes: 7 additions & 7 deletions src/daft-local-execution/src/pipeline.rs
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;

use common_daft_config::DaftExecutionConfig;
use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay};
@@ -16,7 +16,7 @@ use daft_local_plan::{
Sort, UnGroupedAggregate, Unpivot,
};
use daft_logical_plan::{stats::StatsState, JoinType};
use daft_micropartition::MicroPartition;
use daft_micropartition::{partitioning::PartitionSet, MicroPartition};
use daft_physical_plan::{extract_agg_expr, populate_aggregation_stages};
use daft_scan::ScanTaskRef;
use daft_writers::make_physical_writer_factory;
@@ -75,7 +75,7 @@ pub fn viz_pipeline(root: &dyn PipelineNode) -> String {

pub fn physical_plan_to_pipeline(
physical_plan: &LocalPhysicalPlan,
psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
psets: &(impl PartitionSet + ?Sized),
cfg: &Arc<DaftExecutionConfig>,
) -> crate::Result<Box<dyn PipelineNode>> {
use daft_local_plan::PhysicalScan;
@@ -102,10 +102,10 @@
scan_task_source.arced().into()
}
LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
let partitions = psets
.get(&info.cache_key)
.unwrap_or_else(|| panic!("Cache key not found: {:?}", info.cache_key));
InMemorySource::new(partitions.clone(), info.source_schema.clone())
let materialized_pset = psets
.get_partition(&info.cache_key)
.unwrap_or_else(|_| panic!("Cache key not found: {:?}", info.cache_key));
InMemorySource::new(materialized_pset, info.source_schema.clone())
.arced()
.into()
}
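
As a sketch of the pattern above (not part of the diff; assumes it lives inside `daft-local-execution` so the `crate::` module paths resolve): the pipeline no longer cares which `PartitionSet` implementation backs the cache, only that a cache key can be resolved to a materialized partition batch.

```rust
use std::sync::Arc;

use daft_core::prelude::SchemaRef;
use daft_micropartition::partitioning::PartitionSet;

use crate::sources::{in_memory::InMemorySource, source::Source};

// Hypothetical helper mirroring the InMemoryScan arm: look a cache key up
// through the PartitionSet trait and wrap the resulting batch in a source.
fn in_memory_source_for(
    psets: &(impl PartitionSet + ?Sized),
    cache_key: &String,
    schema: SchemaRef,
) -> Arc<dyn Source> {
    let batch = psets
        .get_partition(cache_key)
        .unwrap_or_else(|_| panic!("Cache key not found: {:?}", cache_key));
    InMemorySource::new(batch, schema).arced()
}
```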
12 changes: 8 additions & 4 deletions src/daft-local-execution/src/run.rs
@@ -11,7 +11,10 @@ use common_error::DaftResult;
use common_tracing::refresh_chrome_trace;
use daft_local_plan::{translate, LocalPhysicalPlan};
use daft_logical_plan::LogicalPlanBuilder;
use daft_micropartition::MicroPartition;
use daft_micropartition::{
partitioning::{InMemoryPartitionSet, PartitionSet},
MicroPartition,
};
use futures::{FutureExt, Stream};
use loole::RecvFuture;
use tokio_util::sync::CancellationToken;
@@ -89,9 +92,10 @@ impl PyNativeExecutor {
)
})
.collect();
let psets = InMemoryPartitionSet::new(native_psets);
let out = py.allow_threads(|| {
self.executor
.run(native_psets, cfg.config, results_buffer_size)
.run(psets, cfg.config, results_buffer_size)
.map(|res| res.into_iter())
})?;
let iter = Box::new(out.map(|part| {
@@ -121,7 +125,7 @@ impl NativeExecutor {

pub fn run(
&self,
psets: HashMap<String, Vec<Arc<MicroPartition>>>,
psets: impl PartitionSet,
cfg: Arc<DaftExecutionConfig>,
results_buffer_size: Option<usize>,
) -> DaftResult<ExecutionEngineResult> {
@@ -246,7 +250,7 @@ impl IntoIterator for ExecutionEngineResult {

pub fn run_local(
physical_plan: &LocalPhysicalPlan,
psets: HashMap<String, Vec<Arc<MicroPartition>>>,
psets: impl PartitionSet,
cfg: Arc<DaftExecutionConfig>,
results_buffer_size: Option<usize>,
cancel: CancellationToken,
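
A usage sketch for the new signature (not part of the PR; assumes it sits alongside `run_local` in `run.rs`, that `DaftExecutionConfig` implements `Default`, and that `run_local` returns `DaftResult<ExecutionEngineResult>` like `NativeExecutor::run` does): callers now hand the executor any `PartitionSet` instead of a raw `HashMap`.

```rust
use std::{collections::HashMap, sync::Arc};

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;
use daft_local_plan::LocalPhysicalPlan;
use daft_micropartition::{partitioning::InMemoryPartitionSet, MicroPartition};
use tokio_util::sync::CancellationToken;

// Hypothetical driver: wrap pre-materialized micropartitions in an
// InMemoryPartitionSet and run the local physical plan over them.
fn run_with_cached_partitions(
    plan: &LocalPhysicalPlan,
    cached: HashMap<String, Vec<Arc<MicroPartition>>>,
) -> DaftResult<ExecutionEngineResult> {
    let psets = InMemoryPartitionSet::new(cached);
    run_local(
        plan,
        psets,
        Arc::new(DaftExecutionConfig::default()),
        None,                     // results_buffer_size: use the default
        CancellationToken::new(), // run to completion unless cancelled
    )
}
```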
10 changes: 4 additions & 6 deletions src/daft-local-execution/src/sources/in_memory.rs
@@ -4,19 +4,19 @@ use async_trait::async_trait;
use common_error::DaftResult;
use daft_core::prelude::SchemaRef;
use daft_io::IOStatsRef;
use daft_micropartition::MicroPartition;
use daft_micropartition::partitioning::PartitionBatchRef;
use tracing::instrument;

use super::source::Source;
use crate::sources::source::SourceStream;

pub struct InMemorySource {
data: Vec<Arc<MicroPartition>>,
data: PartitionBatchRef,
schema: SchemaRef,
}

impl InMemorySource {
pub fn new(data: Vec<Arc<MicroPartition>>, schema: SchemaRef) -> Self {
pub fn new(data: PartitionBatchRef, schema: SchemaRef) -> Self {
Self { data, schema }
}
pub fn arced(self) -> Arc<dyn Source> {
@@ -32,9 +32,7 @@ impl Source for InMemorySource {
_maintain_order: bool,
_io_stats: IOStatsRef,
) -> DaftResult<SourceStream<'static>> {
Ok(Box::pin(futures::stream::iter(
self.data.clone().into_iter().map(Ok),
)))
Ok(self.data.clone().into_partition_stream())
}
fn name(&self) -> &'static str {
"InMemory"
1 change: 1 addition & 0 deletions src/daft-micropartition/Cargo.toml
@@ -14,6 +14,7 @@ daft-parquet = {path = "../daft-parquet", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
parquet2 = {workspace = true}
pyo3 = {workspace = true, optional = true}
snafu = {workspace = true}
2 changes: 2 additions & 0 deletions src/daft-micropartition/src/lib.rs
@@ -16,6 +16,8 @@ use pyo3::PyErr;
#[cfg(feature = "python")]
pub use python::register_modules;

pub mod partitioning;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("DaftCoreComputeError: {}", source))]
2 changes: 2 additions & 0 deletions src/daft-micropartition/src/micropartition.rs
@@ -630,10 +630,12 @@ impl MicroPartition {
TableState::Loaded(tables) => Ok(tables.clone()),
}
}

pub fn get_tables(&self) -> crate::Result<Arc<Vec<Table>>> {
let tables = self.tables_or_read(IOStatsContext::new("get tables"))?;
Ok(tables)
}

pub fn concat_or_get(&self, io_stats: IOStatsRef) -> crate::Result<Arc<Vec<Table>>> {
let tables = self.tables_or_read(io_stats)?;
if tables.len() <= 1 {
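
For reference (a minimal sketch, not part of the diff; `table_count` is a hypothetical helper inside `daft-micropartition`): the new `get_tables` hands back the underlying tables as-is, whereas `concat_or_get` returns a concatenated result when there is more than one table.

```rust
use std::sync::Arc;

use daft_table::Table;

use crate::MicroPartition;

// Hypothetical helper: read (or load) the micropartition's tables without
// concatenating them, then report how many there are.
fn table_count(mp: &MicroPartition) -> crate::Result<usize> {
    let tables: Arc<Vec<Table>> = mp.get_tables()?;
    Ok(tables.len())
}
```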