# file cdk: fix typing, pull out non-scalar handling (#35687)

## What
* Fix the typing and handling of different input types in `_to_output_value` - the value passed in is not always a `Scalar`. The different cases are already handled correctly, but the type hints don't reflect this.
* Splitting the scalar conversion out into its own method is a helpful precursor to #35688, as the `DictionaryArray` object doesn't have an `as_py()` method (see the sketch below).
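
For context, a minimal sketch (not part of this PR, with made-up sample data) of why the two inputs need separate handling - a plain `Scalar` converts via `as_py()`, while a dictionary-encoded column arrives as a `DictionaryArray` whose data lives in `indices` and `dictionary`:

```python
import pyarrow as pa

# A plain pyarrow Scalar converts straight to a Python value.
scalar = pa.scalar(42)
print(scalar.as_py())  # 42

# A dictionary-encoded column arrives as a DictionaryArray, which has no as_py();
# its data lives in two pieces: integer indices plus the dictionary of values.
encoded = pa.array(["red", "blue", "red"]).dictionary_encode()
print(encoded.indices.tolist())     # [0, 1, 0]
print(encoded.dictionary.tolist())  # ['red', 'blue']
```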

## 🚨 User Impact 🚨
None

## Pre-merge Actions
*Expand the relevant checklist and delete the others.*

<details><summary><strong>New Connector</strong></summary>

### Community member or Airbyter

- **Community member?** Grant edit access to maintainers ([instructions](https://docs.github.com/en/github/collaborating-with-pull-requests/working-with-forks/allowing-changes-to-a-pull-request-branch-created-from-a-fork#enabling-repository-maintainer-permissions-on-existing-pull-requests))
- Unit & integration tests added and passing. Community members, please provide proof of success locally, e.g. a screenshot or copy-pasted unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow the instructions in the README. For Java connectors, run `./gradlew :airbyte-integrations:connectors:<name>:integrationTest`.
- Connector version is set to `0.0.1`
    - `Dockerfile` has version `0.0.1`
- Documentation updated
    - Connector's `README.md`
    - Connector's `bootstrap.md`. See [description and examples](https://docs.google.com/document/d/1ypdgmwmEHWv-TrO4_YOQ7pAJGVrMp5BOkEVh831N260/edit?usp=sharing)
    - `docs/integrations/<source or destination>/<name>.md` including changelog with an entry for the initial version. See changelog [example](https://docs.airbyte.io/integrations/sources/stripe#changelog)
    - `docs/integrations/README.md`

### Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

- Create a non-forked branch based on this PR and test the below items on it
- Build is successful
- If new credentials are required for use in CI, add them to GSM. [Instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci).

</details>

<details><summary><strong>Updating a connector</strong></summary>

### Community member or Airbyter

- Grant edit access to maintainers ([instructions](https://docs.github.com/en/github/collaborating-with-pull-requests/working-with-forks/allowing-changes-to-a-pull-request-branch-created-from-a-fork#enabling-repository-maintainer-permissions-on-existing-pull-requests))
- Unit & integration tests added


### Airbyter

If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.

- Create a non-forked branch based on this PR and test the below items on it
- Build is successful
- If new credentials are required for use in CI, add them to GSM. [Instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci).

</details>

<details><summary><strong>Connector Generator</strong></summary>

- Issue acceptance criteria met
- PR name follows [PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook)
- If adding a new generator, add it to the [list of scaffold modules being tested](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connector-templates/generator/build.gradle#L41)
- The generator test modules (all connectors with `-scaffold` in their name) have been updated with the latest scaffold by running `./gradlew :airbyte-integrations:connector-templates:generator:generateScaffolds` then checking in your changes
- Documentation which references the generator is updated as needed

</details>

<details><summary><strong>Updating the Python CDK</strong></summary>

### Airbyter

Before merging:
- Pull Request description explains what problem it is solving
- Code change is unit tested
- Build and mypy checks pass
- Smoke test the change on at least one affected connector
   - On GitHub: Run [this workflow](https://github.com/airbytehq/airbyte/actions/workflows/connectors_tests.yml), passing `--use-local-cdk --name=source-<connector>` as options
   - Locally: `airbyte-ci connectors --use-local-cdk --name=source-<connector> test`
- PR is reviewed and approved
      
After merging:
- [Publish the CDK](https://github.com/airbytehq/airbyte/actions/workflows/publish-cdk-command-manually.yml)
   - The CDK does not follow proper semantic versioning. Choose minor if the change has significant user impact or is a breaking change; choose patch otherwise.
   - Write a thoughtful changelog message so we know what was updated.
- Merge the platform PR that was auto-created for updating the Connector Builder's CDK version
   - This step is optional if the change does not affect the connector builder or declarative connectors.

</details>
erohmensing authored and xiaohansong committed Mar 7, 2024
1 parent 1031d60 commit e4a2981
Showing 1 changed file with 27 additions and 10 deletions.
@@ -5,7 +5,7 @@
 import json
 import logging
 import os
-from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
 from urllib.parse import unquote

 import pyarrow as pa
@@ -16,7 +16,7 @@
 from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
 from airbyte_cdk.sources.file_based.remote_file import RemoteFile
 from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
-from pyarrow import Scalar
+from pyarrow import DictionaryArray, Scalar


 class ParquetParser(FileTypeParser):
@@ -95,7 +95,17 @@ def file_read_mode(self) -> FileReadMode:
         return FileReadMode.READ_BINARY

     @staticmethod
-    def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any:
+    def _to_output_value(parquet_value: Union[Scalar, DictionaryArray], parquet_format: ParquetFormat) -> Any:
+        """
+        Convert an entry in a pyarrow table to a value that can be output by the source.
+        """
+        if isinstance(parquet_value, DictionaryArray):
+            return ParquetParser._dictionary_array_to_python_value(parquet_value)
+        else:
+            return ParquetParser._scalar_to_python_value(parquet_value, parquet_format)
+
+    @staticmethod
+    def _scalar_to_python_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any:
         """
         Convert a pyarrow scalar to a value that can be output by the source.
         """
@@ -119,13 +129,6 @@ def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any:
         else:
             return str(parquet_value.as_py())

-        # Dictionaries are stored as two columns: indices and values
-        # The indices column is an array of integers that maps to the values column
-        if pa.types.is_dictionary(parquet_value.type):
-            return {
-                "indices": parquet_value.indices.tolist(),
-                "values": parquet_value.dictionary.tolist(),
-            }
         if pa.types.is_map(parquet_value.type):
             return {k: v for k, v in parquet_value.as_py()}

@@ -149,6 +152,20 @@ def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any:
         else:
             return parquet_value.as_py()

+    @staticmethod
+    def _dictionary_array_to_python_value(parquet_value: DictionaryArray) -> Dict[str, Any]:
+        """
+        Convert a pyarrow dictionary array to a value that can be output by the source.
+        Dictionaries are stored as two columns: indices and values
+        The indices column is an array of integers that maps to the values column
+        """
+
+        return {
+            "indices": parquet_value.indices.tolist(),
+            "values": parquet_value.dictionary.tolist(),
+        }
+
     @staticmethod
     def parquet_type_to_schema_type(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> Mapping[str, str]:
         """
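
A rough standalone sketch of the resulting behaviour (mirroring the dispatch above rather than importing the CDK; the sample data is made up and the format-specific scalar handling is omitted):

```python
from typing import Any, Union

import pyarrow as pa
from pyarrow import DictionaryArray, Scalar


def to_output_value(parquet_value: Union[Scalar, DictionaryArray]) -> Any:
    """Mirror of ParquetParser._to_output_value, reduced to the two branches."""
    if isinstance(parquet_value, DictionaryArray):
        # Same shape as _dictionary_array_to_python_value above.
        return {
            "indices": parquet_value.indices.tolist(),
            "values": parquet_value.dictionary.tolist(),
        }
    return parquet_value.as_py()


print(to_output_value(pa.scalar(3.14)))
# 3.14
print(to_output_value(pa.array(["a", "b", "a"]).dictionary_encode()))
# {'indices': [0, 1, 0], 'values': ['a', 'b']}
```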
