
Python: Compute parquet stats #7831

Merged: 54 commits into apache:master, Sep 20, 2023

Conversation

maxdebayser (Contributor)

@Fokko

This commit partly addresses issue #7256. Unfortunately, the pyarrow library is not as flexible as we would like: when passing write_statistics=True to pyarrow.parquet.write_table, the statistics are written out for each row group in the file instead of being computed globally.

In the issue a "metadata_collector" was mentioned, which I assume is the parameter of the pyarrow.parquet.write_metadata function; the pyarrow.parquet.write_table function has no such parameter.

The function in this PR intentionally works at the level of individual parquet files instead of the dataset to support scenarios such as writing from Ray where each file of the dataset is written by a different task.
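
For illustration, computing the statistics at the level of a single file's pyarrow.Table could look roughly like this (the helper name compute_file_stats is made up for this sketch and is not the PR's actual API):

import pyarrow as pa
import pyarrow.compute as pc

def compute_file_stats(table: pa.Table) -> dict:
    # Hypothetical helper: global min/max and null counts for the table that
    # backs a single Parquet file, computed before the file is written.
    stats = {}
    for name in table.column_names:
        column = table[name]
        min_max = pc.min_max(column).as_py()  # e.g. {'min': 2, 'max': 100}
        stats[name] = {
            "min": min_max["min"],
            "max": min_max["max"],
            "null_count": column.null_count,
        }
    return stats

table = pa.table({"n_legs": [2, 2, 4, 4, 5, 100]})
print(compute_file_stats(table))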

Fokko (Contributor) commented Jun 13, 2023

Hey @maxdebayser, thanks for raising this PR. I had something different in mind with the issue. The main problem here is that we pull a lot of data through Python, which brings in a lot of issues (the binary conversion, which is probably expensive, and limited scalability due to the GIL).

I just did a quick stab (and should have done that sooner), and found the following:

Desktop python3 
Python 3.11.3 (main, Apr  7 2023, 20:13:31) [Clang 14.0.0 (clang-1400.0.29.202)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> table = pa.table({'n_legs': [2, 2, 4, 4, 5, 100],
...                   'animal': ["Flamingo", "Parrot", "Dog", "Horse",
...                              "Brittle stars", "Centipede"]})
>>> metadata_collector = []
>>> import pyarrow.parquet as pq
>>> pq.write_to_dataset(
...     table, '/tmp/table',
...      metadata_collector=metadata_collector)
>>> metadata_collector
[<pyarrow._parquet.FileMetaData object at 0x11f955850>
  created_by: parquet-cpp-arrow version 11.0.0
  num_columns: 2
  num_rows: 6
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 0]

>>> metadata_collector[0].row_group(0)
<pyarrow._parquet.RowGroupMetaData object at 0x105837d80>
  num_columns: 2
  num_rows: 6
  total_byte_size: 256

>>> metadata_collector[0].row_group(0).to_dict()
{
	'num_columns': 2,
	'num_rows': 6,
	'total_byte_size': 256,
	'columns': [{
		'file_offset': 119,
		'file_path': 'c569c5eaf90c4395885f31e012068b69-0.parquet',
		'physical_type': 'INT64',
		'num_values': 6,
		'path_in_schema': 'n_legs',
		'is_stats_set': True,
		'statistics': {
			'has_min_max': True,
			'min': 2,
			'max': 100,
			'null_count': 0,
			'distinct_count': 0,
			'num_values': 6,
			'physical_type': 'INT64'
		},
		'compression': 'SNAPPY',
		'encodings': ('PLAIN_DICTIONARY', 'PLAIN', 'RLE'),
		'has_dictionary_page': True,
		'dictionary_page_offset': 4,
		'data_page_offset': 46,
		'total_compressed_size': 115,
		'total_uncompressed_size': 117
	}, {
		'file_offset': 359,
		'file_path': 'c569c5eaf90c4395885f31e012068b69-0.parquet',
		'physical_type': 'BYTE_ARRAY',
		'num_values': 6,
		'path_in_schema': 'animal',
		'is_stats_set': True,
		'statistics': {
			'has_min_max': True,
			'min': 'Brittle stars',
			'max': 'Parrot',
			'null_count': 0,
			'distinct_count': 0,
			'num_values': 6,
			'physical_type': 'BYTE_ARRAY'
		},
		'compression': 'SNAPPY',
		'encodings': ('PLAIN_DICTIONARY', 'PLAIN', 'RLE'),
		'has_dictionary_page': True,
		'dictionary_page_offset': 215,
		'data_page_offset': 302,
		'total_compressed_size': 144,
		'total_uncompressed_size': 139
	}]
}

I think it is much better to retrieve the min/max from there. This is done by PyArrow and is probably much faster than doing it in Python. I really would like to stay away from processing data in Python as much as possible.
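
For example, folding the per-row-group statistics from the metadata_collector into file-level bounds could look roughly like this (a sketch that assumes simple, flat columns; this is not the code from the PR):

import pyarrow.parquet as pq

def file_min_max(file_metadata: pq.FileMetaData) -> dict:
    # Aggregate the per-row-group min/max into a single pair per leaf column.
    bounds = {}
    for rg in range(file_metadata.num_row_groups):
        row_group = file_metadata.row_group(rg)
        for col in range(row_group.num_columns):
            column = row_group.column(col)
            stats = column.statistics
            if stats is None or not stats.has_min_max:
                continue
            name = column.path_in_schema
            lo, hi = bounds.get(name, (stats.min, stats.max))
            bounds[name] = (min(lo, stats.min), max(hi, stats.max))
    return bounds

# e.g. file_min_max(metadata_collector[0])
# -> {'n_legs': (2, 100), 'animal': ('Brittle stars', 'Parrot')}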

I think the complexity here is:

Fokko (Contributor) commented Jun 13, 2023

It looks like the metadata is also available on the plain writer, where we don't have to write a dataset:

Desktop python3                       
Python 3.11.3 (main, Apr  7 2023, 20:13:31) [Clang 14.0.0 (clang-1400.0.29.202)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import pyarrow as pa
>>> table = pa.table({'n_legs': [2, 2, 4, 4, 5, 100],
...                   'animal': ["Flamingo", "Parrot", "Dog", "Horse", "Brittle stars", "Centipede"]})
>>> import pyarrow.parquet as pq
>>> 
>>> writer = pq.ParquetWriter('/tmp/vo.parquet', table.schema)
>>> writer.write_table(table)
>>> writer.close()
>>> writer.writer.metadata.row_group(0)
<pyarrow._parquet.RowGroupMetaData object at 0x1375b9080>
  num_columns: 2
  num_rows: 6
  total_byte_size: 256
>>> writer.writer.metadata.row_group(0).to_dict()
{
	'num_columns': 2,
	'num_rows': 6,
	'total_byte_size': 256,
	'columns': [{
		'file_offset': 119,
		'file_path': '',
		'physical_type': 'INT64',
		'num_values': 6,
		'path_in_schema': 'n_legs',
		'is_stats_set': True,
		'statistics': {
			'has_min_max': True,
			'min': 2,
			'max': 100,
			'null_count': 0,
			'distinct_count': 0,
			'num_values': 6,
			'physical_type': 'INT64'
		},
		'compression': 'SNAPPY',
		'encodings': ('RLE_DICTIONARY', 'PLAIN', 'RLE'),
		'has_dictionary_page': True,
		'dictionary_page_offset': 4,
		'data_page_offset': 46,
		'total_compressed_size': 115,
		'total_uncompressed_size': 117
	}, {
		'file_offset': 359,
		'file_path': '',
		'physical_type': 'BYTE_ARRAY',
		'num_values': 6,
		'path_in_schema': 'animal',
		'is_stats_set': True,
		'statistics': {
			'has_min_max': True,
			'min': 'Brittle stars',
			'max': 'Parrot',
			'null_count': 0,
			'distinct_count': 0,
			'num_values': 6,
			'physical_type': 'BYTE_ARRAY'
		},
		'compression': 'SNAPPY',
		'encodings': ('RLE_DICTIONARY', 'PLAIN', 'RLE'),
		'has_dictionary_page': True,
		'dictionary_page_offset': 215,
		'data_page_offset': 302,
		'total_compressed_size': 144,
		'total_uncompressed_size': 139
	}]
}

maxdebayser (Contributor, author)

@Fokko, I understand your concern; I think it's because we have different use cases in mind.

If I understand correctly, you want to write a pyarrow.Table to a partitioned dataset with write_dataset. Computing min/max on the whole Table is therefore not what you need, because you actually need the min/max of the columns of each individual file. (Just pointing out that with the metadata collector you get the stats per row group, so you still have to compute the file-level stats from those.)

I'm coming from a different use case: I would like to write from Ray using something like https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.write_parquet.html#ray.data.Dataset.write_parquet . In this case there is no global pyarrow.Table that represents the dataset; PyArrow tables are the blocks of the dataset that each individual Ray task sees, for example in map_batches. In this scenario pyarrow.write_dataset cannot be used because the full dataset is not entirely loaded into the memory of any single compute node. The GIL is also not a big concern here because Ray uses multiple worker processes.

I think we have to see whether there is a way to have a single API for both use cases or whether we'll need different API calls for each. In the latter case it would be better to share part of the implementation to ensure that the behavior is consistent, although that could perhaps hurt performance.

Regarding efficiency, the pyarrow.compute.min function is implemented in C++, so I think performance is probably not a huge concern here. But I can try to compare both approaches with a large enough data set to measure it.
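
A sketch of how such a comparison could be set up (illustrative only; the table contents, file path, and sizes are arbitrary and no timing results are claimed):

import time

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

table = pa.table({"n_legs": list(range(1_000_000))})
pq.write_table(table, "/tmp/bench.parquet")  # the file gets written in either approach

# Approach 1: compute min/max over the data with pyarrow.compute kernels.
start = time.perf_counter()
computed = pc.min_max(table["n_legs"]).as_py()
compute_seconds = time.perf_counter() - start

# Approach 2: read the min/max already recorded in the Parquet footer
# (only the first row group is inspected here, for brevity).
start = time.perf_counter()
stats = pq.read_metadata("/tmp/bench.parquet").row_group(0).column(0).statistics
footer_seconds = time.perf_counter() - start

print(computed, compute_seconds)
print((stats.min, stats.max), footer_seconds)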

maxdebayser (Contributor, author)

> It looks like the metadata is also available on the plain writer, where we don't have to write a dataset: […]

Thanks for pointing that out, I completely missed the fact that you can get this data from the ParquetWriter after calling close(). That changes everything.

This commit makes sure to test the metadata computation both using
`pyarrow.parquet.ParquetWriter` and `pyarrow.parquet.write_to_dataset`.
maxdebayser (Contributor, author)

@Fokko, I've rewritten the implementation to use the pyarrow metadata_collector. I've added test cases to make sure that it's compatible with write_dataset as well as ParquetWriter. I've also added a function to map the path of columns in the parquet metadata to Iceberg field IDs.

Fokko added this to the PyIceberg 0.5.0 release milestone on Jul 6, 2023
Fokko changed the title from "Compute parquet stats" to "Python: Compute parquet stats" on Jul 6, 2023
Fokko (Contributor) commented Jul 6, 2023

@maxdebayser can you rebase against the latest master?

maxdebayser (Contributor, author)

@Fokko done!

Fokko (Contributor) left a review comment

I left some comments to generalize it, so we can also re-use it outside of PyArrow.

Fokko (Contributor) commented Jul 11, 2023

For some reason my big comment was collapsed:

Since Arrow is column-oriented, I'm sure that it will follow the order of the write schema:

[screenshot]

The reason I try to avoid using too many internal details from PyArrow is that we currently support PyArrow [9.0.0, 12.0.1]. There is no guarantee that all these internals stay the same, so I think we should do as much as possible within our own control (also regarding the internal Parquet types, or how the column names are structured).

We already have the write schema, so we can easily filter out the primitive types:

class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]]):
    _field_id = 0
    _schema: Schema
    _properties: Properties

    def __init__(self, schema: Schema, properties: Dict[str, str]):
        self._schema = schema
        self._properties = properties

    def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]:
        return struct_result()

    def struct(self, struct: StructType, field_results: List[Callable[[], List[StatisticsCollector]]]) -> List[StatisticsCollector]:
        return list(chain(*[result() for result in field_results]))

    def field(self, field: NestedField, field_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]:
        self._field_id = field.field_id
        result = field_result()
        return result

    def list(self, list_type: ListType, element_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]:
        self._field_id = list_type.element_id
        return element_result()

    def map(self, map_type: MapType, key_result: Callable[[], List[StatisticsCollector]], value_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]:
        self._field_id = map_type.key_id
        k = key_result()
        self._field_id = map_type.value_id
        v = value_result()
        return k + v

    def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
        return [StatisticsCollector(
            field_id=self._field_id,
            iceberg_type=primitive,
            mode=MetricsMode.TRUNC
            # schema
            # self._properties.get(f"write.metadata.metrics.column.{schema.find_column_name(self._field_id)}")
        )]

This way we get a nice list of columns that we need to collect statistics for. We have:

@dataclass(frozen=True)
class StatisticsCollector:
    field_id: int
    iceberg_type: PrimitiveType
    mode: MetricsMode

Here we can use the field_id to properly populate the maps, and the iceberg_type to feed into to_bytes to do the conversion (so we don't have to write yet another conversion for PyArrow).
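
As a small illustration of that last point, feeding a raw min/max through pyiceberg's to_bytes to populate the bound maps could look like this (the field ID, type, and values are made up for the example):

from pyiceberg.conversions import to_bytes
from pyiceberg.types import IntegerType

# Hypothetical example: field 1 is an Iceberg `int` column whose Parquet
# row-group statistics reported min=2 and max=100.
field_id, iceberg_type = 1, IntegerType()
raw_min, raw_max = 2, 100

lower_bounds = {field_id: to_bytes(iceberg_type, raw_min)}
upper_bounds = {field_id: to_bytes(iceberg_type, raw_max)}

print(lower_bounds, upper_bounds)  # Iceberg single-value serialization of 2 and 100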

I did a quick test, and it seems to work:

def test_complex_schema(table_schema_nested: Schema):
    tbl = pa.Table.from_pydict({
        "foo": ["a", "b"],
        "bar": [1, 2],
        "baz": [False, True],
        "qux": [["a", "b"], ["c", "d"]],
        "quux": [[("a", (("aa", 1), ("ab", 2)))], [("b", (("ba", 3), ("bb", 4)))]],
        "location": [[(52.377956, 4.897070), (4.897070, -122.431297)],
                     [(43.618881, -116.215019), (41.881832, -87.623177)]],
        "person": [("Fokko", 33), ("Max", 42)]  # Possible data quality issue
    },
        schema=schema_to_pyarrow(table_schema_nested)
    )
    stats_columns = pre_order_visit(table_schema_nested, PyArrowStatisticsCollector(table_schema_nested, {}))

    visited_paths = []

    def file_visitor(written_file: Any) -> None:
        visited_paths.append(written_file)

    with TemporaryDirectory() as tmpdir:
        pq.write_to_dataset(tbl, tmpdir, file_visitor=file_visitor)

    assert visited_paths[0].metadata.num_columns == len(stats_columns)

        return list(chain(*[result() for result in field_results]))

    def field(self, field: NestedField, field_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]:
        self._field_id = field.field_id
Review comment (Contributor):

I see your point, and I'm happy to add those fields. Currently they are not there, so I would suggest doing that in a separate PR to avoid going on a tangent here. Created: #8273


stats_columns = pre_order_visit(schema, PyArrowStatisticsCollector(schema, table_metadata.properties))

if parquet_metadata.num_columns != len(stats_columns):
Review comment (Contributor):

I think we should leave this one in for now. I think they are always the same, but when that is not the case we should be notified.
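
In other words, something along these lines (a sketch continuing the snippet quoted above; parquet_metadata and stats_columns come from that snippet, and this is not necessarily the exact code in the PR):

if parquet_metadata.num_columns != len(stats_columns):
    # Fail loudly rather than silently mis-aligning statistics with field IDs.
    raise ValueError(
        f"Number of columns in the Parquet metadata ({parquet_metadata.num_columns}) "
        f"does not match the number of stats columns ({len(stats_columns)})"
    )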

import os
import tempfile
from typing import Any, List, Optional
import uuid
from datetime import (
Review comment (Contributor):

We use this for PyArrow, which has the datetime classes in its public API.

assert datafile.value_counts[1] == 4
assert datafile.value_counts[2] == 4
assert datafile.value_counts[5] == 10 # 3 lists with 3 items and a None value
assert datafile.value_counts[6] == 5
Review comment (Contributor):

With Spark:

CREATE TABLE nyc.test_map_maps2 AS 
SELECT map_from_arrays(array(1.0, 3.0), array('2', '4')) as map, array('a', 'b', 'c') as arr

Schema:

{
	"type": "struct",
	"schema-id": 0,
	"fields": [{
		"id": 1,
		"name": "map",
		"required": false,
		"type": {
			"type": "map",
			"key-id": 3,
			"key": "decimal(2, 1)",
			"value-id": 4,
			"value": "string",
			"value-required": false
		}
	}, {
		"id": 2,
		"name": "arr",
		"required": false,
		"type": {
			"type": "list",
			"element-id": 5,
			"element": "string",
			"element-required": false
		}
	}]
}

We don't get any stats in Spark:

{
	"status": 1,
	"snapshot_id": {
		"long": 4895801649705337905
	},
	"data_file": {
		"file_path": "s3://warehouse/nyc/test_map_maps/data/00000-95-750d8f3e-8d49-44ec-b37e-9e101e003a5d-00001.parquet",
		"file_format": "PARQUET",
		"partition": {},
		"record_count": 1,
		"file_size_in_bytes": 1438,
		"block_size_in_bytes": 67108864,
		"column_sizes": {
			"array": [{
				"key": 3,
				"value": 57
			}, {
				"key": 4,
				"value": 58
			}, {
				"key": 5,
				"value": 61
			}]
		},
		"value_counts": {
			"array": [{
				"key": 3,
				"value": 2
			}, {
				"key": 4,
				"value": 2
			}, {
				"key": 5,
				"value": 3
			}]
		},
		"null_value_counts": {
			"array": [{
				"key": 3,
				"value": 0
			}, {
				"key": 4,
				"value": 0
			}, {
				"key": 5,
				"value": 0
			}]
		},
		"nan_value_counts": {
			"array": []
		},
		"lower_bounds": {
			"array": []
		},
		"upper_bounds": {
			"array": []
		},
		"key_metadata": null,
		"split_offsets": {
			"array": [4]
		},
		"sort_order_id": {
			"int": 0
		}
	}
}

The stats are only computed for the primitive types (see PyArrowStatisticsCollector).

assert len(datafile.null_value_counts) == 5
assert datafile.null_value_counts[1] == 1
assert datafile.null_value_counts[2] == 0
assert datafile.null_value_counts[5] == 1
Fokko (Contributor) commented Aug 9, 2023:

🤔 Type:

{"type": "list", "element-id": 5, "element": "long", "element-required": False}

And the data:

_list = [[1, 2, 3], [4, 5, 6], None, [7, 8, 9]]

I count a single null.
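
This is easy to confirm with plain pyarrow (an illustrative snippet, not part of the PR):

import pyarrow as pa

# The list column from above: one top-level None entry, no null elements.
arr = pa.array([[1, 2, 3], [4, 5, 6], None, [7, 8, 9]])
print(arr.null_count)            # 1  -> the single None list
print(arr.flatten().null_count)  # 0  -> no nulls among the list elements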


# Arrow does not include this in the statistics
# assert len(datafile.nan_value_counts) == 3
# assert datafile.nan_value_counts[1] == 0
Review comment (Contributor):

I was just wondering: the evaluator does not take those into consideration for any type other than float or double, so is there any reason to still add them?

Fokko (Contributor) commented Sep 20, 2023

@maxdebayser Thanks for working on this, and for your patience; it took quite a while and we also went a bit back and forth. Let's merge this in. As a next step, we can revisit #8012 and add a lot of integration tests to make sure that we don't have any correctness issues.

I think there are still some open ends on the statistics, but they are not limited to Python. Please refer to #8598.

Fokko merged commit 47ae4ac into apache:master on Sep 20, 2023
Fokko (Contributor) commented Sep 20, 2023

Thanks @rdblue for the review, that was very helpful.

maxdebayser (Contributor, author)

Thanks, @Fokko and @rdblue
