feat: JSON: refactor IO for JSON; make input implementations column optimizable (#94)
douglasdavis authored Sep 15, 2023
1 parent 3d7d2c5 commit b60a25e
Showing 14 changed files with 1,040 additions and 419 deletions.
9 changes: 7 additions & 2 deletions docs/how-to/configuration.rst
@@ -19,7 +19,7 @@ with the form:

.. code-block:: python
- with dask.config.set({"awkward.<option>": True}):
+ with dask.config.set({"awkward.<option>": value}):
...
- ``raise-failed-meta`` (default: ``False``): If this option is set to
@@ -42,7 +42,7 @@ For example, they can be set with the form:

.. code-block:: python
- with dask.config.set({"awkward.optimization.<option>": False}):
+ with dask.config.set({"awkward.optimization.<option>": value}):
...
- ``enabled`` (default: ``True``): Enable dask-awkward specific
@@ -52,6 +52,11 @@ For example, they can be set with the form:
optimizations to run. The default setting is to run all available
optimizations. (if ``enabled`` is set to ``False`` this option is
ignored).
- ``columns-opt-formats`` (default: ``[parquet]``): Which input
formats should use the column optimization. The possibilities are
``parquet`` (on by default) and ``json`` (currently opt-in). More
information can be found in the :ref:`necessary columns optimization
<more/optimization:necessary columns>` section of the docs.
- ``on-fail`` (default: ``warn``): When set to ``warn``, emit a
warning if the optimization fails and continue without performing
the optimization. If set to ``raise``, raise an exception at
30 changes: 29 additions & 1 deletion docs/more/optimization.rst
@@ -38,6 +38,35 @@ operating on real data*. The data-less execution of the graph helps
determine which parts of a dataset sitting on disk are actually
required to read in order to successfully complete the compute.

Once we've determined which parts of the data are necessary, we can
pass that information to awkward's input functions at the data reading
layers of our task graph. With Parquet, this is the ``columns=``
argument of :func:`ak.from_parquet`. With JSON, we construct a
JSONSchema that contains only the necessary parts of the data that we
want, and we pass that to the ``schema=`` argument of
:func:`ak.from_json`.
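
For the running example used later on this page (top-level fields ``foo`` and ``bar``, with subfields ``bar.x`` and ``bar.y``), a JSONSchema restricted to the single necessary leaf ``bar.x`` could look like the following hand-written sketch (``layout_to_jsonschema`` generates these automatically; the field names here are only the illustrative example, not anything this commit hardcodes):

```python
# A JSONSchema describing records shaped like {"foo": ..., "bar": {"x": ..., "y": ...}}
# but restricted to the single necessary leaf bar.x. Keys absent from the
# schema ("foo" and "bar.y") are skipped by the awkward JSON parser.
necessary_schema = {
    "type": "object",
    "properties": {
        "bar": {
            "type": "object",
            "properties": {
                "x": {"type": "number"},
            },
        },
    },
}
```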

.. note::

Two file formats are supported by the necessary columns
optimization: Parquet and JSON. The optimization is on by default
for reading Parquet, but it is opt-in for JSON. Which formats use
the columns optimization when reading from disk can be controlled
via configuration. For example, the following snippet shows how to
opt in to the necessary columns optimization via JSONSchema:

.. code:: python

   import dask_awkward as dak
   import dask.config

   ds = dak.from_json("/path/to/data")
   thing = dak.max(ds.field1, axis=1)

   with dask.config.set({"awkward.optimization.columns-opt-formats": ["json"]}):
       thing.compute()

Let's look at a simple example dataset: an awkward array with two top
level fields (``foo`` and ``bar``), with one field having two
subfields (``bar.x`` and ``bar.y``). Imagine this dataset is going to
@@ -146,7 +175,6 @@ necessary columns optimization after already defining, by hand, the
minimal set (one should be sure about what is needed with this
workflow).


.. raw:: html

<script data-goatcounter="https://dask-awkward.goatcounter.com/count"
2 changes: 1 addition & 1 deletion src/dask_awkward/__init__.py
@@ -29,7 +29,7 @@
to_dataframe,
to_delayed,
)
- from dask_awkward.lib.io.json import from_json, to_json
+ from dask_awkward.lib.io.json import from_json, layout_to_jsonschema, to_json
from dask_awkward.lib.io.parquet import from_parquet, to_parquet
from dask_awkward.lib.io.text import from_text
from dask_awkward.lib.operations import concatenate
17 changes: 16 additions & 1 deletion src/dask_awkward/awkward.yaml
@@ -15,7 +15,9 @@ awkward:
# Optimization specific configuration
optimization:

- # If true dask-awkward specific optimizations will be run.
+ # If True dask-awkward specific optimizations will be run. If
+ # False, none of the remaining options under the optimization
+ # section matter because dask-awkward optimizations will be off.
enabled: True

# Which of the optimizations do we want to run; options include:
@@ -29,6 +31,19 @@ awkward:
# layer in the task graph.
which: [columns, layer-chains]

# Which input formats to run the columns optimization on; options
# include:
# - parquet
# when using dask_awkward.from_parquet the columns
# optimization will be used to read only the necessary columns
# from parquet files on disk.
# - json
# when using dask_awkward.from_json the columns optimization
# will be used to automatically generate a jsonschema that
# instructs the awkward JSON parser to skip unnecessary keys in
# JSON datasets read from disk.
columns-opt-formats: [parquet]

# This option controls whether or not a warning is thrown, an
# exception is raised, or if nothing is done if a dask-awkward
# specific optimization fails (right now this is only the column
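
To opt JSON into the column optimization persistently rather than per-context, the same key can be set in a user dask configuration file (a sketch, not part of this commit; ``~/.config/dask/`` is dask's usual user config search location and the filename is arbitrary):

```yaml
# e.g. in ~/.config/dask/dask-awkward.yaml
awkward:
  optimization:
    columns-opt-formats: [parquet, json]
```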
4 changes: 4 additions & 0 deletions src/dask_awkward/lib/_utils.py
@@ -42,6 +42,10 @@ def set_form_keys(form: Form, *, key: str) -> Form:
elif form.is_numpy:
form.form_key = key

elif form.is_union:
for entry in form.contents:
set_form_keys(entry, key=key)

# Anything else grab the content and keep recursing
else:
set_form_keys(form.content, key=key)
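
The new union branch fans the same key out to every possible content of the union. The recursion pattern can be sketched on a toy form tree (hypothetical ``Node`` class, not awkward's real ``Form`` API):

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Node:
    """Toy stand-in for an awkward Form node."""

    kind: str  # "record", "numpy", "union", or a list-like type
    form_key: str | None = None
    contents: list[Node] = field(default_factory=list)


def set_form_keys(form: Node, *, key: str) -> Node:
    if form.kind == "record":
        for child in form.contents:
            set_form_keys(child, key=key)
    elif form.kind == "numpy":
        # leaves carry the key
        form.form_key = key
    elif form.kind == "union":
        # a union has several possible contents; tag them all
        for child in form.contents:
            set_form_keys(child, key=key)
    else:
        # anything else (e.g. list-types) wraps a single content
        set_form_keys(form.contents[0], key=key)
    return form


form = Node("record", contents=[
    Node("numpy"),
    Node("union", contents=[Node("numpy"), Node("numpy")]),
])
set_form_keys(form, key="part0-foo")
```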
12 changes: 11 additions & 1 deletion src/dask_awkward/lib/core.py
@@ -1089,7 +1089,17 @@ def _getitem_single(self, where: Any) -> Array:

raise DaskAwkwardNotImplemented(f"__getitem__ doesn't support where={where}.")

def __getitem__(self, where: Any) -> AwkwardDaskCollection:
@overload
def __getitem__(self, where: Array | str | Sequence[str] | slice) -> Array:
...

@overload
def __getitem__(self, where: int) -> Scalar:
...

def __getitem__(
self, where: Array | str | Sequence[str] | int | slice
) -> Array | Scalar:
"""Select items from the collection.
Heavily under construction.
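
The two overloads encode that a string, sequence-of-strings, or slice selection yields another lazy ``Array`` while an integer yields a ``Scalar``. The typing pattern itself, shown on a toy class (not dask-awkward's real implementation), looks like:

```python
from __future__ import annotations

from typing import Any, overload


class Collection:
    """Toy columnar container illustrating overloaded __getitem__ typing."""

    def __init__(self, data: dict[str, list[int]]):
        self._data = data

    @overload
    def __getitem__(self, where: str) -> list[int]: ...

    @overload
    def __getitem__(self, where: int) -> dict[str, int]: ...

    def __getitem__(self, where: Any) -> Any:
        if isinstance(where, str):
            # field selection: return the whole column
            return self._data[where]
        # integer selection: return one "row" across all fields
        return {name: column[where] for name, column in self._data.items()}


c = Collection({"x": [1, 2], "y": [3, 4]})
```

With these overloads, a type checker infers ``c["x"]`` as ``list[int]`` and ``c[0]`` as ``dict[str, int]`` without any casts at the call site.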
13 changes: 12 additions & 1 deletion src/dask_awkward/lib/io/io.py
@@ -11,7 +11,6 @@
from dask.base import flatten, tokenize
from dask.highlevelgraph import HighLevelGraph
from dask.utils import funcname, is_integer, parse_bytes
from fsspec.spec import AbstractFileSystem
from fsspec.utils import infer_compression

from dask_awkward.layers import AwkwardBlockwiseLayer, AwkwardInputLayer
@@ -28,6 +27,7 @@
from dask.bag.core import Bag as DaskBag
from dask.dataframe.core import DataFrame as DaskDataFrame
from dask.delayed import Delayed
from fsspec.spec import AbstractFileSystem

from dask_awkward.lib.core import Array

@@ -586,6 +586,16 @@ class _BytesReadingInstructions:
length: int | None
delimiter: bytes

def expand(self):
return (
self.fs,
self.path,
self.compression,
self.offset,
self.length,
self.delimiter,
)


def _bytes_with_sample(
fs: AbstractFileSystem,
@@ -630,6 +640,7 @@ def _bytes_with_sample(
Sample bytes.
"""

if blocksize is not None:
if isinstance(blocksize, str):
blocksize = parse_bytes(blocksize)
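
Two small pieces of this file benefit from illustration: ``expand`` is a tuple-unpacking convenience over the dataclass fields, and ``dask.utils.parse_bytes`` (applied to a string ``blocksize``) converts human-readable sizes to byte counts. A sketch with a simplified stand-in class (the real ``_BytesReadingInstructions`` also carries ``fs``, ``compression``, and ``delimiter``):

```python
from dataclasses import dataclass

from dask.utils import parse_bytes


@dataclass
class ReadingInstructions:
    """Simplified stand-in for _BytesReadingInstructions."""

    path: str
    offset: int
    length: int

    def expand(self):
        # Return the fields as a tuple so call sites can unpack in one go.
        return (self.path, self.offset, self.length)


instr = ReadingInstructions("data.json", offset=0, length=parse_bytes("128 MiB"))
path, offset, length = instr.expand()
```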