Skip to content

Commit

Permalink
feat: append-only table feature
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Nov 5, 2023
1 parent f23410f commit 96f8ad9
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 51 deletions.
1 change: 0 additions & 1 deletion crates/deltalake-core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ async fn execute(
datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
},
&actions,
&snapshot,
None,
)
.await?;
Expand Down
60 changes: 14 additions & 46 deletions crates/deltalake-core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ pub enum TransactionError {
/// Error returned when unsupported writer features are required
#[error("Unsupported writer features required: {0:?}")]
UnsupportedWriterFeatures(Vec<WriterFeatures>),

/// Error returned when writer features are required but not specified
#[error("Writer features must be specified for writerversion >= 7")]
WriterFeaturesRequired,

/// Error returned when reader features are required but not specified
#[error("Reader features must be specified for reader version >= 3")]
ReaderFeaturesRequired,
}

impl From<TransactionError> for DeltaTableError {
Expand All @@ -89,18 +97,9 @@ impl From<TransactionError> for DeltaTableError {
// Convert actions to their json representation
fn log_entry_from_actions<'a>(
actions: impl IntoIterator<Item = &'a Action>,
read_snapshot: &DeltaTableState,
) -> Result<String, TransactionError> {
let append_only = read_snapshot.table_config().append_only();
let mut jsons = Vec::<String>::new();
for action in actions {
if append_only {
if let Action::Remove(remove) = action {
if remove.data_change {
return Err(TransactionError::DeltaTableAppendOnly);
}
}
}
let json = serde_json::to_string(action)
.map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
jsons.push(json);
Expand All @@ -111,7 +110,6 @@ fn log_entry_from_actions<'a>(
pub(crate) fn get_commit_bytes(
operation: &DeltaOperation,
actions: &Vec<Action>,
read_snapshot: &DeltaTableState,
app_metadata: Option<HashMap<String, Value>>,
) -> Result<bytes::Bytes, TransactionError> {
if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
Expand All @@ -130,13 +128,9 @@ pub(crate) fn get_commit_bytes(
actions
.iter()
.chain(std::iter::once(&Action::CommitInfo(commit_info))),
read_snapshot,
)?))
} else {
Ok(bytes::Bytes::from(log_entry_from_actions(
actions,
read_snapshot,
)?))
Ok(bytes::Bytes::from(log_entry_from_actions(actions)?))
}
}

