
chore: revert reading_object_store
ion-elgreco committed Jan 9, 2025
1 parent 29ba0a0 commit c602b1d
Showing 18 changed files with 29 additions and 57 deletions.
4 changes: 0 additions & 4 deletions crates/aws/src/logstore/default_logstore.rs
@@ -114,10 +114,6 @@ impl LogStore for S3LogStore {
         self.storage.clone()
     }
 
-    fn reading_object_store(&self) -> Arc<dyn ObjectStore> {
-        self.object_store(None)
-    }
-
     fn config(&self) -> &LogStoreConfig {
         &self.config
     }
4 changes: 0 additions & 4 deletions crates/aws/src/logstore/dynamodb_logstore.rs
@@ -317,10 +317,6 @@ impl LogStore for S3DynamoDbLogStore {
         self.storage.clone()
     }
 
-    fn reading_object_store(&self) -> ObjectStoreRef {
-        self.object_store(None)
-    }
-
     fn config(&self) -> &LogStoreConfig {
         &self.config
     }
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/mod.rs
@@ -282,7 +282,7 @@ impl DeltaTableState {
 pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
     let object_store_url = store.object_store_url();
     let url: &Url = object_store_url.as_ref();
-    env.register_object_store(url, store.reading_object_store());
+    env.register_object_store(url, store.object_store(None));
 }
 
 /// The logical schema for a Deltatable is different from the protocol level schema since partition
6 changes: 3 additions & 3 deletions crates/core/src/kernel/snapshot/log_segment.rs
@@ -144,7 +144,7 @@ impl LogSegment {
         log_store.refresh().await?;
         let log_url = table_root.child("_delta_log");
         let (mut commit_files, checkpoint_files) = list_log_files(
-            log_store.reading_object_store().as_ref(),
+            log_store.object_store(None).as_ref(),
             &log_url,
             end_version,
             Some(start_version),
@@ -825,12 +825,12 @@ pub(super) mod tests {
         let batches = LogSegment::try_new(
             &Path::default(),
             Some(commit.version),
-            log_store.reading_object_store().as_ref(),
+            log_store.object_store(None).as_ref(),
         )
         .await
         .unwrap()
         .checkpoint_stream(
-            log_store.reading_object_store(),
+            log_store.object_store(None),
             &StructType::new(vec![
                 ActionType::Metadata.schema_field().clone(),
                 ActionType::Protocol.schema_field().clone(),
6 changes: 3 additions & 3 deletions crates/core/src/kernel/snapshot/mod.rs
@@ -149,7 +149,7 @@ impl Snapshot {
         }
 
         let (protocol, metadata) = log_segment
-            .read_metadata(log_store.reading_object_store().clone(), &self.config)
+            .read_metadata(log_store.object_store(None).clone(), &self.config)
             .await?;
         if let Some(protocol) = protocol {
             self.protocol = protocol;
@@ -454,7 +454,7 @@ impl EagerSnapshot {
             StructType::new(schema_actions.iter().map(|a| a.schema_field().clone()));
         new_slice
             .checkpoint_stream(
-                log_store.reading_object_store(),
+                log_store.object_store(None),
                 &read_schema,
                 &self.snapshot.config,
             )
@@ -464,7 +464,7 @@
         schema_actions.insert(ActionType::Remove);
         let read_schema = StructType::new(schema_actions.iter().map(|a| a.schema_field().clone()));
         let log_stream = new_slice.commit_stream(
-            log_store.reading_object_store().clone(),
+            log_store.object_store(None).clone(),
             &read_schema,
             &self.snapshot.config,
         )?;
4 changes: 0 additions & 4 deletions crates/core/src/logstore/default_logstore.rs
@@ -108,10 +108,6 @@ impl LogStore for DefaultLogStore {
         self.storage.clone()
     }
 
-    fn reading_object_store(&self) -> Arc<dyn ObjectStore> {
-        self.object_store(None)
-    }
-
     fn config(&self) -> &LogStoreConfig {
         &self.config
     }
11 changes: 4 additions & 7 deletions crates/core/src/logstore/mod.rs
@@ -261,12 +261,9 @@ pub trait LogStore: Send + Sync + AsAny {
         Ok(PeekCommit::New(next_version, actions.unwrap()))
     }
 
-    /// Get object store for writing operations.
+    /// Get object store, can pass operation_id for object stores linked to an operation
     fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;
 
-    /// Get object store for reading operations.
-    fn reading_object_store(&self) -> Arc<dyn ObjectStore>;
-
     /// [Path] to Delta log
     fn to_uri(&self, location: &Path) -> String {
         let root = &self.config().location;
@@ -286,7 +283,7 @@
     /// Check if the location is a delta table location
     async fn is_delta_table_location(&self) -> DeltaResult<bool> {
         // TODO We should really be using HEAD here, but this fails in windows tests
-        let object_store = self.reading_object_store();
+        let object_store = self.object_store(None);
         let mut stream = object_store.list(Some(self.log_path()));
         if let Some(res) = stream.next().await {
             match res {
@@ -470,7 +467,7 @@ pub async fn get_latest_version(
     let mut max_version: i64 = version_start;
     let prefix = Some(log_store.log_path());
     let offset_path = commit_uri_from_version(max_version);
-    let object_store = log_store.reading_object_store();
+    let object_store = log_store.object_store(None);
     let mut files = object_store.list_with_offset(prefix, &offset_path);
 
     while let Some(obj_meta) = files.next().await {
@@ -515,7 +512,7 @@ pub async fn get_earliest_version(
     let mut min_version: i64 = version_start;
     let prefix = Some(log_store.log_path());
     let offset_path = commit_uri_from_version(version_start);
-    let object_store = log_store.reading_object_store();
+    let object_store = log_store.object_store(None);
 
     // Manually filter until we can provide direction in https://github.com/apache/arrow-rs/issues/6274
     let mut files = object_store
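
After this revert, the trait above leaves object_store as the single accessor: pass None for ordinary reads and writes, or Some(operation_id) for a store tied to an in-flight operation (as the LakeFSLogStore implementation further down illustrates). A minimal caller sketch, assuming a LogStoreRef named log_store is in scope; the freshly generated id is purely illustrative:

    use uuid::Uuid;

    // Ordinary reads and writes: no operation id.
    let store = log_store.object_store(None);

    // Operation-scoped store, e.g. a LakeFS transaction keyed by its id.
    let op_id = Uuid::new_v4();
    let scoped_store = log_store.object_store(Some(op_id));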
2 changes: 1 addition & 1 deletion crates/core/src/operations/convert_to_delta.rs
@@ -278,7 +278,7 @@ impl ConvertToDeltaBuilder {
         );
 
         // Get all the parquet files in the location
-        let object_store = self.get_log_store().reading_object_store();
+        let object_store = self.get_log_store().object_store(None);
         let mut files = Vec::new();
         object_store
             .list(None)
2 changes: 1 addition & 1 deletion crates/core/src/operations/filesystem_check.rs
@@ -131,7 +131,7 @@ impl FileSystemCheckBuilder {
             }
         }
 
-        let object_store = log_store.reading_object_store();
+        let object_store = log_store.object_store(None);
         let mut files = object_store.list(None);
         while let Some(result) = files.next().await {
             let file = result?;
2 changes: 1 addition & 1 deletion crates/core/src/operations/restore.rs
@@ -230,7 +230,7 @@ async fn execute(
         .collect();
 
     if !ignore_missing_files {
-        check_files_available(log_store.reading_object_store().as_ref(), &files_to_add).await?;
+        check_files_available(log_store.object_store(None).as_ref(), &files_to_add).await?;
     }
 
     let metrics = RestoreMetrics {
2 changes: 1 addition & 1 deletion crates/core/src/operations/transaction/mod.rs
@@ -775,7 +775,7 @@ impl PostCommit<'_> {
         } else {
             let state = DeltaTableState::try_new(
                 &Path::default(),
-                self.log_store.reading_object_store(),
+                self.log_store.object_store(None),
                 Default::default(),
                 Some(self.version),
             )
4 changes: 2 additions & 2 deletions crates/core/src/operations/vacuum.rs
@@ -214,14 +214,14 @@ impl VacuumBuilder {
             &self.snapshot,
             retention_period,
             now_millis,
-            self.log_store.reading_object_store().clone(),
+            self.log_store.object_store(None).clone(),
         )
         .await?;
         let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>();
 
         let mut files_to_delete = vec![];
         let mut file_sizes = vec![];
-        let object_store = self.log_store.reading_object_store();
+        let object_store = self.log_store.object_store(None);
         let mut all_files = object_store.list(None);
         let partition_columns = &self.snapshot.metadata().partition_columns;
 
4 changes: 2 additions & 2 deletions crates/core/src/protocol/checkpoints.rs
@@ -177,7 +177,7 @@ pub async fn create_checkpoint_for(
 
     debug!("Writing parquet bytes to checkpoint buffer.");
     let tombstones = state
-        .unexpired_tombstones(log_store.reading_object_store().clone())
+        .unexpired_tombstones(log_store.object_store(None).clone())
         .await
         .map_err(|_| ProtocolError::Generic("filed to get tombstones".into()))?
         .collect::<Vec<_>>();
@@ -216,7 +216,7 @@ pub async fn cleanup_expired_logs_for(
         Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap();
     }
 
-    let object_store = log_store.reading_object_store();
+    let object_store = log_store.object_store(None);
     let maybe_last_checkpoint = object_store
         .get(&log_store.log_path().child("_last_checkpoint"))
         .await;
4 changes: 2 additions & 2 deletions crates/core/src/protocol/mod.rs
@@ -610,7 +610,7 @@ pub(crate) async fn get_last_checkpoint(
     let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]);
     debug!("loading checkpoint from {last_checkpoint_path}");
     match log_store
-        .reading_object_store()
+        .object_store(None)
         .get(&last_checkpoint_path)
         .await
     {
@@ -637,7 +637,7 @@ pub(crate) async fn find_latest_check_point_for_version(
     }
 
     let mut cp: Option<CheckPoint> = None;
-    let object_store = log_store.reading_object_store();
+    let object_store = log_store.object_store(None);
     let mut stream = object_store.list(Some(log_store.log_path()));
 
     while let Some(obj_meta) = stream.next().await {
13 changes: 4 additions & 9 deletions crates/core/src/table/mod.rs
@@ -282,11 +282,6 @@ impl DeltaTable {
         self.log_store.object_store(None)
     }
 
-    /// get a shared reference of the reading delta object store
-    pub fn reading_object_store(&self) -> ObjectStoreRef {
-        self.log_store.reading_object_store()
-    }
-
     /// Check if the [`DeltaTable`] exists
     pub async fn verify_deltatable_existence(&self) -> DeltaResult<bool> {
         self.log_store.is_delta_table_location().await
@@ -351,7 +346,7 @@
             _ => {
                 let state = DeltaTableState::try_new(
                     &Path::default(),
-                    self.log_store.reading_object_store(),
+                    self.log_store.object_store(None),
                     self.config.clone(),
                     max_version,
                 )
@@ -381,7 +376,7 @@
             Some(ts) => Ok(ts),
             None => {
                 let meta = self
-                    .reading_object_store()
+                    .object_store()
                     .head(&commit_uri_from_version(version))
                     .await?;
                 let ts = meta.last_modified.timestamp_millis();
@@ -399,7 +394,7 @@
             .snapshot()?
             .snapshot
             .snapshot()
-            .commit_infos(self.reading_object_store(), limit)
+            .commit_infos(self.object_store(), limit)
             .await?
             .try_collect::<Vec<_>>()
             .await?;
@@ -520,7 +515,7 @@
         let log_store = self.log_store();
         let prefix = Some(log_store.log_path());
         let offset_path = commit_uri_from_version(min_version);
-        let object_store = log_store.reading_object_store();
+        let object_store = log_store.object_store(None);
         let mut files = object_store.list_with_offset(prefix, &offset_path);
 
         while let Some(obj_meta) = files.next().await {
6 changes: 1 addition & 5 deletions crates/lakefs/src/logstore.rs
@@ -296,17 +296,13 @@ impl LogStore for LakeFSLogStore {
         get_earliest_version(self, current_version).await
     }
 
-    fn reading_object_store(&self) -> Arc<dyn ObjectStore> {
-        self.storage.get_store(&self.config.location).unwrap()
-    }
-
     fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
         match operation_id {
             Some(id) => {
                 let (_, store) = self.get_transaction_objectstore(id).unwrap_or_else(|_| panic!("The object_store registry inside LakeFSLogstore didn't have a store for operation_id {} Something went wrong.", id));
                 store
             }
-            _ => self.reading_object_store(),
+            _ => self.storage.get_store(&self.config.location).unwrap(),
         }
     }
 
4 changes: 2 additions & 2 deletions python/src/filesystem.rs
@@ -51,7 +51,7 @@ impl DeltaFileSystemHandler {
             .with_storage_options(options.clone().unwrap_or_default())
             .build_storage()
             .map_err(PythonError::from)?
-            .reading_object_store();
+            .object_store(None);
 
         Ok(Self {
             inner: storage,
@@ -71,7 +71,7 @@
         options: Option<HashMap<String, String>>,
         known_sizes: Option<HashMap<String, i64>>,
     ) -> PyResult<Self> {
-        let storage = table.reading_object_store()?;
+        let storage = table.object_store()?;
         Ok(Self {
             inner: storage,
             config: FsConfig {
6 changes: 1 addition & 5 deletions python/src/lib.rs
@@ -135,10 +135,6 @@ impl RawDeltaTable {
         self.with_table(|t| Ok(t.object_store().clone()))
     }
 
-    fn reading_object_store(&self) -> PyResult<ObjectStoreRef> {
-        self.with_table(|t| Ok(t.reading_object_store().clone()))
-    }
-
     fn cloned_state(&self) -> PyResult<DeltaTableState> {
         self.with_table(|t| {
             t.snapshot()
@@ -1284,7 +1280,7 @@
 
     pub fn get_py_storage_backend(&self) -> PyResult<filesystem::DeltaFileSystemHandler> {
         Ok(filesystem::DeltaFileSystemHandler {
-            inner: self.reading_object_store()?,
+            inner: self.object_store()?,
             config: self._config.clone(),
             known_sizes: None,
         })
