Use pathlib for pytest fixtures. (#333)
* Use pathlib for pytest fixtures.

* Use Path.mkdir
delucchi-cmu authored Jun 11, 2024
1 parent 3bcec06 commit 04c1a8f
Showing 13 changed files with 130 additions and 161 deletions.
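
The conversion follows one mechanical pattern across all of the touched test files; a minimal sketch of that pattern (file and directory names here are illustrative, not taken from the diff):

```python
from pathlib import Path


def test_example(tmp_path: Path):
    # pytest's tmp_path fixture is already a pathlib.Path, so no conversion is needed.

    # Before: histogram_file = os.path.join(tmp_path, "histograms", "map_0.npz")
    histogram_file = tmp_path / "histograms" / "map_0.npz"

    # Before: os.makedirs(os.path.join(tmp_path, "histograms"))
    histogram_file.parent.mkdir(parents=True)

    histogram_file.write_bytes(b"")  # stand-in for the real histogram payload
    assert histogram_file.exists()
```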
2 changes: 1 addition & 1 deletion tests/hipscat_import/catalog/test_argument_validation.py
@@ -78,7 +78,7 @@ def test_good_paths(blank_data_dir, blank_data_file, tmp_path):
)
assert args.input_path == blank_data_dir
assert len(args.input_paths) == 1
assert blank_data_file in args.input_paths[0]
assert str(blank_data_file) in args.input_paths[0]


def test_multiple_files_in_path(small_sky_parts_dir, tmp_path):
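The str() wrapper in the assertion above is needed because args.input_paths stores plain strings while blank_data_file is now a Path, and a Path is not a valid left operand for the `in` substring test on a str. A small stand-alone illustration (values are made up):

```python
from pathlib import Path

blank_data_file = Path("/tmp/data/blank_data.csv")  # hypothetical fixture value
input_path_entry = "/tmp/data/blank_data.csv"       # the importer stores paths as strings

# blank_data_file in input_path_entry  ->  TypeError: str.__contains__ rejects a Path
assert str(blank_data_file) in input_path_entry
```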
16 changes: 7 additions & 9 deletions tests/hipscat_import/catalog/test_file_readers.py
@@ -1,7 +1,5 @@
"""Test dataframe-generating file readers"""

import os

import hipscat.io.write_metadata as io
import numpy as np
import pandas as pd
@@ -91,7 +89,7 @@ def test_csv_reader_parquet_metadata(small_sky_single_file, tmp_path):
pa.field("dec_error", pa.float64()),
]
)
schema_file = os.path.join(tmp_path, "metadata.parquet")
schema_file = tmp_path / "metadata.parquet"
pq.write_metadata(
small_sky_schema,
schema_file,
@@ -187,7 +185,7 @@ def test_csv_reader_pipe_delimited(formats_pipe_csv, tmp_path):
pa.field("numeric", pa.int64()),
]
)
schema_file = os.path.join(tmp_path, "metadata.parquet")
schema_file = tmp_path / "metadata.parquet"
pq.write_metadata(parquet_schema_types, schema_file)

frame = next(
@@ -224,7 +222,7 @@ def test_csv_reader_provenance_info(tmp_path, basic_catalog_info):
)
provenance_info = reader.provenance_info()
catalog_base_dir = tmp_path / "test_catalog"
os.makedirs(catalog_base_dir)
catalog_base_dir.mkdir(parents=True)
io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info)

with open(catalog_base_dir / "provenance_info.json", "r", encoding="utf-8") as file:
@@ -258,8 +256,8 @@ def test_parquet_reader_provenance_info(tmp_path, basic_catalog_info):
"""Test that we get some provenance info and it is parseable into JSON."""
reader = ParquetReader(chunksize=1)
provenance_info = reader.provenance_info()
catalog_base_dir = os.path.join(tmp_path, "test_catalog")
os.makedirs(catalog_base_dir)
catalog_base_dir = tmp_path / "test_catalog"
catalog_base_dir.mkdir(parents=True)
io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info)


@@ -309,6 +307,6 @@ def test_fits_reader_provenance_info(tmp_path, basic_catalog_info):
"""Test that we get some provenance info and it is parseable into JSON."""
reader = FitsReader()
provenance_info = reader.provenance_info()
catalog_base_dir = os.path.join(tmp_path, "test_catalog")
os.makedirs(catalog_base_dir)
catalog_base_dir = tmp_path / "test_catalog"
catalog_base_dir.mkdir(parents=True)
io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info)
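
The os.makedirs → Path.mkdir(parents=True) swap seen throughout this file should be behaviour-preserving: both create missing parent directories, and both raise FileExistsError if the leaf directory already exists unless exist_ok=True is passed. A quick sketch under that reading:

```python
import os
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    catalog_base_dir = Path(tmp) / "test_catalog" / "nested"

    catalog_base_dir.mkdir(parents=True)  # equivalent to os.makedirs(catalog_base_dir)
    assert catalog_base_dir.is_dir()

    # A repeat call raises FileExistsError in either API unless exist_ok=True is given.
    catalog_base_dir.mkdir(parents=True, exist_ok=True)
    os.makedirs(catalog_base_dir, exist_ok=True)
```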
51 changes: 25 additions & 26 deletions tests/hipscat_import/catalog/test_map_reduce.py
@@ -20,7 +20,7 @@

def pickle_file_reader(tmp_path, file_reader) -> str:
"""Utility method to pickle a file reader, and return path to pickle."""
pickled_reader_file = os.path.join(tmp_path, "reader.pickle")
pickled_reader_file = tmp_path / "reader.pickle"
with open(pickled_reader_file, "wb") as pickle_file:
pickle.dump(file_reader, pickle_file)
return pickled_reader_file
@@ -86,14 +86,14 @@ def test_read_bad_fileformat(blank_data_file, capsys, tmp_path):

def read_partial_histogram(tmp_path, mapping_key):
"""Helper to read in the former result of a map operation."""
histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.npz")
histogram_file = tmp_path / "histograms" / f"{mapping_key}.npz"
hist = SparseHistogram.from_file(histogram_file)
return hist.to_array()


def test_read_single_fits(tmp_path, formats_fits):
"""Success case - fits file that exists being read as fits"""
os.makedirs(os.path.join(tmp_path, "histograms"))
(tmp_path / "histograms").mkdir(parents=True)
mr.map_to_pixels(
input_file=formats_fits,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("fits")),
@@ -127,7 +127,7 @@ def test_map_headers_wrong(formats_headers_csv, tmp_path):

def test_map_headers(tmp_path, formats_headers_csv):
"""Test loading the a file with non-default headers"""
os.makedirs(os.path.join(tmp_path, "histograms"))
(tmp_path / "histograms").mkdir(parents=True)
mr.map_to_pixels(
input_file=formats_headers_csv,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")),
@@ -149,8 +149,8 @@ def test_map_headers(tmp_path, formats_headers_csv):


def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file):
os.makedirs(os.path.join(tmp_path, "histograms"))
input_file = os.path.join(formats_dir, "hipscat_index.csv")
(tmp_path / "histograms").mkdir(parents=True)
input_file = formats_dir / "hipscat_index.csv"
mr.map_to_pixels(
input_file=input_file,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")),
@@ -183,8 +183,8 @@ def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file):

def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parquet):
"""Test loading the a file when using a parquet schema file for dtypes"""
os.makedirs(os.path.join(tmp_path, "histograms"))
input_file = os.path.join(mixed_schema_csv_dir, "input_01.csv")
(tmp_path / "histograms").mkdir(parents=True)
input_file = mixed_schema_csv_dir / "input_01.csv"
mr.map_to_pixels(
input_file=input_file,
pickled_reader_file=pickle_file_reader(
@@ -213,7 +213,7 @@ def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parquet):

def test_map_small_sky_order0(tmp_path, small_sky_single_file):
"""Test loading the small sky catalog and partitioning each object into the same large bucket"""
os.makedirs(os.path.join(tmp_path, "histograms"))
(tmp_path / "histograms").mkdir(parents=True)
mr.map_to_pixels(
input_file=small_sky_single_file,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")),
@@ -239,7 +239,7 @@ def test_map_small_sky_part_order1(tmp_path, small_sky_file0):
Test loading a small portion of the small sky catalog and
partitioning objects into four smaller buckets
"""
os.makedirs(os.path.join(tmp_path, "histograms"))
(tmp_path / "histograms").mkdir(parents=True)
mr.map_to_pixels(
input_file=small_sky_file0,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")),
@@ -279,7 +279,6 @@ def test_split_pixels_bad_format(blank_data_file, tmp_path, capsys):
)
captured = capsys.readouterr()
assert "No such file or directory" in captured.out
os.makedirs(os.path.join(tmp_path, "splitting"))


def test_split_pixels_headers(formats_headers_csv, assert_parquet_file_ids, tmp_path):
@@ -300,17 +299,17 @@ def test_split_pixels_headers(formats_headers_csv, assert_parquet_file_ids, tmp_path):
alignment_file=alignment_file,
)

file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_11", "shard_0_0.parquet")
file_name = tmp_path / "order_0" / "dir_0" / "pixel_11" / "shard_0_0.parquet"
expected_ids = [*range(700, 708)]
assert_parquet_file_ids(file_name, "object_id", expected_ids)

file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_1", "shard_0_0.parquet")
file_name = tmp_path / "order_0" / "dir_0" / "pixel_1" / "shard_0_0.parquet"
assert not os.path.exists(file_name)


def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
"""Test reducing into one large pixel"""
os.makedirs(os.path.join(tmp_path, "reducing"))
(tmp_path / "reducing").mkdir(parents=True)
mr.reduce_pixel_shards(
cache_shard_path=parquet_shards_dir,
resume_path=tmp_path,
@@ -326,15 +325,15 @@ def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
delete_input_files=False,
)

output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet")
output_file = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet"

expected_ids = [*range(700, 831)]
assert_parquet_file_ids(output_file, "id", expected_ids)


def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
"""Test reducing with or without a _hipscat_index field"""
os.makedirs(os.path.join(tmp_path, "reducing"))
(tmp_path / "reducing").mkdir(parents=True)
mr.reduce_pixel_shards(
cache_shard_path=parquet_shards_dir,
resume_path=tmp_path,
@@ -349,7 +348,7 @@ def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
delete_input_files=False,
)

output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet")
output_file = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet"

expected_ids = [*range(700, 831)]
assert_parquet_file_ids(output_file, "id", expected_ids)
@@ -414,10 +413,10 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
First, we take some time to set up these silly data points, then we test out
reducing them into a single parquet file using a mix of reduction options.
"""
os.makedirs(os.path.join(tmp_path, "reducing"))
shard_dir = os.path.join(tmp_path, "reduce_shards", "order_0", "dir_0", "pixel_11")
os.makedirs(shard_dir)
output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet")
(tmp_path / "reducing").mkdir(parents=True)
shard_dir = tmp_path / "reduce_shards" / "order_0" / "dir_0" / "pixel_11"
shard_dir.mkdir(parents=True)
output_file = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet"

file1_string = """source_id,object_id,time,ra,dec
1200,700,3000,282.5,-58.5
@@ -427,15 +426,15 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
1404,702,3200,310.5,-27.5
1505,703,4000,286.5,-69.5"""
file1_data = pd.read_csv(StringIO(file1_string))
file1_data.to_parquet(os.path.join(shard_dir, "file_1_shard_1.parquet"))
file1_data.to_parquet(shard_dir / "file_1_shard_1.parquet")

file2_string = """source_id,object_id,time,ra,dec
1206,700,2000,282.5,-58.5
1307,701,2200,299.5,-48.5
1308,701,2100,299.5,-48.5
1309,701,2000,299.5,-48.5"""
file2_data = pd.read_csv(StringIO(file2_string))
file2_data.to_parquet(os.path.join(shard_dir, "file_2_shard_1.parquet"))
file2_data.to_parquet(shard_dir / "file_2_shard_1.parquet")

combined_data = pd.concat([file1_data, file2_data])
combined_data["norder19_healpix"] = hp.ang2pix(
@@ -452,7 +451,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
## This will sort WITHIN an order 19 healpix pixel. In that ordering, the objects are
## (703, 700, 701, 702)
mr.reduce_pixel_shards(
cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
cache_shard_path=tmp_path / "reduce_shards",
resume_path=tmp_path,
reducing_key="0_11",
destination_pixel_order=0,
@@ -489,7 +488,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
######################## Sort option 2: by object id and time
## sort order is effectively (norder19 healpix, object id, time)
mr.reduce_pixel_shards(
cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
cache_shard_path=tmp_path / "reduce_shards",
resume_path=tmp_path,
reducing_key="0_11",
destination_pixel_order=0,
@@ -526,7 +525,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
## spatial properties for sorting, only numeric.
## sort order is effectively (object id, time)
mr.reduce_pixel_shards(
cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
cache_shard_path=tmp_path / "reduce_shards",
resume_path=tmp_path,
reducing_key="0_11",
destination_pixel_order=0,
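Note that the Path values above are handed straight to pandas (to_parquet) and, earlier in the commit, to pyarrow (pq.write_metadata); both libraries accept os.PathLike targets, so no str() conversion should be needed. A minimal round-trip sketch on that assumption (requires a parquet engine such as pyarrow):

```python
import tempfile
from pathlib import Path

import pandas as pd

with tempfile.TemporaryDirectory() as tmp:
    shard_dir = Path(tmp) / "reduce_shards" / "order_0" / "dir_0" / "pixel_11"
    shard_dir.mkdir(parents=True)

    frame = pd.DataFrame({"source_id": [1200, 1201], "object_id": [700, 701]})
    shard_file = shard_dir / "file_1_shard_1.parquet"
    frame.to_parquet(shard_file)  # pandas accepts a Path, not only a str

    assert pd.read_parquet(shard_file)["object_id"].tolist() == [700, 701]
```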
50 changes: 17 additions & 33 deletions tests/hipscat_import/catalog/test_run_import.py
@@ -2,6 +2,7 @@

import os
import shutil
from pathlib import Path

import numpy as np
import pandas as pd
@@ -41,13 +42,10 @@ def test_resume_dask_runner(
"""Test execution in the presence of some resume files."""
## First, copy over our intermediate files.
## This prevents overwriting source-controlled resume files.
intermediate_dir = os.path.join(tmp_path, "resume_catalog", "intermediate")
shutil.copytree(
os.path.join(resume_dir, "intermediate"),
intermediate_dir,
)
intermediate_dir = tmp_path / "resume_catalog" / "intermediate"
shutil.copytree(resume_dir / "intermediate", intermediate_dir)
## Now set up our resume files to match previous work.
resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
resume_tmp = tmp_path / "tmp" / "resume_catalog"
plan = ResumePlan(tmp_path=resume_tmp, progress_bar=False)
histogram = SparseHistogram.make_from_counts([11], [131], 0)
empty = SparseHistogram.make_empty(0)
@@ -63,10 +61,7 @@

ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11")

shutil.copytree(
os.path.join(resume_dir, "Norder=0"),
os.path.join(tmp_path, "resume_catalog", "Norder=0"),
)
shutil.copytree(resume_dir / "Norder=0", tmp_path / "resume_catalog" / "Norder=0")

args = ImportArguments(
output_artifact_name="resume_catalog",
@@ -75,7 +70,7 @@
output_path=tmp_path,
dask_tmp=tmp_path,
tmp_dir=tmp_path,
resume_tmp=os.path.join(tmp_path, "tmp"),
resume_tmp=tmp_path / "tmp",
highest_healpix_order=0,
pixel_threshold=1000,
progress_bar=False,
@@ -93,17 +88,14 @@ def test_resume_dask_runner(
assert len(catalog.get_healpix_pixels()) == 1

# Check that the catalog parquet file exists and contains correct object IDs
output_file = os.path.join(args.catalog_path, "Norder=0", "Dir=0", "Npix=11.parquet")
output_file = Path(args.catalog_path) / "Norder=0" / "Dir=0" / "Npix=11.parquet"

expected_ids = [*range(700, 831)]
assert_parquet_file_ids(output_file, "id", expected_ids)

## Re-running the pipeline with fully done intermediate files
## should result in no changes to output files.
shutil.copytree(
os.path.join(resume_dir, "intermediate"),
resume_tmp,
)
shutil.copytree(resume_dir / "intermediate", resume_tmp)
plan = args.resume_plan
plan.touch_stage_done_file(ResumePlan.MAPPING_STAGE)
plan.touch_stage_done_file(ResumePlan.SPLITTING_STAGE)
@@ -145,25 +137,17 @@ def test_resume_dask_runner_diff_pixel_order(
with the current HEALPix order."""
## First, copy over our intermediate files.
## This prevents overwriting source-controlled resume files.
intermediate_dir = os.path.join(tmp_path, "resume_catalog", "intermediate")
shutil.copytree(
os.path.join(resume_dir, "intermediate"),
intermediate_dir,
)
intermediate_dir = tmp_path / "resume_catalog" / "intermediate"
shutil.copytree(resume_dir / "intermediate", intermediate_dir)

## Now set up our resume files to match previous work.
resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
resume_tmp = tmp_path / "tmp" / "resume_catalog"
ResumePlan(tmp_path=resume_tmp, progress_bar=False)
SparseHistogram.make_from_counts([11], [131], 0).to_file(
os.path.join(resume_tmp, "mapping_histogram.npz")
)
SparseHistogram.make_from_counts([11], [131], 0).to_file(resume_tmp / "mapping_histogram.npz")
for file_index in range(0, 5):
ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")

shutil.copytree(
os.path.join(resume_dir, "Norder=0"),
os.path.join(tmp_path, "resume_catalog", "Norder=0"),
)
shutil.copytree(resume_dir / "Norder=0", tmp_path / "resume_catalog" / "Norder=0")

with pytest.raises(ValueError, match="incompatible with the highest healpix order"):
args = ImportArguments(
@@ -173,7 +157,7 @@
output_path=tmp_path,
dask_tmp=tmp_path,
tmp_dir=tmp_path,
resume_tmp=os.path.join(tmp_path, "tmp"),
resume_tmp=tmp_path / "tmp",
constant_healpix_order=1,
pixel_threshold=1000,
progress_bar=False,
@@ -188,7 +172,7 @@
output_path=tmp_path,
dask_tmp=tmp_path,
tmp_dir=tmp_path,
resume_tmp=os.path.join(tmp_path, "tmp"),
resume_tmp=tmp_path / "tmp",
constant_healpix_order=1,
pixel_threshold=1000,
progress_bar=False,
@@ -220,7 +204,7 @@ def test_resume_dask_runner_histograms_diff_size(
tmp_path,
):
"""Tests that the pipeline errors if the partial histograms have different sizes."""
resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
resume_tmp = tmp_path / "tmp" / "resume_catalog"
ResumePlan(tmp_path=resume_tmp, progress_bar=False)

# We'll create mock partial histograms of size 0 and 2
@@ -246,7 +230,7 @@
output_path=tmp_path,
dask_tmp=tmp_path,
tmp_dir=tmp_path,
resume_tmp=os.path.join(tmp_path, "tmp"),
resume_tmp=tmp_path / "tmp",
constant_healpix_order=1,
pixel_threshold=1000,
progress_bar=False,
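The remaining standard-library calls in this file accept os.PathLike arguments (shutil.copytree here, open and os.path.exists elsewhere in the suite), so Path values flow into them unchanged; only values that may arrive as plain strings, such as args.catalog_path above, are wrapped in Path(...) before the / operator is applied. A short sketch of that mixed usage (directory names are illustrative):

```python
import os
import shutil
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    resume_dir = Path(tmp) / "resume_dir" / "Norder=0"
    resume_dir.mkdir(parents=True)
    (resume_dir / "marker.txt").write_text("done")

    catalog_path = os.path.join(tmp, "resume_catalog")  # a str, as an arguments object might hold
    target = Path(catalog_path) / "Norder=0"            # wrap once, then build paths with /
    shutil.copytree(resume_dir, target)                 # copytree takes PathLike src and dst

    assert os.path.exists(target / "marker.txt")
```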