From 4e836c3df861a1f961c1e246c92cef26350e21ac Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Mon, 9 Oct 2023 12:01:48 +0200 Subject: [PATCH 01/12] add schema detection --- target_s3/formats/format_parquet.py | 130 +++++++++++++++++++++++++++- target_s3/sinks.py | 4 +- target_s3/target.py | 37 +++++++- 3 files changed, 167 insertions(+), 4 deletions(-) diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index c924b28..4cad4bd 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -160,6 +160,119 @@ def sanitize(self, value): return None return value + def create_batch_schema(self) -> pyarrow.schema: + """Generates schema from the records schema present in the tap. + This is effective way to declare schema instead of relying on pyarrow to + detect schema type. + + Note: At level 0 (outermost level) any key that is of type datetime in record + is converted to datetime by base target class. Hence string at level 0 is handled with + type datetime. + + :return: schema made from stream's schema definition + :rtype: pyarrow.schema + """ + + # TODO: handle non nullable types; by default nullable + def get_schema_from_array(items: dict, level: int): + """Returns item schema for an array. + + :param items: items definition of array + :type items: dict + :param level: depth level of array in jsonschema + :type level: int + :return: detected datatype for all items of array. + :rtype: pyarrow datatype + """ + type = items.get("type") + properties = items.get("properties") + items = items.get("items") + if "integer" in type: + return pyarrow.int64() + elif "number" in type: + return pyarrow.float64() + elif "string" in type: + return pyarrow.string() + elif "array" in type: + return pyarrow.list_(get_schema_from_array(items=items, level=level)) + elif "object" in type: + return pyarrow.struct( + get_schema_from_object(properties=properties, level=level + 1) + ) + else: + return pyarrow.null() + + def get_schema_from_object(properties: dict, level: int = 0): + """Returns schema for an object. + + :param properties: properties definition of object + :type properties: dict + :param level: depth level of object in jsonschema + :type level: int + :return: detected fields for properties in object. + :rtype: pyarrow datatype + """ + fields = [] + for key, val in properties.items(): + type = val["type"] + format = val.get("format") + if "integer" in type: + fields.append(pyarrow.field(key, pyarrow.int64())) + elif "number" in type: + fields.append(pyarrow.field(key, pyarrow.float64())) + elif "string" in type: + if format and level == 0: + # this is done to handle explicit datetime conversion + # which happens only at level 1 of a record + if format == "date": + fields.append(pyarrow.field(key, pyarrow.date64())) + elif format == "time": + fields.append(pyarrow.field(key, pyarrow.time64())) + else: + fields.append( + pyarrow.field(key, pyarrow.timestamp("s", tz="utc")) + ) + else: + fields.append(pyarrow.field(key, pyarrow.string())) + elif "array" in type: + items = val.get("items") + if items: + item_type = get_schema_from_array(items=items, level=level) + if item_type == pyarrow.null(): + self.logger.warn( + f""" + key: {key} is defined as list of null, while this would be + correct for list of all null but it is better to define + exact item types for the list, if not null.""" + ) + fields.append(pyarrow.field(key, pyarrow.list_(item_type))) + else: + self.logger.warn( + f""" + key: {key} is defined as list of null, while this would be + correct for list of all null but it is better to define + exact item types for the list, if not null.""" + ) + fields.append(pyarrow.field(key, pyarrow.list_(pyarrow.null()))) + elif "object" in type: + prop = val.get("properties") + inner_fields = get_schema_from_object( + properties=prop, level=level + 1 + ) + if not inner_fields: + self.logger.warn( + f""" + key: {key} has no fields defined, this may cause + saving parquet failure as parquet doesn't support + empty/null complex types [array, structs] """ + ) + fields.append(pyarrow.field(key, pyarrow.struct(inner_fields))) + return fields + + properties = self.context["batch_schema"].get("properties") + schema = pyarrow.schema(get_schema_from_object(properties=properties)) + return schema + def create_dataframe(self) -> Table: """Creates a pyarrow Table object from the record set.""" try: @@ -185,7 +298,13 @@ def create_dataframe(self) -> Table: for f in fields } - ret = Table.from_pydict(mapping=input) + if format_parquet and format_parquet.get("get_schema_from_tap", False): + ret = Table.from_pydict( + mapping=input, schema=self.create_batch_schema() + ) + else: + ret = Table.from_pydict(mapping=input) + except Exception as e: self.logger.info(self.records) self.logger.error("Failed to create parquet dataframe.") @@ -208,9 +327,16 @@ def _write(self, contents: str = None) -> None: filesystem=self.file_system, ).write_table(df) except Exception as e: - self.logger.error("Failed to write parquet file to S3.") + self.logger.error(e) + if type(e) is pyarrow.lib.ArrowNotImplementedError: + self.logger.error( + """Failed to write parquet file to S3. Complex types [array, object] in schema cannot be left without type definition """ + ) + else: + self.logger.error("Failed to write parquet file to S3.") raise e def run(self) -> None: # use default behavior, no additional run steps needed return super().run(self.context["records"]) + \ No newline at end of file diff --git a/target_s3/sinks.py b/target_s3/sinks.py index 27c05e7..2f60cfb 100644 --- a/target_s3/sinks.py +++ b/target_s3/sinks.py @@ -30,6 +30,7 @@ def __init__( super().__init__(target, stream_name, schema, key_properties) # what type of file are we building? self.format_type = self.config.get("format", None).get("format_type", None) + self.schema = schema if self.format_type: if self.format_type not in FORMAT_TYPE: raise Exception( @@ -43,6 +44,7 @@ def process_batch(self, context: dict) -> None: # add stream name to context context["stream_name"] = self.stream_name context["logger"] = self.logger + context["batch_schema"] = self.schema # creates new object for each batch format_type_client = format_type_factory( FORMAT_TYPE[self.format_type], self.config, context @@ -52,4 +54,4 @@ def process_batch(self, context: dict) -> None: isinstance(format_type_client, FormatBase) is True ), f"format_type_client must be of type Base; Type: {type(self.format_type_client)}." - format_type_client.run() + format_type_client.run() \ No newline at end of file diff --git a/target_s3/target.py b/target_s3/target.py index 7e3bc61..3e35a7d 100644 --- a/target_s3/target.py +++ b/target_s3/target.py @@ -1,6 +1,8 @@ """s3 target class.""" from __future__ import annotations +import decimal +import json from singer_sdk.target_base import Target from singer_sdk import typing as th @@ -38,6 +40,17 @@ class Targets3(Target): required=False, default=False, ), + th.Property( + "get_schema_from_tap", + th.BooleanType, + required=False, + default=False, + description="Set true if you want to declare schema of the\ + resulting parquet file based on taps. Doesn't \ + work with 'anyOf' types or when complex data is\ + not defined at element level. Doesn't work with \ + validate option for now." + ), ), required=False, ), @@ -165,6 +178,28 @@ class Targets3(Target): default_sink_class = s3Sink + def deserialize_json(self, line: str) -> dict: + """Override base target's method to overcome Decimal cast, + only applied when generating parquet schema from tap schema. + + :param line: serialized record from stream + :type line: str + :return: deserialized record + :rtype: dict + """ + try: + self.format = self.config.get("format", None) + format_parquet = self.format.get("format_parquet", None) + if format_parquet and format_parquet.get("get_schema_from_tap", False): + return json.loads(line) # type: ignore[no-any-return] + else: + return json.loads( # type: ignore[no-any-return] + line, parse_float=decimal.Decimal + ) + except json.decoder.JSONDecodeError as exc: + self.logger.error("Unable to parse:\n%s", line, exc_info=exc) + raise + if __name__ == "__main__": - Targets3.cli() + Targets3.cli() \ No newline at end of file From c279f8e963bc492662ad98833bb82c0d1f004bd6 Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Tue, 10 Oct 2023 01:09:59 +0200 Subject: [PATCH 02/12] fix setting --- meltano.yml | 72 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 54 insertions(+), 18 deletions(-) diff --git a/meltano.yml b/meltano.yml index b9df142..7b78dd3 100644 --- a/meltano.yml +++ b/meltano.yml @@ -1,25 +1,61 @@ version: 1 send_anonymous_usage_stats: false -project_id: "target-s3" +project_id: target-s3 default_environment: test environments: - - name: test +- name: test plugins: extractors: [] loaders: - - name: "target-s3" - namespace: "target_s3" - pip_url: -e . - capabilities: - - about - - stream-maps - - record-flattening - config: - start_date: "2010-01-01T00:00:00Z" - settings: - # TODO: To configure using Meltano, declare settings and their types here: - - name: username - - name: password - kind: password - - name: start_date - value: "2010-01-01T00:00:00Z" + - name: target-s3 + namespace: target_s3 + pip_url: -e . + capabilities: + - about + - stream-maps + - record-flattening + settings: + - name: format.format_type + - name: format.format_parquet.validate + kind: boolean + value: false + - name: format.format_parquet.get_schema_from_tap + kind: boolean + value: false + - name: cloud_provider.cloud_provider_type + value: aws + - name: cloud_provider.aws.aws_access_key_id + kind: password + - name: cloud_provider.aws.aws_secret_access_key + kind: password + - name: cloud_provider.aws.aws_session_token + kind: password + - name: cloud_provider.aws.aws_region + kind: password + - name: cloud_provider.aws.aws_profile_name + kind: password + - name: cloud_provider.aws.aws_bucket + kind: password + - name: cloud_provider.aws.aws_endpoint_override + - name: prefix + - name: stream_name_path_override + - name: include_process_date + kind: boolean + value: false + - name: append_date_to_prefix + kind: boolean + value: true + - name: partition_name_enabled + kind: boolean + value: false + - name: append_date_to_prefix_grain + value: day + - name: append_date_to_filename + kind: boolean + value: true + - name: append_date_to_filename_grain + value: microsecond + - name: flatten_records + kind: boolean + value: false + From e432954849854afc0db79b362838496b34f8c7b8 Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Wed, 11 Oct 2023 18:47:18 +0200 Subject: [PATCH 03/12] added schema handling --- .secrets/.gitignore | 10 ----- target_s3/formats/format_parquet.py | 61 ++++++++++++++++------------- target_s3/sinks.py | 2 +- 3 files changed, 35 insertions(+), 38 deletions(-) delete mode 100644 .secrets/.gitignore diff --git a/.secrets/.gitignore b/.secrets/.gitignore deleted file mode 100644 index 33c6acd..0000000 --- a/.secrets/.gitignore +++ /dev/null @@ -1,10 +0,0 @@ -# IMPORTANT! This folder is hidden from git - if you need to store config files or other secrets, -# make sure those are never staged for commit into your git repo. You can store them here or another -# secure location. -# -# Note: This may be redundant with the global .gitignore for, and is provided -# for redundancy. If the `.secrets` folder is not needed, you may delete it -# from the project. - -* -!.gitignore diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index 4cad4bd..dbaad78 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -16,6 +16,8 @@ def __init__(self, config, context) -> None: cloud_provider_config_type, cloud_provider_config.get(cloud_provider_config_type, None), ) + self.stream_schema = context.get("stream_schema", {}) + self.parquet_schema = None def create_filesystem( self, @@ -160,7 +162,7 @@ def sanitize(self, value): return None return value - def create_batch_schema(self) -> pyarrow.schema: + def create_schema(self) -> pyarrow.schema: """Generates schema from the records schema present in the tap. This is effective way to declare schema instead of relying on pyarrow to detect schema type. @@ -269,44 +271,49 @@ def get_schema_from_object(properties: dict, level: int = 0): fields.append(pyarrow.field(key, pyarrow.struct(inner_fields))) return fields - properties = self.context["batch_schema"].get("properties") - schema = pyarrow.schema(get_schema_from_object(properties=properties)) - return schema + properties = self.stream_schema.get("properties") + parquet_schema = pyarrow.schema(get_schema_from_object(properties=properties)) + self.parquet_schema = parquet_schema + return parquet_schema def create_dataframe(self) -> Table: """Creates a pyarrow Table object from the record set.""" try: - fields = set() - for d in self.records: - fields = fields.union(d.keys()) - format_parquet = self.format.get("format_parquet", None) - if format_parquet and format_parquet.get("validate", None) == True: - # NOTE: we may could use schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html - # and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it. - schema = dict() - input = { - f: [ - self.validate(schema, self.sanitize(f), row.get(f)) - for row in self.records - ] - for f in fields - } - else: - input = { - f: [self.sanitize(row.get(f)) for row in self.records] - for f in fields - } - if format_parquet and format_parquet.get("get_schema_from_tap", False): + parquet_schema = self.parquet_schema if self.parquet_schema else self.create_schema() + fields = set([property.name for property in parquet_schema]) + input = { + f: [self.sanitize(row.get(f)) for row in self.records] + for f in fields + } + ret = Table.from_pydict( - mapping=input, schema=self.create_batch_schema() + mapping=input, schema=parquet_schema ) else: + fields = set() + for d in self.records: + fields = fields.union(d.keys()) + if format_parquet and format_parquet.get("validate", None) == True: + # NOTE: we may could use schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html + # and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it. + schema = dict() + input = { + f: [ + self.validate(schema, self.sanitize(f), row.get(f)) + for row in self.records + ] + for f in fields + } + else: + input = { + f: [self.sanitize(row.get(f)) for row in self.records] + for f in fields + } ret = Table.from_pydict(mapping=input) except Exception as e: - self.logger.info(self.records) self.logger.error("Failed to create parquet dataframe.") self.logger.error(e) raise e diff --git a/target_s3/sinks.py b/target_s3/sinks.py index 2f60cfb..2fd6e9f 100644 --- a/target_s3/sinks.py +++ b/target_s3/sinks.py @@ -44,7 +44,7 @@ def process_batch(self, context: dict) -> None: # add stream name to context context["stream_name"] = self.stream_name context["logger"] = self.logger - context["batch_schema"] = self.schema + context["stream_schema"] = self.schema # creates new object for each batch format_type_client = format_type_factory( FORMAT_TYPE[self.format_type], self.config, context From e224989510b18d7d49577cc29667ca6c4528ca96 Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Fri, 20 Oct 2023 10:33:27 +0200 Subject: [PATCH 04/12] version update --- poetry.lock | 110 +++++++++---------------------------------------- pyproject.toml | 2 +- 2 files changed, 21 insertions(+), 91 deletions(-) diff --git a/poetry.lock b/poetry.lock index 9e5c3a6..7e4c84e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,10 +1,9 @@ -# This file is automatically @generated by Poetry and should not be changed by hand. +# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. [[package]] name = "aiobotocore" version = "2.4.2" description = "Async client for aws services using botocore and aiohttp" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -26,7 +25,6 @@ boto3 = ["boto3 (>=1.24.59,<1.24.60)"] name = "aiohttp" version = "3.8.5" description = "Async http client/server framework (asyncio)" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -135,7 +133,6 @@ speedups = ["Brotli", "aiodns", "cchardet"] name = "aioitertools" version = "0.11.0" description = "itertools and builtins for AsyncIO and mixed iterables" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -150,7 +147,6 @@ typing_extensions = {version = ">=4.0", markers = "python_version < \"3.10\""} name = "aiosignal" version = "1.3.1" description = "aiosignal: a list of registered asynchronous callbacks" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -165,7 +161,6 @@ frozenlist = ">=1.1.0" name = "appdirs" version = "1.4.4" description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." -category = "main" optional = false python-versions = "*" files = [ @@ -177,7 +172,6 @@ files = [ name = "async-timeout" version = "4.0.3" description = "Timeout context manager for asyncio programs" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -189,7 +183,6 @@ files = [ name = "atomicwrites" version = "1.4.1" description = "Atomic file writes." -category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" files = [ @@ -200,7 +193,6 @@ files = [ name = "attrs" version = "23.1.0" description = "Classes Without Boilerplate" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -219,7 +211,6 @@ tests-no-zope = ["cloudpickle", "hypothesis", "mypy (>=1.1.1)", "pympler", "pyte name = "backoff" version = "2.2.1" description = "Function decoration for backoff and retry" -category = "main" optional = false python-versions = ">=3.7,<4.0" files = [ @@ -231,7 +222,6 @@ files = [ name = "black" version = "23.9.1" description = "The uncompromising code formatter." -category = "dev" optional = false python-versions = ">=3.8" files = [ @@ -278,7 +268,6 @@ uvloop = ["uvloop (>=0.15.2)"] name = "boto3" version = "1.24.59" description = "The AWS SDK for Python" -category = "main" optional = false python-versions = ">= 3.7" files = [ @@ -298,7 +287,6 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] name = "botocore" version = "1.27.59" description = "Low-level, data-driven core of boto 3." -category = "main" optional = false python-versions = ">= 3.7" files = [ @@ -318,7 +306,6 @@ crt = ["awscrt (==0.14.0)"] name = "certifi" version = "2023.7.22" description = "Python package for providing Mozilla's CA Bundle." -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -330,7 +317,6 @@ files = [ name = "cffi" version = "1.16.0" description = "Foreign Function Interface for Python calling C code." -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -395,7 +381,6 @@ pycparser = "*" name = "charset-normalizer" version = "3.3.0" description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." -category = "main" optional = false python-versions = ">=3.7.0" files = [ @@ -495,7 +480,6 @@ files = [ name = "click" version = "8.1.7" description = "Composable command line interface toolkit" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -510,7 +494,6 @@ colorama = {version = "*", markers = "platform_system == \"Windows\""} name = "colorama" version = "0.4.6" description = "Cross-platform colored terminal text." -category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" files = [ @@ -522,7 +505,6 @@ files = [ name = "cryptography" version = "41.0.4" description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers." -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -568,7 +550,6 @@ test-randomorder = ["pytest-randomly"] name = "distlib" version = "0.3.7" description = "Distribution utilities" -category = "dev" optional = false python-versions = "*" files = [ @@ -580,7 +561,6 @@ files = [ name = "dnspython" version = "2.4.2" description = "DNS toolkit" -category = "main" optional = false python-versions = ">=3.8,<4.0" files = [ @@ -600,7 +580,6 @@ wmi = ["wmi (>=1.5.1,<2.0.0)"] name = "filelock" version = "3.12.4" description = "A platform independent file lock." -category = "dev" optional = false python-versions = ">=3.8" files = [ @@ -617,7 +596,6 @@ typing = ["typing-extensions (>=4.7.1)"] name = "frozenlist" version = "1.4.0" description = "A list-like structure which implements collections.abc.MutableSequence" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -688,7 +666,6 @@ files = [ name = "fs" version = "2.4.16" description = "Python's filesystem abstraction layer" -category = "main" optional = false python-versions = "*" files = [ @@ -708,7 +685,6 @@ scandir = ["scandir (>=1.5,<2.0)"] name = "fsspec" version = "2022.11.0" description = "File-system specification" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -743,7 +719,6 @@ tqdm = ["tqdm"] name = "greenlet" version = "3.0.0" description = "Lightweight in-process concurrent programming" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -766,7 +741,7 @@ files = [ {file = "greenlet-3.0.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:0b72b802496cccbd9b31acea72b6f87e7771ccfd7f7927437d592e5c92ed703c"}, {file = "greenlet-3.0.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:527cd90ba3d8d7ae7dceb06fda619895768a46a1b4e423bdb24c1969823b8362"}, {file = "greenlet-3.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:37f60b3a42d8b5499be910d1267b24355c495064f271cfe74bf28b17b099133c"}, - {file = "greenlet-3.0.0-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:1482fba7fbed96ea7842b5a7fc11d61727e8be75a077e603e8ab49d24e234383"}, + {file = "greenlet-3.0.0-cp311-universal2-macosx_10_9_universal2.whl", hash = "sha256:c3692ecf3fe754c8c0f2c95ff19626584459eab110eaab66413b1e7425cd84e9"}, {file = "greenlet-3.0.0-cp312-cp312-macosx_13_0_arm64.whl", hash = "sha256:be557119bf467d37a8099d91fbf11b2de5eb1fd5fc5b91598407574848dc910f"}, {file = "greenlet-3.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:73b2f1922a39d5d59cc0e597987300df3396b148a9bd10b76a058a2f2772fc04"}, {file = "greenlet-3.0.0-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d1e22c22f7826096ad503e9bb681b05b8c1f5a8138469b255eb91f26a76634f2"}, @@ -776,6 +751,7 @@ files = [ {file = "greenlet-3.0.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:952256c2bc5b4ee8df8dfc54fc4de330970bf5d79253c863fb5e6761f00dda35"}, {file = "greenlet-3.0.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:269d06fa0f9624455ce08ae0179430eea61085e3cf6457f05982b37fd2cefe17"}, {file = "greenlet-3.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:9adbd8ecf097e34ada8efde9b6fec4dd2a903b1e98037adf72d12993a1c80b51"}, + {file = "greenlet-3.0.0-cp312-universal2-macosx_10_9_universal2.whl", hash = "sha256:553d6fb2324e7f4f0899e5ad2c427a4579ed4873f42124beba763f16032959af"}, {file = "greenlet-3.0.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c6b5ce7f40f0e2f8b88c28e6691ca6806814157ff05e794cdd161be928550f4c"}, {file = "greenlet-3.0.0-cp37-cp37m-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:ecf94aa539e97a8411b5ea52fc6ccd8371be9550c4041011a091eb8b3ca1d810"}, {file = "greenlet-3.0.0-cp37-cp37m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:80dcd3c938cbcac986c5c92779db8e8ce51a89a849c135172c88ecbdc8c056b7"}, @@ -819,7 +795,6 @@ test = ["objgraph", "psutil"] name = "idna" version = "3.4" description = "Internationalized Domain Names in Applications (IDNA)" -category = "main" optional = false python-versions = ">=3.5" files = [ @@ -831,7 +806,6 @@ files = [ name = "importlib-resources" version = "6.1.0" description = "Read resources from Python packages" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -850,7 +824,6 @@ testing = ["pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", name = "inflection" version = "0.5.1" description = "A port of Ruby on Rails inflector to Python" -category = "main" optional = false python-versions = ">=3.5" files = [ @@ -862,7 +835,6 @@ files = [ name = "iniconfig" version = "2.0.0" description = "brain-dead simple config-ini parsing" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -874,7 +846,6 @@ files = [ name = "isort" version = "5.12.0" description = "A Python utility / library to sort Python imports." -category = "dev" optional = false python-versions = ">=3.8.0" files = [ @@ -892,7 +863,6 @@ requirements-deprecated-finder = ["pip-api", "pipreqs"] name = "jmespath" version = "1.0.1" description = "JSON Matching Expressions" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -904,7 +874,6 @@ files = [ name = "joblib" version = "1.3.2" description = "Lightweight pipelining with Python functions" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -916,7 +885,6 @@ files = [ name = "jsonpath-ng" version = "1.6.0" description = "A final implementation of JSONPath for Python that aims to be standard compliant, including arithmetic and binary comparison operators and providing clear AST for metaprogramming." -category = "main" optional = false python-versions = "*" files = [ @@ -931,7 +899,6 @@ ply = "*" name = "jsonschema" version = "4.19.1" description = "An implementation of JSON Schema validation for Python" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -955,7 +922,6 @@ format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339- name = "jsonschema-specifications" version = "2023.7.1" description = "The JSON Schema meta-schemas and vocabularies, exposed as a Registry" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -971,7 +937,6 @@ referencing = ">=0.28.0" name = "memoization" version = "0.4.0" description = "A powerful caching library for Python, with TTL support and multiple algorithm options. (https://github.com/lonelyenvoy/python-memoization)" -category = "main" optional = false python-versions = ">=3, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4" files = [ @@ -982,7 +947,6 @@ files = [ name = "multidict" version = "6.0.4" description = "multidict implementation" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1066,7 +1030,6 @@ files = [ name = "mypy" version = "0.910" description = "Optional static typing for Python" -category = "dev" optional = false python-versions = ">=3.5" files = [ @@ -1108,7 +1071,6 @@ python2 = ["typed-ast (>=1.4.0,<1.5.0)"] name = "mypy-extensions" version = "0.4.4" description = "Experimental type system extensions for programs checked with the mypy typechecker." -category = "dev" optional = false python-versions = ">=2.7" files = [ @@ -1119,7 +1081,6 @@ files = [ name = "numpy" version = "1.24.4" description = "Fundamental package for array computing in Python" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -1157,7 +1118,6 @@ files = [ name = "packaging" version = "23.2" description = "Core utilities for Python packages" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1169,7 +1129,6 @@ files = [ name = "pandas" version = "1.5.3" description = "Powerful data structures for data analysis, time series, and statistics" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -1218,7 +1177,6 @@ test = ["hypothesis (>=5.5.3)", "pytest (>=6.0)", "pytest-xdist (>=1.31)"] name = "pathspec" version = "0.11.2" description = "Utility library for gitignore style pattern matching of file paths." -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1230,7 +1188,6 @@ files = [ name = "pendulum" version = "2.1.2" description = "Python datetimes made easy" -category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" files = [ @@ -1265,7 +1222,6 @@ pytzdata = ">=2020.1" name = "pkgutil-resolve-name" version = "1.3.10" description = "Resolve a name to an object." -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -1277,7 +1233,6 @@ files = [ name = "platformdirs" version = "3.11.0" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -1293,7 +1248,6 @@ test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4)", "pytest-co name = "pluggy" version = "1.3.0" description = "plugin and hook calling mechanisms for python" -category = "dev" optional = false python-versions = ">=3.8" files = [ @@ -1309,7 +1263,6 @@ testing = ["pytest", "pytest-benchmark"] name = "ply" version = "3.11" description = "Python Lex & Yacc" -category = "main" optional = false python-versions = "*" files = [ @@ -1321,7 +1274,6 @@ files = [ name = "py" version = "1.11.0" description = "library with cross-python path, ini-parsing, io, code, log facilities" -category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" files = [ @@ -1333,7 +1285,6 @@ files = [ name = "pyarrow" version = "10.0.1" description = "Python library for Apache Arrow" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1371,7 +1322,6 @@ numpy = ">=1.16.6" name = "pycparser" version = "2.21" description = "C parser in Python" -category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" files = [ @@ -1383,7 +1333,6 @@ files = [ name = "pydocstyle" version = "6.3.0" description = "Python docstring style checker" -category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -1401,7 +1350,6 @@ toml = ["tomli (>=1.2.3)"] name = "pyjwt" version = "2.8.0" description = "JSON Web Token implementation in Python" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1419,7 +1367,6 @@ tests = ["coverage[toml] (==5.0.4)", "pytest (>=6.0.0,<7.0.0)"] name = "pymongo" version = "4.5.0" description = "Python driver for MongoDB " -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1521,7 +1468,6 @@ zstd = ["zstandard"] name = "pytest" version = "6.2.5" description = "pytest: simple powerful testing with Python" -category = "dev" optional = false python-versions = ">=3.6" files = [ @@ -1546,7 +1492,6 @@ testing = ["argcomplete", "hypothesis (>=3.56)", "mock", "nose", "requests", "xm name = "python-dateutil" version = "2.8.2" description = "Extensions to the standard Python datetime module" -category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" files = [ @@ -1561,7 +1506,6 @@ six = ">=1.5" name = "python-dotenv" version = "0.21.1" description = "Read key-value pairs from a .env file and set them as environment variables" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1576,7 +1520,6 @@ cli = ["click (>=5.0)"] name = "pytz" version = "2023.3.post1" description = "World timezone definitions, modern and historical" -category = "main" optional = false python-versions = "*" files = [ @@ -1588,7 +1531,6 @@ files = [ name = "pytzdata" version = "2020.1" description = "The Olson timezone database for Python." -category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" files = [ @@ -1600,7 +1542,6 @@ files = [ name = "pyyaml" version = "6.0.1" description = "YAML parser and emitter for Python" -category = "main" optional = false python-versions = ">=3.6" files = [ @@ -1660,7 +1601,6 @@ files = [ name = "referencing" version = "0.30.2" description = "JSON Referencing + Python" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -1676,7 +1616,6 @@ rpds-py = ">=0.7.0" name = "requests" version = "2.31.0" description = "Python HTTP for Humans." -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -1698,7 +1637,6 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] name = "rpds-py" version = "0.10.4" description = "Python bindings to Rust's persistent data structures (rpds)" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -1807,7 +1745,6 @@ files = [ name = "s3fs" version = "2022.11.0" description = "Convenient Filesystem interface over S3" -category = "main" optional = false python-versions = ">= 3.7" files = [ @@ -1828,7 +1765,6 @@ boto3 = ["aiobotocore[boto3] (>=2.4.0,<2.5.0)"] name = "s3transfer" version = "0.6.2" description = "An Amazon S3 Transfer Manager" -category = "main" optional = false python-versions = ">= 3.7" files = [ @@ -1846,7 +1782,6 @@ crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"] name = "setuptools" version = "68.2.2" description = "Easily download, build, install, upgrade, and uninstall Python packages" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -1859,11 +1794,21 @@ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-ruff", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] testing-integration = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.1)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] +[[package]] +name = "simpleeval" +version = "0.9.13" +description = "A simple, safe single expression evaluator library." +optional = false +python-versions = "*" +files = [ + {file = "simpleeval-0.9.13-py2.py3-none-any.whl", hash = "sha256:22a2701a5006e4188d125d34accf2405c2c37c93f6b346f2484b6422415ae54a"}, + {file = "simpleeval-0.9.13.tar.gz", hash = "sha256:4a30f9cc01825fe4c719c785e3762623e350c4840d5e6855c2a8496baaa65fac"}, +] + [[package]] name = "simplejson" version = "3.19.2" description = "Simple, fast, extensible JSON encoder/decoder for Python" -category = "main" optional = false python-versions = ">=2.5, !=3.0.*, !=3.1.*, !=3.2.*" files = [ @@ -1969,14 +1914,13 @@ files = [ [[package]] name = "singer-sdk" -version = "0.32.0" +version = "0.33.0" description = "A framework for building Singer taps" -category = "main" optional = false python-versions = ">=3.7.1,<4" files = [ - {file = "singer_sdk-0.32.0-py3-none-any.whl", hash = "sha256:19b56e67bc9e6c7410e1830711894ae25d54609a72d06bef569d0c5a5a1b2b54"}, - {file = "singer_sdk-0.32.0.tar.gz", hash = "sha256:af666788faa027869fa045859bd070b3d0df487c89fda18e7b683e8bb94b4ced"}, + {file = "singer_sdk-0.33.0-py3-none-any.whl", hash = "sha256:ed0f9c67ca79964a8f517e7360197d4ddb342f341448fb3b310214ce91e48ac5"}, + {file = "singer_sdk-0.33.0.tar.gz", hash = "sha256:e2b43496b324ca9214b731c8d2c2711d9100b6238d20b1e3187c8ae3de1b0c9d"}, ] [package.dependencies] @@ -1997,6 +1941,7 @@ python-dotenv = ">=0.20,<0.22" pytz = ">=2022.2.1,<2024.0.0" PyYAML = ">=6.0" requests = ">=2.25.1" +simpleeval = ">=0.9.13,<0.10.0" simplejson = ">=3.17.6" sqlalchemy = ">=1.4,<3.0" typing-extensions = ">=4.2.0" @@ -2011,7 +1956,6 @@ testing = ["pytest (>=7.2.1)", "pytest-durations (>=1.2.0)"] name = "six" version = "1.16.0" description = "Python 2 and 3 compatibility utilities" -category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" files = [ @@ -2023,7 +1967,6 @@ files = [ name = "smart-open" version = "6.4.0" description = "Utils for streaming large files (S3, HDFS, GCS, Azure Blob Storage, gzip, bz2...)" -category = "main" optional = false python-versions = ">=3.6,<4.0" files = [ @@ -2048,7 +1991,6 @@ webhdfs = ["requests"] name = "snowballstemmer" version = "2.2.0" description = "This package provides 29 stemmers for 28 languages generated from Snowball algorithms." -category = "dev" optional = false python-versions = "*" files = [ @@ -2060,7 +2002,6 @@ files = [ name = "sqlalchemy" version = "2.0.21" description = "Database Abstraction Library" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -2108,7 +2049,7 @@ files = [ ] [package.dependencies] -greenlet = {version = "!=0.4.17", markers = "platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\""} +greenlet = {version = "!=0.4.17", markers = "platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\""} typing-extensions = ">=4.2.0" [package.extras] @@ -2139,7 +2080,6 @@ sqlcipher = ["sqlcipher3-binary"] name = "toml" version = "0.10.2" description = "Python Library for Tom's Obvious, Minimal Language" -category = "dev" optional = false python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" files = [ @@ -2151,7 +2091,6 @@ files = [ name = "tomli" version = "2.0.1" description = "A lil' TOML parser" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -2163,7 +2102,6 @@ files = [ name = "tox" version = "3.28.0" description = "tox is a generic virtualenv management and test command line tool" -category = "dev" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" files = [ @@ -2189,7 +2127,6 @@ testing = ["flaky (>=3.4.0)", "freezegun (>=0.3.11)", "pathlib2 (>=2.3.3)", "psu name = "types-requests" version = "2.31.0.6" description = "Typing stubs for requests" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -2204,7 +2141,6 @@ types-urllib3 = "*" name = "types-urllib3" version = "1.26.25.14" description = "Typing stubs for urllib3" -category = "dev" optional = false python-versions = "*" files = [ @@ -2216,7 +2152,6 @@ files = [ name = "typing-extensions" version = "4.8.0" description = "Backported and Experimental Type Hints for Python 3.8+" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -2228,7 +2163,6 @@ files = [ name = "urllib3" version = "1.26.17" description = "HTTP library with thread-safe connection pooling, file post, and more." -category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" files = [ @@ -2245,7 +2179,6 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] name = "virtualenv" version = "20.24.5" description = "Virtual Python Environment builder" -category = "dev" optional = false python-versions = ">=3.7" files = [ @@ -2266,7 +2199,6 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess name = "wrapt" version = "1.15.0" description = "Module for decorators, wrappers and monkey patching." -category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" files = [ @@ -2351,7 +2283,6 @@ files = [ name = "yarl" version = "1.9.2" description = "Yet another URL library" -category = "main" optional = false python-versions = ">=3.7" files = [ @@ -2439,7 +2370,6 @@ multidict = ">=4.0" name = "zipp" version = "3.17.0" description = "Backport of pathlib-compatible object wrapper for zip files" -category = "main" optional = false python-versions = ">=3.8" files = [ @@ -2454,4 +2384,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.8,<3.12" -content-hash = "21f21a480507ef3594701eb9e0506a198354b61a7b36f25d28a6790459093ef7" +content-hash = "64bc9d0b597b6c5a88a7831f22dceb9a01c3ffd7c67c576ca094af786fd7f2a3" diff --git a/pyproject.toml b/pyproject.toml index 45563e1..75d4241 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ license = "Apache 2.0" [tool.poetry.dependencies] python = ">=3.8,<3.12" requests = "^2.25.1" -singer-sdk = "^0.32.0" +singer-sdk = "^0.33.0" smart-open = {extras = ["s3"], version = "^6.2.0"} pyarrow = "^10.0.0" pandas = "^1.5.1" From bb18003115334dd7a09d209c765e77d1408b468b Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Tue, 24 Oct 2023 17:19:21 +0200 Subject: [PATCH 05/12] added boolean datatype --- target_s3/formats/format_parquet.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index dbaad78..a291e3d 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -195,6 +195,8 @@ def get_schema_from_array(items: dict, level: int): return pyarrow.float64() elif "string" in type: return pyarrow.string() + elif "boolean" in type: + return pyarrow.bool_() elif "array" in type: return pyarrow.list_(get_schema_from_array(items=items, level=level)) elif "object" in type: @@ -222,6 +224,8 @@ def get_schema_from_object(properties: dict, level: int = 0): fields.append(pyarrow.field(key, pyarrow.int64())) elif "number" in type: fields.append(pyarrow.field(key, pyarrow.float64())) + elif "boolean" in type: + fields.append(pyarrow.field(key, pyarrow.bool_())) elif "string" in type: if format and level == 0: # this is done to handle explicit datetime conversion From d2e3e923e9029e298f53681d31a3e159c94cbbc9 Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Tue, 24 Oct 2023 23:07:05 +0200 Subject: [PATCH 06/12] added process date --- target_s3/formats/format_base.py | 2 +- target_s3/formats/format_parquet.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/target_s3/formats/format_base.py b/target_s3/formats/format_base.py index 216232a..97fa594 100644 --- a/target_s3/formats/format_base.py +++ b/target_s3/formats/format_base.py @@ -185,7 +185,7 @@ def append_process_date(self, records) -> dict: """A function that appends the current UTC to every record""" def process_date(record): - record["_PROCESS_DATE"] = datetime.utcnow().isoformat() + record["_PROCESS_DATE"] = datetime.utcnow() return record return list(map(lambda x: process_date(x), records)) diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index a291e3d..21319bd 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -277,6 +277,12 @@ def get_schema_from_object(properties: dict, level: int = 0): properties = self.stream_schema.get("properties") parquet_schema = pyarrow.schema(get_schema_from_object(properties=properties)) + + # append process_date that is added in format_base + if self.config.get("include_process_date", None): + key = "_PROCESS_DATE" + parquet_schema = parquet_schema.append(pyarrow.field(key, pyarrow.timestamp("s", tz="utc"))) + self.parquet_schema = parquet_schema return parquet_schema From 9a9c5393558a99a0a9fe09b15597be42baee4db8 Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Thu, 9 Nov 2023 11:42:41 +0100 Subject: [PATCH 07/12] some merge resolve --- poetry.lock | 4 ---- pyproject.toml | 5 ----- target_s3/target.py | 4 ---- 3 files changed, 13 deletions(-) diff --git a/poetry.lock b/poetry.lock index 17dc8f2..0aeee25 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,8 +1,4 @@ -<<<<<<< HEAD # This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. -======= -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. ->>>>>>> 8310ed963d58a9e62799238a54c3c318ce61400b [[package]] name = "aiobotocore" diff --git a/pyproject.toml b/pyproject.toml index 5d2b607..75d4241 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,12 +12,7 @@ license = "Apache 2.0" [tool.poetry.dependencies] python = ">=3.8,<3.12" requests = "^2.25.1" -<<<<<<< HEAD singer-sdk = "^0.33.0" -======= -singer-sdk = "^0.32.0" -simplejson = "*" ->>>>>>> 8310ed963d58a9e62799238a54c3c318ce61400b smart-open = {extras = ["s3"], version = "^6.2.0"} pyarrow = "^10.0.0" pandas = "^1.5.1" diff --git a/target_s3/target.py b/target_s3/target.py index ef53650..58b3ae8 100644 --- a/target_s3/target.py +++ b/target_s3/target.py @@ -49,11 +49,7 @@ class Targets3(Target): resulting parquet file based on taps. Doesn't \ work with 'anyOf' types or when complex data is\ not defined at element level. Doesn't work with \ -<<<<<<< HEAD validate option for now." -======= - validate option for now.", ->>>>>>> 8310ed963d58a9e62799238a54c3c318ce61400b ), ), required=False, From 48c423152d2668c3b1e3f4ad456c6ca603515be8 Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Tue, 21 Nov 2023 11:52:59 +0530 Subject: [PATCH 08/12] handle anyof schema --- target_s3/formats/format_parquet.py | 73 ++++++++++++++++++++++++----- 1 file changed, 60 insertions(+), 13 deletions(-) diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index 21319bd..8ab3754 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -1,5 +1,7 @@ +from typing import List, Tuple, Union + import pyarrow -from pyarrow import fs, Table +from pyarrow import Table, fs from pyarrow.parquet import ParquetWriter from target_s3.formats.format_base import FormatBase @@ -175,6 +177,34 @@ def create_schema(self) -> pyarrow.schema: :rtype: pyarrow.schema """ + def process_anyof_schema(anyOf: List) -> Tuple[List, Union[str,None]]: + """This function takes in original array of anyOf's schema detected + and reduces it to the detected schema, based on rules, right now + just detects whether it is string or not. + + :param anyOf: Multiple types of anyOf schema from original schema + :type anyOf: List + :return: Returns final schema detected from multiple anyOf and format + :rtype: Tuple[List, str|None] + """ + types, formats = [], [] + for val in anyOf: + typ = val.get("type") + if val.get("format"): + formats.append(val["format"]) + if type(typ) is not list: + types.append(typ) + else: + types.extend(typ) + types = set(types) + formats = list(set(formats)) + ret_type = [] + if "string" in types: + ret_type.append("string") + if "null" in types: + ret_type.append("null") + return ret_type, formats[0] if formats else None + # TODO: handle non nullable types; by default nullable def get_schema_from_array(items: dict, level: int): """Returns item schema for an array. @@ -187,8 +217,17 @@ def get_schema_from_array(items: dict, level: int): :rtype: pyarrow datatype """ type = items.get("type") + # if there's anyOf instead of single type + any_of_types = items.get("anyOf") + # if the items are objects properties = items.get("properties") + # if the items are an array itself items = items.get("items") + + if any_of_types: + self.logger.info("array with anyof type schema detected.") + type, _ = process_anyof_schema(anyOf=any_of_types) + if "integer" in type: return pyarrow.int64() elif "number" in type: @@ -218,8 +257,15 @@ def get_schema_from_object(properties: dict, level: int = 0): """ fields = [] for key, val in properties.items(): - type = val["type"] - format = val.get("format") + if "type" in val.keys(): + type = val["type"] + format = val.get("format") + elif "anyOf" in val.keys(): + type, format = process_anyof_schema(val["anyOf"]) + else: + self.logger.warning("type information not given") + type = ["string", "null"] + if "integer" in type: fields.append(pyarrow.field(key, pyarrow.int64())) elif "number" in type: @@ -281,7 +327,9 @@ def get_schema_from_object(properties: dict, level: int = 0): # append process_date that is added in format_base if self.config.get("include_process_date", None): key = "_PROCESS_DATE" - parquet_schema = parquet_schema.append(pyarrow.field(key, pyarrow.timestamp("s", tz="utc"))) + parquet_schema = parquet_schema.append( + pyarrow.field(key, pyarrow.timestamp("s", tz="utc")) + ) self.parquet_schema = parquet_schema return parquet_schema @@ -291,16 +339,16 @@ def create_dataframe(self) -> Table: try: format_parquet = self.format.get("format_parquet", None) if format_parquet and format_parquet.get("get_schema_from_tap", False): - parquet_schema = self.parquet_schema if self.parquet_schema else self.create_schema() + parquet_schema = ( + self.parquet_schema if self.parquet_schema else self.create_schema() + ) fields = set([property.name for property in parquet_schema]) input = { - f: [self.sanitize(row.get(f)) for row in self.records] - for f in fields - } - - ret = Table.from_pydict( - mapping=input, schema=parquet_schema - ) + f: [self.sanitize(row.get(f)) for row in self.records] + for f in fields + } + + ret = Table.from_pydict(mapping=input, schema=parquet_schema) else: fields = set() for d in self.records: @@ -356,4 +404,3 @@ def _write(self, contents: str = None) -> None: def run(self) -> None: # use default behavior, no additional run steps needed return super().run(self.context["records"]) - \ No newline at end of file From d4d4ff6848893da0e56ce4dc0e1a017689ea4101 Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Tue, 21 Nov 2023 13:10:43 +0530 Subject: [PATCH 09/12] added utf-16 handling --- target_s3/formats/format_parquet.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index 8ab3754..e8702c5 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -1,3 +1,4 @@ +import json from typing import List, Tuple, Union import pyarrow @@ -162,6 +163,15 @@ def sanitize(self, value): if isinstance(value, dict) and not value: # pyarrow can't process empty struct return None + if isinstance(value, str) and not value: + # pyarrow can't process empty struct + try: + return value.encode("utf-16", "surrogatepass").decode("utf-16") + except Exception as e: + if e is UnicodeDecodeError: + return json.dumps(value) + else: + raise ValueError(f"Cannot process string value {value}") return value def create_schema(self) -> pyarrow.schema: @@ -177,7 +187,7 @@ def create_schema(self) -> pyarrow.schema: :rtype: pyarrow.schema """ - def process_anyof_schema(anyOf: List) -> Tuple[List, Union[str,None]]: + def process_anyof_schema(anyOf: List) -> Tuple[List, Union[str, None]]: """This function takes in original array of anyOf's schema detected and reduces it to the detected schema, based on rules, right now just detects whether it is string or not. From 0779cd879bc902cf8daea908c670868f4c0a569a Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Tue, 21 Nov 2023 15:14:15 +0530 Subject: [PATCH 10/12] handle utf-16 error --- target_s3/formats/format_parquet.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index e8702c5..d0b6430 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -168,10 +168,7 @@ def sanitize(self, value): try: return value.encode("utf-16", "surrogatepass").decode("utf-16") except Exception as e: - if e is UnicodeDecodeError: - return json.dumps(value) - else: - raise ValueError(f"Cannot process string value {value}") + return json.dumps(value) return value def create_schema(self) -> pyarrow.schema: From caabff397c05b2678116d00c008a50cb177570cf Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Tue, 21 Nov 2023 15:32:08 +0530 Subject: [PATCH 11/12] handle utf-16 error --- target_s3/formats/format_parquet.py | 1 + 1 file changed, 1 insertion(+) diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index d0b6430..9c41e16 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -168,6 +168,7 @@ def sanitize(self, value): try: return value.encode("utf-16", "surrogatepass").decode("utf-16") except Exception as e: + self.logger.warning("surrogate encoding failed, serializing string") return json.dumps(value) return value From 7b9ea8dc02b48067508d1791f2c3f4de30622483 Mon Sep 17 00:00:00 2001 From: Prakhar Srivastava Date: Tue, 21 Nov 2023 16:04:39 +0530 Subject: [PATCH 12/12] fix utf error --- target_s3/formats/format_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/target_s3/formats/format_parquet.py b/target_s3/formats/format_parquet.py index 9c41e16..203cc64 100644 --- a/target_s3/formats/format_parquet.py +++ b/target_s3/formats/format_parquet.py @@ -163,7 +163,7 @@ def sanitize(self, value): if isinstance(value, dict) and not value: # pyarrow can't process empty struct return None - if isinstance(value, str) and not value: + if isinstance(value, str): # pyarrow can't process empty struct try: return value.encode("utf-16", "surrogatepass").decode("utf-16")