-
Notifications
You must be signed in to change notification settings - Fork 820
/
JoinAndRelationalize.scala
75 lines (58 loc) · 3.55 KB
/
JoinAndRelationalize.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/*
* Copyright 2016-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: MIT-0
*/
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext
/**
 * Glue ETL job that joins the "legislators" catalog tables (persons, memberships,
 * organizations) into one history frame, writes that frame to S3 as parquet in
 * three layouts, and then relationalizes it and writes each resulting table
 * through a JDBC sink.
 */
object JoinAndRelationalize {

  /**
   * Job entry point.
   *
   * @param sysArgs command-line arguments; not read by this job.
   */
  def main(sysArgs: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(sc)

    // catalog: database and table names
    val dbName = "legislators"
    val tblPersons = "persons_json"
    val tblMembership = "memberships_json"
    val tblOrganization = "organizations_json"

    // output s3 and temp directories
    val outputHistoryDir = "s3://glue-sample-target/output-dir/legislator_history"
    val outputLgSingleDir = "s3://glue-sample-target/output-dir/legislator_single"
    val outputLgPartitionedDir = "s3://glue-sample-target/output-dir/legislator_part"
    val redshiftTmpDir = "s3://glue-sample-target/temp-dir/"

    // Create dynamic frames from the source tables
    val persons: DynamicFrame =
      glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame()
    val memberships: DynamicFrame =
      glueContext.getCatalogSource(database = dbName, tableName = tblMembership).getDynamicFrame()

    // Organizations: drop the fields we do not need and rename "id"/"name" so they
    // carry an "org_" prefix in the joined result.
    val orgs: DynamicFrame =
      glueContext.getCatalogSource(database = dbName, tableName = tblOrganization).getDynamicFrame()
        .dropFields(Seq("other_names", "identifiers"))
        .renameField("id", "org_id")
        .renameField("name", "org_name")

    // Join the frames to create history: persons <-> memberships on the person id,
    // then organizations <-> that result on the organization id. The join keys
    // "person_id" and "org_id" are dropped afterwards.
    val personMemberships = persons.join(keys1 = Seq("id"), keys2 = Seq("person_id"), frame2 = memberships)
    val lHistory = orgs.join(keys1 = Seq("org_id"), keys2 = Seq("organization_id"), frame2 = personMemberships)
      .dropFields(Seq("person_id", "org_id"))

    // ---- Write out the history ----

    // Write out the dynamic frame into parquet in "legislator_history" directory
    println("Writing to /legislator_history ...")
    lHistory.printSchema()
    glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> outputHistoryDir)),
      format = "parquet", transformationContext = "").writeDynamicFrame(lHistory)

    // Write out a single file to directory "legislator_single".
    // The frame repartitioned to one partition is the one that must be written here;
    // writing lHistory instead would ignore the repartition entirely.
    val sHistory: DynamicFrame = lHistory.repartition(1)
    println("Writing to /legislator_single ...")
    glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> outputLgSingleDir)),
      format = "parquet", transformationContext = "").writeDynamicFrame(sHistory)

    // Write to directory "legislator_part", partitioned on the "org_name" field.
    // This goes to its own output directory so it does not mix with the
    // single-file output written above.
    println("Writing to /legislator_part, partitioned by Senate and House ...")
    glueContext.getSinkWithFormat(connectionType = "s3",
      options = JsonOptions(Map("path" -> outputLgPartitionedDir, "partitionKeys" -> List("org_name"))),
      format = "parquet", transformationContext = "").writeDynamicFrame(lHistory)

    // ---- Write out to relational databases ----

    // relationalize returns one frame per resulting table; each frame is written to
    // a table named after the frame, in database "dev", via the catalog connection.
    println("Converting to flat tables ...")
    val frames: Seq[DynamicFrame] = lHistory.relationalize(rootTableName = "hist_root",
      stagingPath = redshiftTmpDir, JsonOptions.empty)

    frames.foreach { frame =>
      val options = JsonOptions(Map("dbtable" -> frame.getName(), "database" -> "dev"))
      glueContext.getJDBCSink(catalogConnection = "test-redshift-3", options = options, redshiftTmpDir = redshiftTmpDir)
        .writeDynamicFrame(frame)
    }
  }
}