From 0370fd336f87db179a811c9bf7e6c4d8d656f3c6 Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Thu, 28 Mar 2024 16:23:06 -0700 Subject: [PATCH 1/4] Have head() iterate across all partitions --- docs/tutorials/common_data_operations.ipynb | 4 +- src/tape/ensemble_frame.py | 33 +++++++++++++ tests/tape_tests/test_ensemble_frame.py | 53 +++++++++++++++++++++ 3 files changed, 88 insertions(+), 2 deletions(-) diff --git a/docs/tutorials/common_data_operations.ipynb b/docs/tutorials/common_data_operations.ipynb index 51e326a7..a9b23f3a 100644 --- a/docs/tutorials/common_data_operations.ipynb +++ b/docs/tutorials/common_data_operations.ipynb @@ -110,7 +110,7 @@ "Often, you'll want to peek at your data even though the full-size is too large for memory.\n", "\n", "> **_Note:_**\n", - "By default this only looks at the first partition of data, so any operations that remove all data from the first partition will produce an empty head result. Specify `npartitions=-1` to grab from all partitions.\n" + "Some partitions may be empty, and `head` will have to traverse these empty partitions to find enough rows for your result. An empty table with many partitions (on the order of 100k) might be costly to traverse even for an ultimately empty result. " ] }, { @@ -119,7 +119,7 @@ "metadata": {}, "outputs": [], "source": [ - "ens.source.head(5, npartitions=-1) # grabs the first 5 rows\n", + "ens.source.head(5) # grabs the first 5 rows\n", "\n", "# can also use tail to grab the last 5 rows" ] diff --git a/src/tape/ensemble_frame.py b/src/tape/ensemble_frame.py index 6c86d3bd..471e8199 100644 --- a/src/tape/ensemble_frame.py +++ b/src/tape/ensemble_frame.py @@ -794,6 +794,39 @@ def repartition( ) return self._propagate_metadata(result) + def head(self, n=5, compute=True): + """Returns `n` rows of data for previewing purposes. + + Parameters + ---------- + n : int, optional + The number of desired rows. Default is 5. + compute : bool, optional + Whether to compute the result immediately. Default is True. 
+ + Returns: + A pandas DataFrame with up to `n` rows of data. + """ + if not compute: + # Just use the Dask head method + return super().head(n, compute=False) + + if n <= 0: + return super().head(0) + + # Iterate over the partitions until we have enough rows + dfs = [] + remaining_rows = n + for partition in self.partitions: + if remaining_rows == 0: + break + # Note that partition is itself a _Frame object, so we need to compute to avoid infinite recursion + partition_head = partition.compute().head(remaining_rows) + dfs.append(partition_head) + remaining_rows -= len(partition_head) + + return pd.concat(dfs) + class TapeSeries(pd.Series): """A barebones extension of a Pandas series to be used for underlying Ensemble data. diff --git a/tests/tape_tests/test_ensemble_frame.py b/tests/tape_tests/test_ensemble_frame.py index 8375a34a..d27caf05 100644 --- a/tests/tape_tests/test_ensemble_frame.py +++ b/tests/tape_tests/test_ensemble_frame.py @@ -1,5 +1,6 @@ """ Test EnsembleFrame (inherited from Dask.DataFrame) creation and manipulations. """ +from math import floor import numpy as np import pandas as pd from tape import ( @@ -440,3 +441,55 @@ def test_partition_slicing(parquet_ensemble_with_divisions): assert ens.source.npartitions == 2 # should return exactly 2 partitions assert len(ens.object) < prior_src_len # should affect objects + +@pytest.mark.parametrize( + "data_fixture", + [ + "parquet_ensemble", + "parquet_ensemble_with_divisions", + ], +) +def test_head(data_fixture, request): + """ + Tests that head returns the correct number of rows. 
+ """ + ens = request.getfixturevalue(data_fixture) + + # Test with repartitioning the source frame + frame = ens.source + frame = frame.repartition(npartitions=10) + + assert frame.npartitions == 10 + + # Test inputs that should return an empty frame + assert len(frame.head(-100)) == 0 + assert len(frame.head(0)) == 0 + assert len(frame.head(-1)) == 0 + + one_res = frame.head(1) + assert len(one_res) == 1 + assert isinstance(one_res, TapeFrame) + assert set(one_res.columns) == set(frame.columns) + + def_result = frame.head() + assert len(def_result) == 5 + assert isinstance(one_res, TapeFrame) + assert set(one_res.columns) == set(frame.columns) + + def_result = frame.head(24) + assert len(def_result) == 24 + assert isinstance(one_res, TapeFrame) + assert set(one_res.columns) == set(frame.columns) + + # Test that we have sane behavior even when the number of rows requested is larger than the number of rows in the frame. + assert len(frame.head(2 * len(frame))) == len(frame) + + # Choose a value that will be guaranteed to hit every partition for this data. + # Note that with parquet_ensemble_with_divisions some of the partitions are empty + # testing that as well. 
+ rows = floor(len(frame.compute()) * 0.98) + result = frame.head(rows) + assert len(result) == rows + assert isinstance(result, TapeFrame) + assert set(result.columns) == set(frame.columns) + From d72d75bcd8753b53c4440b58d2255db5dd237b02 Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Thu, 28 Mar 2024 16:35:10 -0700 Subject: [PATCH 2/4] lint fix --- tests/tape_tests/test_ensemble_frame.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tape_tests/test_ensemble_frame.py b/tests/tape_tests/test_ensemble_frame.py index 4e531b3f..323b484e 100644 --- a/tests/tape_tests/test_ensemble_frame.py +++ b/tests/tape_tests/test_ensemble_frame.py @@ -472,6 +472,7 @@ def test_partition_slicing(parquet_ensemble_with_divisions): assert ens.source.npartitions == 2 # should return exactly 2 partitions assert len(ens.object) < prior_src_len # should affect objects + @pytest.mark.parametrize( "data_fixture", [ @@ -522,4 +523,3 @@ def test_head(data_fixture, request): assert len(result) == rows assert isinstance(result, TapeFrame) assert set(result.columns) == set(frame.columns) - From eff0de55c7fe4596ce158de80fc771d4ce8e110b Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Thu, 28 Mar 2024 16:44:37 -0700 Subject: [PATCH 3/4] Test compute=False --- tests/tape_tests/test_ensemble_frame.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/tape_tests/test_ensemble_frame.py b/tests/tape_tests/test_ensemble_frame.py index 323b484e..0444c066 100644 --- a/tests/tape_tests/test_ensemble_frame.py +++ b/tests/tape_tests/test_ensemble_frame.py @@ -497,6 +497,8 @@ def test_head(data_fixture, request): assert len(frame.head(0)) == 0 assert len(frame.head(-1)) == 0 + assert len(frame.head(100, compute=False).compute()) == 100 + one_res = frame.head(1) assert len(one_res) == 1 assert isinstance(one_res, TapeFrame) From ac3167e326531f06d44bc2854e15926bac835f8a Mon Sep 17 00:00:00 2001 From: Wilson Beebe Date: Thu, 4 Apr 2024 15:45:19 -0700 Subject: [PATCH 4/4] Give 
warning if npartitions is provided to head --- src/tape/ensemble_frame.py | 9 ++++++++- tests/tape_tests/test_ensemble_frame.py | 4 ++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/src/tape/ensemble_frame.py b/src/tape/ensemble_frame.py index 8bbc759c..3344673a 100644 --- a/src/tape/ensemble_frame.py +++ b/src/tape/ensemble_frame.py @@ -845,7 +845,7 @@ def repartition( ) return self._propagate_metadata(result) - def head(self, n=5, compute=True): + def head(self, n=5, compute=True, npartitions=None): """Returns `n` rows of data for previewing purposes. Parameters @@ -854,10 +854,17 @@ def head(self, n=5, compute=True): The number of desired rows. Default is 5. compute : bool, optional Whether to compute the result immediately. Default is True. + npartitions : int, optional + `npartitions` is not supported and if provided will be ignored. Instead all partitions may be used. Returns: A pandas DataFrame with up to `n` rows of data. """ + if npartitions is not None: + warnings.warn( + "The 'npartitions' parameter is not supported for TAPE dataframes. All partitions may be used." + ) + if not compute: # Just use the Dask head method return super().head(n, compute=False) diff --git a/tests/tape_tests/test_ensemble_frame.py b/tests/tape_tests/test_ensemble_frame.py index 0444c066..f26fca3c 100644 --- a/tests/tape_tests/test_ensemble_frame.py +++ b/tests/tape_tests/test_ensemble_frame.py @@ -492,6 +492,10 @@ def test_head(data_fixture, request): assert frame.npartitions == 10 + # Check that a warning is raised when npartitions are requested. + with pytest.warns(UserWarning): + frame.head(5, npartitions=5) + # Test inputs that should return an empty frame assert len(frame.head(-100)) == 0 assert len(frame.head(0)) == 0