From 0f443a40d4717bfcd27bf8dd21cb4fa102774cf3 Mon Sep 17 00:00:00 2001
From: Ameen Tayyebi
Date: Mon, 25 Sep 2017 13:29:06 -0400
Subject: [PATCH 1/4] Changing from CREDENTIALS to specific keywords

Changed the generated Redshift COPY and UNLOAD statements to use the specific
credential keywords instead of the CREDENTIALS clause. In other words, the
commands produced will no longer contain a clause of the form:

    CREDENTIALS 'aws_access_key_id=X;aws_secret_access_key=Y'

and will instead use:

    access_key_id 'X' secret_access_key 'Y'

This is needed because loading client-side-encrypted data into Redshift
requires passing master_symmetric_key as an option on the COPY command. That
key can also be supplied inside the CREDENTIALS string, so if a statement sent
to Redshift contains both a CREDENTIALS clause and a standalone
master_symmetric_key option, Redshift reports this error:

    com.amazon.support.exceptions.ErrorException: Amazon Invalid operation:
    conflicting or redundant options;
---
 .../spark/redshift/AWSCredentialsUtils.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala b/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala
index 47ad0b06..93d06f81 100644
--- a/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala
+++ b/src/main/scala/com/databricks/spark/redshift/AWSCredentialsUtils.scala
@@ -28,6 +28,7 @@ private[redshift] object AWSCredentialsUtils {
   /**
    * Generates a credentials string for use in Redshift COPY and UNLOAD statements.
    * Favors a configured `aws_iam_role` if available in the parameters.
+   * http://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html
    */
   def getRedshiftCredentialsString(
       params: MergedParameters,
@@ -36,15 +37,14 @@ private[redshift] object AWSCredentialsUtils {
     def awsCredsToString(credentials: AWSCredentials): String = {
       credentials match {
         case creds: AWSSessionCredentials =>
-          s"aws_access_key_id=${creds.getAWSAccessKeyId};" +
-            s"aws_secret_access_key=${creds.getAWSSecretKey};token=${creds.getSessionToken}"
+          s"access_key_id '${creds.getAWSAccessKeyId}' secret_access_key '${creds.getAWSSecretKey}' " +
+            s"session_token '${creds.getSessionToken}'"
         case creds =>
-          s"aws_access_key_id=${creds.getAWSAccessKeyId};" +
-            s"aws_secret_access_key=${creds.getAWSSecretKey}"
+          s"access_key_id '${creds.getAWSAccessKeyId}' secret_access_key '${creds.getAWSSecretKey}'"
       }
     }
     if (params.iamRole.isDefined) {
-      s"aws_iam_role=${params.iamRole.get}"
+      s"iam_role '${params.iamRole.get}'"
     } else if (params.temporaryAWSCredentials.isDefined) {
       awsCredsToString(params.temporaryAWSCredentials.get.getCredentials)
     } else if (params.forwardSparkS3Credentials) {
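For reference, a sketch of how the text sent to Redshift changes under this patch. The table name, S3 path, and key values below are placeholders; the clause forms mirror the diff above and the updated expectations in AWSCredentialsUtilsSuite (patch 4).

```scala
// Hypothetical COPY statements, before and after this change (placeholder table/path/keys).
val before =
  "COPY my_table FROM 's3://bucket/temp/manifest' " +
    "CREDENTIALS 'aws_access_key_id=AKIA...;aws_secret_access_key=abc123' " +
    "FORMAT AS AVRO 'auto' manifest"
val after =
  "COPY my_table FROM 's3://bucket/temp/manifest' " +
    "access_key_id 'AKIA...' secret_access_key 'abc123' " +
    "FORMAT AS AVRO 'auto' manifest"
```

Only the credentials portion changes; with the keyword form, an `encrypted master_symmetric_key '...'` option supplied via `extracopyoptions` no longer collides with a CREDENTIALS clause.

From 9190d29c0afb0cc5b44fc9d6b2e8ad111fca3fde Mon Sep 17 00:00:00 2001
From: Ameen Tayyebi
Date: Mon, 25 Sep 2017 13:32:48 -0400
Subject: [PATCH 2/4] Disabling EMRFS CSE during Manifest Generation

When data is encrypted in S3 and a COPY command is invoked, Redshift expects
the manifest to be unencrypted, plain text. Client-side encryption (CSE) on
EMR through EMRFS is controlled by a Hadoop option (fs.s3.cse.enabled); once
it is set, all data written through the file system is encrypted. This commit
adds an exception around the generation of manifest files so that, even if the
encryption option is set, the manifest file created on S3 is not encrypted.
This enables Redshift to read the manifest and ingest the data even when the
data itself is encrypted on the client side with a symmetric encryption key.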
---
 .../spark/redshift/RedshiftWriter.scala | 32 +++++++++++++++----
 1 file changed, 26 insertions(+), 6 deletions(-)

diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
index 8383231d..7e23e19e 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftWriter.scala
@@ -22,14 +22,13 @@ import java.sql.{Connection, Date, SQLException, Timestamp}
 import com.amazonaws.auth.AWSCredentialsProvider
 import com.amazonaws.services.s3.AmazonS3Client
 import org.apache.hadoop.fs.{FileSystem, Path}
-
 import org.apache.spark.TaskContext
 import org.slf4j.LoggerFactory
+
 import scala.collection.mutable
 import scala.util.control.NonFatal
-
 import com.databricks.spark.redshift.Parameters.MergedParameters
-
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 import org.apache.spark.sql.types._
@@ -63,6 +62,9 @@ private[redshift] class RedshiftWriter(
 
   private val log = LoggerFactory.getLogger(getClass)
 
+  // http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-cluster-configuration-object-encryption.html
+  private val EMRFS_CLIENT_SIDE_ENCRYPTION_KEY: String = "fs.s3.cse.enabled"
+
   /**
    * Generate CREATE TABLE statement for Redshift
    */
@@ -98,7 +100,7 @@ private[redshift] class RedshiftWriter(
       case "AVRO" => "AVRO 'auto'"
       case csv if csv == "CSV" || csv == "CSV GZIP" => csv + s" NULL AS '${params.nullString}'"
     }
-    s"COPY ${params.table.get} FROM '$fixedUrl' CREDENTIALS '$credsString' FORMAT AS " +
+    s"COPY ${params.table.get} FROM '$fixedUrl' $credsString FORMAT AS " +
       s"${format} manifest ${params.extraCopyOptions}"
   }
 
@@ -295,19 +297,36 @@ private[redshift] class RedshiftWriter(
     }).save(tempDir)
 
     if (nonEmptyPartitions.value.isEmpty) {
+      log.info("Did not write any records. Not creating a manifest file.")
       None
     } else {
       // See https://docs.aws.amazon.com/redshift/latest/dg/loading-data-files-using-manifest.html
       // for a description of the manifest file format. The URLs in this manifest must be absolute
       // and complete.
 
+      // The manifest file must be stored in plain text, even if the contents of the files being
+      // loaded into Redshift are encrypted. To make this work, we create a separate FileSystem
+      // without any encryption options set.
+
+      // Clone the existing configuration set by the user.
+      val plainTextHadoopConfig = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+
+      // Make sure that we don't get a cached FileSystem that still has client-side encryption
+      // enabled. The FileSystem class caches instances based on the scheme and authority of the
+      // path, so even if we pass a completely different configuration to FileSystem.get we may
+      // still get back a cached instance that was created with different properties.
+      plainTextHadoopConfig.set("fs.s3.impl.disable.cache", "true")
+
+      // Turn off client-side encryption if it is set.
+      plainTextHadoopConfig.set(EMRFS_CLIENT_SIDE_ENCRYPTION_KEY, "false")
+
       // The partition filenames are of the form part-r-XXXXX-UUID.fileExtension.
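To make the manifest-generation change easier to follow outside the diff, here is the same idea as a standalone sketch. It is not the patch itself: `hadoopConf` and `tempDir` stand in for values the writer already holds, and the property names are the ones used in the diff above.

```scala
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Sketch: obtain a FileSystem that writes the manifest unencrypted, even when EMRFS
// client-side encryption is enabled for everything else on the cluster.
def plainTextFileSystem(hadoopConf: Configuration, tempDir: String): FileSystem = {
  // Clone the user's configuration rather than mutating it.
  val plainTextConf = new Configuration(hadoopConf)
  // Bypass the FileSystem cache so we don't get back an instance created with CSE enabled.
  plainTextConf.set("fs.s3.impl.disable.cache", "true")
  // Disable EMRFS client-side encryption for this FileSystem only.
  plainTextConf.set("fs.s3.cse.enabled", "false")
  FileSystem.get(URI.create(tempDir), plainTextConf)
}
```

The manifest must stay plain text because Redshift reads it directly; only the data files it points at are decrypted with the symmetric key supplied on the COPY command.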
-      val fs = FileSystem.get(URI.create(tempDir), sqlContext.sparkContext.hadoopConfiguration)
+      val fs = FileSystem.get(URI.create(tempDir), plainTextHadoopConfig)
       val partitionIdRegex = "^part-(?:r-)?(\\d+)[^\\d+].*$".r
       val filesToLoad: Seq[String] = {
         val nonEmptyPartitionIds = nonEmptyPartitions.value.toSet
         fs.listStatus(new Path(tempDir)).map(_.getPath.getName).collect {
-          case file @ partitionIdRegex(id) if nonEmptyPartitionIds.contains(id.toInt) => file
+          case file@partitionIdRegex(id) if nonEmptyPartitionIds.contains(id.toInt) => file
         }
       }
       // It's possible that tempDir contains AWS access keys. We shouldn't save those credentials to
@@ -325,6 +344,7 @@ private[redshift] class RedshiftWriter(
     } finally {
       fsDataOut.close()
     }
+
     Some(manifestPath)
   }
 }

From 1bf27ac3c8b3b3fec957af5207434808e0ddfba5 Mon Sep 17 00:00:00 2001
From: Ameen Tayyebi
Date: Tue, 28 Nov 2017 10:56:01 -0500
Subject: [PATCH 3/4] Switching UNLOAD to New Authorization

Switching the UNLOAD statement away from the WITH CREDENTIALS form and on to
explicitly passing the IAM role, access key, secret key, session token, etc.
Generally speaking this is a more flexible way of passing credentials, though
for UNLOAD it makes little practical difference; the change is made for
consistency with the COPY command, where it is necessary to enable loading
client-side-encrypted data into Redshift.

http://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html
---
 .../scala/com/databricks/spark/redshift/RedshiftRelation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
index 31dc11b2..3d8c3f6b 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
@@ -193,7 +193,7 @@ private[redshift] case class RedshiftRelation(
     // the credentials passed via `credsString`.
     val fixedUrl = Utils.fixS3Url(Utils.removeCredentialsFromURI(new URI(tempDir)).toString)
 
-    s"UNLOAD ('$query') TO '$fixedUrl' WITH CREDENTIALS '$credsString' ESCAPE MANIFEST"
+    s"UNLOAD ('$query') TO '$fixedUrl' $credsString ESCAPE MANIFEST"
   }
 
   private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {

From ed3c36368c92fede6882788b349c746e276db160 Mon Sep 17 00:00:00 2001
From: Ameen Tayyebi
Date: Tue, 28 Nov 2017 11:02:38 -0500
Subject: [PATCH 4/4] Updating README and tests

---
 README.md                                                    | 4 ++++
 .../spark/redshift/AWSCredentialsUtilsSuite.scala            | 6 +++---
 .../com/databricks/spark/redshift/RedshiftSourceSuite.scala  | 6 +++---
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index 2a299819..97de7036 100644
--- a/README.md
+++ b/README.md
@@ -428,7 +428,11 @@ The following describes how each connection can be authenticated:
     To use this capability, you should configure your Hadoop S3 FileSystem to use encryption by setting
     the appropriate configuration properties (which will vary depending on whether you are using `s3a`,
     `s3n`, EMRFS, etc.).
+
     Note that the `MANIFEST` file (a list of all files written) will not be encrypted.
+
+    To use CSE with S3 during COPY, generate your own symmetric encryption key and pass it to the writer using
+    ```.option("extracopyoptions", s"encrypted master_symmetric_key '$encodedSymmetricKey'")```
 
 ### Parameters
 
diff --git a/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala b/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala
index 0315d3a1..f380fa66 100644
--- a/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/AWSCredentialsUtilsSuite.scala
@@ -42,7 +42,7 @@ class AWSCredentialsUtilsSuite extends FunSuite {
     val params =
       Parameters.mergeParameters(baseParams ++ Map("forward_spark_s3_credentials" -> "true"))
     assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, creds) ===
-      "aws_access_key_id=ACCESSKEYID;aws_secret_access_key=SECRET/KEY/WITH/SLASHES")
+      "access_key_id 'ACCESSKEYID' secret_access_key 'SECRET/KEY/WITH/SLASHES'")
   }
 
   test("credentialsString with STS temporary keys") {
@@ -51,7 +51,7 @@
       "temporary_aws_secret_access_key" -> "SECRET/KEY",
       "temporary_aws_session_token" -> "SESSION/Token"))
     assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null) ===
-      "aws_access_key_id=ACCESSKEYID;aws_secret_access_key=SECRET/KEY;token=SESSION/Token")
+      "access_key_id 'ACCESSKEYID' secret_access_key 'SECRET/KEY' session_token 'SESSION/Token'")
   }
 
   test("Configured IAM roles should take precedence") {
@@ -59,7 +59,7 @@
     val iamRole = "arn:aws:iam::123456789000:role/redshift_iam_role"
     val params = Parameters.mergeParameters(baseParams ++ Map("aws_iam_role" -> iamRole))
     assert(AWSCredentialsUtils.getRedshiftCredentialsString(params, null) ===
-      s"aws_iam_role=$iamRole")
+      s"iam_role '$iamRole'")
   }
 
   test("AWSCredentials.load() STS temporary keys should take precedence") {
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index ac2a644a..24526cd0 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -159,7 +159,7 @@ class RedshiftSourceSuite
       "\"testtimestamp\" " +
       "FROM \"PUBLIC\".\"test_table\" '\\) " +
       "TO '.*' " +
-      "WITH CREDENTIALS 'aws_access_key_id=test1;aws_secret_access_key=test2' " +
+      "access_key_id 'test1' secret_access_key 'test2' " +
       "ESCAPE").r
     val mockRedshift = new MockRedshift(
       defaultParams("url"),
@@ -230,7 +230,7 @@ class RedshiftSourceSuite
     val expectedQuery = (
       "UNLOAD \\('SELECT \"testbyte\", \"testbool\" FROM \"PUBLIC\".\"test_table\" '\\) " +
       "TO '.*' " +
-      "WITH CREDENTIALS 'aws_access_key_id=test1;aws_secret_access_key=test2' " +
+      "access_key_id 'test1' secret_access_key 'test2' " +
       "ESCAPE").r
     val mockRedshift = new MockRedshift(defaultParams("url"),
       Map("test_table" -> TestUtils.testSchema))
@@ -270,7 +270,7 @@ class RedshiftSourceSuite
       "AND \"testfloat\" >= 1.0 " +
       "AND \"testint\" <= 43'\\) " +
       "TO '.*' " +
-      "WITH CREDENTIALS 'aws_access_key_id=test1;aws_secret_access_key=test2' " +
+      "access_key_id 'test1' secret_access_key 'test2' " +
       "ESCAPE").r
     // scalastyle:on
     val mockRedshift = new MockRedshift(
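Putting the pieces together, a hypothetical write that loads client-side-encrypted data could look like the sketch below. The cluster URL, bucket, table, and key are placeholders, `df` is an existing DataFrame, and the EMRFS CSE settings described in patch 2 are assumed to already be configured on the cluster; only the `extracopyoptions` line comes directly from the README addition above.

```scala
// Hypothetical usage; every identifier below is a placeholder, not part of the patch.
val encodedSymmetricKey = sys.env("MASTER_SYMMETRIC_KEY") // base64-encoded AES key used by EMRFS CSE

df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://examplecluster:5439/mydb?user=USER&password=PASSWORD")
  .option("dbtable", "my_table")
  .option("tempdir", "s3n://my-bucket/tmp/")
  .option("forward_spark_s3_credentials", "true")
  .option("extracopyoptions", s"encrypted master_symmetric_key '$encodedSymmetricKey'")
  .mode("error")
  .save()
```

With the keyword-based credentials from patch 1, the generated COPY contains both the credential keywords and the `encrypted master_symmetric_key` option without triggering the "conflicting or redundant options" error.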