Skip to content

Commit

Permalink
🐛 Destination AWS Datalake: Upgrade AWSWrangler (#29221)
Browse files Browse the repository at this point in the history
Signed-off-by: Henri Blancke <[email protected]>
Co-authored-by: Marcos Marx <[email protected]>
Co-authored-by: marcosmarxm <[email protected]>
  • Loading branch information
3 people authored Oct 25, 2023
1 parent 148dda1 commit 8d60177
Show file tree
Hide file tree
Showing 8 changed files with 569 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
FROM python:3.9-slim
# FROM python:3.9.11-alpine3.15
FROM python:3.10-slim

# Bash is installed for more convenient debugging.
# RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

WORKDIR /airbyte/integration_code
COPY destination_aws_datalake ./destination_aws_datalake
COPY main.py ./
COPY setup.py ./
RUN pip install .

COPY destination_aws_datalake ./destination_aws_datalake

ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/destination-aws-datalake
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging
from decimal import Decimal
from typing import Dict, Optional
from typing import Any, Dict, Optional

import awswrangler as wr
import boto3
Expand All @@ -15,11 +15,10 @@
from retrying import retry

from .config_reader import CompressionCodec, ConnectorConfig, CredentialsType, OutputFormat
from .constants import BOOLEAN_VALUES, EMPTY_VALUES

logger = logging.getLogger("airbyte")

null_values = ["", " ", "#N/A", "#N/A N/A", "#NA", "<NA>", "N/A", "NA", "NULL", "none", "None", "NaN", "n/a", "nan", "null"]


def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_type: str) -> pd.DataFrame:
if desired_type == "datetime64":
Expand All @@ -32,13 +31,13 @@ def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_t
# First cast to string
df = _cast_pandas_column(df=df, col=col, current_type=current_type, desired_type="string")
# Then cast to decimal
df[col] = df[col].apply(lambda x: Decimal(str(x)) if str(x) not in null_values else None)
df[col] = df[col].apply(lambda x: Decimal(str(x)) if str(x) not in EMPTY_VALUES else None)
elif desired_type.lower() in ["float64", "int64"]:
df[col] = df[col].fillna("")
df[col] = pd.to_numeric(df[col])
elif desired_type in ["boolean", "bool"]:
if df[col].dtype in ["string", "O"]:
df[col] = df[col].fillna("false").apply(lambda x: str(x).lower() in ["true", "1", "1.0", "t", "y", "yes"])
df[col] = df[col].fillna("false").apply(lambda x: str(x).lower() in BOOLEAN_VALUES)

df[col] = df[col].astype(bool)
else:
Expand All @@ -53,7 +52,7 @@ def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_t
"which may cause precision loss.",
UserWarning,
)
df[col] = df[col].apply(lambda x: int(x) if str(x) not in null_values else None).astype(desired_type)
df[col] = df[col].apply(lambda x: int(x) if str(x) not in EMPTY_VALUES else None).astype(desired_type)
return df


Expand All @@ -66,7 +65,7 @@ def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_t


class AwsHandler:
def __init__(self, connector_config: ConnectorConfig, destination: Destination):
def __init__(self, connector_config: ConnectorConfig, destination: Destination) -> None:
self._config: ConnectorConfig = connector_config
self._destination: Destination = destination
self._session: boto3.Session = None
Expand All @@ -79,7 +78,7 @@ def __init__(self, connector_config: ConnectorConfig, destination: Destination):
self._table_type = "GOVERNED" if self._config.lakeformation_governed_tables else "EXTERNAL_TABLE"

