
feat: default logstore implementation #1742

Merged
merged 10 commits into from
Nov 9, 2023

Conversation

dispanser
Contributor

@dispanser dispanser commented Oct 19, 2023

Description

Introduce a LogStore abstraction to channel all log store reads and writes through a single place. This is supposed to allow implementations with more sophisticated locking mechanisms that do not rely on atomic rename semantics for the underlying object store.

This does not change any functionality - it reorganizes read operations and commits on the delta commit log to be funneled through the respective methods of LogStore.

Rationale

The goal is to align the implementation of multi-cluster writes for Delta Lake on S3 with the one provided by the original delta library, enabling multi-cluster writes with some writers using Spark / the Delta library and other writers using delta-rs. For an overview of how it's done in delta, please see:

  1. Delta [blog post](https://delta.io/blog/2022-05-18-multi-cluster-writes-to-delta-lake-storage-in-s3/) (high-level concept)
  2. Associated Databricks [design doc](https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h) (detailed read)
  3. [S3DynamoDbLogStore.java](https://github.com/delta-io/delta/blob/master/storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java) (content warning: Java code behind this link)

This approach requires readers of a delta table to "recover" unfinished commits from writers - as a result, reading and writing are combined in a single interface, which in this PR is modeled after [LogStore.java](https://github.com/delta-io/delta/blob/master/storage/src/main/java/io/delta/storage/LogStore.java). Currently in delta-rs, the read path for commits is implemented directly in DeltaTable, and there's no mechanism to implement storage-specific behavior like interacting with DynamoDB.
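To make the shape of such an interface concrete, here is a minimal, self-contained sketch of the idea: every commit-log read and write funnels through one trait, so a backend with its own locking scheme (e.g. a DynamoDB-based one) can slot in behind it. All names and signatures below are illustrative stand-ins, not the actual delta-rs API.

```rust
use std::collections::BTreeMap;

// Hypothetical LogStore sketch: one choke point for commit-log I/O.
trait LogStore {
    /// Read the raw commit entry for a version, if it exists.
    fn read_commit_entry(&self, version: u64) -> Option<String>;
    /// Write a commit entry; fails if the version already exists,
    /// mimicking put-if-absent / atomic-rename semantics.
    fn write_commit_entry(&mut self, version: u64, body: String) -> Result<(), String>;
}

/// In-memory stand-in for an object-store-backed default log store.
struct InMemoryLogStore {
    commits: BTreeMap<u64, String>,
}

impl LogStore for InMemoryLogStore {
    fn read_commit_entry(&self, version: u64) -> Option<String> {
        self.commits.get(&version).cloned()
    }

    fn write_commit_entry(&mut self, version: u64, body: String) -> Result<(), String> {
        if self.commits.contains_key(&version) {
            return Err(format!("version {} already committed", version));
        }
        self.commits.insert(version, body);
        Ok(())
    }
}

fn main() {
    let mut store = InMemoryLogStore { commits: BTreeMap::new() };
    store
        .write_commit_entry(0, "{\"commitInfo\":{}}".to_string())
        .unwrap();
    // A second writer racing on the same version must fail:
    assert!(store.write_commit_entry(0, "{}".to_string()).is_err());
    assert!(store.read_commit_entry(0).is_some());
}
```

A DynamoDB-backed implementation would replace the `BTreeMap` check with a conditional write against the lock table, which is exactly the extension point the PR is carving out.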

@github-actions github-actions bot added binding/python Issues for the Python package binding/rust Issues for the Rust crate rust labels Oct 19, 2023
@github-actions

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

@dispanser
Contributor Author

@rtyler @wjones127

I'm not yet happy with how it turned out - opening as draft PR to allow others to weigh in on the approach.

In particular, the abstraction / separation between DeltaTable, LogStore,
and DeltaObjectStore doesn't feel sound yet. I think that the LogStore
could potentially be the only place that ever reads from and writes to
_delta_log, but I'm not sure how this would interact with other planned
changes (delta kernel, ...), and it would also delay the short-term goal of
aligning the multi-cluster write approaches with the JVM implementation.

Because most operations (see DeltaOps) do not operate on a
DeltaTable directly, but still require commit semantics, the changes are spread
over many different places without much benefit.

I'm thinking that maybe the functionality of DeltaObjectStore, which is a
relatively thin wrapper around the actual object store, could be folded into
the LogStore implementation directly, which would spare us all this
indirection and the .object_store() calls everywhere.

Happy to hear your thoughts!

@roeap
Collaborator

roeap commented Oct 19, 2023

I'm thinking that maybe the functionality of DeltaObjectStore, which is a
relatively thin wrapper around the actual object store, could be folded into
the LogStore implementation directly, which would spare us all this
indirection and the .object_store() calls everywhere.

Agreed - one of the things we were discussing, and are moving towards (admittedly somewhat slowly), is to get rid of all the specific object store related implementations we have in this crate. To this end we recently removed the custom local storage.

There are a few operations which I believe require additional APIs - like VACUUM or FSCK - and these may yet need access to the underlying object store. Also, for the Datafusion integration we need to register a store that exposes object store APIs directly. All in all, though, being able to properly interop with Spark / Databricks would be great!

These are just some initial thoughts and I will give this a more thorough review soon!

Thanks for this great work!

@rtyler rtyler changed the title Default logstore implementation feature: default logstore implementation Oct 19, 2023
@dispanser dispanser force-pushed the default-logstore-implementation branch from db65ba9 to 2bd1ef8 Compare October 20, 2023 04:14
@dispanser dispanser changed the title feature: default logstore implementation feat: default logstore implementation Oct 20, 2023
@dispanser dispanser force-pushed the default-logstore-implementation branch 4 times, most recently from 6a0d7b1 to b0d0111 Compare October 20, 2023 19:40
@dispanser dispanser force-pushed the default-logstore-implementation branch 2 times, most recently from 15cc211 to 7f34f66 Compare October 27, 2023 17:51
@dispanser
Contributor Author

@roeap : I would appreciate your opinion on how to move this forward.

One option I mentioned above is to merge all DeltaObjectStore functionality right into LogStore, so LogStore would effectively replace DeltaObjectStore's role.
This could include impl ObjectStore for LogStore as it's currently done for the DeltaObjectStore, even though I feel these two things sit at different levels of abstraction:

  • ObjectStore: do stuff with bytes (low-level primitives to read / write / list)
  • LogStore: do stuff with the delta log: actions, versions, time travel, commit, ...
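The layering described in the two bullets above can be sketched as one trait built on top of the other: the log store *owns* a byte-level store and translates Delta concepts (versions, commits) into paths and objects. This is a toy illustration with made-up trait and type names, not the real crate APIs.

```rust
use std::collections::HashMap;

/// Low level: move bytes around, no Delta semantics.
trait ObjectStoreLike {
    /// Put-if-absent write, as required for safe commits.
    fn put(&mut self, path: &str, bytes: Vec<u8>) -> Result<(), String>;
    fn get(&self, path: &str) -> Option<Vec<u8>>;
}

/// Higher level: speak in Delta terms, implemented *on top of* an
/// object store rather than *being* one.
trait LogStoreLike {
    fn latest_version(&self) -> Option<u64>;
    fn commit(&mut self, version: u64, actions: &str) -> Result<(), String>;
}

struct MemStore(HashMap<String, Vec<u8>>);

impl ObjectStoreLike for MemStore {
    fn put(&mut self, path: &str, bytes: Vec<u8>) -> Result<(), String> {
        if self.0.contains_key(path) {
            return Err("already exists".into());
        }
        self.0.insert(path.to_string(), bytes);
        Ok(())
    }

    fn get(&self, path: &str) -> Option<Vec<u8>> {
        self.0.get(path).cloned()
    }
}

/// The log store maps versions to `_delta_log` paths and delegates
/// the actual byte I/O to its inner object store.
struct SimpleLogStore<S: ObjectStoreLike> {
    inner: S,
    latest: Option<u64>,
}

impl<S: ObjectStoreLike> LogStoreLike for SimpleLogStore<S> {
    fn latest_version(&self) -> Option<u64> {
        self.latest
    }

    fn commit(&mut self, version: u64, actions: &str) -> Result<(), String> {
        let path = format!("_delta_log/{:020}.json", version);
        self.inner.put(&path, actions.as_bytes().to_vec())?;
        self.latest = Some(version);
        Ok(())
    }
}

fn main() {
    let mut log = SimpleLogStore { inner: MemStore(HashMap::new()), latest: None };
    log.commit(0, "{}").unwrap();
    assert_eq!(log.latest_version(), Some(0));
    // Committing the same version again is rejected by the
    // low-level put-if-absent:
    assert!(log.commit(0, "{}").is_err());
}
```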

It's difficult for me to gauge how this impacts other ongoing lines of work, like the crate splitting or the kernel work, so I'd rather resolve this quickly before too many additional conflicts pile up.

I'm not 100% sure on the datafusion integration parts you mention - I had a quick look at register_store(...) in the datafusion module - couldn't we just pass in the actual object store, instead of DeltaObjectStore?

@roeap
Collaborator

roeap commented Nov 5, 2023

@dispanser - sorry for the delay.

One option I mentioned above is to merge all DeltaObjectStore functionality right into LogStore, so LogStore would effectively replace DeltaObjectStore's role.

I believe this is the way to go. IIRC we have very little additional functionality added - i.e. just generating a unique URL for the specific object store we use, as well as a simple check whether a location is a delta table (_delta_log exists).

The main question, I think, is how we handle the Datafusion integration. The reason we need to create a unique store for Datafusion is that we always assume the object store's root points at the root location of the delta table. This behaviour actually blocks us from having full delta protocol support, which allows relative paths as well as URLs.

Within the kernel migrations we are also looking to migrate to URLs in all external APIs instead of the object store's Path (which is always relative to the respective store's root).

I guess eventually we want to internally use something like Datafusion's registry to manage multiple stores if needed.

I'm not 100% sure on the datafusion integration parts you mention - I had a quick look at register_store(...) in the datafusion module - couldn't we just pass in the actual object store, instead of DeltaObjectStore?

Given the above, the answer is yes - we just need to register it under our custom unique URL for now to handle the Paths we provide.

@dispanser dispanser force-pushed the default-logstore-implementation branch from ccf67b3 to 112f7ea Compare November 6, 2023 10:49
@dispanser dispanser force-pushed the default-logstore-implementation branch from 112f7ea to 394ae2b Compare November 6, 2023 10:52
@github-actions github-actions bot removed binding/rust Issues for the Rust crate rust labels Nov 6, 2023
@dispanser dispanser force-pushed the default-logstore-implementation branch from 394ae2b to 37706cc Compare November 6, 2023 14:29
@dispanser dispanser force-pushed the default-logstore-implementation branch from 69f9f3d to 711c031 Compare November 7, 2023 07:46
@dispanser dispanser marked this pull request as ready for review November 7, 2023 16:44
@dispanser
Contributor Author

@roeap , I managed to remove all of DeltaObjectStore, and things still seem to work (according to our test suite, at least). I believe this is now in a state where a review is useful.

FWIW I opted for not doing impl ObjectStore for LogStore, but instead passing down the inner ObjectStore for raw storage and datafusion operations. This is not set in stone, but I found it more sensible to keep the distinction between low-level abstractions (ObjectStore) and higher-level concepts, which LogStore is aiming to become :).
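The accessor pattern chosen here can be sketched in a few lines: the log store does not pretend to *be* an object store; it simply hands out its inner store to callers that need raw byte-level access. `RawStore`, `S3Store`, and `DefaultLogStore` below are hypothetical names standing in for the real types.

```rust
use std::sync::Arc;

/// Stand-in for the low-level object store trait.
trait RawStore {
    fn name(&self) -> &'static str;
}

struct S3Store;

impl RawStore for S3Store {
    fn name(&self) -> &'static str {
        "s3"
    }
}

/// The log store owns the raw store behind an Arc...
struct DefaultLogStore {
    storage: Arc<dyn RawStore>,
}

impl DefaultLogStore {
    /// ...and low-level consumers (e.g. datafusion registration,
    /// VACUUM, FSCK) borrow it via an accessor instead of treating
    /// the log store itself as an object store.
    fn object_store(&self) -> Arc<dyn RawStore> {
        Arc::clone(&self.storage)
    }
}

fn main() {
    let log_store = DefaultLogStore { storage: Arc::new(S3Store) };
    let raw = log_store.object_store();
    assert_eq!(raw.name(), "s3");
}
```

Keeping the two traits separate means the compiler enforces the abstraction boundary: commit logic cannot accidentally bypass the log store's locking by calling raw puts.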

@roeap
Collaborator

roeap commented Nov 7, 2023

@dispanser - awesome work, excited to get this merged, will get reviewing right away.

I opted for not doing impl ObjectStore for LogStore, but instead pass down the inner ObjectStore for raw storage and datafusion operations.

I believe that was the right thing to do. The actual solution to this is also just to move to URLs rather than object store paths in our external handling, including Datafusion. Once that is done, we have no more need for a dedicated store and can also get rid of the extra stuff on the log store.

As it so happens I have a PR in preparation that at least should get us a little bit in that direction 😄.

roeap
roeap previously approved these changes Nov 7, 2023
Collaborator

@roeap roeap left a comment


Looking great, left some minor nit comments, but overall I think this is ready to go.

One more merge with main and if you want address some comments, then we can merge.

Thanks for sticking with it, and really think this is a great improvement to the library.

Review comments (outdated, resolved) on:
crates/deltalake-core/src/operations/mod.rs
crates/deltalake-core/src/logstore/default_logstore.rs
crates/deltalake-core/src/logstore/mod.rs (3 comments)
@roeap
Collaborator

roeap commented Nov 8, 2023

@dispanser - the latest build failures are related to a new release of dynamo-db lock, so we have to get the fix onto main first.

Use `ObjectStore::delete_stream()` instead, which can utilize
batch-delete operations from underlying cloud store APIs and is
generally smarter (e.g., by emitting concurrent requests)
@dispanser
Contributor Author

@dispanser - the latest build failures are related to a new release of dynamo-db lock, so we have to get the fix onto main first.

I'll have a look into these, thanks for pointing them out.

The new version of this crate properly sets a lease duration such that the locks
can actually expire
@rtyler rtyler added enhancement New feature or request binding/rust Issues for the Rust crate labels Nov 8, 2023
@github-actions github-actions bot removed the binding/rust Issues for the Rust crate label Nov 8, 2023
@dispanser dispanser requested a review from roeap November 8, 2023 19:24
@roeap roeap added the binding/rust Issues for the Rust crate label Nov 9, 2023
@roeap roeap merged commit 9470678 into delta-io:main Nov 9, 2023
24 checks passed
Jan-Schweizer pushed a commit to Jan-Schweizer/delta-rs that referenced this pull request Nov 9, 2023
ryanaston pushed a commit to segmentio/delta-rs that referenced this pull request Nov 9, 2023