diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py index 00b78c489801..b57e413c0247 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py @@ -5,7 +5,7 @@ import json import logging import os -from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple +from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union from urllib.parse import unquote import pyarrow as pa @@ -16,7 +16,7 @@ from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.schema_helpers import SchemaType -from pyarrow import Scalar +from pyarrow import DictionaryArray, Scalar class ParquetParser(FileTypeParser): @@ -95,7 +95,17 @@ def file_read_mode(self) -> FileReadMode: return FileReadMode.READ_BINARY @staticmethod - def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any: + def _to_output_value(parquet_value: Union[Scalar, DictionaryArray], parquet_format: ParquetFormat) -> Any: + """ + Convert an entry in a pyarrow table to a value that can be output by the source. + """ + if isinstance(parquet_value, DictionaryArray): + return ParquetParser._dictionary_array_to_python_value(parquet_value) + else: + return ParquetParser._scalar_to_python_value(parquet_value, parquet_format) + + @staticmethod + def _scalar_to_python_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any: """ Convert a pyarrow scalar to a value that can be output by the source. """ @@ -119,13 +129,6 @@ def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> An else: return str(parquet_value.as_py()) - # Dictionaries are stored as two columns: indices and values - # The indices column is an array of integers that maps to the values column - if pa.types.is_dictionary(parquet_value.type): - return { - "indices": parquet_value.indices.tolist(), - "values": parquet_value.dictionary.tolist(), - } if pa.types.is_map(parquet_value.type): return {k: v for k, v in parquet_value.as_py()} @@ -149,6 +152,20 @@ def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> An else: return parquet_value.as_py() + @staticmethod + def _dictionary_array_to_python_value(parquet_value: DictionaryArray) -> Dict[str, Any]: + """ + Convert a pyarrow dictionary array to a value that can be output by the source. + + Dictionaries are stored as two columns: indices and values + The indices column is an array of integers that maps to the values column + """ + + return { + "indices": parquet_value.indices.tolist(), + "values": parquet_value.dictionary.tolist(), + } + @staticmethod def parquet_type_to_schema_type(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> Mapping[str, str]: """