Make de-duplication optional.
delucchi-cmu committed Jan 19, 2024
1 parent a0db70d commit d1d6ad2
Showing 4 changed files with 41 additions and 6 deletions.
12 changes: 12 additions & 0 deletions src/hipscat_import/index/arguments.py
@@ -22,10 +22,22 @@ class IndexArguments(RuntimeArguments):

    ## Output
    include_hipscat_index: bool = True
    """Include the hipscat spatial partition index."""
    include_order_pixel: bool = True
    """Include partitioning columns, Norder, Dir, and Npix. You probably want to keep these!"""
    drop_duplicates: bool = True
    """Should we check for duplicate rows (including the new indexing column),
    and remove duplicates before writing to the new index catalog?
    If you know that your data will not have duplicates (e.g. an index over
    a unique primary key), set to False to avoid unnecessary work."""

    ## Compute parameters
    compute_partition_size: int = 1_000_000_000
    """Partition size used when creating leaf parquet files."""
    divisions: Optional[List] = None
    """Hints used when splitting up the rows by the new index. If you have
    some prior knowledge about the distribution of your indexing_column,
    providing it here can speed up calculations dramatically."""

    def __post_init__(self):
        self._check_arguments()
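A minimal usage sketch combining the two knobs. The paths, the column name, and any constructor fields other than drop_duplicates and divisions are illustrative assumptions about the surrounding hipscat-import API, not taken from this diff:

```python
from hipscat_import.index.arguments import IndexArguments

# Hypothetical catalog paths and column names, for illustration only.
args = IndexArguments(
    input_catalog_path="/data/my_catalog",
    indexing_column="object_id",            # a unique primary key
    output_path="/data/indexes",
    output_artifact_name="my_catalog_object_id_index",
    drop_duplicates=False,                  # skip the de-duplication pass
    divisions=[0, 1_000_000, 2_000_000],    # prior knowledge of object_id boundaries
)
```

With drop_duplicates=False, the repartition/drop_duplicates stage in map_reduce.py (below) is skipped entirely, and providing divisions lets dask skip its sampling pass when re-indexing the rows.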
28 changes: 24 additions & 4 deletions src/hipscat_import/index/map_reduce.py
@@ -31,18 +31,38 @@ def create_index(args):
    )

    if args.include_order_pixel:
-        ## Take out the hive dictionary behavior.
+        ## Take out the hive dictionary behavior that turns these into int32.
        data["Norder"] = data["Norder"].astype(np.uint8)
        data["Dir"] = data["Dir"].astype(np.uint64)
        data["Npix"] = data["Npix"].astype(np.uint64)

    # There are some silly dask things happening here:
    # - Turn the existing index column into a regular column
    # - If that had been the _hipscat_index, and we don't want it anymore, drop it
    # - Create a new index, using our target indexing_column.
    #   Use division hints if provided.
    # - Then re-partition the whole thing according to compute size,
    #   because the following de-duplication step requires smaller chunks.
    # - Drop duplicate rows (but reset the index so it can be included in
    #   duplicate-finding)
    data = data.reset_index()
    if not args.include_hipscat_index:
        data = data.drop(columns=[HIPSCAT_ID_COLUMN])

    if args.divisions is not None and len(args.divisions) > 2:
-        data = data.set_index(args.indexing_column, drop=False, divisions=args.divisions)
+        data = data.set_index(args.indexing_column, divisions=args.divisions)
    else:
-        data = data.set_index(args.indexing_column, drop=False)
+        data = data.set_index(args.indexing_column)
-    data = data.reset_index().drop_duplicates()

    if args.drop_duplicates:
        data = (
            data.repartition(partition_size=1_000_000_000)
            .reset_index()
            .drop_duplicates()
            .set_index(args.indexing_column, sorted=True)
        )

    # Now just write it out to leaf parquet files!
    result = data.to_parquet(
        path=index_dir,
        engine="pyarrow",
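To see the de-duplication pattern above in isolation, here is a self-contained sketch on a toy dask DataFrame. The column names are illustrative rather than the real catalog schema, and the hard-coded partition size simply mirrors the committed code:

```python
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"object_id": [3, 1, 2, 2], "mag": [20.1, 19.5, 21.0, 21.0]})
data = dd.from_pandas(pdf, npartitions=2)

# Index by the target column (division hints could be passed here if known).
data = data.set_index("object_id")

# Mirror the optional de-duplication stage: repartition into bounded chunks,
# pull the index back into the columns so it participates in duplicate-finding,
# drop duplicates, then restore the (already sorted) index.
data = (
    data.repartition(partition_size=1_000_000_000)
    .reset_index()
    .drop_duplicates()
    .set_index("object_id", sorted=True)
)
print(data.compute())  # the duplicated object_id=2 row appears only once
```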
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -15,7 +15,7 @@ def use_ray(request):
    return request.config.getoption("--use_ray")


-@pytest.fixture(scope="session", name="dask_client", autouse=True)
+@pytest.fixture(scope="session", name="dask_client")
def dask_client(use_ray):
    """Create a single client for use by all unit test cases."""
    if use_ray:
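Dropping autouse means the session-scoped client now spins up only for tests that request it by name. A hypothetical test showing the explicit opt-in:

```python
# Hypothetical example; any test that needs Dask simply names the fixture.
def test_needs_dask(dask_client):
    future = dask_client.submit(sum, [1, 2, 3])
    assert future.result() == 6
```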
@@ -67,8 +67,11 @@ def test_catalog_object(tmp_path, small_sky_object_catalog):
    assert str(args.tmp_path).startswith(tmp_path_str)


@pytest.mark.timeout(5)
def test_provenance_info(small_sky_object_catalog, tmp_path):
-    """Verify that provenance info includes verification-specific fields."""
+    """Verify that provenance info includes verification-specific fields.
+    NB: This is currently the last test in alpha-order, and may require additional
+    time to tear down fixtures."""
    args = VerificationArguments(
        input_catalog_path=small_sky_object_catalog,
        output_path=tmp_path,
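The @pytest.mark.timeout(5) marker is supplied by the pytest-timeout plugin; it bounds the test at five seconds, a limit chosen (per the docstring) with slow end-of-session fixture teardown in mind.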
