Skip to content

Commit

Permalink
Merge pull request #30 from prakharcode/main
Browse files Browse the repository at this point in the history
Added the processed-date field to schema generation
  • Loading branch information
crowemi authored Nov 23, 2023
2 parents 8310ed9 + 7b9ea8d commit 521b59f
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 33 deletions.
10 changes: 0 additions & 10 deletions .secrets/.gitignore

This file was deleted.

29 changes: 23 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ license = "Apache 2.0"
[tool.poetry.dependencies]
python = ">=3.8,<3.12"
requests = "^2.25.1"
singer-sdk = "^0.32.0"
simplejson = "*"
singer-sdk = "^0.33.0"
smart-open = {extras = ["s3"], version = "^6.2.0"}
pyarrow = "^10.0.0"
pandas = "^1.5.1"
Expand Down
2 changes: 1 addition & 1 deletion target_s3/formats/format_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def append_process_date(self, records) -> dict:
"""A function that appends the current UTC to every record"""

def process_date(record):
record["_PROCESS_DATE"] = datetime.utcnow().isoformat()
record["_PROCESS_DATE"] = datetime.utcnow()
return record

return list(map(lambda x: process_date(x), records))
88 changes: 77 additions & 11 deletions target_s3/formats/format_parquet.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json
from typing import List, Tuple, Union

import pyarrow
from pyarrow import fs, Table
from pyarrow import Table, fs
from pyarrow.parquet import ParquetWriter

from target_s3.formats.format_base import FormatBase
Expand Down Expand Up @@ -160,6 +163,13 @@ def sanitize(self, value):
if isinstance(value, dict) and not value:
# pyarrow can't process empty struct
return None
if isinstance(value, str):
# pyarrow can't process empty struct
try:
return value.encode("utf-16", "surrogatepass").decode("utf-16")
except Exception as e:
self.logger.warning("surrogate encoding failed, serializing string")
return json.dumps(value)
return value

def create_schema(self) -> pyarrow.schema:
Expand All @@ -175,6 +185,34 @@ def create_schema(self) -> pyarrow.schema:
:rtype: pyarrow.schema
"""

def process_anyof_schema(anyOf: List) -> Tuple[List, Union[str, None]]:
    """Reduce a JSON-schema ``anyOf`` list to a simplified type list.

    Walks every sub-schema in *anyOf*, collecting the declared ``type``
    values (each of which may itself be a string or a list of strings)
    and any ``format`` hints. Currently the reduction only recognises
    ``"string"`` and ``"null"``; all other types are dropped.

    :param anyOf: the ``anyOf`` sub-schemas from the original schema
    :type anyOf: List
    :return: the reduced type list and the first ``format`` seen, if any
    :rtype: Tuple[List, str | None]
    """
    types: set = set()
    formats: List[str] = []
    for sub_schema in anyOf:
        fmt = sub_schema.get("format")
        if fmt:
            formats.append(fmt)
        typ = sub_schema.get("type")
        if isinstance(typ, list):
            types.update(typ)
        elif typ is not None:
            types.add(typ)
    # Deduplicate formats while preserving first-seen order: the original
    # list(set(...)) made formats[0] nondeterministic under hash
    # randomization when sub-schemas declared different formats.
    formats = list(dict.fromkeys(formats))
    ret_type = []
    # Emit in a fixed order ("string" before "null") regardless of the
    # order the sub-schemas appeared in.
    if "string" in types:
        ret_type.append("string")
    if "null" in types:
        ret_type.append("null")
    return ret_type, formats[0] if formats else None

# TODO: handle non nullable types; by default nullable
def get_schema_from_array(items: dict, level: int):
"""Returns item schema for an array.
Expand All @@ -187,14 +225,25 @@ def get_schema_from_array(items: dict, level: int):
:rtype: pyarrow datatype
"""
type = items.get("type")
# if there's anyOf instead of single type
any_of_types = items.get("anyOf")
# if the items are objects
properties = items.get("properties")
# if the items are an array itself
items = items.get("items")

if any_of_types:
self.logger.info("array with anyof type schema detected.")
type, _ = process_anyof_schema(anyOf=any_of_types)

if "integer" in type:
return pyarrow.int64()
elif "number" in type:
return pyarrow.float64()
elif "string" in type:
return pyarrow.string()
elif "boolean" in type:
return pyarrow.bool_()
elif "array" in type:
return pyarrow.list_(get_schema_from_array(items=items, level=level))
elif "object" in type:
Expand All @@ -216,12 +265,21 @@ def get_schema_from_object(properties: dict, level: int = 0):
"""
fields = []
for key, val in properties.items():
type = val["type"]
format = val.get("format")
if "type" in val.keys():
type = val["type"]
format = val.get("format")
elif "anyOf" in val.keys():
type, format = process_anyof_schema(val["anyOf"])
else:
self.logger.warning("type information not given")
type = ["string", "null"]

if "integer" in type:
fields.append(pyarrow.field(key, pyarrow.int64()))
elif "number" in type:
fields.append(pyarrow.field(key, pyarrow.float64()))
elif "boolean" in type:
fields.append(pyarrow.field(key, pyarrow.bool_()))
elif "string" in type:
if format and level == 0:
# this is done to handle explicit datetime conversion
Expand Down Expand Up @@ -273,6 +331,14 @@ def get_schema_from_object(properties: dict, level: int = 0):

properties = self.stream_schema.get("properties")
parquet_schema = pyarrow.schema(get_schema_from_object(properties=properties))

# append process_date that is added in format_base
if self.config.get("include_process_date", None):
key = "_PROCESS_DATE"
parquet_schema = parquet_schema.append(
pyarrow.field(key, pyarrow.timestamp("s", tz="utc"))
)

self.parquet_schema = parquet_schema
return parquet_schema

Expand All @@ -281,16 +347,16 @@ def create_dataframe(self) -> Table:
try:
format_parquet = self.format.get("format_parquet", None)
if format_parquet and format_parquet.get("get_schema_from_tap", False):
parquet_schema = self.parquet_schema if self.parquet_schema else self.create_schema()
parquet_schema = (
self.parquet_schema if self.parquet_schema else self.create_schema()
)
fields = set([property.name for property in parquet_schema])
input = {
f: [self.sanitize(row.get(f)) for row in self.records]
for f in fields
}

ret = Table.from_pydict(
mapping=input, schema=parquet_schema
)
f: [self.sanitize(row.get(f)) for row in self.records]
for f in fields
}

ret = Table.from_pydict(mapping=input, schema=parquet_schema)
else:
fields = set()
for d in self.records:
Expand Down
2 changes: 1 addition & 1 deletion target_s3/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ def process_batch(self, context: dict) -> None:
isinstance(format_type_client, FormatBase) is True
), f"format_type_client must be of type Base; Type: {type(self.format_type_client)}."

format_type_client.run()
format_type_client.run()
4 changes: 2 additions & 2 deletions target_s3/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Targets3(Target):
resulting parquet file based on taps. Doesn't \
work with 'anyOf' types or when complex data is\
not defined at element level. Doesn't work with \
validate option for now.",
validate option for now."
),
),
required=False,
Expand Down Expand Up @@ -197,4 +197,4 @@ def deserialize_json(self, line: str) -> dict:


if __name__ == "__main__":
Targets3.cli()
Targets3.cli()

0 comments on commit 521b59f

Please sign in to comment.