Skip to content

Commit

Permalink
Allow writers lower than minWriterVersion 7 to be gracefully supported
Browse files Browse the repository at this point in the history
Basically for older minWriterVersions we don't have to really worry
about generated columns unless an expression has been set, in which case
we must fail to write since we cannot honor generationExpression
  • Loading branch information
rtyler authored and ion-elgreco committed Jun 4, 2024
1 parent 4a3f74e commit 2715db2
Showing 1 changed file with 90 additions and 11 deletions.
101 changes: 90 additions & 11 deletions crates/core/src/operations/transaction/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashSet;

use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use tracing::log::*;

use super::{TableReference, TransactionError};
use crate::kernel::{
Expand Down Expand Up @@ -148,17 +149,33 @@ impl ProtocolChecker {
pub fn can_write_to(&self, snapshot: &dyn TableReference) -> Result<(), TransactionError> {
// NOTE: writers must always support all required reader features
self.can_read_from(snapshot)?;
let min_writer_version = snapshot.protocol().min_writer_version;

let required_features: Option<&HashSet<WriterFeatures>> = match min_writer_version {
0 | 1 => None,
2 => Some(&WRITER_V2),
3 => Some(&WRITER_V3),
4 => Some(&WRITER_V4),
5 => Some(&WRITER_V5),
6 => Some(&WRITER_V6),
_ => snapshot.protocol().writer_features.as_ref(),
};

let required_features: Option<&HashSet<WriterFeatures>> =
match snapshot.protocol().min_writer_version {
0 | 1 => None,
2 => Some(&WRITER_V2),
3 => Some(&WRITER_V3),
4 => Some(&WRITER_V4),
5 => Some(&WRITER_V5),
6 => Some(&WRITER_V6),
_ => snapshot.protocol().writer_features.as_ref(),
};
if (4..7).contains(&min_writer_version) {
debug!("min_writer_version is less 4-6, checking for unsupported table features");
if let Ok(schema) = snapshot.metadata().schema() {
for field in schema.fields.iter() {
if field.metadata.contains_key(
crate::kernel::ColumnMetadataKey::GenerationExpression.as_ref(),
) {
error!("The table contains `delta.generationExpression` settings on columns which mean this table cannot be currently written to by delta-rs");
return Err(TransactionError::UnsupportedWriterFeatures(vec![
WriterFeatures::GeneratedColumns,
]));
}
}
}
}

if let Some(features) = required_features {
let mut diff = features.difference(&self.writer_features).peekable();
Expand Down Expand Up @@ -250,7 +267,8 @@ pub static INSTANCE: Lazy<ProtocolChecker> = Lazy::new(|| {
mod tests {
use super::super::test_utils::create_metadata_action;
use super::*;
use crate::kernel::{Action, Add, Protocol, Remove};
use crate::kernel::DataType as DeltaDataType;
use crate::kernel::{Action, Add, PrimitiveType, Protocol, Remove};
use crate::protocol::SaveMode;
use crate::table::state::DeltaTableState;
use crate::DeltaConfigKey;
Expand Down Expand Up @@ -559,4 +577,65 @@ mod tests {
assert!(checker_7.can_read_from(eager_7).is_ok());
assert!(checker_7.can_write_to(eager_7).is_ok());
}

#[tokio::test]
async fn test_minwriter_v4_with_cdf() {
let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
let actions = vec![
Action::Protocol(
Protocol::new(2, 4)
.with_writer_features(vec![crate::kernel::WriterFeatures::ChangeDataFeed]),
),
create_metadata_action(None, Some(HashMap::new())),
];
let snapshot_5 = DeltaTableState::from_actions(actions).unwrap();
let eager_5 = snapshot_5.snapshot();
assert!(checker_5.can_write_to(eager_5).is_ok());
}

/// Technically we do not yet support generated columns, but it is okay to "accept" writing to
/// a column with minWriterVersion=4 and the generated columns feature as long as the
/// `delta.generationExpression` isn't actually defined the write is still allowed
#[tokio::test]
async fn test_minwriter_v4_with_generated_columns() {
let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
let actions = vec![
Action::Protocol(
Protocol::new(2, 4)
.with_writer_features(vec![crate::kernel::WriterFeatures::GeneratedColumns]),
),
create_metadata_action(None, Some(HashMap::new())),
];
let snapshot_5 = DeltaTableState::from_actions(actions).unwrap();
let eager_5 = snapshot_5.snapshot();
assert!(checker_5.can_write_to(eager_5).is_ok());
}

#[tokio::test]
async fn test_minwriter_v4_with_generated_columns_and_expressions() {
let checker_5 = ProtocolChecker::new(READER_V2.clone(), WRITER_V4.clone());
let actions = vec![Action::Protocol(Protocol::new(2, 4).with_writer_features(
vec![crate::kernel::WriterFeatures::GeneratedColumns],
))];

let table: crate::DeltaTable = crate::DeltaOps::new_in_memory()
.create()
.with_column(
"value",
DeltaDataType::Primitive(PrimitiveType::Integer),
true,
Some(HashMap::from([(
"delta.generationExpression".into(),
"x IS TRUE".into(),
)])),
)
.with_actions(actions)
.with_configuration_property(DeltaConfigKey::EnableChangeDataFeed, Some("true"))
.await
.expect("failed to make a version 4 table with EnableChangeDataFeed");
let eager_5 = table
.snapshot()
.expect("Failed to get snapshot from test table");
assert!(checker_5.can_write_to(eager_5).is_err());
}
}

0 comments on commit 2715db2

Please sign in to comment.