Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: simplify DeltaTableState
Browse files Browse the repository at this point in the history
roeap committed Nov 24, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent fa6c513 commit 8a1fa91
Showing 15 changed files with 89 additions and 148 deletions.
6 changes: 2 additions & 4 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
@@ -568,7 +568,7 @@ impl<'a> DeltaScanBuilder<'a> {

let table_partition_cols = &self
.snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;

@@ -1457,9 +1457,7 @@ pub async fn find_files<'a>(
state: &SessionState,
predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
let current_metadata = snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?;

match &predicate {
Some(predicate) => {
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/delete.rs
Original file line number Diff line number Diff line change
@@ -139,7 +139,7 @@ async fn excute_non_empty_expr(
let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;

let table_partition_cols = snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns
.clone();
4 changes: 1 addition & 3 deletions crates/deltalake-core/src/operations/merge.rs
Original file line number Diff line number Diff line change
@@ -665,9 +665,7 @@ async fn execute(
let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();

let current_metadata = snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?;

// TODO: Given the join predicate, remove any expression that involve the
// source table and keep expressions that only involve the target table.
6 changes: 3 additions & 3 deletions crates/deltalake-core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
@@ -767,7 +767,7 @@ pub fn create_merge_plan(
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());

let partitions_keys = &snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;

@@ -785,7 +785,7 @@ pub fn create_merge_plan(
&Arc::new(
<ArrowSchema as TryFrom<&crate::kernel::StructType>>::try_from(
&snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.schema,
)?,
@@ -933,7 +933,7 @@ fn build_zorder_plan(
)));
}
let field_names = snapshot
.current_metadata()
.metadata()
.unwrap()
.schema
.fields()
12 changes: 6 additions & 6 deletions crates/deltalake-core/src/operations/restore.rs
Original file line number Diff line number Diff line change
@@ -209,12 +209,12 @@ async fn execute(
Protocol {
min_reader_version: table.get_min_reader_version(),
min_writer_version: table.get_min_writer_version(),
writer_features: if snapshot.min_writer_version() < 7 {
writer_features: if snapshot.protocol().min_writer_version < 7 {
None
} else {
table.get_writer_features().cloned()
},
reader_features: if snapshot.min_reader_version() < 3 {
reader_features: if snapshot.protocol().min_reader_version < 3 {
None
} else {
table.get_reader_features().cloned()
@@ -224,14 +224,14 @@ async fn execute(
Protocol {
min_reader_version: max(
table.get_min_reader_version(),
snapshot.min_reader_version(),
snapshot.protocol().min_reader_version,
),
min_writer_version: max(
table.get_min_writer_version(),
snapshot.min_writer_version(),
snapshot.protocol().min_writer_version,
),
writer_features: snapshot.writer_features().cloned(),
reader_features: snapshot.reader_features().cloned(),
writer_features: snapshot.protocol().writer_features.clone(),
reader_features: snapshot.protocol().reader_features.clone(),
}
};
actions.push(Action::Protocol(protocol));
Original file line number Diff line number Diff line change
@@ -398,11 +398,11 @@ impl<'a> ConflictChecker<'a> {
for p in self.winning_commit_summary.protocol() {
let (win_read, curr_read) = (
p.min_reader_version,
self.txn_info.read_snapshot.min_reader_version(),
self.txn_info.read_snapshot.protocol().min_reader_version,
);
let (win_write, curr_write) = (
p.min_writer_version,
self.txn_info.read_snapshot.min_writer_version(),
self.txn_info.read_snapshot.protocol().min_writer_version,
);
if curr_read < win_read || win_write < curr_write {
return Err(CommitConflictError::ProtocolChanged(
@@ -475,7 +475,7 @@ impl<'a> ConflictChecker<'a> {
let partition_columns = &self
.txn_info
.read_snapshot
.current_metadata()
.metadata()
.ok_or(CommitConflictError::NoMetadata)?
.partition_columns;
AddContainer::new(&added_files_to_check, partition_columns, arrow_schema)
16 changes: 9 additions & 7 deletions crates/deltalake-core/src/operations/transaction/protocol.rs
Original file line number Diff line number Diff line change
@@ -71,10 +71,10 @@ impl ProtocolChecker {
/// Check if delta-rs can read form the given delta table.
pub fn can_read_from(&self, snapshot: &DeltaTableState) -> Result<(), TransactionError> {
let required_features: Option<&HashSet<ReaderFeatures>> =
match snapshot.min_reader_version() {
match snapshot.protocol().min_reader_version {
0 | 1 => None,
2 => Some(&READER_V2),
_ => snapshot.reader_features(),
_ => snapshot.protocol().reader_features.as_ref(),
};
if let Some(features) = required_features {
let mut diff = features.difference(&self.reader_features).peekable();
@@ -93,14 +93,14 @@ impl ProtocolChecker {
self.can_read_from(snapshot)?;

let required_features: Option<&HashSet<WriterFeatures>> =
match snapshot.min_writer_version() {
match snapshot.protocol().min_writer_version {
0 | 1 => None,
2 => Some(&WRITER_V2),
3 => Some(&WRITER_V3),
4 => Some(&WRITER_V4),
5 => Some(&WRITER_V5),
6 => Some(&WRITER_V6),
_ => snapshot.writer_features(),
_ => snapshot.protocol().writer_features.as_ref(),
};

if let Some(features) = required_features {
@@ -122,13 +122,15 @@ impl ProtocolChecker {
self.can_write_to(snapshot)?;

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#append-only-tables
let append_only_enabled = if snapshot.min_writer_version() < 2 {
let append_only_enabled = if snapshot.protocol().min_writer_version < 2 {
false
} else if snapshot.min_writer_version() < 7 {
} else if snapshot.protocol().min_writer_version < 7 {
snapshot.table_config().append_only()
} else {
snapshot
.writer_features()
.protocol()
.writer_features
.as_ref()
.ok_or(TransactionError::WriterFeaturesRequired)?
.contains(&WriterFeatures::AppendOnly)
&& snapshot.table_config().append_only()
8 changes: 4 additions & 4 deletions crates/deltalake-core/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ impl DeltaTableState {
}

fn _arrow_schema(&self, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
let meta = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;
let meta = self.metadata().ok_or(DeltaTableError::NoMetadata)?;
let fields = meta
.schema
.fields()
@@ -299,7 +299,7 @@ impl PruningStatistics for DeltaTableState {
/// return the minimum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.current_metadata()?.partition_columns;
let partition_columns = &self.metadata()?.partition_columns;
let container =
AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?);
container.min_values(column)
@@ -308,7 +308,7 @@ impl PruningStatistics for DeltaTableState {
/// return the maximum values for the named column, if known.
/// Note: the returned array must contain `num_containers()` rows.
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.current_metadata()?.partition_columns;
let partition_columns = &self.metadata()?.partition_columns;
let container =
AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?);
container.max_values(column)
@@ -325,7 +325,7 @@ impl PruningStatistics for DeltaTableState {
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.current_metadata()?.partition_columns;
let partition_columns = &self.metadata()?.partition_columns;
let container =
AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?);
container.null_counts(column)
4 changes: 1 addition & 3 deletions crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
@@ -207,9 +207,7 @@ async fn execute(
})
.collect::<Result<HashMap<Column, Expr>, _>>()?;

let current_metadata = snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?;
let table_partition_cols = current_metadata.partition_columns.clone();

let scan_start = Instant::now();
9 changes: 7 additions & 2 deletions crates/deltalake-core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
@@ -163,7 +163,12 @@ impl VacuumBuilder {

/// Determine which files can be deleted. Does not actually peform the deletion
async fn create_vacuum_plan(&self) -> Result<VacuumPlan, VacuumError> {
let min_retention = Duration::milliseconds(self.snapshot.tombstone_retention_millis());
let min_retention = Duration::milliseconds(
self.snapshot
.table_config()
.deleted_file_retention_duration()
.as_millis() as i64,
);
let retention_period = self.retention_period.unwrap_or(min_retention);
let enforce_retention_duration = self.enforce_retention_duration;

@@ -191,7 +196,7 @@ impl VacuumBuilder {
.map_err(DeltaTableError::from)?;
let partition_columns = &self
.snapshot
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.partition_columns;

4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/write.rs
Original file line number Diff line number Diff line change
@@ -253,7 +253,7 @@ pub(crate) async fn write_execution_plan(
safe_cast: bool,
) -> DeltaResult<Vec<Add>> {
let invariants = snapshot
.current_metadata()
.metadata()
.and_then(|meta| meta.schema.get_invariants().ok())
.unwrap_or_default();

@@ -318,7 +318,7 @@ impl std::future::IntoFuture for WriteBuilder {

let active_partitions = this
.snapshot
.current_metadata()
.metadata()
.map(|meta| meta.partition_columns.clone());

// validate partition columns
22 changes: 15 additions & 7 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
@@ -82,8 +82,12 @@ pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError>
/// Delete expires log files before given version from table. The table log retention is based on
/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<usize, ProtocolError> {
let log_retention_timestamp =
Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
let log_retention_timestamp = Utc::now().timestamp_millis()
- table
.get_state()
.table_config()
.log_retention_duration()
.as_millis() as i64;
cleanup_expired_logs_for(
table.version(),
table.log_store.as_ref(),
@@ -105,8 +109,12 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
.map_err(|err| ProtocolError::Generic(err.to_string()))?;
create_checkpoint_for(version, table.get_state(), table.log_store.as_ref()).await?;

let enable_expired_log_cleanup =
cleanup.unwrap_or_else(|| table.get_state().enable_expired_log_cleanup());
let enable_expired_log_cleanup = cleanup.unwrap_or_else(|| {
table
.get_state()
.table_config()
.enable_expired_log_cleanup()
});

if table.version() >= 0 && enable_expired_log_cleanup {
let deleted_log_num = cleanup_metadata(&table).await?;
@@ -213,7 +221,7 @@ pub async fn cleanup_expired_logs_for(
fn parquet_bytes_from_state(
state: &DeltaTableState,
) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> {
let current_metadata = state.current_metadata().ok_or(ProtocolError::NoMetaData)?;
let current_metadata = state.metadata().ok_or(ProtocolError::NoMetaData)?;

let partition_col_data_types = current_metadata.get_partition_col_data_types();

@@ -247,8 +255,8 @@ fn parquet_bytes_from_state(

// protocol
let jsons = std::iter::once(Action::Protocol(Protocol {
min_reader_version: state.min_reader_version(),
min_writer_version: state.min_writer_version(),
min_reader_version: state.protocol().min_reader_version,
min_writer_version: state.protocol().min_writer_version,
writer_features: None,
reader_features: None,
}))
20 changes: 9 additions & 11 deletions crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
@@ -766,9 +766,7 @@ impl DeltaTable {

/// Returns the metadata associated with the loaded state.
pub fn get_metadata(&self) -> Result<&DeltaTableMetaData, DeltaTableError> {
self.state
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)
self.state.metadata().ok_or(DeltaTableError::NoMetadata)
}

/// Returns a vector of active tombstones (i.e. `Remove` actions present in the current delta log).
@@ -784,23 +782,23 @@ impl DeltaTable {
/// Returns the minimum reader version supported by the DeltaTable based on the loaded
/// metadata.
pub fn get_min_reader_version(&self) -> i32 {
self.state.min_reader_version()
self.state.protocol().min_reader_version
}

/// Returns the minimum writer version supported by the DeltaTable based on the loaded
/// metadata.
pub fn get_min_writer_version(&self) -> i32 {
self.state.min_writer_version()
self.state.protocol().min_writer_version
}

/// Returns current supported reader features by this table
pub fn get_reader_features(&self) -> Option<&HashSet<ReaderFeatures>> {
self.state.reader_features()
self.state.protocol().reader_features.as_ref()
}

/// Returns current supported writer features by this table
pub fn get_writer_features(&self) -> Option<&HashSet<WriterFeatures>> {
self.state.writer_features()
self.state.protocol().writer_features.as_ref()
}

/// Return table schema parsed from transaction log. Return None if table hasn't been loaded or
@@ -819,7 +817,7 @@ impl DeltaTable {
pub fn get_configurations(&self) -> Result<&HashMap<String, Option<String>>, DeltaTableError> {
Ok(self
.state
.current_metadata()
.metadata()
.ok_or(DeltaTableError::NoMetadata)?
.get_configuration())
}
@@ -869,7 +867,7 @@ impl fmt::Display for DeltaTable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "DeltaTable({})", self.table_uri())?;
writeln!(f, "\tversion: {}", self.version())?;
match self.state.current_metadata() {
match self.state.metadata() {
Some(metadata) => {
writeln!(f, "\tmetadata: {metadata}")?;
}
@@ -880,8 +878,8 @@ impl fmt::Display for DeltaTable {
writeln!(
f,
"\tmin_version: read={}, write={}",
self.state.min_reader_version(),
self.state.min_writer_version()
self.state.protocol().min_reader_version,
self.state.protocol().min_writer_version
)?;
writeln!(f, "\tfiles count: {}", self.state.files().len())
}
112 changes: 23 additions & 89 deletions crates/deltalake-core/src/table/state.rs
Original file line number Diff line number Diff line change
@@ -12,10 +12,8 @@ use serde::{Deserialize, Serialize};

use super::config::TableConfig;
use crate::errors::DeltaTableError;
use crate::kernel::{
Action, Add, CommitInfo, DataType, DomainMetadata, ReaderFeatures, Remove, StructType,
WriterFeatures,
};
use crate::kernel::Protocol;
use crate::kernel::{Action, Add, CommitInfo, DataType, DomainMetadata, Remove, StructType};
use crate::partitions::{DeltaTablePartition, PartitionFilter};
use crate::protocol::ProtocolError;
use crate::storage::commit_uri_from_version;
@@ -41,17 +39,9 @@ pub struct DeltaTableState {
// Domain metadatas provided by the system or user
domain_metadatas: Vec<DomainMetadata>,
app_transaction_version: HashMap<String, i64>,
min_reader_version: i32,
min_writer_version: i32,
reader_features: Option<HashSet<ReaderFeatures>>,
writer_features: Option<HashSet<WriterFeatures>>,
// table metadata corresponding to current version
current_metadata: Option<DeltaTableMetaData>,
// retention period for tombstones in milli-seconds
tombstone_retention_millis: i64,
// retention period for log entries in milli-seconds
log_retention_millis: i64,
enable_expired_log_cleanup: bool,
current_protocol: Option<Protocol>,
}

impl DeltaTableState {
@@ -173,21 +163,6 @@ impl DeltaTableState {
&self.commit_infos
}

/// Retention of tombstone in milliseconds.
pub fn tombstone_retention_millis(&self) -> i64 {
self.tombstone_retention_millis
}

/// Retention of logs in milliseconds.
pub fn log_retention_millis(&self) -> i64 {
self.log_retention_millis
}

/// Whether to clean up expired checkpoints and delta logs.
pub fn enable_expired_log_cleanup(&self) -> bool {
self.enable_expired_log_cleanup
}

/// Full list of tombstones (remove actions) representing files removed from table state).
pub fn all_tombstones(&self) -> &HashSet<Remove> {
&self.tombstones
@@ -196,7 +171,11 @@ impl DeltaTableState {
/// List of unexpired tombstones (remove actions) representing files removed from table state.
/// The retention period is set by `deletedFileRetentionDuration` with default value of 1 week.
pub fn unexpired_tombstones(&self) -> impl Iterator<Item = &Remove> {
let retention_timestamp = Utc::now().timestamp_millis() - self.tombstone_retention_millis;
let retention_timestamp = Utc::now().timestamp_millis()
- self
.table_config()
.deleted_file_retention_duration()
.as_millis() as i64;
self.tombstones
.iter()
.filter(move |t| t.deletion_timestamp.unwrap_or(0) > retention_timestamp)
@@ -223,28 +202,16 @@ impl DeltaTableState {
&self.app_transaction_version
}

/// The min reader version required by the protocol.
pub fn min_reader_version(&self) -> i32 {
self.min_reader_version
}

/// The min writer version required by the protocol.
pub fn min_writer_version(&self) -> i32 {
self.min_writer_version
}

/// Current supported reader features
pub fn reader_features(&self) -> Option<&HashSet<ReaderFeatures>> {
self.reader_features.as_ref()
}

/// Current supported writer features
pub fn writer_features(&self) -> Option<&HashSet<WriterFeatures>> {
self.writer_features.as_ref()
/// The most recent protocol of the table.
pub fn protocol(&self) -> &Protocol {
lazy_static! {
static ref DEFAULT_PROTOCOL: Protocol = Protocol::default();
}
self.current_protocol.as_ref().unwrap_or(&DEFAULT_PROTOCOL)
}

/// The most recent metadata of the table.
pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> {
pub fn metadata(&self) -> Option<&DeltaTableMetaData> {
self.current_metadata.as_ref()
}

@@ -299,26 +266,14 @@ impl DeltaTableState {
self.files.append(&mut new_state.files);
}

if new_state.min_reader_version > 0 {
self.min_reader_version = new_state.min_reader_version;
self.min_writer_version = new_state.min_writer_version;
}

if new_state.min_writer_version >= 5 {
self.writer_features = new_state.writer_features;
}

if new_state.min_reader_version >= 3 {
self.reader_features = new_state.reader_features;
}

if new_state.current_metadata.is_some() {
self.tombstone_retention_millis = new_state.tombstone_retention_millis;
self.log_retention_millis = new_state.log_retention_millis;
self.enable_expired_log_cleanup = new_state.enable_expired_log_cleanup;
self.current_metadata = new_state.current_metadata.take();
}

if new_state.current_protocol.is_some() {
self.current_protocol = new_state.current_protocol.take();
}

new_state
.app_transaction_version
.drain()
@@ -359,19 +314,10 @@ impl DeltaTableState {
}
}
Action::Protocol(v) => {
self.min_reader_version = v.min_reader_version;
self.min_writer_version = v.min_writer_version;
self.reader_features = v.reader_features;
self.writer_features = v.writer_features;
self.current_protocol = Some(v);
}
Action::Metadata(v) => {
let md = DeltaTableMetaData::try_from(v)?;
let table_config = TableConfig(&md.configuration);
self.tombstone_retention_millis =
table_config.deleted_file_retention_duration().as_millis() as i64;
self.log_retention_millis =
table_config.log_retention_duration().as_millis() as i64;
self.enable_expired_log_cleanup = table_config.enable_expired_log_cleanup();
self.current_metadata = Some(md);
}
Action::Txn(v) => {
@@ -396,7 +342,7 @@ impl DeltaTableState {
&'a self,
filters: &'a [PartitionFilter],
) -> Result<impl Iterator<Item = &Add> + '_, DeltaTableError> {
let current_metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;
let current_metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?;

let nonpartitioned_columns: Vec<String> = filters
.iter()
@@ -444,14 +390,8 @@ mod tests {
commit_infos: vec![],
domain_metadatas: vec![],
app_transaction_version: Default::default(),
min_reader_version: 0,
min_writer_version: 0,
reader_features: None,
writer_features: None,
current_metadata: None,
tombstone_retention_millis: 0,
log_retention_millis: 0,
enable_expired_log_cleanup: false,
current_protocol: None,
};
let bytes = serde_json::to_vec(&expected).unwrap();
let actual: DeltaTableState = serde_json::from_slice(&bytes).unwrap();
@@ -471,14 +411,8 @@ mod tests {
domain_metadatas: vec![],
tombstones: HashSet::new(),
current_metadata: None,
min_reader_version: 1,
min_writer_version: 1,
reader_features: None,
writer_features: None,
current_protocol: None,
app_transaction_version,
tombstone_retention_millis: 0,
log_retention_millis: 0,
enable_expired_log_cleanup: true,
};

let txn_action = Action::Txn(Txn {
6 changes: 3 additions & 3 deletions crates/deltalake-core/src/table/state_arrow.rs
Original file line number Diff line number Diff line change
@@ -86,7 +86,7 @@ impl DeltaTableState {
(Cow::Borrowed("data_change"), Arc::new(data_change)),
];

let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;
let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?;

if !metadata.partition_columns.is_empty() {
let partition_cols_batch = self.partition_columns_as_batch(flatten)?;
@@ -145,7 +145,7 @@ impl DeltaTableState {
&self,
flatten: bool,
) -> Result<arrow::record_batch::RecordBatch, DeltaTableError> {
let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;
let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?;
let column_mapping_mode = self.table_config().column_mapping_mode();
let partition_column_types: Vec<arrow::datatypes::DataType> = metadata
.partition_columns
@@ -413,7 +413,7 @@ impl DeltaTableState {
.map(|maybe_stat| maybe_stat.as_ref().map(|stat| stat.num_records))
.collect::<Vec<Option<i64>>>(),
);
let metadata = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;
let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?;
let schema = &metadata.schema;

#[derive(Debug)]

0 comments on commit 8a1fa91

Please sign in to comment.