Commit
Set Python script's column mappings from schema API
Showing 3 changed files with 114 additions and 125 deletions.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as func

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Step 1: Read from the table in the data catalog
## @type: DataSource
## @args: [database = "${catalog_database_name}", table_name = "signatures", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "${catalog_database_name}", table_name = "signatures", transformation_ctx = "datasource0")

# Step 2: Identify the latest partition in the data catalog.
# This will correspond to the latest full export, stamped with the date.
# Create a new DynamicFrame to read only that partition from the catalog.
## @type: DataSource
## @args: [database = "${catalog_database_name}", table_name = "signatures", push_down_predicate = f"(partition_0 == {latestpartition})", transformation_ctx = "datasource1"]
## @return: datasource1
## @inputs: []
latestpartition = datasource0.toDF().agg(func.max("partition_0").alias("last_partition")).collect()[0]["last_partition"]
datasource1 = glueContext.create_dynamic_frame.from_catalog(
    database = "${catalog_database_name}",
    table_name = "signatures",
    push_down_predicate = f"(partition_0 == {latestpartition})",
    transformation_ctx = "datasource1")

# Step 3: Map the columns in the data catalog / S3 bucket to the columns we want in Redshift
## @type: ApplyMapping
## @args: [mapping = [dynamically generated from schema read from the API], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource1]
applymapping1 = ApplyMapping.apply(
    frame = datasource1,
    mappings = [
        %{ for column in keys(signatures_table_columns) }
        (
            "${column}",
            %{ if length(regexall(join("|", unsupported_input_column_types), signatures_table_columns[column].sql_type)) > 0 }
            "string",
            %{ else }
            "${signatures_table_columns[column].sql_type}",
            %{ endif }
            "${column}",
            %{ if length(regexall(join("|", keys(unsupported_output_column_types)), signatures_table_columns[column].sql_type)) > 0 }
            "${[ for k,v in unsupported_output_column_types : "${v}" if length(regexall(k, signatures_table_columns[column].sql_type)) > 0 ][0]}",
            %{ else }
            "${signatures_table_columns[column].sql_type}"
            %{ endif }
        ),
        %{ endfor }
    ],
    transformation_ctx = "applymapping1")
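
# For illustration only (hypothetical columns, not generated by the template):
# after Terraform renders the %{ for } block above, mappings becomes a list of
# (source_name, source_type, target_name, target_type) tuples, for example:
#     ("id", "bigint", "id", "bigint"),
#     ("created_at", "timestamp", "created_at", "timestamp"),
# Any source type matching unsupported_input_column_types is read as "string",
# and any target type matching a key of unsupported_output_column_types is
# replaced by that key's mapped value.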

# Step 4: Deal with column types that aren't consistent
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")

# Step 5: Write the transformed data into Redshift, replacing whatever data was in the redshift table previously
## @type: DataSink
## @args: [catalog_connection = "${redshift_connection_name}", connection_options = {"dbtable": "signatures", "database": "${redshift_database_name}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = resolvechoice2]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = resolvechoice2,
    catalog_connection = "${redshift_connection_name}",
    connection_options = {"preactions": "truncate table ${redshift_schema}.signatures;",
                          "dbtable": "${redshift_schema}.signatures",
                          "database": "${redshift_database_name}"},
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "datasink4")

job.commit()
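
Because the script above is a Terraform template rather than plain Python, the column mappings only take their final form when the file is rendered. The snippet below is a minimal, hypothetical sketch of how that rendering could be wired up with templatefile(); the template path, the two-column signatures_table_columns map, the unsupported-type values, and the local_file output are illustrative stand-ins for whatever the surrounding module actually derives from the schema API.

locals {
  # Illustrative stand-in for the column map built from the schema API:
  # key = column name, value = object carrying the column's SQL type.
  signatures_table_columns = {
    id         = { sql_type = "bigint" }
    created_at = { sql_type = "timestamp" }
  }

  # Source types Glue should read as plain strings (regex fragments).
  unsupported_input_column_types = ["jsonb"]

  # Target types that can't be written as-is, mapped to the type to use instead.
  unsupported_output_column_types = { "jsonb" = "string" }
}

resource "local_file" "signatures_glue_script" {
  # Paths and values are examples only; the real module may write the rendered
  # script to S3 for Glue to pick up rather than to local disk.
  filename = "${path.module}/rendered/signatures_job.py"
  content = templatefile("${path.module}/signatures_job.py.tpl", {
    catalog_database_name           = "example_catalog_db"
    redshift_connection_name        = "example-redshift-connection"
    redshift_database_name          = "example_warehouse"
    redshift_schema                 = "public"
    signatures_table_columns        = local.signatures_table_columns
    unsupported_input_column_types  = local.unsupported_input_column_types
    unsupported_output_column_types = local.unsupported_output_column_types
  })
}

Rendering this produces an ordinary Glue Python script with one four-element mapping tuple per column in the applymapping1 call.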