Merge pull request #91 from astronomy-commons/issue/35/guide
Fleshing out guide for importing catalogs
delucchi-cmu committed Jun 26, 2023
2 parents d6c0d77 + efe6324 commit 435e1d1
Showing 22 changed files with 633 additions and 174 deletions.
37 changes: 37 additions & 0 deletions docs/catalogs/advanced.rst
@@ -0,0 +1,37 @@
Advanced Usage
===============================================================================

We aim to support ingestion of many kinds of catalog data. Here, we discuss
some ways you can tune the import pipeline for different kinds of data.

.. tip::
    Reach out!

    If you have some *very* interesting data that isn't well-supported by this
    pipeline, we want to hear about it! :doc:`/guide/contact`


``add_hipscat_index``
-------------------------------------------------------------------------------

TODO

``use_schema_file``
-------------------------------------------------------------------------------

TODO

``debug_stats_only``
-------------------------------------------------------------------------------

TODO

``epoch``
-------------------------------------------------------------------------------

TODO

``catalog_type``
-------------------------------------------------------------------------------

TODO
115 changes: 115 additions & 0 deletions docs/catalogs/arguments.rst
@@ -0,0 +1,115 @@
Catalog Import Arguments
===============================================================================

This page discusses a few topics around setting up a catalog pipeline.

For a full list of the available arguments, see the API documentation for
:py:class:`hipscat_import.catalog.arguments.ImportArguments`.

Reading input files
-------------------------------------------------------------------------------

Catalog import reads through a list of files and converts them into a hipscatted catalog.

Which files?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

There are a few ways to specify the files to read:

* ``input_path`` + ``input_format``:
  will search for files ending with the format string in the indicated directory.
* ``input_file_list``:
  a list of fully-specified paths you want to read.

  * this strategy can be useful to first run the import on a single input
    file and validate the input, then run again on the full input set, or
    to debug a single input file with odd behavior (see the sketch after
    this list).
  * if you have a mix of files in your target directory, you can use a glob
    statement like the following to gather input files:

    .. code-block:: python

        in_file_paths = glob.glob("/data/object_and_source/object**.csv")
        in_file_paths.sort()
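
To validate your arguments before a full run, it can help to point the pipeline
at a single known-good file first. A minimal sketch (the file path here is
illustrative):

.. code-block:: python

    args = ImportArguments(
        ...
        ## Import just one file to check column names, types, and reader settings.
        input_file_list=["/data/object_and_source/object_00001.csv"],
    )
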
How to read them?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Specify an instance of ``InputReader`` for the ``file_reader`` parameter.

See the API documentation for
:py:class:`hipscat_import.catalog.file_readers.InputReader`.

We use the ``InputReader`` class to read files in chunks and pass the chunks
along to the map/reduce stages. We've provided reference implementations for
reading CSV, FITS, and Parquet input files, but you can subclass the reader
type to suit whatever input files you've got.

.. code-block:: python

    class StarrReader(InputReader):
        """Class for fictional Starr file format."""

        def __init__(self, chunksize=500_000, **kwargs):
            self.chunksize = chunksize
            self.kwargs = kwargs

        def read(self, input_file):
            starr_file = starr_io.read_table(input_file, **self.kwargs)
            for smaller_table in starr_file.to_batches(max_chunksize=self.chunksize):
                smaller_table = filter_nonsense(smaller_table)
                yield smaller_table.to_pandas()

        def provenance_info(self) -> dict:
            provenance_info = {
                "input_reader_type": "StarrReader",
                "chunksize": self.chunksize,
            }
            return provenance_info

    ...

    args = ImportArguments(
        ...
        ## Locates files like "/directory/to/files/**starr"
        input_path="/directory/to/files/",
        input_format="starr",
        ## NB - you need the parens here!
        file_reader=StarrReader(),
    )

Which fields?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Specify the ``ra_column`` and ``dec_column`` for the dataset.

There are two fields that we require in order to make a valid hipscatted
catalog: the right ascension and declination. At this time, this is the only
supported celestial coordinate system.
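
For example, a minimal sketch (the column names here are illustrative; use
whatever your dataset calls its coordinate columns, e.g. ``objra``/``objdec``):

.. code-block:: python

    args = ImportArguments(
        ...
        ## Columns in the input data that hold the celestial coordinates.
        ra_column="ra",
        dec_column="dec",
    )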


Healpix order and thresholds
-------------------------------------------------------------------------------

This section covers the ``pixel_threshold``, ``highest_healpix_order``, and
``constant_healpix_order`` arguments.

When creating a new catalog through the hipscat-import process, we try to
create partitions with approximately the same number of rows per partition.
This isn't perfect, because the sky is uneven, but we still try to create
smaller-area pixels in more dense areas, and larger-area pixels in less dense
areas.

We use the argument ``pixel_threshold`` and will split a partition into
smaller healpix pixels until the number of rows is smaller than ``pixel_threshold``.
We will only split by healpix pixels up to the ``highest_healpix_order``. If we
would need to split further, we'll throw an error at the "Binning" stage, and you
should adjust your parameters.

For more discussion of the ``pixel_threshold`` argument and a strategy for setting
this parameter, see the notebook :doc:`/notebooks/estimate_pixel_threshold`.

Alternatively, you can use the ``constant_healpix_order`` argument. This will
**ignore** both of the ``pixel_threshold`` and ``highest_healpix_order`` arguments
and the catalog will be partitioned by healpix pixels at the
``constant_healpix_order``. This can be useful for very sparse datasets.
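
As a sketch of the two modes (the values here are illustrative, not
recommendations for your data):

.. code-block:: python

    ## Split dense areas until each partition has fewer than 1_000_000 rows,
    ## but never split beyond healpix order 7.
    args = ImportArguments(
        ...
        pixel_threshold=1_000_000,
        highest_healpix_order=7,
    )

    ## OR: partition everything at healpix order 4, regardless of density.
    args = ImportArguments(
        ...
        constant_healpix_order=4,
    )
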
16 changes: 16 additions & 0 deletions docs/catalogs/debug.rst
@@ -0,0 +1,16 @@
Debugging Tips
========================================================================================

.. tip::
    If you're struggling with your dataset after looking over these tips, reach out!

    :doc:`/guide/contact`

Reduce step
-------------------------------------------------------------------------------

Errors like:

.. code-block:: text

    Exception: "ArrowNotImplementedError('Unsupported cast from string to null using function cast_null')"
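
This error typically appears when some chunk of the input data contains *only*
null values in a column, so its type is inferred as null and cannot be
reconciled with the type seen in other chunks. One way around this is to
provide an explicit parquet schema via the ``use_schema_file`` argument. A
minimal sketch of writing such a schema file with pyarrow (the column names and
types are purely illustrative):

.. code-block:: python

    import pyarrow as pa
    import pyarrow.parquet as pq

    ## Declare the type of every column in the input data.
    schema = pa.schema(
        [
            pa.field("source_id", pa.int64()),
            pa.field("ra", pa.float64()),
            pa.field("dec", pa.float64()),
            pa.field("comment", pa.string()),
        ]
    )

    ## Write an empty table - only the column-level metadata is needed.
    empty = pa.Table.from_arrays(
        [pa.array([], type=field.type) for field in schema], schema=schema
    )
    pq.write_table(empty, "my_catalog_schema.parquet")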
13 changes: 0 additions & 13 deletions docs/catalogs/overview.rst

This file was deleted.

25 changes: 15 additions & 10 deletions docs/catalogs/allwise.rst → docs/catalogs/public/allwise.rst
@@ -18,7 +18,9 @@ Challenges with this data set
 - The numeric fields may be null, which is not directly supported by the
   ``int64`` type in pandas, so we must use the nullable ``Int64`` type.
 
-You can download the :download:`allwise_types</static/allwise_types.csv>` CSV file we used.
+You can download the :download:`allwise_types</static/allwise_types.csv>` CSV file we used,
+and the associated schema file :download:`allwise_schema</static/allwise_schema.parquet>`
+with column-level parquet metadata.
 
 Example import
 -------------------------------------------------------------------------------
@@ -27,16 +29,16 @@ Example import
 
     import pandas as pd
 
-    import hipscat_import.run_import as runner
-    from hipscat_import.arguments import ImportArguments
-    from hipscat_import.file_readers import CsvReader
+    import hipscat_import.pipeline as runner
+    from hipscat_import.catalog.arguments import ImportArguments
+    from hipscat_import.catalog.file_readers import CsvReader
 
     # Load the column names and types from a side file.
     type_frame = pd.read_csv("allwise_types.csv")
     type_map = dict(zip(type_frame["name"], type_frame["type"]))
 
     args = ImportArguments(
-        catalog_name="allwise",
+        output_catalog_name="allwise",
         input_path="/path/to/allwise/",
         input_format="csv.bz2",
         file_reader=CsvReader(
@@ -45,10 +47,13 @@ Example import
             column_names=type_frame["name"].values.tolist(),
             type_map=type_map,
             chunksize=250_000,
-        ).read,
-        ra_column="RA",
-        dec_column="DEC",
-        id_column="SOURCE_ID",
+        ),
+        use_schema_file="allwise_schema.parquet",
+        ra_column="ra",
+        dec_column="dec",
+        id_column="source_id",
         pixel_threshold=1_000_000,
         highest_healpix_order=7,
         output_path="/path/to/catalogs/",
     )
 
-    runner.run(args)
+    runner.pipeline(args)
28 changes: 28 additions & 0 deletions docs/catalogs/public/index.rst
@@ -0,0 +1,28 @@
Public Catalogs
===============================================================================

The LINCC Frameworks team has built the import tool with the hope of handling
catalogs in a variety of formats. We've learned some lessons importing public
data sets, and we provide the steps we used here in case these hints help anyone else.

.. note::
    These are datasets that our team has data rights to. We make no guarantees
    about the data rights of others, the quality of the datasets, or their
    availability.

    Further, please respect the publication policy associated with the datasets.

.. toctree::
    :maxdepth: 1

    allwise
    neowise
    tic
    zubercal

.. tip::
    Want to see more?

    Have you used this tool with a dataset, and you want to help others with
    the idiosyncrasies? Is there a commonly used public dataset that you'd like
    some tips for importing? :doc:`/guide/contact`
docs/catalogs/neowise.rst → docs/catalogs/public/neowise.rst
@@ -27,16 +27,16 @@ Example import
 
     import pandas as pd
 
-    import hipscat_import.run_import as runner
-    from hipscat_import.arguments import ImportArguments
-    from hipscat_import.file_readers import CsvReader
+    import hipscat_import.pipeline as runner
+    from hipscat_import.catalog.arguments import ImportArguments
+    from hipscat_import.catalog.file_readers import CsvReader
 
     # Load the column names and types from a side file.
     type_frame = pd.read_csv("neowise_types.csv")
     type_map = dict(zip(type_frame["name"], type_frame["type"]))
 
     args = ImportArguments(
-        catalog_name="neowise_1",
+        output_catalog_name="neowise_1",
         input_path="/path/to/neowiser_year8/",
         input_format="csv.bz2",
         file_reader=CsvReader(
8 changes: 4 additions & 4 deletions docs/catalogs/tic.rst → docs/catalogs/public/tic.rst
@@ -27,15 +27,15 @@ Example import
 
     import pandas as pd
 
-    import hipscat_import.run_import as runner
-    from hipscat_import.arguments import ImportArguments
-    from hipscat_import.file_readers import CsvReader
+    import hipscat_import.pipeline as runner
+    from hipscat_import.catalog.arguments import ImportArguments
+    from hipscat_import.catalog.file_readers import CsvReader
 
     type_frame = pd.read_csv("tic_types.csv")
     type_map = dict(zip(type_frame["name"], type_frame["type"]))
 
     args = ImportArguments(
-        catalog_name="tic_1",
+        output_catalog_name="tic_1",
         input_path="/path/to/tic/",
         input_format="csv.gz",
         file_reader=CsvReader(
85 changes: 85 additions & 0 deletions docs/catalogs/public/zubercal.rst
@@ -0,0 +1,85 @@
Zubercal
===============================================================================

Getting the data
-------------------------------------------------------------------------------

See docs at Caltech.

http://atua.caltech.edu/ZTF/Fields/ReadMe.txt


Challenges with this data set
-------------------------------------------------------------------------------

- The ``__index_level_0__`` pandas index should be ignored when reading.

  - it is identical to the ``objectid`` column, and is just bloat
  - it is non-unique, and that makes it tricky when splitting the file up

- The files are written out by band, and the band is included in the file
  name (but not as a field in the resulting data product). The reader below
  uses a regular expression to parse out the band and insert it as a column
  in the dataframe.
- The parquet files are all a fine size for input files, so we don't mess
  with another chunk size.
- There are over 500 thousand data files (TODO - how we handle this).

.. code-block:: python

    import hipscat_import.pipeline as runner
    from hipscat_import.catalog.arguments import ImportArguments
    from hipscat_import.catalog.file_readers import ParquetReader
    import pyarrow.parquet as pq
    import pyarrow as pa
    import re
    import glob

    class ZubercalParquetReader(ParquetReader):
        def read(self, input_file):
            """Reader for the specifics of zubercal parquet files."""
            columns = [
                "mjd",
                "mag",
                "objdec",
                "objra",
                "magerr",
                "objectid",
                "info",
                "flag",
                "rcidin",
                "fieldid",
            ]

            ## Parse the band from the file name, and hold onto it for later.
            match = re.match(r".*ztf_[\d]+_[\d]+_([gir]).parquet", str(input_file))
            band = match.group(1)

            parquet_file = pq.read_table(input_file, columns=columns, **self.kwargs)
            for smaller_table in parquet_file.to_batches(max_chunksize=self.chunksize):
                frame = pa.Table.from_batches([smaller_table]).to_pandas()
                frame["band"] = band
                yield frame

    files = glob.glob("/path/to/downloads/**/**.parquet")
    files.sort()

    args = ImportArguments(
        output_catalog_name="zubercal",
        input_file_list=files,
        ## NB - you need the parens here!
        file_reader=ZubercalParquetReader(),
        input_format="parquet",
        catalog_type="source",
        ra_column="objra",
        dec_column="objdec",
        id_column="objectid",
        highest_healpix_order=10,
        pixel_threshold=20_000_000,
        output_path="/path/to/catalogs/",
    )

    runner.pipeline(args)
File renamed without changes.
