Skip to content

Commit

Permalink
Merge pull request #16 from crowemi/update-validate
Browse files Browse the repository at this point in the history
Added scrub method
  • Loading branch information
crowemi authored May 17, 2023
2 parents 9a963e8 + da8bb0b commit 709734e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 6 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com).
"format": {
"format_type": "json",
"format_parquet": {
"validate": "[true/false]"
"validate": "[true|false]"
},
"format_json": {},
"format_csv": {}
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "target-s3"
version = "1.0.3"
version = "1.0.6"
description = "`target-s3` is a Singer target for s3, built with the Meltano Singer SDK."
authors = ["crowemi"]
keywords = [
Expand Down
41 changes: 37 additions & 4 deletions target_s3/formats/format_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,22 @@ def create_filesystem(
raise e

def validate(self, schema: dict, field, value) -> dict:
"""Validates data elements."""
"""
Validates data elements against a given schema and field. If the field is not in the schema, it will be added.
If the value does not match the expected type in the schema, it will be cast to the expected type.
The method returns the validated value.
:param schema: A dictionary representing the schema to validate against.
:param field: The field to validate.
:param value: The value to validate.
:return: The validated value.
"""

def unpack_dict(record) -> dict:
ret = dict()
# set empty dictionaries to type string
if len(record) == 0:
ret = {"type": type(str())}
for field in record:
if isinstance(record[field], dict):
ret[field] = unpack_dict(record[field])
Expand Down Expand Up @@ -120,21 +132,34 @@ def validate_list(value, fields):
else:
expected_type = schema[field].get("type")
if not isinstance(value, expected_type):
# if the values don't match try to cast current value to expected type, this souldn't happen,
# if the values don't match try to cast current value to expected type, this shouldn't happen,
# an error will occur during target instantiation.
value = expected_type(value)

else:
# add new entry for field
if isinstance(value, dict):
schema[field] = {"type": type(value), "fields": unpack_dict(value)}
validate_dict(value, schema[field].get("fields"))
elif isinstance(value, list):
schema[field] = {"type": type(value), "fields": unpack_list(value)}
validate_list(value, schema[field].get("fields"))
else:
schema[field] = {"type": type(value)}
expected_type = schema[field].get("type")
if not isinstance(value, expected_type):
# if the values don't match try to cast current value to expected type, this shouldn't happen,
# an error will occur during target instantiation.
value = expected_type(value)

return value

def sanitize(self, value):
    """Replace an empty dict with ``None``; return any other value unchanged.

    :param value: The value to inspect.
    :return: ``None`` when ``value`` is an empty dict, otherwise ``value`` itself.
    """
    # Anything that is not a dict, or is a dict with at least one key, passes through.
    if not isinstance(value, dict) or value:
        return value
    # pyarrow can't process empty struct
    return None

def create_dataframe(self) -> Table:
"""Creates a pyarrow Table object from the record set."""
try:
Expand All @@ -144,13 +169,21 @@ def create_dataframe(self) -> Table:

format_parquet = self.format.get("format_parquet", None)
if format_parquet and format_parquet.get("validate", None) == True:
# NOTE: we could use schema to build a pyarrow schema (https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html)
# and pass that into from_pydict(). The schema is currently inferred by pyarrow, but we could be explicit about it.
schema = dict()
input = {
f: [self.validate(schema, f, row.get(f)) for row in self.records]
f: [
self.validate(schema, self.sanitize(f), row.get(f))
for row in self.records
]
for f in fields
}
else:
input = {f: [row.get(f) for row in self.records] for f in fields}
input = {
f: [self.sanitize(row.get(f)) for row in self.records]
for f in fields
}

ret = Table.from_pydict(mapping=input)
except Exception as e:
Expand Down

0 comments on commit 709734e

Please sign in to comment.