From 95c6545b91ffb63f5ac6bd7d8ab2e1bd38fccbd8 Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi
Date: Thu, 18 Jan 2024 07:13:23 -0800
Subject: [PATCH 1/7] Specify divisions for index generation

---
 src/hipscat_import/index/arguments.py  | 2 ++
 src/hipscat_import/index/map_reduce.py | 8 +++++---
 tests/conftest.py                      | 2 +-
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/hipscat_import/index/arguments.py b/src/hipscat_import/index/arguments.py
index 1fef5430..c3f34e6f 100644
--- a/src/hipscat_import/index/arguments.py
+++ b/src/hipscat_import/index/arguments.py
@@ -25,6 +25,8 @@ class IndexArguments(RuntimeArguments):
     include_order_pixel: bool = True
     compute_partition_size: int = 1_000_000_000
 
+    divisions: Optional[List] = None
+
     def __post_init__(self):
         self._check_arguments()
 
diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py
index 988104de..eabde802 100644
--- a/src/hipscat_import/index/map_reduce.py
+++ b/src/hipscat_import/index/map_reduce.py
@@ -38,9 +38,11 @@ def create_index(args):
     data = data.reset_index()
     if not args.include_hipscat_index:
         data = data.drop(columns=[HIPSCAT_ID_COLUMN])
-    data = data.drop_duplicates()
-    data = data.repartition(partition_size=args.compute_partition_size)
-    data = data.set_index(args.indexing_column)
+    if args.divisions is not None and len(args.divisions) > 2:
+        data = data.set_index(args.indexing_column, drop=False, divisions=args.divisions)
+    else:
+        data = data.set_index(args.indexing_column, drop=False)
+    data = data.drop_duplicates().drop(columns=[args.indexing_column])
     result = data.to_parquet(
         path=index_dir,
         engine="pyarrow",
diff --git a/tests/conftest.py b/tests/conftest.py
index 05109d36..247a72ce 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -15,7 +15,7 @@ def use_ray(request):
     return request.config.getoption("--use_ray")
 
 
-@pytest.fixture(scope="session", name="dask_client")
+@pytest.fixture(scope="session", name="dask_client", autouse=True)
 def dask_client(use_ray):
     """Create a single client for use by all unit test cases."""
     if use_ray:

From 88e1a56b84071627b02179a087fa73abafa19861 Mon Sep 17 00:00:00 2001
From: Melissa DeLucchi
Date: Thu, 18 Jan 2024 12:12:52 -0800
Subject: [PATCH 2/7] Swap index/drop.

---
 src/hipscat_import/index/map_reduce.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py
index eabde802..dc1b4114 100644
--- a/src/hipscat_import/index/map_reduce.py
+++ b/src/hipscat_import/index/map_reduce.py
@@ -42,7 +42,7 @@ def create_index(args):
         data = data.set_index(args.indexing_column, drop=False, divisions=args.divisions)
     else:
         data = data.set_index(args.indexing_column, drop=False)
-    data = data.drop_duplicates().drop(columns=[args.indexing_column])
+    data = data.reset_index().drop_duplicates()
     result = data.to_parquet(
         path=index_dir,
         engine="pyarrow",

From a0db70d7a5f639eaf20331b284ecb9f152c42c7b Mon Sep 17 00:00:00 2001
From: delucchi-cmu
Date: Thu, 18 Jan 2024 15:24:25 -0500
Subject: [PATCH 3/7] Index metadata.

---
 src/hipscat_import/index/arguments.py        | 1 -
 tests/hipscat_import/index/test_run_index.py | 1 -
 2 files changed, 2 deletions(-)

diff --git a/src/hipscat_import/index/arguments.py b/src/hipscat_import/index/arguments.py
index c3f34e6f..6dc068b8 100644
--- a/src/hipscat_import/index/arguments.py
+++ b/src/hipscat_import/index/arguments.py
@@ -27,7 +27,6 @@ class IndexArguments(RuntimeArguments):
     compute_partition_size: int = 1_000_000_000
 
     divisions: Optional[List] = None
-
     def __post_init__(self):
         self._check_arguments()
 
diff --git a/tests/hipscat_import/index/test_run_index.py b/tests/hipscat_import/index/test_run_index.py
index 6ebace01..d16f6e41 100644
--- a/tests/hipscat_import/index/test_run_index.py
+++ b/tests/hipscat_import/index/test_run_index.py
@@ -114,7 +114,6 @@ def test_run_index_on_source(
 @pytest.mark.dask
 def test_run_index_on_source_object_id(
     small_sky_source_catalog,
-    dask_client,  # pylint: disable=unused-argument
     tmp_path,
     assert_parquet_file_index,
 ):

From d1d6ad2bc00ada7bf663aac94806a9a1d19d78fd Mon Sep 17 00:00:00 2001
From: delucchi-cmu
Date: Fri, 19 Jan 2024 09:13:32 -0500
Subject: [PATCH 4/7] Make de-duplication optional.

---
 src/hipscat_import/index/arguments.py  | 12 ++++++++
 src/hipscat_import/index/map_reduce.py | 28 ++++++++++++++++---
 tests/conftest.py                      |  2 +-
 .../test_verification_arguments.py     |  5 +++-
 4 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/src/hipscat_import/index/arguments.py b/src/hipscat_import/index/arguments.py
index 6dc068b8..7b96a455 100644
--- a/src/hipscat_import/index/arguments.py
+++ b/src/hipscat_import/index/arguments.py
@@ -22,10 +22,22 @@ class IndexArguments(RuntimeArguments):
 
     ## Output
     include_hipscat_index: bool = True
+    """Include the hipscat spatial partition index."""
     include_order_pixel: bool = True
+    """Include partitioning columns, Norder, Dir, and Npix. You probably want to keep these!"""
+    drop_duplicates: bool = True
+    """Should we check for duplicate rows (including the new indexing column),
+    and remove duplicates before writing to the new index catalog?
+    If you know that your data will not have duplicates (e.g. an index over
+    a unique primary key), set to False to avoid unnecessary work."""
 
+    ## Compute parameters
     compute_partition_size: int = 1_000_000_000
+    """Partition size used when creating leaf parquet files."""
     divisions: Optional[List] = None
+    """Hints used when splitting up the rows by the new index. If you have
+    some prior knowledge about the distribution of your indexing_column,
+    providing it here can speed up calculations dramatically."""
 
     def __post_init__(self):
         self._check_arguments()
diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py
index dc1b4114..e7633d4d 100644
--- a/src/hipscat_import/index/map_reduce.py
+++ b/src/hipscat_import/index/map_reduce.py
@@ -31,18 +31,38 @@ def create_index(args):
     )
 
     if args.include_order_pixel:
-        ## Take out the hive dictionary behavior.
+        ## Take out the hive dictionary behavior that turns these into int32.
         data["Norder"] = data["Norder"].astype(np.uint8)
         data["Dir"] = data["Dir"].astype(np.uint64)
         data["Npix"] = data["Npix"].astype(np.uint64)
+
+    # There are some silly dask things happening here:
+    # - Turn the existing index column into a regular column
+    # - If that had been the _hipscat_index, and we don't want it anymore, drop it
+    # - Create a new index, using our target indexing_column.
+    #   Use division hints if provided.
+    # - Then re-partition the whole thing according to compute size,
+    #   because the following de-duplication step requires smaller chunks.
+    # - Drop duplicate rows (but reset the index so it can be included in
+    #   duplicate-finding)
     data = data.reset_index()
     if not args.include_hipscat_index:
         data = data.drop(columns=[HIPSCAT_ID_COLUMN])
+
     if args.divisions is not None and len(args.divisions) > 2:
-        data = data.set_index(args.indexing_column, drop=False, divisions=args.divisions)
+        data = data.set_index(args.indexing_column, divisions=args.divisions)
     else:
-        data = data.set_index(args.indexing_column, drop=False)
-    data = data.reset_index().drop_duplicates()
+        data = data.set_index(args.indexing_column)
+
+    if args.drop_duplicates:
+        data = (
+            data.repartition(partition_size=1_000_000_000)
+            .reset_index()
+            .drop_duplicates()
+            .set_index(args.indexing_column, sorted=True)
+        )
+
+    # Now just write it out to leaf parquet files!
     result = data.to_parquet(
         path=index_dir,
         engine="pyarrow",
diff --git a/tests/conftest.py b/tests/conftest.py
index 247a72ce..05109d36 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -15,7 +15,7 @@ def use_ray(request):
     return request.config.getoption("--use_ray")
 
 
-@pytest.fixture(scope="session", name="dask_client", autouse=True)
+@pytest.fixture(scope="session", name="dask_client")
 def dask_client(use_ray):
     """Create a single client for use by all unit test cases."""
     if use_ray:
diff --git a/tests/hipscat_import/verification/test_verification_arguments.py b/tests/hipscat_import/verification/test_verification_arguments.py
index 44094ed7..8ebd6c81 100644
--- a/tests/hipscat_import/verification/test_verification_arguments.py
+++ b/tests/hipscat_import/verification/test_verification_arguments.py
@@ -67,8 +67,11 @@ def test_catalog_object(tmp_path, small_sky_object_catalog):
     assert str(args.tmp_path).startswith(tmp_path_str)
 
 
+@pytest.mark.timeout(5)
 def test_provenance_info(small_sky_object_catalog, tmp_path):
-    """Verify that provenance info includes verification-specific fields."""
+    """Verify that provenance info includes verification-specific fields.
+    NB: This is currently the last test in alpha-order, and may require additional
+    time to tear down fixtures."""
     args = VerificationArguments(
         input_catalog_path=small_sky_object_catalog,
         output_path=tmp_path,

From 220dae536dbc2866427c10a960f15ad80c466d8f Mon Sep 17 00:00:00 2001
From: delucchi-cmu
Date: Fri, 19 Jan 2024 13:32:46 -0500
Subject: [PATCH 5/7] Add another repartition and test case.

---
 src/hipscat_import/index/map_reduce.py | 14 ++++--
 .../index/test_index_map_reduce.py     | 50 +++++++++++++++++--
 2 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py
index e7633d4d..78c848a6 100644
--- a/src/hipscat_import/index/map_reduce.py
+++ b/src/hipscat_import/index/map_reduce.py
@@ -41,10 +41,6 @@ def create_index(args):
     # - If that had been the _hipscat_index, and we don't want it anymore, drop it
     # - Create a new index, using our target indexing_column.
     #   Use division hints if provided.
-    # - Then re-partition the whole thing according to compute size,
-    #   because the following de-duplication step requires smaller chunks.
-    # - Drop duplicate rows (but reset the index so it can be included in
-    #   duplicate-finding)
     data = data.reset_index()
     if not args.include_hipscat_index:
         data = data.drop(columns=[HIPSCAT_ID_COLUMN])
@@ -55,6 +51,15 @@ def create_index(args):
         data = data.set_index(args.indexing_column)
 
     if args.drop_duplicates:
+        # More dask things:
+        # - Repartition the whole dataset to account for limited memory in
+        #   pyarrow in the drop_duplicates implementation
+        #   ("array cannot contain more than 2147483646 bytes")
+        # - Reset the index, so the indexing_column values can be considered
+        #   when de-duping.
+        # - Drop duplicate rows
+        # - Set the index back to our indexing_column, but this time, the
+        #   values are still sorted so it's cheaper.
         data = (
             data.repartition(partition_size=1_000_000_000)
             .reset_index()
@@ -63,6 +68,7 @@ def create_index(args):
         )
 
     # Now just write it out to leaf parquet files!
+    data = data.repartition(partition_size=args.compute_partition_size)
     result = data.to_parquet(
         path=index_dir,
         engine="pyarrow",
diff --git a/tests/hipscat_import/index/test_index_map_reduce.py b/tests/hipscat_import/index/test_index_map_reduce.py
index f876e131..46f580de 100644
--- a/tests/hipscat_import/index/test_index_map_reduce.py
+++ b/tests/hipscat_import/index/test_index_map_reduce.py
@@ -45,7 +45,7 @@ def test_create_index(
 
 @pytest.mark.dask
 def test_create_index_no_hipscat_index(small_sky_object_catalog, tmp_path):
-    """Create an index for simple object catalog"""
+    """Create an index for simple object catalog, without the _hipscat_index field."""
     args = IndexArguments(
         input_catalog_path=small_sky_object_catalog,
         indexing_column="id",
@@ -66,7 +66,8 @@ def test_create_index_no_hipscat_index(small_sky_object_catalog, tmp_path):
 
 @pytest.mark.dask
 def test_create_index_no_order_pixel(small_sky_object_catalog, tmp_path):
-    """Create an index for simple object catalog"""
+    """Create an index for simple object catalog, without the partitioning columns,
+    Norder, Dir, and Npix."""
     args = IndexArguments(
         input_catalog_path=small_sky_object_catalog,
         indexing_column="id",
@@ -91,13 +92,51 @@ def test_create_index_source(
     assert_parquet_file_index,
     tmp_path,
 ):
-    """test stuff"""
+    """Create simple index for the source table."""
     args = IndexArguments(
         input_catalog_path=small_sky_source_catalog,
         indexing_column="source_id",
         output_path=tmp_path,
         output_artifact_name="small_sky_source_index",
         overwrite=True,
+        progress_bar=False,
+    )
+    mr.create_index(args)
+
+    output_file = os.path.join(tmp_path, "small_sky_source_index", "index", "part.0.parquet")
+
+    expected_ids = [*range(70_000, 87_161)]
+    assert_parquet_file_index(output_file, expected_ids)
+
+    data_frame = pd.read_parquet(output_file, engine="pyarrow")
+    npt.assert_array_equal(
+        data_frame.columns,
+        ["_hipscat_index", "Norder", "Dir", "Npix"],
+    )
+    assert data_frame.index.name == "source_id"
+    assert len(data_frame) == 17161
+    assert np.logical_and(data_frame["Norder"] >= 0, data_frame["Norder"] <= 2).all()
+
+
+@pytest.mark.dask
+def test_create_index_with_divisions(
+    small_sky_source_catalog,
+    assert_parquet_file_index,
+    tmp_path,
+):
+    """Create an index catalog for the large(r) source catalog, passing
+    some division hints. This should partition the final output according to
+    the `compute_partition_size` and not the provided divisions."""
+    divisions = np.arange(start=70_000, stop=87_161, step=5_000)
+    divisions = np.append(divisions, 87_161).tolist()
+    args = IndexArguments(
+        input_catalog_path=small_sky_source_catalog,
+        indexing_column="source_id",
+        output_path=tmp_path,
+        output_artifact_name="small_sky_source_index",
+        overwrite=True,
+        divisions=divisions,
+        drop_duplicates=False,
         progress_bar=False,
     )
     mr.create_index(args)
@@ -123,7 +162,8 @@ def test_create_index_source_by_object(
     small_sky_source_catalog,
     assert_parquet_file_index,
     tmp_path,
 ):
-    """test stuff"""
+    """Create an index for the source table, using the source's object ID
+    as the indexing key."""
     args = IndexArguments(
         input_catalog_path=small_sky_source_catalog,
@@ -154,7 +194,7 @@ def test_create_index_extra_columns(
     small_sky_source_catalog,
     assert_parquet_file_index,
     tmp_path,
 ):
-    """test stuff"""
+    """Create an index with some additional payload columns."""
     args = IndexArguments(
         input_catalog_path=small_sky_source_catalog,

From 7bc2b54a7318f91c4b19365291aa5b7dde3bd7d4 Mon Sep 17 00:00:00 2001
From: delucchi-cmu
Date: Fri, 19 Jan 2024 13:36:50 -0500
Subject: [PATCH 6/7] More comments!

---
 src/hipscat_import/index/map_reduce.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py
index 78c848a6..2bd53d24 100644
--- a/src/hipscat_import/index/map_reduce.py
+++ b/src/hipscat_import/index/map_reduce.py
@@ -48,6 +48,8 @@ def create_index(args):
     if args.divisions is not None and len(args.divisions) > 2:
         data = data.set_index(args.indexing_column, divisions=args.divisions)
     else:
+        # Try to avoid this! It's expensive! See:
+        # https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.set_index.html
         data = data.set_index(args.indexing_column)
 
     if args.drop_duplicates:

From 56f1edcbb7c6454b35628cf34bb7a65f948d972b Mon Sep 17 00:00:00 2001
From: delucchi-cmu
Date: Tue, 30 Jan 2024 15:44:32 -0500
Subject: [PATCH 7/7] Rename divisions->division_hints

---
 src/hipscat_import/index/arguments.py               | 6 ++++--
 src/hipscat_import/index/map_reduce.py              | 9 +++++----
 tests/hipscat_import/index/test_index_map_reduce.py | 2 +-
 3 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/hipscat_import/index/arguments.py b/src/hipscat_import/index/arguments.py
index 7b96a455..60b773f0 100644
--- a/src/hipscat_import/index/arguments.py
+++ b/src/hipscat_import/index/arguments.py
@@ -34,10 +34,12 @@ class IndexArguments(RuntimeArguments):
     ## Compute parameters
     compute_partition_size: int = 1_000_000_000
     """Partition size used when creating leaf parquet files."""
-    divisions: Optional[List] = None
+    division_hints: Optional[List] = None
     """Hints used when splitting up the rows by the new index. If you have
     some prior knowledge about the distribution of your indexing_column,
-    providing it here can speed up calculations dramatically."""
+    providing it here can speed up calculations dramatically. Note that
+    these will NOT necessarily be the divisions that the data is partitioned
+    along."""
 
     def __post_init__(self):
         self._check_arguments()
diff --git a/src/hipscat_import/index/map_reduce.py b/src/hipscat_import/index/map_reduce.py
index 2bd53d24..4b08cfed 100644
--- a/src/hipscat_import/index/map_reduce.py
+++ b/src/hipscat_import/index/map_reduce.py
@@ -45,8 +45,8 @@ def create_index(args):
     if not args.include_hipscat_index:
         data = data.drop(columns=[HIPSCAT_ID_COLUMN])
 
-    if args.divisions is not None and len(args.divisions) > 2:
-        data = data.set_index(args.indexing_column, divisions=args.divisions)
+    if args.division_hints is not None and len(args.division_hints) > 2:
+        data = data.set_index(args.indexing_column, divisions=args.division_hints)
     else:
         # Try to avoid this! It's expensive! See:
         # https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.set_index.html
@@ -66,11 +66,12 @@ def create_index(args):
             data.repartition(partition_size=1_000_000_000)
             .reset_index()
             .drop_duplicates()
-            .set_index(args.indexing_column, sorted=True)
+            .set_index(args.indexing_column, sorted=True, partition_size=args.compute_partition_size)
         )
+    else:
+        data = data.repartition(partition_size=args.compute_partition_size)
 
     # Now just write it out to leaf parquet files!
-    data = data.repartition(partition_size=args.compute_partition_size)
     result = data.to_parquet(
         path=index_dir,
         engine="pyarrow",
diff --git a/tests/hipscat_import/index/test_index_map_reduce.py b/tests/hipscat_import/index/test_index_map_reduce.py
index 46f580de..6685b661 100644
--- a/tests/hipscat_import/index/test_index_map_reduce.py
+++ b/tests/hipscat_import/index/test_index_map_reduce.py
@@ -135,7 +135,7 @@ def test_create_index_with_divisions(
         output_path=tmp_path,
         output_artifact_name="small_sky_source_index",
         overwrite=True,
-        divisions=divisions,
+        division_hints=divisions,
         drop_duplicates=False,
         progress_bar=False,
     )
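
Usage note: the sketch below shows how the arguments introduced in this
series fit together, patterned on test_create_index_with_divisions above.
The catalog paths are hypothetical, and the pipeline is invoked through
create_index() exactly as the unit tests invoke it (mr.create_index); a
deployed entry point may differ.

import numpy as np

import hipscat_import.index.map_reduce as mr
from hipscat_import.index.arguments import IndexArguments

# Evenly spaced division hints over the known source_id range
# [70_000, 87_161], closed with the maximum value, as in the test case.
divisions = np.arange(start=70_000, stop=87_161, step=5_000)
divisions = np.append(divisions, 87_161).tolist()

args = IndexArguments(
    input_catalog_path="catalogs/small_sky_source",  # hypothetical path
    indexing_column="source_id",
    output_path="catalogs",  # hypothetical path
    output_artifact_name="small_sky_source_index",
    division_hints=divisions,   # hints only; output partitioning may differ
    drop_duplicates=False,      # skip de-duplication for a unique key
)
mr.create_index(args)

Because division_hints only guides the expensive set_index shuffle, the
final leaf files are still sized by compute_partition_size, which is why
the docstring warns that the hints are not necessarily the divisions the
data ends up partitioned along.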