Expand All @@ -147,11 +141,10 @@ pub(crate) async fn prepare_commit<'a>(
storage: &dyn ObjectStore,
operation: &DeltaOperation,
actions: &Vec<Action>,
read_snapshot: &DeltaTableState,
app_metadata: Option<HashMap<String, Value>>,
) -> Result<Path, TransactionError> {
// Serialize all actions that are part of this log entry.
let log_entry = get_commit_bytes(operation, actions, read_snapshot, app_metadata)?;
let log_entry = get_commit_bytes(operation, actions, app_metadata)?;

// Write delta log entry as temporary file to storage. For the actual commit,
// the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
Expand Down Expand Up @@ -213,8 +206,8 @@ pub async fn commit_with_retries(
app_metadata: Option<HashMap<String, Value>>,
max_retries: usize,
) -> DeltaResult<i64> {
let tmp_commit =
prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?;
PROTOCOL.can_commit(read_snapshot, actions)?;
let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?;

let mut attempt_number = 1;

Expand Down Expand Up @@ -255,11 +248,9 @@ pub async fn commit_with_retries(

#[cfg(all(test, feature = "parquet"))]
mod tests {
use self::test_utils::{create_remove_action, init_table_actions};
use self::test_utils::init_table_actions;
use super::*;
use crate::DeltaConfigKey;
use object_store::memory::InMemory;
use std::collections::HashMap;

#[test]
fn test_commit_uri_from_version() {
Expand All @@ -272,35 +263,12 @@ mod tests {
#[test]
fn test_log_entry_from_actions() {
let actions = init_table_actions(None);
let state = DeltaTableState::from_actions(actions.clone(), 0).unwrap();
let entry = log_entry_from_actions(&actions, &state).unwrap();
let entry = log_entry_from_actions(&actions).unwrap();
let lines: Vec<_> = entry.lines().collect();
// writes every action to a line
assert_eq!(actions.len(), lines.len())
}

fn remove_action_exists_when_delta_table_is_append_only(
data_change: bool,
) -> Result<String, TransactionError> {
let remove = create_remove_action("test_append_only", data_change);
let mut actions = init_table_actions(Some(HashMap::from([(
DeltaConfigKey::AppendOnly.as_ref().to_string(),
Some("true".to_string()),
)])));
actions.push(remove);
let state =
DeltaTableState::from_actions(actions.clone(), 0).expect("Failed to get table state");
log_entry_from_actions(&actions, &state)
}

#[test]
fn test_remove_action_exists_when_delta_table_is_append_only() {
let _err = remove_action_exists_when_delta_table_is_append_only(true)
.expect_err("Remove action is included when Delta table is append-only. Should error");
let _actions = remove_action_exists_when_delta_table_is_append_only(false)
.expect("Data is not changed by the Remove action. Should succeed");
}

#[tokio::test]
async fn test_try_commit_transaction() {
let store = InMemory::new();
Expand Down
129 changes: 126 additions & 3 deletions crates/deltalake-core/src/operations/transaction/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashSet;
use once_cell::sync::Lazy;

use super::TransactionError;
use crate::kernel::{ReaderFeatures, WriterFeatures};
use crate::kernel::{Action, ReaderFeatures, WriterFeatures};
use crate::table::state::DeltaTableState;

static READER_V2: Lazy<HashSet<ReaderFeatures>> = Lazy::new(|| {
Expand Down Expand Up @@ -62,7 +62,7 @@ impl ProtocolChecker {
}

pub fn default_writer_version(&self) -> i32 {
1
2
}

/// Check if delta-rs can read form the given delta table.
Expand Down Expand Up @@ -111,6 +111,37 @@ impl ProtocolChecker {
};
Ok(())
}

pub fn can_commit(
&self,
snapshot: &DeltaTableState,
actions: &[Action],
) -> Result<(), TransactionError> {
self.can_write_to(snapshot)?;

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#append-only-tables
let append_only_enabled = if snapshot.min_writer_version() < 2 {
false
} else if snapshot.min_writer_version() < 7 {
snapshot.table_config().append_only()
} else {
snapshot
.writer_features()
.ok_or(TransactionError::WriterFeaturesRequired)?
.contains(&WriterFeatures::AppendOnly)
&& snapshot.table_config().append_only()
};
if append_only_enabled {
actions.iter().try_for_each(|action| match action {
Action::Remove(remove) if remove.data_change => {
Err(TransactionError::DeltaTableAppendOnly)
}
_ => Ok(()),
})?;
}

Ok(())
}
}

/// The global protocol checker instance to validate table versions and features.
Expand All @@ -132,8 +163,100 @@ pub static INSTANCE: Lazy<ProtocolChecker> = Lazy::new(|| {

#[cfg(test)]
mod tests {
use super::super::test_utils::create_metadata_action;
use super::*;
use crate::kernel::{Action, Protocol};
use crate::kernel::{Action, Add, Protocol, Remove};
use crate::DeltaConfigKey;
use std::collections::HashMap;

#[test]
fn test_can_commit_append_only() {
let append_actions = vec![Action::Add(Add {
path: "test".to_string(),
data_change: true,
..Default::default()
})];
let change_actions = vec![
Action::Add(Add {
path: "test".to_string(),
data_change: true,
..Default::default()
}),
Action::Remove(Remove {
path: "test".to_string(),
data_change: true,
..Default::default()
}),
];
let neutral_actions = vec![
Action::Add(Add {
path: "test".to_string(),
data_change: false,
..Default::default()
}),
Action::Remove(Remove {
path: "test".to_string(),
data_change: false,
..Default::default()
}),
];

let create_actions = |writer: i32, append: &str, feat: Vec<WriterFeatures>| {
vec![
Action::Protocol(Protocol {
min_reader_version: 1,
min_writer_version: writer,
writer_features: Some(feat.into_iter().collect()),
..Default::default()
}),
create_metadata_action(
None,
Some(HashMap::from([(
DeltaConfigKey::AppendOnly.as_ref().to_string(),
Some(append.to_string()),
)])),
),
]
};

let checker = ProtocolChecker::new(HashSet::new(), WRITER_V2.clone());

let actions = create_actions(1, "true", vec![]);
let snapshot = DeltaTableState::from_actions(actions, 1).unwrap();
assert!(checker.can_commit(&snapshot, &append_actions).is_ok());
assert!(checker.can_commit(&snapshot, &change_actions).is_ok());
assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok());

let actions = create_actions(2, "true", vec![]);
let snapshot = DeltaTableState::from_actions(actions, 1).unwrap();
assert!(checker.can_commit(&snapshot, &append_actions).is_ok());
assert!(checker.can_commit(&snapshot, &change_actions).is_err());
assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok());

let actions = create_actions(2, "false", vec![]);
let snapshot = DeltaTableState::from_actions(actions, 1).unwrap();
assert!(checker.can_commit(&snapshot, &append_actions).is_ok());
assert!(checker.can_commit(&snapshot, &change_actions).is_ok());
assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok());

let actions = create_actions(7, "true", vec![WriterFeatures::AppendOnly]);
let snapshot = DeltaTableState::from_actions(actions, 1).unwrap();
assert!(checker.can_commit(&snapshot, &append_actions).is_ok());
assert!(checker.can_commit(&snapshot, &change_actions).is_err());
assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok());

let actions = create_actions(7, "false", vec![WriterFeatures::AppendOnly]);
let snapshot = DeltaTableState::from_actions(actions, 1).unwrap();
assert!(checker.can_commit(&snapshot, &append_actions).is_ok());
assert!(checker.can_commit(&snapshot, &change_actions).is_ok());
assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok());

let actions = create_actions(7, "true", vec![]);
let snapshot = DeltaTableState::from_actions(actions, 1).unwrap();
assert!(checker.can_commit(&snapshot, &append_actions).is_ok());
assert!(checker.can_commit(&snapshot, &change_actions).is_ok());
assert!(checker.can_commit(&snapshot, &neutral_actions).is_ok());
}

#[test]
fn test_versions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub async fn create_initialized_table(
),
};
let actions = init_table_actions(None);
let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None)
let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, None)
.await
.unwrap();
try_commit_transaction(storage.as_ref(), &prepared_commit, 0)
Expand Down

0 comments on commit 96f8ad9

Please sign in to comment.