
Commit

Merge pull request #27 from crowemi/23
added parquet schema generation from tap
crowemi authored Oct 18, 2023
2 parents 300eabc + 111ba6e commit 8310ed9
Showing 5 changed files with 289 additions and 50 deletions.
72 changes: 54 additions & 18 deletions meltano.yml
@@ -1,25 +1,61 @@
version: 1
send_anonymous_usage_stats: false
project_id: "target-s3"
project_id: target-s3
default_environment: test
environments:
- name: test
- name: test
plugins:
extractors: []
loaders:
- name: "target-s3"
namespace: "target_s3"
pip_url: -e .
capabilities:
- about
- stream-maps
- record-flattening
config:
start_date: "2010-01-01T00:00:00Z"
settings:
# TODO: To configure using Meltano, declare settings and their types here:
- name: username
- name: password
kind: password
- name: start_date
value: "2010-01-01T00:00:00Z"
- name: target-s3
namespace: target_s3
pip_url: -e .
capabilities:
- about
- stream-maps
- record-flattening
settings:
- name: format.format_type
- name: format.format_parquet.validate
kind: boolean
value: false
- name: format.format_parquet.get_schema_from_tap
kind: boolean
value: false
- name: cloud_provider.cloud_provider_type
value: aws
- name: cloud_provider.aws.aws_access_key_id
kind: password
- name: cloud_provider.aws.aws_secret_access_key
kind: password
- name: cloud_provider.aws.aws_session_token
kind: password
- name: cloud_provider.aws.aws_region
kind: password
- name: cloud_provider.aws.aws_profile_name
kind: password
- name: cloud_provider.aws.aws_bucket
kind: password
- name: cloud_provider.aws.aws_endpoint_override
- name: prefix
- name: stream_name_path_override
- name: include_process_date
kind: boolean
value: false
- name: append_date_to_prefix
kind: boolean
value: true
- name: partition_name_enabled
kind: boolean
value: false
- name: append_date_to_prefix_grain
value: day
- name: append_date_to_filename
kind: boolean
value: true
- name: append_date_to_filename_grain
value: microsecond
- name: flatten_records
kind: boolean
value: false
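
For reference, here is a sketch of how Meltano resolves the dotted setting names above into the nested config mapping the target reads (e.g. via self.format.get("format_parquet") and the new get_schema_from_tap flag). The values below are illustrative assumptions, not taken from this commit:

# Illustrative only: nested config as the target would receive it once Meltano
# resolves the dotted setting names declared above. All values are assumptions.
config = {
    "format": {
        "format_type": "parquet",
        "format_parquet": {
            "validate": False,
            "get_schema_from_tap": True,  # new flag introduced by this change
        },
    },
    "cloud_provider": {
        "cloud_provider_type": "aws",
        "aws": {
            "aws_bucket": "my-bucket",  # hypothetical bucket
            "aws_region": "us-east-1",  # hypothetical region
        },
    },
    "append_date_to_prefix": True,
    "append_date_to_prefix_grain": "day",
    "append_date_to_filename": True,
    "append_date_to_filename_grain": "microsecond",
}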

56 changes: 45 additions & 11 deletions target_s3/formats/format_base.py
@@ -112,23 +112,55 @@ def create_key(self) -> str:
grain = DATE_GRAIN[self.config["append_date_to_prefix_grain"].lower()]
partition_name_enabled = False
if self.config["partition_name_enabled"]:
partition_name_enabled = self.config["partition_name_enabled"]
folder_path += self.create_folder_structure(batch_start, grain, partition_name_enabled)
partition_name_enabled = self.config["partition_name_enabled"]
folder_path += self.create_folder_structure(
batch_start, grain, partition_name_enabled
)
if self.config["append_date_to_filename"]:
grain = DATE_GRAIN[self.config["append_date_to_filename_grain"].lower()]
file_name += f"{self.create_file_structure(batch_start, grain)}"

return f"{folder_path}{file_name}"

def create_folder_structure(self, batch_start: datetime, grain: int, partition_name_enabled: bool) -> str:
def create_folder_structure(
self, batch_start: datetime, grain: int, partition_name_enabled: bool
) -> str:
ret = ""
ret += f"{'year=' if partition_name_enabled else ''}{batch_start.year}/" if grain <= DATE_GRAIN["year"] else ""
ret += f"{'month=' if partition_name_enabled else ''}{batch_start.month:02}/" if grain <= DATE_GRAIN["month"] else ""
ret += f"{'day=' if partition_name_enabled else ''}{batch_start.day:02}/" if grain <= DATE_GRAIN["day"] else ""
ret += f"{'hour=' if partition_name_enabled else ''}{batch_start.hour:02}/" if grain <= DATE_GRAIN["hour"] else ""
ret += f"{'minute=' if partition_name_enabled else ''}{batch_start.minute:02}/" if grain <= DATE_GRAIN["minute"] else ""
ret += f"{'second=' if partition_name_enabled else ''}{batch_start.second:02}/" if grain <= DATE_GRAIN["second"] else ""
ret += f"{'microsecond=' if partition_name_enabled else ''}{batch_start.microsecond}/" if grain <= DATE_GRAIN["microsecond"] else ""
ret += (
f"{'year=' if partition_name_enabled else ''}{batch_start.year}/"
if grain <= DATE_GRAIN["year"]
else ""
)
ret += (
f"{'month=' if partition_name_enabled else ''}{batch_start.month:02}/"
if grain <= DATE_GRAIN["month"]
else ""
)
ret += (
f"{'day=' if partition_name_enabled else ''}{batch_start.day:02}/"
if grain <= DATE_GRAIN["day"]
else ""
)
ret += (
f"{'hour=' if partition_name_enabled else ''}{batch_start.hour:02}/"
if grain <= DATE_GRAIN["hour"]
else ""
)
ret += (
f"{'minute=' if partition_name_enabled else ''}{batch_start.minute:02}/"
if grain <= DATE_GRAIN["minute"]
else ""
)
ret += (
f"{'second=' if partition_name_enabled else ''}{batch_start.second:02}/"
if grain <= DATE_GRAIN["second"]
else ""
)
ret += (
f"{'microsecond=' if partition_name_enabled else ''}{batch_start.microsecond}/"
if grain <= DATE_GRAIN["microsecond"]
else ""
)
return ret

def create_file_structure(self, batch_start: datetime, grain: int) -> str:
@@ -139,7 +171,9 @@ def create_file_structure(self, batch_start: datetime, grain: int) -> str:
ret += f"-{batch_start.hour:02}" if grain <= DATE_GRAIN["hour"] else ""
ret += f"{batch_start.minute:02}" if grain <= DATE_GRAIN["minute"] else ""
ret += f"{batch_start.second:02}" if grain <= DATE_GRAIN["second"] else ""
ret += f"{batch_start.microsecond}" if grain <= DATE_GRAIN["microsecond"] else ""
ret += (
f"{batch_start.microsecond}" if grain <= DATE_GRAIN["microsecond"] else ""
)
return ret

def append_process_date(self, records) -> dict:
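
To make the prefix layout above concrete, here is a minimal sketch (illustrative, not part of the commit) of what create_folder_structure() yields for a sample batch timestamp:

from datetime import datetime

# Sample batch timestamp for illustration.
batch_start = datetime(2023, 10, 18, 7, 5, 9)

# grain "day" with partition_name_enabled=True -> Hive-style partition folders
hive_style = f"year={batch_start.year}/month={batch_start.month:02}/day={batch_start.day:02}/"
# grain "day" with partition_name_enabled=False -> plain date folders
plain_style = f"{batch_start.year}/{batch_start.month:02}/{batch_start.day:02}/"

print(hive_style)   # year=2023/month=10/day=18/
print(plain_style)  # 2023/10/18/
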
174 changes: 153 additions & 21 deletions target_s3/formats/format_parquet.py
@@ -16,6 +16,8 @@ def __init__(self, config, context) -> None:
cloud_provider_config_type,
cloud_provider_config.get(cloud_provider_config_type, None),
)
self.stream_schema = context.get("stream_schema", {})
self.parquet_schema = None

def create_filesystem(
self,
@@ -160,34 +162,158 @@ def sanitize(self, value):
return None
return value

def create_schema(self) -> pyarrow.schema:
"""Generates schema from the records schema present in the tap.
This is effective way to declare schema instead of relying on pyarrow to
detect schema type.
Note: At level 0 (outermost level) any key that is of type datetime in record
is converted to datetime by base target class. Hence string at level 0 is handled with
type datetime.
:return: schema made from stream's schema definition
:rtype: pyarrow.schema
"""

# TODO: handle non-nullable types; everything is nullable by default
def get_schema_from_array(items: dict, level: int):
"""Returns item schema for an array.
:param items: items definition of array
:type items: dict
:param level: depth level of array in jsonschema
:type level: int
:return: detected datatype for all items of array.
:rtype: pyarrow datatype
"""
type = items.get("type")
properties = items.get("properties")
items = items.get("items")
if "integer" in type:
return pyarrow.int64()
elif "number" in type:
return pyarrow.float64()
elif "string" in type:
return pyarrow.string()
elif "array" in type:
return pyarrow.list_(get_schema_from_array(items=items, level=level))
elif "object" in type:
return pyarrow.struct(
get_schema_from_object(properties=properties, level=level + 1)
)
else:
return pyarrow.null()

def get_schema_from_object(properties: dict, level: int = 0):
"""Returns schema for an object.
:param properties: properties definition of object
:type properties: dict
:param level: depth level of object in jsonschema
:type level: int
:return: detected fields for properties in object.
:rtype: pyarrow datatype
"""
fields = []
for key, val in properties.items():
type = val["type"]
format = val.get("format")
if "integer" in type:
fields.append(pyarrow.field(key, pyarrow.int64()))
elif "number" in type:
fields.append(pyarrow.field(key, pyarrow.float64()))
elif "string" in type:
if format and level == 0:
# this is done to handle the explicit datetime conversion,
# which happens only at the first level (level 0) of a record
if format == "date":
fields.append(pyarrow.field(key, pyarrow.date64()))
elif format == "time":
fields.append(pyarrow.field(key, pyarrow.time64("us")))
else:
fields.append(
pyarrow.field(key, pyarrow.timestamp("s", tz="utc"))
)
else:
fields.append(pyarrow.field(key, pyarrow.string()))
elif "array" in type:
items = val.get("items")
if items:
item_type = get_schema_from_array(items=items, level=level)
if item_type == pyarrow.null():
self.logger.warn(
f"""
key: {key} is defined as a list of null. This is only correct
if every item is null; otherwise, define the exact item type
for the list."""
)
fields.append(pyarrow.field(key, pyarrow.list_(item_type)))
else:
self.logger.warn(
f"""
key: {key} is an array with no items definition; defaulting to
a list of null. Define the exact item type if the items are
not always null."""
)
fields.append(pyarrow.field(key, pyarrow.list_(pyarrow.null())))
elif "object" in type:
prop = val.get("properties")
inner_fields = get_schema_from_object(
properties=prop, level=level + 1
)
if not inner_fields:
self.logger.warn(
f"""
key: {key} has no fields defined; this may cause the parquet
write to fail, since parquet does not support empty/null
complex types [array, struct]."""
)
fields.append(pyarrow.field(key, pyarrow.struct(inner_fields)))
return fields

properties = self.stream_schema.get("properties")
parquet_schema = pyarrow.schema(get_schema_from_object(properties=properties))
self.parquet_schema = parquet_schema
return parquet_schema
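
For illustration, here is a small sketch of the mapping create_schema() performs from a tap's JSON schema to an explicit pyarrow schema. The stream schema below is hypothetical, not taken from this repo:

import pyarrow

# Hypothetical stream schema, in the shape a Singer tap emits.
stream_schema = {
    "properties": {
        "id": {"type": ["integer", "null"]},
        "amount": {"type": ["number", "null"]},
        "updated_at": {"type": ["string", "null"], "format": "date-time"},
        "tags": {"type": ["array", "null"], "items": {"type": ["string", "null"]}},
        "address": {
            "type": ["object", "null"],
            "properties": {"city": {"type": ["string", "null"]}},
        },
    }
}

# Schema create_schema() would build for the properties above: integers become
# int64, numbers float64, top-level date-time strings become timestamps (the SDK
# has already parsed them into datetimes), arrays become list_ and objects struct.
expected = pyarrow.schema(
    [
        pyarrow.field("id", pyarrow.int64()),
        pyarrow.field("amount", pyarrow.float64()),
        pyarrow.field("updated_at", pyarrow.timestamp("s", tz="utc")),
        pyarrow.field("tags", pyarrow.list_(pyarrow.string())),
        pyarrow.field(
            "address", pyarrow.struct([pyarrow.field("city", pyarrow.string())])
        ),
    ]
)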

def create_dataframe(self) -> Table:
"""Creates a pyarrow Table object from the record set."""
try:
fields = set()
for d in self.records:
fields = fields.union(d.keys())

format_parquet = self.format.get("format_parquet", None)
if format_parquet and format_parquet.get("validate", None) == True:
# NOTE: we could use the schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html
# and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it.
schema = dict()
if format_parquet and format_parquet.get("get_schema_from_tap", False):
parquet_schema = self.parquet_schema if self.parquet_schema else self.create_schema()
fields = set([property.name for property in parquet_schema])
input = {
f: [
self.validate(schema, self.sanitize(f), row.get(f))
for row in self.records
]
for f in fields
}
f: [self.sanitize(row.get(f)) for row in self.records]
for f in fields
}

ret = Table.from_pydict(
mapping=input, schema=parquet_schema
)
else:
input = {
f: [self.sanitize(row.get(f)) for row in self.records]
for f in fields
}
fields = set()
for d in self.records:
fields = fields.union(d.keys())
if format_parquet and format_parquet.get("validate", None) == True:
# NOTE: we could use the schema to build a pyarrow schema https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html
# and pass that into from_pydict(). The schema is inferred by pyarrow, but we could always be explicit about it.
schema = dict()
input = {
f: [
self.validate(schema, self.sanitize(f), row.get(f))
for row in self.records
]
for f in fields
}
else:
input = {
f: [self.sanitize(row.get(f)) for row in self.records]
for f in fields
}
ret = Table.from_pydict(mapping=input)

ret = Table.from_pydict(mapping=input)
except Exception as e:
self.logger.info(self.records)
self.logger.error("Failed to create parquet dataframe.")
self.logger.error(e)
raise e
@@ -208,7 +334,13 @@ def _write(self, contents: str = None) -> None:
filesystem=self.file_system,
).write_table(df)
except Exception as e:
self.logger.error("Failed to write parquet file to S3.")
self.logger.error(e)
if type(e) is pyarrow.lib.ArrowNotImplementedError:
self.logger.error(
"""Failed to write parquet file to S3. Complex types [array, object] in schema cannot be left without type definition """
)
else:
self.logger.error("Failed to write parquet file to S3.")
raise e

def run(self) -> None:
2 changes: 2 additions & 0 deletions target_s3/sinks.py
@@ -30,6 +30,7 @@ def __init__(
super().__init__(target, stream_name, schema, key_properties)
# what type of file are we building?
self.format_type = self.config.get("format", None).get("format_type", None)
self.schema = schema
if self.format_type:
if self.format_type not in FORMAT_TYPE:
raise Exception(
@@ -43,6 +44,7 @@ def process_batch(self, context: dict) -> None:
# add stream name to context
context["stream_name"] = self.stream_name
context["logger"] = self.logger
context["stream_schema"] = self.schema
# creates new object for each batch
format_type_client = format_type_factory(
FORMAT_TYPE[self.format_type], self.config, context
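
Taken together, the sink now forwards the stream's JSON schema to the format client through the batch context. A simplified, illustrative sketch of the flow (names and values below are assumptions):

# The sink stores the schema it receives from the SDK and adds it to the
# batch context; FormatParquet reads it back in __init__ and, when
# format_parquet.get_schema_from_tap is enabled, create_schema() turns it
# into the explicit pyarrow schema passed to Table.from_pydict().
context = {
    "stream_name": "orders",  # hypothetical stream name
    "stream_schema": {"properties": {"id": {"type": ["integer", "null"]}}},
}
stream_schema = context.get("stream_schema", {})  # as in FormatParquet.__init__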
