Skip to content

Commit

Permalink
feat(rust): advance state in post commit (delta-io#2396)
Browse files Browse the repository at this point in the history
# Description
We advance the state in the post commit now, so it's done in a single
location as per suggestion from @Blajda here:
delta-io#2391 (comment)

This PR also supersedes this one:
delta-io#2280

# Related Issue(s)
- fixes delta-io#2279
- fixes delta-io#2262
  • Loading branch information
ion-elgreco authored Apr 27, 2024
1 parent 9d3ecbe commit 28ad395
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 151 deletions.
7 changes: 4 additions & 3 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,10 @@ impl std::future::IntoFuture for ConstraintBuilder {
.build(Some(&this.snapshot), this.log_store.clone(), operation)?
.await?;

this.snapshot
.merge(commit.data.actions, &commit.data.operation, commit.version)?;
Ok(DeltaTable::new_with_state(this.log_store, this.snapshot))
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
Expand Down
35 changes: 13 additions & 22 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,16 @@ async fn excute_non_empty_expr(
async fn execute(
predicate: Option<Expr>,
log_store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: DeltaTableState,
state: SessionState,
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
) -> DeltaResult<((Vec<Action>, i64, Option<DeltaOperation>), DeleteMetrics)> {
) -> DeltaResult<(DeltaTableState, DeleteMetrics)> {
let exec_start = Instant::now();
let mut metrics = DeleteMetrics::default();

let scan_start = Instant::now();
let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?;
let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?;
metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis();

let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
Expand All @@ -205,7 +205,7 @@ async fn execute(
} else {
let write_start = Instant::now();
let add = excute_non_empty_expr(
snapshot,
&snapshot,
log_store.clone(),
&state,
&predicate,
Expand Down Expand Up @@ -258,21 +258,14 @@ async fn execute(
predicate: Some(fmt_expr_to_sql(&predicate)?),
};
if actions.is_empty() {
return Ok(((actions, snapshot.version(), None), metrics));
return Ok((snapshot.clone(), metrics));
}

let commit = CommitBuilder::from(commit_properties)
.with_actions(actions)
.build(Some(snapshot), log_store, operation)?
.build(Some(&snapshot), log_store, operation)?
.await?;
Ok((
(
commit.data.actions,
commit.version,
Some(commit.data.operation),
),
metrics,
))
Ok((commit.snapshot(), metrics))
}

impl std::future::IntoFuture for DeleteBuilder {
Expand Down Expand Up @@ -305,22 +298,20 @@ impl std::future::IntoFuture for DeleteBuilder {
None => None,
};

let ((actions, version, operation), metrics) = execute(
let (new_snapshot, metrics) = execute(
predicate,
this.log_store.clone(),
&this.snapshot,
this.snapshot,
state,
this.writer_properties,
this.commit_properties,
)
.await?;

if let Some(op) = &operation {
this.snapshot.merge(actions, op, version)?;
}

let table = DeltaTable::new_with_state(this.log_store, this.snapshot);
Ok((table, metrics))
Ok((
DeltaTable::new_with_state(this.log_store, new_snapshot),
metrics,
))
})
}
}
Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/operations/drop_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,10 @@ impl std::future::IntoFuture for DropConstraintBuilder {
.build(Some(&this.snapshot), this.log_store.clone(), operation)?
.await?;

this.snapshot
.merge(commit.data.actions, &commit.data.operation, commit.version)?;
Ok(DeltaTable::new_with_state(this.log_store, this.snapshot))
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
Expand Down
37 changes: 14 additions & 23 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ async fn execute(
predicate: Expression,
source: DataFrame,
log_store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: DeltaTableState,
state: SessionState,
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
Expand All @@ -942,7 +942,7 @@ async fn execute(
match_operations: Vec<MergeOperationConfig>,
not_match_target_operations: Vec<MergeOperationConfig>,
not_match_source_operations: Vec<MergeOperationConfig>,
) -> DeltaResult<((Vec<Action>, i64, Option<DeltaOperation>), MergeMetrics)> {
) -> DeltaResult<(DeltaTableState, MergeMetrics)> {
let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();

Expand Down Expand Up @@ -987,7 +987,7 @@ async fn execute(
let scan_config = DeltaScanConfigBuilder::default()
.with_file_column(true)
.with_parquet_pushdown(false)
.build(snapshot)?;
.build(&snapshot)?;

let target_provider = Arc::new(DeltaTableProvider::try_new(
snapshot.clone(),
Expand Down Expand Up @@ -1017,7 +1017,7 @@ async fn execute(
} else {
try_construct_early_filter(
predicate.clone(),
snapshot,
&snapshot,
&state,
&source,
&source_name,
Expand Down Expand Up @@ -1370,7 +1370,7 @@ async fn execute(

let rewrite_start = Instant::now();
let add_actions = write_execution_plan(
Some(snapshot),
Some(&snapshot),
state.clone(),
write,
table_partition_cols.clone(),
Expand Down Expand Up @@ -1449,21 +1449,14 @@ async fn execute(
};

if actions.is_empty() {
return Ok(((actions, snapshot.version(), None), metrics));
return Ok((snapshot, metrics));
}

let commit = CommitBuilder::from(commit_properties)
.with_actions(actions)
.build(Some(snapshot), log_store.clone(), operation)?
.build(Some(&snapshot), log_store.clone(), operation)?
.await?;
Ok((
(
commit.data.actions,
commit.version,
Some(commit.data.operation),
),
metrics,
))
Ok((commit.snapshot(), metrics))
}

fn remove_table_alias(expr: Expr, table_alias: &str) -> Expr {
Expand Down Expand Up @@ -1521,11 +1514,11 @@ impl std::future::IntoFuture for MergeBuilder {
session.state()
});

let ((actions, version, operation), metrics) = execute(
let (snapshot, metrics) = execute(
this.predicate,
this.source,
this.log_store.clone(),
&this.snapshot,
this.snapshot,
state,
this.writer_properties,
this.commit_properties,
Expand All @@ -1538,12 +1531,10 @@ impl std::future::IntoFuture for MergeBuilder {
)
.await?;

if let Some(op) = &operation {
this.snapshot.merge(actions, op, version)?;
}
let table = DeltaTable::new_with_state(this.log_store, this.snapshot);

Ok((table, metrics))
Ok((
DeltaTable::new_with_state(this.log_store, snapshot),
metrics,
))
})
}
}
Expand Down
111 changes: 58 additions & 53 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub trait TableReference: Send + Sync {
fn metadata(&self) -> &Metadata;

/// Try to cast this table reference to a `EagerSnapshot`
fn eager_snapshot(&self) -> Option<&EagerSnapshot>;
fn eager_snapshot(&self) -> &EagerSnapshot;
}

impl TableReference for EagerSnapshot {
Expand All @@ -223,8 +223,8 @@ impl TableReference for EagerSnapshot {
self.table_config()
}

fn eager_snapshot(&self) -> Option<&EagerSnapshot> {
Some(self)
fn eager_snapshot(&self) -> &EagerSnapshot {
self
}
}

Expand All @@ -241,8 +241,8 @@ impl TableReference for DeltaTableState {
self.snapshot.metadata()
}

fn eager_snapshot(&self) -> Option<&EagerSnapshot> {
Some(&self.snapshot)
fn eager_snapshot(&self) -> &EagerSnapshot {
&self.snapshot
}
}

Expand Down Expand Up @@ -512,13 +512,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {

// unwrap() is safe here due to the above check
// TODO: refactor to only depend on TableReference Trait
let read_snapshot =
this.table_data
.unwrap()
.eager_snapshot()
.ok_or(DeltaTableError::Generic(
"Expected an instance of EagerSnapshot".to_owned(),
))?;
let read_snapshot = this.table_data.unwrap().eager_snapshot();

let mut attempt_number = 1;
while attempt_number <= this.max_retries {
Expand Down Expand Up @@ -595,59 +589,72 @@ pub struct PostCommit<'a> {

impl<'a> PostCommit<'a> {
/// Runs the post commit activities
async fn run_post_commit_hook(
&self,
version: i64,
commit_data: &CommitData,
) -> DeltaResult<()> {
if self.create_checkpoint {
self.create_checkpoint(&self.table_data, &self.log_store, version, commit_data)
.await?
async fn run_post_commit_hook(&self) -> DeltaResult<DeltaTableState> {
if let Some(table) = self.table_data {
let mut snapshot = table.eager_snapshot().clone();
if self.version - snapshot.version() > 1 {
// This may only occur during concurrent write actions. We need to update the state first to - 1
// then we can advance.
snapshot
.update(self.log_store.clone(), Some(self.version - 1))
.await?;
snapshot.advance(vec![&self.data])?;
} else {
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState {
app_transaction_version: HashMap::new(),
snapshot,
};
// Execute each hook
if self.create_checkpoint {
self.create_checkpoint(&state, &self.log_store, self.version)
.await?;
}
Ok(state)
} else {
let state = DeltaTableState::try_new(
&Path::default(),
self.log_store.object_store(),
Default::default(),
Some(self.version),
)
.await?;
Ok(state)
}
Ok(())
}
async fn create_checkpoint(
&self,
table: &Option<&'a dyn TableReference>,
table_state: &DeltaTableState,
log_store: &LogStoreRef,
version: i64,
commit_data: &CommitData,
) -> DeltaResult<()> {
if let Some(table) = table {
let checkpoint_interval = table.config().checkpoint_interval() as i64;
if ((version + 1) % checkpoint_interval) == 0 {
// We have to advance the snapshot otherwise we can't create a checkpoint
let mut snapshot = table.eager_snapshot().unwrap().clone();
snapshot.advance(vec![commit_data])?;
let state = DeltaTableState {
app_transaction_version: HashMap::new(),
snapshot,
};
create_checkpoint_for(version, &state, log_store.as_ref()).await?
}
let checkpoint_interval = table_state.config().checkpoint_interval() as i64;
if ((version + 1) % checkpoint_interval) == 0 {
create_checkpoint_for(version, table_state, log_store.as_ref()).await?
}
Ok(())
}
}

/// A commit that successfully completed
pub struct FinalizedCommit {
/// The winning version number of the commit
/// The new table state after a commmit
pub snapshot: DeltaTableState,

/// Version of the finalized commit
pub version: i64,
/// The data that was comitted to the log store
pub data: CommitData,
}

impl FinalizedCommit {
/// The materialized version of the commit
/// The new table state after a commmit
pub fn snapshot(&self) -> DeltaTableState {
self.snapshot.clone()
}
/// Version of the finalized commit
pub fn version(&self) -> i64 {
self.version
}

/// Data used to write the commit
pub fn data(&self) -> &CommitData {
&self.data
}
}

impl<'a> std::future::IntoFuture for PostCommit<'a> {
Expand All @@ -658,15 +665,13 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> {
let this = self;

Box::pin(async move {
match this.run_post_commit_hook(this.version, &this.data).await {
Ok(_) => {
return Ok(FinalizedCommit {
version: this.version,
data: this.data,
})
}
Err(err) => return Err(err),
};
match this.run_post_commit_hook().await {
Ok(snapshot) => Ok(FinalizedCommit {
snapshot,
version: this.version,
}),
Err(err) => Err(err),
}
})
}
}
Expand Down
Loading

0 comments on commit 28ad395

Please sign in to comment.