Skip to content

Commit

Permalink
fix: avoid allocations when checking protocol support
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Nov 16, 2023
1 parent 114b6d4 commit ecb9090
Showing 1 changed file with 68 additions and 60 deletions.
128 changes: 68 additions & 60 deletions crates/deltalake-core/src/operations/transaction/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,47 @@
use std::collections::HashSet;

use lazy_static::lazy_static;
use once_cell::sync::Lazy;

use super::TransactionError;
use crate::kernel::{Action, ReaderFeatures, WriterFeatures};
use crate::table::state::DeltaTableState;

static READER_V2: Lazy<HashSet<ReaderFeatures>> = Lazy::new(|| {
let mut set = HashSet::new();
set.insert(ReaderFeatures::ColumnMapping);
set
});

static WRITER_V2: Lazy<HashSet<WriterFeatures>> = Lazy::new(|| {
let mut set = HashSet::new();
set.insert(WriterFeatures::AppendOnly);
set.insert(WriterFeatures::Invariants);
set
});
static WRITER_V3: Lazy<HashSet<WriterFeatures>> = Lazy::new(|| {
let mut set = WRITER_V2.clone();
set.insert(WriterFeatures::CheckConstraints);
set
});
static WRITER_V4: Lazy<HashSet<WriterFeatures>> = Lazy::new(|| {
let mut set = WRITER_V3.clone();
set.insert(WriterFeatures::ChangeDataFeed);
set.insert(WriterFeatures::GeneratedColumns);
set
});
static WRITER_V5: Lazy<HashSet<WriterFeatures>> = Lazy::new(|| {
let mut set = WRITER_V4.clone();
set.insert(WriterFeatures::ColumnMapping);
set
});
static WRITER_V6: Lazy<HashSet<WriterFeatures>> = Lazy::new(|| {
let mut set = WRITER_V5.clone();
set.insert(WriterFeatures::IdentityColumns);
set
});
lazy_static! {
static ref READER_V2: HashSet<ReaderFeatures> =
HashSet::from_iter([ReaderFeatures::ColumnMapping]);
static ref WRITER_V2: HashSet<WriterFeatures> =
HashSet::from_iter([WriterFeatures::AppendOnly, WriterFeatures::Invariants]);
static ref WRITER_V3: HashSet<WriterFeatures> = HashSet::from_iter([
WriterFeatures::AppendOnly,
WriterFeatures::Invariants,
WriterFeatures::CheckConstraints
]);
static ref WRITER_V4: HashSet<WriterFeatures> = HashSet::from_iter([
WriterFeatures::AppendOnly,
WriterFeatures::Invariants,
WriterFeatures::CheckConstraints,
WriterFeatures::ChangeDataFeed,
WriterFeatures::GeneratedColumns
]);
static ref WRITER_V5: HashSet<WriterFeatures> = HashSet::from_iter([
WriterFeatures::AppendOnly,
WriterFeatures::Invariants,
WriterFeatures::CheckConstraints,
WriterFeatures::ChangeDataFeed,
WriterFeatures::GeneratedColumns,
WriterFeatures::ColumnMapping,
]);
static ref WRITER_V6: HashSet<WriterFeatures> = HashSet::from_iter([
WriterFeatures::AppendOnly,
WriterFeatures::Invariants,
WriterFeatures::CheckConstraints,
WriterFeatures::ChangeDataFeed,
WriterFeatures::GeneratedColumns,
WriterFeatures::ColumnMapping,
WriterFeatures::IdentityColumns,
]);
}

pub struct ProtocolChecker {
reader_features: HashSet<ReaderFeatures>,
Expand Down Expand Up @@ -67,19 +70,18 @@ impl ProtocolChecker {

/// Check if delta-rs can read form the given delta table.
pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> {
let required_features = match snapshot.min_reader_version() {
0 | 1 => None,
2 => Some(READER_V2.clone()),
_ => snapshot.reader_features().cloned(),
};

let required_features: Option<&HashSet<ReaderFeatures>> =
match snapshot.min_reader_version() {
0 | 1 => None,
2 => Some(&READER_V2),
_ => snapshot.reader_features(),
};
if let Some(features) = required_features {
let diff: Vec<_> = features
.difference(&self.reader_features)
.cloned()
.collect();
if !diff.is_empty() {
return Err(TransactionError::UnsupportedReaderFeatures(diff));
let mut diff = features.difference(&self.reader_features).peekable();
if diff.peek().is_some() {
return Err(TransactionError::UnsupportedReaderFeatures(
diff.cloned().collect(),
));
}
};
Ok(())
Expand All @@ -90,23 +92,23 @@ impl ProtocolChecker {
// NOTE: writers must always support all required reader features
self.can_read_from(snapshot)?;

let required_features = match snapshot.min_writer_version() {
0 | 1 => None,
2 => Some(WRITER_V2.clone()),
3 => Some(WRITER_V3.clone()),
4 => Some(WRITER_V4.clone()),
5 => Some(WRITER_V5.clone()),
6 => Some(WRITER_V6.clone()),
_ => snapshot.writer_features().cloned(),
};
let required_features: Option<&HashSet<WriterFeatures>> =
match snapshot.min_writer_version() {
0 | 1 => None,
2 => Some(&WRITER_V2),
3 => Some(&WRITER_V3),
4 => Some(&WRITER_V4),
5 => Some(&WRITER_V5),
6 => Some(&WRITER_V6),
_ => snapshot.writer_features(),
};

if let Some(features) = required_features {
let diff: Vec<_> = features
.difference(&self.writer_features)
.cloned()
.collect();
if !diff.is_empty() {
return Err(TransactionError::UnsupportedWriterFeatures(diff));
let mut diff = features.difference(&self.writer_features).peekable();
if diff.peek().is_some() {
return Err(TransactionError::UnsupportedWriterFeatures(
diff.cloned().collect(),
));
}
};
Ok(())
Expand Down Expand Up @@ -145,6 +147,12 @@ impl ProtocolChecker {
}

/// The global protocol checker instance to validate table versions and features.
///
/// This instance is used by default in all transaction operations, since feature
/// support is not configurable but rather decided at compile time.
///
/// As we implement new features, we need to update this instance accordingly.
/// resulting version support is determined by the supported table feature set.
pub static INSTANCE: Lazy<ProtocolChecker> = Lazy::new(|| {
let reader_features = HashSet::new();
// reader_features.insert(ReaderFeatures::ColumnMapping);
Expand Down

0 comments on commit ecb9090

Please sign in to comment.