Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose peek next commit to python #1

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

PengLiVectra
Copy link
Collaborator

@PengLiVectra PengLiVectra commented Nov 8, 2023

Description

Expose peek_next_commit to python. Can be used for streaming delta commit changes. peek_next_commit will return actions in the commit and commit version of next commit. If current version is the latest version, it will return None (actions will be None) and the current version.

An example of usage:
actions, next_version = delta_table.peek_next_commit(current_version)
If current_version is the latest version, the return will be like None, current_version.

Related Issue(s)

delta-io#1886

Testing

Added unit tests that passed.

Breaking-Change

Not a breaking change.

Documentation

@ginevragaudioso
Copy link

Can we add a description to the PRs so that we also iterate on what description we want to present upstream? Description should contain a similar format as the template we have on internal PRs, such as brief description, testing, whether or not this is a breaking change...

@ginevragaudioso
Copy link

Could we also have all changes be in one commit? it is slightly confusing to go over commits and see one function added and then removed, we would eliminate the confusion by having one commit with all the changes.

@PengLiVectra PengLiVectra force-pushed the expose-peek-next-commit-to-python branch from f6293b2 to ce6ffbf Compare November 9, 2023 11:55
@PengLiVectra
Copy link
Collaborator Author

Could we also have all changes be in one commit? it is slightly confusing to go over commits and see one function added and then removed, we would eliminate the confusion by having one commit with all the changes.

Merged the commits

