Skip to content

Commit

Permalink
Replace FilePointer with universal pathlib (#38)
Browse files Browse the repository at this point in the history
* Checkpoint upath

* Remove (most) storage options and clean up imports.

* github runner black format

* Correct docstrings.
  • Loading branch information
delucchi-cmu authored Sep 17, 2024
1 parent ee85fe8 commit f3cdc02
Show file tree
Hide file tree
Showing 25 changed files with 197 additions and 450 deletions.
20 changes: 7 additions & 13 deletions src/hipscat_cloudtests/file_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from hipscat.io.file_io.file_pointer import does_file_or_directory_exist


def assert_text_file_matches(expected_lines, file_name, storage_options: dict = None):
def assert_text_file_matches(expected_lines, file_name):
"""Convenience method to read a text file and compare the contents, line for line.
When file contents get even a little bit big, it can be difficult to see
Expand All @@ -23,13 +23,10 @@ def assert_text_file_matches(expected_lines, file_name, storage_options: dict =
Args:
expected_lines(:obj:`string array`) list of strings, formatted as regular expressions.
file_name (str): fully-specified path of the file to read
storage_options (dict): dictionary of filesystem storage options
file_name (UPath): fully-specified path of the file to read
"""
assert does_file_or_directory_exist(
file_name, storage_options=storage_options
), f"file not found [{file_name}]"
contents = load_text_file(file_name, storage_options=storage_options)
assert does_file_or_directory_exist(file_name), f"file not found [{file_name}]"
contents = load_text_file(file_name)

assert len(expected_lines) == len(
contents
Expand All @@ -40,22 +37,19 @@ def assert_text_file_matches(expected_lines, file_name, storage_options: dict =
)


def assert_parquet_file_ids(
file_name, id_column, schema: pa.Schema, expected_ids, resort_ids=True, storage_options: dict = None
):
def assert_parquet_file_ids(file_name, id_column, schema: pa.Schema, expected_ids, resort_ids=True):
"""
Convenience method to read a parquet file and compare the object IDs to
a list of expected objects.
Args:
file_name (str): fully-specified path of the file to read
file_name (UPath): fully-specified path of the file to read
id_column (str): column in the parquet file to read IDs from
expected_ids (:obj:`int[]`): list of expected ids in `id_column`
resort_ids (bool): should we re-sort the ids? if False, we will check that the ordering
is the same between the read IDs and expected_ids
storage_options (dict): dictionary of filesystem storage options
"""
data_frame = pd.read_parquet(file_name, engine="pyarrow", schema=schema, storage_options=storage_options)
data_frame = pd.read_parquet(file_name.path, engine="pyarrow", schema=schema, filesystem=file_name.fs)
assert id_column in data_frame.columns
ids = data_frame[id_column].tolist()
if resort_ids:
Expand Down
15 changes: 6 additions & 9 deletions src/hipscat_cloudtests/temp_cloud_directory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""Testing utility class to create a temporary directory that's local
to some unit test execution."""

import os
import time

import shortuuid
Expand All @@ -14,26 +13,24 @@ class TempCloudDirectory:
On exit, we will recursively remove the created directory."""

def __init__(self, prefix_path, method_name="", storage_options: dict = None):
def __init__(self, prefix_path, method_name=""):
"""Create a new context manager.
This will NOT create the new temp path - that happens when we enter the context.
Args:
prefix_path (str): base path to the cloud resource
prefix_path (UPath): base path to the cloud resource
method_name (str): optional token to indicate the method under test
storage_options (dict): dictionary that contains abstract filesystem credentials
"""
self.prefix_path = prefix_path
self.method_name = method_name
self.storage_options = storage_options
self.temp_path = ""

def __enter__(self):
"""Create a new temporary path
Returns:
string path that's been created. it will take the form of
UPath object that's been created. it will take the form of
<prefix_path>/<method_name><some random string>
"""
return self.open()
Expand All @@ -42,11 +39,11 @@ def open(self):
"""Create a new temporary path
Returns:
string path that's been created. it will take the form of
UPath object that's been created. it will take the form of
<prefix_path>/<method_name><some random string>
"""
my_uuid = shortuuid.uuid()
self.temp_path = os.path.join(self.prefix_path, f"{self.method_name}-{my_uuid}")
self.temp_path = self.prefix_path / f"{self.method_name}-{my_uuid}"
return self.temp_path

def __exit__(self, exc_type, exc_val, exc_tb):
Expand All @@ -66,7 +63,7 @@ def close(self, num_retries=4):
for attempt_number in range(1, num_retries + 1):
## Try
try:
file_io.remove_directory(self.temp_path, storage_options=self.storage_options)
file_io.remove_directory(self.temp_path)
return
except RuntimeError:
if attempt_number == num_retries:
Expand Down
59 changes: 33 additions & 26 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import pytest
import shortuuid
from hipscat.io.file_io import file_io
from upath import UPath

from hipscat_cloudtests.temp_cloud_directory import TempCloudDirectory

Expand Down Expand Up @@ -33,7 +34,14 @@ def cloud(request):
@pytest.fixture(scope="session", name="cloud_path")
def cloud_path(cloud):
if cloud == "abfs":
return "abfs://hipscat/pytests/"
storage_options = {
"account_name": os.environ.get("ABFS_LINCCDATA_ACCOUNT_NAME"),
"account_key": os.environ.get("ABFS_LINCCDATA_ACCOUNT_KEY"),
}

root_dir = UPath("abfs://hipscat/pytests/", protocol="abfs", **storage_options)
assert root_dir.exists()
return root_dir

raise NotImplementedError("Cloud format not implemented for tests!")

Expand All @@ -42,8 +50,8 @@ def cloud_path(cloud):
def storage_options(cloud):
if cloud == "abfs":
storage_options = {
"account_key": os.environ.get("ABFS_LINCCDATA_ACCOUNT_KEY"),
"account_name": os.environ.get("ABFS_LINCCDATA_ACCOUNT_NAME"),
"account_key": os.environ.get("ABFS_LINCCDATA_ACCOUNT_KEY"),
}
return storage_options

Expand All @@ -58,56 +66,55 @@ def local_data_dir():

@pytest.fixture
def small_sky_dir_local(local_data_dir):
return os.path.join(local_data_dir, SMALL_SKY_DIR_NAME)
return local_data_dir / SMALL_SKY_DIR_NAME


@pytest.fixture
def small_sky_order1_dir_local(local_data_dir):
return os.path.join(local_data_dir, SMALL_SKY_ORDER1_DIR_NAME)
return local_data_dir / SMALL_SKY_ORDER1_DIR_NAME


@pytest.fixture
def small_sky_parts_dir_local(local_data_dir):
return os.path.join(local_data_dir, "small_sky_parts")
return local_data_dir / "small_sky_parts"


@pytest.fixture
def test_data_dir_cloud(cloud_path):
return os.path.join(cloud_path, "data")
return cloud_path / "data"


@pytest.fixture
def almanac_dir_cloud(cloud_path):
return os.path.join(cloud_path, "data", ALMANAC_DIR_NAME)
return cloud_path / "data" / ALMANAC_DIR_NAME


@pytest.fixture
def small_sky_dir_cloud(cloud_path):
return os.path.join(cloud_path, "data", SMALL_SKY_DIR_NAME)
return cloud_path / "data" / SMALL_SKY_DIR_NAME


@pytest.fixture
def small_sky_order1_dir_cloud(cloud_path):
return os.path.join(cloud_path, "data", SMALL_SKY_ORDER1_DIR_NAME)
return cloud_path / "data" / SMALL_SKY_ORDER1_DIR_NAME


@pytest.fixture
def small_sky_index_dir_cloud(cloud_path):
return os.path.join(cloud_path, "data", "small_sky_object_index")
return cloud_path / "data" / "small_sky_object_index"


@pytest.fixture
def small_sky_margin_dir_cloud(cloud_path):
return os.path.join(cloud_path, "data", "small_sky_order1_margin")
return cloud_path / "data" / "small_sky_order1_margin"


@pytest.fixture(scope="session", name="tmp_dir_cloud")
def tmp_dir_cloud(cloud_path, storage_options):
def tmp_dir_cloud(cloud_path):
"""Create a single client for use by all unit test cases."""
tmp = TempCloudDirectory(
os.path.join(cloud_path, "tmp"),
cloud_path / "tmp",
method_name="full_test",
storage_options=storage_options,
)
yield tmp.open()
tmp.close()
Expand All @@ -119,41 +126,41 @@ def tmp_cloud_path(request, tmp_dir_cloud):
my_uuid = shortuuid.uuid()
# Strip out the "test_" at the beginning of each method name, make it a little
# shorter, and add a disambiguating UUID.
return f"{tmp_dir_cloud}/{name[5:25]}_{my_uuid}"
return tmp_dir_cloud / f"{name[5:25]}_{my_uuid}"


@pytest.fixture
def small_sky_xmatch_dir_cloud(cloud_path):
return os.path.join(cloud_path, "data", SMALL_SKY_XMATCH_NAME)
return cloud_path / "data" / SMALL_SKY_XMATCH_NAME


@pytest.fixture
def small_sky_catalog_cloud(small_sky_dir_cloud, storage_options):
return lsdb.read_hipscat(small_sky_dir_cloud, storage_options=storage_options)
def small_sky_catalog_cloud(small_sky_dir_cloud):
return lsdb.read_hipscat(small_sky_dir_cloud)


@pytest.fixture
def small_sky_xmatch_catalog_cloud(small_sky_xmatch_dir_cloud, storage_options):
return lsdb.read_hipscat(small_sky_xmatch_dir_cloud, storage_options=storage_options)
def small_sky_xmatch_catalog_cloud(small_sky_xmatch_dir_cloud):
return lsdb.read_hipscat(small_sky_xmatch_dir_cloud)


@pytest.fixture
def small_sky_order1_hipscat_catalog_cloud(small_sky_order1_dir_cloud, storage_options):
return hc.catalog.Catalog.read_from_hipscat(small_sky_order1_dir_cloud, storage_options=storage_options)
def small_sky_order1_hipscat_catalog_cloud(small_sky_order1_dir_cloud):
return hc.catalog.Catalog.read_from_hipscat(small_sky_order1_dir_cloud)


@pytest.fixture
def small_sky_order1_catalog_cloud(small_sky_order1_dir_cloud, storage_options):
return lsdb.read_hipscat(small_sky_order1_dir_cloud, storage_options=storage_options)
def small_sky_order1_catalog_cloud(small_sky_order1_dir_cloud):
return lsdb.read_hipscat(small_sky_order1_dir_cloud)


@pytest.fixture
def xmatch_correct_cloud(local_data_dir):
pathway = os.path.join(local_data_dir, "xmatch", XMATCH_CORRECT_FILE)
pathway = local_data_dir / "xmatch" / XMATCH_CORRECT_FILE
return file_io.load_csv_to_pandas(pathway)


@pytest.fixture
def xmatch_with_margin(local_data_dir):
pathway = os.path.join(local_data_dir, "xmatch", "xmatch_with_margin.csv")
pathway = local_data_dir / "xmatch" / "xmatch_with_margin.csv"
return file_io.load_csv_to_pandas(pathway)
59 changes: 12 additions & 47 deletions tests/data/generate_cloud_data.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import tempfile\n",
"from upath import UPath\n",
"\n",
"import hipscat_import.pipeline as runner\n",
"from hipscat_import.catalog.arguments import ImportArguments\n",
"from hipscat_import.index.arguments import IndexArguments\n",
"from hipscat_import.margin_cache.margin_cache_arguments import MarginCacheArguments\n",
"import tempfile\n",
"from pathlib import Path\n",
"import os\n",
"\n",
"tmp_path = tempfile.TemporaryDirectory()\n",
"tmp_dir = tmp_path.name\n",
Expand All @@ -36,7 +37,9 @@
" \"account_key\": os.environ.get(\"ABFS_LINCCDATA_ACCOUNT_KEY\"),\n",
" \"account_name\": os.environ.get(\"ABFS_LINCCDATA_ACCOUNT_NAME\"),\n",
"}\n",
"storage_options"
"storage_options\n",
"\n",
"output_path = UPath(\"abfs://hipscat/pytests/data\", protocol=\"abfs\", **storage_options)"
]
},
{
Expand All @@ -58,9 +61,8 @@
" input_path=\"small_sky_parts\",\n",
" highest_healpix_order=1,\n",
" file_reader=\"csv\",\n",
" output_path=\"abfs://hipscat/pytests/data\",\n",
" output_path=output_path,\n",
" output_artifact_name=\"small_sky\",\n",
" output_storage_options=storage_options,\n",
" tmp_dir=tmp_dir,\n",
" dask_tmp=tmp_dir,\n",
")\n",
Expand Down Expand Up @@ -92,8 +94,7 @@
" input_path=\"small_sky_parts\",\n",
" file_reader=\"csv\",\n",
" constant_healpix_order=1,\n",
" output_path=\"abfs://hipscat/pytests/data\",\n",
" output_storage_options=storage_options,\n",
" output_path=output_path,\n",
" output_artifact_name=\"small_sky_order1\",\n",
" tmp_dir=tmp_dir,\n",
" dask_tmp=tmp_dir,\n",
Expand All @@ -119,9 +120,8 @@
"args = IndexArguments(\n",
" input_catalog_path=\"small_sky_order1\",\n",
" indexing_column=\"id\",\n",
" output_path=\"abfs://hipscat/pytests/data\",\n",
" output_path=output_path,\n",
" output_artifact_name=\"small_sky_object_index\",\n",
" output_storage_options=storage_options,\n",
" tmp_dir=tmp_dir,\n",
" dask_tmp=tmp_dir,\n",
")\n",
Expand All @@ -137,9 +137,8 @@
"margin_args = MarginCacheArguments(\n",
" margin_threshold=7200,\n",
" input_catalog_path=\"small_sky_order1\",\n",
" output_path=\"abfs://hipscat/pytests/data\",\n",
" output_path=output_path,\n",
" output_artifact_name=\"small_sky_order1_margin\",\n",
" output_storage_options=storage_options,\n",
" tmp_dir=tmp_dir,\n",
" dask_tmp=tmp_dir,\n",
")\n",
Expand All @@ -163,8 +162,7 @@
" input_file_list=[\"xmatch/xmatch_catalog_raw.csv\"],\n",
" file_reader=\"csv\",\n",
" constant_healpix_order=1,\n",
" output_path=\"abfs://hipscat/pytests/data\",\n",
" output_storage_options=storage_options,\n",
" output_path=output_path,\n",
" output_artifact_name=\"small_sky_xmatch\",\n",
" pixel_threshold=100,\n",
" tmp_dir=tmp_dir,\n",
Expand All @@ -173,39 +171,6 @@
"runner.pipeline(args)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Almanac info\n",
"\n",
"For the above catalogs, create almanac data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from hipscat.inspection.almanac import Almanac\n",
"from hipscat.inspection.almanac_info import AlmanacInfo\n",
"\n",
"almanac_info = AlmanacInfo.from_catalog_dir(\n",
" \"abfs://hipscat/pytests/data/small_sky\", storage_options=storage_options\n",
")\n",
"almanac_info.write_to_file(\n",
" directory=\"abfs://hipscat/pytests/data/almanac\", default_dir=False, storage_options=storage_options\n",
")\n",
"\n",
"almanac_info = AlmanacInfo.from_catalog_dir(\n",
" \"abfs://hipscat/pytests/data/small_sky_order1\", storage_options=storage_options\n",
")\n",
"almanac_info.write_to_file(\n",
" directory=\"abfs://hipscat/pytests/data/almanac\", default_dir=False, storage_options=storage_options\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down
Loading

0 comments on commit f3cdc02

Please sign in to comment.