AMP-97070 Move Spark metadata upon transfer completion #8
base: AMP-96980
@@ -4,6 +4,7 @@
import re
import time

import boto3
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
@@ -213,6 +214,55 @@ def get_partition_count(event_count: int, max_event_count_per_output_file: int)
    return max(1, math.ceil(event_count / max_event_count_per_output_file))


def replace_double_slashes_with_single_slash(string: str) -> str:
    while '//' in string:
        string = string.replace('//', '/')
    return string


def move_spark_metadata_to_separate_s3_folder(s3_client: boto3.client, s3_uri_with_spark_metadata: str):
    """
    Lists all files in the s3_uri_with_spark_metadata directory and moves all spark metadata files (files without
    '/part-' in the name) to a separate subdirectory under s3_uri_with_spark_metadata.

    NOTE: Expects spark metadata files to be in the same directory as the data files and available at the time of
    the function call.

    :param s3_client: boto3 client for s3
    :param s3_uri_with_spark_metadata: s3 URI with spark metadata files
    """
    print(f'Moving spark metadata files to a separate subdirectory in {s3_uri_with_spark_metadata}')

    if '://' not in s3_uri_with_spark_metadata:
        raise ValueError(f'Invalid s3 URI: {s3_uri_with_spark_metadata}. Expected to contain "://".')
    bucket, prefix = s3_uri_with_spark_metadata.split('://')[1].split('/', 1)
    bucket = replace_double_slashes_with_single_slash(bucket)
Review comment: curious where double slashes are coming from?

Reply: for bucket, it shouldn't. It's just a sanitization in case some unnormalized input for the s3 URI is provided (e.g. …)
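To make that sanitization concrete, a small illustration with a made-up, unnormalized URI (not taken from the actual job):

# Illustrative only: an unnormalized URI with a stray double slash in the path
uri = 's3://my-bucket/exports//2024/'
bucket, prefix = uri.split('://')[1].split('/', 1)       # bucket='my-bucket', prefix='exports//2024/'
print(replace_double_slashes_with_single_slash(prefix))  # exports/2024/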
    prefix = replace_double_slashes_with_single_slash(prefix)
    print(f'Identified bucket: {bucket}, prefix: {prefix}')

    # List all files in the s3_path directory
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter='/')
Review comment: Will this return folders as well? If only files are returned, then we are good.

Reply: only files. Same for listing objects in our Java repo using Amplitude's S3 wrapper. The example job mentioned in the description has the logs of what was discovered (note that …)
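For context on that question, a rough sketch of the response shape returned by list_objects_v2 (the keys shown are illustrative, not from the actual job):

# With Delimiter='/', only objects directly under the prefix come back in 'Contents';
# anything one level deeper is rolled up into 'CommonPrefixes' and never reaches the loop.
# {
#     'Contents': [
#         {'Key': 'some/prefix/part-00000-<uuid>.json', 'Size': 1234, ...},
#         {'Key': 'some/prefix/_SUCCESS', 'Size': 0, ...},
#     ],
#     'CommonPrefixes': [{'Prefix': 'some/prefix/meta/'}],   # sub-"folders", if any
#     'IsTruncated': False,
# }
# Note: list_objects_v2 returns at most 1000 keys per call, so a paginator would be
# needed if the export ever produced more objects than that under the prefix.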
    print(f'Found {len(response["Contents"])} files in bucket {bucket} with prefix {prefix}:\n{response["Contents"]}')

    for file in response['Contents']:
        key = file['Key']
        if '/part-' in key:
            # Skip data files
            continue

        # Construct the destination file name
        new_key = replace_double_slashes_with_single_slash(key.replace(prefix, f'{prefix}/spark_metadata/'))
        copy_source = replace_double_slashes_with_single_slash(f'{bucket}/{key}')
        print(f'Moving copy_source: {copy_source} to new_key: {new_key} in bucket: {bucket}')

        # Copy the file to the new location
        s3_client.copy_object(Bucket=bucket, CopySource=copy_source, Key=new_key)
        # Delete the original file
        s3_client.delete_object(Bucket=bucket, Key=key)

        print(f'Successfully moved {key} to {new_key}')


def export_meta_data(event_count: int, partition_count: int):
    meta_data: list = [{'event_count': event_count, 'partition_count': partition_count}]
    spark.createDataFrame(meta_data).write.mode("overwrite").json(args.s3_path + "/meta")
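A minimal sketch of how the new helper could be exercised without hitting real S3, using a hand-rolled fake client. The module name spark_export_job, the bucket and key names, and the test framework are assumptions for illustration; the PR's own tests are not shown here:

# Illustrative only: a fake S3 client that records copy/delete calls, assuming the
# functions above are importable (the module name `spark_export_job` is made up).
from spark_export_job import move_spark_metadata_to_separate_s3_folder


class FakeS3Client:
    def __init__(self, keys):
        self.keys = list(keys)
        self.copied = []
        self.deleted = []

    def list_objects_v2(self, Bucket, Prefix, Delimiter):
        # Ignores its arguments and simply returns the configured keys.
        return {'Contents': [{'Key': key} for key in self.keys]}

    def copy_object(self, Bucket, CopySource, Key):
        self.copied.append((CopySource, Key))

    def delete_object(self, Bucket, Key):
        self.deleted.append(Key)


def test_only_metadata_files_are_moved():
    client = FakeS3Client(['exports/123/part-00000.json', 'exports/123/_SUCCESS'])
    move_spark_metadata_to_separate_s3_folder(client, 's3://my-bucket/exports/123')

    # Only the metadata file (no '/part-' in its key) should be copied and deleted.
    assert client.copied == [('my-bucket/exports/123/_SUCCESS', 'exports/123/spark_metadata/_SUCCESS')]
    assert client.deleted == ['exports/123/_SUCCESS']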
@@ -311,6 +361,14 @@ def export_meta_data(event_count: int, partition_count: int):
    export_data = export_data.repartition(partition_count)
    # export data
    export_data.write.mode("overwrite").json(args.s3_path)
    # move spark metadata files to a separate subdirectory
    # s3 client initialized after the data is exported to avoid missing files in the destination s3 path due to
    # eventual consistency of s3 client
    s3_client = boto3.client('s3',
                             aws_access_key_id=aws_access_key,
                             aws_secret_access_key=aws_secret_key,
                             aws_session_token=aws_session_token)
    move_spark_metadata_to_separate_s3_folder(s3_client, args.s3_path)
    print("Unloaded {event_count} events.".format(event_count=event_count))
else:
    print("No events were exported.")
Review comment: thanks for adding tests!