From b63aa2a3e5a4f7f4dfb95cd1f5dbb0e3888134b1 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 19 Aug 2024 21:08:55 +0200 Subject: [PATCH] feat: get earliest version --- crates/aws/Cargo.toml | 4 +- crates/aws/src/logstore/default_logstore.rs | 9 ++-- crates/aws/src/logstore/dynamodb_logstore.rs | 4 ++ crates/azure/Cargo.toml | 4 +- crates/catalog-glue/Cargo.toml | 4 +- crates/core/Cargo.toml | 2 +- crates/core/src/logstore/default_logstore.rs | 4 ++ crates/core/src/logstore/mod.rs | 52 +++++++++++++++++++- crates/core/src/table/mod.rs | 5 ++ crates/deltalake/Cargo.toml | 14 +++--- crates/gcp/Cargo.toml | 4 +- crates/hdfs/Cargo.toml | 4 +- crates/mount/Cargo.toml | 4 +- crates/test/Cargo.toml | 4 +- python/deltalake/_internal.pyi | 1 + python/src/lib.rs | 8 +++ 16 files changed, 100 insertions(+), 27 deletions(-) diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index 992a32c93e..e653d2b242 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-aws" -version = "0.3.0" +version = "0.4.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.20.0", path = "../core" } +deltalake-core = { version = "0.21.0", path = "../core" } aws-smithy-runtime-api = { version="1.7" } aws-smithy-runtime = { version="1.7", optional = true} aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]} diff --git a/crates/aws/src/logstore/default_logstore.rs b/crates/aws/src/logstore/default_logstore.rs index a5688141c2..f343b88f5a 100644 --- a/crates/aws/src/logstore/default_logstore.rs +++ b/crates/aws/src/logstore/default_logstore.rs @@ -3,11 +3,8 @@ use std::sync::Arc; use bytes::Bytes; +use deltalake_core::logstore::*; use deltalake_core::{ - logstore::{ - abort_commit_entry, get_latest_version, read_commit_entry, write_commit_entry, - CommitOrBytes, LogStore, LogStoreConfig, - }, operations::transaction::TransactionError, storage::{ObjectStoreRef, StorageOptions}, DeltaResult, @@ -103,6 +100,10 @@ impl LogStore for S3LogStore { get_latest_version(self, current_version).await } + async fn get_earliest_version(&self, current_version: i64) -> DeltaResult { + get_earliest_version(self, current_version).await + } + fn object_store(&self) -> Arc { self.storage.clone() } diff --git a/crates/aws/src/logstore/dynamodb_logstore.rs b/crates/aws/src/logstore/dynamodb_logstore.rs index 202df1709e..fccb8c9060 100644 --- a/crates/aws/src/logstore/dynamodb_logstore.rs +++ b/crates/aws/src/logstore/dynamodb_logstore.rs @@ -296,6 +296,10 @@ impl LogStore for S3DynamoDbLogStore { } } + async fn get_earliest_version(&self, current_version: i64) -> DeltaResult { + get_earliest_version(self, current_version).await + } + fn object_store(&self) -> ObjectStoreRef { self.storage.clone() } diff --git a/crates/azure/Cargo.toml b/crates/azure/Cargo.toml index 87a744d608..c9976fae9e 100644 --- a/crates/azure/Cargo.toml +++ b/crates/azure/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-azure" -version = "0.3.0" +version = "0.4.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.20.0", path = "../core" } +deltalake-core = { version = "0.21.0", path = "../core" } lazy_static = "1" # workspace depenndecies diff --git a/crates/catalog-glue/Cargo.toml b/crates/catalog-glue/Cargo.toml index 549b3a11c8..c80ec9ce0b 100644 --- a/crates/catalog-glue/Cargo.toml +++ b/crates/catalog-glue/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-catalog-glue" -version = "0.4.0" +version = "0.5.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -15,7 +15,7 @@ rust-version.workspace = true async-trait = { workspace = true } aws-config = "1" aws-sdk-glue = "1" -deltalake-core = { version = "0.20.0", path = "../core" } +deltalake-core = { version = "0.21.0", path = "../core" } thiserror = { workspace = true } [dev-dependencies] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 6329ce9316..d150b5b0d6 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.20.1" +version = "0.21.0" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/core/src/logstore/default_logstore.rs b/crates/core/src/logstore/default_logstore.rs index 79a1c76653..c31280ef5d 100644 --- a/crates/core/src/logstore/default_logstore.rs +++ b/crates/core/src/logstore/default_logstore.rs @@ -97,6 +97,10 @@ impl LogStore for DefaultLogStore { super::get_latest_version(self, current_version).await } + async fn get_earliest_version(&self, current_version: i64) -> DeltaResult { + super::get_earliest_version(self, current_version).await + } + fn object_store(&self) -> Arc { self.storage.clone() } diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index dd82274157..a81811faeb 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -1,11 +1,12 @@ //! Delta log store. +use std::cmp::min; use std::io::{BufRead, BufReader, Cursor}; use std::sync::OnceLock; use std::{cmp::max, collections::HashMap, sync::Arc}; use bytes::Bytes; use dashmap::DashMap; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use lazy_static::lazy_static; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; use regex::Regex; @@ -213,6 +214,9 @@ pub trait LogStore: Sync + Send { /// Find latest version currently stored in the delta log. async fn get_latest_version(&self, start_version: i64) -> DeltaResult; + /// Find earliest version currently stored in the delta log. + async fn get_earliest_version(&self, start_version: i64) -> DeltaResult; + /// Get underlying object store. fn object_store(&self) -> Arc; @@ -441,6 +445,52 @@ pub async fn get_latest_version( Ok(version) } +/// Default implementation for retrieving the earliest version +pub async fn get_earliest_version( + log_store: &dyn LogStore, + current_version: i64, +) -> DeltaResult { + let version_start = match get_last_checkpoint(log_store).await { + Ok(last_check_point) => last_check_point.version, + Err(ProtocolError::CheckpointNotFound) => { + // no checkpoint so start from current_version + current_version + } + Err(e) => { + return Err(DeltaTableError::from(e)); + } + }; + + // list files to find min version + let version = async { + let mut min_version: i64 = version_start; + let prefix = Some(log_store.log_path()); + let offset_path = commit_uri_from_version(version_start); + let object_store = log_store.object_store(); + + // Manually filter until we can provide direction in https://github.com/apache/arrow-rs/issues/6274 + let mut files = object_store + .list(prefix) + .try_filter(move |f| futures::future::ready(f.location < offset_path)) + .boxed(); + + while let Some(obj_meta) = files.next().await { + let obj_meta = obj_meta?; + if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) { + min_version = min(min_version, log_version); + } + } + + if min_version < 0 { + return Err(DeltaTableError::not_a_table(log_store.root_uri())); + } + + Ok::(min_version) + } + .await?; + Ok(version) +} + /// Read delta log for a specific version pub async fn read_commit_entry( storage: &dyn ObjectStore, diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 65d84985c7..9b73745a20 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -311,6 +311,11 @@ impl DeltaTable { self.log_store.get_latest_version(self.version()).await } + /// returns the earliest available version of the table + pub async fn get_earliest_version(&self) -> Result { + self.log_store.get_earliest_version(self.version()).await + } + /// Currently loaded version of the table pub fn version(&self) -> i64 { self.state.as_ref().map(|s| s.version()).unwrap_or(-1) diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 373245bbe8..1477d90c29 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.20.1" +version = "0.21.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -16,12 +16,12 @@ rust-version.workspace = true features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"] [dependencies] -deltalake-core = { version = "0.20.0", path = "../core" } -deltalake-aws = { version = "0.3.0", path = "../aws", default-features = false, optional = true } -deltalake-azure = { version = "0.3.0", path = "../azure", optional = true } -deltalake-gcp = { version = "0.4.0", path = "../gcp", optional = true } -deltalake-hdfs = { version = "0.4.0", path = "../hdfs", optional = true } -deltalake-catalog-glue = { version = "0.4.0", path = "../catalog-glue", optional = true } +deltalake-core = { version = "0.21.0", path = "../core" } +deltalake-aws = { version = "0.4.0", path = "../aws", default-features = false, optional = true } +deltalake-azure = { version = "0.4.0", path = "../azure", optional = true } +deltalake-gcp = { version = "0.5.0", path = "../gcp", optional = true } +deltalake-hdfs = { version = "0.5.0", path = "../hdfs", optional = true } +deltalake-catalog-glue = { version = "0.5.0", path = "../catalog-glue", optional = true } [features] # All of these features are just reflected into the core crate until that diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index 380aa84852..51020fb630 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-gcp" -version = "0.4.0" +version = "0.5.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.20.0", path = "../core" } +deltalake-core = { version = "0.21.0", path = "../core" } lazy_static = "1" # workspace depenndecies diff --git a/crates/hdfs/Cargo.toml b/crates/hdfs/Cargo.toml index f601f55c6d..2d654d0bc1 100644 --- a/crates/hdfs/Cargo.toml +++ b/crates/hdfs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-hdfs" -version = "0.4.0" +version = "0.5.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.20.0", path = "../core" } +deltalake-core = { version = "0.21.0", path = "../core" } hdfs-native-object-store = "0.11" # workspace dependecies diff --git a/crates/mount/Cargo.toml b/crates/mount/Cargo.toml index a770200b98..97372895ab 100644 --- a/crates/mount/Cargo.toml +++ b/crates/mount/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-mount" -version = "0.4.0" +version = "0.5.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.20.0", path = "../core", features = [ +deltalake-core = { version = "0.21.0", path = "../core", features = [ "datafusion", ] } lazy_static = "1" diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml index 3638e6fefa..9087755fb1 100644 --- a/crates/test/Cargo.toml +++ b/crates/test/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "deltalake-test" -version = "0.3.0" +version = "0.4.0" edition = "2021" publish = false [dependencies] bytes = { workspace = true } chrono = { workspace = true, default-features = false, features = ["clock"] } -deltalake-core = { version = "0.20.0", path = "../core" } +deltalake-core = { version = "0.21.0", path = "../core" } dotenvy = "0" fs_extra = "1.3.0" futures = { version = "0.3" } diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 02a3765e02..8329dddad9 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -77,6 +77,7 @@ class RawDeltaTable: def has_files(self) -> bool: ... def get_add_file_sizes(self) -> Dict[str, int]: ... def get_latest_version(self) -> int: ... + def get_earliest_version(self) -> int: ... def get_num_index_cols(self) -> int: ... def get_stats_columns(self) -> Optional[List[str]]: ... def metadata(self) -> RawDeltaTableMetaData: ... diff --git a/python/src/lib.rs b/python/src/lib.rs index 77db334283..473f5ceea9 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -245,6 +245,14 @@ impl RawDeltaTable { }) } + pub fn get_earliest_version(&mut self, py: Python) -> PyResult { + py.allow_threads(|| { + Ok(rt() + .block_on(self._table.get_earliest_version()) + .map_err(PythonError::from)?) + }) + } + pub fn get_num_index_cols(&mut self) -> PyResult { Ok(self ._table