dt = DeltaTable(table_path)
actions, current_version = dt.peek_next_commit(version=version)
assert (len(actions), current_version) == expected

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could add a test to cover DeltaLogNotFound.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there's no way to test the loop? I'm thinking. dt.peek_next_commit(version) where version does not have an object but version+1 does and _latest_version is currently version.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused here, do you mean use version=-1 as input or something else?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking there'd be a unit test in which the underlying delta table has actual latest version 10, the DeltaTable thinks the latest version is 6, and version 7 doesn't actually exist. So we call dt.peek_next_commit(6) and expect the next commit to be 8, which requires

  1. realizing 7 is bigger than the thought-to-be latest version 6
  2. updating the latest version which is now 10
  3. checking if version 7 is there, which it isn't
  4. trying again with version 8 and finding version 8 to return.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added data that missed a commit and a test for it. get_latest_version function can get the actual latest version when there is a missed commit (e.g., if version 7 doesn't exist and the actual latest version is 10, DeltaTable.get_latest_version will return 10), so we don't need to update the latest version.

@PengLiVectra PengLiVectra changed the title Expose peek next commit to python feat: Expose peek next commit to python Nov 15, 2023
@PengLiVectra
Copy link
Collaborator Author

I ran make build to build a wheel in python folder, got the following error:

--- Build Python binding ---
maturin build
⚠️ Warning: specify [package.metadata.maturin] name in Cargo.toml is deprecated, use module-name in [tool.maturin] section in pyproject.toml instead
🍹 Building a mixed python/rust project
🔗 Found pyo3 bindings with abi3 support for Python ≥ 3.7
🐍 Not using a specific python interpreter
💻 Using MACOSX_DEPLOYMENT_TARGET=10.7 for x86_64-apple-darwin by default
Compiling deltalake-core v0.17.0 (/Users/pengli/vectra/vectra-github/delta-rs/crates/deltalake-core)
error[E0432]: unresolved import dynamodb_lock::DEFAULT_MAX_RETRY_ACQUIRE_LOCK_ATTEMPTS
--> crates/deltalake-core/src/storage/s3.rs:6:56
|
6 | use dynamodb_lock::{DynamoError, LockClient, LockItem, DEFAULT_MAX_RETRY_ACQUIRE_LOCK_ATTEMPTS};
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ no DEFAULT_MAX_RETRY_ACQUIRE_LOCK_ATTEMPTS in the root

error: future cannot be sent between threads safelyeltalake-core
--> crates/deltalake-core/src/storage/s3.rs:497:91
|
497 | async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
| ___________________________________________________________________________________________^
498 | | if let Some(lock_client) = &self.s3_lock_client {
499 | | lock_client.rename_with_lock(self, from, to).await?;
500 | | } else if self.allow_unsafe_rename {
... |
506 | | Ok(())
507 | | }
| |_____^ future created by async block is not Send
|
note: opaque type is declared here
--> crates/deltalake-core/src/storage/s3.rs:190:64
|
190 | async fn acquire_lock_loop(&self, src: &str, dst: &str) -> Result<LockItem, S3LockError> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: this item depends on auto traits of the hidden type, but may also be registering the hidden type. This is not supported right now. You can try moving the opaque type and the item that actually registers a hidden type into a new submodule
--> crates/deltalake-core/src/storage/s3.rs:497:14
|
497 | async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
| ^^^^^^^^^^^^^^^^^^^^
note: future is not Send as it awaits another future which is not Send
--> crates/deltalake-core/src/storage/s3.rs:133:24
|
133 | let mut lock = self.acquire_lock_loop(src.as_ref(), dst.as_ref()).await?;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here on type impl futures::Future<Output = std::result::Result<LockItem, S3LockError>>, which is not Send
= note: required for the cast from Pin<Box<[async block@crates/deltalake-core/src/storage/s3.rs:497:91: 507:6]>> to Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
= note: the full name for the target type has been written to '/Users/pengli/vectra/vectra-github/delta-rs/target/debug/deps/deltalake_core-8bbe1109083fff84.long-type-17076020203122974121.txt'

error[E0624]: associated function new is privatedeltalake-core
--> crates/deltalake-core/src/storage/s3.rs:538:66
|
538 | let lock_client = dynamodb_lock::DynamoDbLockClient::new(
| ^^^ private associated function
|
::: /Users/pengli/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dynamodb_lock-0.6.1/src/lib.rs:399:5
|
399 | fn new(client: DynamoDbClient, opts: DynamoDbOptions) -> Self {
| ------------------------------------------------------------- private associated function defined here

error: aborting due to 3 previous errors 413/416: deltalake-core

Some errors have detailed explanations: E0432, E0624.talake-core

For more information about an error, try rustc --explain E0432.

error: could not compile deltalake-core (lib) due to 4 previous errors
💥 maturin failed
Caused by: Failed to build a native library through cargo
Caused by: Cargo build finished with "exit status: 101": MACOSX_DEPLOYMENT_TARGET="10.7" "cargo" "rustc" "--manifest-path" "/Users/pengli/vectra/vectra-github/delta-rs/python/Cargo.toml" "--message-format" "json" "--lib" "--" "-C" "link-arg=-undefined" "-C" "link-arg=dynamic_lookup" "-C" "link-args=-Wl,-install_name,@rpath/deltalake.abi3.so"
make: *** [build] Error 1

They seems not relate to our change, I got same errors when I ran make build in origin/main branch.
Any ideas?

Copy link

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.

}) => {
return Err(DeltaTableError::DeltaLogNotFound(current_version));
}
Err(err) => Err(err),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why return Ok(Err(err)) instead of Err(err)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, where is Ok(Err(err))?
Here we use the same method as peek_next_commit below to get the commit_log_bytest, then we return the full commit_log_bytes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this line, we assign commit_log_bytes to Err(err). Then on line 470, we return Ok(commit_log_bytes). I believe as a result we will return Ok(Err(err)) if we reach line 468.

Am I interpreting this correctly? If so, this seems wrong. I think you'd want to return Err(err) here and on line 462 you'd want to assign commit_log_bytes to bytes, not Ok(bytes) (which would thus return Ok(Ok(bytes)) which is weird as well).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes seems weird we need to fix this, also need to add a test for error case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned by ohan, rust ? seems to tidy the results, https://doc.rust-lang.org/rust-by-example/std/result/question_mark.html

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flat out missed the question mark at the end, ok great.

next_version += 1
else:
raise
logging.info(f"Provided Delta Version is up to date. Version: {version}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this log line, I don't think it's necessary. Also removes the import statement above.

dt = DeltaTable(table_path)
actions, current_version = dt.peek_next_commit(version=version)
assert (len(actions), current_version) == expected

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess there's no way to test the loop? I'm thinking. dt.peek_next_commit(version) where version does not have an object but version+1 does and _latest_version is currently version.

def test_delta_log_not_found():
table_path = "../crates/deltalake-core/tests/data/simple_table"
dt = DeltaTable(table_path)
latest_version = dt._table.get_latest_version()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stylistically, this is access to a private member _table. Is that OK or do we need to promote get_latest_version() to also be a public member function of DeltaTable?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added get_latest_version() as a public member.

@syedashrafulla
Copy link

Should we have a GitHub issue in github/delta-io/delta-rs/issues for this feature request?

@PengLiVectra PengLiVectra changed the title feat: Expose peek next commit to python feat: expose peek next commit to python Nov 17, 2023
@syedashrafulla
Copy link

syedashrafulla commented Nov 17, 2023

@PengLiVectra regarding the errors about E0432 and the future and E0624, if they are on a fresh checkout of the upstream main branch, check if a GitHub issue already exists and if not, file a new issue.

@ginevragaudioso
Copy link

do we know why some tests are failing?

@PengLiVectra PengLiVectra force-pushed the expose-peek-next-commit-to-python branch 3 times, most recently from f392256 to 167e954 Compare November 28, 2023 16:46
@PengLiVectra PengLiVectra force-pushed the expose-peek-next-commit-to-python branch 2 times, most recently from 359306a to b402f3b Compare December 6, 2023 18:05
@PengLiVectra PengLiVectra force-pushed the expose-peek-next-commit-to-python branch from b402f3b to 005d52a Compare December 11, 2023 15:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants