From 737946effb1d2bac6bfe874a2586da7c0e098523 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Wed, 21 Aug 2024 21:22:24 +0200 Subject: [PATCH 1/2] fix: skip collecting files when set --- crates/core/src/kernel/snapshot/mod.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 8d5101f2df..44671de6d3 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -364,8 +364,13 @@ impl EagerSnapshot { .iter() .flat_map(get_visitor) .collect::>(); - let snapshot = Snapshot::try_new(table_root, store.clone(), config, version).await?; - let files = snapshot.files(store, &mut visitors)?.try_collect().await?; + let snapshot = + Snapshot::try_new(table_root, store.clone(), config.clone(), version).await?; + + let files = match config.require_files { + true => snapshot.files(store, &mut visitors)?.try_collect().await?, + false => vec![], + }; let mut sn = Self { snapshot, From 116f7855036209745b8b27031dccf64b74de86cf Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Wed, 21 Aug 2024 21:22:39 +0200 Subject: [PATCH 2/2] feat: raise if files not loaded --- crates/core/src/errors.rs | 3 +++ crates/core/src/kernel/snapshot/mod.rs | 10 ++++++++++ crates/core/src/operations/constraints.rs | 6 ++++++ crates/core/src/operations/delete.rs | 4 ++++ crates/core/src/operations/load.rs | 3 +++ crates/core/src/operations/merge/mod.rs | 4 ++++ crates/core/src/operations/optimize.rs | 3 +++ crates/core/src/operations/update.rs | 3 +++ crates/core/src/operations/vacuum.rs | 5 ++++- crates/core/src/operations/write.rs | 3 +++ crates/core/src/table/state.rs | 5 +++++ python/deltalake/_internal.pyi | 1 + python/deltalake/table.py | 5 ++++- python/src/lib.rs | 15 +++++++++++++++ 14 files changed, 68 insertions(+), 2 deletions(-) diff --git a/crates/core/src/errors.rs b/crates/core/src/errors.rs index 0b1d8755a3..609bc16656 100644 --- a/crates/core/src/errors.rs +++ b/crates/core/src/errors.rs @@ -221,6 +221,9 @@ pub enum DeltaTableError { #[error("Table has not yet been initialized")] NotInitialized, + #[error("Table has not yet been initialized with files, therefore {0} is not supported")] + NotInitializedWithFiles(String), + #[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")] ChangeDataNotRecorded { version: i64, start: i64, end: i64 }, diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 44671de6d3..99a62500d8 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -193,6 +193,11 @@ impl Snapshot { &self.protocol } + /// Get the table config which is loaded with of the snapshot + pub fn load_config(&self) -> &DeltaTableConfig { + &self.config + } + /// Get the table root of the snapshot pub fn table_root(&self) -> Path { Path::from(self.table_url.clone()) @@ -535,6 +540,11 @@ impl EagerSnapshot { self.snapshot.table_root() } + /// Get the table config which is loaded with of the snapshot + pub fn load_config(&self) -> &DeltaTableConfig { + &self.snapshot.load_config() + } + /// Well known table configuration pub fn table_config(&self) -> TableConfig<'_> { self.snapshot.table_config() diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 13336a39f4..2acf57a03d 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -89,6 +89,12 @@ impl std::future::IntoFuture for ConstraintBuilder { let this = self; Box::pin(async move { + if !this.snapshot.load_config().require_files { + return Err(DeltaTableError::NotInitializedWithFiles( + "ADD CONSTRAINTS".into(), + )); + } + let name = match this.name { Some(v) => v, None => return Err(DeltaTableError::Generic("No name provided".to_string())), diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 0627e0b633..f62e8b8130 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -293,6 +293,10 @@ async fn execute( writer_properties: Option, mut commit_properties: CommitProperties, ) -> DeltaResult<(DeltaTableState, DeleteMetrics)> { + if !&snapshot.load_config().require_files { + return Err(DeltaTableError::NotInitializedWithFiles("DELETE".into())); + } + let exec_start = Instant::now(); let mut metrics = DeleteMetrics::default(); diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs index 4bf439cd0d..930b5d48ec 100644 --- a/crates/core/src/operations/load.rs +++ b/crates/core/src/operations/load.rs @@ -51,6 +51,9 @@ impl std::future::IntoFuture for LoadBuilder { Box::pin(async move { PROTOCOL.can_read_from(&this.snapshot.snapshot)?; + if !this.snapshot.load_config().require_files { + return Err(DeltaTableError::NotInitializedWithFiles("reading".into())); + } let table = DeltaTable::new_with_state(this.log_store, this.snapshot); let schema = table.snapshot()?.arrow_schema()?; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index fbe255cdbc..86930e3a26 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -690,6 +690,10 @@ async fn execute( not_match_target_operations: Vec, not_match_source_operations: Vec, ) -> DeltaResult<(DeltaTableState, MergeMetrics)> { + if !snapshot.load_config().require_files { + return Err(DeltaTableError::NotInitializedWithFiles("MERGE".into())); + } + let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); // Determining whether we should write change data once so that computation of change data can diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 64566a0fa3..e00fd6451e 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -296,6 +296,9 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { Box::pin(async move { PROTOCOL.can_write_to(&this.snapshot.snapshot)?; + if !&this.snapshot.load_config().require_files { + return Err(DeltaTableError::NotInitializedWithFiles("OPTIMIZE".into())); + } let writer_properties = this.writer_properties.unwrap_or_else(|| { WriterProperties::builder() diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index bd4ec4ad1b..bb1262f5c0 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -238,6 +238,9 @@ async fn execute( // For files that were identified, scan for records that match the predicate, // perform update operations, and then commit add and remove actions to // the log. + if !&snapshot.load_config().require_files { + return Err(DeltaTableError::NotInitializedWithFiles("UPDATE".into())); + } let update_planner = DeltaPlanner:: { extension_planner: UpdateMetricExtensionPlanner {}, diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs index 0e4bd2b467..4452526258 100644 --- a/crates/core/src/operations/vacuum.rs +++ b/crates/core/src/operations/vacuum.rs @@ -240,8 +240,11 @@ impl std::future::IntoFuture for VacuumBuilder { fn into_future(self) -> Self::IntoFuture { let this = self; - Box::pin(async move { + if !&this.snapshot.load_config().require_files { + return Err(DeltaTableError::NotInitializedWithFiles("VACUUM".into())); + } + let plan = this.create_vacuum_plan().await?; if this.dry_run { return Ok(( diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 48fc2df368..cce4ed0901 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -789,6 +789,9 @@ impl std::future::IntoFuture for WriteBuilder { if this.mode == SaveMode::Overwrite { if let Some(snapshot) = &this.snapshot { PROTOCOL.check_append_only(&snapshot.snapshot)?; + if !snapshot.load_config().require_files { + return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into())); + } } } if this.schema_mode == Some(SchemaMode::Overwrite) && this.mode != SaveMode::Overwrite { diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 9544198581..0876dc9e79 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -181,6 +181,11 @@ impl DeltaTableState { self.snapshot.schema() } + /// Get the table config which is loaded with of the snapshot + pub fn load_config(&self) -> &DeltaTableConfig { + &self.snapshot.load_config() + } + /// Well known table configuration pub fn table_config(&self) -> TableConfig<'_> { self.snapshot.table_config() diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index b7fc21f484..ee10564b8b 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -45,6 +45,7 @@ class RawDeltaTable: ) -> bool: ... def table_uri(self) -> str: ... def version(self) -> int: ... + def has_files(self) -> bool: ... def get_add_file_sizes(self) -> Dict[str, int]: ... def get_latest_version(self) -> int: ... def get_num_index_cols(self) -> int: ... diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 11a8baa0dd..fd1e2c1a6d 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -39,6 +39,7 @@ import os from deltalake._internal import ( + DeltaError, PyMergeBuilder, RawDeltaTable, ) @@ -1138,6 +1139,9 @@ def to_pyarrow_dataset( Returns: the PyArrow dataset in PyArrow """ + if not self._table.has_files(): + raise DeltaError("Table is instantiated without files.") + table_protocol = self.protocol() if ( table_protocol.min_reader_version > MAX_SUPPORTED_READER_VERSION @@ -1158,7 +1162,6 @@ def to_pyarrow_dataset( raise DeltaProtocolError( f"The table has set these reader features: {missing_features} but these are not yet supported by the deltalake reader." ) - if not filesystem: filesystem = pa_fs.PyFileSystem( DeltaStorageHandler.from_table( diff --git a/python/src/lib.rs b/python/src/lib.rs index fc1c18c880..1b4194a906 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -50,6 +50,7 @@ use deltalake::protocol::{DeltaOperation, SaveMode}; use deltalake::storage::IORuntime; use deltalake::DeltaTableBuilder; use deltalake::{DeltaOps, DeltaResult}; +use error::DeltaError; use futures::future::join_all; use pyo3::exceptions::{PyRuntimeError, PyValueError}; @@ -166,6 +167,10 @@ impl RawDeltaTable { Ok(self._table.version()) } + pub fn has_files(&self) -> PyResult { + Ok(self._table.config.require_files) + } + pub fn metadata(&self) -> PyResult { let metadata = self._table.metadata().map_err(PythonError::from)?; Ok(RawDeltaTableMetaData { @@ -273,6 +278,9 @@ impl RawDeltaTable { py: Python, partition_filters: Option>, ) -> PyResult> { + if !self.has_files()? { + return Err(DeltaError::new_err("Table is instantiated without files.")); + } py.allow_threads(|| { if let Some(filters) = partition_filters { let filters = convert_partition_filters(filters).map_err(PythonError::from)?; @@ -298,6 +306,10 @@ impl RawDeltaTable { &self, partition_filters: Option>, ) -> PyResult> { + if !self._table.config.require_files { + return Err(DeltaError::new_err("Table is initiated without files.")); + } + if let Some(filters) = partition_filters { let filters = convert_partition_filters(filters).map_err(PythonError::from)?; Ok(self @@ -1073,6 +1085,9 @@ impl RawDeltaTable { } pub fn get_add_actions(&self, flatten: bool) -> PyResult> { + if !self.has_files()? { + return Err(DeltaError::new_err("Table is instantiated without files.")); + } Ok(PyArrowType( self._table .snapshot()