[CHORE] Add tests for parquet size estimations (#3405)
Unskipping the tests produces the following failures (only 2 cases pass):

```
FAILED tests/test_size_estimations.py::test_estimations_unique_strings[dict-snappy] - AssertionError: Expected 8.16MB file estimated vs actual to be within 30.0%: 24.47MB vs 113.89MB (-365.40%)
FAILED tests/test_size_estimations.py::test_estimations_unique_strings[dict-no_compression] - AssertionError: Expected 109.93MB file estimated vs actual to be within 30.0%: 329.80MB vs 113.89MB (65.47%)
FAILED tests/test_size_estimations.py::test_estimations_unique_strings[no_dict-snappy] - AssertionError: Expected 8.14MB file estimated vs actual to be within 30.0%: 24.42MB vs 113.89MB (-366.43%)
FAILED tests/test_size_estimations.py::test_estimations_unique_strings[no_dict-no_compression] - AssertionError: Expected 109.91MB file estimated vs actual to be within 30.0%: 329.74MB vs 113.89MB (65.46%)
FAILED tests/test_size_estimations.py::test_estimations_dup_strings[dict-snappy] - AssertionError: Expected 0.00MB file estimated vs actual to be within 30.0%: 0.00MB vs 108.00MB (-3345625.15%)
FAILED tests/test_size_estimations.py::test_estimations_dup_strings[dict-no_compression] - AssertionError: Expected 0.00MB file estimated vs actual to be within 30.0%: 0.00MB vs 108.00MB (-3082092.01%)
FAILED tests/test_size_estimations.py::test_estimations_dup_strings[no_dict-snappy] - AssertionError: Expected 4.92MB file estimated vs actual to be within 30.0%: 14.76MB vs 108.00MB (-631.79%)
FAILED tests/test_size_estimations.py::test_estimations_dup_strings[no_dict-no_compression] - AssertionError: Expected 104.02MB file estimated vs actual to be within 30.0%: 312.07MB vs 108.00MB (65.39%)
FAILED tests/test_size_estimations.py::test_estimations_unique_ints[dict-snappy] - AssertionError: Expected 4.28MB file estimated vs actual to be within 30.0%: 12.85MB vs 8.00MB (37.72%)
FAILED tests/test_size_estimations.py::test_estimations_unique_ints[dict-no_compression] - AssertionError: Expected 8.28MB file estimated vs actual to be within 30.0%: 24.84MB vs 8.00MB (67.79%)
FAILED tests/test_size_estimations.py::test_estimations_unique_ints[no_dict-snappy] - AssertionError: Expected 4.00MB file estimated vs actual to be within 30.0%: 12.01MB vs 8.00MB (33.40%)
FAILED tests/test_size_estimations.py::test_estimations_unique_ints[no_dict-no_compression] - AssertionError: Expected 8.00MB file estimated vs actual to be within 30.0%: 24.00MB vs 8.00MB (66.67%)
FAILED tests/test_size_estimations.py::test_estimations_dup_ints[dict-snappy] - AssertionError: Expected 0.00MB file estimated vs actual to be within 30.0%: 0.00MB vs 8.00MB (-454962.57%)
FAILED tests/test_size_estimations.py::test_estimations_dup_ints[dict-no_compression] - AssertionError: Expected 0.00MB file estimated vs actual to be within 30.0%: 0.00MB vs 8.00MB (-458090.15%)
FAILED tests/test_size_estimations.py::test_estimations_dup_ints[no_dict-snappy] - AssertionError: Expected 0.38MB file estimated vs actual to be within 30.0%: 1.13MB vs 8.00MB (-607.74%)
FAILED tests/test_size_estimations.py::test_estimations_dup_ints[no_dict-no_compression] - AssertionError: Expected 8.00MB file estimated vs actual to be within 30.0%: 24.00MB vs 8.00MB (66.67%)
FAILED tests/test_size_estimations.py::test_canonical_files_in_hf[smoktalk] - AssertionError: Expected 105.42MB file estimated vs actual to be within 30.0%: 316.26MB vs 213.66MB (32.44%)
FAILED tests/test_size_estimations.py::test_canonical_files_in_hf[cifar10] - AssertionError: Expected 23.94MB file estimated vs actual to be within 30.0%: 71.82MB vs 22.85MB (68.18%)
FAILED tests/test_size_estimations.py::test_canonical_files_in_hf[standfordnlp-imdb] - AssertionError: Expected 42.00MB file estimated vs actual to be within 30.0%: 125.99MB vs 67.31MB (46.58%)
FAILED tests/test_size_estimations.py::test_canonical_files_in_hf[jat-dataset-tokenized] - AssertionError: Expected 7.34MB file estimated vs actual to be within 30.0%: 22.01MB vs 499.57MB (-2169.79%)
```

Follow-on fixes for the worst cases:
1. Duplicate strings/ints with dictionary encoding (`-3345625.15%`; see the sketch after this list)
2. Duplicate strings/ints without dictionary encoding, but with snappy
compression (`-631%`)
3. `jat-dataset-tokenized`: a tokenization dataset with many lists-of-ints
(`-2169%`), likely highly compressed
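
Cases 1 and 2 share a root cause: dictionary encoding (and snappy) collapse repeated values on disk, so any in-memory estimate scaled from the on-disk size collapses with them. A minimal pyarrow-only sketch of the effect (no Daft involved; the `/tmp` paths are illustrative):

```python
import os

import pyarrow as pa
import pyarrow.parquet as papq

# 1M copies of the same 100-character string: roughly 100MB once decoded in memory.
tbl = pa.table({"foo": ["a" * 100] * 1_000_000})

for use_dictionary, compression in [(True, "snappy"), (True, None), (False, "snappy"), (False, None)]:
    path = f"/tmp/dup.dict={use_dictionary}.comp={compression}.pq"
    papq.write_table(tbl, path, use_dictionary=use_dictionary, compression=compression)
    on_disk = os.path.getsize(path)
    decoded = papq.read_table(path).nbytes
    print(
        f"dict={use_dictionary} compression={compression}: "
        f"{on_disk / 1e6:.2f}MB on disk vs {decoded / 1e6:.2f}MB decoded"
    )
```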

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Dec 4, 2024
1 parent 75ad85a commit 95d7a43
Showing 4 changed files with 185 additions and 1 deletion.
5 changes: 5 additions & 0 deletions daft/daft/testing.pyi
@@ -0,0 +1,5 @@
from __future__ import annotations

def estimate_in_memory_size_bytes(
uri: str, file_size: int, columns: list[str] | None = None, has_metadata: bool = False
) -> int: ...
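
For reference, the test file below reaches this binding via `daft.daft.testing`; a rough usage sketch along those lines (the Parquet path is hypothetical, and a build of daft that includes this change is assumed):

```python
import os

from daft.daft import testing as native_testing_utils

pq_path = "/tmp/example.parquet"  # hypothetical file, assumed to already exist
size_on_disk = os.path.getsize(pq_path)

# Estimate the decoded in-memory size from the file's schema and its size on
# disk, optionally restricted to a subset of columns.
estimated = native_testing_utils.estimate_in_memory_size_bytes(
    pq_path, size_on_disk, columns=None, has_metadata=False
)
print(f"estimated in-memory size: {estimated / 1_000_000:.2f}MB")
```
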
73 changes: 72 additions & 1 deletion src/daft-scan/src/python.rs
@@ -64,7 +64,7 @@ impl PartialEq for PythonTablesFactoryArgs {
}

pub mod pylib {
use std::sync::Arc;
use std::{default, sync::Arc};

use common_daft_config::PyDaftExecutionConfig;
use common_error::DaftResult;
@@ -497,6 +497,68 @@
) -> PyResult<PyLogicalPlanBuilder> {
Ok(LogicalPlanBuilder::table_scan(scan_operator.into(), None)?.into())
}

/// Estimates the in-memory size in bytes for a Parquet file.
///
/// This function calculates an approximate size that the Parquet file would occupy
/// when loaded into memory, considering only the specified columns if provided.
///
/// Used for testing only.
///
/// # Arguments
///
/// * `uri` - A string slice that holds the URI of the Parquet file.
/// * `file_size` - the size of the file on disk
/// * `columns` - An optional vector of strings representing the column names to consider.
/// If None, all columns in the file will be considered.
/// * `has_metadata` - whether or not metadata is pre-populated in the ScanTask. Defaults to false.
///
/// # Returns
///
/// Returns an `i64` representing the estimated size in bytes.
///
#[pyfunction]
pub fn estimate_in_memory_size_bytes(
uri: &str,
file_size: u64,
columns: Option<Vec<String>>,
has_metadata: Option<bool>,
) -> PyResult<usize> {
let (schema, metadata) = daft_parquet::read::read_parquet_schema(
uri,
default::Default::default(),
None,
default::Default::default(),
None,
)?;
let data_source = DataSource::File {
path: uri.to_string(),
chunk_spec: None,
size_bytes: Some(file_size),
iceberg_delete_files: None,
metadata: if has_metadata.unwrap_or(false) {
Some(TableMetadata {
length: metadata.num_rows,
})
} else {
None
},
partition_spec: None,
statistics: None,
parquet_metadata: None,
};
let st = ScanTask::new(
vec![data_source],
Arc::new(FileFormatConfig::Parquet(default::Default::default())),
Arc::new(schema),
Arc::new(crate::storage_config::StorageConfig::Native(Arc::new(
default::Default::default(),
))),
Pushdowns::new(None, None, columns.map(Arc::new), None),
None,
);
Ok(st.estimate_in_memory_size_bytes(None).unwrap())
}
}

pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {
@@ -513,3 +575,12 @@ pub fn register_modules(parent: &Bound<PyModule>) -> PyResult<()> {

Ok(())
}

pub fn register_testing_modules(parent: &Bound<PyModule>) -> PyResult<()> {
parent.add_function(wrap_pyfunction_bound!(
pylib::estimate_in_memory_size_bytes,
parent
)?)?;

Ok(())
}
5 changes: 5 additions & 0 deletions src/lib.rs
@@ -125,6 +125,11 @@ pub mod pylib {
let catalog_module = daft_catalog::python::register_modules(m)?;
daft_catalog_python_catalog::python::register_modules(&catalog_module)?;

// Register testing module
let testing_module = PyModule::new_bound(m.py(), "testing")?;
m.add_submodule(&testing_module)?;
daft_scan::python::register_testing_modules(&testing_module)?;

m.add_wrapped(wrap_pyfunction!(version))?;
m.add_wrapped(wrap_pyfunction!(build_type))?;
m.add_wrapped(wrap_pyfunction!(refresh_logger))?;
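
The new `testing` submodule is exposed as an attribute of the native `daft.daft` extension module, which is why the test file below imports it with `from daft.daft import testing`. A quick sanity check, assuming a local build of daft that includes this change:

```python
from daft.daft import testing

# The test-only estimator registered above should be reachable on the submodule.
assert callable(testing.estimate_in_memory_size_bytes)
```
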
103 changes: 103 additions & 0 deletions tests/test_size_estimations.py
@@ -0,0 +1,103 @@
import pyarrow as pa
import pyarrow.parquet as papq
import pytest

from daft.daft import testing as native_testing_utils
from daft.table.micropartition import MicroPartition

pytest.skip(allow_module_level=True, reason="Skipping because these tests don't currently pass")


def get_scantask_estimated_size(pq_path: str, size_on_disk: int, columns: list[str] | None = None) -> int:
"""Retrieve the estimated size for reading a given Parquet file"""
return native_testing_utils.estimate_in_memory_size_bytes(str(pq_path), size_on_disk, columns=columns)


def get_actual_size(pq_path: str, columns: list[str] | None = None) -> int:
# Force materialization of the MicroPartition using `.slice`
mp = MicroPartition.read_parquet(str(pq_path), columns=columns)
return mp.slice(0, len(mp)).size_bytes()


def assert_close(size_on_disk, estimated: int, actual: int, pct: float = 0.3):
assert (
abs(actual - estimated) / estimated < pct
), f"Expected {size_on_disk / 1_000_000:.2f}MB file estimated vs actual to be within {pct * 100}%: {estimated / 1_000_000:.2f}MB vs {actual / 1000_000:.2f}MB ({((estimated - actual) / estimated) * 100:.2f}%)"


@pytest.mark.parametrize("compression", ["snappy", None], ids=["snappy", "no_compression"])
@pytest.mark.parametrize("use_dictionary", [True, False], ids=["dict", "no_dict"])
def test_estimations_unique_strings(tmpdir, use_dictionary, compression):
pq_path = tmpdir / f"unique_strings.use_dictionary={use_dictionary}.compression={compression}.pq"
data = [f"{'a' * 100}{i}" for i in range(1000_000)]
tbl = pa.table({"foo": data})
papq.write_table(tbl, pq_path, use_dictionary=use_dictionary, compression=compression)

size_on_disk = pq_path.stat().size
assert_close(size_on_disk, get_scantask_estimated_size(pq_path, size_on_disk), get_actual_size(pq_path))


@pytest.mark.parametrize("compression", ["snappy", None], ids=["snappy", "no_compression"])
@pytest.mark.parametrize("use_dictionary", [True, False], ids=["dict", "no_dict"])
def test_estimations_dup_strings(tmpdir, use_dictionary, compression):
pq_path = tmpdir / f"dup_strings.use_dictionary={use_dictionary}.compression={compression}.pq"
data = ["a" * 100 for _ in range(1000_000)]
tbl = pa.table({"foo": data})
papq.write_table(tbl, pq_path, use_dictionary=use_dictionary, compression=compression)

size_on_disk = pq_path.stat().size
assert_close(size_on_disk, get_scantask_estimated_size(pq_path, size_on_disk), get_actual_size(pq_path))


@pytest.mark.parametrize("compression", ["snappy", None], ids=["snappy", "no_compression"])
@pytest.mark.parametrize("use_dictionary", [True, False], ids=["dict", "no_dict"])
def test_estimations_unique_ints(tmpdir, use_dictionary, compression):
pq_path = tmpdir / f"unique_ints.use_dictionary={use_dictionary}.compression={compression}.pq"

data = [i for i in range(1000_000)]
tbl = pa.table({"foo": data})
papq.write_table(tbl, pq_path, use_dictionary=use_dictionary, compression=compression)

size_on_disk = pq_path.stat().size
assert_close(size_on_disk, get_scantask_estimated_size(pq_path, size_on_disk), get_actual_size(pq_path))


@pytest.mark.parametrize("compression", ["snappy", None], ids=["snappy", "no_compression"])
@pytest.mark.parametrize("use_dictionary", [True, False], ids=["dict", "no_dict"])
def test_estimations_dup_ints(tmpdir, use_dictionary, compression):
pq_path = tmpdir / f"dup_ints.use_dictionary={use_dictionary}.compression={compression}.pq"

data = [1 for _ in range(1000_000)]
tbl = pa.table({"foo": data})
papq.write_table(tbl, pq_path, use_dictionary=use_dictionary, compression=compression)

size_on_disk = pq_path.stat().size
assert_close(size_on_disk, get_scantask_estimated_size(pq_path, size_on_disk), get_actual_size(pq_path))


@pytest.mark.parametrize(
"path",
[
"https://huggingface.co/datasets/HuggingFaceTB/smoltalk/resolve/main/data/all/test-00000-of-00001.parquet",
"https://huggingface.co/datasets/uoft-cs/cifar10/resolve/main/plain_text/test-00000-of-00001.parquet",
"https://huggingface.co/datasets/stanfordnlp/imdb/resolve/main/plain_text/unsupervised-00000-of-00001.parquet",
"https://huggingface.co/datasets/jat-project/jat-dataset-tokenized/resolve/main/atari-alien/test-00000-of-00011.parquet",
"https://huggingface.co/datasets/princeton-nlp/SWE-bench_Verified/resolve/main/data/test-00000-of-00001.parquet",
"https://huggingface.co/datasets/lmms-lab/LLaVA-OneVision-Data/resolve/main/CLEVR-Math(MathV360K)/train-00000-of-00002.parquet",
],
ids=[
"smoktalk",
"cifar10",
"standfordnlp-imdb",
"jat-dataset-tokenized",
"swe-bench-verified",
"llava-onevision-data",
],
)
def test_canonical_files_in_hf(path):
import requests

response = requests.head(path, allow_redirects=True)
size_on_disk = int(response.headers["Content-Length"])

assert_close(size_on_disk, get_scantask_estimated_size(path, size_on_disk), get_actual_size(path))
