diff --git a/README.md b/README.md
index 794e5c4f..2171b634 100644
--- a/README.md
+++ b/README.md
@@ -783,8 +783,12 @@ to the `extracopyoptions` setting. For example, with a bucket in the US East (Vi
 As a result, this use-case is not supported by this library. The only workaround is to use a new bucket in the same region as your Redshift cluster.
 
 ## Migration Guide
+### 4.0
+- The Amazon Redshift JDBC driver should be version 1.2.8.1005 or later
+- `TIMESTAMPTZ` fields are now read as `TimestampType` values normalized to UTC instead of `StringType`
 
-- Version 3.0 now requires `forward_spark_s3_credentials` to be explicitly set before Spark S3
+### 3.0
+- `forward_spark_s3_credentials` must be explicitly set before Spark S3
   credentials will be forwarded to Redshift. Users who use the `aws_iam_role` or `temporary_aws_*`
   authentication mechanisms will be unaffected by this change. Users who relied on the old default
   behavior will now need to explicitly set `forward_spark_s3_credentials` to `true` to continue
diff --git a/project/SparkRedshiftBuild.scala b/project/SparkRedshiftBuild.scala
index 1a5301f9..4b422b8e 100644
--- a/project/SparkRedshiftBuild.scala
+++ b/project/SparkRedshiftBuild.scala
@@ -59,6 +59,7 @@ object SparkRedshiftBuild extends Build {
       credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
       scalacOptions ++= Seq("-target:jvm-1.6"),
       javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
+      resolvers ++= Seq("redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release"),
       libraryDependencies ++= Seq(
         "org.slf4j" % "slf4j-api" % "1.7.5",
         "com.eclipsesource.minimal-json" % "minimal-json" % "0.9.4",
@@ -75,7 +76,7 @@
         // A Redshift-compatible JDBC driver must be present on the classpath for spark-redshift to work.
         // For testing, we use an Amazon driver, which is available from
         // http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
-        "com.amazon.redshift" % "jdbc4" % "1.1.7.1007" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC4-1.1.7.1007.jar",
+        "com.amazon.redshift" % "redshift-jdbc42" % "1.2.8.1005" % "it,test",
         // Although support for the postgres driver is lower priority than support for Amazon's
         // official Redshift driver, we still run basic tests with it.
         "postgresql" % "postgresql" % "8.3-606.jdbc4" % "test",
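
// Illustrative usage sketch (not part of the diff above): how the 4.0 behavior noted in the
// README migration guide is expected to look from Spark. The cluster URL, table name, and
// tempdir bucket below are hypothetical placeholders; the format name and option keys are the
// ones this library documents.
import org.apache.spark.sql.SparkSession

object TimestampTzReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("redshift-timestamptz-example").getOrCreate()

    val df = spark.read
      .format("com.databricks.spark.redshift")
      .option("url", "jdbc:redshift://example-cluster:5439/dev?user=<user>&password=<password>") // hypothetical cluster
      .option("dbtable", "example_table")             // hypothetical table with a timestamptz column
      .option("tempdir", "s3n://example-bucket/tmp/") // hypothetical S3 staging location
      .option("forward_spark_s3_credentials", "true")
      .load()

    // With the 4.0 change, a timestamptz column arrives as TimestampType (normalized to UTC)
    // rather than StringType, so it shows up as a timestamp in the printed schema.
    df.printSchema()
  }
}
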
"postgresql" % "postgresql" % "8.3-606.jdbc4" % "test", diff --git a/src/it/scala/com/databricks/spark/redshift/IntegrationSuiteBase.scala b/src/it/scala/com/databricks/spark/redshift/IntegrationSuiteBase.scala index f635e528..3237e73b 100644 --- a/src/it/scala/com/databricks/spark/redshift/IntegrationSuiteBase.scala +++ b/src/it/scala/com/databricks/spark/redshift/IntegrationSuiteBase.scala @@ -156,7 +156,8 @@ trait IntegrationSuiteBase |testlong int8, |testshort int2, |teststring varchar(256), - |testtimestamp timestamp + |testtimestamp timestamp, + |testtimestamptz timestamptz |) """.stripMargin ) @@ -164,11 +165,11 @@ trait IntegrationSuiteBase conn.createStatement().executeUpdate( s""" |insert into $tableName values - |(null, null, null, null, null, null, null, null, null, null), - |(0, null, '2015-07-03', 0.0, -1.0, 4141214, 1239012341823719, null, 'f', '2015-07-03 00:00:00.000'), - |(0, false, null, -1234152.12312498, 100000.0, null, 1239012341823719, 24, '___|_123', null), - |(1, false, '2015-07-02', 0.0, 0.0, 42, 1239012341823719, -13, 'asdf', '2015-07-02 00:00:00.000'), - |(1, true, '2015-07-01', 1234152.12312498, 1.0, 42, 1239012341823719, 23, 'Unicode''s樂趣', '2015-07-01 00:00:00.001') + |(null, null, null, null, null, null, null, null, null, null, null), + |(0, null, '2015-07-03', 0.0, -1.0, 4141214, 1239012341823719, null, 'f', '2015-07-03 00:00:00.000', '2015-07-03 00:00:00.000'), + |(0, false, null, -1234152.12312498, 100000.0, null, 1239012341823719, 24, '___|_123', null, null), + |(1, false, '2015-07-02', 0.0, 0.0, 42, 1239012341823719, -13, 'asdf', '2015-07-02 00:00:00.000', '2015-07-02 00:00:00.000'), + |(1, true, '2015-07-01', 1234152.12312498, 1.0, 42, 1239012341823719, 23, 'Unicode''s樂趣', '2015-07-01 00:00:00.001', '2015-07-01 00:00:00.001') """.stripMargin ) // scalastyle:on diff --git a/src/main/scala/com/databricks/spark/redshift/Conversions.scala b/src/main/scala/com/databricks/spark/redshift/Conversions.scala index f638a393..93817640 100644 --- a/src/main/scala/com/databricks/spark/redshift/Conversions.scala +++ b/src/main/scala/com/databricks/spark/redshift/Conversions.scala @@ -17,14 +17,16 @@ package com.databricks.spark.redshift import java.sql.Timestamp -import java.text.{DecimalFormat, DecimalFormatSymbols, SimpleDateFormat} -import java.util.Locale +import java.text.{DecimalFormat, DecimalFormatSymbols, ParseException, SimpleDateFormat} +import java.util.{Locale, TimeZone} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types._ +import scala.util.Try + /** * Data type conversions for Redshift unloaded data */ @@ -104,7 +106,8 @@ private[redshift] object Conversions { case LongType => (data: String) => java.lang.Long.parseLong(data) case ShortType => (data: String) => java.lang.Short.parseShort(data) case StringType => (data: String) => data - case TimestampType => (data: String) => Timestamp.valueOf(data) + case TimestampType => (data: String) => + Try(Timestamp.valueOf(data)).getOrElse(parseTimestampTz(data)) case _ => (data: String) => data } } @@ -122,4 +125,25 @@ private[redshift] object Conversions { encoder.toRow(externalRow) } } + + private def parseTimestampTz(data: String): Timestamp = { + // Cannot just parse using SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSX") + // since "2014-02-28 22:00:00.10+00" + // should be equivalent to "2014-02-28 22:00:00.100" (100 millis) + // but returns "2014-02-28 
22:00:00.010" (10 millis). + var timezoneOffsetIndex = data.indexOf("+", "yyyy-MM-dd HH:mm:ss".length) + val timezoneHourOffset = if (timezoneOffsetIndex >= 0) { + -data.substring(timezoneOffsetIndex + 1).toInt + } else { + timezoneOffsetIndex = data.indexOf("-", "yyyy-MM-dd HH:mm:ss".length) + if (timezoneOffsetIndex < 0) { + throw new ParseException(s"""Unparseable date. Timezone data not recognized: "$data"""", 0) + } + data.substring(timezoneOffsetIndex + 1).toInt + } + val currentTimezone = TimeZone.getDefault + val of = Timestamp.valueOf(data.substring(0, timezoneOffsetIndex)) + new Timestamp(of.getTime + timezoneHourOffset * 3600000 + + currentTimezone.getRawOffset + currentTimezone.getDSTSavings) + } } diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala index dc72dccf..ad8da502 100644 --- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala +++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala @@ -229,7 +229,9 @@ private[redshift] class JDBCWrapper { properties.setProperty("user", user) properties.setProperty("password", password) } - driver.connect(url, properties) + val connection = driver.connect(url, properties) + connection.setAutoCommit(false) + connection } /** diff --git a/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala b/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala index 5c10a802..042386c9 100644 --- a/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala +++ b/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala @@ -17,13 +17,12 @@ package com.databricks.spark.redshift import java.sql.Timestamp -import java.util.Locale - -import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.scalatest.FunSuite +import java.util.{Locale, TimeZone} import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ +import org.scalatest.FunSuite /** * Unit test for data type conversions @@ -43,17 +42,20 @@ class ConversionsSuite extends FunSuite { // scalastyle:on val timestampWithMillis = "2014-03-01 00:00:01.123" + val timestampTzWithMillis = "2014-02-28 21:00:01.123-11" val expectedDateMillis = TestUtils.toMillis(2015, 6, 1, 0, 0, 0) val expectedTimestampMillis = TestUtils.toMillis(2014, 2, 1, 0, 0, 1, 123) + val expectedTimestampTzMillis = TestUtils.toMillis(2014, 2, 1, 9, 0, 1, 123, + TimeZone.getTimeZone("UTC")) val convertedRow = convertRow( Array("1", "t", "2015-07-01", doubleMin, "1.0", "42", - longMax, "23", unicodeString, timestampWithMillis)) + longMax, "23", unicodeString, timestampWithMillis, timestampTzWithMillis)) val expectedRow = Row(1.asInstanceOf[Byte], true, new Timestamp(expectedDateMillis), Double.MinValue, 1.0f, 42, Long.MaxValue, 23.toShort, unicodeString, - new Timestamp(expectedTimestampMillis)) + new Timestamp(expectedTimestampMillis), new Timestamp(expectedTimestampTzMillis)) assert(convertedRow == expectedRow) } @@ -120,4 +122,39 @@ class ConversionsSuite extends FunSuite { assert(convertRow(Array("inf")) === Row(Double.PositiveInfinity)) assert(convertRow(Array("-inf")) === Row(Double.NegativeInfinity)) } + + test("timestamps with timezones have multiple formats") { + val schema = StructType(Seq(StructField("a", TimestampType))) + val convertRow = createRowConverter(schema) + val utc = TimeZone.getTimeZone("UTC") + Seq( + "2014-03-01 00:00:01+01" -> 
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
index dc72dccf..ad8da502 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftJDBCWrapper.scala
@@ -229,7 +229,9 @@
       properties.setProperty("user", user)
       properties.setProperty("password", password)
     }
-    driver.connect(url, properties)
+    val connection = driver.connect(url, properties)
+    connection.setAutoCommit(false)
+    connection
   }
 
   /**
diff --git a/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala b/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala
index 5c10a802..042386c9 100644
--- a/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/ConversionsSuite.scala
@@ -17,13 +17,12 @@ package com.databricks.spark.redshift
 
 import java.sql.Timestamp
-import java.util.Locale
-
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.scalatest.FunSuite
+import java.util.{Locale, TimeZone}
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types._
+import org.scalatest.FunSuite
 
 /**
  * Unit test for data type conversions
@@ -43,17 +42,20 @@
     // scalastyle:on
 
     val timestampWithMillis = "2014-03-01 00:00:01.123"
+    val timestampTzWithMillis = "2014-02-28 21:00:01.123-11"
 
     val expectedDateMillis = TestUtils.toMillis(2015, 6, 1, 0, 0, 0)
     val expectedTimestampMillis = TestUtils.toMillis(2014, 2, 1, 0, 0, 1, 123)
+    val expectedTimestampTzMillis = TestUtils.toMillis(2014, 2, 1, 9, 0, 1, 123,
+      TimeZone.getTimeZone("UTC"))
 
     val convertedRow = convertRow(
       Array("1", "t", "2015-07-01", doubleMin, "1.0", "42",
-        longMax, "23", unicodeString, timestampWithMillis))
+        longMax, "23", unicodeString, timestampWithMillis, timestampTzWithMillis))
 
     val expectedRow = Row(1.asInstanceOf[Byte], true, new Timestamp(expectedDateMillis),
       Double.MinValue, 1.0f, 42, Long.MaxValue, 23.toShort, unicodeString,
-      new Timestamp(expectedTimestampMillis))
+      new Timestamp(expectedTimestampMillis), new Timestamp(expectedTimestampTzMillis))
 
     assert(convertedRow == expectedRow)
   }
@@ -120,4 +122,39 @@
     assert(convertRow(Array("inf")) === Row(Double.PositiveInfinity))
     assert(convertRow(Array("-inf")) === Row(Double.NegativeInfinity))
   }
+
+  test("timestamps with timezones have multiple formats") {
+    val schema = StructType(Seq(StructField("a", TimestampType)))
+    val convertRow = createRowConverter(schema)
+    val utc = TimeZone.getTimeZone("UTC")
+    Seq(
+      "2014-03-01 00:00:01+01" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 1000,
+        timezone = utc),
+      // daylights savings
+      "2014-03-01 01:00:01.000+02" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 1000,
+        timezone = utc),
+      // no daylights savings
+      "2014-04-01 01:00:01.000+01" -> TestUtils.toMillis(2014, 3, 1, 0, 0, 0, millis = 1000,
+        timezone = utc),
+      "2014-02-28 22:00:00.1-01" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 100,
+        timezone = utc),
+      "2014-03-01 01:00:00.10+02" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 100,
+        timezone = utc),
+      "2014-02-28 23:00:00.100-00" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 100,
+        timezone = utc),
+      "2014-02-28 14:00:00.01-09" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 10,
+        timezone = utc),
+      "2014-03-01 09:00:00.010+10" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 10,
+        timezone = utc),
+      "2014-03-01 09:00:00.001+10" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 1,
+        timezone = utc)
+    ).foreach { case (timestampString, expectedTime) =>
+      withClue(s"timestamp string is '$timestampString'") {
+        val convertedRow = convertRow(Array(timestampString))
+        val convertedTimestamp = convertedRow.get(0).asInstanceOf[Timestamp]
+        assert(convertedTimestamp === new Timestamp(expectedTime))
+      }
+    }
+  }
+
 }
diff --git a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
index ac2a644a..3cc6ce1f 100644
--- a/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/RedshiftSourceSuite.scala
@@ -146,17 +146,17 @@
     // scalastyle:off
     unloadedData =
       """
-        |1|t|2015-07-01|1234152.12312498|1.0|42|1239012341823719|23|Unicode's樂趣|2015-07-01 00:00:00.001
-        |1|f|2015-07-02|0|0.0|42|1239012341823719|-13|asdf|2015-07-02 00:00:00.0
-        |0||2015-07-03|0.0|-1.0|4141214|1239012341823719||f|2015-07-03 00:00:00
-        |0|f||-1234152.12312498|100000.0||1239012341823719|24|___\|_123|
-        ||||||||||
+        |1|t|2015-07-01|1234152.12312498|1.0|42|1239012341823719|23|Unicode's樂趣|2015-07-01 00:00:00.001|2015-07-01 00:00:00.001+00
+        |1|f|2015-07-02|0|0.0|42|1239012341823719|-13|asdf|2015-07-02 00:00:00.0|2015-07-02 00:00:00.0+00
+        |0||2015-07-03|0.0|-1.0|4141214|1239012341823719||f|2015-07-03 00:00:00|2015-07-03 00:00:00+00
+        |0|f||-1234152.12312498|100000.0||1239012341823719|24|___\|_123||
+        |||||||||||
       """.stripMargin.trim
     // scalastyle:on
     val expectedQuery = (
       "UNLOAD \\('SELECT \"testbyte\", \"testbool\", \"testdate\", \"testdouble\"," +
       " \"testfloat\", \"testint\", \"testlong\", \"testshort\", \"teststring\", " +
-      "\"testtimestamp\" " +
+      "\"testtimestamp\", \"testtimestamptz\" " +
      "FROM \"PUBLIC\".\"test_table\" '\\) " +
      "TO '.*' " +
      "WITH CREDENTIALS 'aws_access_key_id=test1;aws_secret_access_key=test2' " +
diff --git a/src/test/scala/com/databricks/spark/redshift/TestUtils.scala b/src/test/scala/com/databricks/spark/redshift/TestUtils.scala
index ec48fdd9..31c4dc42 100644
--- a/src/test/scala/com/databricks/spark/redshift/TestUtils.scala
+++ b/src/test/scala/com/databricks/spark/redshift/TestUtils.scala
@@ -17,7 +17,7 @@ package com.databricks.spark.redshift
 
 import java.sql.{Date, Timestamp}
-import java.util.{Calendar, Locale}
+import java.util.{Calendar, Locale, TimeZone}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
@@ -42,7 +42,8 @@
       StructField("testlong", LongType),
       StructField("testshort", ShortType),
       StructField("teststring", StringType),
-      StructField("testtimestamp", TimestampType)))
+      StructField("testtimestamp", TimestampType),
+      StructField("testtimestamptz", TimestampType)))
   }
 
   // scalastyle:off
@@ -52,14 +53,17 @@ object TestUtils {
   val expectedData: Seq[Row] = Seq(
     Row(1.toByte, true, TestUtils.toDate(2015, 6, 1), 1234152.12312498, 1.0f, 42,
       1239012341823719L, 23.toShort, "Unicode's樂趣",
-      TestUtils.toTimestamp(2015, 6, 1, 0, 0, 0, 1)),
+      TestUtils.toTimestamp(2015, 6, 1, 0, 0, 0, 1), TestUtils.toTimestamp(2015, 6, 1, 0, 0, 0, 1,
+        TimeZone.getTimeZone("UTC"))),
     Row(1.toByte, false, TestUtils.toDate(2015, 6, 2), 0.0, 0.0f, 42,
-      1239012341823719L, -13.toShort, "asdf", TestUtils.toTimestamp(2015, 6, 2, 0, 0, 0, 0)),
+      1239012341823719L, -13.toShort, "asdf", TestUtils.toTimestamp(2015, 6, 2, 0, 0, 0, 0),
+      TestUtils.toTimestamp(2015, 6, 2, 0, 0, 0, 0, TimeZone.getTimeZone("UTC"))),
     Row(0.toByte, null, TestUtils.toDate(2015, 6, 3), 0.0, -1.0f, 4141214,
-      1239012341823719L, null, "f", TestUtils.toTimestamp(2015, 6, 3, 0, 0, 0)),
+      1239012341823719L, null, "f", TestUtils.toTimestamp(2015, 6, 3, 0, 0, 0),
+      TestUtils.toTimestamp(2015, 6, 3, 0, 0, 0, timezone = TimeZone.getTimeZone("UTC"))),
     Row(0.toByte, false, null, -1234152.12312498, 100000.0f, null, 1239012341823719L, 24.toShort,
-      "___|_123", null),
-    Row(List.fill(10)(null): _*))
+      "___|_123", null, null),
+    Row(List.fill(11)(null): _*))
   // scalastyle:on
 
   /**
@@ -84,8 +88,9 @@
       hour: Int,
       minutes: Int,
       seconds: Int,
-      millis: Int = 0): Long = {
-    val calendar = Calendar.getInstance()
+      millis: Int = 0,
+      timezone: TimeZone = TimeZone.getDefault): Long = {
+    val calendar = Calendar.getInstance(timezone)
     calendar.set(year, zeroBasedMonth, date, hour, minutes, seconds)
     calendar.set(Calendar.MILLISECOND, millis)
     calendar.getTime.getTime
@@ -101,8 +106,9 @@
       hour: Int,
       minutes: Int,
       seconds: Int,
-      millis: Int = 0): Timestamp = {
-    new Timestamp(toMillis(year, zeroBasedMonth, date, hour, minutes, seconds, millis))
+      millis: Int = 0,
+      timezone: TimeZone = TimeZone.getDefault): Timestamp = {
+    new Timestamp(toMillis(year, zeroBasedMonth, date, hour, minutes, seconds, millis, timezone))
   }
 
   /**
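
// Illustrative sketch (not part of the diff above): what the new `timezone` parameter to the
// TestUtils helpers changes. Calendar.getInstance(timezone) interprets the same wall-clock fields
// in the given zone, so the UTC variant and the default-zone variant differ by the JVM's offset.
// The local toMillis below is a standalone re-implementation for illustration, not the test helper.
import java.util.{Calendar, TimeZone}

object TimezoneParameterExample {
  def toMillis(tz: TimeZone): Long = {
    val calendar = Calendar.getInstance(tz)
    calendar.set(2015, 6, 1, 0, 0, 0) // 2015-07-01 00:00:00 (month is zero-based)
    calendar.set(Calendar.MILLISECOND, 1)
    calendar.getTime.getTime
  }

  def main(args: Array[String]): Unit = {
    val utcMillis = toMillis(TimeZone.getTimeZone("UTC"))
    val defaultMillis = toMillis(TimeZone.getDefault)
    // The difference is the default zone's UTC offset at that instant, which is why the
    // expectedData rows build their testtimestamptz values with the UTC-based variant.
    println(utcMillis - defaultMillis)
  }
}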