Skip to content

Commit

Permalink
feat: get earliest version
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored and rtyler committed Sep 9, 2024
1 parent 3127c81 commit 776a4e1
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 1 deletion.
4 changes: 4 additions & 0 deletions crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ impl LogStore for S3DynamoDbLogStore {
}
}

async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
get_earliest_version(self, current_version).await
}

fn object_store(&self) -> ObjectStoreRef {
self.storage.clone()
}
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ impl LogStore for DefaultLogStore {
super::get_latest_version(self, current_version).await
}

async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
super::get_earliest_version(self, current_version).await
}

fn object_store(&self) -> Arc<dyn ObjectStore> {
self.storage.clone()
}
Expand Down
52 changes: 51 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Delta log store.
use std::cmp::min;
use std::io::{BufRead, BufReader, Cursor};
use std::sync::OnceLock;
use std::{cmp::max, collections::HashMap, sync::Arc};

use bytes::Bytes;
use dashmap::DashMap;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use regex::Regex;
Expand Down Expand Up @@ -212,6 +213,9 @@ pub trait LogStore: Sync + Send {
/// Find latest version currently stored in the delta log.
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Find earliest version currently stored in the delta log.
async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get underlying object store.
fn object_store(&self) -> Arc<dyn ObjectStore>;

Expand Down Expand Up @@ -440,6 +444,52 @@ pub async fn get_latest_version(
Ok(version)
}

/// Default implementation for retrieving the earliest version
pub async fn get_earliest_version(
log_store: &dyn LogStore,
current_version: i64,
) -> DeltaResult<i64> {
let version_start = match get_last_checkpoint(log_store).await {
Ok(last_check_point) => last_check_point.version,
Err(ProtocolError::CheckpointNotFound) => {
// no checkpoint so start from current_version
current_version
}
Err(e) => {
return Err(DeltaTableError::from(e));
}
};

// list files to find min version
let version = async {
let mut min_version: i64 = version_start;
let prefix = Some(log_store.log_path());
let offset_path = commit_uri_from_version(version_start);
let object_store = log_store.object_store();

// Manually filter until we can provide direction in https://github.com/apache/arrow-rs/issues/6274
let mut files = object_store
.list(prefix)
.try_filter(move |f| futures::future::ready(f.location < offset_path))
.boxed();

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
if let Some(log_version) = extract_version_from_filename(obj_meta.location.as_ref()) {
min_version = min(min_version, log_version);
}
}

if min_version < 0 {
return Err(DeltaTableError::not_a_table(log_store.root_uri()));
}

Ok::<i64, DeltaTableError>(min_version)
}
.await?;
Ok(version)
}

/// Read delta log for a specific version
pub async fn read_commit_entry(
storage: &dyn ObjectStore,
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ impl DeltaTable {
self.log_store.get_latest_version(self.version()).await
}

/// returns the earliest available version of the table
pub async fn get_earliest_version(&self) -> Result<i64, DeltaTableError> {
self.log_store.get_earliest_version(self.version()).await
}

/// Currently loaded version of the table
pub fn version(&self) -> i64 {
self.state.as_ref().map(|s| s.version()).unwrap_or(-1)
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class RawDeltaTable:
def version(self) -> int: ...
def get_add_file_sizes(self) -> Dict[str, int]: ...
def get_latest_version(self) -> int: ...
def get_earliest_version(self) -> int: ...
def get_num_index_cols(self) -> int: ...
def get_stats_columns(self) -> Optional[List[str]]: ...
def metadata(self) -> RawDeltaTableMetaData: ...
Expand Down
8 changes: 8 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ impl RawDeltaTable {
})
}

pub fn get_earliest_version(&mut self, py: Python) -> PyResult<i64> {
py.allow_threads(|| {
Ok(rt()
.block_on(self._table.get_earliest_version())
.map_err(PythonError::from)?)
})
}

pub fn get_num_index_cols(&mut self) -> PyResult<i32> {
Ok(self
._table
Expand Down

0 comments on commit 776a4e1

Please sign in to comment.