Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor some of the writebuilder preconditions into the appropriate function #3022

Merged
merged 1 commit into from
Nov 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 181 additions & 36 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
@@ -309,20 +309,45 @@ impl WriteBuilder {
}

async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
if self.schema_mode == Some(SchemaMode::Overwrite) && self.mode != SaveMode::Overwrite {
return Err(DeltaTableError::Generic(
"Schema overwrite not supported for Append".to_string(),
));
}

let batches: &Vec<RecordBatch> = match &self.batches {
Some(batches) => {
if batches.is_empty() {
error!("The WriteBuilder was an empty set of batches!");
return Err(WriteError::MissingData.into());
}
batches
}
None => {
if self.input.is_none() {
error!("The WriteBuilder must have an input plan _or_ batches!");
return Err(WriteError::MissingData.into());
}
// provide an empty array in the case that an input plan exists
&vec![]
}
};

let schema: StructType = match &self.input {
Some(plan) => (plan.schema()).try_into()?,
None => (batches[0].schema()).try_into()?,
};

match &self.snapshot {
Some(snapshot) => {
PROTOCOL.can_write_to(snapshot)?;

let schema: StructType = if let Some(plan) = &self.input {
(plan.schema()).try_into()?
} else if let Some(batches) = &self.batches {
if batches.is_empty() {
return Err(WriteError::MissingData.into());
if self.mode == SaveMode::Overwrite {
PROTOCOL.check_append_only(&snapshot.snapshot)?;
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into()));
}
(batches[0].schema()).try_into()?
} else {
return Err(WriteError::MissingData.into());
};
}

PROTOCOL.can_write_to(snapshot)?;

if self.schema_mode.is_none() {
PROTOCOL.check_can_write_timestamp_ntz(snapshot, &schema)?;
@@ -335,16 +360,6 @@ impl WriteBuilder {
}
}
None => {
let schema: StructType = if let Some(plan) = &self.input {
Ok(plan.schema().try_into()?)
} else if let Some(batches) = &self.batches {
if batches.is_empty() {
return Err(WriteError::MissingData.into());
}
Ok(batches[0].schema().try_into()?)
} else {
Err(WriteError::MissingData)
}?;
let mut builder = CreateBuilder::new()
.with_log_store(self.log_store.clone())
.with_columns(schema.fields().cloned())
@@ -786,21 +801,8 @@ impl std::future::IntoFuture for WriteBuilder {
let mut metrics = WriteMetrics::default();
let exec_start = Instant::now();

if this.mode == SaveMode::Overwrite {
if let Some(snapshot) = &this.snapshot {
PROTOCOL.check_append_only(&snapshot.snapshot)?;
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into()));
}
}
}
if this.schema_mode == Some(SchemaMode::Overwrite) && this.mode != SaveMode::Overwrite {
return Err(DeltaTableError::Generic(
"Schema overwrite not supported for Append".to_string(),
));
}

// Create table actions to initialize table in case it does not yet exist and should be created
// Create table actions to initialize table in case it does not yet exist and should be
// created
let mut actions = this.check_preconditions().await?;

let active_partitions = this
@@ -2320,4 +2322,147 @@ mod tests {
assert!(!cdc_actions.is_empty());
Ok(())
}

/// SMall module to collect test cases which validate the [WriteBuilder]'s
/// check_preconditions() function
mod check_preconditions_test {
use super::*;

#[tokio::test]
async fn test_schema_overwrite_on_append() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let writer = DeltaOps(table)
.write(vec![batch])
.with_schema_mode(SchemaMode::Overwrite)
.with_save_mode(SaveMode::Append);

let check = writer.check_preconditions().await;
assert!(check.is_err());
Ok(())
}

#[tokio::test]
async fn test_savemode_overwrite_on_append_table() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
.create()
.with_configuration_property(TableProperty::AppendOnly, Some("true".to_string()))
.with_columns(table_schema.fields().cloned())
.await?;
let writer = DeltaOps(table)
.write(vec![batch])
.with_save_mode(SaveMode::Overwrite);

let check = writer.check_preconditions().await;
assert!(check.is_err());
Ok(())
}

#[tokio::test]
async fn test_empty_set_of_batches() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let writer = DeltaOps(table).write(vec![]);

match writer.check_preconditions().await {
Ok(_) => panic!("Expected check_preconditions to fail!"),
Err(DeltaTableError::GenericError { .. }) => {}
Err(e) => panic!("Unexpected error returned: {e:#?}"),
}
Ok(())
}

#[tokio::test]
async fn test_errorifexists() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let writer = DeltaOps(table)
.write(vec![batch])
.with_save_mode(SaveMode::ErrorIfExists);

match writer.check_preconditions().await {
Ok(_) => panic!("Expected check_preconditions to fail!"),
Err(DeltaTableError::GenericError { .. }) => {}
Err(e) => panic!("Unexpected error returned: {e:#?}"),
}
Ok(())
}

#[tokio::test]
async fn test_allow_empty_batches_with_input_plan() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;

let ctx = SessionContext::new();
let plan = ctx
.sql("SELECT 1 as id")
.await
.unwrap()
.create_physical_plan()
.await
.unwrap();
let writer = WriteBuilder::new(table.log_store.clone(), table.state)
.with_input_execution_plan(plan)
.with_save_mode(SaveMode::Overwrite);

let _ = writer.check_preconditions().await?;
Ok(())
}

#[tokio::test]
async fn test_no_snapshot_create_actions() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let batch = get_record_batch(None, false);
let writer =
WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![batch]);

let actions = writer.check_preconditions().await?;
assert_eq!(
actions.len(),
2,
"Expecting a Protocol and a Metadata action in {actions:?}"
);

Ok(())
}

#[tokio::test]
async fn test_no_snapshot_err_no_batches_check() -> DeltaResult<()> {
let table_schema = get_delta_schema();
let table = DeltaOps::new_in_memory()
.create()
.with_columns(table_schema.fields().cloned())
.await?;
let writer =
WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![]);

match writer.check_preconditions().await {
Ok(_) => panic!("Expected check_preconditions to fail!"),
Err(DeltaTableError::GenericError { .. }) => {}
Err(e) => panic!("Unexpected error returned: {e:#?}"),
}

Ok(())
}
}
}