Merge remote-tracking branch 'origin/main' into datafusion-head
Blajda committed Nov 17, 2023
2 parents 5032040 + b5bb4b3 commit ecf0782
Showing 16 changed files with 268 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
@@ -1,7 +1,7 @@
 crates/ @wjones127 @roeap @rtyler
 delta-inspect/ @wjones127 @rtyler
 proofs/ @houqp
-python/ @wjones127 @fvaleye @roeap
+python/ @wjones127 @fvaleye @roeap @ion-elgreco
 tlaplus/ @houqp
 .github/ @wjones127 @rtyler
 docs/ @MrPowers
25 changes: 25 additions & 0 deletions .github/workflows/issue_comments.yml
@@ -0,0 +1,25 @@
+name: Comment Commands
+on:
+  issue_comment:
+    types: created
+
+permissions:
+  issues: write
+
+jobs:
+  issue_assign:
+    runs-on: ubuntu-latest
+    if: (!github.event.issue.pull_request) && github.event.comment.body == 'take'
+    concurrency:
+      # Only run one at a time per user
+      group: ${{ github.actor }}-issue-assign
+    steps:
+      - run: |
+          CODE=$(curl -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" -LI https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.issue.number }}/assignees/${{ github.event.comment.user.login }} -o /dev/null -w '%{http_code}\n' -s)
+          if [ "$CODE" -eq "204" ]
+          then
+            echo "Assigning issue ${{ github.event.issue.number }} to ${{ github.event.comment.user.login }}"
+            curl -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" -d '{"assignees": ["${{ github.event.comment.user.login }}"]}' https://api.github.com/repos/${{ github.repository }}/issues/${{ github.event.issue.number }}/assignees
+          else
+            echo "Issue ${{ github.event.issue.number }} cannot be assigned to ${{ github.event.comment.user.login }}"
+          fi
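
The script's first `curl` probes the issue's check-assignee endpoint, which answers with HTTP `204` when the commenter can be assigned; only then does the second request actually add the assignee, and any other status is simply logged.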
25 changes: 9 additions & 16 deletions .github/workflows/python_build.yml
@@ -18,7 +18,7 @@ jobs:
       - name: Setup Python
         uses: actions/setup-python@v2
         with:
-          python-version: 3.7
+          python-version: 3.8
 
       - name: Check Python
         run: |
@@ -36,20 +36,19 @@
         run: make check-rust
 
   test-minimal:
-    name: Python Build (Python 3.7 PyArrow 8.0.0)
+    name: Python Build (Python 3.8 PyArrow 8.0.0)
     runs-on: ubuntu-latest
+    env:
+      RUSTFLAGS: "-C debuginfo=line-tables-only"
+      CARGO_INCREMENTAL: 0
 
-    # use the same environment we have for python release
-    container: quay.io/pypa/manylinux2014_x86_64:2022-09-24-4f086d0
     steps:
-      # actions/checkout@v3 is a node action, which runs on a fairly new
-      # version of node. however, manylinux environment's glibc is too old for
-      # that version of the node. so we will have to use v1 instead, which is a
-      # docker based action.
-      - uses: actions/checkout@v1
+      - uses: actions/checkout@v3
 
+      - name: Setup Python
+        uses: actions/setup-python@v2
+        with:
+          python-version: 3.8
 
       - name: Install latest nightly
         uses: actions-rs/toolchain@v1
Expand All @@ -60,14 +59,8 @@ jobs:

       - uses: Swatinem/rust-cache@v2
 
-      - name: Enable manylinux Python targets
-        run: |
-          echo "/opt/python/cp37-cp37m/bin" >> $GITHUB_PATH
-
       - name: Build and install deltalake
         run: |
-          # Needed for openssl build
-          yum install -y perl-IPC-Cmd
           pip install virtualenv
           virtualenv venv
           source venv/bin/activate
@@ -238,7 +231,7 @@

     strategy:
       matrix:
-        python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"]
+        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
 
     steps:
       - uses: actions/checkout@v3
4 changes: 4 additions & 0 deletions CONTRIBUTING.md
@@ -7,3 +7,7 @@ Please take note of our [code of conduct](CODE_OF_CONDUCT.md).
 If you want to start contributing, first look at our good first issues: https://github.com/delta-io/delta-rs/contribute
 
 If you want to contribute something more substantial, see our "Projects seeking contributors" section on our roadmap: https://github.com/delta-io/delta-rs/issues/1128
+
+## Claiming an issue
+
+If you want to claim an issue to work on, comment the word `take` on it and you will be assigned automatically.
50 changes: 25 additions & 25 deletions README.md
@@ -41,10 +41,10 @@ The Delta Lake project aims to unlock the power of the Deltalake for as many use
 by providing native low-level APIs aimed at developers and integrators, as well as a high-level operations
 API that lets you query, inspect, and operate your Delta Lake with ease.
 
-| Source                | Downloads                         | Installation Command    | Docs            |
-| --------------------- | --------------------------------- | ----------------------- | --------------- |
-| **[PyPi][pypi]**      | [![Downloads][pypi-dl]][pypi]     | `pip install deltalake` | [Docs][py-docs] |
-| **[Crates.io][pypi]** | [![Downloads][crates-dl]][crates] | `cargo add deltalake`   | [Docs][rs-docs] |
+| Source                  | Downloads                         | Installation Command    | Docs            |
+| ----------------------- | --------------------------------- | ----------------------- | --------------- |
+| **[PyPi][pypi]**        | [![Downloads][pypi-dl]][pypi]     | `pip install deltalake` | [Docs][py-docs] |
+| **[Crates.io][crates]** | [![Downloads][crates-dl]][crates] | `cargo add deltalake`   | [Docs][rs-docs] |
 
 [pypi]: https://pypi.org/project/deltalake/
 [pypi-dl]: https://img.shields.io/pypi/dm/deltalake?style=flat-square&color=00ADD4
@@ -130,36 +130,36 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc

 ### Cloud Integrations
 
-| Storage              |         Rust          |        Python         | Comment                             |
-| -------------------- | :-------------------: | :-------------------: | ----------------------------------- |
-| Local                |        ![done]        |        ![done]        |                                     |
-| S3 - AWS             |        ![done]        |        ![done]        | requires lock for concurrent writes |
-| S3 - MinIO           |        ![done]        |        ![done]        | requires lock for concurrent writes |
-| S3 - R2              |        ![done]        |        ![done]        | requires lock for concurrent writes |
-| Azure Blob           |        ![done]        |        ![done]        |                                     |
-| Azure ADLS Gen2      |        ![done]        |        ![done]        |                                     |
-| Microsoft OneLake    |        ![done]        |        ![done]        |                                     |
-| Google Cloud Storage |        ![done]        |        ![done]        |                                     |
+| Storage              |  Rust   | Python  | Comment                             |
+| -------------------- | :-----: | :-----: | ----------------------------------- |
+| Local                | ![done] | ![done] |                                     |
+| S3 - AWS             | ![done] | ![done] | requires lock for concurrent writes |
+| S3 - MinIO           | ![done] | ![done] | requires lock for concurrent writes |
+| S3 - R2              | ![done] | ![done] | requires lock for concurrent writes |
+| Azure Blob           | ![done] | ![done] |                                     |
+| Azure ADLS Gen2      | ![done] | ![done] |                                     |
+| Microsoft OneLake    | ![done] | ![done] |                                     |
+| Google Cloud Storage | ![done] | ![done] |                                     |
 
 ### Supported Operations
 
-| Operation             |           Rust           |       Python        | Description                                 |
-| --------------------- | :----------------------: | :-----------------: | ------------------------------------------- |
-| Create                |         ![done]          |       ![done]       | Create a new table                          |
-| Read                  |         ![done]          |       ![done]       | Read data from a table                      |
-| Vacuum                |         ![done]          |       ![done]       | Remove unused files and log entries         |
-| Delete - partitions   |                          |       ![done]       | Delete a table partition                    |
-| Delete - predicates   |         ![done]          |       ![done]       | Delete data based on a predicate            |
-| Optimize - compaction |         ![done]          |       ![done]       | Harmonize the size of data file             |
-| Optimize - Z-order    |         ![done]          |       ![done]       | Place similar data into the same file       |
+| Operation             |           Rust           |          Python          | Description                                 |
+| --------------------- | :----------------------: | :----------------------: | ------------------------------------------- |
+| Create                |         ![done]          |         ![done]          | Create a new table                          |
+| Read                  |         ![done]          |         ![done]          | Read data from a table                      |
+| Vacuum                |         ![done]          |         ![done]          | Remove unused files and log entries         |
+| Delete - partitions   |                          |         ![done]          | Delete a table partition                    |
+| Delete - predicates   |         ![done]          |         ![done]          | Delete data based on a predicate            |
+| Optimize - compaction |         ![done]          |         ![done]          | Harmonize the size of data file             |
+| Optimize - Z-order    |         ![done]          |         ![done]          | Place similar data into the same file       |
 | Merge                 | [![semi-done]][merge-rs] | [![semi-done]][merge-py] | Merge two tables (limited to full re-write) |
-| FS check              |         ![done]          |       ![done]       | Remove corrupted files from table           |
+| FS check              |         ![done]          |         ![done]          | Remove corrupted files from table           |
 
 ### Protocol Support Level
 
 | Writer Version | Requirement                                   | Status               |
 | -------------- | --------------------------------------------- | :------------------: |
-| Version 2      | Append Only Tables                            | ![done]
+| Version 2      | Append Only Tables                            | ![done]              |
 | Version 2      | Column Invariants                             | ![done]              |
 | Version 3      | Enforce `delta.checkpoint.writeStatsAsJson`   | [![open]][writer-rs] |
 | Version 3      | Enforce `delta.checkpoint.writeStatsAsStruct` | [![open]][writer-rs] |
49 changes: 27 additions & 22 deletions crates/deltalake-core/src/operations/optimize.rs
@@ -432,7 +432,7 @@ impl MergePlan
             Some(task_parameters.input_parameters.target_size as usize),
             None,
         )?;
-        let mut writer = PartitionWriter::try_with_config(object_store.clone(), writer_config)?;
+        let mut writer = PartitionWriter::try_with_config(object_store, writer_config)?;
 
         let mut read_stream = read_stream.await?;
 
@@ -478,19 +478,7 @@

         let object_store_ref = context.object_store.clone();
         // Read all batches into a vec
-        let batches: Vec<RecordBatch> = futures::stream::iter(files.clone())
-            .then(|file| {
-                let object_store_ref = object_store_ref.clone();
-                async move {
-                    let file_reader = ParquetObjectReader::new(object_store_ref.clone(), file);
-                    ParquetRecordBatchStreamBuilder::new(file_reader)
-                        .await?
-                        .build()
-                }
-            })
-            .try_flatten()
-            .try_collect::<Vec<_>>()
-            .await?;
+        let batches = zorder::collect_batches(object_store_ref, files).await?;
 
         // For each batch, compute the zorder key
         let zorder_keys: Vec<ArrayRef> =
@@ -608,7 +596,7 @@
         for file in files.iter() {
             debug!("  file {}", file.location);
         }
-        let object_store_ref = log_store.object_store().clone();
+        let object_store_ref = log_store.object_store();
         let batch_stream = futures::stream::iter(files.clone())
             .then(move |file| {
                 let object_store_ref = object_store_ref.clone();
@@ -636,33 +624,30 @@
         #[cfg(not(feature = "datafusion"))]
         let exec_context = Arc::new(zorder::ZOrderExecContext::new(
             zorder_columns,
-            log_store.object_store().clone(),
+            log_store.object_store(),
             // If there aren't enough bins to use all threads, then instead
             // use threads within the bins. This is important for the case where
             // the table is un-partitioned, in which case the entire table is just
             // one big bin.
             bins.len() <= num_cpus::get(),
         ));
-        let object_store = log_store.object_store().clone();
 
         #[cfg(feature = "datafusion")]
         let exec_context = Arc::new(zorder::ZOrderExecContext::new(
             zorder_columns,
-            object_store.clone(),
+            log_store.object_store(),
             max_spill_size,
         )?);
         let task_parameters = self.task_parameters.clone();
         let log_store = log_store.clone();
         futures::stream::iter(bins)
             .map(move |(partition, files)| {
                 let batch_stream = Self::read_zorder(files.clone(), exec_context.clone());
-
-                let object_store = object_store.clone();
-
                 let rewrite_result = tokio::task::spawn(Self::rewrite_files(
                     task_parameters.clone(),
                     partition,
                     files,
-                    object_store,
+                    log_store.object_store(),
                     batch_stream,
                 ));
                 util::flatten_join_error(rewrite_result)
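
Several of these call sites drop a redundant `.clone()`: `object_store()` already returns an owned, reference-counted handle, so cloning that return value again merely bumps the count a second time. A minimal standalone sketch of the pattern, with hypothetical `LogStore`/`ObjectStore` stand-ins rather than the deltalake types:

```rust
use std::sync::Arc;

struct ObjectStore;

struct LogStore {
    store: Arc<ObjectStore>,
}

impl LogStore {
    /// Hands back an owned handle; the Arc is already cloned here.
    fn object_store(&self) -> Arc<ObjectStore> {
        self.store.clone()
    }
}

fn main() {
    let log_store = LogStore {
        store: Arc::new(ObjectStore),
    };

    // One new strong reference. Writing `log_store.object_store().clone()`
    // would bump the count once more and immediately drop the extra handle.
    let handle = log_store.object_store();
    assert_eq!(Arc::strong_count(&handle), 2); // log_store.store + handle
}
```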
@@ -1107,6 +1092,26 @@ pub(super) mod zorder {
         }
     }
 
+    /// Read all batches into a vec - is an async function in disguise
+    #[cfg(not(feature = "datafusion"))]
+    pub(super) fn collect_batches(
+        object_store: ObjectStoreRef,
+        files: MergeBin,
+    ) -> impl Future<Output = Result<Vec<RecordBatch>, ParquetError>> {
+        futures::stream::iter(files.clone())
+            .then(move |file| {
+                let object_store = object_store.clone();
+                async move {
+                    let file_reader = ParquetObjectReader::new(object_store.clone(), file);
+                    ParquetRecordBatchStreamBuilder::new(file_reader)
+                        .await?
+                        .build()
+                }
+            })
+            .try_flatten()
+            .try_collect::<Vec<_>>()
+    }
+
     #[cfg(feature = "datafusion")]
     pub use self::datafusion::ZOrderExecContext;
 
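
The new `collect_batches` helper is an ordinary `fn` returning `impl Future` rather than an `async fn`: the stream pipeline is assembled synchronously and only the final `try_collect` future is handed back to be awaited. A self-contained sketch of the same shape, assuming only the `futures` and `tokio` crates (the fallible async step is a stand-in for opening a Parquet reader):

```rust
use futures::{stream, Future, StreamExt, TryStreamExt};

/// An "async function in disguise": builds the whole pipeline eagerly
/// and returns the un-awaited collect future.
fn square_all(inputs: Vec<u64>) -> impl Future<Output = Result<Vec<u64>, String>> {
    stream::iter(inputs)
        .then(|n| async move {
            // Fallible async step, standing in for opening a file reader.
            if n > 1_000 {
                Err(format!("{n} is too large"))
            } else {
                Ok(n * n)
            }
        })
        .try_collect::<Vec<_>>()
}

#[tokio::main]
async fn main() {
    assert_eq!(square_all(vec![1, 2, 3]).await.unwrap(), vec![1, 4, 9]);
}
```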
crates/deltalake-core/src/operations/transaction/conflict_checker.rs
@@ -522,9 +522,8 @@ impl<'a> ConflictChecker<'a> {
             .winning_commit_summary
             .removed_files()
             .iter()
-            // TODO remove cloned
-            .cloned()
-            .find(|f| read_file_path.contains(&f.path));
+            .find(|&f| read_file_path.contains(&f.path))
+            .cloned();
         if deleted_read_overlap.is_some()
             || (!self.winning_commit_summary.removed_files().is_empty()
                 && self.txn_info.read_whole_table())
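
The conflict-checker change resolves the old `// TODO remove cloned` by moving `.cloned()` after `.find(...)`: the predicate now runs against references, and only the matching element, if any, is cloned instead of every element the search walks past. A toy illustration of the iterator pattern, using plain `String`s rather than the deltalake file actions:

```rust
fn main() {
    let removed = vec![
        String::from("part-0001.parquet"),
        String::from("part-0002.parquet"),
    ];
    let read_file_path = "s3://bucket/table/part-0002.parquet";

    // Before: clones every element it inspects, match or not.
    let eager: Option<String> = removed
        .iter()
        .cloned()
        .find(|f| read_file_path.contains(f.as_str()));

    // After: tests against references and clones only the final match.
    let lazy: Option<String> = removed
        .iter()
        .find(|f| read_file_path.contains(f.as_str()))
        .cloned();

    assert_eq!(eager, lazy);
}
```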
1 change: 0 additions & 1 deletion crates/deltalake-core/src/operations/update.rs
@@ -737,7 +737,6 @@ mod tests {
.with_update("value", col("value") + lit(1))
.await
.unwrap();

assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);
assert_eq!(metrics.num_added_files, 1);