From 25f951acd1f3398c4cdb2a6b1c5bac4d7e24578c Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Mon, 6 Nov 2023 18:27:16 +0100 Subject: [PATCH] fix: avoid allocations when checking protocol support --- .../src/operations/transaction/protocol.rs | 128 ++++++++++-------- 1 file changed, 68 insertions(+), 60 deletions(-) diff --git a/crates/deltalake-core/src/operations/transaction/protocol.rs b/crates/deltalake-core/src/operations/transaction/protocol.rs index d551f47de5..47e4d0a41a 100644 --- a/crates/deltalake-core/src/operations/transaction/protocol.rs +++ b/crates/deltalake-core/src/operations/transaction/protocol.rs @@ -1,44 +1,47 @@ use std::collections::HashSet; +use lazy_static::lazy_static; use once_cell::sync::Lazy; use super::TransactionError; use crate::kernel::{Action, ReaderFeatures, WriterFeatures}; use crate::table::state::DeltaTableState; -static READER_V2: Lazy> = Lazy::new(|| { - let mut set = HashSet::new(); - set.insert(ReaderFeatures::ColumnMapping); - set -}); - -static WRITER_V2: Lazy> = Lazy::new(|| { - let mut set = HashSet::new(); - set.insert(WriterFeatures::AppendOnly); - set.insert(WriterFeatures::Invariants); - set -}); -static WRITER_V3: Lazy> = Lazy::new(|| { - let mut set = WRITER_V2.clone(); - set.insert(WriterFeatures::CheckConstraints); - set -}); -static WRITER_V4: Lazy> = Lazy::new(|| { - let mut set = WRITER_V3.clone(); - set.insert(WriterFeatures::ChangeDataFeed); - set.insert(WriterFeatures::GeneratedColumns); - set -}); -static WRITER_V5: Lazy> = Lazy::new(|| { - let mut set = WRITER_V4.clone(); - set.insert(WriterFeatures::ColumnMapping); - set -}); -static WRITER_V6: Lazy> = Lazy::new(|| { - let mut set = WRITER_V5.clone(); - set.insert(WriterFeatures::IdentityColumns); - set -}); +lazy_static! { + static ref READER_V2: HashSet = + HashSet::from_iter([ReaderFeatures::ColumnMapping]); + static ref WRITER_V2: HashSet = + HashSet::from_iter([WriterFeatures::AppendOnly, WriterFeatures::Invariants]); + static ref WRITER_V3: HashSet = HashSet::from_iter([ + WriterFeatures::AppendOnly, + WriterFeatures::Invariants, + WriterFeatures::CheckConstraints + ]); + static ref WRITER_V4: HashSet = HashSet::from_iter([ + WriterFeatures::AppendOnly, + WriterFeatures::Invariants, + WriterFeatures::CheckConstraints, + WriterFeatures::ChangeDataFeed, + WriterFeatures::GeneratedColumns + ]); + static ref WRITER_V5: HashSet = HashSet::from_iter([ + WriterFeatures::AppendOnly, + WriterFeatures::Invariants, + WriterFeatures::CheckConstraints, + WriterFeatures::ChangeDataFeed, + WriterFeatures::GeneratedColumns, + WriterFeatures::ColumnMapping, + ]); + static ref WRITER_V6: HashSet = HashSet::from_iter([ + WriterFeatures::AppendOnly, + WriterFeatures::Invariants, + WriterFeatures::CheckConstraints, + WriterFeatures::ChangeDataFeed, + WriterFeatures::GeneratedColumns, + WriterFeatures::ColumnMapping, + WriterFeatures::IdentityColumns, + ]); +} pub struct ProtocolChecker { reader_features: HashSet, @@ -67,19 +70,18 @@ impl ProtocolChecker { /// Check if delta-rs can read form the given delta table. pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> { - let required_features = match snapshot.min_reader_version() { - 0 | 1 => None, - 2 => Some(READER_V2.clone()), - _ => snapshot.reader_features().cloned(), - }; - + let required_features: Option<&HashSet> = + match snapshot.min_reader_version() { + 0 | 1 => None, + 2 => Some(&READER_V2), + _ => snapshot.reader_features(), + }; if let Some(features) = required_features { - let diff: Vec<_> = features - .difference(&self.reader_features) - .cloned() - .collect(); - if !diff.is_empty() { - return Err(TransactionError::UnsupportedReaderFeatures(diff)); + let mut diff = features.difference(&self.reader_features).peekable(); + if diff.peek().is_some() { + return Err(TransactionError::UnsupportedReaderFeatures( + diff.cloned().collect(), + )); } }; Ok(()) @@ -90,23 +92,23 @@ impl ProtocolChecker { // NOTE: writers must always support all required reader features self.can_read_from(snapshot)?; - let required_features = match snapshot.min_writer_version() { - 0 | 1 => None, - 2 => Some(WRITER_V2.clone()), - 3 => Some(WRITER_V3.clone()), - 4 => Some(WRITER_V4.clone()), - 5 => Some(WRITER_V5.clone()), - 6 => Some(WRITER_V6.clone()), - _ => snapshot.writer_features().cloned(), - }; + let required_features: Option<&HashSet> = + match snapshot.min_writer_version() { + 0 | 1 => None, + 2 => Some(&WRITER_V2), + 3 => Some(&WRITER_V3), + 4 => Some(&WRITER_V4), + 5 => Some(&WRITER_V5), + 6 => Some(&WRITER_V6), + _ => snapshot.writer_features(), + }; if let Some(features) = required_features { - let diff: Vec<_> = features - .difference(&self.writer_features) - .cloned() - .collect(); - if !diff.is_empty() { - return Err(TransactionError::UnsupportedWriterFeatures(diff)); + let mut diff = features.difference(&self.writer_features).peekable(); + if diff.peek().is_some() { + return Err(TransactionError::UnsupportedWriterFeatures( + diff.cloned().collect(), + )); } }; Ok(()) @@ -145,6 +147,12 @@ impl ProtocolChecker { } /// The global protocol checker instance to validate table versions and features. +/// +/// This instance is used by default in all transaction operations, since feature +/// support is not configurable but rather decided at compile time. +/// +/// As we implement new features, we need to update this instance accordingly. +/// resulting version support is determined by the supported table feature set. pub static INSTANCE: Lazy = Lazy::new(|| { let reader_features = HashSet::new(); // reader_features.insert(ReaderFeatures::ColumnMapping);