Skip to content

Commit

Permalink
Merge pull request #15 from Vzzarr/add-partitioning-time-based
Browse files Browse the repository at this point in the history
Add partitioning time based
  • Loading branch information
crowemi authored May 8, 2023
2 parents 24b9d10 + 6870341 commit 9a963e8
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 9 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Build with the [Meltano Target SDK](https://sdk.meltano.com).
"stream_name_path_override": "StreamName",
"include_process_date": true,
"append_date_to_prefix": false,
"partition_name_enabled": false,
"append_date_to_prefix_grain": "day",
"append_date_to_filename": true,
"append_date_to_filename_grain": "microsecond",
Expand Down
1 change: 1 addition & 0 deletions sample-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"stream_name_path_override": "StreamName",
"include_process_date": true,
"append_date_to_prefix": false,
"partition_name_enabled": false,
"append_date_to_prefix_grain": "day",
"append_date_to_filename": true,
"append_date_to_filename_grain": "microsecond",
Expand Down
21 changes: 12 additions & 9 deletions target_s3/formats/format_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,25 @@ def create_key(self) -> str:
file_name = ""
if self.config["append_date_to_prefix"]:
grain = DATE_GRAIN[self.config["append_date_to_prefix_grain"].lower()]
folder_path += self.create_folder_structure(batch_start, grain)
partition_name_enabled = False
if self.config["partition_name_enabled"]:
partition_name_enabled = self.config["partition_name_enabled"]
folder_path += self.create_folder_structure(batch_start, grain, partition_name_enabled)
if self.config["append_date_to_filename"]:
grain = DATE_GRAIN[self.config["append_date_to_filename_grain"].lower()]
file_name += f"{self.create_file_structure(batch_start, grain)}"

return f"{folder_path}{file_name}"

def create_folder_structure(self, batch_start: datetime, grain: int) -> str:
def create_folder_structure(self, batch_start: datetime, grain: int, partition_name_enabled: bool) -> str:
ret = ""
ret += f"{batch_start.year}/" if grain <= 7 else ""
ret += f"{batch_start.month:02}/" if grain <= 6 else ""
ret += f"{batch_start.day:02}/" if grain <= 5 else ""
ret += f"{batch_start.hour:02}/" if grain <= 4 else ""
ret += f"{batch_start.minute:02}/" if grain <= 3 else ""
ret += f"{batch_start.second:02}/" if grain <= 4 else ""
ret += f"{batch_start.microsecond}/" if grain <= 1 else ""
ret += f"{'year=' if partition_name_enabled else ''}{batch_start.year}/" if grain <= 7 else ""
ret += f"{'month=' if partition_name_enabled else ''}{batch_start.month:02}/" if grain <= 6 else ""
ret += f"{'day=' if partition_name_enabled else ''}{batch_start.day:02}/" if grain <= 5 else ""
ret += f"{'hour=' if partition_name_enabled else ''}{batch_start.hour:02}/" if grain <= 4 else ""
ret += f"{'minute=' if partition_name_enabled else ''}{batch_start.minute:02}/" if grain <= 3 else ""
ret += f"{'second=' if partition_name_enabled else ''}{batch_start.second:02}/" if grain <= 4 else ""
ret += f"{'microsecond=' if partition_name_enabled else ''}{batch_start.microsecond}/" if grain <= 1 else ""
return ret

def create_file_structure(self, batch_start: datetime, grain: int) -> str:
Expand Down
6 changes: 6 additions & 0 deletions target_s3/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ class Targets3(Target):
description="A flag to append the date to the key prefix.",
default=True,
),
th.Property(
"partition_name_enabled",
th.BooleanType,
description="A flag (only works if append_date_to_prefix is enabled) to have partitioning name formatted e.g. 'year=2023/month=01/day=01'.",
default=False,
),
th.Property(
"append_date_to_prefix_grain",
th.StringType,
Expand Down

0 comments on commit 9a963e8

Please sign in to comment.