Add documentation for creating a parquet schema. #162

Merged: 6 commits, Nov 6, 2023
18 changes: 14 additions & 4 deletions docs/index.rst
@@ -39,15 +39,25 @@ See dataset-specific notes on arguments:
* :doc:`guide/index_table`

Once you have created your arguments object, you pass it into the pipeline control,
and then wait:
and then wait. Running within a main guard can help avoid some Python
threading issues with Dask:

.. code-block:: python

import hipscat_import.pipeline as runner
from dask.distributed import Client
from hipscat_import.pipeline import pipeline_with_client

args = ...
runner.pipeline(args)
def main():
args = ...
with Client(
n_workers=10,
threads_per_worker=1,
...
) as client:
pipeline_with_client(args, client)

if __name__ == '__main__':
main()

.. toctree::
:hidden:
1 change: 1 addition & 0 deletions docs/notebooks.rst
@@ -4,3 +4,4 @@ Notebooks
.. toctree::

Estimate Pixel Threshold <notebooks/estimate_pixel_threshold>
Creating empty schema parquet <notebooks/unequal_schema>
1 change: 0 additions & 1 deletion docs/notebooks/README.md

This file was deleted.

7 changes: 5 additions & 2 deletions docs/notebooks/estimate_pixel_threshold.ipynb
@@ -46,7 +46,11 @@
"outputs": [],
"source": [
"### Change this path!!!\n",
"sample_parquet_file = \"../../tests/hipscat_import/data/sample.parquet\""
"import os\n",
"import tempfile\n",
"\n",
"tmp_path = tempfile.TemporaryDirectory()\n",
"sample_parquet_file = os.path.join(tmp_path.name, \"sample.parquet\")"
]
},
{
@@ -56,7 +60,6 @@
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"from hipscat_import.catalog.file_readers import CsvReader\n",
"\n",
"### Change this path!!!\n",
312 changes: 312 additions & 0 deletions docs/notebooks/unequal_schema.ipynb
@@ -0,0 +1,312 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Unequal schema problems\n",
"\n",
"There are a few ways in which parquet files written with slightly different schema can cause issues in the import pipeline. This issue most commonly arises when some portions of the data contain only empty (null) values in a column, but other portions have non-null values and so are interpreted as integer or string values. When we try to merge these partial files together later, the parquet engine does not want to perform a cast between these types and throws an error.\n",
"\n",
"For example, at the reduce stage, we're combining several intermediate parquet files for a single spatial tile into the final parquet file. It's possible at this stage that some files will contain only empty (null) values in a column that we expect to be a string field.\n",
"\n",
"e.g. \n",
"\n",
"#### File1\n",
"\n",
"| int_field | string_field | float_field |\n",
"| --------- | ------------ | ---------- |\n",
"| 5 | <empty> | 3.4 |\n",
"| 8 | <empty> | 3.8 |\n",
"\n",
"which will have a schema like:\n",
" \n",
" optional int64 field_id=-1 int_field;\n",
" optional int32 field_id=-1 string_field **(Null)**;\n",
" optional double field_id=-1 float_field;\n",
" \n",
"#### File2\n",
" \n",
"| int_field | string_field | float_field |\n",
"| --------- |------------- | ----------- |\n",
"| 6 | hello | 4.1 |\n",
"| 7 | <empty> | 3.9 |\n",
"\n",
"will have a schema like:\n",
"\n",
" optional int64 field_id=-1 int_field;\n",
" optional binary field_id=-1 string_field (String);\n",
" optional double field_id=-1 float_field;\n",
"\n",
"When we try to merge these files together, we see an error like the following:\n",
"```\n",
"Key: 4_2666\n",
"Function: reduce_pixel_shards\n",
"args: ()\n",
"kwargs: {...}\n",
"Exception: \"ArrowNotImplementedError('Unsupported cast from string to null using function cast_null')\"\n",
"```\n"
]
},
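{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same failure can be reproduced directly with pyarrow (a minimal sketch, separate from the import pipeline): casting a string column to a schema whose corresponding field has the null type raises the `ArrowNotImplementedError` quoted above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"## Minimal sketch (not part of the import pipeline): force a string column into\n",
"## a schema whose field is the null type, mimicking the reduce-stage conflict.\n",
"import pyarrow as pa\n",
"\n",
"table_with_strings = pa.table({\"string_field\": pa.array([\"hello\", None], type=pa.string())})\n",
"all_null_schema = pa.schema([pa.field(\"string_field\", pa.null())])\n",
"\n",
"try:\n",
"    table_with_strings.cast(all_null_schema)\n",
"except pa.ArrowNotImplementedError as error:\n",
"    print(error)"
]
},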
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Error Demonstration\n",
"\n",
"Here, we attempt an import with some unequal schema, and see that the attempt fails in the reducing stage, when we're trying to combine partial parquet files into a single file with common metadata."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tempfile\n",
"import os\n",
"from dask.distributed import Client\n",
"\n",
"from hipscat_import.pipeline import pipeline_with_client\n",
"from hipscat_import.catalog.arguments import ImportArguments\n",
"from hipscat_import.catalog.file_readers import get_file_reader\n",
"\n",
"mixed_schema_csv_dir = \"../../tests/hipscat_import/data/mixed_schema\"\n",
"tmp_path = tempfile.TemporaryDirectory()\n",
"\n",
"args = ImportArguments(\n",
" output_catalog_name=\"mixed_csv_bad\",\n",
" input_path=mixed_schema_csv_dir,\n",
" input_format=\"csv\",\n",
" output_path=tmp_path.name,\n",
" highest_healpix_order=1,\n",
")\n",
"\n",
"client = Client(n_workers=1, threads_per_worker=1)\n",
"\n",
"try:\n",
" pipeline_with_client(args, client)\n",
"except:\n",
" pass # we know it's going to fail!!"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can overcome may of these issues by using a *parquet schema* file. This is a special kind of parquet file that only contains information on the columns (their names, data types, and additional key-value metadata).\n",
"\n",
"Let's take a look inside the schema structure and see the field types it expects to see:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pyarrow.parquet as pq\n",
"\n",
"mixed_schema_csv_parquet = \"../../tests/hipscat_import/data/mixed_schema/schema.parquet\"\n",
"\n",
"parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)\n",
"print(parquet_file.schema)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We already have a parquet metadata file for this data set, but we'll show you how to create one of your own later in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tmp_path = tempfile.TemporaryDirectory()\n",
"args = ImportArguments(\n",
" output_catalog_name=\"mixed_csv_good\",\n",
" input_path=mixed_schema_csv_dir,\n",
" input_format=\"csv\",\n",
" output_path=tmp_path.name,\n",
" highest_healpix_order=1,\n",
" file_reader=get_file_reader(\"csv\", schema_file=mixed_schema_csv_parquet),\n",
" use_schema_file=mixed_schema_csv_parquet,\n",
" overwrite=True,\n",
")\n",
"pipeline_with_client(args, client)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Making a new parquet schema file\n",
"\n",
"There are a few different strategies we can use to create a schema file:\n",
"\n",
"* using some string representations of pandas datatypes\n",
"* using an explicit list of pyarrow data types\n",
"* and many more!\n",
"\n",
"We'll stick to these two, since they exercise the most common code paths through schema generation."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Using pandas type strings\n",
"\n",
"Something like the `tic_types.csv` file contains a list of the columns that the TIC data will contain, in a table like:\n",
"\n",
"```\n",
"name,type\n",
"ID,Int64\n",
"version,str\n",
"HIP,Int32\n",
"TYC,str\n",
"etc...\n",
"```\n",
"\n",
"Such files are a common way to send type information when the data files have no header.\n",
"\n",
"In this method, we will use pandas' type parsing to convert these strings into understood data types, and create the relevant parquet metadata."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"## Fetch the name/type information from a file.\n",
"type_list_frame = pd.read_csv(\"../static/tic_types.csv\")\n",
"\n",
"## For each row, add to a dictionary with name and a pandas series with the parsed data type.\n",
"## \"str\" is not understood as \"string\", so add a special case.\n",
"type_map = {\n",
" row[\"name\"]: pd.Series(dtype=(\"string\" if row[\"type\"] == \"str\" else row[\"type\"]))\n",
" for _, row in type_list_frame.iterrows()\n",
"}\n",
"dtype_frame = pd.DataFrame(type_map)\n",
"\n",
"## Now write our empty data frame to a parquet file.\n",
"schema_file = os.path.join(tmp_path.name, \"schema_from_csv_list.parquet\")\n",
"dtype_frame.to_parquet(schema_file)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's look at the parquet file's metadata and see if it matches what we'd expect.\n",
"\n",
"You'll notice that that there are A LOT of fields, and this is why you might not want to deal with column-by-column type discrepancies."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"parquet_file = pq.ParquetFile(schema_file)\n",
"print(parquet_file.schema)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Explict list of pyarrow data types\n",
"\n",
"Here, we know what pyarrow types we want to use for each column. This is helpful if you know you want nullable, or you know you DON'T want to use nullable types, but it requires some deeper knowledge of pyarrow data types."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pyarrow as pa\n",
"\n",
"## List all of our columns as pyarrow fields.\n",
"schema_from_pyarrow = pa.schema(\n",
" [\n",
" pa.field(\"id\", pa.int64()),\n",
" pa.field(\"ra\", pa.float64()),\n",
" pa.field(\"dec\", pa.float64()),\n",
" pa.field(\"ra_error\", pa.float64()),\n",
" pa.field(\"dec_error\", pa.float64()),\n",
" pa.field(\"comment\", pa.string()),\n",
" pa.field(\"code\", pa.string()),\n",
" ]\n",
")\n",
"schema_file = os.path.join(tmp_path.name, \"schema_from_pyarrow.parquet\")\n",
"pq.write_metadata(schema_from_pyarrow, schema_file)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Again, we'll check that the generated parquet metadata is what we expect:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"parquet_file = pq.ParquetFile(schema_file)\n",
"print(parquet_file.schema)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, let's clean up."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.shutdown()\n",
"tmp_path.cleanup()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "hipscatenv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
3 changes: 2 additions & 1 deletion src/hipscat_import/catalog/file_readers.py
@@ -141,7 +141,8 @@ def read(self, input_file):

if self.schema_file:
schema_parquet = file_io.load_parquet_to_pandas(
FilePointer(self.schema_file), dtype_backend="numpy_nullable"
FilePointer(self.schema_file),
**self.kwargs,
)

use_column_names = None
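A minimal usage sketch of this change (hypothetical call, assuming `CsvReader` stores extra keyword arguments in `self.kwargs`): the `dtype_backend` that was previously hardcoded when loading the schema parquet can now be supplied by the caller, alongside any other pandas read options.

```python
from hipscat_import.catalog.file_readers import CsvReader

# Hypothetical usage: keyword arguments given to the reader are forwarded when
# the schema parquet is loaded, so nullable pandas dtypes can be requested
# explicitly rather than being hardcoded in the reader.
reader = CsvReader(
    schema_file="tests/hipscat_import/data/mixed_schema/schema.parquet",
    dtype_backend="numpy_nullable",
)
```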