From daa5209e7af419a0870c8e388a903265ca733ade Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Thu, 2 Nov 2023 21:30:32 -0400 Subject: [PATCH 1/4] Add documentation for creating a parquet schema. --- docs/index.rst | 18 +- docs/notebooks.rst | 1 + docs/notebooks/README.md | 1 - docs/notebooks/estimate_pixel_threshold.ipynb | 7 +- docs/notebooks/unequal_schema.ipynb | 274 ++++++++++++++++++ src/hipscat_import/catalog/file_readers.py | 3 +- src/hipscat_import/catalog/map_reduce.py | 1 + src/hipscat_import/soap/map_reduce.py | 2 +- .../catalog/test_file_readers.py | 13 +- .../hipscat_import/catalog/test_map_reduce.py | 28 ++ .../catalog/test_run_round_trip.py | 17 +- 11 files changed, 347 insertions(+), 18 deletions(-) delete mode 100644 docs/notebooks/README.md create mode 100644 docs/notebooks/unequal_schema.ipynb diff --git a/docs/index.rst b/docs/index.rst index 13d9f80e..e157aa1f 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -39,15 +39,25 @@ See dataset-specific notes on arguments: * :doc:`guide/index_table` Once you have created your arguments object, you pass it into the pipeline control, -and then wait: +and then wait. Running within a main guard will potentially avoid some python +threading issues with dask: .. code-block:: python - import hipscat_import.pipeline as runner + from dask.distributed import Client + from hipscat_import.pipeline import pipeline_with_client - args = ... - runner.pipeline(args) + def main(): + args = ... + with Client( + n_workers=10, + threads_per_worker=1, + ... + ) as client: + pipeline_with_client(args, client) + if __name__ == '__main__': + main() .. toctree:: :hidden: diff --git a/docs/notebooks.rst b/docs/notebooks.rst index 5e218f09..b00e6830 100644 --- a/docs/notebooks.rst +++ b/docs/notebooks.rst @@ -4,3 +4,4 @@ Notebooks .. toctree:: Estimate Pixel Threshold + Creating empty schema parquet diff --git a/docs/notebooks/README.md b/docs/notebooks/README.md deleted file mode 100644 index 4eb938ce..00000000 --- a/docs/notebooks/README.md +++ /dev/null @@ -1 +0,0 @@ -Put your Jupyter notebooks here :) \ No newline at end of file diff --git a/docs/notebooks/estimate_pixel_threshold.ipynb b/docs/notebooks/estimate_pixel_threshold.ipynb index 7f544a65..750e5397 100755 --- a/docs/notebooks/estimate_pixel_threshold.ipynb +++ b/docs/notebooks/estimate_pixel_threshold.ipynb @@ -46,7 +46,11 @@ "outputs": [], "source": [ "### Change this path!!!\n", - "sample_parquet_file = \"../../tests/hipscat_import/data/sample.parquet\"" + "import os\n", + "import tempfile\n", + "\n", + "tmp_path = tempfile.TemporaryDirectory()\n", + "sample_parquet_file = os.path.join(tmp_path.name, \"sample.parquet\")" ] }, { @@ -56,7 +60,6 @@ "metadata": {}, "outputs": [], "source": [ - "import pandas as pd\n", "from hipscat_import.catalog.file_readers import CsvReader\n", "\n", "### Change this path!!!\n", diff --git a/docs/notebooks/unequal_schema.ipynb b/docs/notebooks/unequal_schema.ipynb new file mode 100644 index 00000000..7bb40c94 --- /dev/null +++ b/docs/notebooks/unequal_schema.ipynb @@ -0,0 +1,274 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Unequal schema problems\n", + "\n", + "There are a few ways in which parquet files written with slightly different schema can cause issues in the import pipeline. They have a similar correction mechanism, so we discuss both here." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Error Demonstration\n", + "\n", + "Here, we attempt an import with some unequal schema, and see that the attempt fails in the reducing stage, when we're trying to combine partial parquet files into a single file with common metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import tempfile\n", + "import os\n", + "from dask.distributed import Client\n", + "\n", + "from hipscat_import.pipeline import pipeline_with_client\n", + "from hipscat_import.catalog.arguments import ImportArguments\n", + "from hipscat_import.catalog.file_readers import get_file_reader\n", + "\n", + "mixed_schema_csv_dir = \"../../tests/hipscat_import/data/mixed_schema\"\n", + "tmp_path = tempfile.TemporaryDirectory()\n", + "\n", + "args = ImportArguments(\n", + " output_catalog_name=\"mixed_csv_bad\",\n", + " input_path=mixed_schema_csv_dir,\n", + " input_format=\"csv\",\n", + " output_path=tmp_path.name,\n", + " highest_healpix_order=1,\n", + ")\n", + "\n", + "client = Client(n_workers=1, threads_per_worker=1)\n", + "\n", + "try:\n", + " pipeline_with_client(args, client)\n", + "except:\n", + " pass # we know it's going to fail!!" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can overcome may of these issues by using a *parquet schema* file. This is a special kind of parquet file that only contains information on the columns (their names, data types, and additional key-value metadata).\n", + "\n", + "Let's take a look inside the schema structure and see the field types it expects to see:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow.parquet as pq\n", + "\n", + "mixed_schema_csv_parquet = \"../../tests/hipscat_import/data/mixed_schema/schema.parquet\"\n", + "\n", + "parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We already have one for this data set, but we'll show you how to create one of your own later in this notebook." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "tmp_path = tempfile.TemporaryDirectory()\n", + "args = ImportArguments(\n", + " output_catalog_name=\"mixed_csv_good\",\n", + " input_path=mixed_schema_csv_dir,\n", + " input_format=\"csv\",\n", + " output_path=tmp_path.name,\n", + " highest_healpix_order=1,\n", + " file_reader=get_file_reader(\"csv\", schema_file=mixed_schema_csv_parquet),\n", + " use_schema_file=mixed_schema_csv_parquet,\n", + " overwrite=True,\n", + ")\n", + "pipeline_with_client(args, client)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Making a new parquet schema file\n", + "\n", + "There are a few different strategies we can use to create a schema file:\n", + "\n", + "* using some string representations of pandas datatypes\n", + "* using an explicit list of pyarrow data types\n", + "* and many more!\n", + "\n", + "We'll stick to these two, since they exercise the most common code paths through schema generation." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Using pandas type strings\n", + "\n", + "Something like the `tic_types.csv` file contains a list of the columns that the TIC data will contain, in a table like:\n", + "\n", + "```\n", + "name,type\n", + "ID,Int64\n", + "version,str\n", + "HIP,Int32\n", + "TYC,str\n", + "etc...\n", + "```\n", + "\n", + "Such files are a common way to send type information when the data files have no header.\n", + "\n", + "In this method, we will use pandas' type parsing to convert these strings into understood data types, and create the relevant parquet metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "## Fetch the name/type information from a file.\n", + "type_list_frame = pd.read_csv(\"../static/tic_types.csv\")\n", + "\n", + "## For each row, add to a dictionary with name and a pandas series with the parsed data type.\n", + "## \"str\" is not understood as \"string\", so add a special case.\n", + "type_map = {\n", + " row[\"name\"]: pd.Series(dtype=(\"string\" if row[\"type\"] == \"str\" else row[\"type\"]))\n", + " for _, row in type_list_frame.iterrows()\n", + "}\n", + "dtype_frame = pd.DataFrame(type_map)\n", + "\n", + "## Now write our empty data frame to a parquet file.\n", + "schema_file = os.path.join(tmp_path.name, \"schema_from_csv_list.parquet\")\n", + "dtype_frame.to_parquet(schema_file)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's look at the parquet file's metadata and see if it matches what we'd expect.\n", + "\n", + "You'll notice that that there are A LOT of fields, and this is why you might not want to deal with column-by-column type discrepancies." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parquet_file = pq.ParquetFile(schema_file)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Explict list of pyarrow data types\n", + "\n", + "Here, we know what pyarrow types we want to use for each column. This is helpful if you know you want nullable, or you know you DON'T want to use nullable types, but it requires some deeper knowledge of pyarrow data types." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow as pa\n", + "\n", + "## List all of our columns as pyarrow fields.\n", + "schema_from_pyarrow = pa.schema(\n", + " [\n", + " pa.field(\"id\", pa.int64()),\n", + " pa.field(\"ra\", pa.float64()),\n", + " pa.field(\"dec\", pa.float64()),\n", + " pa.field(\"ra_error\", pa.float64()),\n", + " pa.field(\"dec_error\", pa.float64()),\n", + " pa.field(\"comment\", pa.string()),\n", + " pa.field(\"code\", pa.string()),\n", + " ]\n", + ")\n", + "schema_file = os.path.join(tmp_path.name, \"schema_from_pyarrow.parquet\")\n", + "pq.write_metadata(schema_from_pyarrow, schema_file)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Again, we'll check that the generated parquet metadata is what we expect:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parquet_file = pq.ParquetFile(schema_file)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, let's clean up." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client.shutdown()\n", + "tmp_path.cleanup()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "hipscatenv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/src/hipscat_import/catalog/file_readers.py b/src/hipscat_import/catalog/file_readers.py index 97359271..6d6192b8 100644 --- a/src/hipscat_import/catalog/file_readers.py +++ b/src/hipscat_import/catalog/file_readers.py @@ -141,7 +141,8 @@ def read(self, input_file): if self.schema_file: schema_parquet = file_io.load_parquet_to_pandas( - FilePointer(self.schema_file), dtype_backend="numpy_nullable" + FilePointer(self.schema_file), + **self.kwargs, ) use_column_names = None diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index 8ba4d8a3..97ed8def 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -110,6 +110,7 @@ def map_to_pixels( input_file, file_reader, highest_order, ra_column, dec_column ): mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True) + mapped_pixel = mapped_pixel.astype(np.int64) histo[mapped_pixel] += count_at_pixel.astype(np.int64) ResumePlan.write_partial_histogram(tmp_path=resume_path, mapping_key=mapping_key, histogram=histo) diff --git a/src/hipscat_import/soap/map_reduce.py b/src/hipscat_import/soap/map_reduce.py index 53db9305..68e50228 100644 --- a/src/hipscat_import/soap/map_reduce.py +++ b/src/hipscat_import/soap/map_reduce.py @@ -55,7 +55,7 @@ def count_joins( If any un-joined source pixels remain, stretch out to neighboring object pixels. Args: - soap_args(SoapArguments): set of arguments for pipeline execution + soap_args(`hipscat_import.soap.SoapArguments`): set of arguments for pipeline execution source_pixel(HealpixPixel): order and pixel for the source catalog single pixel. object_pixels(List[HealpixPixel]): set of tuples of order and pixel for the partitions of the object catalog to be joined. diff --git a/tests/hipscat_import/catalog/test_file_readers.py b/tests/hipscat_import/catalog/test_file_readers.py index 65cbd966..af402a23 100644 --- a/tests/hipscat_import/catalog/test_file_readers.py +++ b/tests/hipscat_import/catalog/test_file_readers.py @@ -96,7 +96,9 @@ def test_csv_reader_parquet_metadata(small_sky_single_file, tmp_path): schema_file, ) - frame = next(CsvReader(schema_file=schema_file).read(small_sky_single_file)) + frame = next( + CsvReader(schema_file=schema_file, dtype_backend="numpy_nullable").read(small_sky_single_file) + ) assert len(frame) == 131 column_types = frame.dtypes.to_dict() @@ -185,7 +187,14 @@ def test_csv_reader_pipe_delimited(formats_pipe_csv, tmp_path): schema_file = os.path.join(tmp_path, "metadata.parquet") pq.write_metadata(parquet_schema_types, schema_file) - frame = next(CsvReader(header=None, separator="|", schema_file=schema_file).read(formats_pipe_csv)) + frame = next( + CsvReader( + header=None, + separator="|", + schema_file=schema_file, + dtype_backend="numpy_nullable", + ).read(formats_pipe_csv) + ) assert len(frame) == 3 assert np.all(frame["letters"] == ["AA", "BB", "CC"]) diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index eb91e967..6168e4d9 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -134,6 +134,34 @@ def test_map_headers(tmp_path, formats_headers_csv): assert (result == expected).all() +def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parquet): + """Test loading the a file when using a parquet schema file for dtypes""" + os.makedirs(os.path.join(tmp_path, "histograms")) + input_file = os.path.join(mixed_schema_csv_dir, "input_01.csv") + mr.map_to_pixels( + input_file=input_file, + file_reader=get_file_reader( + "csv", + schema_file=mixed_schema_csv_parquet, + dtype_backend="numpy_nullable", + ), + highest_order=0, + ra_column="ra", + dec_column="dec", + resume_path=tmp_path, + mapping_key="map_0", + ) + + result = read_partial_histogram(tmp_path, "map_0") + + assert len(result) == 12 + + expected = hist.empty_histogram(0) + expected[11] = 4 + npt.assert_array_equal(result, expected) + assert (result == expected).all() + + def test_map_small_sky_order0(tmp_path, small_sky_single_file): """Test loading the small sky catalog and partitioning each object into the same large bucket""" os.makedirs(os.path.join(tmp_path, "histograms")) diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py index 28ac954b..e195008c 100644 --- a/tests/hipscat_import/catalog/test_run_round_trip.py +++ b/tests/hipscat_import/catalog/test_run_round_trip.py @@ -56,7 +56,7 @@ def test_import_source_table( assert len(catalog.get_healpix_pixels()) == 14 -@pytest.mark.dask +@pytest.mark.dask(timeout=10) def test_import_mixed_schema_csv( dask_client, mixed_schema_csv_dir, @@ -65,24 +65,27 @@ def test_import_mixed_schema_csv( tmp_path, ): """Test basic execution, with a mixed schema. - - the two input file in `mixed_schema_csv_dir` have different *implied* schemas + - the two input files in `mixed_schema_csv_dir` have different *implied* schemas when parsed by pandas. this verifies that they end up with the same schema and can be combined into a single parquet file. """ - - schema_parquet = pd.read_parquet(mixed_schema_csv_parquet) args = ImportArguments( - output_catalog_name="mixed_csv", + output_catalog_name="mixed_csv_bad", input_path=mixed_schema_csv_dir, input_format="csv", output_path=tmp_path, dask_tmp=tmp_path, highest_healpix_order=1, - file_reader=get_file_reader("csv", chunksize=1, type_map=schema_parquet.dtypes.to_dict()), + file_reader=get_file_reader("csv", chunksize=1, schema_file=mixed_schema_csv_parquet), progress_bar=False, - use_schema_file=mixed_schema_csv_parquet, ) + with pytest.raises(RuntimeError, match="Some reducing stages failed"): + runner.run(args, dask_client) + + ## Try again, but with the schema specified. + args.use_schema_file = mixed_schema_csv_parquet + args.output_catalog_name = "mixed_csv_good" runner.run(args, dask_client) # Check that the catalog parquet file exists From 41bf6f39fa33dfb535eb6ae0fcab4b3ec89a6248 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Thu, 2 Nov 2023 21:44:13 -0400 Subject: [PATCH 2/4] Remove weird notebook line. --- docs/notebooks/unequal_schema.ipynb | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/notebooks/unequal_schema.ipynb b/docs/notebooks/unequal_schema.ipynb index 7bb40c94..e913194e 100644 --- a/docs/notebooks/unequal_schema.ipynb +++ b/docs/notebooks/unequal_schema.ipynb @@ -266,8 +266,7 @@ "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.4" - }, - "orig_nbformat": 4 + } }, "nbformat": 4, "nbformat_minor": 2 From d7ecf01efa4ad24a65b812c9e795cdb0db16a3cf Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Mon, 6 Nov 2023 08:56:42 -0500 Subject: [PATCH 3/4] More motivating text. --- docs/notebooks/unequal_schema.ipynb | 581 +++++++++++++++------------- 1 file changed, 310 insertions(+), 271 deletions(-) diff --git a/docs/notebooks/unequal_schema.ipynb b/docs/notebooks/unequal_schema.ipynb index e913194e..e7dce6b0 100644 --- a/docs/notebooks/unequal_schema.ipynb +++ b/docs/notebooks/unequal_schema.ipynb @@ -1,273 +1,312 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Unequal schema problems\n", - "\n", - "There are a few ways in which parquet files written with slightly different schema can cause issues in the import pipeline. They have a similar correction mechanism, so we discuss both here." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Error Demonstration\n", - "\n", - "Here, we attempt an import with some unequal schema, and see that the attempt fails in the reducing stage, when we're trying to combine partial parquet files into a single file with common metadata." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import tempfile\n", - "import os\n", - "from dask.distributed import Client\n", - "\n", - "from hipscat_import.pipeline import pipeline_with_client\n", - "from hipscat_import.catalog.arguments import ImportArguments\n", - "from hipscat_import.catalog.file_readers import get_file_reader\n", - "\n", - "mixed_schema_csv_dir = \"../../tests/hipscat_import/data/mixed_schema\"\n", - "tmp_path = tempfile.TemporaryDirectory()\n", - "\n", - "args = ImportArguments(\n", - " output_catalog_name=\"mixed_csv_bad\",\n", - " input_path=mixed_schema_csv_dir,\n", - " input_format=\"csv\",\n", - " output_path=tmp_path.name,\n", - " highest_healpix_order=1,\n", - ")\n", - "\n", - "client = Client(n_workers=1, threads_per_worker=1)\n", - "\n", - "try:\n", - " pipeline_with_client(args, client)\n", - "except:\n", - " pass # we know it's going to fail!!" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can overcome may of these issues by using a *parquet schema* file. This is a special kind of parquet file that only contains information on the columns (their names, data types, and additional key-value metadata).\n", - "\n", - "Let's take a look inside the schema structure and see the field types it expects to see:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pyarrow.parquet as pq\n", - "\n", - "mixed_schema_csv_parquet = \"../../tests/hipscat_import/data/mixed_schema/schema.parquet\"\n", - "\n", - "parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)\n", - "print(parquet_file.schema)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We already have one for this data set, but we'll show you how to create one of your own later in this notebook." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "tmp_path = tempfile.TemporaryDirectory()\n", - "args = ImportArguments(\n", - " output_catalog_name=\"mixed_csv_good\",\n", - " input_path=mixed_schema_csv_dir,\n", - " input_format=\"csv\",\n", - " output_path=tmp_path.name,\n", - " highest_healpix_order=1,\n", - " file_reader=get_file_reader(\"csv\", schema_file=mixed_schema_csv_parquet),\n", - " use_schema_file=mixed_schema_csv_parquet,\n", - " overwrite=True,\n", - ")\n", - "pipeline_with_client(args, client)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Making a new parquet schema file\n", - "\n", - "There are a few different strategies we can use to create a schema file:\n", - "\n", - "* using some string representations of pandas datatypes\n", - "* using an explicit list of pyarrow data types\n", - "* and many more!\n", - "\n", - "We'll stick to these two, since they exercise the most common code paths through schema generation." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Using pandas type strings\n", - "\n", - "Something like the `tic_types.csv` file contains a list of the columns that the TIC data will contain, in a table like:\n", - "\n", - "```\n", - "name,type\n", - "ID,Int64\n", - "version,str\n", - "HIP,Int32\n", - "TYC,str\n", - "etc...\n", - "```\n", - "\n", - "Such files are a common way to send type information when the data files have no header.\n", - "\n", - "In this method, we will use pandas' type parsing to convert these strings into understood data types, and create the relevant parquet metadata." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "\n", - "## Fetch the name/type information from a file.\n", - "type_list_frame = pd.read_csv(\"../static/tic_types.csv\")\n", - "\n", - "## For each row, add to a dictionary with name and a pandas series with the parsed data type.\n", - "## \"str\" is not understood as \"string\", so add a special case.\n", - "type_map = {\n", - " row[\"name\"]: pd.Series(dtype=(\"string\" if row[\"type\"] == \"str\" else row[\"type\"]))\n", - " for _, row in type_list_frame.iterrows()\n", - "}\n", - "dtype_frame = pd.DataFrame(type_map)\n", - "\n", - "## Now write our empty data frame to a parquet file.\n", - "schema_file = os.path.join(tmp_path.name, \"schema_from_csv_list.parquet\")\n", - "dtype_frame.to_parquet(schema_file)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Let's look at the parquet file's metadata and see if it matches what we'd expect.\n", - "\n", - "You'll notice that that there are A LOT of fields, and this is why you might not want to deal with column-by-column type discrepancies." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "parquet_file = pq.ParquetFile(schema_file)\n", - "print(parquet_file.schema)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Explict list of pyarrow data types\n", - "\n", - "Here, we know what pyarrow types we want to use for each column. This is helpful if you know you want nullable, or you know you DON'T want to use nullable types, but it requires some deeper knowledge of pyarrow data types." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pyarrow as pa\n", - "\n", - "## List all of our columns as pyarrow fields.\n", - "schema_from_pyarrow = pa.schema(\n", - " [\n", - " pa.field(\"id\", pa.int64()),\n", - " pa.field(\"ra\", pa.float64()),\n", - " pa.field(\"dec\", pa.float64()),\n", - " pa.field(\"ra_error\", pa.float64()),\n", - " pa.field(\"dec_error\", pa.float64()),\n", - " pa.field(\"comment\", pa.string()),\n", - " pa.field(\"code\", pa.string()),\n", - " ]\n", - ")\n", - "schema_file = os.path.join(tmp_path.name, \"schema_from_pyarrow.parquet\")\n", - "pq.write_metadata(schema_from_pyarrow, schema_file)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Again, we'll check that the generated parquet metadata is what we expect:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "parquet_file = pq.ParquetFile(schema_file)\n", - "print(parquet_file.schema)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Finally, let's clean up." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "client.shutdown()\n", - "tmp_path.cleanup()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "hipscatenv", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.4" - } - }, - "nbformat": 4, - "nbformat_minor": 2 + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Unequal schema problems\n", + "\n", + "There are a few ways in which parquet files written with slightly different schema can cause issues in the import pipeline. This issue most commonly arises when some portions of the data contain only empty (null) values in a column, but other portions have non-null values and so are interpreted as integer or string values. When we try to merge these partial files together later, the parquet engine does not want to perform a cast between these types and throws an error.\n", + "\n", + "For example, at the reduce stage, we're combining several intermediate parquet files for a single spatial tile into the final parquet file. It's possible at this stage that some files will contain only empty (null) values in a column that we expect to be a string field.\n", + "\n", + "e.g. \n", + "\n", + "#### File1\n", + "\n", + "| int_field | string_field | float_field |\n", + "| --------- | ------------ | ---------- |\n", + "| 5 | | 3.4 |\n", + "| 8 | | 3.8 |\n", + "\n", + "which will have a schema like:\n", + " \n", + " optional int64 field_id=-1 int_field;\n", + " optional int32 field_id=-1 string_field **(Null)**;\n", + " optional double field_id=-1 float_field;\n", + " \n", + "#### File2\n", + " \n", + "| int_field | string_field | float_field |\n", + "| --------- |------------- | ----------- |\n", + "| 6 | hello | 4.1 |\n", + "| 7 | | 3.9 |\n", + "\n", + "will have a schema like:\n", + "\n", + " optional int64 field_id=-1 int_field;\n", + " optional binary field_id=-1 string_field (String);\n", + " optional double field_id=-1 float_field;\n", + "\n", + "When we try to merge these files together, we see an error like the following:\n", + "```\n", + "Key: 4_2666\n", + "Function: reduce_pixel_shards\n", + "args: ()\n", + "kwargs: {...}\n", + "Exception: \"ArrowNotImplementedError('Unsupported cast from string to null using function cast_null')\"\n", + "```\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Error Demonstration\n", + "\n", + "Here, we attempt an import with some unequal schema, and see that the attempt fails in the reducing stage, when we're trying to combine partial parquet files into a single file with common metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import tempfile\n", + "import os\n", + "from dask.distributed import Client\n", + "\n", + "from hipscat_import.pipeline import pipeline_with_client\n", + "from hipscat_import.catalog.arguments import ImportArguments\n", + "from hipscat_import.catalog.file_readers import get_file_reader\n", + "\n", + "mixed_schema_csv_dir = \"../../tests/hipscat_import/data/mixed_schema\"\n", + "tmp_path = tempfile.TemporaryDirectory()\n", + "\n", + "args = ImportArguments(\n", + " output_catalog_name=\"mixed_csv_bad\",\n", + " input_path=mixed_schema_csv_dir,\n", + " input_format=\"csv\",\n", + " output_path=tmp_path.name,\n", + " highest_healpix_order=1,\n", + ")\n", + "\n", + "client = Client(n_workers=1, threads_per_worker=1)\n", + "\n", + "try:\n", + " pipeline_with_client(args, client)\n", + "except:\n", + " pass # we know it's going to fail!!" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can overcome may of these issues by using a *parquet schema* file. This is a special kind of parquet file that only contains information on the columns (their names, data types, and additional key-value metadata).\n", + "\n", + "Let's take a look inside the schema structure and see the field types it expects to see:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow.parquet as pq\n", + "\n", + "mixed_schema_csv_parquet = \"../../tests/hipscat_import/data/mixed_schema/schema.parquet\"\n", + "\n", + "parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We already have a parquet metadata file for this data set, but we'll show you how to create one of your own later in this notebook." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "tmp_path = tempfile.TemporaryDirectory()\n", + "args = ImportArguments(\n", + " output_catalog_name=\"mixed_csv_good\",\n", + " input_path=mixed_schema_csv_dir,\n", + " input_format=\"csv\",\n", + " output_path=tmp_path.name,\n", + " highest_healpix_order=1,\n", + " file_reader=get_file_reader(\"csv\", schema_file=mixed_schema_csv_parquet),\n", + " use_schema_file=mixed_schema_csv_parquet,\n", + " overwrite=True,\n", + ")\n", + "pipeline_with_client(args, client)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Making a new parquet schema file\n", + "\n", + "There are a few different strategies we can use to create a schema file:\n", + "\n", + "* using some string representations of pandas datatypes\n", + "* using an explicit list of pyarrow data types\n", + "* and many more!\n", + "\n", + "We'll stick to these two, since they exercise the most common code paths through schema generation." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Using pandas type strings\n", + "\n", + "Something like the `tic_types.csv` file contains a list of the columns that the TIC data will contain, in a table like:\n", + "\n", + "```\n", + "name,type\n", + "ID,Int64\n", + "version,str\n", + "HIP,Int32\n", + "TYC,str\n", + "etc...\n", + "```\n", + "\n", + "Such files are a common way to send type information when the data files have no header.\n", + "\n", + "In this method, we will use pandas' type parsing to convert these strings into understood data types, and create the relevant parquet metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "## Fetch the name/type information from a file.\n", + "type_list_frame = pd.read_csv(\"../static/tic_types.csv\")\n", + "\n", + "## For each row, add to a dictionary with name and a pandas series with the parsed data type.\n", + "## \"str\" is not understood as \"string\", so add a special case.\n", + "type_map = {\n", + " row[\"name\"]: pd.Series(dtype=(\"string\" if row[\"type\"] == \"str\" else row[\"type\"]))\n", + " for _, row in type_list_frame.iterrows()\n", + "}\n", + "dtype_frame = pd.DataFrame(type_map)\n", + "\n", + "## Now write our empty data frame to a parquet file.\n", + "schema_file = os.path.join(tmp_path.name, \"schema_from_csv_list.parquet\")\n", + "dtype_frame.to_parquet(schema_file)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's look at the parquet file's metadata and see if it matches what we'd expect.\n", + "\n", + "You'll notice that that there are A LOT of fields, and this is why you might not want to deal with column-by-column type discrepancies." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parquet_file = pq.ParquetFile(schema_file)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Explict list of pyarrow data types\n", + "\n", + "Here, we know what pyarrow types we want to use for each column. This is helpful if you know you want nullable, or you know you DON'T want to use nullable types, but it requires some deeper knowledge of pyarrow data types." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow as pa\n", + "\n", + "## List all of our columns as pyarrow fields.\n", + "schema_from_pyarrow = pa.schema(\n", + " [\n", + " pa.field(\"id\", pa.int64()),\n", + " pa.field(\"ra\", pa.float64()),\n", + " pa.field(\"dec\", pa.float64()),\n", + " pa.field(\"ra_error\", pa.float64()),\n", + " pa.field(\"dec_error\", pa.float64()),\n", + " pa.field(\"comment\", pa.string()),\n", + " pa.field(\"code\", pa.string()),\n", + " ]\n", + ")\n", + "schema_file = os.path.join(tmp_path.name, \"schema_from_pyarrow.parquet\")\n", + "pq.write_metadata(schema_from_pyarrow, schema_file)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Again, we'll check that the generated parquet metadata is what we expect:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parquet_file = pq.ParquetFile(schema_file)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, let's clean up." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client.shutdown()\n", + "tmp_path.cleanup()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "hipscatenv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 } From 805aa88e345915f0f15227c2adbc2313549e8cf8 Mon Sep 17 00:00:00 2001 From: delucchi-cmu Date: Mon, 6 Nov 2023 09:15:38 -0500 Subject: [PATCH 4/4] Formatting. --- docs/notebooks/unequal_schema.ipynb | 620 +++++++++--------- src/hipscat_import/catalog/map_reduce.py | 8 +- .../hipscat_import/catalog/test_map_reduce.py | 2 + 3 files changed, 316 insertions(+), 314 deletions(-) diff --git a/docs/notebooks/unequal_schema.ipynb b/docs/notebooks/unequal_schema.ipynb index e7dce6b0..7c109be0 100644 --- a/docs/notebooks/unequal_schema.ipynb +++ b/docs/notebooks/unequal_schema.ipynb @@ -1,312 +1,312 @@ { - "cells": [ - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Unequal schema problems\n", - "\n", - "There are a few ways in which parquet files written with slightly different schema can cause issues in the import pipeline. This issue most commonly arises when some portions of the data contain only empty (null) values in a column, but other portions have non-null values and so are interpreted as integer or string values. When we try to merge these partial files together later, the parquet engine does not want to perform a cast between these types and throws an error.\n", - "\n", - "For example, at the reduce stage, we're combining several intermediate parquet files for a single spatial tile into the final parquet file. It's possible at this stage that some files will contain only empty (null) values in a column that we expect to be a string field.\n", - "\n", - "e.g. \n", - "\n", - "#### File1\n", - "\n", - "| int_field | string_field | float_field |\n", - "| --------- | ------------ | ---------- |\n", - "| 5 | | 3.4 |\n", - "| 8 | | 3.8 |\n", - "\n", - "which will have a schema like:\n", - " \n", - " optional int64 field_id=-1 int_field;\n", - " optional int32 field_id=-1 string_field **(Null)**;\n", - " optional double field_id=-1 float_field;\n", - " \n", - "#### File2\n", - " \n", - "| int_field | string_field | float_field |\n", - "| --------- |------------- | ----------- |\n", - "| 6 | hello | 4.1 |\n", - "| 7 | | 3.9 |\n", - "\n", - "will have a schema like:\n", - "\n", - " optional int64 field_id=-1 int_field;\n", - " optional binary field_id=-1 string_field (String);\n", - " optional double field_id=-1 float_field;\n", - "\n", - "When we try to merge these files together, we see an error like the following:\n", - "```\n", - "Key: 4_2666\n", - "Function: reduce_pixel_shards\n", - "args: ()\n", - "kwargs: {...}\n", - "Exception: \"ArrowNotImplementedError('Unsupported cast from string to null using function cast_null')\"\n", - "```\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Error Demonstration\n", - "\n", - "Here, we attempt an import with some unequal schema, and see that the attempt fails in the reducing stage, when we're trying to combine partial parquet files into a single file with common metadata." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import tempfile\n", - "import os\n", - "from dask.distributed import Client\n", - "\n", - "from hipscat_import.pipeline import pipeline_with_client\n", - "from hipscat_import.catalog.arguments import ImportArguments\n", - "from hipscat_import.catalog.file_readers import get_file_reader\n", - "\n", - "mixed_schema_csv_dir = \"../../tests/hipscat_import/data/mixed_schema\"\n", - "tmp_path = tempfile.TemporaryDirectory()\n", - "\n", - "args = ImportArguments(\n", - " output_catalog_name=\"mixed_csv_bad\",\n", - " input_path=mixed_schema_csv_dir,\n", - " input_format=\"csv\",\n", - " output_path=tmp_path.name,\n", - " highest_healpix_order=1,\n", - ")\n", - "\n", - "client = Client(n_workers=1, threads_per_worker=1)\n", - "\n", - "try:\n", - " pipeline_with_client(args, client)\n", - "except:\n", - " pass # we know it's going to fail!!" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We can overcome may of these issues by using a *parquet schema* file. This is a special kind of parquet file that only contains information on the columns (their names, data types, and additional key-value metadata).\n", - "\n", - "Let's take a look inside the schema structure and see the field types it expects to see:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pyarrow.parquet as pq\n", - "\n", - "mixed_schema_csv_parquet = \"../../tests/hipscat_import/data/mixed_schema/schema.parquet\"\n", - "\n", - "parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)\n", - "print(parquet_file.schema)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We already have a parquet metadata file for this data set, but we'll show you how to create one of your own later in this notebook." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "tmp_path = tempfile.TemporaryDirectory()\n", - "args = ImportArguments(\n", - " output_catalog_name=\"mixed_csv_good\",\n", - " input_path=mixed_schema_csv_dir,\n", - " input_format=\"csv\",\n", - " output_path=tmp_path.name,\n", - " highest_healpix_order=1,\n", - " file_reader=get_file_reader(\"csv\", schema_file=mixed_schema_csv_parquet),\n", - " use_schema_file=mixed_schema_csv_parquet,\n", - " overwrite=True,\n", - ")\n", - "pipeline_with_client(args, client)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Making a new parquet schema file\n", - "\n", - "There are a few different strategies we can use to create a schema file:\n", - "\n", - "* using some string representations of pandas datatypes\n", - "* using an explicit list of pyarrow data types\n", - "* and many more!\n", - "\n", - "We'll stick to these two, since they exercise the most common code paths through schema generation." - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Using pandas type strings\n", - "\n", - "Something like the `tic_types.csv` file contains a list of the columns that the TIC data will contain, in a table like:\n", - "\n", - "```\n", - "name,type\n", - "ID,Int64\n", - "version,str\n", - "HIP,Int32\n", - "TYC,str\n", - "etc...\n", - "```\n", - "\n", - "Such files are a common way to send type information when the data files have no header.\n", - "\n", - "In this method, we will use pandas' type parsing to convert these strings into understood data types, and create the relevant parquet metadata." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pandas as pd\n", - "\n", - "## Fetch the name/type information from a file.\n", - "type_list_frame = pd.read_csv(\"../static/tic_types.csv\")\n", - "\n", - "## For each row, add to a dictionary with name and a pandas series with the parsed data type.\n", - "## \"str\" is not understood as \"string\", so add a special case.\n", - "type_map = {\n", - " row[\"name\"]: pd.Series(dtype=(\"string\" if row[\"type\"] == \"str\" else row[\"type\"]))\n", - " for _, row in type_list_frame.iterrows()\n", - "}\n", - "dtype_frame = pd.DataFrame(type_map)\n", - "\n", - "## Now write our empty data frame to a parquet file.\n", - "schema_file = os.path.join(tmp_path.name, \"schema_from_csv_list.parquet\")\n", - "dtype_frame.to_parquet(schema_file)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Let's look at the parquet file's metadata and see if it matches what we'd expect.\n", - "\n", - "You'll notice that that there are A LOT of fields, and this is why you might not want to deal with column-by-column type discrepancies." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "parquet_file = pq.ParquetFile(schema_file)\n", - "print(parquet_file.schema)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Explict list of pyarrow data types\n", - "\n", - "Here, we know what pyarrow types we want to use for each column. This is helpful if you know you want nullable, or you know you DON'T want to use nullable types, but it requires some deeper knowledge of pyarrow data types." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import pyarrow as pa\n", - "\n", - "## List all of our columns as pyarrow fields.\n", - "schema_from_pyarrow = pa.schema(\n", - " [\n", - " pa.field(\"id\", pa.int64()),\n", - " pa.field(\"ra\", pa.float64()),\n", - " pa.field(\"dec\", pa.float64()),\n", - " pa.field(\"ra_error\", pa.float64()),\n", - " pa.field(\"dec_error\", pa.float64()),\n", - " pa.field(\"comment\", pa.string()),\n", - " pa.field(\"code\", pa.string()),\n", - " ]\n", - ")\n", - "schema_file = os.path.join(tmp_path.name, \"schema_from_pyarrow.parquet\")\n", - "pq.write_metadata(schema_from_pyarrow, schema_file)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Again, we'll check that the generated parquet metadata is what we expect:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "parquet_file = pq.ParquetFile(schema_file)\n", - "print(parquet_file.schema)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "Finally, let's clean up." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "client.shutdown()\n", - "tmp_path.cleanup()" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "hipscatenv", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.4" - } - }, - "nbformat": 4, - "nbformat_minor": 2 + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Unequal schema problems\n", + "\n", + "There are a few ways in which parquet files written with slightly different schema can cause issues in the import pipeline. This issue most commonly arises when some portions of the data contain only empty (null) values in a column, but other portions have non-null values and so are interpreted as integer or string values. When we try to merge these partial files together later, the parquet engine does not want to perform a cast between these types and throws an error.\n", + "\n", + "For example, at the reduce stage, we're combining several intermediate parquet files for a single spatial tile into the final parquet file. It's possible at this stage that some files will contain only empty (null) values in a column that we expect to be a string field.\n", + "\n", + "e.g. \n", + "\n", + "#### File1\n", + "\n", + "| int_field | string_field | float_field |\n", + "| --------- | ------------ | ---------- |\n", + "| 5 | | 3.4 |\n", + "| 8 | | 3.8 |\n", + "\n", + "which will have a schema like:\n", + " \n", + " optional int64 field_id=-1 int_field;\n", + " optional int32 field_id=-1 string_field **(Null)**;\n", + " optional double field_id=-1 float_field;\n", + " \n", + "#### File2\n", + " \n", + "| int_field | string_field | float_field |\n", + "| --------- |------------- | ----------- |\n", + "| 6 | hello | 4.1 |\n", + "| 7 | | 3.9 |\n", + "\n", + "will have a schema like:\n", + "\n", + " optional int64 field_id=-1 int_field;\n", + " optional binary field_id=-1 string_field (String);\n", + " optional double field_id=-1 float_field;\n", + "\n", + "When we try to merge these files together, we see an error like the following:\n", + "```\n", + "Key: 4_2666\n", + "Function: reduce_pixel_shards\n", + "args: ()\n", + "kwargs: {...}\n", + "Exception: \"ArrowNotImplementedError('Unsupported cast from string to null using function cast_null')\"\n", + "```\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Error Demonstration\n", + "\n", + "Here, we attempt an import with some unequal schema, and see that the attempt fails in the reducing stage, when we're trying to combine partial parquet files into a single file with common metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import tempfile\n", + "import os\n", + "from dask.distributed import Client\n", + "\n", + "from hipscat_import.pipeline import pipeline_with_client\n", + "from hipscat_import.catalog.arguments import ImportArguments\n", + "from hipscat_import.catalog.file_readers import get_file_reader\n", + "\n", + "mixed_schema_csv_dir = \"../../tests/hipscat_import/data/mixed_schema\"\n", + "tmp_path = tempfile.TemporaryDirectory()\n", + "\n", + "args = ImportArguments(\n", + " output_catalog_name=\"mixed_csv_bad\",\n", + " input_path=mixed_schema_csv_dir,\n", + " input_format=\"csv\",\n", + " output_path=tmp_path.name,\n", + " highest_healpix_order=1,\n", + ")\n", + "\n", + "client = Client(n_workers=1, threads_per_worker=1)\n", + "\n", + "try:\n", + " pipeline_with_client(args, client)\n", + "except:\n", + " pass # we know it's going to fail!!" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can overcome may of these issues by using a *parquet schema* file. This is a special kind of parquet file that only contains information on the columns (their names, data types, and additional key-value metadata).\n", + "\n", + "Let's take a look inside the schema structure and see the field types it expects to see:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow.parquet as pq\n", + "\n", + "mixed_schema_csv_parquet = \"../../tests/hipscat_import/data/mixed_schema/schema.parquet\"\n", + "\n", + "parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We already have a parquet metadata file for this data set, but we'll show you how to create one of your own later in this notebook." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "tmp_path = tempfile.TemporaryDirectory()\n", + "args = ImportArguments(\n", + " output_catalog_name=\"mixed_csv_good\",\n", + " input_path=mixed_schema_csv_dir,\n", + " input_format=\"csv\",\n", + " output_path=tmp_path.name,\n", + " highest_healpix_order=1,\n", + " file_reader=get_file_reader(\"csv\", schema_file=mixed_schema_csv_parquet),\n", + " use_schema_file=mixed_schema_csv_parquet,\n", + " overwrite=True,\n", + ")\n", + "pipeline_with_client(args, client)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Making a new parquet schema file\n", + "\n", + "There are a few different strategies we can use to create a schema file:\n", + "\n", + "* using some string representations of pandas datatypes\n", + "* using an explicit list of pyarrow data types\n", + "* and many more!\n", + "\n", + "We'll stick to these two, since they exercise the most common code paths through schema generation." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Using pandas type strings\n", + "\n", + "Something like the `tic_types.csv` file contains a list of the columns that the TIC data will contain, in a table like:\n", + "\n", + "```\n", + "name,type\n", + "ID,Int64\n", + "version,str\n", + "HIP,Int32\n", + "TYC,str\n", + "etc...\n", + "```\n", + "\n", + "Such files are a common way to send type information when the data files have no header.\n", + "\n", + "In this method, we will use pandas' type parsing to convert these strings into understood data types, and create the relevant parquet metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "## Fetch the name/type information from a file.\n", + "type_list_frame = pd.read_csv(\"../static/tic_types.csv\")\n", + "\n", + "## For each row, add to a dictionary with name and a pandas series with the parsed data type.\n", + "## \"str\" is not understood as \"string\", so add a special case.\n", + "type_map = {\n", + " row[\"name\"]: pd.Series(dtype=(\"string\" if row[\"type\"] == \"str\" else row[\"type\"]))\n", + " for _, row in type_list_frame.iterrows()\n", + "}\n", + "dtype_frame = pd.DataFrame(type_map)\n", + "\n", + "## Now write our empty data frame to a parquet file.\n", + "schema_file = os.path.join(tmp_path.name, \"schema_from_csv_list.parquet\")\n", + "dtype_frame.to_parquet(schema_file)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's look at the parquet file's metadata and see if it matches what we'd expect.\n", + "\n", + "You'll notice that that there are A LOT of fields, and this is why you might not want to deal with column-by-column type discrepancies." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parquet_file = pq.ParquetFile(schema_file)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Explict list of pyarrow data types\n", + "\n", + "Here, we know what pyarrow types we want to use for each column. This is helpful if you know you want nullable, or you know you DON'T want to use nullable types, but it requires some deeper knowledge of pyarrow data types." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow as pa\n", + "\n", + "## List all of our columns as pyarrow fields.\n", + "schema_from_pyarrow = pa.schema(\n", + " [\n", + " pa.field(\"id\", pa.int64()),\n", + " pa.field(\"ra\", pa.float64()),\n", + " pa.field(\"dec\", pa.float64()),\n", + " pa.field(\"ra_error\", pa.float64()),\n", + " pa.field(\"dec_error\", pa.float64()),\n", + " pa.field(\"comment\", pa.string()),\n", + " pa.field(\"code\", pa.string()),\n", + " ]\n", + ")\n", + "schema_file = os.path.join(tmp_path.name, \"schema_from_pyarrow.parquet\")\n", + "pq.write_metadata(schema_from_pyarrow, schema_file)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Again, we'll check that the generated parquet metadata is what we expect:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parquet_file = pq.ParquetFile(schema_file)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, let's clean up." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client.shutdown()\n", + "tmp_path.cleanup()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "hipscatenv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 } diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index 41e06db5..65ad5a6c 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -54,7 +54,7 @@ def _iterate_input_file( highest_order, ra_column, dec_column, - use_hipscat_index = False, + use_hipscat_index=False, ): """Helper function to handle input file reading and healpix pixel calculation""" if not file_reader: @@ -96,7 +96,7 @@ def map_to_pixels( highest_order, ra_column, dec_column, - use_hipscat_index = False + use_hipscat_index=False, ): """Map a file of input objects to their healpix pixels. @@ -138,7 +138,7 @@ def split_pixels( cache_shard_path: FilePointer, resume_path: FilePointer, alignment=None, - use_hipscat_index = False, + use_hipscat_index=False, ): """Map a file of input objects to their healpix pixels and split into shards. @@ -195,7 +195,7 @@ def reduce_pixel_shards( ra_column, dec_column, id_column, - use_hipscat_index = False, + use_hipscat_index=False, add_hipscat_index=True, delete_input_files=True, use_schema_file="", diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index 0a2d36a5..6e2a5e9d 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -166,6 +166,7 @@ def test_map_with_hipscat_index(tmp_path, formats_dir, small_sky_single_file): mapping_key="map_0", ) + def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parquet): """Test loading the a file when using a parquet schema file for dtypes""" os.makedirs(os.path.join(tmp_path, "histograms")) @@ -193,6 +194,7 @@ def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parque npt.assert_array_equal(result, expected) assert (result == expected).all() + def test_map_small_sky_order0(tmp_path, small_sky_single_file): """Test loading the small sky catalog and partitioning each object into the same large bucket""" os.makedirs(os.path.join(tmp_path, "histograms"))