
feat: implement S3 log store with transactions backed by DynamoDb #1904

Merged · 6 commits into delta-io:main from s3dynamodb-logstore · Dec 11, 2023

Conversation

dispanser
Contributor

Description

This log store implementation provides an alternative approach to allow for multi-cluster writes for use with S3 and DynamoDb. It is compatible with the approach taken in the upstream delta library and enables writers on the JVM (e.g., Apache Spark) and delta-rs to write to the same delta table concurrently.
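At its core, the protocol relies on DynamoDb conditional writes: before finalizing a commit file, a writer registers a commit entry for that version, and the condition expression ensures only one writer can claim it. Below is a minimal sketch of that idea using the aws-sdk-dynamodb crate; the table name, attribute names, and helper signature are illustrative only, not the schema or API introduced by this PR.

```rust
use aws_sdk_dynamodb::{types::AttributeValue, Client};

/// Try to claim `version` of the delta table at `table_path` by registering a
/// commit entry. The condition expression makes the put fail if another writer
/// already registered an entry for the same key, which is what keeps
/// concurrent writers (Spark or delta-rs) from committing the same version.
async fn try_claim_commit(
    client: &Client,
    lock_table: &str,
    table_path: &str,
    version: i64,
    temp_path: &str,
) -> Result<(), aws_sdk_dynamodb::Error> {
    client
        .put_item()
        .table_name(lock_table)
        .item("tablePath", AttributeValue::S(table_path.to_owned()))
        .item("fileName", AttributeValue::S(format!("{version:020}.json")))
        .item("tempPath", AttributeValue::S(temp_path.to_owned()))
        .item("complete", AttributeValue::Bool(false))
        // reject the write if an entry for this (tablePath, fileName) key already exists
        .condition_expression(
            "attribute_not_exists(tablePath) AND attribute_not_exists(fileName)",
        )
        .send()
        .await?;
    Ok(())
}
```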

@github-actions github-actions bot added the binding/rust and crate/core labels on Nov 24, 2023
@dispanser
Contributor Author

Not completely finished yet, a bunch of open questions / follow-up items that need to be addressed or discussed:

  1. Previously existing log store is not yet removed.
    The PR still uses the unmodified storage::s3 module, so the previous approach for consistent concurrent writes is still in place and locking happens twice (on two different lock tables). This is not hard to remove, but I wanted to seek confirmation that we're going with a hard cut before ripping out precious code. Do we want to offer a configuration option to toggle between the two locking solutions?

  2. No separate crate for the DynamoDbLockClient
    Previous implementation depended on a separate crate, dynamodb_lock. That crate is potentially useful in other contexts, as it embodies a more general approach to distributed locking. The lock client suggested here is specifically tied to the delta protocol, so I don't find it necessary to have the code in a separate crate.

  3. No retry / exponential back-off in case of overloading DynamoDb (i.e., ProvisionedThroughputExceeded)
    I would like to provide that in a separate PR next week so it's easier to review; I expect the implementation to be somewhat tricky (a rough sketch of the retry shape follows after this list).

  4. Setup / Cleanup: no automatic lock table creation, and no functionality to clean up the lock table, e.g. by removing old, irrelevant items.

  5. Some things are pub that shouldn't be.
    There's a bunch of functionality that is part of the public API (e.g., DynamoDbLockClient itself). This helped a lot in writing decent integration tests that actually validate the lock table state, and also in injecting specific commit entries to test the "repair" code paths properly. I believe in a narrow, targeted public API though, and I'm open to suggestions here. Happy to delete stuff, but most of the integration tests actually rely on that lock client :-/.
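For item 3, the retry would essentially be a small back-off wrapper around the DynamoDb calls. A rough sketch of the shape I have in mind (delays, attempt count, and the throttle predicate are illustrative, not the eventual implementation):

```rust
use std::time::Duration;

/// Retry an async operation with exponential back-off while it keeps failing
/// with a throttling error (e.g. ProvisionedThroughputExceeded). Illustrative
/// only: the real implementation would match on the concrete SDK error type.
async fn with_backoff<T, E, F, Fut>(mut op: F, is_throttle: fn(&E) -> bool) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let max_attempts = 5;
    let mut delay = Duration::from_millis(100);
    for attempt in 1..=max_attempts {
        match op().await {
            Err(e) if is_throttle(&e) && attempt < max_attempts => {
                tokio::time::sleep(delay).await;
                delay *= 2; // double the wait before the next attempt
            }
            other => return other,
        }
    }
    unreachable!("the final attempt always returns above")
}
```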

@roeap
Collaborator

roeap commented Nov 24, 2023

Exciting work! Just some initial comments.

  1. Previously existing log store is not yet removed.
    The PR still uses the unmodified storage::s3 module, so the previous approach for consistent concurrent writes is still in place and locking happens twice (on two different lock tables). This is not hard to remove, but I wanted to seek confirmation that we're going with a hard cut before ripping out precious code. Do we want to offer a configuration option to toggle between the two locking solutions?

I'm not too invested in S3, so my opinion probably shouldn't have too much weight 😆. I believe the overall direction we were taking is to make this configurable. Specifically, there is a PR (#1825) about to be merged that separates the S3 lock into a separate crate. Would it maybe be possible to base this off that PR?

  1. No separate crate for the DynamoDbLockClient
    Previous implementation depended on a separate crate, dynamodb_lock. That crate is potentially useful in other contexts, as it embodies a more general approach to distributed locking. The lock client suggested here is specifically tied to the delta protocol, so I don't find it necessary to have the code in a separate crate.

Would it make sense to have that in a deltalake-* crate? I'm not sure what the best layout would be; I guess to me the main concern is that the DynamoDb dependencies stay optional.

  1. No retry / exponential back-off in case of overloading DynamoDb (i.e., ProvisionedThroughputExceeded)
    I would like to provide that in a separate PR next week so it's easier to review; I expect the implementation to be somewhat tricky.

makes sense.

  1. Setup / Cleanup: no automatic lock table creation, and no functionality to clean up the lock table, e.g. by removing old, irrelevant items.

I haven't looked through the actual code yet, but I seem to remember that the Databricks mechanism uses a single DynamoDb table to manage locking for multiple delta tables? If so, it may make sense that table creation is handled separately.

  1. Some things are pub that shouldn't be.
    There's a bunch of functionality that is part of the public API (e.g., DynamoDbLockClient itself). This helped a lot in writing decent integration tests that actually validate the lock table state, and also in injecting specific commit entries to test the "repair" code paths properly. I believe in a narrow, targeted public API though, and I'm open to suggestions here. Happy to delete stuff, but most of the integration tests actually rely on that lock client :-/.

Elsewhere we have been adopting the visibility crate to be able to expose some APIs while making it clear that they are not part of the official API contract of the crate.

@dispanser
Contributor Author

I'm not too invested in S3, so my opinion probably shouldn't have too much weight 😆. I believe the overall direction we were taking is to make this configurable. Specifically, there is a PR (#1825) about to be merged that separates the S3 lock into a separate crate. Would it maybe be possible to base this off that PR?

@roeap : unless I'm missing something, the linked PR #1825 only moves catalog-related logic into a separate crate, and S3 itself and its locking logic are unchanged. I don't think there would be any difficulties in rebasing, but I also don't think the changes there are related. It touches on a different point, though:

Should I switch to the official AWS SDK instead of rusoto? (@rtyler)

@roeap
Collaborator

roeap commented Nov 26, 2023

@dispanser - you are of course correct! I guess AWS just triggered something 😆 ... Since rusoto is no longer maintained, I think the AWS SDK is the way to go ...

@dispanser
Contributor Author

I haven't looked through the actual code yet, but I seem to remember that the Databricks mechanism uses a single DynamoDb table to manage locking for multiple delta tables? If so, it may make sense that table creation is handled separately.

Correct, there's a single DynamoDb lock table for a possibly very large number of tables.

Would it make sense to have that in a deltalake-* crate? I'm not sure what the best layout would be; I guess to me the main concern is that the DynamoDb dependencies stay optional.

I've read through the discussion regarding the crate split in #1713:

deltalake-aws ❔

This crate would contain the dynamodb locking code, and other special case storage logic related to AWS/S3. Right now this code is kind of a mess (IMHO) in the Rust crate. There's some refactoring that @roeap has tried here but we've not merged. Additionally there are some changes that need to be made in #1601 which would be a good time to break the AWS code out.

and your response wrt object stores:

When it comes to the cloud crates I do have some doubts, since essentially only AWS has any special needs, due to locking. So not even all S3 APIs require any special dependencies. Factoring out the locking logic and mirroring object_store's features may be a way to go. Over there the specific cloud features - aws, azure, gcs - are essentially just legacy, since they more or less just reference the "cloud" feature.

If the goal of the split is to create crates that make sense to release independently, I'm not sure what a good scope would be: all of S3, or just the lock client code?
I don't feel comfortable making this call, and I don't think it helps the reviewability of the PR if it adds new functionality, moves existing code into a new crate, and rewrites from rusoto to the AWS SDK all in one big pile.

My proposal would be to do these things in separate PRs, and I'm happy to assist with all of them :-). For now, I'll look into the rusoto -> AWS SDK switch for the new lock client logic, but this would result in deltalake-core depending on both of these frameworks; I'm not sure that is desirable.

@dispanser
Contributor Author

I created a separate branch to play with the rusoto replacement at https://github.com/dispanser/delta-rs/tree/s3dynamodb-logstore-aws-sdk. The mechanics are mostly easy, but one thing that stands out is that the central config loading mechanism is async; from the docs:

let config = aws_config::load_from_env().await;

This leads to config::configure_logstore(options) being async, and from there basically everything that attempts to instantiate a delta table (and a log store) becomes async-poisoned as well. I stopped after adding about 50 asyncs and awaits everywhere. Crucially, the deserialization logic of DeltaTable itself relies on re-creating a LogStore from the deserialized config, and this is sort of a dead end because deserialize can't possibly be async. I'm aware that it's possible to manually block on some async code, but I'd consider that a last resort. Any other ideas?

@dispanser
Contributor Author

I've been playing with block_on, but that doesn't seem to work due to:

thread 'test_repair_incomplete_commits' panicked at crates/deltalake-core/src/logstore/s3/lock_client.rs:49:40:
Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

All my attempts to work (i.e., google) my way around this have so far failed, e.g. following this blog post. Additionally, everyone on the Rust forums and Reddit tells you that it's simply a mistake to even attempt the async-wrapping-sync-wrapping-async dance I'm trying to do here.

It seems that the requirement for deserializing DeltaTable comes from the datafusion LogicalExtensionCodec trait we implement, and even if we somehow sneak in an async serde replacement, we'd face the very same problem right inside LogicalExtensionCodec::try_decode_table_provider, which is not async.

I'm out of my depth here.

As for the rest of this PR, I'd really like to get a somewhat authoritative answer to some questions I raised above:

  1. Do we want to keep the existing locking logic based on LockClient from the dynamodb_lock crate and switch via configuration option?
  2. Would it make sense to put the (new) lock client into a separate crate, and if yes, would that crate be inside or outside of the delta-rs repository? As mentioned above, the lock client presented here is not useful outside of the context of delta, as it's full of delta-specific concepts.

@dispanser dispanser force-pushed the s3dynamodb-logstore branch from 1599b2c to 2bd60dc on December 2, 2023 18:48
@rtyler
Member

rtyler commented Dec 2, 2023

As for the rest of this PR, I'd really like to get a somewhat authoritative answer to some questions I raised above:

Do we want to keep the existing locking logic based on LockClient from the dynamodb_lock crate and switch via configuration option?

The existing locking code in the dynamodb-lock crate is more buggy than I am happy with. I am comfortable making a "hard" break in terms of functionality for the S3DynamoDbLogStore. What is important IMHO is that it works well in concert with Spark writers.

Would it make sense to put the (new) lock client into a separate crate, and if yes, would that crate be inside or outside of the delta-rs repository? As mentioned above, the lock client presented here is not useful outside of the context of delta, as it's full of delta-specific concepts.

I think putting it into a separate crate in this workspace is the correct path forward here. One of the goals for #1713 is that we remove some of the feature surface area of deltalake-core for something like this. That would mean that it would contain the LogStore implementation, and then a crate like deltalake-aws would contain the specific S3 locking code and bits for locking there.

As to the challenges with the block_on approach, what I have seen @wjones127 do successfully in private code has been to spawn a system thread that runs the tokio block_on, and then join on that thread's execution (or something equivalent). Honestly though, that's kind of the least interesting part of this great work to me. I'm confident we can figure out a decent way to load the AWS config; I think this work can continue by just assuming "somebody else" gives us the config and coming back to the async/sync question later.
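For illustration, the spawn-a-thread pattern could look roughly like the sketch below (assuming the tokio and aws_config crates; this is not code from the PR). The nested runtime lives on a freshly spawned OS thread, so block_on no longer runs on a thread that is already driving a runtime:

```rust
/// Load the AWS SDK config from synchronous code by running the async loader
/// on a dedicated thread with its own single-threaded Tokio runtime, then
/// joining on that thread.
fn load_aws_config_blocking() -> aws_config::SdkConfig {
    std::thread::spawn(|| {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build Tokio runtime")
            .block_on(aws_config::load_from_env())
    })
    .join()
    .expect("config loading thread panicked")
}
```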

@dispanser dispanser force-pushed the s3dynamodb-logstore branch 4 times, most recently from 9615bc4 to b12aaff on December 6, 2023 10:42
@dispanser
Contributor Author

I went ahead and created a separate crate, deltalake-aws. This crate currently only contains the lock client.

@dispanser
Contributor Author

I ran a bunch of "manual integration tests" with Spark and verified the following:

  • Spark and delta-rs can read each others' CommitEntry items in DynamoDb just fine (and read / write the same table)
  • Spark can "repair" an incomplete commit entry created by a (simulated) dying delta-rs write operation just fine
  • I couldn't test the other direction because it's quite hard to kill Spark at exactly the right millisecond; there's not enough control over the individual steps that make up a commit write

Some things I noticed:

  1. Spark writes its temporary commit files into _delta_log/.tmp/00000000000000000002.json.7714f1ab-8e1d-49bb-a318-6272397d8bca
  2. delta-rs writes into _delta_log/_commit_7d542922-a2a5-4d98-a555-280d8fafa945.json.tmp
  3. Interop is not a problem (see above), but I like their naming convention more (it gives away which commit was initially targeted, even though the commit could still end up as a later version).
  4. Spark does leave these tmp json files in place, while delta-rs does a move operation to the destination N.json. While this is not in itself a problem, I can imagine that Spark's error handling, assuming that other writers follow the same protocol, doesn't handle ObjectStoreError::NotFound correctly and assumes a broken state when trying to repair a commit entry (specifically a commit that has been created, its temp file moved, but then the final "complete = true" operation failed). I'll test this particular case (a rough sketch of the tolerant repair path I have in mind follows at the end of this comment).

In general, I wasn't able to get Spark to use localstack, so I was running manual tests against a proper DynamoDb table, which makes the entire interop test hard to reproduce.
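To make point 4 above concrete, the repair path could treat a missing temp file (and an already existing destination) as "someone else finished this step" rather than as corruption. A rough sketch against the object_store crate's API; the function name and error-handling choices are illustrative, not what Spark or this PR actually do:

```rust
use object_store::{path::Path, ObjectStore};

/// Finish an incomplete commit by copying the temp file to its final N.json
/// location, tolerating the cases where the work has already been done.
async fn repair_commit(
    store: &dyn ObjectStore,
    tmp_path: &Path,
    final_path: &Path,
) -> object_store::Result<()> {
    match store.copy_if_not_exists(tmp_path, final_path).await {
        Ok(()) => Ok(()),
        // destination already exists: another writer completed the commit
        Err(object_store::Error::AlreadyExists { .. }) => Ok(()),
        // source is gone: the temp file was already moved to its destination
        Err(object_store::Error::NotFound { .. }) => Ok(()),
        Err(e) => Err(e),
    }
}
```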

@dispanser dispanser force-pushed the s3dynamodb-logstore branch 2 times, most recently from 53d41f2 to 47efff7 on December 7, 2023 16:59
@dispanser dispanser force-pushed the s3dynamodb-logstore branch from 47efff7 to 9956e3b on December 7, 2023 17:08
@rtyler rtyler self-assigned this Dec 11, 2023
Member

@rtyler rtyler left a comment


There are some things which need to be cleaned up here in pulling all the AWS bits into the AWS crate. There are still some cross-dependencies which I have started to fix in a topic branch based on this work.

I am going to merge it so that work can continue off main prior to the next release

@rtyler rtyler enabled auto-merge (rebase) December 11, 2023 06:38
@rtyler rtyler merged commit f87afce into delta-io:main Dec 11, 2023
21 checks passed
@roeap
Collaborator

roeap commented Dec 11, 2023

I am going to merge it so that work can continue off main prior to the next release

I guess it's fine if the work is done quickly, but in the past QP especially always emphasised that we wanted to keep main in a releasable state at all times.

For future scenarios I think a topic branch would be a better approach to collaborate on partial work.

@rtyler
Member

rtyler commented Dec 11, 2023

@roeap main has not been in a releasable state for some time because of the substantial work going on with splitting crates and other things, which is why I had created the 0.16.x release branch. main is certainly buildable, which I think is essential, and while technically there's nothing that would prevent us from releasing from it right now, the separation of crates is not clean enough to make sense.

@roeap
Collaborator

roeap commented Dec 11, 2023

@rtyler - good point; since we are splitting the crates, we need to rebuild the release anyhow...

@dispanser dispanser deleted the s3dynamodb-logstore branch December 12, 2023 12:28