Skip to content

Commit

Permalink
feat: add protocol checker
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Nov 5, 2023
1 parent 9aa301e commit 4663c64
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 37 deletions.
46 changes: 46 additions & 0 deletions crates/deltalake-core/src/kernel/actions/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::str::FromStr;
// use std::io::{Cursor, Read};
// use std::sync::Arc;
Expand Down Expand Up @@ -225,6 +226,24 @@ impl From<String> for ReaderFeatures {
}
}

impl AsRef<str> for ReaderFeatures {
fn as_ref(&self) -> &str {
match self {
ReaderFeatures::ColumnMapping => "columnMapping",
ReaderFeatures::DeleteionVecotrs => "deletionVectors",
ReaderFeatures::TimestampWithoutTimezone => "timestampNtz",
ReaderFeatures::V2Checkpoint => "v2Checkpoint",
ReaderFeatures::Other(f) => f,
}
}
}

impl fmt::Display for ReaderFeatures {
    /// Writes the same name that `as_ref` returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name: &str = self.as_ref();
        f.write_str(name)
    }
}

/// Features table writers can support as well as let users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -303,6 +322,33 @@ impl From<String> for WriterFeatures {
}
}

impl AsRef<str> for WriterFeatures {
fn as_ref(&self) -> &str {
match self {
WriterFeatures::AppendOnly => "appendOnly",
WriterFeatures::Invariants => "invariants",
WriterFeatures::CheckConstraints => "checkConstraints",
WriterFeatures::ChangeDataFeed => "changeDataFeed",
WriterFeatures::GeneratedColumns => "generatedColumns",
WriterFeatures::ColumnMapping => "columnMapping",
WriterFeatures::IdentityColumns => "identityColumns",
WriterFeatures::DeleteionVecotrs => "deletionVectors",
WriterFeatures::RowTracking => "rowTracking",
WriterFeatures::TimestampWithoutTimezone => "timestampNtz",
WriterFeatures::DomainMetadata => "domainMetadata",
WriterFeatures::V2Checkpoint => "v2Checkpoint",
WriterFeatures::IcebergCompatV1 => "icebergCompatV1",
WriterFeatures::Other(f) => f,
}
}
}

impl fmt::Display for WriterFeatures {
    /// Writes the same name that `as_ref` returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name: &str = self.as_ref();
        f.write_str(name)
    }
}

#[cfg(all(not(feature = "parquet2"), feature = "parquet"))]
impl From<&parquet::record::Field> for WriterFeatures {
fn from(value: &parquet::record::Field) -> Self {
Expand Down
17 changes: 11 additions & 6 deletions crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use std::sync::Arc;
use futures::future::BoxFuture;
use serde_json::{Map, Value};

use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
use super::transaction::{commit, PROTOCOL};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType};
use crate::protocol::{DeltaOperation, SaveMode};
Expand Down Expand Up @@ -247,8 +246,8 @@ impl CreateBuilder {
_ => unreachable!(),
})
.unwrap_or_else(|| Protocol {
min_reader_version: MAX_SUPPORTED_READER_VERSION,
min_writer_version: MAX_SUPPORTED_WRITER_VERSION,
min_reader_version: PROTOCOL.default_reader_version(),
min_writer_version: PROTOCOL.default_writer_version(),
writer_features: None,
reader_features: None,
});
Expand Down Expand Up @@ -392,8 +391,14 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_min_reader_version(), MAX_SUPPORTED_READER_VERSION);
assert_eq!(table.get_min_writer_version(), MAX_SUPPORTED_WRITER_VERSION);
assert_eq!(
table.get_min_reader_version(),
PROTOCOL.default_reader_version()
);
assert_eq!(
table.get_min_writer_version(),
PROTOCOL.default_writer_version()
);
assert_eq!(table.schema().unwrap(), &schema);

// check we can overwrite default settings via adding actions
Expand Down
6 changes: 4 additions & 2 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use parquet::file::properties::WriterProperties;
use serde::Serialize;
use serde_json::Value;

use super::datafusion_utils::Expression;
use super::transaction::PROTOCOL;
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
use crate::errors::{DeltaResult, DeltaTableError};
Expand All @@ -44,8 +46,6 @@ use crate::storage::{DeltaObjectStore, ObjectStoreRef};
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

use super::datafusion_utils::Expression;

