Skip to content

Commit

Permalink
Add documentation for creating a parquet schema.
Browse files Browse the repository at this point in the history
  • Loading branch information
delucchi-cmu committed Nov 3, 2023
1 parent 0629cb4 commit daa5209
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 18 deletions.
18 changes: 14 additions & 4 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,25 @@ See dataset-specific notes on arguments:
* :doc:`guide/index_table`

Once you have created your arguments object, you pass it into the pipeline control,
and then wait:
and then wait. Running within a main guard will potentially avoid some python
threading issues with dask:

.. code-block:: python
import hipscat_import.pipeline as runner
from dask.distributed import Client
from hipscat_import.pipeline import pipeline_with_client
args = ...
runner.pipeline(args)
def main():
args = ...
with Client(
n_workers=10,
threads_per_worker=1,
...
) as client:
pipeline_with_client(args, client)
if __name__ == '__main__':
main()
.. toctree::
:hidden:
Expand Down
1 change: 1 addition & 0 deletions docs/notebooks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ Notebooks
.. toctree::

Estimate Pixel Threshold <notebooks/estimate_pixel_threshold>
Creating empty schema parquet <notebooks/unequal_schema>
1 change: 0 additions & 1 deletion docs/notebooks/README.md

This file was deleted.

7 changes: 5 additions & 2 deletions docs/notebooks/estimate_pixel_threshold.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@
"outputs": [],
"source": [
"### Change this path!!!\n",
"sample_parquet_file = \"../../tests/hipscat_import/data/sample.parquet\""
"import os\n",
"import tempfile\n",
"\n",
"tmp_path = tempfile.TemporaryDirectory()\n",
"sample_parquet_file = os.path.join(tmp_path.name, \"sample.parquet\")"
]
},
{
Expand All @@ -56,7 +60,6 @@
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from hipscat_import.catalog.file_readers import CsvReader\n",
"\n",
"### Change this path!!!\n",
Expand Down
274 changes: 274 additions & 0 deletions docs/notebooks/unequal_schema.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Unequal schema problems\n",
"\n",
"There are a few ways in which parquet files written with slightly different schema can cause issues in the import pipeline. They have a similar correction mechanism, so we discuss both here."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Error Demonstration\n",
"\n",
"Here, we attempt an import with some unequal schema, and see that the attempt fails in the reducing stage, when we're trying to combine partial parquet files into a single file with common metadata."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tempfile\n",
"import os\n",
"from dask.distributed import Client\n",
"\n",
"from hipscat_import.pipeline import pipeline_with_client\n",
"from hipscat_import.catalog.arguments import ImportArguments\n",
"from hipscat_import.catalog.file_readers import get_file_reader\n",
"\n",
"mixed_schema_csv_dir = \"../../tests/hipscat_import/data/mixed_schema\"\n",
"tmp_path = tempfile.TemporaryDirectory()\n",
"\n",
"args = ImportArguments(\n",
" output_catalog_name=\"mixed_csv_bad\",\n",
" input_path=mixed_schema_csv_dir,\n",
" input_format=\"csv\",\n",
" output_path=tmp_path.name,\n",
" highest_healpix_order=1,\n",
")\n",
"\n",
"client = Client(n_workers=1, threads_per_worker=1)\n",
"\n",
"try:\n",
" pipeline_with_client(args, client)\n",
"except:\n",
" pass # we know it's going to fail!!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can overcome may of these issues by using a *parquet schema* file. This is a special kind of parquet file that only contains information on the columns (their names, data types, and additional key-value metadata).\n",
"\n",
"Let's take a look inside the schema structure and see the field types it expects to see:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pyarrow.parquet as pq\n",
"\n",
"mixed_schema_csv_parquet = \"../../tests/hipscat_import/data/mixed_schema/schema.parquet\"\n",
"\n",
"parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)\n",
"print(parquet_file.schema)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We already have one for this data set, but we'll show you how to create one of your own later in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tmp_path = tempfile.TemporaryDirectory()\n",
"args = ImportArguments(\n",
" output_catalog_name=\"mixed_csv_good\",\n",
" input_path=mixed_schema_csv_dir,\n",
" input_format=\"csv\",\n",
" output_path=tmp_path.name,\n",
" highest_healpix_order=1,\n",
" file_reader=get_file_reader(\"csv\", schema_file=mixed_schema_csv_parquet),\n",
" use_schema_file=mixed_schema_csv_parquet,\n",
" overwrite=True,\n",
")\n",
"pipeline_with_client(args, client)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Making a new parquet schema file\n",
"\n",
"There are a few different strategies we can use to create a schema file:\n",
"\n",
"* using some string representations of pandas datatypes\n",
"* using an explicit list of pyarrow data types\n",
"* and many more!\n",
"\n",
"We'll stick to these two, since they exercise the most common code paths through schema generation."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Using pandas type strings\n",
"\n",
"Something like the `tic_types.csv` file contains a list of the columns that the TIC data will contain, in a table like:\n",
"\n",
"```\n",
"name,type\n",
"ID,Int64\n",
"version,str\n",
"HIP,Int32\n",
"TYC,str\n",
"etc...\n",
"```\n",
"\n",
"Such files are a common way to send type information when the data files have no header.\n",
"\n",
"In this method, we will use pandas' type parsing to convert these strings into understood data types, and create the relevant parquet metadata."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"## Fetch the name/type information from a file.\n",
"type_list_frame = pd.read_csv(\"../static/tic_types.csv\")\n",
"\n",
"## For each row, add to a dictionary with name and a pandas series with the parsed data type.\n",
"## \"str\" is not understood as \"string\", so add a special case.\n",
"type_map = {\n",
" row[\"name\"]: pd.Series(dtype=(\"string\" if row[\"type\"] == \"str\" else row[\"type\"]))\n",
" for _, row in type_list_frame.iterrows()\n",
"}\n",
"dtype_frame = pd.DataFrame(type_map)\n",
"\n",
"## Now write our empty data frame to a parquet file.\n",
"schema_file = os.path.join(tmp_path.name, \"schema_from_csv_list.parquet\")\n",
"dtype_frame.to_parquet(schema_file)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's look at the parquet file's metadata and see if it matches what we'd expect.\n",
"\n",
"You'll notice that that there are A LOT of fields, and this is why you might not want to deal with column-by-column type discrepancies."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"parquet_file = pq.ParquetFile(schema_file)\n",
"print(parquet_file.schema)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Explict list of pyarrow data types\n",
"\n",
"Here, we know what pyarrow types we want to use for each column. This is helpful if you know you want nullable, or you know you DON'T want to use nullable types, but it requires some deeper knowledge of pyarrow data types."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pyarrow as pa\n",
"\n",
"## List all of our columns as pyarrow fields.\n",
"schema_from_pyarrow = pa.schema(\n",
" [\n",
" pa.field(\"id\", pa.int64()),\n",
" pa.field(\"ra\", pa.float64()),\n",
" pa.field(\"dec\", pa.float64()),\n",
" pa.field(\"ra_error\", pa.float64()),\n",
" pa.field(\"dec_error\", pa.float64()),\n",
" pa.field(\"comment\", pa.string()),\n",
" pa.field(\"code\", pa.string()),\n",
" ]\n",
")\n",
"schema_file = os.path.join(tmp_path.name, \"schema_from_pyarrow.parquet\")\n",
"pq.write_metadata(schema_from_pyarrow, schema_file)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Again, we'll check that the generated parquet metadata is what we expect:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"parquet_file = pq.ParquetFile(schema_file)\n",
"print(parquet_file.schema)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, let's clean up."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.shutdown()\n",
"tmp_path.cleanup()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "hipscatenv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}
3 changes: 2 additions & 1 deletion src/hipscat_import/catalog/file_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ def read(self, input_file):

