Skip to content

Commit

Permalink
feat: add protocol checker
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Nov 6, 2023
1 parent dcc3a7b commit 89c2f85
Show file tree
Hide file tree
Showing 14 changed files with 375 additions and 39 deletions.
1 change: 1 addition & 0 deletions crates/deltalake-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
utime = "0.3"
hyper = { version = "0.14", features = ["server"] }
criterion = "0.5"

[features]
azure = ["object_store/azure"]
Expand Down
4 changes: 2 additions & 2 deletions crates/deltalake-core/benches/read_checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
use deltalake::table::state::DeltaTableState;
use deltalake::DeltaTableConfig;
use deltalake_core::table::state::DeltaTableState;
use deltalake_core::DeltaTableConfig;
use std::fs::File;
use std::io::Read;

Expand Down
46 changes: 46 additions & 0 deletions crates/deltalake-core/src/kernel/actions/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::str::FromStr;
// use std::io::{Cursor, Read};
// use std::sync::Arc;
Expand Down Expand Up @@ -225,6 +226,24 @@ impl From<String> for ReaderFeatures {
}
}

impl AsRef<str> for ReaderFeatures {
fn as_ref(&self) -> &str {
match self {
ReaderFeatures::ColumnMapping => "columnMapping",
ReaderFeatures::DeleteionVecotrs => "deletionVectors",
ReaderFeatures::TimestampWithoutTimezone => "timestampNtz",
ReaderFeatures::V2Checkpoint => "v2Checkpoint",
ReaderFeatures::Other(f) => f,
}
}
}

/// Displays a reader feature using the same name that `as_ref` returns.
impl fmt::Display for ReaderFeatures {
    /// Writes the feature name to `f` verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name: &str = self.as_ref();
        f.write_str(name)
    }
}

/// Features table writers can support as well as let users know
/// what is supported
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -303,6 +322,33 @@ impl From<String> for WriterFeatures {
}
}

impl AsRef<str> for WriterFeatures {
fn as_ref(&self) -> &str {
match self {
WriterFeatures::AppendOnly => "appendOnly",
WriterFeatures::Invariants => "invariants",
WriterFeatures::CheckConstraints => "checkConstraints",
WriterFeatures::ChangeDataFeed => "changeDataFeed",
WriterFeatures::GeneratedColumns => "generatedColumns",
WriterFeatures::ColumnMapping => "columnMapping",
WriterFeatures::IdentityColumns => "identityColumns",
WriterFeatures::DeleteionVecotrs => "deletionVectors",
WriterFeatures::RowTracking => "rowTracking",
WriterFeatures::TimestampWithoutTimezone => "timestampNtz",
WriterFeatures::DomainMetadata => "domainMetadata",
WriterFeatures::V2Checkpoint => "v2Checkpoint",
WriterFeatures::IcebergCompatV1 => "icebergCompatV1",
WriterFeatures::Other(f) => f,
}
}
}

/// Displays a writer feature using the same name that `as_ref` returns.
impl fmt::Display for WriterFeatures {
    /// Writes the feature name to `f` verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name: &str = self.as_ref();
        f.write_str(name)
    }
}

#[cfg(all(not(feature = "parquet2"), feature = "parquet"))]
impl From<&parquet::record::Field> for WriterFeatures {
fn from(value: &parquet::record::Field) -> Self {
Expand Down
17 changes: 11 additions & 6 deletions crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use std::sync::Arc;
use futures::future::BoxFuture;
use serde_json::{Map, Value};

use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
use super::transaction::{commit, PROTOCOL};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, DataType, Metadata, Protocol, StructField, StructType};
use crate::protocol::{DeltaOperation, SaveMode};
Expand Down Expand Up @@ -247,8 +246,8 @@ impl CreateBuilder {
_ => unreachable!(),
})
.unwrap_or_else(|| Protocol {
min_reader_version: MAX_SUPPORTED_READER_VERSION,
min_writer_version: MAX_SUPPORTED_WRITER_VERSION,
min_reader_version: PROTOCOL.default_reader_version(),
min_writer_version: PROTOCOL.default_writer_version(),
writer_features: None,
reader_features: None,
});
Expand Down Expand Up @@ -392,8 +391,14 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_min_reader_version(), MAX_SUPPORTED_READER_VERSION);
assert_eq!(table.get_min_writer_version(), MAX_SUPPORTED_WRITER_VERSION);
assert_eq!(
table.get_min_reader_version(),
PROTOCOL.default_reader_version()
);
assert_eq!(
table.get_min_writer_version(),
PROTOCOL.default_writer_version()
);
assert_eq!(table.schema().unwrap(), &schema);

// check we can overwrite default settings via adding actions
Expand Down
6 changes: 4 additions & 2 deletions crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use parquet::file::properties::WriterProperties;
use serde::Serialize;
use serde_json::Value;