@retry(stop_max_attempt_number=10, wait_random_min=1000, wait_random_max=2000)
def create_session(self):
def create_session(self) -> None:
if self._config.credentials_type == CredentialsType.IAM_USER:
self._session = boto3.Session(
aws_access_key_id=self._config.aws_access_key,
Expand Down Expand Up @@ -108,7 +107,7 @@ def _get_s3_path(self, database: str, table: str) -> str:

return f"{bucket}/{database}/{table}/"

def _get_compression_type(self, compression: CompressionCodec):
def _get_compression_type(self, compression: CompressionCodec) -> Optional[str]:
if compression == CompressionCodec.GZIP:
return "gzip"
elif compression == CompressionCodec.SNAPPY:
Expand All @@ -127,14 +126,16 @@ def _write_parquet(
mode: str,
dtype: Optional[Dict[str, str]],
partition_cols: list = None,
):
) -> Any:
return wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
database=database,
table=table,
table_type=self._table_type,
glue_table_settings={
"table_type": self._table_type,
},
mode=mode,
use_threads=False, # True causes s3 NoCredentialsError error
catalog_versioning=True,
Expand All @@ -153,14 +154,16 @@ def _write_json(
mode: str,
dtype: Optional[Dict[str, str]],
partition_cols: list = None,
):
) -> Any:
return wr.s3.to_json(
df=df,
path=path,
dataset=True,
database=database,
table=table,
table_type=self._table_type,
glue_table_settings={
"table_type": self._table_type,
},
mode=mode,
use_threads=False, # True causes s3 NoCredentialsError error
orient="records",
Expand All @@ -172,7 +175,9 @@ def _write_json(
compression=self._get_compression_type(self._config.compression_codec),
)

def _write(self, df: pd.DataFrame, path: str, database: str, table: str, mode: str, dtype: Dict[str, str], partition_cols: list = None):
def _write(
self, df: pd.DataFrame, path: str, database: str, table: str, mode: str, dtype: Dict[str, str], partition_cols: list = None
) -> Any:
self._create_database_if_not_exists(database)

if self._config.format_type == OutputFormat.JSONL:
Expand All @@ -184,7 +189,7 @@ def _write(self, df: pd.DataFrame, path: str, database: str, table: str, mode: s
else:
raise Exception(f"Unsupported output format: {self._config.format_type}")

def _create_database_if_not_exists(self, database: str):
def _create_database_if_not_exists(self, database: str) -> None:
tag_key = self._config.lakeformation_database_default_tag_key
tag_values = self._config.lakeformation_database_default_tag_values

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

# String representations treated as null/empty when casting column values:
# the diff in aws.py tests `str(x) not in EMPTY_VALUES` before converting to
# Decimal/int, mapping any of these tokens to None instead. Includes empty
# JSON containers ("[]", "{}") in addition to common NA spellings.
EMPTY_VALUES = ["", " ", "#N/A", "#N/A N/A", "#NA", "<NA>", "N/A", "NA", "NULL", "none", "None", "NaN", "n/a", "nan", "null", "[]", "{}"]
# Lowercased string values interpreted as boolean True when casting a
# string/object column to bool (compared via `str(x).lower() in BOOLEAN_VALUES`
# in aws.py); anything else becomes False.
BOOLEAN_VALUES = ["true", "1", "1.0", "t", "y", "yes"]

# Presumably maps JSON-schema type names to pandas dtypes used when building
# DataFrames from Airbyte records — usage not visible in this chunk; confirm
# against the stream/writer code. "Int64" (capitalized) is pandas' nullable
# integer dtype, allowing NA values in integer columns.
PANDAS_TYPE_MAPPING = {
    "string": "string",
    "integer": "Int64",
    "number": "float64",
    "boolean": "bool",
    "object": "object",
    "array": "object",
}

# Presumably maps JSON-schema type names to AWS Glue column types when numbers
# are stored as double — usage not visible in this chunk; confirm at call site.
GLUE_TYPE_MAPPING_DOUBLE = {
    "string": "string",
    "integer": "bigint",
    "number": "double",
    "boolean": "boolean",
    "null": "string",
}

# Variant of the Glue mapping that stores numbers as fixed-precision
# decimal(38, 25) instead of double (38 is Glue's maximum decimal precision).
GLUE_TYPE_MAPPING_DECIMAL = {
    **GLUE_TYPE_MAPPING_DOUBLE,
    "number": "decimal(38, 25)",
}
Loading

0 comments on commit 8d60177

Please sign in to comment.