diff --git a/docs/index.rst b/docs/index.rst index 13d9f80e..e157aa1f 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -39,15 +39,25 @@ See dataset-specific notes on arguments: * :doc:`guide/index_table` Once you have created your arguments object, you pass it into the pipeline control, -and then wait: +and then wait. Running within a main guard can help avoid some Python +threading issues with dask: .. code-block:: python - import hipscat_import.pipeline as runner + from dask.distributed import Client + from hipscat_import.pipeline import pipeline_with_client - args = ... - runner.pipeline(args) + def main(): + args = ... + with Client( + n_workers=10, + threads_per_worker=1, + ... + ) as client: + pipeline_with_client(args, client) + if __name__ == '__main__': + main() .. toctree:: :hidden: diff --git a/docs/notebooks.rst b/docs/notebooks.rst index 5e218f09..b00e6830 100644 --- a/docs/notebooks.rst +++ b/docs/notebooks.rst @@ -4,3 +4,4 @@ Notebooks .. toctree:: Estimate Pixel Threshold + Creating empty schema parquet diff --git a/docs/notebooks/README.md b/docs/notebooks/README.md deleted file mode 100644 index 4eb938ce..00000000 --- a/docs/notebooks/README.md +++ /dev/null @@ -1 +0,0 @@ -Put your Jupyter notebooks here :) \ No newline at end of file diff --git a/docs/notebooks/estimate_pixel_threshold.ipynb b/docs/notebooks/estimate_pixel_threshold.ipynb index 7f544a65..750e5397 100755 --- a/docs/notebooks/estimate_pixel_threshold.ipynb +++ b/docs/notebooks/estimate_pixel_threshold.ipynb @@ -46,7 +46,11 @@ "outputs": [], "source": [ "### Change this path!!!\n", - "sample_parquet_file = \"../../tests/hipscat_import/data/sample.parquet\"" + "import os\n", + "import tempfile\n", + "\n", + "tmp_path = tempfile.TemporaryDirectory()\n", + "sample_parquet_file = os.path.join(tmp_path.name, \"sample.parquet\")" ] }, { @@ -56,7 +60,6 @@ "metadata": {}, "outputs": [], "source": [ - "import pandas as pd\n", "from hipscat_import.catalog.file_readers import CsvReader\n", "\n", "### Change this path!!!\n", diff --git a/docs/notebooks/unequal_schema.ipynb b/docs/notebooks/unequal_schema.ipynb new file mode 100644 index 00000000..7bb40c94 --- /dev/null +++ b/docs/notebooks/unequal_schema.ipynb @@ -0,0 +1,274 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Unequal schema problems\n", + "\n", + "There are a few ways in which parquet files written with slightly different schemas can cause issues in the import pipeline. They share a similar correction mechanism, so we discuss them together here." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Error Demonstration\n", + "\n", + "Here, we attempt an import with some unequal schemas, and see that the attempt fails in the reducing stage, when we're trying to combine partial parquet files into a single file with common metadata."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import tempfile\n", + "import os\n", + "from dask.distributed import Client\n", + "\n", + "from hipscat_import.pipeline import pipeline_with_client\n", + "from hipscat_import.catalog.arguments import ImportArguments\n", + "from hipscat_import.catalog.file_readers import get_file_reader\n", + "\n", + "mixed_schema_csv_dir = \"../../tests/hipscat_import/data/mixed_schema\"\n", + "tmp_path = tempfile.TemporaryDirectory()\n", + "\n", + "args = ImportArguments(\n", + " output_catalog_name=\"mixed_csv_bad\",\n", + " input_path=mixed_schema_csv_dir,\n", + " input_format=\"csv\",\n", + " output_path=tmp_path.name,\n", + " highest_healpix_order=1,\n", + ")\n", + "\n", + "client = Client(n_workers=1, threads_per_worker=1)\n", + "\n", + "try:\n", + " pipeline_with_client(args, client)\n", + "except:\n", + " pass # we know it's going to fail!!" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We can overcome many of these issues by using a *parquet schema* file. This is a special kind of parquet file that only contains information on the columns (their names, data types, and additional key-value metadata).\n", + "\n", + "Let's take a look inside the schema structure and see the field types it expects:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow.parquet as pq\n", + "\n", + "mixed_schema_csv_parquet = \"../../tests/hipscat_import/data/mixed_schema/schema.parquet\"\n", + "\n", + "parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We already have one for this data set, but we'll show you how to create one of your own later in this notebook." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "tmp_path = tempfile.TemporaryDirectory()\n", + "args = ImportArguments(\n", + " output_catalog_name=\"mixed_csv_good\",\n", + " input_path=mixed_schema_csv_dir,\n", + " input_format=\"csv\",\n", + " output_path=tmp_path.name,\n", + " highest_healpix_order=1,\n", + " file_reader=get_file_reader(\"csv\", schema_file=mixed_schema_csv_parquet),\n", + " use_schema_file=mixed_schema_csv_parquet,\n", + " overwrite=True,\n", + ")\n", + "pipeline_with_client(args, client)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Making a new parquet schema file\n", + "\n", + "There are a few different strategies we can use to create a schema file:\n", + "\n", + "* using some string representations of pandas datatypes\n", + "* using an explicit list of pyarrow data types\n", + "* and many more!\n", + "\n", + "We'll stick to these two, since they exercise the most common code paths through schema generation."
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Using pandas type strings\n", + "\n", + "A file like `tic_types.csv` contains a list of the columns that the TIC data will contain, in a table like:\n", + "\n", + "```\n", + "name,type\n", + "ID,Int64\n", + "version,str\n", + "HIP,Int32\n", + "TYC,str\n", + "etc...\n", + "```\n", + "\n", + "Such files are a common way to send type information when the data files have no header.\n", + "\n", + "In this method, we will use pandas' type parsing to convert these strings into understood data types, and create the relevant parquet metadata." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "\n", + "## Fetch the name/type information from a file.\n", + "type_list_frame = pd.read_csv(\"../static/tic_types.csv\")\n", + "\n", + "## For each row, add to a dictionary with name and a pandas series with the parsed data type.\n", + "## \"str\" is not understood as \"string\", so add a special case.\n", + "type_map = {\n", + " row[\"name\"]: pd.Series(dtype=(\"string\" if row[\"type\"] == \"str\" else row[\"type\"]))\n", + " for _, row in type_list_frame.iterrows()\n", + "}\n", + "dtype_frame = pd.DataFrame(type_map)\n", + "\n", + "## Now write our empty data frame to a parquet file.\n", + "schema_file = os.path.join(tmp_path.name, \"schema_from_csv_list.parquet\")\n", + "dtype_frame.to_parquet(schema_file)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's look at the parquet file's metadata and see if it matches what we'd expect.\n", + "\n", + "You'll notice that there are A LOT of fields, and this is why you might not want to deal with column-by-column type discrepancies." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parquet_file = pq.ParquetFile(schema_file)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Explicit list of pyarrow data types\n", + "\n", + "Here, we know what pyarrow types we want to use for each column. This is helpful if you know you want nullable types, or you know you DON'T want them, but it requires some deeper knowledge of pyarrow data types." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow as pa\n", + "\n", + "## List all of our columns as pyarrow fields.\n", + "schema_from_pyarrow = pa.schema(\n", + " [\n", + " pa.field(\"id\", pa.int64()),\n", + " pa.field(\"ra\", pa.float64()),\n", + " pa.field(\"dec\", pa.float64()),\n", + " pa.field(\"ra_error\", pa.float64()),\n", + " pa.field(\"dec_error\", pa.float64()),\n", + " pa.field(\"comment\", pa.string()),\n", + " pa.field(\"code\", pa.string()),\n", + " ]\n", + ")\n", + "schema_file = os.path.join(tmp_path.name, \"schema_from_pyarrow.parquet\")\n", + "pq.write_metadata(schema_from_pyarrow, schema_file)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Again, we'll check that the generated parquet metadata is what we expect:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parquet_file = pq.ParquetFile(schema_file)\n", + "print(parquet_file.schema)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, let's clean up."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client.shutdown()\n", + "tmp_path.cleanup()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "hipscatenv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/src/hipscat_import/catalog/file_readers.py b/src/hipscat_import/catalog/file_readers.py index 97359271..6d6192b8 100644 --- a/src/hipscat_import/catalog/file_readers.py +++ b/src/hipscat_import/catalog/file_readers.py @@ -141,7 +141,8 @@ def read(self, input_file): if self.schema_file: schema_parquet = file_io.load_parquet_to_pandas( - FilePointer(self.schema_file), dtype_backend="numpy_nullable" + FilePointer(self.schema_file), + **self.kwargs, ) use_column_names = None diff --git a/src/hipscat_import/catalog/map_reduce.py b/src/hipscat_import/catalog/map_reduce.py index 8ba4d8a3..97ed8def 100644 --- a/src/hipscat_import/catalog/map_reduce.py +++ b/src/hipscat_import/catalog/map_reduce.py @@ -110,6 +110,7 @@ def map_to_pixels( input_file, file_reader, highest_order, ra_column, dec_column ): mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True) + mapped_pixel = mapped_pixel.astype(np.int64) histo[mapped_pixel] += count_at_pixel.astype(np.int64) ResumePlan.write_partial_histogram(tmp_path=resume_path, mapping_key=mapping_key, histogram=histo) diff --git a/src/hipscat_import/soap/map_reduce.py b/src/hipscat_import/soap/map_reduce.py index 53db9305..68e50228 100644 --- a/src/hipscat_import/soap/map_reduce.py +++ b/src/hipscat_import/soap/map_reduce.py @@ -55,7 +55,7 @@ def count_joins( If any un-joined source pixels remain, stretch out to neighboring object pixels. Args: - soap_args(SoapArguments): set of arguments for pipeline execution + soap_args(`hipscat_import.soap.SoapArguments`): set of arguments for pipeline execution source_pixel(HealpixPixel): order and pixel for the source catalog single pixel. object_pixels(List[HealpixPixel]): set of tuples of order and pixel for the partitions of the object catalog to be joined. 
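For context on the `mapped_pixel = mapped_pixel.astype(np.int64)` line added to `map_to_pixels` above: the values returned by `np.unique` must be integer-typed before they can be used to fancy-index the histogram. A minimal sketch with made-up pixel values (not real pipeline data), assuming the mapped pixels could arrive with a non-integer dtype:

```python
import numpy as np

# Hypothetical mapped pixels; depending on how the input was parsed, these
# may not arrive as int64 (e.g. a float dtype).
mapped_pixels = np.asarray([11.0, 11.0, 45.0, 45.0, 45.0])
histo = np.zeros(48, dtype=np.int64)  # histogram over all order-1 healpix pixels

mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
# Without the cast, float-typed indices raise:
# IndexError: arrays used as indices must be of integer (or boolean) type
mapped_pixel = mapped_pixel.astype(np.int64)
histo[mapped_pixel] += count_at_pixel.astype(np.int64)

print(histo[11], histo[45])  # 2 3
```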
diff --git a/tests/hipscat_import/catalog/test_file_readers.py b/tests/hipscat_import/catalog/test_file_readers.py index 65cbd966..af402a23 100644 --- a/tests/hipscat_import/catalog/test_file_readers.py +++ b/tests/hipscat_import/catalog/test_file_readers.py @@ -96,7 +96,9 @@ def test_csv_reader_parquet_metadata(small_sky_single_file, tmp_path): schema_file, ) - frame = next(CsvReader(schema_file=schema_file).read(small_sky_single_file)) + frame = next( + CsvReader(schema_file=schema_file, dtype_backend="numpy_nullable").read(small_sky_single_file) + ) assert len(frame) == 131 column_types = frame.dtypes.to_dict() @@ -185,7 +187,14 @@ def test_csv_reader_pipe_delimited(formats_pipe_csv, tmp_path): schema_file = os.path.join(tmp_path, "metadata.parquet") pq.write_metadata(parquet_schema_types, schema_file) - frame = next(CsvReader(header=None, separator="|", schema_file=schema_file).read(formats_pipe_csv)) + frame = next( + CsvReader( + header=None, + separator="|", + schema_file=schema_file, + dtype_backend="numpy_nullable", + ).read(formats_pipe_csv) + ) assert len(frame) == 3 assert np.all(frame["letters"] == ["AA", "BB", "CC"]) diff --git a/tests/hipscat_import/catalog/test_map_reduce.py b/tests/hipscat_import/catalog/test_map_reduce.py index eb91e967..6168e4d9 100644 --- a/tests/hipscat_import/catalog/test_map_reduce.py +++ b/tests/hipscat_import/catalog/test_map_reduce.py @@ -134,6 +134,34 @@ def test_map_headers(tmp_path, formats_headers_csv): assert (result == expected).all() +def test_map_with_schema(tmp_path, mixed_schema_csv_dir, mixed_schema_csv_parquet): + """Test loading a file when using a parquet schema file for dtypes""" + os.makedirs(os.path.join(tmp_path, "histograms")) + input_file = os.path.join(mixed_schema_csv_dir, "input_01.csv") + mr.map_to_pixels( + input_file=input_file, + file_reader=get_file_reader( + "csv", + schema_file=mixed_schema_csv_parquet, + dtype_backend="numpy_nullable", + ), + highest_order=0, + ra_column="ra", + dec_column="dec", + resume_path=tmp_path, + mapping_key="map_0", + ) + + result = read_partial_histogram(tmp_path, "map_0") + + assert len(result) == 12 + + expected = hist.empty_histogram(0) + expected[11] = 4 + npt.assert_array_equal(result, expected) + assert (result == expected).all() + + def test_map_small_sky_order0(tmp_path, small_sky_single_file): """Test loading the small sky catalog and partitioning each object into the same large bucket""" os.makedirs(os.path.join(tmp_path, "histograms")) diff --git a/tests/hipscat_import/catalog/test_run_round_trip.py b/tests/hipscat_import/catalog/test_run_round_trip.py index 28ac954b..e195008c 100644 --- a/tests/hipscat_import/catalog/test_run_round_trip.py +++ b/tests/hipscat_import/catalog/test_run_round_trip.py @@ -56,7 +56,7 @@ def test_import_source_table( assert len(catalog.get_healpix_pixels()) == 14 -@pytest.mark.dask +@pytest.mark.dask(timeout=10) def test_import_mixed_schema_csv( dask_client, mixed_schema_csv_dir, @@ -65,24 +65,27 @@ def test_import_mixed_schema_csv( tmp_path, ): """Test basic execution, with a mixed schema. - - the two input file in `mixed_schema_csv_dir` have different *implied* schemas + - the two input files in `mixed_schema_csv_dir` have different *implied* schemas when parsed by pandas. this verifies that they end up with the same schema and can be combined into a single parquet file.
""" - - schema_parquet = pd.read_parquet(mixed_schema_csv_parquet) args = ImportArguments( - output_catalog_name="mixed_csv", + output_catalog_name="mixed_csv_bad", input_path=mixed_schema_csv_dir, input_format="csv", output_path=tmp_path, dask_tmp=tmp_path, highest_healpix_order=1, - file_reader=get_file_reader("csv", chunksize=1, type_map=schema_parquet.dtypes.to_dict()), + file_reader=get_file_reader("csv", chunksize=1, schema_file=mixed_schema_csv_parquet), progress_bar=False, - use_schema_file=mixed_schema_csv_parquet, ) + with pytest.raises(RuntimeError, match="Some reducing stages failed"): + runner.run(args, dask_client) + + ## Try again, but with the schema specified. + args.use_schema_file = mixed_schema_csv_parquet + args.output_catalog_name = "mixed_csv_good" runner.run(args, dask_client) # Check that the catalog parquet file exists