Skip to content

Commit

Permalink
feat: use EagerSnapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Jan 7, 2024
1 parent 5e86161 commit a2fc20f
Show file tree
Hide file tree
Showing 69 changed files with 1,338 additions and 1,274 deletions.
4 changes: 2 additions & 2 deletions crates/benchmarks/src/bin/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ async fn benchmark_merge_tpcds(
merge: fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>,
) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> {
let table = DeltaTableBuilder::from_uri(path).load().await?;
let file_count = table.state.files().len();
let file_count = table.snapshot()?.files()?.len();

let provider = DeltaTableProvider::try_new(
table.state.clone(),
table.snapshot()?.clone(),
table.log_store(),
DeltaScanConfig {
file_column_name: Some("file_path".to_string()),
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ async fn append_to_table(
table.log_store().as_ref(),
&actions,
operation,
&table.state,
Some(table.snapshot()?),
metadata,
)
.await
Expand Down
6 changes: 4 additions & 2 deletions crates/deltalake-core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,8 @@ mod test {
.cast_to::<DFSchema>(
&arrow_schema::DataType::Utf8,
&table
.state
.snapshot()
.unwrap()
.input_schema()
.unwrap()
.as_ref()
Expand Down Expand Up @@ -612,7 +613,8 @@ mod test {
assert_eq!(test.expected, actual);

let actual_expr = table
.state
.snapshot()
.unwrap()
.parse_predicate_expression(actual, &session.state())
.unwrap();

Expand Down
72 changes: 43 additions & 29 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl DeltaTableState {
let mut downgrade = false;
let unknown_stats = Statistics::new_unknown(&schema);

let files = self.files();
let files = self.files()?;

// Initialize statistics
let mut table_stats = match files.first() {
Expand Down Expand Up @@ -269,8 +269,8 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option

let data_type = field.data_type().try_into().ok()?;
let partition_columns = &table.metadata().ok()?.partition_columns;

let values = table.get_state().files().iter().map(|add| {
let files = table.snapshot().ok()?.files().ok()?;
let values = files.iter().map(|add| {
if partition_columns.contains(&column.name) {
let value = add.partition_values.get(&column.name).unwrap();
let value = match value {
Expand Down Expand Up @@ -308,6 +308,7 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option
ScalarValue::iter_to_array(values).ok()
}

// TODO only implement this for Snapshot, not for DeltaTable
impl PruningStatistics for DeltaTable {
/// return the minimum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows
Expand All @@ -324,7 +325,7 @@ impl PruningStatistics for DeltaTable {
/// return the number of containers (e.g. row groups) being
/// pruned with these statistics
fn num_containers(&self) -> usize {
self.get_state().files().len()
self.get_state().unwrap().files().unwrap().len()
}

/// return the number of null values for the named column as an
Expand All @@ -333,8 +334,8 @@ impl PruningStatistics for DeltaTable {
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.metadata().ok()?.partition_columns;

let values = self.get_state().files().iter().map(|add| {
let files = self.snapshot().ok()?.files().ok()?;
let values = files.iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
if partition_columns.contains(&column.name) {
let value = add.partition_values.get(&column.name).unwrap();
Expand Down Expand Up @@ -564,7 +565,7 @@ impl<'a> DeltaScanBuilder<'a> {
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
let files_to_prune = pruning_predicate.prune(self.snapshot)?;
self.snapshot
.files()
.files()?
.iter()
.zip(files_to_prune.into_iter())
.filter_map(
Expand All @@ -578,7 +579,7 @@ impl<'a> DeltaScanBuilder<'a> {
)
.collect()
} else {
self.snapshot.files().to_owned()
self.snapshot.files()?
}
}
};
Expand All @@ -588,7 +589,7 @@ impl<'a> DeltaScanBuilder<'a> {
// However we may want to do some additional balancing in case we are far off from the above.
let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();

let table_partition_cols = &self.snapshot.metadata()?.partition_columns;
let table_partition_cols = &self.snapshot.metadata().partition_columns;

for action in files.iter() {
let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);
Expand Down Expand Up @@ -666,14 +667,15 @@ impl<'a> DeltaScanBuilder<'a> {
}
}

// TODO: implement this for Snapshot, not for DeltaTable
#[async_trait]
impl TableProvider for DeltaTable {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> Arc<ArrowSchema> {
self.state.arrow_schema().unwrap()
self.snapshot().unwrap().arrow_schema().unwrap()
}

fn table_type(&self) -> TableType {
Expand All @@ -698,7 +700,7 @@ impl TableProvider for DeltaTable {
register_store(self.log_store(), session.runtime_env().clone());
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(&self.state, self.log_store(), session)
let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
.with_projection(projection)
.with_limit(limit)
.with_filter(filter_expr)
Expand All @@ -716,7 +718,7 @@ impl TableProvider for DeltaTable {
}

fn statistics(&self) -> Option<Statistics> {
self.state.datafusion_table_statistics().ok()
self.get_state()?.datafusion_table_statistics().ok()
}
}

Expand Down Expand Up @@ -1065,6 +1067,15 @@ pub struct DeltaDataChecker {
}

impl DeltaDataChecker {
/// Create a new DeltaDataChecker with no invariants or constraints
pub fn empty() -> Self {
Self {
invariants: vec![],
constraints: vec![],
ctx: DeltaSessionContext::default().into(),
}
}

/// Create a new DeltaDataChecker with a specified set of invariants
pub fn new_with_invariants(invariants: Vec<Invariant>) -> Self {
Self {
Expand All @@ -1091,13 +1102,7 @@ impl DeltaDataChecker {

/// Create a new DeltaDataChecker
pub fn new(snapshot: &DeltaTableState) -> Self {
// TODO remove unwrap
let invariants = if let Ok(m) = snapshot.metadata() {
m.schema().unwrap().get_invariants().unwrap()
} else {
vec![]
};

let invariants = snapshot.schema().get_invariants().unwrap_or_default();
let constraints = snapshot.table_config().get_constraints();
Self {
invariants,
Expand Down Expand Up @@ -1418,7 +1423,7 @@ pub(crate) async fn find_files_scan<'a>(
expression: Expr,
) -> DeltaResult<Vec<Add>> {
let candidate_map: HashMap<String, Add> = snapshot
.files()
.files()?
.iter()
.map(|add| (add.path.clone(), add.to_owned()))
.collect();
Expand Down Expand Up @@ -1478,7 +1483,7 @@ pub(crate) async fn scan_memory_table(
snapshot: &DeltaTableState,
predicate: &Expr,
) -> DeltaResult<Vec<Add>> {
let actions = snapshot.files().to_owned();
let actions = snapshot.files()?;

let batch = snapshot.add_actions_table(true)?;
let mut arrays = Vec::new();
Expand Down Expand Up @@ -1535,7 +1540,7 @@ pub async fn find_files<'a>(
state: &SessionState,
predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
let current_metadata = snapshot.metadata()?;
let current_metadata = snapshot.metadata();

match &predicate {
Some(predicate) => {
Expand Down Expand Up @@ -1566,7 +1571,7 @@ pub async fn find_files<'a>(
}
}
None => Ok(FindFiles {
candidates: snapshot.files().to_owned(),
candidates: snapshot.files()?,
partition_scan: true,
}),
}
Expand Down Expand Up @@ -1912,11 +1917,13 @@ mod tests {
.unwrap();
let config = DeltaScanConfigBuilder::new()
.with_file_column_name(&"file_source")
.build(&table.state)
.build(&table.snapshot().unwrap())
.unwrap();

let log_store = table.log_store();
let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap();
let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -1973,10 +1980,14 @@ mod tests {
.await
.unwrap();

let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();
let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.unwrap();

let log_store = table.log_store();
let provider = DeltaTableProvider::try_new(table.state, log_store, config).unwrap();
let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down Expand Up @@ -2024,10 +2035,13 @@ mod tests {
.await
.unwrap();

let config = DeltaScanConfigBuilder::new().build(&table.state).unwrap();
let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.unwrap();
let log = table.log_store();

let provider = DeltaTableProvider::try_new(table.state, log, config).unwrap();
let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
let ctx: SessionContext = DeltaSessionContext::default().into();
ctx.register_table("test", Arc::new(provider)).unwrap();

Expand Down
3 changes: 3 additions & 0 deletions crates/deltalake-core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ pub enum DeltaTableError {

#[error("Table metadata is invalid: {0}")]
MetadataError(String),

#[error("Table has not yet been initialized")]
NotInitialized,
}

impl From<object_store::path::Error> for DeltaTableError {
Expand Down
Loading

0 comments on commit a2fc20f

Please sign in to comment.