use super::datafusion_utils::Expression;
use super::transaction::PROTOCOL;
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
use crate::errors::{DeltaResult, DeltaTableError};
Expand All @@ -44,8 +46,6 @@ use crate::storage::{DeltaObjectStore, ObjectStoreRef};
use crate::table::state::DeltaTableState;
use crate::DeltaTable;

use super::datafusion_utils::Expression;

/// Delete Records from the Delta Table.
/// See this module's documentation for more information
pub struct DeleteBuilder {
Expand Down Expand Up @@ -274,6 +274,8 @@ impl std::future::IntoFuture for DeleteBuilder {
let mut this = self;

Box::pin(async move {
PROTOCOL.can_write_to(&this.snapshot)?;

let state = this.state.unwrap_or_else(|| {
let session = SessionContext::new();

Expand Down
3 changes: 3 additions & 0 deletions crates/deltalake-core/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use futures::future::BoxFuture;

use super::transaction::PROTOCOL;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::storage::DeltaObjectStore;
use crate::table::state::DeltaTableState;
Expand Down Expand Up @@ -46,6 +47,8 @@ impl std::future::IntoFuture for LoadBuilder {
let this = self;

Box::pin(async move {
PROTOCOL.can_read_from(&this.snapshot)?;

let table = DeltaTable::new_with_state(this.store, this.snapshot);
let schema = table.state.arrow_schema()?;
let projection = this
Expand Down
4 changes: 3 additions & 1 deletion crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use serde::Serialize;
use serde_json::Value;

use super::datafusion_utils::{into_expr, maybe_into_expr, Expression};
use super::transaction::commit;
use super::transaction::{commit, PROTOCOL};
use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression};
use crate::delta_datafusion::{register_store, DeltaScanBuilder};
use crate::kernel::{Action, Remove};
Expand Down Expand Up @@ -1208,6 +1208,8 @@ impl std::future::IntoFuture for MergeBuilder {
let mut this = self;

Box::pin(async move {
PROTOCOL.can_write_to(&this.snapshot)?;

let state = this.state.unwrap_or_else(|| {
let session = SessionContext::new();

Expand Down
5 changes: 0 additions & 5 deletions crates/deltalake-core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ pub mod write;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod writer;

/// Maximum supported writer version
pub const MAX_SUPPORTED_WRITER_VERSION: i32 = 1;
/// Maximum supported reader version
pub const MAX_SUPPORTED_READER_VERSION: i32 = 1;

/// High level interface for executing commands against a DeltaTable
///
/// This is a tuple struct wrapping a single [`DeltaTable`]; the wrapped
/// table is exposed as the public field `.0`.
pub struct DeltaOps(pub DeltaTable);

Expand Down
4 changes: 3 additions & 1 deletion crates/deltalake-core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};

use super::transaction::commit;
use super::transaction::{commit, PROTOCOL};
use super::writer::{PartitionWriter, PartitionWriterConfig};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, Remove};
Expand Down Expand Up @@ -259,6 +259,8 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
let this = self;

Box::pin(async move {
PROTOCOL.can_write_to(&this.snapshot)?;

let writer_properties = this.writer_properties.unwrap_or_else(|| {
WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap()))
Expand Down
19 changes: 16 additions & 3 deletions crates/deltalake-core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@ use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectStore};
use serde_json::Value;

use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary};
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Action, CommitInfo};
use crate::kernel::{Action, CommitInfo, ReaderFeatures, WriterFeatures};
use crate::protocol::DeltaOperation;
use crate::storage::commit_uri_from_version;
use crate::table::state::DeltaTableState;

pub use self::protocol::INSTANCE as PROTOCOL;

mod conflict_checker;
mod protocol;
#[cfg(feature = "datafusion")]
mod state;
#[cfg(test)]
pub(crate) mod test_utils;

use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary};

const DELTA_LOG_FOLDER: &str = "_delta_log";

/// Error raised while committing a transaction
Expand All @@ -45,17 +47,28 @@ pub enum TransactionError {
#[from]
source: ObjectStoreError,
},

/// Error returned when a commit conflict occurred
#[error("Failed to commit transaction: {0}")]
CommitConflict(#[from] CommitConflictError),

/// Error returned when the maximum number of commit trials is exceeded
#[error("Failed to commit transaction: {0}")]
MaxCommitAttempts(i32),

/// The transaction includes Remove action with data change but Delta table is append-only
#[error(
"The transaction includes Remove action with data change but Delta table is append-only"
)]
DeltaTableAppendOnly,

/// Error returned when unsupported reader features are required
#[error("Unsupported reader features required: {0:?}")]
UnsupportedReaderFeatures(Vec<ReaderFeatures>),

/// Error returned when unsupported writer features are required
#[error("Unsupported writer features required: {0:?}")]
UnsupportedWriterFeatures(Vec<WriterFeatures>),
}

impl From<TransactionError> for DeltaTableError {
Expand Down
Loading

0 comments on commit 89c2f85

Please sign in to comment.