Performance improvements for index generation #202

Merged
merged 8 commits on Feb 2, 2024
Changes from 5 commits
13 changes: 13 additions & 0 deletions src/hipscat_import/index/arguments.py
@@ -22,9 +22,22 @@ class IndexArguments(RuntimeArguments):

## Output
include_hipscat_index: bool = True
"""Include the hipscat spatial partition index."""
include_order_pixel: bool = True
"""Include partitioning columns, Norder, Dir, and Npix. You probably want to keep these!"""
drop_duplicates: bool = True
"""Should we check for duplicate rows (including new indexing column),
and remove duplicates before writing to new index catalog?
If you know that your data will not have duplicates (e.g. an index over
a unique primary key), set to False to avoid unnecessary work."""

## Compute parameters
compute_partition_size: int = 1_000_000_000
"""partition size used when creating leaf parquet files."""
divisions: Optional[List] = None
"""Hints used when splitting up the rows by the new index. If you have
some prior knowledge about the distribution of your indexing_column,
providing it here can speed up calculations dramatically."""

def __post_init__(self):
self._check_arguments()
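For reference, a minimal sketch (not part of this diff) of how the new arguments might be combined when indexing a column that is known to be unique. The catalog paths, column name, and division boundaries below are hypothetical placeholders, not values from this PR.

from hipscat_import.index.arguments import IndexArguments

# Hypothetical paths and column names, for illustration only.
args = IndexArguments(
    input_catalog_path="/data/catalogs/my_object_catalog",
    indexing_column="object_id",
    output_path="/data/indexes",
    output_artifact_name="my_object_id_index",
    # object_id is assumed unique here, so skip the de-duplication pass.
    drop_duplicates=False,
    # Rough boundaries for object_id values; when the distribution is known
    # ahead of time, dask can skip the pass over the data it would otherwise
    # need to estimate partition boundaries for the new index.
    divisions=[0, 1_000_000, 2_000_000, 3_000_000],
)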
34 changes: 31 additions & 3 deletions src/hipscat_import/index/map_reduce.py
@@ -31,16 +31,44 @@ def create_index(args):
)

if args.include_order_pixel:
## Take out the hive dictionary behavior.
## Take out the hive dictionary behavior that turns these into int32.
data["Norder"] = data["Norder"].astype(np.uint8)
data["Dir"] = data["Dir"].astype(np.uint64)
data["Npix"] = data["Npix"].astype(np.uint64)

# There are some silly dask things happening here:
# - Turn the existing index column into a regular column
# - If that had been the _hipscat_index, and we don't want it anymore, drop it
# - Create a new index, using our target indexing_column.
# Use division hints if provided.
data = data.reset_index()
if not args.include_hipscat_index:
data = data.drop(columns=[HIPSCAT_ID_COLUMN])
data = data.drop_duplicates()

if args.divisions is not None and len(args.divisions) > 2:
data = data.set_index(args.indexing_column, divisions=args.divisions)
else:
data = data.set_index(args.indexing_column)

if args.drop_duplicates:
# More dask things:
# - Repartition the whole dataset to account for limited memory in
# pyarrow in the drop_duplicates implementation (
# "array cannot contain more than 2147483646 bytes")
# - Reset the index, so the indexing_column values can be considered
# when de-duping.
# - Drop duplicate rows
# - Set the index back to our indexing_column, but this time, the
# values are still sorted so it's cheaper.
data = (
data.repartition(partition_size=1_000_000_000)
.reset_index()
.drop_duplicates()
.set_index(args.indexing_column, sorted=True)
)

# Now just write it out to leaf parquet files!
data = data.repartition(partition_size=args.compute_partition_size)
data = data.set_index(args.indexing_column)
result = data.to_parquet(
path=index_dir,
engine="pyarrow",
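For readers less familiar with dask, here is a standalone sketch of the pattern used above, run on toy data; the column name and sizes are illustrative and are not taken from hipscat_import.

import dask.dataframe as dd
import pandas as pd

frame = pd.DataFrame({"source_id": [3, 1, 2, 2, 4], "Norder": [1, 1, 2, 2, 1]})
data = dd.from_pandas(frame, npartitions=2)

# Known division boundaries for the new index column let dask skip the
# quantile pass it would otherwise run to pick partition boundaries.
data = data.set_index("source_id", divisions=[1, 3, 4])

# De-duplication pass: repartition so each pyarrow chunk stays small, bring
# the index back as a column so it participates in the comparison, drop
# duplicate rows, then restore the index cheaply since the values are
# already sorted.
data = (
    data.repartition(partition_size=1_000_000_000)
    .reset_index()
    .drop_duplicates()
    .set_index("source_id", sorted=True)
)

print(data.compute())

The same idea applies to the real code path: the divisions hint saves an extra pass over the data to estimate partition boundaries, and the sorted=True flag makes the second set_index much cheaper than a full shuffle.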
50 changes: 45 additions & 5 deletions tests/hipscat_import/index/test_index_map_reduce.py
@@ -45,7 +45,7 @@ def test_create_index(

@pytest.mark.dask
def test_create_index_no_hipscat_index(small_sky_object_catalog, tmp_path):
"""Create an index for simple object catalog"""
"""Create an index for simple object catalog, without the _hipscat_index field."""
args = IndexArguments(
input_catalog_path=small_sky_object_catalog,
indexing_column="id",
@@ -66,7 +66,8 @@ def test_create_index_no_hipscat_index(small_sky_object_catalog, tmp_path):

@pytest.mark.dask
def test_create_index_no_order_pixel(small_sky_object_catalog, tmp_path):
"""Create an index for simple object catalog"""
"""Create an index for simple object catalog, without the partitioning columns,
Norder, Dir, and Npix."""
args = IndexArguments(
input_catalog_path=small_sky_object_catalog,
indexing_column="id",
@@ -91,13 +92,51 @@ def test_create_index_source(
assert_parquet_file_index,
tmp_path,
):
"""test stuff"""
"""Create simple index for the source table."""
args = IndexArguments(
input_catalog_path=small_sky_source_catalog,
indexing_column="source_id",
output_path=tmp_path,
output_artifact_name="small_sky_source_index",
overwrite=True,
progress_bar=False,
)
mr.create_index(args)

output_file = os.path.join(tmp_path, "small_sky_source_index", "index", "part.0.parquet")

expected_ids = [*range(70_000, 87_161)]
assert_parquet_file_index(output_file, expected_ids)

data_frame = pd.read_parquet(output_file, engine="pyarrow")
npt.assert_array_equal(
data_frame.columns,
["_hipscat_index", "Norder", "Dir", "Npix"],
)
assert data_frame.index.name == "source_id"
assert len(data_frame) == 17161
assert np.logical_and(data_frame["Norder"] >= 0, data_frame["Norder"] <= 2).all()

@pytest.mark.dask
def test_create_index_with_divisions(
small_sky_source_catalog,
assert_parquet_file_index,
tmp_path,
):
"""Create an index catalog for the large(r) source catalog, passing
some divisions hints. This should partition the final output according to
the `compute_partition_size` and not the provided divisions."""
divisions = np.arange(start=70_000, stop=87_161, step=5_000)
divisions = np.append(divisions, 87_161).tolist()

args = IndexArguments(
input_catalog_path=small_sky_source_catalog,
indexing_column="source_id",
output_path=tmp_path,
output_artifact_name="small_sky_source_index",
overwrite=True,
divisions=divisions,
drop_duplicates=False,
progress_bar=False,
)
mr.create_index(args)
@@ -123,7 +162,8 @@ def test_create_index_source_by_object(
assert_parquet_file_index,
tmp_path,
):
"""test stuff"""
"""Create an index for the source table, using the source's object ID
as the indexing key."""
args = IndexArguments(
input_catalog_path=small_sky_source_catalog,
indexing_column="object_id",
@@ -154,7 +194,7 @@ def test_create_index_extra_columns(
assert_parquet_file_index,
tmp_path,
):
"""test stuff"""
"""Create an index with some additional payload columns."""
args = IndexArguments(
input_catalog_path=small_sky_source_catalog,
indexing_column="object_id",
1 change: 0 additions & 1 deletion tests/hipscat_import/index/test_run_index.py
@@ -114,7 +114,6 @@ def test_run_index_on_source(
@pytest.mark.dask
def test_run_index_on_source_object_id(
small_sky_source_catalog,
dask_client, # pylint: disable=unused-argument
tmp_path,
assert_parquet_file_index,
):
@@ -67,8 +67,11 @@ def test_catalog_object(tmp_path, small_sky_object_catalog):
assert str(args.tmp_path).startswith(tmp_path_str)


@pytest.mark.timeout(5)
def test_provenance_info(small_sky_object_catalog, tmp_path):
"""Verify that provenance info includes verification-specific fields."""
"""Verify that provenance info includes verification-specific fields.
NB: This is currently the last test in alpha-order, and may require additional
time to tear down fixtures."""
args = VerificationArguments(
input_catalog_path=small_sky_object_catalog,
output_path=tmp_path,