Use pathlib for pytest fixtures. #333

Merged · 2 commits · Jun 11, 2024
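Nearly every hunk below makes the same substitution: a string assembled with `os.path.join` becomes a `pathlib.Path` built with the `/` operator on pytest's `tmp_path` fixture, which is already a `Path`. A minimal sketch of that equivalence, not taken from the test suite and using an illustrative helper name:

```python
import os
from pathlib import Path

def build_output_path(tmp_path: Path) -> Path:
    # Old style: os.path.join returns a plain string.
    as_string = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet")

    # New style: the / operator keeps the value as a Path.
    as_path = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet"

    # Both point at the same location; os.fspath() recovers the string form.
    assert os.fspath(as_path) == as_string
    return as_path
```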
2 changes: 1 addition & 1 deletion tests/hipscat_import/catalog/test_argument_validation.py
@@ -78,7 +78,7 @@ def test_good_paths(blank_data_dir, blank_data_file, tmp_path):
)
assert args.input_path == blank_data_dir
assert len(args.input_paths) == 1
- assert blank_data_file in args.input_paths[0]
+ assert str(blank_data_file) in args.input_paths[0]


def test_multiple_files_in_path(small_sky_parts_dir, tmp_path):
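A likely reason for the added `str()` above: `args.input_paths[0]` is a string, and Python's `in` check on a string rejects a `Path` operand outright. A small sketch with hypothetical values:

```python
from pathlib import Path

candidate = Path("/tmp/pytest-basic/blank_data.csv")  # stand-in for the fixture value
haystack = "/tmp/pytest-basic/blank_data.csv"         # stand-in for args.input_paths[0]

try:
    _ = candidate in haystack  # str.__contains__ only accepts strings
except TypeError:
    pass

# Converting the Path first restores the substring check.
assert str(candidate) in haystack
```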
8 changes: 4 additions & 4 deletions tests/hipscat_import/catalog/test_file_readers.py
@@ -91,7 +91,7 @@ def test_csv_reader_parquet_metadata(small_sky_single_file, tmp_path):
pa.field("dec_error", pa.float64()),
]
)
- schema_file = os.path.join(tmp_path, "metadata.parquet")
+ schema_file = tmp_path / "metadata.parquet"
pq.write_metadata(
small_sky_schema,
schema_file,
@@ -187,7 +187,7 @@ def test_csv_reader_pipe_delimited(formats_pipe_csv, tmp_path):
pa.field("numeric", pa.int64()),
]
)
- schema_file = os.path.join(tmp_path, "metadata.parquet")
+ schema_file = tmp_path / "metadata.parquet"
pq.write_metadata(parquet_schema_types, schema_file)

frame = next(
@@ -258,7 +258,7 @@ def test_parquet_reader_provenance_info(tmp_path, basic_catalog_info):
"""Test that we get some provenance info and it is parseable into JSON."""
reader = ParquetReader(chunksize=1)
provenance_info = reader.provenance_info()
- catalog_base_dir = os.path.join(tmp_path, "test_catalog")
+ catalog_base_dir = tmp_path / "test_catalog"
os.makedirs(catalog_base_dir)
io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info)

@@ -309,6 +309,6 @@ def test_fits_reader_provenance_info(tmp_path, basic_catalog_info):
"""Test that we get some provenance info and it is parseable into JSON."""
reader = FitsReader()
provenance_info = reader.provenance_info()
- catalog_base_dir = os.path.join(tmp_path, "test_catalog")
+ catalog_base_dir = tmp_path / "test_catalog"
os.makedirs(catalog_base_dir)
io.write_provenance_info(catalog_base_dir, basic_catalog_info, provenance_info)
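In this file only the path construction changes; the calls that consume it are untouched because `os.makedirs` accepts any `os.PathLike`, and pyarrow's writers generally do as well (an assumption worth verifying against the pinned pyarrow version). A sketch of two equivalent ways to create the directory, with an illustrative helper name:

```python
import os
from pathlib import Path

def prepare_catalog_dir(tmp_path: Path) -> Path:
    # The os module accepts Path objects wherever it used to take strings.
    catalog_base_dir = tmp_path / "test_catalog"
    os.makedirs(catalog_base_dir)

    # Path.mkdir is the pathlib-native equivalent of os.makedirs.
    (tmp_path / "test_catalog_alt").mkdir(parents=True, exist_ok=True)
    return catalog_base_dir
```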
49 changes: 24 additions & 25 deletions tests/hipscat_import/catalog/test_map_reduce.py
@@ -20,7 +20,7 @@

def pickle_file_reader(tmp_path, file_reader) -> str:
"""Utility method to pickle a file reader, and return path to pickle."""
- pickled_reader_file = os.path.join(tmp_path, "reader.pickle")
+ pickled_reader_file = tmp_path / "reader.pickle"
with open(pickled_reader_file, "wb") as pickle_file:
pickle.dump(file_reader, pickle_file)
return pickled_reader_file
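Note that `pickle_file_reader` still carries its `-> str` annotation while now returning a `Path`; the body works unchanged because `open()` has accepted `os.PathLike` objects since Python 3.6. A sketch of the round trip under that assumption, with illustrative function names:

```python
import pickle
from pathlib import Path

def pickle_to_path(tmp_path: Path, obj) -> Path:
    # open() takes the Path directly; no str() conversion is required.
    target = tmp_path / "reader.pickle"
    with open(target, "wb") as handle:
        pickle.dump(obj, handle)
    return target

def unpickle_from_path(target: Path):
    # Path.open() is the pathlib-native spelling of the same operation.
    with target.open("rb") as handle:
        return pickle.load(handle)
```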
@@ -86,14 +86,14 @@ def test_read_bad_fileformat(blank_data_file, capsys, tmp_path):

def read_partial_histogram(tmp_path, mapping_key):
"""Helper to read in the former result of a map operation."""
- histogram_file = os.path.join(tmp_path, "histograms", f"{mapping_key}.npz")
+ histogram_file = tmp_path / "histograms" / f"{mapping_key}.npz"
hist = SparseHistogram.from_file(histogram_file)
return hist.to_array()


def test_read_single_fits(tmp_path, formats_fits):
"""Success case - fits file that exists being read as fits"""
- os.makedirs(os.path.join(tmp_path, "histograms"))
+ os.makedirs(tmp_path / "histograms")
mr.map_to_pixels(
input_file=formats_fits,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("fits")),
@@ -127,7 +127,7 @@ def test_map_headers_wrong(formats_headers_csv, tmp_path):

def test_map_headers(tmp_path, formats_headers_csv):
"""Test loading the a file with non-default headers"""
- os.makedirs(os.path.join(tmp_path, "histograms"))
+ os.makedirs(tmp_path / "histograms")
mr.map_to_pixels(
input_file=formats_headers_csv,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")),
@@ -149,8 +149,8 @@ def test_map_headers(tmp_path, formats_headers_csv):


def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file):
- os.makedirs(os.path.join(tmp_path, "histograms"))
- input_file = os.path.join(formats_dir, "hipscat_index.csv")
+ os.makedirs(tmp_path / "histograms")
+ input_file = formats_dir / "hipscat_index.csv"
mr.map_to_pixels(
input_file=input_file,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")),
@@ -183,8 +183,8 @@ def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file):

def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parquet):
"""Test loading the a file when using a parquet schema file for dtypes"""
- os.makedirs(os.path.join(tmp_path, "histograms"))
- input_file = os.path.join(mixed_schema_csv_dir, "input_01.csv")
+ os.makedirs(tmp_path / "histograms")
+ input_file = mixed_schema_csv_dir / "input_01.csv"
mr.map_to_pixels(
input_file=input_file,
pickled_reader_file=pickle_file_reader(
@@ -213,7 +213,7 @@ def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parquet):

def test_map_small_sky_order0(tmp_path, small_sky_single_file):
"""Test loading the small sky catalog and partitioning each object into the same large bucket"""
- os.makedirs(os.path.join(tmp_path, "histograms"))
+ os.makedirs(tmp_path / "histograms")
mr.map_to_pixels(
input_file=small_sky_single_file,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")),
@@ -239,7 +239,7 @@ def test_map_small_sky_part_order1(tmp_path, small_sky_file0):
Test loading a small portion of the small sky catalog and
partitioning objects into four smaller buckets
"""
- os.makedirs(os.path.join(tmp_path, "histograms"))
+ os.makedirs(tmp_path / "histograms")
mr.map_to_pixels(
input_file=small_sky_file0,
pickled_reader_file=pickle_file_reader(tmp_path, get_file_reader("csv")),
@@ -279,7 +279,6 @@ def test_split_pixels_bad_format(blank_data_file, tmp_path, capsys):
)
captured = capsys.readouterr()
assert "No such file or directory" in captured.out
- os.makedirs(os.path.join(tmp_path, "splitting"))


def test_split_pixels_headers(formats_headers_csv, assert_parquet_file_ids, tmp_path):
@@ -300,17 +299,17 @@ def test_split_pixels_headers(formats_headers_csv, assert_parquet_file_ids, tmp_path):
alignment_file=alignment_file,
)

- file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_11", "shard_0_0.parquet")
+ file_name = tmp_path / "order_0" / "dir_0" / "pixel_11" / "shard_0_0.parquet"
expected_ids = [*range(700, 708)]
assert_parquet_file_ids(file_name, "object_id", expected_ids)

- file_name = os.path.join(tmp_path, "order_0", "dir_0", "pixel_1", "shard_0_0.parquet")
+ file_name = tmp_path / "order_0" / "dir_0" / "pixel_1" / "shard_0_0.parquet"
assert not os.path.exists(file_name)


def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
"""Test reducing into one large pixel"""
- os.makedirs(os.path.join(tmp_path, "reducing"))
+ os.makedirs(tmp_path / "reducing")
mr.reduce_pixel_shards(
cache_shard_path=parquet_shards_dir,
resume_path=tmp_path,
@@ -326,15 +325,15 @@ def test_reduce_order0(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
delete_input_files=False,
)

- output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet")
+ output_file = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet"

expected_ids = [*range(700, 831)]
assert_parquet_file_ids(output_file, "id", expected_ids)


def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
"""Test reducing with or without a _hipscat_index field"""
- os.makedirs(os.path.join(tmp_path, "reducing"))
+ os.makedirs(tmp_path / "reducing")
mr.reduce_pixel_shards(
cache_shard_path=parquet_shards_dir,
resume_path=tmp_path,
@@ -349,7 +348,7 @@ def test_reduce_hipscat_index(parquet_shards_dir, assert_parquet_file_ids, tmp_path):
delete_input_files=False,
)

- output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet")
+ output_file = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet"

expected_ids = [*range(700, 831)]
assert_parquet_file_ids(output_file, "id", expected_ids)
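The chained `/` joins above mirror the on-disk partition layout, and the existence checks keep working because `os.path.exists` also accepts path-like objects. A small sketch (the helper and its arguments are illustrative):

```python
import os
from pathlib import Path

def shard_path(tmp_path: Path, order: int, pixel: int) -> Path:
    # The chained / operators read like the directory layout they produce.
    return tmp_path / f"order_{order}" / "dir_0" / f"pixel_{pixel}" / "shard_0_0.parquet"

def assert_shard_absent(tmp_path: Path) -> None:
    missing = shard_path(tmp_path, 0, 1)
    # os.path.exists accepts a Path; Path.exists() is the native alternative.
    assert not os.path.exists(missing)
    assert not missing.exists()
```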
@@ -414,10 +413,10 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
First, we take some time to set up these silly data points, then we test out
reducing them into a single parquet file using a mix of reduction options.
"""
- os.makedirs(os.path.join(tmp_path, "reducing"))
- shard_dir = os.path.join(tmp_path, "reduce_shards", "order_0", "dir_0", "pixel_11")
+ os.makedirs(tmp_path / "reducing")
+ shard_dir = tmp_path / "reduce_shards" / "order_0" / "dir_0" / "pixel_11"
os.makedirs(shard_dir)
- output_file = os.path.join(tmp_path, "Norder=0", "Dir=0", "Npix=11.parquet")
+ output_file = tmp_path / "Norder=0" / "Dir=0" / "Npix=11.parquet"

file1_string = """source_id,object_id,time,ra,dec
1200,700,3000,282.5,-58.5
Expand All @@ -427,15 +426,15 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
1404,702,3200,310.5,-27.5
1505,703,4000,286.5,-69.5"""
file1_data = pd.read_csv(StringIO(file1_string))
- file1_data.to_parquet(os.path.join(shard_dir, "file_1_shard_1.parquet"))
+ file1_data.to_parquet(shard_dir / "file_1_shard_1.parquet")

file2_string = """source_id,object_id,time,ra,dec
1206,700,2000,282.5,-58.5
1307,701,2200,299.5,-48.5
1308,701,2100,299.5,-48.5
1309,701,2000,299.5,-48.5"""
file2_data = pd.read_csv(StringIO(file2_string))
- file2_data.to_parquet(os.path.join(shard_dir, "file_2_shard_1.parquet"))
+ file2_data.to_parquet(shard_dir / "file_2_shard_1.parquet")

combined_data = pd.concat([file1_data, file2_data])
combined_data["norder19_healpix"] = hp.ang2pix(
Expand All @@ -452,7 +451,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
## This will sort WITHIN an order 19 healpix pixel. In that ordering, the objects are
## (703, 700, 701, 702)
mr.reduce_pixel_shards(
- cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
+ cache_shard_path=tmp_path / "reduce_shards",
resume_path=tmp_path,
reducing_key="0_11",
destination_pixel_order=0,
@@ -489,7 +488,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
######################## Sort option 2: by object id and time
## sort order is effectively (norder19 healpix, object id, time)
mr.reduce_pixel_shards(
- cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
+ cache_shard_path=tmp_path / "reduce_shards",
resume_path=tmp_path,
reducing_key="0_11",
destination_pixel_order=0,
@@ -526,7 +525,7 @@ def test_reduce_with_sorting_complex(assert_parquet_file_ids, tmp_path):
## spatial properties for sorting, only numeric.
## sort order is effectively (object id, time)
mr.reduce_pixel_shards(
- cache_shard_path=os.path.join(tmp_path, "reduce_shards"),
+ cache_shard_path=tmp_path / "reduce_shards",
resume_path=tmp_path,
reducing_key="0_11",
destination_pixel_order=0,
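The shard-writing hunks above hand `Path` objects straight to pandas; `DataFrame.to_parquet` documents its target as a string or path object, so no conversion is needed. A self-contained sketch of that pattern (file and column names are illustrative):

```python
from io import StringIO
from pathlib import Path

import pandas as pd

def write_example_shard(shard_dir: Path) -> Path:
    shard_dir.mkdir(parents=True, exist_ok=True)
    frame = pd.read_csv(StringIO("source_id,object_id\n1200,700\n1201,700\n"))
    target = shard_dir / "example_shard.parquet"
    # to_parquet accepts the Path directly (a parquet engine such as pyarrow
    # must be installed, as it already is for these tests).
    frame.to_parquet(target)
    return target
```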
50 changes: 17 additions & 33 deletions tests/hipscat_import/catalog/test_run_import.py
@@ -2,6 +2,7 @@

import os
import shutil
+ from pathlib import Path

import numpy as np
import pandas as pd
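The new `Path` import is used further down to wrap values that may still arrive as plain strings, such as `args.catalog_path`, so the `/` operator is available regardless of the input type. A minimal sketch with an illustrative helper name:

```python
from pathlib import Path

def catalog_parquet_file(catalog_path) -> Path:
    # Path() accepts either a string or an existing Path, so callers do not
    # need to know which form catalog_path arrives in.
    return Path(catalog_path) / "Norder=0" / "Dir=0" / "Npix=11.parquet"
```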
@@ -41,13 +42,10 @@ def test_resume_dask_runner(
"""Test execution in the presence of some resume files."""
## First, copy over our intermediate files.
## This prevents overwriting source-controlled resume files.
- intermediate_dir = os.path.join(tmp_path, "resume_catalog", "intermediate")
- shutil.copytree(
- os.path.join(resume_dir, "intermediate"),
- intermediate_dir,
- )
+ intermediate_dir = tmp_path / "resume_catalog" / "intermediate"
+ shutil.copytree(resume_dir / "intermediate", intermediate_dir)
## Now set up our resume files to match previous work.
- resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
+ resume_tmp = tmp_path / "tmp" / "resume_catalog"
plan = ResumePlan(tmp_path=resume_tmp, progress_bar=False)
histogram = SparseHistogram.make_from_counts([11], [131], 0)
empty = SparseHistogram.make_empty(0)
Expand All @@ -63,10 +61,7 @@ def test_resume_dask_runner(

ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.REDUCING_STAGE, "0_11")

- shutil.copytree(
- os.path.join(resume_dir, "Norder=0"),
- os.path.join(tmp_path, "resume_catalog", "Norder=0"),
- )
+ shutil.copytree(resume_dir / "Norder=0", tmp_path / "resume_catalog" / "Norder=0")

args = ImportArguments(
output_artifact_name="resume_catalog",
Expand All @@ -75,7 +70,7 @@ def test_resume_dask_runner(
output_path=tmp_path,
dask_tmp=tmp_path,
tmp_dir=tmp_path,
- resume_tmp=os.path.join(tmp_path, "tmp"),
+ resume_tmp=tmp_path / "tmp",
highest_healpix_order=0,
pixel_threshold=1000,
progress_bar=False,
Expand All @@ -93,17 +88,14 @@ def test_resume_dask_runner(
assert len(catalog.get_healpix_pixels()) == 1

# Check that the catalog parquet file exists and contains correct object IDs
- output_file = os.path.join(args.catalog_path, "Norder=0", "Dir=0", "Npix=11.parquet")
+ output_file = Path(args.catalog_path) / "Norder=0" / "Dir=0" / "Npix=11.parquet"

expected_ids = [*range(700, 831)]
assert_parquet_file_ids(output_file, "id", expected_ids)

## Re-running the pipeline with fully done intermediate files
## should result in no changes to output files.
- shutil.copytree(
- os.path.join(resume_dir, "intermediate"),
- resume_tmp,
- )
+ shutil.copytree(resume_dir / "intermediate", resume_tmp)
plan = args.resume_plan
plan.touch_stage_done_file(ResumePlan.MAPPING_STAGE)
plan.touch_stage_done_file(ResumePlan.SPLITTING_STAGE)
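The copytree calls above rely on `shutil` accepting path-like arguments for both source and destination. A sketch of the staging step under that assumption (the function name is illustrative):

```python
import shutil
from pathlib import Path

def stage_resume_files(resume_dir: Path, tmp_path: Path) -> Path:
    # shutil.copytree takes Path objects directly and creates the
    # destination directory, including missing parents.
    intermediate_dir = tmp_path / "resume_catalog" / "intermediate"
    shutil.copytree(resume_dir / "intermediate", intermediate_dir)
    return intermediate_dir
```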
@@ -145,25 +137,17 @@ def test_resume_dask_runner_diff_pixel_order(
with the current HEALPix order."""
## First, copy over our intermediate files.
## This prevents overwriting source-controlled resume files.
- intermediate_dir = os.path.join(tmp_path, "resume_catalog", "intermediate")
- shutil.copytree(
- os.path.join(resume_dir, "intermediate"),
- intermediate_dir,
- )
+ intermediate_dir = tmp_path / "resume_catalog" / "intermediate"
+ shutil.copytree(resume_dir / "intermediate", intermediate_dir)

## Now set up our resume files to match previous work.
- resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
+ resume_tmp = tmp_path / "tmp" / "resume_catalog"
ResumePlan(tmp_path=resume_tmp, progress_bar=False)
- SparseHistogram.make_from_counts([11], [131], 0).to_file(
- os.path.join(resume_tmp, "mapping_histogram.npz")
- )
+ SparseHistogram.make_from_counts([11], [131], 0).to_file(resume_tmp / "mapping_histogram.npz")
for file_index in range(0, 5):
ResumePlan.touch_key_done_file(resume_tmp, ResumePlan.SPLITTING_STAGE, f"split_{file_index}")

- shutil.copytree(
- os.path.join(resume_dir, "Norder=0"),
- os.path.join(tmp_path, "resume_catalog", "Norder=0"),
- )
+ shutil.copytree(resume_dir / "Norder=0", tmp_path / "resume_catalog" / "Norder=0")

with pytest.raises(ValueError, match="incompatible with the highest healpix order"):
args = ImportArguments(
@@ -173,7 +157,7 @@ def test_resume_dask_runner_diff_pixel_order(
output_path=tmp_path,
dask_tmp=tmp_path,
tmp_dir=tmp_path,
- resume_tmp=os.path.join(tmp_path, "tmp"),
+ resume_tmp=tmp_path / "tmp",
constant_healpix_order=1,
pixel_threshold=1000,
progress_bar=False,
@@ -188,7 +172,7 @@ def test_resume_dask_runner_diff_pixel_order(
output_path=tmp_path,
dask_tmp=tmp_path,
tmp_dir=tmp_path,
- resume_tmp=os.path.join(tmp_path, "tmp"),
+ resume_tmp=tmp_path / "tmp",
constant_healpix_order=1,
pixel_threshold=1000,
progress_bar=False,
@@ -220,7 +204,7 @@ def test_resume_dask_runner_histograms_diff_size(
tmp_path,
):
"""Tests that the pipeline errors if the partial histograms have different sizes."""
- resume_tmp = os.path.join(tmp_path, "tmp", "resume_catalog")
+ resume_tmp = tmp_path / "tmp" / "resume_catalog"
ResumePlan(tmp_path=resume_tmp, progress_bar=False)

# We'll create mock partial histograms of size 0 and 2
@@ -246,7 +230,7 @@ def test_resume_dask_runner_histograms_diff_size(
output_path=tmp_path,
dask_tmp=tmp_path,
tmp_dir=tmp_path,
- resume_tmp=os.path.join(tmp_path, "tmp"),
+ resume_tmp=tmp_path / "tmp",
constant_healpix_order=1,
pixel_threshold=1000,
progress_bar=False,
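For completeness, the premise behind the whole conversion: pytest's `tmp_path` fixture already yields a `pathlib.Path`, so the tests can use Path operations without any wrapping. An illustrative test, not part of this PR:

```python
from pathlib import Path

def test_tmp_path_is_a_path(tmp_path):
    # tmp_path is provided by pytest as a pathlib.Path.
    assert isinstance(tmp_path, Path)
    target = tmp_path / "tmp" / "resume_catalog"
    target.mkdir(parents=True)
    assert target.is_dir()
```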