Skip to content

Commit

Permalink
feat: get earliest version
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored and rtyler committed Sep 29, 2024
1 parent 72e344e commit 18cc561
Show file tree
Hide file tree
Showing 16 changed files with 100 additions and 27 deletions.
4 changes: 2 additions & 2 deletions crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.3.0"
version = "0.4.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
aws-smithy-runtime-api = { version="1.7" }
aws-smithy-runtime = { version="1.7", optional = true}
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
Expand Down
9 changes: 5 additions & 4 deletions crates/aws/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
use std::sync::Arc;

use bytes::Bytes;
use deltalake_core::logstore::*;
use deltalake_core::{
logstore::{
abort_commit_entry, get_latest_version, read_commit_entry, write_commit_entry,
CommitOrBytes, LogStore, LogStoreConfig,
},
operations::transaction::TransactionError,
storage::{ObjectStoreRef, StorageOptions},
DeltaResult,
Expand Down Expand Up @@ -103,6 +100,10 @@ impl LogStore for S3LogStore {
get_latest_version(self, current_version).await
}

/// Find the earliest version currently stored in the delta log.
///
/// Forwards to the free function `get_earliest_version`, handing it this
/// store and the caller's `current_version` unchanged.
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
// A bare (non-`self.`) call resolves to the free function in scope, not to
// this method, so this is delegation rather than recursion.
get_earliest_version(self, current_version).await
}

/// Returns a clone of the `storage` handle held by this log store.
fn object_store(&self) -> Arc<dyn ObjectStore> {
self.storage.clone()
}
Expand Down
4 changes: 4 additions & 0 deletions crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ impl LogStore for S3DynamoDbLogStore {
}
}

/// Find the earliest version currently stored in the delta log.
///
/// Forwards to the free function `get_earliest_version`, handing it this
/// store and the caller's `current_version` unchanged.
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
// A bare (non-`self.`) call resolves to the free function in scope, not to
// this method, so this is delegation rather than recursion.
get_earliest_version(self, current_version).await
}

/// Returns a clone of the `storage` handle held by this log store.
fn object_store(&self) -> ObjectStoreRef {
self.storage.clone()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/azure/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-azure"
version = "0.3.0"
version = "0.4.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
lazy_static = "1"

# workspace dependencies
Expand Down
4 changes: 2 additions & 2 deletions crates/catalog-glue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-glue"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -15,7 +15,7 @@ rust-version.workspace = true
async-trait = { workspace = true }
aws-config = "1"
aws-sdk-glue = "1"
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
thiserror = { workspace = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.20.1"
version = "0.21.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ impl LogStore for DefaultLogStore {
super::get_latest_version(self, current_version).await
}

/// Find the earliest version currently stored in the delta log.
///
/// Forwards to `get_earliest_version` in the parent module, handing it this
/// store and the caller's `current_version` unchanged.
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
super::get_earliest_version(self, current_version).await
}

/// Returns a clone of the `storage` handle held by this log store.
fn object_store(&self) -> Arc<dyn ObjectStore> {
self.storage.clone()
}
Expand Down
52 changes: 51 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Delta log store.
use std::cmp::min;
use std::io::{BufRead, BufReader, Cursor};
use std::sync::OnceLock;
use std::{cmp::max, collections::HashMap, sync::Arc};

