From e4af46f0c320cc8c1f434cad9353340c596e777d Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Mon, 16 Dec 2024 23:19:57 +0100
Subject: [PATCH] fix: pop logs in log segment

---
 crates/core/src/kernel/snapshot/mod.rs        | 42 +++++++-
 crates/core/src/operations/transaction/mod.rs | 20 ++--
 crates/core/src/operations/vacuum.rs          |  1 +
 crates/core/src/protocol/checkpoints.rs       | 41 ++++----
 python/deltalake/_internal.pyi                | 16 +++-
 python/deltalake/writer.py                    | 49 ++++++----
 python/src/lib.rs                             | 95 ++++++++++++++++---
 7 files changed, 204 insertions(+), 60 deletions(-)

diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index a85087ea9b..9f0fd084ee 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -15,7 +15,7 @@
 //!
 //!
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::io::{BufRead, BufReader, Cursor};
 use std::sync::Arc;
 
@@ -34,10 +34,12 @@ use super::{
     Action, Add, AddCDCFile, CommitInfo, DataType, Metadata, Protocol, Remove, StructField,
     Transaction,
 };
+use crate::checkpoints::cleanup_expired_logs_for;
 use crate::kernel::parse::read_cdf_adds;
 use crate::kernel::{ActionType, StructType};
 use crate::logstore::LogStore;
 use crate::operations::transaction::CommitData;
+use crate::protocol::ProtocolError;
 use crate::table::config::TableConfig;
 use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
 
@@ -567,6 +569,44 @@ impl EagerSnapshot {
         ))
     }
 
+    pub async fn clean_up_logs(
+        &mut self,
+        until_version: i64,
+        log_store: &dyn LogStore,
+        cutoff_timestamp: i64,
+    ) -> Result<usize, ProtocolError> {
+        let mut deleted =
+            cleanup_expired_logs_for(until_version, log_store, cutoff_timestamp).await?;
+        let mut survived_files = VecDeque::new();
+
+        while !deleted.is_empty() {
+            if deleted.is_empty() {
+                break;
+            }
+            if self.snapshot.log_segment.commit_files.is_empty() {
+                break;
+            }
+
+            match self.snapshot.log_segment.commit_files.pop_back() {
+                Some(end) => {
+                    if let Ok(idx) = deleted.binary_search(&end.location) {
+                        deleted.remove(idx);
+                    } else {
+                        survived_files.push_front(end.clone());
+                    }
+                }
+                None => (),
+            }
+        }
+
+        self.snapshot
+            .log_segment
+            .commit_files
+            .append(&mut survived_files);
+
+        Ok(deleted.len())
+    }
+
     /// Advance the snapshot based on the given commit actions
     pub fn advance<'a>(
         &mut self,
diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs
index 6d80d858b0..4e61adaa26 100644
--- a/crates/core/src/operations/transaction/mod.rs
+++ b/crates/core/src/operations/transaction/mod.rs
@@ -85,7 +85,7 @@ use serde_json::Value;
 use tracing::warn;
 
 use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
-use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for};
+use crate::checkpoints::create_checkpoint_for;
 use crate::errors::DeltaTableError;
 use crate::kernel::{
     Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Transaction,
@@ -664,7 +664,7 @@ impl PostCommit<'_> {
         } else {
             snapshot.advance(vec![&self.data])?;
         }
-        let state = DeltaTableState { snapshot };
+        let mut state = DeltaTableState { snapshot };
         // Execute each hook
         if self.create_checkpoint {
             self.create_checkpoint(&state, &self.log_store, self.version)
@@ -677,13 +677,15 @@ impl PostCommit<'_> {
         };

         if cleanup_logs {
-            cleanup_expired_logs_for(
-                self.version,
-                self.log_store.as_ref(),
-                Utc::now().timestamp_millis()
-                    - state.table_config().log_retention_duration().as_millis() as i64,
-            )
-            .await?;
+            state
+                .snapshot
+                .clean_up_logs(
+                    self.version,
+                    self.log_store.as_ref(),
+                    Utc::now().timestamp_millis()
+                        - state.table_config().log_retention_duration().as_millis() as i64,
+                )
+                .await?;
         }
         Ok(state)
     } else {
diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs
index 4452526258..0a36d33302 100644
--- a/crates/core/src/operations/vacuum.rs
+++ b/crates/core/src/operations/vacuum.rs
@@ -202,6 +202,7 @@ impl VacuumBuilder {
                 self.log_store.object_store().clone(),
             )
             .await?;
+
         let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>();

         let mut files_to_delete = vec![];
diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs
index 0a9a7f036f..dd44a46c2b 100644
--- a/crates/core/src/protocol/checkpoints.rs
+++ b/crates/core/src/protocol/checkpoints.rs
@@ -10,6 +10,7 @@ use chrono::{Datelike, NaiveDate, NaiveDateTime, Utc};
 use futures::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use lazy_static::lazy_static;
+use object_store::path::Path;
 use object_store::{Error, ObjectStore};
 use parquet::arrow::ArrowWriter;
 use parquet::basic::Compression;
@@ -27,7 +28,7 @@ use crate::kernel::{
 use crate::logstore::LogStore;
 use crate::table::state::DeltaTableState;
 use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder};
-use crate::{open_table_with_version, DeltaTable};
+use crate::{open_table_with_version, DeltaTable, DeltaTableError};

 type SchemaPath = Vec<String>;

@@ -96,9 +97,11 @@ pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError>
     Ok(())
 }

-/// Delete expires log files before given version from table. The table log retention is based on
-/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
-pub async fn cleanup_metadata(table: &DeltaTable) -> Result<usize, ProtocolError> {
+// / Delete expires log files before given version from table. The table log retention is based on
+// / the `logRetentionDuration` property of the Delta Table, 30 days by default.
+pub async fn cleanup_metadata(
+    table: &DeltaTable,
+) -> Result<(Option<DeltaTableState>, usize), ProtocolError> {
     let log_retention_timestamp = Utc::now().timestamp_millis()
         - table
             .snapshot()
@@ -106,12 +109,16 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result<usize, ProtocolError
             .table_config()
             .log_retention_duration()
             .as_millis() as i64;
-    cleanup_expired_logs_for(
-        table.version(),
-        table.log_store.as_ref(),
-        log_retention_timestamp,
-    )
-    .await
+    let version = table.version();
+    let mut state: Option<DeltaTableState> = table.state.clone();
+    let size = state
+        .as_mut()
+        .ok_or(ProtocolError::NoMetaData)?
+        .snapshot
+        .clean_up_logs(version, table.log_store.as_ref(), log_retention_timestamp)
+        .await?;
+
+    Ok((state, size))
 }

 /// Loads table from given `table_uri` at given `version` and creates checkpoint for it.
@@ -132,7 +139,7 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
         cleanup.unwrap_or_else(|| snapshot.table_config().enable_expired_log_cleanup());

     if table.version() >= 0 && enable_expired_log_cleanup {
-        let deleted_log_num = cleanup_metadata(&table).await?;
+        let (_, deleted_log_num) = cleanup_metadata(&table).await?;
         debug!("Deleted {:?} log files.", deleted_log_num);
     }

@@ -198,7 +205,7 @@ pub async fn cleanup_expired_logs_for(
     until_version: i64,
     log_store: &dyn LogStore,
     cutoff_timestamp: i64,
-) -> Result<usize, ProtocolError> {
+) -> Result<Vec<Path>, ProtocolError> {
     lazy_static! {
         static ref DELTA_LOG_REGEX: Regex =
             Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap();
     }
@@ -210,7 +217,7 @@ pub async fn cleanup_expired_logs_for(
     .await;

     if let Err(Error::NotFound { path: _, source: _ }) = maybe_last_checkpoint {
-        return Ok(0);
+        return Ok(vec![]);
     }

     let last_checkpoint = maybe_last_checkpoint?.bytes().await?;
@@ -255,7 +262,7 @@ pub async fn cleanup_expired_logs_for(
     .await?;

     debug!("Deleted {} expired logs", deleted.len());
-    Ok(deleted.len())
+    Ok(deleted)
 }

 fn parquet_bytes_from_state(
@@ -889,7 +896,8 @@ mod tests {
             log_retention_timestamp,
         )
         .await
-        .unwrap();
+        .unwrap()
+        .len();

         assert_eq!(count, 0);
         println!("{:?}", count);
@@ -917,7 +925,8 @@ mod tests {
             log_retention_timestamp,
         )
         .await
-        .unwrap();
+        .unwrap()
+        .len();

         assert_eq!(count, 1);
         let log_store = table.log_store();
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index f19c685118..b99f445874 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -222,6 +222,21 @@ class RawDeltaTable:
         ending_timestamp: Optional[str] = None,
         allow_out_of_range: bool = False,
     ) -> pyarrow.RecordBatchReader: ...
+    def write(
+        self,
+        data: pyarrow.RecordBatchReader,
+        partition_by: Optional[List[str]],
+        mode: str,
+        schema_mode: Optional[str],
+        predicate: Optional[str],
+        target_file_size: Optional[int],
+        name: Optional[str],
+        description: Optional[str],
+        configuration: Optional[Mapping[str, Optional[str]]],
+        writer_properties: Optional[WriterProperties],
+        commit_properties: Optional[CommitProperties],
+        post_commithook_properties: Optional[PostCommitHookProperties],
+    ) -> None: ...
     def transaction_versions(self) -> Dict[str, Transaction]: ...
     def __datafusion_table_provider__(self) -> Any: ...

@@ -243,7 +258,6 @@ def write_to_deltalake(
     data: pyarrow.RecordBatchReader,
     partition_by: Optional[List[str]],
     mode: str,
-    table: Optional[RawDeltaTable],
     schema_mode: Optional[str],
     predicate: Optional[str],
     target_file_size: Optional[int],
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 535a6e7a13..f949f35aea 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -320,25 +320,38 @@ def write_deltalake(
             conversion_mode=ArrowSchemaConversionMode.PASSTHROUGH,
         )
         data = RecordBatchReader.from_batches(schema, (batch for batch in data))
-        write_deltalake_rust(
-            table_uri=table_uri,
-            data=data,
-            partition_by=partition_by,
-            mode=mode,
-            table=table._table if table is not None else None,
-            schema_mode=schema_mode,
-            predicate=predicate,
-            target_file_size=target_file_size,
-            name=name,
-            description=description,
-            configuration=configuration,
-            storage_options=storage_options,
-            writer_properties=writer_properties,
-            commit_properties=commit_properties,
-            post_commithook_properties=post_commithook_properties,
-        )
         if table:
-            table.update_incremental()
+            table._table.write(
+                data=data,
+                partition_by=partition_by,
+                mode=mode,
+                schema_mode=schema_mode,
+                predicate=predicate,
+                target_file_size=target_file_size,
+                name=name,
+                description=description,
+                configuration=configuration,
+                writer_properties=writer_properties,
+                commit_properties=commit_properties,
+                post_commithook_properties=post_commithook_properties,
+            )
+        else:
+            write_deltalake_rust(
+                table_uri=table_uri,
+                data=data,
+                partition_by=partition_by,
+                mode=mode,
+                schema_mode=schema_mode,
+                predicate=predicate,
+                target_file_size=target_file_size,
+                name=name,
+                description=description,
+                configuration=configuration,
+                storage_options=storage_options,
+                writer_properties=writer_properties,
+                commit_properties=commit_properties,
+                post_commithook_properties=post_commithook_properties,
+            )
     elif engine == "pyarrow":
         warnings.warn(
             "pyarrow engine is deprecated and will be removed in v1.0",
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 0135864c7e..4772ad147f 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -796,6 +796,77 @@ impl RawDeltaTable {
         })
     }

+    #[allow(clippy::too_many_arguments)]
+    #[pyo3(signature = (data, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
+    fn write(
+        &mut self,
+        py: Python,
+        data: PyArrowType<ArrowArrayStreamReader>,
+        mode: String,
+        schema_mode: Option<String>,
+        partition_by: Option<Vec<String>>,
+        predicate: Option<String>,
+        target_file_size: Option<usize>,
+        name: Option<String>,
+        description: Option<String>,
+        configuration: Option<HashMap<String, Option<String>>>,
+        writer_properties: Option<PyWriterProperties>,
+        commit_properties: Option<PyCommitProperties>,
+        post_commithook_properties: Option<PyPostCommitHookProperties>,
+    ) -> PyResult<()> {
+        let table = py.allow_threads(|| {
+            let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<RecordBatch>>();
+            let save_mode = mode.parse().map_err(PythonError::from)?;
+
+            let table = DeltaOps(self._table.clone());
+
+            let mut builder = table.write(batches).with_save_mode(save_mode);
+            if let Some(schema_mode) = schema_mode {
+                builder = builder.with_schema_mode(schema_mode.parse().map_err(PythonError::from)?);
+            }
+            if let Some(partition_columns) = partition_by {
+                builder = builder.with_partition_columns(partition_columns);
+            }
+
+            if let Some(writer_props) = writer_properties {
+                builder = builder.with_writer_properties(
+                    set_writer_properties(writer_props).map_err(PythonError::from)?,
+                );
+            }
+
+            if let Some(name) = &name {
+                builder = builder.with_table_name(name);
+            };
+
+            if let Some(description) = &description {
+                builder = builder.with_description(description);
+            };
+
+            if let Some(predicate) = predicate {
+                builder = builder.with_replace_where(predicate);
+            };
+
+            if let Some(target_file_size) = target_file_size {
+                builder = builder.with_target_file_size(target_file_size)
+            };
+
+            if let Some(config) = configuration {
+                builder = builder.with_configuration(config);
+            };
+
+            if let Some(commit_properties) =
+                maybe_create_commit_properties(commit_properties, post_commithook_properties)
+            {
+                builder = builder.with_commit_properties(commit_properties);
+            };
+            rt().block_on(builder.into_future())
+                .map_err(PythonError::from)
+        })?;
+
+        self._table.state = table.state;
+        Ok(())
+    }
+
     // Run the restore command on the Delta Table: restore table to a given version or datetime
     #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false, commit_properties=None))]
     pub fn restore(
@@ -1127,14 +1198,12 @@ impl RawDeltaTable {
         Ok(())
     }

-    pub fn cleanup_metadata(&self, py: Python) -> PyResult<()> {
-        py.allow_threads(|| {
-            Ok::<_, pyo3::PyErr>(
-                rt().block_on(cleanup_metadata(&self._table))
-                    .map_err(PythonError::from)?,
-            )
+    pub fn cleanup_metadata(&mut self, py: Python) -> PyResult<()> {
+        let (state, _) = py.allow_threads(|| {
+            rt().block_on(cleanup_metadata(&self._table))
+                .map_err(PythonError::from)
         })?;
-
+        self._table.state = state;
         Ok(())
     }

@@ -1760,13 +1829,12 @@ pub struct PyCommitProperties {

 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
-#[pyo3(signature = (table_uri, data, mode, table=None, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
+#[pyo3(signature = (table_uri, data, mode, schema_mode=None, partition_by=None, predicate=None, target_file_size=None, name=None, description=None, configuration=None, storage_options=None, writer_properties=None, commit_properties=None, post_commithook_properties=None))]
 fn write_to_deltalake(
     py: Python,
     table_uri: String,
     data: PyArrowType<ArrowArrayStreamReader>,
     mode: String,
-    table: Option<&RawDeltaTable>,
     schema_mode: Option<String>,
     partition_by: Option<Vec<String>>,
     predicate: Option<String>,
@@ -1784,14 +1852,11 @@ fn write_to_deltalake(
     let save_mode = mode.parse().map_err(PythonError::from)?;

     let options = storage_options.clone().unwrap_or_default();
-    let table = if let Some(table) = table {
-        DeltaOps(table._table.clone())
-    } else {
-        rt().block_on(DeltaOps::try_from_uri_with_storage_options(
+    let table = rt()
+        .block_on(DeltaOps::try_from_uri_with_storage_options(
             &table_uri, options,
         ))
-        .map_err(PythonError::from)?
-    };
+        .map_err(PythonError::from)?;

     let mut builder = table.write(batches).with_save_mode(save_mode);
     if let Some(schema_mode) = schema_mode {
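
Usage sketch (not part of the patch): the snippet below illustrates how the changed Python code path is expected to be exercised, assuming a deltalake build that includes this commit; the table path "tmp/table" and the sample data are placeholders.

    import pyarrow as pa
    from deltalake import DeltaTable, write_deltalake

    # Without an existing table handle, write_deltalake() still routes through the
    # module-level write_to_deltalake() function.
    write_deltalake("tmp/table", pa.table({"x": [1, 2, 3]}))

    # With a DeltaTable handle, the write is now dispatched to RawDeltaTable.write(),
    # which commits and then replaces the cached table state, so no explicit
    # update_incremental() call is needed afterwards.
    dt = DeltaTable("tmp/table")
    write_deltalake(dt, pa.table({"x": [4, 5]}), mode="append")
    print(dt.version())  # expected to reflect the new commit

    # cleanup_metadata() now hands the refreshed state back to the Python wrapper,
    # so expired commit files are also popped from the cached log segment.
    dt.cleanup_metadata()

Design note: popping deleted commits out of the in-memory log segment keeps the cached snapshot consistent with storage after log cleanup, instead of leaving it pointing at _delta_log files that no longer exist.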