/// Delete Records from the Delta Table.
/// See this module's documentation for more information
pub struct DeleteBuilder {
Expand Down Expand Up @@ -274,6 +274,8 @@ impl std::future::IntoFuture for DeleteBuilder {
let mut this = self;

Box::pin(async move {
PROTOCOL.can_write_to(&this.snapshot)?;

let state = this.state.unwrap_or_else(|| {
let session = SessionContext::new();

Expand Down
3 changes: 3 additions & 0 deletions crates/deltalake-core/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use futures::future::BoxFuture;

use super::transaction::PROTOCOL;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::DeltaObjectStore;
use crate::table::state::DeltaTableState;
Expand Down Expand Up @@ -46,6 +47,8 @@ impl std::future::IntoFuture for LoadBuilder {
let this = self;

Box::pin(async move {
PROTOCOL.can_read_from(&this.snapshot)?;

let table = DeltaTable::new_with_state(this.store, this.snapshot);
let schema = table.state.arrow_schema()?;
let projection = this
Expand Down
4 changes: 3 additions & 1 deletion crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use serde::Serialize;
use serde_json::Value;

use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
use super::transaction::commit;
use super::transaction::{commit, PROTOCOL};
use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression};
use crate::delta_datafusion::{register_store, DeltaScanBuilder};
use crate::kernel::{Action, Remove};
Expand Down Expand Up @@ -1208,6 +1208,8 @@ impl std::future::IntoFuture for MergeBuilder {
let mut this = self;

Box::pin(async move {
PROTOCOL.can_write_to(&this.snapshot)?;

let state = this.state.unwrap_or_else(|| {
let session = SessionContext::new();

Expand Down
5 changes: 0 additions & 5 deletions crates/deltalake-core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ pub mod write;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod writer;

/// Maximum supported writer version
pub const MAX_SUPPORTED_WRITER_VERSION: i32 = 1;
/// Maximum supported reader version
pub const MAX_SUPPORTED_READER_VERSION: i32 = 1;

/// High level interface for executing commands against a DeltaTable
pub struct DeltaOps(pub DeltaTable);

Expand Down
4 changes: 3 additions & 1 deletion crates/deltalake-core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};

use super::transaction::commit;
use super::transaction::{commit, PROTOCOL};
use super::writer::{PartitionWriter, PartitionWriterConfig};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, Remove};
Expand Down Expand Up @@ -259,6 +259,8 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
let this = self;

Box::pin(async move {
PROTOCOL.can_write_to(&this.snapshot)?;

let writer_properties = this.writer_properties.unwrap_or_else(|| {
WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap()))
Expand Down
19 changes: 16 additions & 3 deletions crates/deltalake-core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@ use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectStore};
use serde_json::Value;

use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary};
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, CommitInfo};
use crate::kernel::{Action, CommitInfo, ReaderFeatures, WriterFeatures};
use crate::protocol::DeltaOperation;
use crate::storage::commit_uri_from_version;
use crate::table::state::DeltaTableState;

pub use self::protocol::INSTANCE as PROTOCOL;

mod conflict_checker;
mod protocol;
#[cfg(feature = "datafusion")]
mod state;
#[cfg(test)]
pub(crate) mod test_utils;

use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary};

const DELTA_LOG_FOLDER: &str = "_delta_log";

/// Error raised while committing transaction
Expand All @@ -45,17 +47,28 @@ pub enum TransactionError {
#[from]
source: ObjectStoreError,
},

/// Error returned when a commit conflict occurred
#[error("Failed to commit transaction: {0}")]
CommitConflict(#[from] CommitConflictError),

/// Error returned when maximum number of commit trials is exceeded
#[error("Failed to commit transaction: {0}")]
MaxCommitAttempts(i32),

/// The transaction includes Remove action with data change but Delta table is append-only
#[error(
"The transaction includes Remove action with data change but Delta table is append-only"
)]
DeltaTableAppendOnly,

/// Error returned when unsupported reader features are required
#[error("Unsupported reader features required: {0:?}")]
UnsupportedReaderFeatures(Vec<ReaderFeatures>),

/// Error returned when unsupported writer features are required
#[error("Unsupported writer features required: {0:?}")]
UnsupportedWriterFeatures(Vec<WriterFeatures>),
}

impl From<TransactionError> for DeltaTableError {
Expand Down
138 changes: 138 additions & 0 deletions crates/deltalake-core/src/operations/transaction/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::collections::HashSet;

use once_cell::sync::Lazy;

use super::TransactionError;
use crate::kernel::{ReaderFeatures, WriterFeatures};
use crate::table::state::DeltaTableState;

/// Reader features required by a table whose minimum reader version is 2.
static READER_V2: Lazy<HashSet<ReaderFeatures>> =
    Lazy::new(|| HashSet::from([ReaderFeatures::ColumnMapping]));

/// Writer features required by a table whose minimum writer version is 2.
static WRITER_V2: Lazy<HashSet<WriterFeatures>> =
    Lazy::new(|| HashSet::from([WriterFeatures::AppendOnly, WriterFeatures::Invariants]));
/// Writer features required at minimum writer version 3: everything in `WRITER_V2`
/// plus check constraints.
static WRITER_V3: Lazy<HashSet<WriterFeatures>> = Lazy::new(|| {
    WRITER_V2
        .iter()
        .cloned()
        .chain([WriterFeatures::CheckConstraints])
        .collect()
});
/// Writer features required at minimum writer version 4: everything in `WRITER_V3`
/// plus change data feed and generated columns.
static WRITER_V4: Lazy<HashSet<WriterFeatures>> = Lazy::new(|| {
    WRITER_V3
        .iter()
        .cloned()
        .chain([
            WriterFeatures::ChangeDataFeed,
            WriterFeatures::GeneratedColumns,
        ])
        .collect()
});
/// Writer features required at minimum writer version 5: everything in `WRITER_V4`
/// plus column mapping.
static WRITER_V5: Lazy<HashSet<WriterFeatures>> = Lazy::new(|| {
    WRITER_V4
        .iter()
        .cloned()
        .chain([WriterFeatures::ColumnMapping])
        .collect()
});
/// Writer features required at minimum writer version 6: everything in `WRITER_V5`
/// plus identity columns.
static WRITER_V6: Lazy<HashSet<WriterFeatures>> = Lazy::new(|| {
    WRITER_V5
        .iter()
        .cloned()
        .chain([WriterFeatures::IdentityColumns])
        .collect()
});

/// Validates a table snapshot's protocol requirements against the reader and
/// writer features this checker was constructed with.
pub struct ProtocolChecker {
// Reader features treated as supported; required reader features are diffed against this set.
reader_features: HashSet<ReaderFeatures>,
// Writer features treated as supported; required writer features are diffed against this set.
writer_features: HashSet<WriterFeatures>,
}

impl ProtocolChecker {
    /// Create a new protocol checker.
    ///
    /// `reader_features` and `writer_features` are the sets of table features the
    /// checker treats as supported when validating a snapshot.
    pub fn new(
        reader_features: HashSet<ReaderFeatures>,
        writer_features: HashSet<WriterFeatures>,
    ) -> Self {
        Self {
            reader_features,
            writer_features,
        }
    }

    /// The reader protocol version reported as the default; currently always `1`.
    pub fn default_reader_version(&self) -> i32 {
        1
    }

    /// The writer protocol version reported as the default; currently always `1`.
    pub fn default_writer_version(&self) -> i32 {
        1
    }

    /// Check if delta-rs can read from the given delta table.
    ///
    /// The required features are derived from the snapshot's minimum reader version:
    /// versions 0 and 1 require none, version 2 requires the `READER_V2` set, and any
    /// other version defers to the reader features listed on the snapshot (if any).
    ///
    /// # Errors
    ///
    /// Returns [`TransactionError::UnsupportedReaderFeatures`] carrying the missing
    /// features when a required feature is not in this checker's supported set.
    pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> {
        // Borrow the required set instead of cloning it; only the missing features
        // (if any) are cloned into the error payload.
        let required_features: Option<&HashSet<ReaderFeatures>> =
            match snapshot.min_reader_version() {
                0 | 1 => None,
                2 => Some(&*READER_V2),
                _ => snapshot.reader_features(),
            };

        match required_features {
            None => Ok(()),
            Some(features) => {
                let diff: Vec<_> = features
                    .difference(&self.reader_features)
                    .cloned()
                    .collect();
                if diff.is_empty() {
                    Ok(())
                } else {
                    Err(TransactionError::UnsupportedReaderFeatures(diff))
                }
            }
        }
    }

    /// Check if delta-rs can write to the given delta table.
    ///
    /// The reader check is run first. The required writer features are then derived
    /// from the snapshot's minimum writer version: versions 0 and 1 require none,
    /// versions 2 through 6 require the matching `WRITER_V*` set, and any other
    /// version defers to the writer features listed on the snapshot (if any).
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`ProtocolChecker::can_read_from`], or
    /// [`TransactionError::UnsupportedWriterFeatures`] carrying the missing features
    /// when a required writer feature is not in this checker's supported set.
    pub fn can_write_to(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> {
        // NOTE: writers must always support all required reader features
        self.can_read_from(snapshot)?;

        // Borrow the required set instead of cloning it; only the missing features
        // (if any) are cloned into the error payload.
        let required_features: Option<&HashSet<WriterFeatures>> =
            match snapshot.min_writer_version() {
                0 | 1 => None,
                2 => Some(&*WRITER_V2),
                3 => Some(&*WRITER_V3),
                4 => Some(&*WRITER_V4),
                5 => Some(&*WRITER_V5),
                6 => Some(&*WRITER_V6),
                _ => snapshot.writer_features(),
            };

        match required_features {
            // TODO: check if this is correct
            None => Ok(()),
            Some(features) => {
                let diff: Vec<_> = features
                    .difference(&self.writer_features)
                    .cloned()
                    .collect();
                if diff.is_empty() {
                    Ok(())
                } else {
                    Err(TransactionError::UnsupportedWriterFeatures(diff))
                }
            }
        }
    }
}

/// The global protocol checker instance to validate table versions and features.
///
/// It is built with no supported reader features and with the `AppendOnly` and
/// `Invariants` writer features.
pub static INSTANCE: Lazy<ProtocolChecker> = Lazy::new(|| {
    // Not enabled yet on the reader side: ColumnMapping.
    let supported_reader_features = HashSet::new();

    // Not enabled yet on the writer side: CheckConstraints, ChangeDataFeed,
    // GeneratedColumns, ColumnMapping, IdentityColumns.
    let supported_writer_features =
        HashSet::from([WriterFeatures::AppendOnly, WriterFeatures::Invariants]);

    ProtocolChecker::new(supported_reader_features, supported_writer_features)
});
Loading

0 comments on commit 4663c64

Please sign in to comment.