if self.schema_file:
schema_parquet = file_io.load_parquet_to_pandas(
FilePointer(self.schema_file), dtype_backend="numpy_nullable"
FilePointer(self.schema_file),
**self.kwargs,
)

use_column_names = None
Expand Down
1 change: 1 addition & 0 deletions src/hipscat_import/catalog/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def map_to_pixels(
input_file, file_reader, highest_order, ra_column, dec_column
):
mapped_pixel, count_at_pixel = np.unique(mapped_pixels, return_counts=True)
mapped_pixel = mapped_pixel.astype(np.int64)
histo[mapped_pixel] += count_at_pixel.astype(np.int64)
ResumePlan.write_partial_histogram(tmp_path=resume_path, mapping_key=mapping_key, histogram=histo)

Expand Down
2 changes: 1 addition & 1 deletion src/hipscat_import/soap/map_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def count_joins(
If any un-joined source pixels remain, stretch out to neighboring object pixels.
Args:
soap_args(SoapArguments): set of arguments for pipeline execution
soap_args(`hipscat_import.soap.SoapArguments`): set of arguments for pipeline execution
source_pixel(HealpixPixel): order and pixel for the source catalog single pixel.
object_pixels(List[HealpixPixel]): set of tuples of order and pixel for the partitions
of the object catalog to be joined.
Expand Down
13 changes: 11 additions & 2 deletions tests/hipscat_import/catalog/test_file_readers.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ def test_csv_reader_parquet_metadata(small_sky_single_file, tmp_path):
schema_file,
)

frame = next(CsvReader(schema_file=schema_file).read(small_sky_single_file))
frame = next(
CsvReader(schema_file=schema_file, dtype_backend="numpy_nullable").read(small_sky_single_file)
)
assert len(frame) == 131

column_types = frame.dtypes.to_dict()
Expand Down Expand Up @@ -185,7 +187,14 @@ def test_csv_reader_pipe_delimited(formats_pipe_csv, tmp_path):
schema_file = os.path.join(tmp_path, "metadata.parquet")
pq.write_metadata(parquet_schema_types, schema_file)

frame = next(CsvReader(header=None, separator="|", schema_file=schema_file).read(formats_pipe_csv))
frame = next(
CsvReader(
header=None,
separator="|",
schema_file=schema_file,
dtype_backend="numpy_nullable",
).read(formats_pipe_csv)
)

assert len(frame) == 3
assert np.all(frame["letters"] == ["AA", "BB", "CC"])
Expand Down
Loading

0 comments on commit daa5209

Please sign in to comment.