Create partition info csvs on catalog creation.
delucchi-cmu committed Feb 7, 2024
1 parent 0ae6657 commit b020e2c
Showing 3 changed files with 22 additions and 3 deletions.
src/hipscat_import/catalog/run_import.py (4 additions, 1 deletion)
@@ -9,6 +9,7 @@
import numpy as np
from hipscat import pixel_math
from hipscat.catalog import PartitionInfo
from hipscat.io import paths
from hipscat.io.parquet_metadata import write_parquet_metadata
from tqdm import tqdm

@@ -168,10 +169,12 @@ def run(args, client):
storage_options=args.output_storage_options,
)
step_progress.update(1)
partition_info = PartitionInfo.from_healpix(destination_pixel_map.keys())
partition_info_file = paths.get_partition_info_pointer(args.catalog_path)
partition_info.write_to_file(partition_info_file, storage_options=args.output_storage_options)
if not args.debug_stats_only:
write_parquet_metadata(args.catalog_path, storage_options=args.output_storage_options)
else:
partition_info = PartitionInfo.from_healpix(destination_pixel_map.keys())
partition_info.write_to_metadata_files(
args.catalog_path, storage_options=args.output_storage_options
)
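
The change above moves partition-info creation out of the debug-only branch: the partition list is always derived from the destination HEALPix pixels and written to the catalog's partition info CSV, whether or not full parquet metadata is also generated. A minimal sketch of that step, using only the hipscat calls visible in the diff; the helper name and its parameters are placeholders for `args.catalog_path`, `args.output_storage_options`, and the `destination_pixel_map` computed earlier in the pipeline:

```python
from hipscat.catalog import PartitionInfo
from hipscat.io import paths


def write_partition_info_csv(catalog_path, destination_pixel_map, storage_options=None):
    """Sketch of the new step in run_import.run: derive the partition list from
    the destination HEALPix pixels and persist it as the catalog's partition info CSV."""
    partition_info = PartitionInfo.from_healpix(destination_pixel_map.keys())
    partition_info_file = paths.get_partition_info_pointer(catalog_path)
    partition_info.write_to_file(partition_info_file, storage_options=storage_options)
```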
src/hipscat_import/margin_cache/margin_cache.py (11 additions, 2 deletions)
@@ -1,6 +1,7 @@
import pandas as pd
from dask.distributed import as_completed
from hipscat import pixel_math
from hipscat.catalog import PartitionInfo
from hipscat.io import file_io, parquet_metadata, paths, write_metadata
from tqdm import tqdm

@@ -130,18 +131,26 @@ def generate_margin_cache(args, client):

with tqdm(total=4, desc="Finishing", disable=not args.progress_bar) as step_progress:
parquet_metadata.write_parquet_metadata(args.catalog_path)
step_progress.update(1)
total_rows = 0
metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path)
for row_group in parquet_metadata.read_row_group_fragments(metadata_path):
for row_group in parquet_metadata.read_row_group_fragments(
metadata_path, storage_options=args.output_storage_options
):
total_rows += row_group.num_rows
partition_info = PartitionInfo.read_from_file(
metadata_path, storage_options=args.output_storage_options
)
partition_info_file = paths.get_partition_info_pointer(args.catalog_path)
partition_info.write_to_file(partition_info_file, storage_options=args.output_storage_options)

step_progress.update(1)
margin_catalog_info = args.to_catalog_info(int(total_rows))
write_metadata.write_provenance_info(
catalog_base_dir=args.catalog_path,
dataset_info=margin_catalog_info,
tool_args=args.provenance_info(),
)
step_progress.update(1)
write_metadata.write_catalog_info(
catalog_base_dir=args.catalog_path, dataset_info=margin_catalog_info
)
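
For the margin cache there is no pixel map to draw from, so the partition list is instead recovered from the freshly written `_metadata` file and persisted as a CSV in the same pass that counts rows. A hedged sketch of that step, wrapped in a hypothetical helper so the hipscat calls from the diff appear in context; `catalog_path` and `storage_options` stand in for `args.catalog_path` and `args.output_storage_options`:

```python
from hipscat.catalog import PartitionInfo
from hipscat.io import parquet_metadata, paths


def finalize_margin_partition_info(catalog_path, storage_options=None):
    """Sketch: count rows from the _metadata row groups, then derive the
    partition list from the same file and write it out as a CSV."""
    metadata_path = paths.get_parquet_metadata_pointer(catalog_path)

    # Total row count feeds the catalog info written later in generate_margin_cache.
    total_rows = 0
    for row_group in parquet_metadata.read_row_group_fragments(
        metadata_path, storage_options=storage_options
    ):
        total_rows += row_group.num_rows

    # The partition list comes from the same _metadata file, then lands in the CSV.
    partition_info = PartitionInfo.read_from_file(metadata_path, storage_options=storage_options)
    partition_info_file = paths.get_partition_info_pointer(catalog_path)
    partition_info.write_to_file(partition_info_file, storage_options=storage_options)

    return total_rows
```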
src/hipscat_import/soap/run_soap.py (7 additions, 0 deletions)
@@ -3,6 +3,7 @@
The actual logic of the map reduce is in the `map_reduce.py` file.
"""

from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo
from hipscat.io import parquet_metadata, paths, write_metadata
from tqdm import tqdm

@@ -57,6 +58,12 @@ def run(args, client):
metadata_path = paths.get_parquet_metadata_pointer(args.catalog_path)
for row_group in parquet_metadata.read_row_group_fragments(metadata_path):
total_rows += row_group.num_rows
partition_join_info = PartitionJoinInfo.read_from_file(
metadata_path, storage_options=args.output_storage_options
)
partition_join_info.write_to_csv(
catalog_path=args.catalog_path, storage_options=args.output_storage_options
)
else:
total_rows = combine_partial_results(args.tmp_path, args.catalog_path)
step_progress.update(1)
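
The SOAP (association) catalog follows the same pattern, except that the per-pixel join information lives in a `PartitionJoinInfo` object, which has its own CSV writer keyed on the catalog path. A sketch using the calls from the diff, with a hypothetical helper name and placeholder arguments for `args.catalog_path` and `args.output_storage_options`:

```python
from hipscat.catalog.association_catalog.partition_join_info import PartitionJoinInfo
from hipscat.io import paths


def write_partition_join_csv(catalog_path, storage_options=None):
    """Sketch: recover the join information from the association catalog's
    _metadata file and write it out as a CSV alongside the catalog."""
    metadata_path = paths.get_parquet_metadata_pointer(catalog_path)
    partition_join_info = PartitionJoinInfo.read_from_file(
        metadata_path, storage_options=storage_options
    )
    partition_join_info.write_to_csv(catalog_path=catalog_path, storage_options=storage_options)
```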
