Commit 7514d37 (parent b6eee0b): 14 changed files with 292 additions and 25 deletions.
@@ -0,0 +1,144 @@
use std::{collections::HashMap, future::ready};

use common_daft_config::DaftExecutionConfig;
use common_file_formats::FileFormat;
use daft_local_execution::NativeExecutor;
use eyre::{bail, WrapErr};
use spark_connect::{
    write_operation::{SaveMode, SaveType},
    WriteOperation,
};
use tonic::Status;
use tracing::warn;

use crate::{
    op::execute::{ExecuteStream, PlanIds},
    session::Session,
    translation,
};

impl Session {
    pub async fn handle_write_command(
        &self,
        operation: WriteOperation,
        operation_id: String,
    ) -> Result<ExecuteStream, Status> {
        use futures::StreamExt;

        let context = PlanIds {
            session: self.client_side_session_id().to_string(),
            server_side_session: self.server_side_session_id().to_string(),
            operation: operation_id,
        };

        let finished = context.finished();

        let result = async move {
            let WriteOperation {
                input,
                source,
                mode,
                sort_column_names,
                partitioning_columns,
                bucket_by,
                options,
                clustering_columns,
                save_type,
            } = operation;

            let Some(input) = input else {
                bail!("Input is required");
            };

            let Some(source) = source else {
                bail!("Source is required");
            };

            if source != "parquet" {
                bail!("Unsupported source: {source}; only parquet is supported");
            }

            let Ok(mode) = SaveMode::try_from(mode) else {
                bail!("Invalid save mode: {mode}");
            };

            if !sort_column_names.is_empty() {
                // todo(completeness): implement sort
                warn!("Ignoring sort_column_names: {sort_column_names:?} (not yet implemented)");
            }

            if !partitioning_columns.is_empty() {
                // todo(completeness): implement partitioning
                warn!(
                    "Ignoring partitioning_columns: {partitioning_columns:?} (not yet implemented)"
                );
            }

            if let Some(bucket_by) = bucket_by {
                // todo(completeness): implement bucketing
                warn!("Ignoring bucket_by: {bucket_by:?} (not yet implemented)");
            }

            if !options.is_empty() {
                // todo(completeness): implement options
                warn!("Ignoring options: {options:?} (not yet implemented)");
            }

            if !clustering_columns.is_empty() {
                // todo(completeness): implement clustering
                warn!("Ignoring clustering_columns: {clustering_columns:?} (not yet implemented)");
            }
            // todo(completeness): respect the save mode; every variant is accepted here
            // and currently falls through to the same parquet write below
            match mode {
                SaveMode::Unspecified => {}
                SaveMode::Append => {}
                SaveMode::Overwrite => {}
                SaveMode::ErrorIfExists => {}
                SaveMode::Ignore => {}
            }
            let Some(save_type) = save_type else {
                bail!("Save type is required");
            };

            let path = match save_type {
                SaveType::Path(path) => path,
                SaveType::Table(table) => {
                    let name = table.table_name;
                    bail!("Tried to write to table {name} but it is not yet implemented. Try to write to a path instead.");
                }
            };

            let plan = translation::to_logical_plan(input).await?;

            let plan = plan
                .table_write(&path, FileFormat::Parquet, None, None, None)
                .wrap_err("Failed to create table write plan")?;

            let optimized_plan = plan.optimize()?;
            let cfg = DaftExecutionConfig::default();
            let native_executor = NativeExecutor::from_logical_plan_builder(&optimized_plan)?;
            let mut result_stream = native_executor
                .run(HashMap::new(), cfg.into(), None)?
                .into_stream();
            // Drain the stream so the write has fully completed before we return.
            // Otherwise a client could, for example, write a parquet file and
            // immediately read it back before the write has finished.
            while let Some(_result) = result_stream.next().await {}
            Ok(())
        };

        use futures::TryFutureExt;

        let result = result.map_err(|e| Status::internal(format!("Error in Daft server: {e:?}")));

        let future = result.and_then(|()| ready(Ok(finished)));
        let stream = futures::stream::once(future);

        Ok(Box::pin(stream))
    }
}
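For orientation, here is a minimal sketch (not part of this commit) of the kind of WriteOperation the handler above accepts: a plain parquet write to a path, with every not-yet-implemented field left at its prost default. The Relation input type and the Default impls on the generated spark_connect messages are assumptions about the bindings, not something this diff shows.

use spark_connect::{
    write_operation::{SaveMode, SaveType},
    Relation, WriteOperation,
};

// Hypothetical helper: builds a parquet write request for a relation produced elsewhere.
fn example_parquet_write(relation: Relation) -> WriteOperation {
    WriteOperation {
        input: Some(relation),
        source: Some("parquet".to_string()), // the only source the handler accepts
        mode: SaveMode::Unspecified as i32,  // decoded above via SaveMode::try_from
        save_type: Some(SaveType::Path("/tmp/output.parquet".to_string())),
        // sorting, partitioning, bucketing, options and clustering are all
        // warned about and ignored by the handler for now
        ..Default::default()
    }
}

A Session would then run such a request through handle_write_command with an operation id, yielding the ExecuteStream that resolves once the write has been drained.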
@@ -0,0 +1,29 @@
use daft_logical_plan::LogicalPlanBuilder;
use eyre::{bail, WrapErr};
use spark_connect::read::ReadType;
use tracing::warn;

mod data_source;

pub async fn read(read: spark_connect::Read) -> eyre::Result<LogicalPlanBuilder> {
    let spark_connect::Read {
        is_streaming,
        read_type,
    } = read;

    warn!("Ignoring is_streaming: {is_streaming}");

    let Some(read_type) = read_type else {
        bail!("Read type is required");
    };

    match read_type {
        ReadType::NamedTable(table) => {
            let name = table.unparsed_identifier;
            bail!("Tried to read from table {name} but it is not yet implemented. Try to read from a path instead.");
        }
        ReadType::DataSource(source) => data_source::data_source(source)
            .await
            .wrap_err("Failed to create data source"),
    }
}
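As a usage sketch (again assuming the prost-generated types and their Default impls, and that read is in scope), a parquet read request for this function looks roughly like the following; the path is hypothetical.

// A sketch only: assumes `read` and the generated spark_connect types are in scope.
async fn read_parquet_example() -> eyre::Result<daft_logical_plan::LogicalPlanBuilder> {
    use spark_connect::read::{DataSource, ReadType};

    let request = spark_connect::Read {
        is_streaming: false, // warned about and ignored by read()
        read_type: Some(ReadType::DataSource(DataSource {
            format: Some("parquet".to_string()),
            paths: vec!["/data/events/*.parquet".to_string()], // hypothetical path
            ..Default::default() // schema, options and predicates are ignored for now
        })),
    };

    // Delegates to data_source() in the new file below, which wires the paths
    // into a ParquetScanBuilder and returns the resulting LogicalPlanBuilder.
    read(request).await
}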
src/daft-connect/src/translation/logical_plan/read/data_source.rs (45 additions, 0 deletions)
@@ -0,0 +1,45 @@
use daft_logical_plan::LogicalPlanBuilder;
use daft_scan::builder::ParquetScanBuilder;
use eyre::{bail, ensure, WrapErr};
use tracing::warn;

pub async fn data_source(
    data_source: spark_connect::read::DataSource,
) -> eyre::Result<LogicalPlanBuilder> {
    let spark_connect::read::DataSource {
        format,
        schema,
        options,
        paths,
        predicates,
    } = data_source;

    let Some(format) = format else {
        bail!("Format is required");
    };

    if format != "parquet" {
        bail!("Unsupported format: {format}; only parquet is supported");
    }

    ensure!(!paths.is_empty(), "Paths are required");

    if let Some(schema) = schema {
        warn!("Ignoring schema: {schema:?}; not yet implemented");
    }

    if !options.is_empty() {
        warn!("Ignoring options: {options:?}; not yet implemented");
    }

    if !predicates.is_empty() {
        warn!("Ignoring predicates: {predicates:?}; not yet implemented");
    }

    let builder = ParquetScanBuilder::new(paths)
        .finish()
        .await
        .wrap_err("Failed to create parquet scan builder")?;

    Ok(builder)
}
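To illustrate the validation at the top of this function, a small test sketch follows; it is not part of the commit and assumes tokio's test macro is available as a dev-dependency of the crate.

#[cfg(test)]
mod tests {
    use super::data_source;
    use spark_connect::read::DataSource;

    #[tokio::test]
    async fn rejects_non_parquet_formats() {
        // Only parquet is wired up, so any other format should surface an error
        // before a scan builder is ever constructed.
        let csv = DataSource {
            format: Some("csv".to_string()),
            paths: vec!["/data/events.csv".to_string()], // hypothetical path
            ..Default::default()
        };

        assert!(data_source(csv).await.is_err());
    }
}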