use bytes::Bytes;
use dashmap::DashMap;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use regex::Regex;
Expand Down Expand Up @@ -213,6 +214,9 @@ pub trait LogStore: Sync + Send {
/// Find latest version currently stored in the delta log.
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Find earliest version currently stored in the delta log.
///
/// `start_version` is the version the search starts from. The module-level
/// helper `get_earliest_version` uses it as the starting point only when no
/// checkpoint is found; otherwise it starts from the last checkpoint's version.
async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get underlying object store.
fn object_store(&self) -> Arc<dyn ObjectStore>;

Expand Down Expand Up @@ -441,6 +445,52 @@ pub async fn get_latest_version(
Ok(version)
}

/// Default implementation for retrieving the earliest version.
///
/// The search starts from the version of the last checkpoint or, when no
/// checkpoint exists, from `current_version`. Every object under the log path
/// whose location sorts before the commit file of that starting version is
/// inspected, and the smallest version found in a file name is returned. If
/// no such file carries a smaller version, the starting version is returned.
///
/// # Errors
///
/// * Any error from reading the last checkpoint other than
///   `ProtocolError::CheckpointNotFound`, converted to `DeltaTableError`.
/// * Any error yielded while listing the object store.
/// * `DeltaTableError::not_a_table` when the resulting version is negative.
pub async fn get_earliest_version(
    log_store: &dyn LogStore,
    current_version: i64,
) -> DeltaResult<i64> {
    let version_start = match get_last_checkpoint(log_store).await {
        Ok(checkpoint) => checkpoint.version,
        // No checkpoint yet: fall back to the version supplied by the caller.
        Err(ProtocolError::CheckpointNotFound) => current_version,
        Err(err) => return Err(err.into()),
    };

    let mut earliest: i64 = version_start;
    let log_prefix = Some(log_store.log_path());
    let upper_bound = commit_uri_from_version(version_start);
    let store = log_store.object_store();

    // Manually filter until we can provide direction in https://github.com/apache/arrow-rs/issues/6274
    let mut entries = store
        .list(log_prefix)
        .try_filter(move |meta| futures::future::ready(meta.location < upper_bound))
        .boxed();

    // Scan the listing for the smallest version encoded in a log file name;
    // entries whose name carries no version are ignored.
    while let Some(meta) = entries.try_next().await? {
        if let Some(found) = extract_version_from_filename(meta.location.as_ref()) {
            earliest = min(earliest, found);
        }
    }

    // A negative result means no valid starting version was available.
    if earliest < 0 {
        return Err(DeltaTableError::not_a_table(log_store.root_uri()));
    }

    Ok(earliest)
}

/// Read delta log for a specific version
pub async fn read_commit_entry(
storage: &dyn ObjectStore,
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ impl DeltaTable {
self.log_store.get_latest_version(self.version()).await
}

/// Returns the earliest available version of the table.
///
/// Delegates to the table's log store, passing the currently loaded version
/// (`self.version()`) as the starting point for the lookup.
pub async fn get_earliest_version(&self) -> Result<i64, DeltaTableError> {
self.log_store.get_earliest_version(self.version()).await
}

/// Currently loaded version of the table
pub fn version(&self) -> i64 {
self.state.as_ref().map(|s| s.version()).unwrap_or(-1)
Expand Down
14 changes: 7 additions & 7 deletions crates/deltalake/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake"
version = "0.20.1"
version = "0.21.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -16,12 +16,12 @@ rust-version.workspace = true
features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-aws = { version = "0.3.0", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.3.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.4.0", path = "../gcp", optional = true }
deltalake-hdfs = { version = "0.4.0", path = "../hdfs", optional = true }
deltalake-catalog-glue = { version = "0.4.0", path = "../catalog-glue", optional = true }
deltalake-core = { version = "0.21.0", path = "../core" }
deltalake-aws = { version = "0.4.0", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.4.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.5.0", path = "../gcp", optional = true }
deltalake-hdfs = { version = "0.5.0", path = "../hdfs", optional = true }
deltalake-catalog-glue = { version = "0.5.0", path = "../catalog-glue", optional = true }

[features]
# All of these features are just reflected into the core crate until that
Expand Down
4 changes: 2 additions & 2 deletions crates/gcp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-gcp"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
lazy_static = "1"

# workspace dependencies
Expand Down
4 changes: 2 additions & 2 deletions crates/hdfs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-hdfs"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
hdfs-native-object-store = "0.11"

# workspace dependencies
Expand Down
4 changes: 2 additions & 2 deletions crates/mount/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-mount"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core", features = [
deltalake-core = { version = "0.21.0", path = "../core", features = [
"datafusion",
] }
lazy_static = "1"
Expand Down
4 changes: 2 additions & 2 deletions crates/test/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
name = "deltalake-test"
version = "0.3.0"
version = "0.4.0"
edition = "2021"
publish = false

[dependencies]
bytes = { workspace = true }
chrono = { workspace = true, default-features = false, features = ["clock"] }
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
dotenvy = "0"
fs_extra = "1.3.0"
futures = { version = "0.3" }
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class RawDeltaTable:
def has_files(self) -> bool: ...
def get_add_file_sizes(self) -> Dict[str, int]: ...
def get_latest_version(self) -> int: ...
def get_earliest_version(self) -> int: ...
def get_num_index_cols(self) -> int: ...
def get_stats_columns(self) -> Optional[List[str]]: ...
def metadata(self) -> RawDeltaTableMetaData: ...
Expand Down
8 changes: 8 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,14 @@ impl RawDeltaTable {
})
}

pub fn get_earliest_version(&mut self, py: Python) -> PyResult<i64> {
    // Run the blocking lookup inside `allow_threads` so the GIL is released
    // while the runtime drives the future to completion.
    py.allow_threads(|| {
        let earliest = rt()
            .block_on(self._table.get_earliest_version())
            .map_err(PythonError::from)?;
        Ok(earliest)
    })
}

pub fn get_num_index_cols(&mut self) -> PyResult<i32> {
Ok(self
._table
Expand Down

0 comments on commit 18cc561

Please sign in to comment.