From 8d60177786808e07967260cf0050aba7d75aef6d Mon Sep 17 00:00:00 2001 From: Henri Blancke Date: Wed, 25 Oct 2023 13:00:35 -0400 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Destination=20AWS=20Datalake:=20?= =?UTF-8?q?Upgrade=20AWSWrangler=20(#29221)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Henri Blancke Co-authored-by: Marcos Marx Co-authored-by: marcosmarxm --- .../destination-aws-datalake/Dockerfile | 8 +- .../destination_aws_datalake/aws.py | 35 +- .../destination_aws_datalake/constants.py | 28 ++ .../destination_aws_datalake/stream_writer.py | 138 +++--- .../destination-aws-datalake/metadata.yaml | 2 +- .../destination-aws-datalake/setup.py | 4 +- .../unit_tests/stream_writer_test.py | 429 +++++++++++++++++- .../integrations/destinations/aws-datalake.md | 1 + 8 files changed, 569 insertions(+), 76 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/constants.py diff --git a/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile b/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile index 5337033a942d..6e3c7234dde5 100644 --- a/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile +++ b/airbyte-integrations/connectors/destination-aws-datalake/Dockerfile @@ -1,17 +1,17 @@ -FROM python:3.9-slim -# FROM python:3.9.11-alpine3.15 +FROM python:3.10-slim # Bash is installed for more convenient debugging. # RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/* WORKDIR /airbyte/integration_code -COPY destination_aws_datalake ./destination_aws_datalake COPY main.py ./ COPY setup.py ./ RUN pip install . +COPY destination_aws_datalake ./destination_aws_datalake + ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/destination-aws-datalake diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py index 1e1a679938d7..8458d01c9e1d 100644 --- a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py +++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/aws.py @@ -4,7 +4,7 @@ import logging from decimal import Decimal -from typing import Dict, Optional +from typing import Any, Dict, Optional import awswrangler as wr import boto3 @@ -15,11 +15,10 @@ from retrying import retry from .config_reader import CompressionCodec, ConnectorConfig, CredentialsType, OutputFormat +from .constants import BOOLEAN_VALUES, EMPTY_VALUES logger = logging.getLogger("airbyte") -null_values = ["", " ", "#N/A", "#N/A N/A", "#NA", "", "N/A", "NA", "NULL", "none", "None", "NaN", "n/a", "nan", "null"] - def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_type: str) -> pd.DataFrame: if desired_type == "datetime64": @@ -32,13 +31,13 @@ def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_t # First cast to string df = _cast_pandas_column(df=df, col=col, current_type=current_type, desired_type="string") # Then cast to decimal - df[col] = df[col].apply(lambda x: Decimal(str(x)) if str(x) not in null_values else None) + df[col] = df[col].apply(lambda x: Decimal(str(x)) if str(x) not in EMPTY_VALUES else None) elif desired_type.lower() 
in ["float64", "int64"]: df[col] = df[col].fillna("") df[col] = pd.to_numeric(df[col]) elif desired_type in ["boolean", "bool"]: if df[col].dtype in ["string", "O"]: - df[col] = df[col].fillna("false").apply(lambda x: str(x).lower() in ["true", "1", "1.0", "t", "y", "yes"]) + df[col] = df[col].fillna("false").apply(lambda x: str(x).lower() in BOOLEAN_VALUES) df[col] = df[col].astype(bool) else: @@ -53,7 +52,7 @@ def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_t "which may cause precision loss.", UserWarning, ) - df[col] = df[col].apply(lambda x: int(x) if str(x) not in null_values else None).astype(desired_type) + df[col] = df[col].apply(lambda x: int(x) if str(x) not in EMPTY_VALUES else None).astype(desired_type) return df @@ -66,7 +65,7 @@ def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_t class AwsHandler: - def __init__(self, connector_config: ConnectorConfig, destination: Destination): + def __init__(self, connector_config: ConnectorConfig, destination: Destination) -> None: self._config: ConnectorConfig = connector_config self._destination: Destination = destination self._session: boto3.Session = None @@ -79,7 +78,7 @@ def __init__(self, connector_config: ConnectorConfig, destination: Destination): self._table_type = "GOVERNED" if self._config.lakeformation_governed_tables else "EXTERNAL_TABLE" @retry(stop_max_attempt_number=10, wait_random_min=1000, wait_random_max=2000) - def create_session(self): + def create_session(self) -> None: if self._config.credentials_type == CredentialsType.IAM_USER: self._session = boto3.Session( aws_access_key_id=self._config.aws_access_key, @@ -108,7 +107,7 @@ def _get_s3_path(self, database: str, table: str) -> str: return f"{bucket}/{database}/{table}/" - def _get_compression_type(self, compression: CompressionCodec): + def _get_compression_type(self, compression: CompressionCodec) -> Optional[str]: if compression == CompressionCodec.GZIP: return "gzip" elif compression == CompressionCodec.SNAPPY: @@ -127,14 +126,16 @@ def _write_parquet( mode: str, dtype: Optional[Dict[str, str]], partition_cols: list = None, - ): + ) -> Any: return wr.s3.to_parquet( df=df, path=path, dataset=True, database=database, table=table, - table_type=self._table_type, + glue_table_settings={ + "table_type": self._table_type, + }, mode=mode, use_threads=False, # True causes s3 NoCredentialsError error catalog_versioning=True, @@ -153,14 +154,16 @@ def _write_json( mode: str, dtype: Optional[Dict[str, str]], partition_cols: list = None, - ): + ) -> Any: return wr.s3.to_json( df=df, path=path, dataset=True, database=database, table=table, - table_type=self._table_type, + glue_table_settings={ + "table_type": self._table_type, + }, mode=mode, use_threads=False, # True causes s3 NoCredentialsError error orient="records", @@ -172,7 +175,9 @@ def _write_json( compression=self._get_compression_type(self._config.compression_codec), ) - def _write(self, df: pd.DataFrame, path: str, database: str, table: str, mode: str, dtype: Dict[str, str], partition_cols: list = None): + def _write( + self, df: pd.DataFrame, path: str, database: str, table: str, mode: str, dtype: Dict[str, str], partition_cols: list = None + ) -> Any: self._create_database_if_not_exists(database) if self._config.format_type == OutputFormat.JSONL: @@ -184,7 +189,7 @@ def _write(self, df: pd.DataFrame, path: str, database: str, table: str, mode: s else: raise Exception(f"Unsupported output format: {self._config.format_type}") - def 
_create_database_if_not_exists(self, database: str):
+    def _create_database_if_not_exists(self, database: str) -> None:
         tag_key = self._config.lakeformation_database_default_tag_key
         tag_values = self._config.lakeformation_database_default_tag_values
diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/constants.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/constants.py
new file mode 100644
index 000000000000..75ff5562c20f
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/constants.py
@@ -0,0 +1,28 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+EMPTY_VALUES = ["", " ", "#N/A", "#N/A N/A", "#NA", "<NA>", "N/A", "NA", "NULL", "none", "None", "NaN", "n/a", "nan", "null", "[]", "{}"]
+BOOLEAN_VALUES = ["true", "1", "1.0", "t", "y", "yes"]
+
+PANDAS_TYPE_MAPPING = {
+    "string": "string",
+    "integer": "Int64",
+    "number": "float64",
+    "boolean": "bool",
+    "object": "object",
+    "array": "object",
+}
+
+GLUE_TYPE_MAPPING_DOUBLE = {
+    "string": "string",
+    "integer": "bigint",
+    "number": "double",
+    "boolean": "boolean",
+    "null": "string",
+}
+
+GLUE_TYPE_MAPPING_DECIMAL = {
+    **GLUE_TYPE_MAPPING_DOUBLE,
+    "number": "decimal(38, 25)",
+}
diff --git a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py
index ed110c35ef57..d7ee96e27688 100644
--- a/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py
+++ b/airbyte-integrations/connectors/destination-aws-datalake/destination_aws_datalake/stream_writer.py
@@ -4,19 +4,40 @@
 import json
 import logging
+from datetime import date, datetime
+from decimal import Decimal, getcontext
 from typing import Any, Dict, List, Optional, Tuple, Union
 
 import pandas as pd
 from airbyte_cdk.models import ConfiguredAirbyteStream, DestinationSyncMode
-from destination_aws_datalake.config_reader import ConnectorConfig, PartitionOptions
 
 from .aws import AwsHandler
+from .config_reader import ConnectorConfig, PartitionOptions
+from .constants import EMPTY_VALUES, GLUE_TYPE_MAPPING_DECIMAL, GLUE_TYPE_MAPPING_DOUBLE, PANDAS_TYPE_MAPPING
+# By default we set glue decimal type to decimal(38, 25)
+# this setting matches that precision.
+getcontext().prec = 25 logger = logging.getLogger("airbyte") +class DictEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, Decimal): + return str(obj) + + if isinstance(obj, (pd.Timestamp, datetime)): + # all timestamps and datetimes are converted to UTC + return obj.strftime("%Y-%m-%dT%H:%M:%SZ") + + if isinstance(obj, date): + return obj.strftime("%Y-%m-%d") + + return super(DictEncoder, self).default(obj) + + class StreamWriter: - def __init__(self, aws_handler: AwsHandler, config: ConnectorConfig, configured_stream: ConfiguredAirbyteStream): + def __init__(self, aws_handler: AwsHandler, config: ConnectorConfig, configured_stream: ConfiguredAirbyteStream) -> None: self._aws_handler: AwsHandler = aws_handler self._config: ConnectorConfig = config self._configured_stream: ConfiguredAirbyteStream = configured_stream @@ -32,17 +53,18 @@ def __init__(self, aws_handler: AwsHandler, config: ConnectorConfig, configured_ logger.info(f"Creating StreamWriter for {self._database}:{self._table}") - def _get_date_columns(self) -> list: + def _get_date_columns(self) -> List[str]: date_columns = [] for key, val in self._schema.items(): typ = val.get("type") - if (isinstance(typ, str) and typ == "string") or (isinstance(typ, list) and "string" in typ): + typ = self._get_json_schema_type(typ) + if isinstance(typ, str) and typ == "string": if val.get("format") in ["date-time", "date"]: date_columns.append(key) return date_columns - def _add_partition_column(self, col: str, df: pd.DataFrame) -> list: + def _add_partition_column(self, col: str, df: pd.DataFrame) -> Dict[str, str]: partitioning = self._config.partitioning if partitioning == PartitionOptions.NONE: @@ -91,36 +113,63 @@ def _drop_additional_top_level_properties(self, record: Dict[str, Any]) -> Dict[ return record - def _fix_obvious_type_violations(self, record: Dict[str, Any]) -> Dict[str, Any]: + def _json_schema_cast_value(self, value, schema_entry) -> Any: + typ = schema_entry.get("type") + typ = self._get_json_schema_type(typ) + props = schema_entry.get("properties") + items = schema_entry.get("items") + + if typ == "string": + format = schema_entry.get("format") + if format == "date-time": + return pd.to_datetime(value, errors="coerce", utc=True) + + return str(value) if value and value != "" else None + + elif typ == "integer": + return pd.to_numeric(value, errors="coerce") + + elif typ == "number": + if self._config.glue_catalog_float_as_decimal: + return Decimal(str(value)) if value else Decimal("0") + return pd.to_numeric(value, errors="coerce") + + elif typ == "boolean": + return bool(value) + + elif typ == "null": + return None + + elif typ == "object": + if value in EMPTY_VALUES: + return None + + if isinstance(value, dict) and props: + for key, val in value.items(): + if key in props: + value[key] = self._json_schema_cast_value(val, props[key]) + return value + + elif typ == "array" and items: + if value in EMPTY_VALUES: + return None + + if isinstance(value, list): + return [self._json_schema_cast_value(item, items) for item in value] + + return value + + def _json_schema_cast(self, record: Dict[str, Any]) -> Dict[str, Any]: """ Helper that fixes obvious type violations in a record's top level keys that may cause issues when casting data to pyarrow types. 
Such as: - Objects having empty strings or " " or "-" as value instead of null or {} - Arrays having empty strings or " " or "-" as value instead of null or [] """ - schema_keys = self._schema.keys() - for key in schema_keys: + for key, schema_type in self._schema.items(): typ = self._schema[key].get("type") typ = self._get_json_schema_type(typ) - if typ in ["object", "array"]: - if record.get(key) in ["", " ", "-", "/", "null"]: - record[key] = None - - return record - - def _add_missing_columns(self, record: Dict[str, Any]) -> Dict[str, Any]: - """ - Helper that adds missing columns to a record's top level keys. Required - for awswrangler to create the correct schema in glue, even with the explicit - schema passed in, awswrangler will remove those columns when not present - in the dataframe - """ - schema_keys = self._schema.keys() - records_keys = record.keys() - difference = list(set(schema_keys).difference(set(records_keys))) - - for key in difference: - record[key] = None + record[key] = self._json_schema_cast_value(record.get(key), schema_type) return record @@ -153,15 +202,6 @@ def _get_json_schema_type(self, types: Union[List[str], str]) -> str: return types[0] def _get_pandas_dtypes_from_json_schema(self, df: pd.DataFrame) -> Dict[str, str]: - type_mapper = { - "string": "string", - "integer": "Int64", - "number": "float64", - "boolean": "bool", - "object": "object", - "array": "object", - } - column_types = {} typ = "string" @@ -176,15 +216,16 @@ def _get_pandas_dtypes_from_json_schema(self, df: pd.DataFrame) -> Dict[str, str typ = self._get_json_schema_type(typ) - column_types[col] = type_mapper.get(typ, "string") + column_types[col] = PANDAS_TYPE_MAPPING.get(typ, "string") return column_types - def _get_json_schema_types(self): + def _get_json_schema_types(self) -> Dict[str, str]: types = {} for key, val in self._schema.items(): typ = val.get("type") types[key] = self._get_json_schema_type(typ) + return types def _is_invalid_struct_or_array(self, schema: Dict[str, Any]) -> bool: @@ -243,19 +284,11 @@ def _get_glue_dtypes_from_json_schema(self, schema: Dict[str, Any]) -> Tuple[Dic """ Helper that infers glue dtypes from a json schema. 
""" - - type_mapper = { - "string": "string", - "integer": "bigint", - "number": "decimal(38, 25)" if self._config.glue_catalog_float_as_decimal else "double", - "boolean": "boolean", - "null": "string", - } + type_mapper = GLUE_TYPE_MAPPING_DECIMAL if self._config.glue_catalog_float_as_decimal else GLUE_TYPE_MAPPING_DOUBLE column_types = {} json_columns = set() - for (col, definition) in schema.items(): - + for col, definition in schema.items(): result_typ = None col_typ = definition.get("type") airbyte_type = definition.get("airbyte_type") @@ -275,7 +308,7 @@ def _get_glue_dtypes_from_json_schema(self, schema: Dict[str, Any]) -> Tuple[Dic if col_typ == "object": properties = definition.get("properties") - allow_additional_properties = definition.get("additionalProperties") + allow_additional_properties = definition.get("additionalProperties", False) if properties and not allow_additional_properties and self._is_invalid_struct_or_array(properties): object_props, _ = self._get_glue_dtypes_from_json_schema(properties) result_typ = f"struct<{','.join([f'{k}:{v}' for k, v in object_props.items()])}>" @@ -336,8 +369,7 @@ def _cursor_fields(self) -> Optional[List[str]]: def append_message(self, message: Dict[str, Any]): clean_message = self._drop_additional_top_level_properties(message) - clean_message = self._fix_obvious_type_violations(clean_message) - clean_message = self._add_missing_columns(clean_message) + clean_message = self._json_schema_cast(clean_message) self._messages.append(clean_message) def reset(self): @@ -362,7 +394,7 @@ def flush(self, partial: bool = False): date_columns = self._get_date_columns() for col in date_columns: if col in df.columns: - df[col] = pd.to_datetime(df[col]) + df[col] = pd.to_datetime(df[col], format="mixed", utc=True) # Create date column for partitioning if self._cursor_fields and col in self._cursor_fields: @@ -378,7 +410,7 @@ def flush(self, partial: bool = False): # so they can be queried with json_extract for col in json_casts: if col in df.columns: - df[col] = df[col].apply(json.dumps) + df[col] = df[col].apply(lambda x: json.dumps(x, cls=DictEncoder)) if self._sync_mode == DestinationSyncMode.overwrite and self._partial_flush_count < 1: logger.debug(f"Overwriting {len(df)} records to {self._database}:{self._table}") diff --git a/airbyte-integrations/connectors/destination-aws-datalake/metadata.yaml b/airbyte-integrations/connectors/destination-aws-datalake/metadata.yaml index baf2cbf1ad98..9240657289d7 100644 --- a/airbyte-integrations/connectors/destination-aws-datalake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-aws-datalake/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 99878c90-0fbd-46d3-9d98-ffde879d17fc - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 dockerRepository: airbyte/destination-aws-datalake githubIssueLabel: destination-aws-datalake icon: awsdatalake.svg diff --git a/airbyte-integrations/connectors/destination-aws-datalake/setup.py b/airbyte-integrations/connectors/destination-aws-datalake/setup.py index 4dccbefb7619..ca8628655464 100644 --- a/airbyte-integrations/connectors/destination-aws-datalake/setup.py +++ b/airbyte-integrations/connectors/destination-aws-datalake/setup.py @@ -8,8 +8,8 @@ MAIN_REQUIREMENTS = [ "airbyte-cdk~=0.1", "retrying", - "awswrangler==2.17.0", - "pandas==1.4.4", + "awswrangler==3.3.0", + "pandas==2.0.3", ] TEST_REQUIREMENTS = ["pytest~=6.1"] diff --git 
a/airbyte-integrations/connectors/destination-aws-datalake/unit_tests/stream_writer_test.py b/airbyte-integrations/connectors/destination-aws-datalake/unit_tests/stream_writer_test.py index b0a7ac314696..e981907cec40 100644 --- a/airbyte-integrations/connectors/destination-aws-datalake/unit_tests/stream_writer_test.py +++ b/airbyte-integrations/connectors/destination-aws-datalake/unit_tests/stream_writer_test.py @@ -4,14 +4,16 @@ import json from datetime import datetime +from decimal import Decimal from typing import Any, Dict, Mapping +import numpy as np import pandas as pd from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode from destination_aws_datalake import DestinationAwsDatalake from destination_aws_datalake.aws import AwsHandler from destination_aws_datalake.config_reader import ConnectorConfig -from destination_aws_datalake.stream_writer import StreamWriter +from destination_aws_datalake.stream_writer import DictEncoder, StreamWriter def get_config() -> Mapping[str, Any]: @@ -196,6 +198,109 @@ def get_big_schema_configured_stream(): ) +def get_camelcase_configured_stream(): + stream_name = "append_camelcase" + stream_schema = { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": ["null", "object"], + "properties": { + "TaxRateRef": { + "properties": {"name": {"type": ["null", "string"]}, "value": {"type": ["null", "string"]}}, + "type": ["null", "object"], + }, + "DocNumber": {"type": ["null", "string"]}, + "CurrencyRef": { + "properties": {"name": {"type": ["null", "string"]}, "value": {"type": ["null", "string"]}}, + "type": ["null", "object"], + }, + "Id": {"type": ["null", "string"]}, + "domain": {"type": ["null", "string"]}, + "SyncToken": {"type": ["null", "string"]}, + "Line": { + "items": { + "properties": { + "Id": {"type": ["null", "string"]}, + "Amount": {"type": ["null", "number"]}, + "JournalEntryLineDetail": { + "properties": { + "AccountRef": { + "properties": {"name": {"type": ["null", "string"]}, "value": {"type": ["null", "string"]}}, + "type": ["null", "object"], + }, + "PostingType": {"type": ["null", "string"]}, + }, + "type": ["null", "object"], + }, + "DetailType": {"type": ["null", "string"]}, + "Description": {"type": ["null", "string"]}, + }, + "type": ["null", "object"], + }, + "type": ["null", "array"], + }, + "TxnDate": {"format": "date", "type": ["null", "string"]}, + "TxnTaxDetail": { + "type": ["null", "object"], + "properties": { + "TotalTax": {"type": ["null", "number"]}, + "TxnTaxCodeRef": { + "type": ["null", "object"], + "properties": {"value": {"type": ["null", "string"]}, "name": {"type": ["null", "string"]}}, + }, + "TaxLine": { + "type": ["null", "array"], + "items": { + "type": ["null", "object"], + "properties": { + "DetailType": {"type": ["null", "string"]}, + "Amount": {"type": ["null", "number"]}, + "TaxLineDetail": { + "type": ["null", "object"], + "properties": { + "TaxPercent": {"type": ["null", "number"]}, + "OverrideDeltaAmount": {"type": ["null", "number"]}, + "TaxInclusiveAmount": {"type": ["null", "number"]}, + "PercentBased": {"type": ["null", "boolean"]}, + "NetAmountTaxable": {"type": ["null", "number"]}, + "TaxRateRef": { + "type": ["null", "object"], + "properties": {"name": {"type": ["null", "string"]}, "value": {"type": ["null", "string"]}}, + }, + }, + }, + }, + }, + }, + }, + }, + "PrivateNote": {"type": ["null", "string"]}, + "ExchangeRate": {"type": ["null", "number"]}, + "MetaData": { + "properties": { + "CreateTime": {"format": "date-time", "type": 
["null", "string"]}, + "LastUpdatedTime": {"format": "date-time", "type": ["null", "string"]}, + }, + "type": ["null", "object"], + }, + "Adjustment": {"type": ["null", "boolean"]}, + "sparse": {"type": ["null", "boolean"]}, + "airbyte_cursor": {"type": ["null", "string"]}, + }, + } + + return ConfiguredAirbyteStream( + stream=AirbyteStream( + name=stream_name, + json_schema=stream_schema, + default_cursor_field=["airbyte_cursor"], + supported_sync_modes=[SyncMode.incremental, SyncMode.full_refresh], + ), + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append, + cursor_field=["airbyte_cursor"], + ) + + def get_big_schema_writer(config: Dict[str, Any]): connector_config = ConnectorConfig(**config) aws_handler = AwsHandler(connector_config, DestinationAwsDatalake()) @@ -291,6 +396,30 @@ def test_get_glue_dtypes_from_json_schema(): } +def test_get_glue_types_from_json_schema_camel_case(): + connector_config = ConnectorConfig(**get_config()) + aws_handler = AwsHandler(connector_config, DestinationAwsDatalake()) + writer = StreamWriter(aws_handler, connector_config, get_camelcase_configured_stream()) + result, _ = writer._get_glue_dtypes_from_json_schema(writer._schema) + assert result == { + "Adjustment": "boolean", + "CurrencyRef": "struct", + "DocNumber": "string", + "ExchangeRate": "double", + "Id": "string", + "Line": "array,PostingType:string>,DetailType:string,Description:string>>", + "MetaData": "struct", + "PrivateNote": "string", + "SyncToken": "string", + "TaxRateRef": "struct", + "TxnDate": "date", + "TxnTaxDetail": "struct,TaxLine:array>>>>", + "airbyte_cursor": "string", + "domain": "string", + "sparse": "boolean", + } + + def test_has_objects_with_no_properties_good(): writer = get_big_schema_writer(get_config()) assert writer._is_invalid_struct_or_array( @@ -328,3 +457,301 @@ def test_has_objects_with_no_properties_nested_bad(): } } ) + + +def test_json_schema_cast_value(): + writer = get_big_schema_writer(get_config()) + assert ( + writer._json_schema_cast_value( + "test", + { + "type": "string", + }, + ) + == "test" + ) + assert ( + writer._json_schema_cast_value( + "1", + { + "type": "integer", + }, + ) + == 1 + ) + + +def test_json_schema_cast_decimal(): + config = get_config() + config["glue_catalog_float_as_decimal"] = True + connector_config = ConnectorConfig(**config) + aws_handler = AwsHandler(connector_config, DestinationAwsDatalake()) + writer = StreamWriter(aws_handler, connector_config, get_camelcase_configured_stream()) + + assert writer._json_schema_cast( + { + "Adjustment": False, + "domain": "QBO", + "sparse": "true", + "Id": "147491", + "SyncToken": "2", + "MetaData": {"CreateTime": "2023-02-09T10:36:39-08:00", "LastUpdatedTime": "2023-06-15T16:08:39-07:00"}, + "DocNumber": "wt_JE001032", + "TxnDate": "2023-01-13", + "CurrencyRef": {"value": "USD", "name": "United States Dollar"}, + "Line": [ + { + "Id": "0", + "Description": "Payroll 01/13/23", + "Amount": "137973.66", + "DetailType": "JournalEntryLineDetail", + "JournalEntryLineDetail": { + "PostingType": "Debit", + "Entity": {"Type": "Vendor", "EntityRef": {"value": "1", "name": "Test"}}, + "AccountRef": {"value": "234", "name": "Expense"}, + "ClassRef": {"value": "14", "name": "Business"}, + }, + }, + ], + "airbyte_cursor": "2023-06-15T16:08:39-07:00", + } + ) == { + "Adjustment": False, + "CurrencyRef": {"name": "United States Dollar", "value": "USD"}, + "DocNumber": "wt_JE001032", + "Id": "147491", + "ExchangeRate": Decimal("0"), + "Line": [ + { + "Amount": 
Decimal("137973.66"), + "Description": "Payroll 01/13/23", + "DetailType": "JournalEntryLineDetail", + "Id": "0", + "JournalEntryLineDetail": { + "PostingType": "Debit", + "Entity": {"Type": "Vendor", "EntityRef": {"value": "1", "name": "Test"}}, + "AccountRef": {"value": "234", "name": "Expense"}, + "ClassRef": {"value": "14", "name": "Business"}, + }, + } + ], + "MetaData": { + "CreateTime": pd.to_datetime("2023-02-09T10:36:39-08:00", utc=True), + "LastUpdatedTime": pd.to_datetime("2023-06-15T16:08:39-07:00", utc=True), + }, + "PrivateNote": None, + "SyncToken": "2", + "TxnDate": "2023-01-13", + "TaxRateRef": None, + "TxnTaxDetail": None, + "airbyte_cursor": "2023-06-15T16:08:39-07:00", + "domain": "QBO", + "sparse": True, + } + + +def test_json_schema_cast(): + connector_config = ConnectorConfig(**get_config()) + aws_handler = AwsHandler(connector_config, DestinationAwsDatalake()) + writer = StreamWriter(aws_handler, connector_config, get_camelcase_configured_stream()) + + input = { + "Adjustment": False, + "domain": "QBO", + "sparse": False, + "Id": "147491", + "SyncToken": "2", + "ExchangeRate": "1.33", + "MetaData": {"CreateTime": "2023-02-09T10:36:39-08:00", "LastUpdatedTime": "2023-06-15T16:08:39-07:00"}, + "DocNumber": "wt_JE001032", + "TxnDate": "2023-01-13", + "CurrencyRef": {"value": "USD", "name": "United States Dollar"}, + "Line": [ + { + "Id": "0", + "Description": "Money", + "Amount": "137973.66", + "DetailType": "JournalEntryLineDetail", + "JournalEntryLineDetail": { + "PostingType": "Debit", + "Entity": {"Type": "Vendor", "EntityRef": {"value": "1", "name": "Test"}}, + "AccountRef": {"value": "234", "name": "Expense"}, + "ClassRef": {"value": "14", "name": "Business"}, + }, + }, + ], + "airbyte_cursor": "2023-06-15T16:08:39-07:00", + } + + expected = { + "Adjustment": False, + "ExchangeRate": 1.33, + "CurrencyRef": {"name": "United States Dollar", "value": "USD"}, + "DocNumber": "wt_JE001032", + "Id": "147491", + "Line": [ + { + "Amount": 137973.66, + "Description": "Money", + "DetailType": "JournalEntryLineDetail", + "Id": "0", + "JournalEntryLineDetail": { + "PostingType": "Debit", + "Entity": {"Type": "Vendor", "EntityRef": {"value": "1", "name": "Test"}}, + "AccountRef": {"value": "234", "name": "Expense"}, + "ClassRef": {"value": "14", "name": "Business"}, + }, + } + ], + "MetaData": { + "CreateTime": pd.to_datetime("2023-02-09T10:36:39-08:00", utc=True), + "LastUpdatedTime": pd.to_datetime("2023-06-15T16:08:39-07:00", utc=True), + }, + "PrivateNote": None, + "SyncToken": "2", + "TxnDate": "2023-01-13", + "TaxRateRef": None, + "TxnTaxDetail": None, + "airbyte_cursor": "2023-06-15T16:08:39-07:00", + "domain": "QBO", + "sparse": False, + } + + assert writer._json_schema_cast(input) == expected + + +def test_json_schema_cast_empty_values(): + connector_config = ConnectorConfig(**get_config()) + aws_handler = AwsHandler(connector_config, DestinationAwsDatalake()) + writer = StreamWriter(aws_handler, connector_config, get_camelcase_configured_stream()) + + input = { + "Line": [ + { + "Id": "0", + "Description": "Money", + "Amount": "", + "DetailType": "JournalEntryLineDetail", + "JournalEntryLineDetail": "", + }, + ], + "MetaData": {"CreateTime": "", "LastUpdatedTime": "2023-06-15"}, + } + + expected = { + "Adjustment": False, + "CurrencyRef": None, + "DocNumber": None, + "Id": None, + "Line": [ + { + "Description": "Money", + "DetailType": "JournalEntryLineDetail", + "Id": "0", + "JournalEntryLineDetail": None, + } + ], + "MetaData": {"LastUpdatedTime": 
pd.to_datetime("2023-06-15", utc=True)}, + "PrivateNote": None, + "SyncToken": None, + "TaxRateRef": None, + "TxnDate": None, + "TxnTaxDetail": None, + "airbyte_cursor": None, + "domain": None, + "sparse": False, + } + + result = writer._json_schema_cast(input) + exchange_rate = result.pop("ExchangeRate") + created_time = result["MetaData"].pop("CreateTime") + line_amount = result["Line"][0].pop("Amount") + + assert result == expected + assert np.isnan(exchange_rate) + assert np.isnan(line_amount) + assert pd.isna(created_time) + + +def test_json_schema_cast_bad_values(): + connector_config = ConnectorConfig(**get_config()) + aws_handler = AwsHandler(connector_config, DestinationAwsDatalake()) + writer = StreamWriter(aws_handler, connector_config, get_camelcase_configured_stream()) + + input = { + "domain": 12, + "sparse": "true", + "Adjustment": 0, + "Line": [ + { + "Id": "0", + "Description": "Money", + "Amount": "hello", + "DetailType": "JournalEntryLineDetail", + "JournalEntryLineDetail": "", + }, + ], + "MetaData": {"CreateTime": "hello", "LastUpdatedTime": "2023-06-15"}, + } + + expected = { + "Adjustment": False, + "CurrencyRef": None, + "DocNumber": None, + "Id": None, + "Line": [ + { + "Description": "Money", + "DetailType": "JournalEntryLineDetail", + "Id": "0", + "JournalEntryLineDetail": None, + } + ], + "MetaData": {"LastUpdatedTime": pd.to_datetime("2023-06-15", utc=True)}, + "PrivateNote": None, + "SyncToken": None, + "TaxRateRef": None, + "TxnDate": None, + "TxnTaxDetail": None, + "airbyte_cursor": None, + "domain": "12", + "sparse": True, + } + + result = writer._json_schema_cast(input) + exchange_rate = result.pop("ExchangeRate") + created_time = result["MetaData"].pop("CreateTime") + line_amount = result["Line"][0].pop("Amount") + + assert result == expected + assert np.isnan(exchange_rate) + assert np.isnan(line_amount) + assert pd.isna(created_time) + + +def test_json_dict_encoder(): + dt = "2023-08-01T23:32:11Z" + dt = pd.to_datetime(dt, utc=True) + + input = { + "boolean": False, + "integer": 1, + "float": 2.0, + "decimal": Decimal("13.232"), + "datetime": dt.to_pydatetime(), + "date": dt.date(), + "timestamp": dt, + "nested": { + "boolean": False, + "datetime": dt.to_pydatetime(), + "very_nested": { + "boolean": False, + "datetime": dt.to_pydatetime(), + }, + }, + } + + assert ( + json.dumps(input, cls=DictEncoder) + == '{"boolean": false, "integer": 1, "float": 2.0, "decimal": "13.232", "datetime": "2023-08-01T23:32:11Z", "date": "2023-08-01", "timestamp": "2023-08-01T23:32:11Z", "nested": {"boolean": false, "datetime": "2023-08-01T23:32:11Z", "very_nested": {"boolean": false, "datetime": "2023-08-01T23:32:11Z"}}}' + ) diff --git a/docs/integrations/destinations/aws-datalake.md b/docs/integrations/destinations/aws-datalake.md index 50cda4825fe2..f8ca4dfc07e1 100644 --- a/docs/integrations/destinations/aws-datalake.md +++ b/docs/integrations/destinations/aws-datalake.md @@ -71,6 +71,7 @@ and types in the destination table as in the source except for the following typ | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.4 | 2023-10-25 | [\#29221](https://github.com/airbytehq/airbyte/pull/29221) | Upgrade AWSWrangler | | 0.1.3 | 2023-03-28 | [\#24642](https://github.com/airbytehq/airbyte/pull/24642) | Prefer airbyte type for complex types when available | | 0.1.2 | 2022-09-26 | [\#17193](https://github.com/airbytehq/airbyte/pull/17193) | Fix schema keyerror and add parquet support | | 0.1.1 | 2022-04-20 | 
[\#11811](https://github.com/airbytehq/airbyte/pull/11811) | Fix name of required param in specification |
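
For context on the change above: awswrangler 3.x moves Glue table options such as table_type out of the top-level keyword arguments of wr.s3.to_parquet / wr.s3.to_json and into the glue_table_settings argument, which is why the connector's _write_parquet and _write_json calls were updated. Below is a minimal sketch of the new call shape (not part of the patch); the bucket, database, and table names are placeholders, and an AWS session with Glue/S3 permissions is assumed.

# Minimal sketch of the awswrangler >= 3.x call shape adopted by this connector.
# Placeholder names only; requires valid AWS credentials and an existing S3 bucket.
import awswrangler as wr
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

wr.s3.to_parquet(
    df=df,
    path="s3://example-bucket/example_database/example_table/",
    dataset=True,
    database="example_database",
    table="example_table",
    # awswrangler 2.x accepted table_type as a top-level argument;
    # 3.x expects it inside glue_table_settings.
    glue_table_settings={"table_type": "EXTERNAL_TABLE"},
    mode="append",
    use_threads=False,
    catalog_versioning=True,
)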