diff --git a/glue_job.tf b/glue_job.tf
index ed1f1d7..c24bf8e 100644
--- a/glue_job.tf
+++ b/glue_job.tf
@@ -4,6 +4,8 @@ resource "aws_glue_catalog_database" "catalog_db" {
 
 locals {
   signatures_s3_path = "s3://agra-data-exports-${var.controlshift_environment}/${var.controlshift_organization_slug}/full/signatures"
+  signatures_table_index = index(local.parsed_bulk_data_schemas.tables.*.table.name, "signatures")
+  signatures_table_columns = local.parsed_bulk_data_schemas.tables[local.signatures_table_index].table.columns
 }
 
 resource "aws_glue_crawler" "signatures_crawler" {
@@ -35,14 +37,35 @@ resource "aws_s3_bucket_server_side_encryption_configuration" "glue_resources" {
   }
 }
 
-data "template_file" "signatures_script" {
-  template = file("${path.module}/templates/signatures_job.py.tpl")
-  vars = {
+locals {
+  # Unsupported column types read from CSV files: all of these will be read as 'string'
+  unsupported_input_column_types = [
+    "boolean",
+    "character varying.*",
+    "decimal.*",
+    "hstore",
+    "jsonb",
+    "numeric.*",
+    "timestamp"
+  ]
+
+  # Unsupported column types for Redshift: these will be replaced by the mapped type
+  unsupported_output_column_types = {
+    "hstore" = "string"
+    "jsonb" = "string"
+    "numeric\\(3,2\\)" = "decimal(3,2)"
+    "timestamp without time zone" = "timestamp"
+  }
+
+  signatures_script = templatefile("${path.module}/templates/signatures_job.py.tftpl", {
     catalog_database_name = aws_glue_catalog_database.catalog_db.name
+    unsupported_input_column_types = local.unsupported_input_column_types
+    unsupported_output_column_types = local.unsupported_output_column_types
     redshift_database_name = var.redshift_database_name
     redshift_schema = var.redshift_schema
     redshift_connection_name = aws_glue_connection.redshift_connection.name
-  }
+    signatures_table_columns = local.signatures_table_columns
+  })
 }
 
 resource "aws_s3_object" "signatures_script" {
@@ -50,7 +73,7 @@ resource "aws_s3_object" "signatures_script" {
   key = "${var.controlshift_environment}/signatures_job.py"
   acl = "private"
 
-  content = data.template_file.signatures_script.rendered
+  content = local.signatures_script
 }
 
 resource "aws_iam_role" "glue_service_role" {
diff --git a/templates/signatures_job.py.tftpl b/templates/signatures_job.py.tftpl
new file mode 100644
index 0000000..ff6cdea
--- /dev/null
+++ b/templates/signatures_job.py.tftpl
@@ -0,0 +1,86 @@
+import sys
+from awsglue.transforms import *
+from awsglue.utils import getResolvedOptions
+from pyspark.context import SparkContext
+from awsglue.context import GlueContext
+from awsglue.job import Job
+import pyspark.sql.functions as func
+
+## @params: [TempDir, JOB_NAME]
+args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
+
+sc = SparkContext()
+glueContext = GlueContext(sc)
+spark = glueContext.spark_session
+job = Job(glueContext)
+job.init(args['JOB_NAME'], args)
+
+# Step 1: Read from the table in the data catalog
+## @type: DataSource
+## @args: [database = "${catalog_database_name}", table_name = "signatures", transformation_ctx = "datasource0"]
+## @return: datasource0
+## @inputs: []
+datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "${catalog_database_name}", table_name = "signatures", transformation_ctx = "datasource0")
+
+# Step 2: Identify the latest partition in the data catalog.
+# This will correspond to the latest full export, stamped with the date.
+# Create a new DynamicFrame to read only that partition from the catalog.
+## @type: DataSource
+## @args: [database = "${catalog_database_name}", table_name = "signatures", push_down_predicate= f"(partition_0 == {latestpartition})", transformation_ctx = "datasource1"]
+## @return: datasource1
+## @inputs: []
+latestpartition = datasource0.toDF().agg(func.max("partition_0").alias("last_partition")).collect()[0]["last_partition"]
+datasource1 = glueContext.create_dynamic_frame.from_catalog(
+    database = "${catalog_database_name}",
+    table_name = "signatures",
+    push_down_predicate = f"(partition_0 == {latestpartition})",
+    transformation_ctx = "datasource1")
+
+# Step 3: Map the columns in the data catalog / S3 bucket to the columns we want in Redshift
+## @type: ApplyMapping
+## @args: [mapping = [dynamically generated from schema read from the API], transformation_ctx = "applymapping1"]
+## @return: applymapping1
+## @inputs: [frame = datasource1]
+applymapping1 = ApplyMapping.apply(
+    frame = datasource1,
+    mappings = [
+        %{ for column in keys(signatures_table_columns) }
+        (
+            "${column}",
+            %{ if length(regexall(join("|", unsupported_input_column_types), signatures_table_columns[column].sql_type)) > 0 }
+            "string",
+            %{ else }
+            "${signatures_table_columns[column].sql_type}",
+            %{ endif }
+            "${column}",
+            %{ if length(regexall(join("|", keys(unsupported_output_column_types)), signatures_table_columns[column].sql_type)) > 0 }
+            "${[ for k,v in unsupported_output_column_types : "${v}" if length(regexall(k, signatures_table_columns[column].sql_type)) > 0 ][0]}",
+            %{ else }
+            "${signatures_table_columns[column].sql_type}"
+            %{ endif }
+        ),
+        %{ endfor }
+    ],
+    transformation_ctx = "applymapping1")
+
+# Step 4: Deal with column types that aren't consistent
+## @type: ResolveChoice
+## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
+## @return: resolvechoice2
+## @inputs: [frame = applymapping1]
+resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
+
+# Step 5: Write the transformed data into Redshift, replacing whatever data was in the redshift table previously
+## @type: DataSink
+## @args: [catalog_connection = "${redshift_connection_name}", connection_options = {"dbtable": "signatures", "database": "${redshift_database_name}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
+## @return: datasink4
+## @inputs: [frame = resolvechoice2]
+datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
+    frame = resolvechoice2,
+    catalog_connection = "${redshift_connection_name}",
+    connection_options = {"preactions": "truncate table ${redshift_schema}.signatures;",
+                          "dbtable": "${redshift_schema}.signatures",
+                          "database": "${redshift_database_name}"},
+    redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
+
+job.commit()
\ No newline at end of file
diff --git a/templates/signatures_job.py.tpl b/templates/signatures_job.py.tpl
deleted file mode 100644
index ff573e1..0000000
--- a/templates/signatures_job.py.tpl
+++ /dev/null
@@ -1,120 +0,0 @@
-import sys
-from awsglue.transforms import *
-from awsglue.utils import getResolvedOptions
-from pyspark.context import SparkContext
-from awsglue.context import GlueContext
-from awsglue.job import Job
-import pyspark.sql.functions as func
-
-## @params: [TempDir, JOB_NAME]
-args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
-
-sc = SparkContext()
-glueContext = GlueContext(sc)
-spark = glueContext.spark_session
-job = Job(glueContext)
-job.init(args['JOB_NAME'], args)
-
-# Step 1: Read from the table in the data catalog
-## @type: DataSource
-## @args: [database = "${catalog_database_name}", table_name = "signatures", transformation_ctx = "datasource0"]
-## @return: datasource0
-## @inputs: []
-datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "${catalog_database_name}", table_name = "signatures", transformation_ctx = "datasource0")
-
-# Step 2: Identify the latest partition in the data catalog.
-# This will correspond to the latest full export, stamped with the date.
-# Create a new DynamicFrame to read only that partition from the catalog.
-## @type: DataSource
-## @args: [database = "${catalog_database_name}", table_name = "signatures", push_down_predicate= f"(partition_0 == {latestpartition})", transformation_ctx = "datasource1"]
-## @return: datasource1
-## @inputs: []
-latestpartition = datasource0.toDF().agg(func.max("partition_0").alias("last_partition")).collect()[0]["last_partition"]
-datasource1 = glueContext.create_dynamic_frame.from_catalog(
-    database = "${catalog_database_name}",
-    table_name = "signatures",
-    push_down_predicate = f"(partition_0 == {latestpartition})",
-    transformation_ctx = "datasource1")
-
-# Step 3: Map the columns in the data catalog / S3 bucket to the columns we want in Redshift
-## @type: ApplyMapping
-## @args: [mapping = [("id", "bigint", "id", "long"), ("petition_id", "bigint", "petition_id", "long"), ("email", "string", "email", "string"), ("first_name", "string", "first_name", "string"), ("last_name", "string", "last_name", "string"), ("phone_number", "string", "phone_number", "string"), ("postcode", "string", "postcode", "string"), ("created_at", "string", "created_at", "timestamp"), ("join_organisation", "string", "join_organisation", "boolean"), ("deleted_at", "string", "deleted_at", "timestamp"), ("unsubscribe_at", "string", "unsubscribe_at", "timestamp"), ("external_constituent_id", "string", "external_constituent_id", "string"), ("member_id", "bigint", "member_id", "long"), ("additional_fields", "string", "additional_fields", "string"), ("cached_organisation_slug", "string", "cached_organisation_slug", "string"), ("source", "string", "source", "string"), ("join_group", "string", "join_group", "boolean"), ("external_id", "bigint", "external_id", "long"), ("new_member", "string", "new_member", "boolean"), ("external_action_id", "string", "external_action_id", "string"), ("locale", "string", "locale", "string"), ("bucket", "string", "bucket", "string"), ("country", "string", "country", "string"), ("updated_at", "string", "updated_at", "timestamp"), ("user_ip", "string", "user_ip", "string"), ("confirmation_token", "string", "confirmation_token", "string"), ("confirmed_at", "string", "confirmed_at", "timestamp"), ("confirmation_sent_at", "string", "confirmation_sent_at", "timestamp"), ("last_signed_at", "string", "last_signed_at", "timestamp"), ("join_list_suppressed", "string", "join_list_suppressed", "boolean"), ("old_daisy_chain_used", "string", "old_daisy_chain_used", "string"), ("from_embed", "string", "from_embed", "boolean"), ("user_agent", "string", "user_agent", "string"), ("confirmed_reason", "string", "confirmed_reason", "string"), ("synced_to_crm_at", "string", "synced_to_crm_at", "timestamp"), ("daisy_chain_experiment_slug", "string", "daisy_chain_experiment_slug", "string"), ("eu_data_processing_consent", "string", "eu_data_processing_consent", "boolean"), ("from_one_click", "string", "from_one_click", "boolean"), ("consent_content_version_id", "bigint", "consent_content_version_id", "bigint"), ("daisy_chain_id_used", "bigint", "daisy_chain_id_used", "bigint"), ("email_opt_in_type_id", "bigint", "email_opt_in_type_id", "long"), ("facebook_id", "string", "facebook_id", "string"), ("utm_params", "string", "utm_params", "string"), ("postcode_id", "bigint", "postcode_id", "long"), ("referring_share_click_id", "bigint", "referring_share_click_id", "int"), ("opt_in_sms", "string", "opt_in_sms", "boolean"), ("external_ids", "string", "external_ids", "string")], transformation_ctx = "applymapping1"]
-## @return: applymapping1
-## @inputs: [frame = datasource1]
-applymapping1 = ApplyMapping.apply(
-    frame = datasource1,
-    mappings = [
-        ("id", "bigint", "id", "int"),
-        ("petition_id", "bigint", "petition_id", "int"),
-        ("email", "string", "email", "string"),
-        ("first_name", "string", "first_name", "string"),
-        ("last_name", "string", "last_name", "string"),
-        ("phone_number", "string", "phone_number", "string"),
-        ("postcode", "string", "postcode", "string"),
-        ("created_at", "string", "created_at", "timestamp"),
-        ("join_organisation", "string", "join_organisation", "boolean"),
-        ("deleted_at", "string", "deleted_at", "timestamp"),
-        ("unsubscribe_at", "string", "unsubscribe_at", "timestamp"),
-        ("external_constituent_id", "string", "external_constituent_id", "string"),
-        ("member_id", "bigint", "member_id", "int"),
-        ("additional_fields", "string", "additional_fields", "string"),
-        ("cached_organisation_slug", "string", "cached_organisation_slug", "string"),
-        ("source", "string", "source", "string"),
-        ("join_partnership", "string", "join_partnership", "boolean"),
-        ("external_id", "bigint", "external_id", "string"),
-        ("new_member", "string", "new_member", "boolean"),
-        ("external_action_id", "string", "external_action_id", "string"),
-        ("locale", "string", "locale", "string"),
-        ("bucket", "string", "bucket", "string"),
-        ("country", "string", "country", "string"),
-        ("updated_at", "string", "updated_at", "timestamp"),
-        ("user_ip", "string", "user_ip", "string"),
-        ("confirmation_token", "string", "confirmation_token", "string"),
-        ("confirmed_at", "string", "confirmed_at", "timestamp"),
-        ("confirmation_sent_at", "string", "confirmation_sent_at", "timestamp"),
-        ("last_signed_at", "string", "last_signed_at", "timestamp"),
-        ("join_list_suppressed", "string", "join_list_suppressed", "boolean"),
-        ("old_daisy_chain_used", "string", "old_daisy_chain_used", "string"),
-        ("from_embed", "string", "from_embed", "boolean"),
-        ("user_agent", "string", "user_agent", "string"),
-        ("confirmed_reason", "string", "confirmed_reason", "string"),
-        ("synced_to_crm_at", "string", "synced_to_crm_at", "timestamp"),
-        ("daisy_chain_experiment_slug", "string", "daisy_chain_experiment_slug", "string"),
-        ("eu_data_processing_consent", "string", "eu_data_processing_consent", "boolean"),
-        ("from_one_click", "string", "from_one_click", "boolean"),
-        ("consent_content_version_id", "bigint", "consent_content_version_id", "bigint"),
-        ("daisy_chain_id_used", "bigint", "daisy_chain_id_used", "bigint"),
-        ("email_opt_in_type_id", "bigint", "email_opt_in_type_id", "bigint"),
-        ("facebook_id", "string", "facebook_id", "string"),
-        ("utm_params", "string", "utm_params", "string"),
-        ("postcode_id", "bigint", "postcode_id", "bigint"),
-        ("referring_share_click_id", "bigint", "referring_share_click_id", "int"),
-        ("opt_in_sms", "string", "opt_in_sms", "boolean"),
-        ("sms_opt_in_type_id", "string", "sms_opt_in_type_id", "bigint"),
-        ("recaptcha_score", "string", "recaptcha_score", "decimal(3,2)"),
-        ("new_mobile_subscriber", "string", "new_mobile_subscriber", "boolean"),
-        ("external_ids", "string", "external_ids", "string")
-    ],
-    transformation_ctx = "applymapping1")
-
-# Step 4: Deal with column types that aren't consistent
-## @type: ResolveChoice
-## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
-## @return: resolvechoice2
-## @inputs: [frame = applymapping1]
-resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
-
-# Step 5: Write the transformed data into Redshift, replacing whatever data was in the redshift table previously
-## @type: DataSink
-## @args: [catalog_connection = "${redshift_connection_name}", connection_options = {"dbtable": "signatures", "database": "${redshift_database_name}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
-## @return: datasink4
-## @inputs: [frame = resolvechoice2]
-datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
-    frame = resolvechoice2,
-    catalog_connection = "${redshift_connection_name}",
-    connection_options = {"preactions": "truncate table ${redshift_schema}.signatures;",
-                          "dbtable": "${redshift_schema}.signatures",
-                          "database": "${redshift_database_name}"},
-    redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
-
-job.commit()
\ No newline at end of file