file cdk: handle scalar values that resolve to None (#35688)

## What
* Closes #34151
* Closes airbytehq/oncall#4386

## How
Handle cases where the Python value of a pyarrow scalar is `None`. This can be caused by null values in the data, as well as null-like values such as `NaT` (the timestamp analogue of `NaN`). We previously handled this only for binary types; we now handle a `None` value of any type.

## 🚨 User Impact 🚨
No breaking changes. After this CDK version is released, we should update the CDK dependency in S3 and any other file-based sources that parse parquet files.


## Pre-merge Actions




<details><summary><strong>Updating the Python CDK</strong></summary>

### Airbyter

Before merging:
- Pull Request description explains what problem it is solving
- Code change is unit tested
- Build and mypy checks pass
- Smoke test the change on at least one affected connector
   - On GitHub: Run [this workflow](https://github.com/airbytehq/airbyte/actions/workflows/connectors_tests.yml), passing `--use-local-cdk --name=source-<connector>` as options
   - Locally: `airbyte-ci connectors --use-local-cdk --name=source-<connector> test`
- PR is reviewed and approved
      
After merging:
- [Publish the CDK](https://github.com/airbytehq/airbyte/actions/workflows/publish-cdk-command-manually.yml)
   - The CDK does not follow proper semantic versioning. Choose minor if the change has significant user impact or is a breaking change; choose patch otherwise.
   - Write a thoughtful changelog message so we know what was updated.
- Merge the platform PR that was auto-created for updating the Connector Builder's CDK version
   - This step is optional if the change does not affect the connector builder or declarative connectors.

</details>
erohmensing authored and xiaohansong committed Mar 7, 2024
1 parent e4a2981 commit 4643800
Showing 2 changed files with 56 additions and 10 deletions.
```diff
@@ -109,6 +109,9 @@ def _scalar_to_python_value(parquet_value: Scalar, parquet_format: ParquetFormat
     """
     Convert a pyarrow scalar to a value that can be output by the source.
     """
+    if parquet_value.as_py() is None:
+        return None
+
     # Convert date and datetime objects to isoformat strings
     if pa.types.is_time(parquet_value.type) or pa.types.is_timestamp(parquet_value.type) or pa.types.is_date(parquet_value.type):
         return parquet_value.as_py().isoformat()
@@ -119,10 +122,8 @@ def _scalar_to_python_value(parquet_value: Scalar, parquet_format: ParquetFormat

     # Decode binary strings to utf-8
     if ParquetParser._is_binary(parquet_value.type):
-        py_value = parquet_value.as_py()
-        if py_value is None:
-            return py_value
-        return py_value.decode("utf-8")
+        return parquet_value.as_py().decode("utf-8")

     if pa.types.is_decimal(parquet_value.type):
         if parquet_format.decimal_as_float:
             return parquet_value.as_py()
```
```diff
@@ -191,12 +191,57 @@ def test_value_dictionary() -> None:
     assert py_value == {"indices": [0, 1, 2, 0, 1], "values": ["apple", "banana", "cherry"]}


-def test_value_none_binary() -> None:
-    none_binary_scalar = pa.scalar(None, type=pa.binary())
-    try:
-        ParquetParser._to_output_value(none_binary_scalar, _default_parquet_format)
-    except AttributeError:
-        assert False, "`None` type binary should be handled properly"
+@pytest.mark.parametrize(
+    "parquet_type, parquet_format",
+    [
+        pytest.param(pa.bool_(), _default_parquet_format, id="test_parquet_bool"),
+        pytest.param(pa.int8(), _default_parquet_format, id="test_parquet_int8"),
+        pytest.param(pa.int16(), _default_parquet_format, id="test_parquet_int16"),
+        pytest.param(pa.int32(), _default_parquet_format, id="test_parquet_int32"),
+        pytest.param(pa.int64(), _default_parquet_format, id="test_parquet_int64"),
+        pytest.param(pa.uint8(), _default_parquet_format, id="test_parquet_uint8"),
+        pytest.param(pa.uint16(), _default_parquet_format, id="test_parquet_uint16"),
+        pytest.param(pa.uint32(), _default_parquet_format, id="test_parquet_uint32"),
+        pytest.param(pa.uint64(), _default_parquet_format, id="test_parquet_uint64"),
+        pytest.param(pa.float16(), _default_parquet_format, id="test_parquet_float16"),
+        pytest.param(pa.float32(), _default_parquet_format, id="test_parquet_float32"),
+        pytest.param(pa.float64(), _default_parquet_format, id="test_parquet_float64"),
+        pytest.param(pa.time32("s"), _default_parquet_format, id="test_parquet_time32s"),
+        pytest.param(pa.time32("ms"), _default_parquet_format, id="test_parquet_time32ms"),
+        pytest.param(pa.time64("us"), _default_parquet_format, id="test_parquet_time64us"),
+        pytest.param(pa.time64("ns"), _default_parquet_format, id="test_parquet_time64ns"),
+        pytest.param(pa.timestamp("s"), _default_parquet_format, id="test_parquet_timestamps_s"),
+        pytest.param(pa.timestamp("ms"), _default_parquet_format, id="test_parquet_timestamp_ms"),
+        pytest.param(pa.timestamp("s", "utc"), _default_parquet_format, id="test_parquet_timestamps_s_with_tz"),
+        pytest.param(pa.timestamp("ms", "est"), _default_parquet_format, id="test_parquet_timestamps_ms_with_tz"),
+        pytest.param(pa.date32(), _default_parquet_format, id="test_parquet_date32"),
+        pytest.param(pa.date64(), _default_parquet_format, id="test_parquet_date64"),
+        pytest.param(pa.duration("s"), _default_parquet_format, id="test_duration_s"),
+        pytest.param(pa.duration("ms"), _default_parquet_format, id="test_duration_ms"),
+        pytest.param(pa.duration("us"), _default_parquet_format, id="test_duration_us"),
+        pytest.param(pa.duration("ns"), _default_parquet_format, id="test_duration_ns"),
+        pytest.param(pa.month_day_nano_interval(), _default_parquet_format, id="test_parquet_month_day_nano_interval"),
+        pytest.param(pa.binary(), _default_parquet_format, id="test_binary"),
+        pytest.param(pa.binary(2), _default_parquet_format, id="test_fixed_size_binary"),
+        pytest.param(pa.string(), _default_parquet_format, id="test_parquet_string"),
+        pytest.param(pa.utf8(), _default_parquet_format, id="test_utf8"),
+        pytest.param(pa.large_binary(), _default_parquet_format, id="test_large_binary"),
+        pytest.param(pa.large_string(), _default_parquet_format, id="test_large_string"),
+        pytest.param(pa.large_utf8(), _default_parquet_format, id="test_large_utf8"),
+        pytest.param(pa.dictionary(pa.int32(), pa.string()), _default_parquet_format, id="test_dictionary"),
+        pytest.param(pa.struct([pa.field("field", pa.int32())]), _default_parquet_format, id="test_struct"),
+        pytest.param(pa.list_(pa.int32()), _default_parquet_format, id="test_list"),
+        pytest.param(pa.large_list(pa.int32()), _default_parquet_format, id="test_large_list"),
+        pytest.param(pa.decimal128(2), _default_parquet_format, id="test_decimal128"),
+        pytest.param(pa.decimal256(2), _default_parquet_format, id="test_decimal256"),
+        pytest.param(pa.decimal128(2), _decimal_as_float_parquet_format, id="test_decimal128_as_float"),
+        pytest.param(pa.decimal256(2), _decimal_as_float_parquet_format, id="test_decimal256_as_float"),
+        pytest.param(pa.map_(pa.int32(), pa.int32()), _default_parquet_format, id="test_map"),
+        pytest.param(pa.null(), _default_parquet_format, id="test_null"),
+    ])
+def test_null_value_does_not_throw(parquet_type, parquet_format) -> None:
+    pyarrow_value = pa.scalar(None, type=parquet_type)
+    assert ParquetParser._to_output_value(pyarrow_value, parquet_format) is None


 @pytest.mark.parametrize(
```