diff --git a/src/hats_import/catalog/file_readers.py b/src/hats_import/catalog/file_readers.py
index 40c05be5..68a1fafc 100644
--- a/src/hats_import/catalog/file_readers.py
+++ b/src/hats_import/catalog/file_readers.py
@@ -3,7 +3,7 @@
 import abc

 import pandas as pd
-import pyarrow
+import pyarrow as pa
 import pyarrow.dataset
 import pyarrow.parquet as pq
 from astropy.io import ascii as ascii_reader
@@ -356,6 +356,34 @@ def read(self, input_file, read_columns=None):
             yield smaller_table.to_pandas()


+class ParquetPyarrowReader(InputReader):
+    """Parquet reader that yields pyarrow Tables, covering the most common Parquet reading arguments.
+
+    Attributes:
+        chunksize (int): number of rows of the file to process at once.
+            For large files, this can prevent loading the entire file
+            into memory at once.
+        column_names (list[str] or None): Names of columns to use from the input dataset.
+            If None, use all columns.
+        kwargs: arguments to pass along to pyarrow.parquet.ParquetFile.
+            See https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html
+    """
+
+    def __init__(self, chunksize=500_000, column_names=None, **kwargs):
+        self.chunksize = chunksize
+        self.column_names = column_names
+        self.kwargs = kwargs
+
+    def read(self, input_file, read_columns=None):
+        self.regular_file_exists(input_file, **self.kwargs)
+        columns = read_columns or self.column_names
+        parquet_file = pq.ParquetFile(input_file, **self.kwargs)
+        for smaller_table in parquet_file.iter_batches(batch_size=self.chunksize, columns=columns):
+            table = pa.Table.from_batches([smaller_table])
+            table = table.replace_schema_metadata()
+            yield table
+
+
 class IndexedParquetReader(InputReader):
     """Reads an index file, containing paths to parquet files to be read and batched

diff --git a/src/hats_import/catalog/map_reduce.py b/src/hats_import/catalog/map_reduce.py
index 24444929..8822bde8 100644
--- a/src/hats_import/catalog/map_reduce.py
+++ b/src/hats_import/catalog/map_reduce.py
@@ -4,6 +4,7 @@

 import hats.pixel_math.healpix_shim as hp
 import numpy as np
+import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
 from hats import pixel_math
@@ -50,7 +51,7 @@ def _iterate_input_file(

     for chunk_number, data in enumerate(file_reader.read(input_file, read_columns=read_columns)):
         if use_healpix_29:
-            if data.index.name == SPATIAL_INDEX_COLUMN:
+            if isinstance(data, pd.DataFrame) and data.index.name == SPATIAL_INDEX_COLUMN:
                 mapped_pixels = spatial_index_to_healpix(data.index, target_order=highest_order)
             else:
                 mapped_pixels = spatial_index_to_healpix(
@@ -58,13 +59,22 @@
                 )
         else:
             # Set up the pixel data
-            mapped_pixels = hp.ang2pix(
-                2**highest_order,
-                data[ra_column].to_numpy(copy=False, dtype=float),
-                data[dec_column].to_numpy(copy=False, dtype=float),
-                lonlat=True,
-                nest=True,
-            )
+            if isinstance(data, pd.DataFrame):
+                mapped_pixels = hp.ang2pix(
+                    2**highest_order,
+                    data[ra_column].to_numpy(copy=False, dtype=float),
+                    data[dec_column].to_numpy(copy=False, dtype=float),
+                    lonlat=True,
+                    nest=True,
+                )
+            else:
+                mapped_pixels = hp.ang2pix(
+                    2**highest_order,
+                    data[ra_column].to_numpy(),
+                    data[dec_column].to_numpy(),
+                    lonlat=True,
+                    nest=True,
+                )

         yield chunk_number, data, mapped_pixels

@@ -168,17 +178,20 @@
             unique_pixels, unique_inverse = np.unique(aligned_pixels, return_inverse=True)

             for unique_index, [order, pixel, _] in enumerate(unique_pixels):
-                filtered_data = data.iloc[unique_inverse == unique_index]
-
                 pixel_dir = get_pixel_cache_directory(cache_shard_path, HealpixPixel(order, pixel))
                 file_io.make_directory(pixel_dir, exist_ok=True)
                 output_file = file_io.append_paths_to_pointer(
                     pixel_dir, f"shard_{splitting_key}_{chunk_number}.parquet"
                 )
-                if _has_named_index(filtered_data):
-                    filtered_data.to_parquet(output_file.path, index=True, filesystem=output_file.fs)
+                if isinstance(data, pd.DataFrame):
+                    filtered_data = data.iloc[unique_inverse == unique_index]
+                    if _has_named_index(filtered_data):
+                        filtered_data = filtered_data.reset_index()
+                    filtered_data = pa.Table.from_pandas(filtered_data, preserve_index=False)
                 else:
-                    filtered_data.to_parquet(output_file.path, index=False, filesystem=output_file.fs)
+                    filtered_data = data.filter(unique_inverse == unique_index)
+
+                pq.write_table(filtered_data, output_file.path, filesystem=output_file.fs)
                 del filtered_data

         ResumePlan.splitting_key_done(tmp_path=resume_path, splitting_key=splitting_key)
@@ -258,15 +271,10 @@ reduce_pixel_shards(
         if use_schema_file:
             schema = file_io.read_parquet_metadata(use_schema_file).schema.to_arrow_schema()

-        tables = []
+
         healpix_pixel = HealpixPixel(destination_pixel_order, destination_pixel_number)
         pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)

-        if schema:
-            tables.append(pq.read_table(pixel_dir, schema=schema))
-        else:
-            tables.append(pq.read_table(pixel_dir))
-
-        merged_table = pa.concat_tables(tables)
+        merged_table = pq.read_table(pixel_dir, schema=schema)

         rows_written = len(merged_table)

@@ -277,38 +285,36 @@
             f" Expected {destination_pixel_size}, wrote {rows_written}"
         )

-        dataframe = merged_table.to_pandas()
         if sort_columns:
-            dataframe = dataframe.sort_values(sort_columns.split(","), kind="stable")
+            split_columns = sort_columns.split(",")
+            if len(split_columns) > 1:
+                merged_table = merged_table.sort_by([(col_name, "ascending") for col_name in split_columns])
+            else:
+                merged_table = merged_table.sort_by(sort_columns)
         if add_healpix_29:
-            ## If we had a meaningful index before, preserve it as a column.
-            if _has_named_index(dataframe):
-                dataframe = dataframe.reset_index()
-
-            dataframe[SPATIAL_INDEX_COLUMN] = pixel_math.compute_spatial_index(
-                dataframe[ra_column].values,
-                dataframe[dec_column].values,
-            )
-            dataframe = dataframe.set_index(SPATIAL_INDEX_COLUMN).sort_index(kind="stable")
-
-            # Adjust the schema to make sure that the _healpix_29 will
-            # be saved as a uint64
+            merged_table = merged_table.add_column(
+                0,
+                SPATIAL_INDEX_COLUMN,
+                [
+                    pixel_math.compute_spatial_index(
+                        merged_table[ra_column].to_numpy(),
+                        merged_table[dec_column].to_numpy(),
+                    )
+                ],
+            ).sort_by(SPATIAL_INDEX_COLUMN)
         elif use_healpix_29:
-            if dataframe.index.name != SPATIAL_INDEX_COLUMN:
-                dataframe = dataframe.set_index(SPATIAL_INDEX_COLUMN)
-            dataframe = dataframe.sort_index(kind="stable")
+            merged_table = merged_table.sort_by(SPATIAL_INDEX_COLUMN)

-        dataframe["Norder"] = np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)
-        dataframe["Dir"] = np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)
-        dataframe["Npix"] = np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)
-
-        if schema:
-            schema = _modify_arrow_schema(schema, add_healpix_29)
-            dataframe.to_parquet(destination_file.path, schema=schema, filesystem=destination_file.fs)
-        else:
-            dataframe.to_parquet(destination_file.path, filesystem=destination_file.fs)
+        merged_table = (
+            merged_table.append_column(
+                "Norder", [np.full(rows_written, fill_value=healpix_pixel.order, dtype=np.uint8)]
+            )
+            .append_column("Dir", [np.full(rows_written, fill_value=healpix_pixel.dir, dtype=np.uint64)])
+            .append_column("Npix", [np.full(rows_written, fill_value=healpix_pixel.pixel, dtype=np.uint64)])
+        )

-        del dataframe, merged_table, tables
+        pq.write_table(merged_table, destination_file.path, filesystem=destination_file.fs)
+        del merged_table

         if delete_input_files:
             pixel_dir = get_pixel_cache_directory(cache_shard_path, healpix_pixel)
@@ -322,18 +328,3 @@
                 exception,
             )
         raise exception
-
-
-def _modify_arrow_schema(schema, add_healpix_29):
-    if add_healpix_29:
-        pandas_index_column = schema.get_field_index("__index_level_0__")
-        if pandas_index_column != -1:
-            schema = schema.remove(pandas_index_column)
-        schema = schema.insert(0, pa.field(SPATIAL_INDEX_COLUMN, pa.int64()))
-    schema = (
-        schema.append(pa.field("Norder", pa.uint8()))
-        .append(pa.field("Dir", pa.uint64()))
-        .append(pa.field("Npix", pa.uint64()))
-    )
-
-    return schema
diff --git a/src/hats_import/index/map_reduce.py b/src/hats_import/index/map_reduce.py
index fef062a7..4077779e 100644
--- a/src/hats_import/index/map_reduce.py
+++ b/src/hats_import/index/map_reduce.py
@@ -17,8 +17,9 @@ def read_leaf_file(input_file, include_columns, include_healpix_29, drop_duplica
         schema=schema,
     )

-    data = data.reset_index()
-    if not include_healpix_29:
+    if data.index.name == SPATIAL_INDEX_COLUMN:
+        data = data.reset_index()
+    if not include_healpix_29 and SPATIAL_INDEX_COLUMN in data.columns:
         data = data.drop(columns=[SPATIAL_INDEX_COLUMN])

     if drop_duplicates:
@@ -32,6 +33,8 @@ def create_index(args, client):
     include_columns = [args.indexing_column]
     if args.extra_columns:
         include_columns.extend(args.extra_columns)
+    if args.include_healpix_29:
+        include_columns.append(SPATIAL_INDEX_COLUMN)
     if args.include_order_pixel:
         include_columns.extend(["Norder", "Dir", "Npix"])

diff --git a/tests/data/small_sky_object_catalog/dataset/Norder=0/Dir=0/Npix=11.parquet b/tests/data/small_sky_object_catalog/dataset/Norder=0/Dir=0/Npix=11.parquet
index af44a761..d3217ae2 100644
Binary files a/tests/data/small_sky_object_catalog/dataset/Norder=0/Dir=0/Npix=11.parquet and b/tests/data/small_sky_object_catalog/dataset/Norder=0/Dir=0/Npix=11.parquet differ
diff --git a/tests/data/small_sky_object_catalog/dataset/_common_metadata b/tests/data/small_sky_object_catalog/dataset/_common_metadata
index b473e52c..ba86110d 100644
Binary files a/tests/data/small_sky_object_catalog/dataset/_common_metadata and b/tests/data/small_sky_object_catalog/dataset/_common_metadata differ
diff --git a/tests/data/small_sky_object_catalog/dataset/_metadata b/tests/data/small_sky_object_catalog/dataset/_metadata
index b216cda6..4b2440b5 100644
Binary files a/tests/data/small_sky_object_catalog/dataset/_metadata and b/tests/data/small_sky_object_catalog/dataset/_metadata differ
diff --git a/tests/data/small_sky_object_catalog/properties b/tests/data/small_sky_object_catalog/properties
index 0a7630fc..55d467d6 100644
--- a/tests/data/small_sky_object_catalog/properties
+++ b/tests/data/small_sky_object_catalog/properties
@@ -7,8 +7,8 @@ hats_col_dec=dec
 hats_max_rows=1000000
 hats_order=0
 moc_sky_fraction=0.08333
-hats_builder=hats-import v0.3.6.dev26+g40366b4
-hats_creation_date=2024-10-11T15\:02UTC
-hats_estsize=74
+hats_builder=hats-import v0.4.1.dev2+gaeb92ae
+hats_creation_date=2024-10-21T13\:22UTC
+hats_estsize=70
 hats_release_date=2024-09-18
 hats_version=v0.1
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=0/Dir=0/Npix=4.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=0/Dir=0/Npix=4.parquet
index e52f7e2e..3f82fdfb 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=0/Dir=0/Npix=4.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=0/Dir=0/Npix=4.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=1/Dir=0/Npix=47.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=1/Dir=0/Npix=47.parquet
index ce27aded..aad2a928 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=1/Dir=0/Npix=47.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=1/Dir=0/Npix=47.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=176.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=176.parquet
index b2503107..c4153937 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=176.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=176.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=177.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=177.parquet
index c49db625..55462bf5 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=177.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=177.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=178.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=178.parquet
index fc6d65db..5254a3d7 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=178.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=178.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=179.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=179.parquet
index a8a6fea2..b8011d37 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=179.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=179.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=180.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=180.parquet
index fb92ec4c..95f1fef4 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=180.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=180.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=181.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=181.parquet
index a124074a..37356aae 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=181.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=181.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=182.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=182.parquet
index 7693c764..276cf2ca 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=182.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=182.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=183.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=183.parquet
index d53d7d38..ccd25883 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=183.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=183.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=184.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=184.parquet
index df02fae2..6a7ced69 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=184.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=184.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=185.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=185.parquet
index d112aca5..dbcbebd6 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=185.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=185.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=186.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=186.parquet
index 3964dc93..bc6c4172 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=186.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=186.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=187.parquet b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=187.parquet
index d1ef8c78..d264fe61 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=187.parquet and b/tests/data/small_sky_source_catalog/dataset/Norder=2/Dir=0/Npix=187.parquet differ
diff --git a/tests/data/small_sky_source_catalog/dataset/_common_metadata b/tests/data/small_sky_source_catalog/dataset/_common_metadata
index 2aa9b2e5..319e0505 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/_common_metadata and b/tests/data/small_sky_source_catalog/dataset/_common_metadata differ
diff --git a/tests/data/small_sky_source_catalog/dataset/_metadata b/tests/data/small_sky_source_catalog/dataset/_metadata
index c184e061..cca27e7d 100644
Binary files a/tests/data/small_sky_source_catalog/dataset/_metadata and b/tests/data/small_sky_source_catalog/dataset/_metadata differ
diff --git a/tests/data/small_sky_source_catalog/properties b/tests/data/small_sky_source_catalog/properties
index 8f1b7900..d6d8704f 100644
--- a/tests/data/small_sky_source_catalog/properties
+++ b/tests/data/small_sky_source_catalog/properties
@@ -7,8 +7,8 @@ hats_col_dec=source_dec
 hats_max_rows=3000
 hats_order=2
 moc_sky_fraction=0.16667
-hats_builder=hats-import v0.3.6.dev26+g40366b4
-hats_creation_date=2024-10-11T15\:02UTC
-hats_estsize=1105
+hats_builder=hats-import v0.4.1.dev2+gaeb92ae
+hats_creation_date=2024-10-21T13\:22UTC
+hats_estsize=1083
 hats_release_date=2024-09-18
 hats_version=v0.1
diff --git a/tests/hats_import/catalog/test_map_reduce.py b/tests/hats_import/catalog/test_map_reduce.py
index 4fa6dffd..130431e6 100644
--- a/tests/hats_import/catalog/test_map_reduce.py
+++ b/tests/hats_import/catalog/test_map_reduce.py
@@ -353,10 +353,9 @@ def test_reduce_healpix_29(parquet_shards_dir, assert_parquet_file_ids, tmp_path
     expected_ids = [*range(700, 831)]
     assert_parquet_file_ids(output_file, "id", expected_ids)
     data_frame = pd.read_parquet(output_file, engine="pyarrow")
-    assert data_frame.index.name == "_healpix_29"
     npt.assert_array_equal(
         data_frame.columns,
-        ["id", "ra", "dec", "ra_error", "dec_error", "Norder", "Dir", "Npix"],
+        ["_healpix_29", "id", "ra", "dec", "ra_error", "dec_error", "Norder", "Dir", "Npix"],
     )

     mr.reduce_pixel_shards(
diff --git a/tests/hats_import/catalog/test_run_import.py b/tests/hats_import/catalog/test_run_import.py
index afd7f1a1..5cc41ce3 100644
--- a/tests/hats_import/catalog/test_run_import.py
+++ b/tests/hats_import/catalog/test_run_import.py
@@ -278,6 +278,7 @@
     # Check that the schema is correct for leaf parquet and _metadata files
     expected_parquet_schema = pa.schema(
         [
+            pa.field("_healpix_29", pa.int64()),
             pa.field("id", pa.int64()),
             pa.field("ra", pa.float32()),
             pa.field("dec", pa.float32()),
@@ -286,7 +287,6 @@
             pa.field("Norder", pa.uint8()),
             pa.field("Dir", pa.uint64()),
             pa.field("Npix", pa.uint64()),
-            pa.field("_healpix_29", pa.int64()),
         ]
     )
     schema = pq.read_metadata(output_file).schema.to_arrow_schema()
@@ -298,6 +298,7 @@
     data_frame = pd.read_parquet(output_file, engine="pyarrow")
     expected_dtypes = pd.Series(
         {
+            "_healpix_29": np.int64,
             "id": np.int64,
             "ra": np.float32,
             "dec": np.float32,
diff --git a/tests/hats_import/catalog/test_run_round_trip.py b/tests/hats_import/catalog/test_run_round_trip.py
index 2276a824..28f80fbb 100644
--- a/tests/hats_import/catalog/test_run_round_trip.py
+++ b/tests/hats_import/catalog/test_run_round_trip.py
@@ -17,10 +17,11 @@
 import pytest
 from hats.catalog.catalog import Catalog
 from hats.pixel_math.spatial_index import spatial_index_to_healpix
+from pyarrow import csv

 import hats_import.catalog.run_import as runner
 from hats_import.catalog.arguments import ImportArguments
-from hats_import.catalog.file_readers import CsvReader, get_file_reader
+from hats_import.catalog.file_readers import CsvReader, ParquetPyarrowReader, get_file_reader


 @pytest.mark.dask
@@ -101,6 +102,7 @@ def test_import_mixed_schema_csv(
     # Check that the schema is correct for leaf parquet and _metadata files
     expected_parquet_schema = pa.schema(
         [
+            pa.field("_healpix_29", pa.int64()),
pa.field("id", pa.int64()), pa.field("ra", pa.float64()), pa.field("dec", pa.float64()), @@ -111,7 +113,6 @@ def test_import_mixed_schema_csv( pa.field("Norder", pa.uint8()), pa.field("Dir", pa.uint64()), pa.field("Npix", pa.uint64()), - pa.field("_healpix_29", pa.int64()), ] ) schema = pq.read_metadata(output_file).schema.to_arrow_schema() @@ -130,9 +131,8 @@ def test_import_preserve_index( ): """Test basic execution, with input with pandas metadata. - the input file is a parquet file with some pandas metadata. - this verifies that the parquet file at the end also has pandas - metadata, and the user's preferred id is retained as the index, - when requested. + this verifies that the parquet file at the end also still has the + contents of that column. """ expected_indexes = [ @@ -171,12 +171,10 @@ def test_import_preserve_index( # Check that the catalog parquet file exists output_file = os.path.join(args.catalog_path, "dataset", "Norder=0", "Dir=0", "Npix=11.parquet") - assert_parquet_file_index(output_file, expected_indexes) data_frame = pd.read_parquet(output_file, engine="pyarrow") - assert data_frame.index.name == "obs_id" npt.assert_array_equal( data_frame.columns, - ["obj_id", "band", "ra", "dec", "mag", "Norder", "Dir", "Npix"], + ["obs_id", "obj_id", "band", "ra", "dec", "mag", "Norder", "Dir", "Npix"], ) ## DO generate a hats spatial index. Verify that the original index is preserved in a column. @@ -198,10 +196,9 @@ def test_import_preserve_index( output_file = os.path.join(args.catalog_path, "dataset", "Norder=0", "Dir=0", "Npix=11.parquet") data_frame = pd.read_parquet(output_file, engine="pyarrow") - assert data_frame.index.name == "_healpix_29" npt.assert_array_equal( data_frame.columns, - ["obs_id", "obj_id", "band", "ra", "dec", "mag", "Norder", "Dir", "Npix"], + ["_healpix_29", "obs_id", "obj_id", "band", "ra", "dec", "mag", "Norder", "Dir", "Npix"], ) assert_parquet_file_ids(output_file, "obs_id", expected_indexes) @@ -488,6 +485,167 @@ def test_import_starr_file( assert_parquet_file_ids(output_file, "id", expected_ids) +class PyarrowCsvReader(CsvReader): + """Use pyarrow for CSV reading, and force some pyarrow dtypes. + Return a pyarrow table instead of pd.DataFrame.""" + + def read(self, input_file, read_columns=None): + table = csv.read_csv(input_file) + extras = pa.array([[True, False, True]] * len(table), type=pa.list_(pa.bool_(), 3)) + table = table.append_column("extras", extras) + yield table + + +@pytest.mark.dask +def test_import_pyarrow_types( + dask_client, + small_sky_single_file, + assert_parquet_file_ids, + tmp_path, +): + """Test basic execution. + - tests that we can run pipeline with a totally unknown file type, so long + as a valid InputReader implementation is provided. 
+ """ + + args = ImportArguments( + output_artifact_name="pyarrow_dtype", + input_file_list=[small_sky_single_file], + file_reader=PyarrowCsvReader(), + output_path=tmp_path, + dask_tmp=tmp_path, + highest_healpix_order=2, + pixel_threshold=3_000, + progress_bar=False, + ) + + runner.run(args, dask_client) + + # Check that the catalog metadata file exists + catalog = Catalog.read_hats(args.catalog_path) + assert catalog.on_disk + assert catalog.catalog_path == args.catalog_path + assert catalog.catalog_info.total_rows == 131 + assert len(catalog.get_healpix_pixels()) == 1 + + # Check that the catalog parquet file exists and contains correct object IDs + output_file = args.catalog_path / "dataset" / "Norder=0" / "Dir=0" / "Npix=11.parquet" + + expected_ids = [*range(700, 831)] + assert_parquet_file_ids(output_file, "id", expected_ids) + + expected_parquet_schema = pa.schema( + [ + pa.field("_healpix_29", pa.int64()), + pa.field("id", pa.int64()), + pa.field("ra", pa.float64()), + pa.field("dec", pa.float64()), + pa.field("ra_error", pa.int64()), + pa.field("dec_error", pa.int64()), + pa.field("extras", pa.list_(pa.bool_(), 3)), # The 3 is the length for `fixed_size_list` + pa.field("Norder", pa.uint8()), + pa.field("Dir", pa.uint64()), + pa.field("Npix", pa.uint64()), + ] + ) + schema = pq.read_metadata(output_file).schema.to_arrow_schema() + assert schema.equals(expected_parquet_schema, check_metadata=False) + schema = pq.read_metadata(args.catalog_path / "dataset" / "_metadata").schema.to_arrow_schema() + assert schema.equals(expected_parquet_schema, check_metadata=False) + + +class SimplePyarrowCsvReader(CsvReader): + """Use pyarrow for CSV reading, and force some pyarrow dtypes. + Return a pyarrow table instead of pd.DataFrame.""" + + def read(self, input_file, read_columns=None): + yield csv.read_csv(input_file) + + +@pytest.mark.dask +def test_import_healpix_29_pyarrow_table_csv( + dask_client, + small_sky_single_file, + assert_parquet_file_ids, + tmp_path, +): + """Should be identical to the above test, but uses the ParquetPyarrowReader.""" + args = ImportArguments( + output_artifact_name="small_sky_pyarrow", + input_file_list=[small_sky_single_file], + file_reader=SimplePyarrowCsvReader(), + output_path=tmp_path, + dask_tmp=tmp_path, + highest_healpix_order=2, + pixel_threshold=3_000, + progress_bar=False, + ) + + runner.run(args, dask_client) + + # Check that the catalog metadata file exists + catalog = Catalog.read_hats(args.catalog_path) + assert catalog.on_disk + assert catalog.catalog_path == args.catalog_path + assert catalog.catalog_info.total_rows == 131 + assert len(catalog.get_healpix_pixels()) == 1 + + # Check that the catalog parquet file exists and contains correct object IDs + output_file = args.catalog_path / "dataset" / "Norder=0" / "Dir=0" / "Npix=11.parquet" + + expected_ids = [*range(700, 831)] + assert_parquet_file_ids(output_file, "id", expected_ids) + data_frame = pd.read_parquet(output_file, engine="pyarrow") + assert data_frame.index.name is None + npt.assert_array_equal( + data_frame.columns, + ["_healpix_29", "id", "ra", "dec", "ra_error", "dec_error", "Norder", "Dir", "Npix"], + ) + + +@pytest.mark.dask +def test_import_healpix_29_pyarrow_table_parquet( + dask_client, + formats_dir, + assert_parquet_file_ids, + tmp_path, +): + """Should be identical to the above test, but uses the ParquetPyarrowReader.""" + input_file = formats_dir / "healpix_29_index.parquet" + args = ImportArguments( + output_artifact_name="using_healpix_index", + 
+        input_file_list=[input_file],
+        file_reader=ParquetPyarrowReader(),
+        output_path=tmp_path,
+        dask_tmp=tmp_path,
+        use_healpix_29=True,
+        highest_healpix_order=2,
+        pixel_threshold=3_000,
+        progress_bar=False,
+    )
+
+    runner.run(args, dask_client)
+
+    # Check that the catalog metadata file exists
+    catalog = Catalog.read_hats(args.catalog_path)
+    assert catalog.on_disk
+    assert catalog.catalog_path == args.catalog_path
+    assert catalog.catalog_info.total_rows == 131
+    assert len(catalog.get_healpix_pixels()) == 1
+
+    # Check that the catalog parquet file exists and contains correct object IDs
+    output_file = args.catalog_path / "dataset" / "Norder=0" / "Dir=0" / "Npix=11.parquet"
+
+    expected_ids = [*range(700, 831)]
+    assert_parquet_file_ids(output_file, "id", expected_ids)
+    data_frame = pd.read_parquet(output_file, engine="pyarrow")
+
+    npt.assert_array_equal(
+        data_frame.columns,
+        ["id", "_healpix_29", "Norder", "Dir", "Npix"],
+    )
+
+
 @pytest.mark.dask
 def test_import_healpix_29(
     dask_client,
@@ -535,10 +693,9 @@
     expected_ids = [*range(700, 831)]
     assert_parquet_file_ids(output_file, "id", expected_ids)
     data_frame = pd.read_parquet(output_file, engine="pyarrow")
-    assert data_frame.index.name == "_healpix_29"
     npt.assert_array_equal(
         data_frame.columns,
-        ["id", "Norder", "Dir", "Npix"],
+        ["_healpix_29", "id", "Norder", "Dir", "Npix"],
     )


@@ -578,10 +735,9 @@
     expected_ids = [*range(700, 831)]
     assert_parquet_file_ids(output_file, "id", expected_ids)
     data_frame = pd.read_parquet(output_file, engine="pyarrow")
-    assert data_frame.index.name == "_healpix_29"
     npt.assert_array_equal(
         data_frame.columns,
-        ["id", "magnitude", "nobs", "Norder", "Dir", "Npix"],
+        ["id", "_healpix_29", "magnitude", "nobs", "Norder", "Dir", "Npix"],
     )


@@ -627,8 +783,7 @@
     data_frame = pd.read_parquet(output_file)

     # Make sure that the spatial index values match the pixel for the partition (0,5)
-    assert data_frame.index.name == "_healpix_29"
-    spatial_index_pixels = spatial_index_to_healpix(data_frame.index.values, 0)
+    spatial_index_pixels = spatial_index_to_healpix(data_frame["_healpix_29"].values, 0)
     npt.assert_array_equal(spatial_index_pixels, [5, 5, 5])

     column_names = data_frame.columns
@@ -676,6 +831,7 @@
     # Check that the schema is correct for leaf parquet and _metadata files
     expected_parquet_schema = pa.schema(
         [
+            pa.field("_healpix_29", pa.int64()),
             pa.field("solution_id", pa.int64()),
             pa.field("source_id", pa.int64()),
             pa.field("ra", pa.float64()),
@@ -729,7 +885,6 @@
             pa.field("Norder", pa.uint8()),
             pa.field("Dir", pa.uint64()),
             pa.field("Npix", pa.uint64()),
-            pa.field("_healpix_29", pa.int64()),
         ]
     )

diff --git a/tests/hats_import/index/test_index_map_reduce.py b/tests/hats_import/index/test_index_map_reduce.py
index a9a77878..7759af68 100644
--- a/tests/hats_import/index/test_index_map_reduce.py
+++ b/tests/hats_import/index/test_index_map_reduce.py
@@ -205,7 +205,7 @@ def test_create_index_extra_columns(
     data_frame = pd.read_parquet(output_file, engine="pyarrow")
     npt.assert_array_equal(
         data_frame.columns,
-        ["_healpix_29", "source_ra", "Norder", "Dir", "Npix"],
+        ["source_ra", "_healpix_29", "Norder", "Dir", "Npix"],
     )
     assert data_frame.index.name == "object_id"
     assert len(data_frame) == 17161
diff --git a/tests/hats_import/index/test_run_index.py b/tests/hats_import/index/test_run_index.py
index b45e189d..be39f5f5 100644
--- a/tests/hats_import/index/test_run_index.py
+++ b/tests/hats_import/index/test_run_index.py
@@ -92,9 +92,9 @@ def test_run_index_on_source(
     basic_index_parquet_schema = pa.schema(
         [
-            pa.field("_healpix_29", pa.int64()),
             pa.field("mag", pa.float64()),
             pa.field("band", pa.large_string()),
+            pa.field("_healpix_29", pa.int64()),
             pa.field("Norder", pa.uint8()),
             pa.field("Dir", pa.uint64()),
             pa.field("Npix", pa.uint64()),
diff --git a/tests/hats_import/margin_cache/test_margin_cache.py b/tests/hats_import/margin_cache/test_margin_cache.py
index 1e4768b0..ebe11558 100644
--- a/tests/hats_import/margin_cache/test_margin_cache.py
+++ b/tests/hats_import/margin_cache/test_margin_cache.py
@@ -48,6 +48,7 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client):
     npt.assert_array_equal(
         data.columns,
         [
+            "_healpix_29",
             "source_id",
             "source_ra",
             "source_dec",
@@ -65,7 +66,6 @@ def test_margin_cache_gen(small_sky_source_catalog, tmp_path, dask_client):
             "margin_Npix",
         ],
     )
-    assert data.index.name == "_healpix_29"

     catalog = HealpixDataset.read_hats(args.catalog_path)
     assert catalog.on_disk
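
Reviewer note (not part of the patch): a minimal usage sketch of the new ParquetPyarrowReader, driving the pipeline the same way the tests above do. The catalog name, input path, and output directory are hypothetical placeholders; only ImportArguments, ParquetPyarrowReader, and runner.run are taken from this diff.

    # Hypothetical usage sketch -- mirrors the argument names used in the tests above.
    import hats_import.catalog.run_import as runner
    from hats_import.catalog.arguments import ImportArguments
    from hats_import.catalog.file_readers import ParquetPyarrowReader

    args = ImportArguments(
        output_artifact_name="my_catalog",                 # hypothetical artifact name
        input_file_list=["/data/my_catalog.parquet"],      # hypothetical input file
        file_reader=ParquetPyarrowReader(chunksize=500_000),
        output_path="/data/hats",                          # hypothetical output directory
        highest_healpix_order=2,
        pixel_threshold=3_000,
        progress_bar=False,
    )
    runner.run(args, client)  # `client`: an existing Dask client, as in the dask_client test fixture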
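
Reviewer note (not part of the patch): the test updates above encode the output-layout change of this PR, where _healpix_29 is written as a leading int64 column rather than as the pandas index. A short sketch of how downstream code reads a leaf file after this change; the file path is a placeholder.

    import pandas as pd

    leaf = pd.read_parquet("Norder=0/Dir=0/Npix=11.parquet", engine="pyarrow")  # placeholder path
    assert leaf.index.name is None                 # the spatial index is no longer the DataFrame index
    healpix_29 = leaf["_healpix_29"]               # read it as a regular int64 column instead
    norder, npix = leaf["Norder"], leaf["Npix"]    # partition columns are still appended last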