Adding support for TIMESTAMPTZ as a Timestamp field to support the Am… #367

Open · wants to merge 2 commits into master
6 changes: 5 additions & 1 deletion README.md
@@ -783,8 +783,12 @@ to the `extracopyoptions` setting. For example, with a bucket in the US East (Vi
As a result, this use-case is not supported by this library. The only workaround is to use a new bucket in the same region as your Redshift cluster.

## Migration Guide
### 4.0
- Amazon Redshift JDBC drivers should be version 1.2.8.1005 or later
- `TIMESTAMPTZ` fields are now treated as `TimestampType` values normalized to UTC instead of `StringType`

- Version 3.0 now requires `forward_spark_s3_credentials` to be explicitly set before Spark S3
### 3.0
- `forward_spark_s3_credentials` should be explicitly set before Spark S3
credentials will be forwarded to Redshift. Users who use the `aws_iam_role` or `temporary_aws_*`
authentication mechanisms will be unaffected by this change. Users who relied on the old default
behavior will now need to explicitly set `forward_spark_s3_credentials` to `true` to continue
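As a quick illustration of the 4.0 behaviour change described above (not part of this PR's diff): reading a table that contains a `TIMESTAMPTZ` column now yields a `TimestampType` field instead of a `StringType` one. The SparkSession, table name, JDBC URL, and S3 bucket below are assumed placeholders.

```scala
// Sketch only: `spark`, the table, the JDBC URL, and the tempdir bucket are placeholders.
val events = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://examplecluster:5439/dev?user=USER&password=PASS")
  .option("dbtable", "events") // assumed table with a TIMESTAMPTZ column "occurred_at"
  .option("forward_spark_s3_credentials", "true")
  .option("tempdir", "s3n://example-bucket/tmp")
  .load()

// With the 1.2.8.1005+ driver, "occurred_at" is read as TimestampType normalized to UTC;
// before this change it came back as StringType.
events.printSchema()
```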
3 changes: 2 additions & 1 deletion project/SparkRedshiftBuild.scala
@@ -59,6 +59,7 @@ object SparkRedshiftBuild extends Build {
credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
scalacOptions ++= Seq("-target:jvm-1.6"),
javacOptions ++= Seq("-source", "1.6", "-target", "1.6"),
resolvers ++= Seq("redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release"),
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % "1.7.5",
"com.eclipsesource.minimal-json" % "minimal-json" % "0.9.4",
@@ -75,7 +76,7 @@
// A Redshift-compatible JDBC driver must be present on the classpath for spark-redshift to work.
// For testing, we use an Amazon driver, which is available from
// http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html
"com.amazon.redshift" % "jdbc4" % "1.1.7.1007" % "test" from "https://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC4-1.1.7.1007.jar",
"com.amazon.redshift" % "redshift-jdbc42" % "1.2.8.1005" % "it,test",
// Although support for the postgres driver is lower priority than support for Amazon's
// official Redshift driver, we still run basic tests with it.
"postgresql" % "postgresql" % "8.3-606.jdbc4" % "test",
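For downstream builds that bundle their own driver, the same coordinates can be pulled from the resolver added here; this build.sbt sketch is for reference only and simply mirrors the artifact and version the tests use.

```scala
// build.sbt sketch (not part of the diff): same resolver and driver as the test dependency above.
resolvers += "redshift" at "http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release"
libraryDependencies += "com.amazon.redshift" % "redshift-jdbc42" % "1.2.8.1005"
```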
@@ -156,19 +156,20 @@ trait IntegrationSuiteBase
|testlong int8,
|testshort int2,
|teststring varchar(256),
|testtimestamp timestamp
|testtimestamp timestamp,
|testtimestamptz timestamptz
|)
""".stripMargin
)
// scalastyle:off
conn.createStatement().executeUpdate(
s"""
|insert into $tableName values
|(null, null, null, null, null, null, null, null, null, null),
|(0, null, '2015-07-03', 0.0, -1.0, 4141214, 1239012341823719, null, 'f', '2015-07-03 00:00:00.000'),
|(0, false, null, -1234152.12312498, 100000.0, null, 1239012341823719, 24, '___|_123', null),
|(1, false, '2015-07-02', 0.0, 0.0, 42, 1239012341823719, -13, 'asdf', '2015-07-02 00:00:00.000'),
|(1, true, '2015-07-01', 1234152.12312498, 1.0, 42, 1239012341823719, 23, 'Unicode''s樂趣', '2015-07-01 00:00:00.001')
|(null, null, null, null, null, null, null, null, null, null, null),
|(0, null, '2015-07-03', 0.0, -1.0, 4141214, 1239012341823719, null, 'f', '2015-07-03 00:00:00.000', '2015-07-03 00:00:00.000'),
|(0, false, null, -1234152.12312498, 100000.0, null, 1239012341823719, 24, '___|_123', null, null),
|(1, false, '2015-07-02', 0.0, 0.0, 42, 1239012341823719, -13, 'asdf', '2015-07-02 00:00:00.000', '2015-07-02 00:00:00.000'),
|(1, true, '2015-07-01', 1234152.12312498, 1.0, 42, 1239012341823719, 23, 'Unicode''s樂趣', '2015-07-01 00:00:00.001', '2015-07-01 00:00:00.001')
""".stripMargin
)
// scalastyle:on
30 changes: 27 additions & 3 deletions src/main/scala/com/databricks/spark/redshift/Conversions.scala
@@ -17,14 +17,16 @@
package com.databricks.spark.redshift

import java.sql.Timestamp
import java.text.{DecimalFormat, DecimalFormatSymbols, SimpleDateFormat}
import java.util.Locale
import java.text.{DecimalFormat, DecimalFormatSymbols, ParseException, SimpleDateFormat}
import java.util.{Locale, TimeZone}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._

import scala.util.Try

/**
* Data type conversions for Redshift unloaded data
*/
@@ -104,7 +106,8 @@ private[redshift] object Conversions {
case LongType => (data: String) => java.lang.Long.parseLong(data)
case ShortType => (data: String) => java.lang.Short.parseShort(data)
case StringType => (data: String) => data
case TimestampType => (data: String) => Timestamp.valueOf(data)
case TimestampType => (data: String) =>
Try(Timestamp.valueOf(data)).getOrElse(parseTimestampTz(data))
case _ => (data: String) => data
}
}
@@ -122,4 +125,25 @@
encoder.toRow(externalRow)
}
}

private def parseTimestampTz(data: String): Timestamp = {
// Cannot just parse using SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSX")
// since "2014-02-28 22:00:00.10+00"
// should be equivalent to "2014-02-28 22:00:00.100" (100 millis)
// but returns "2014-02-28 22:00:00.010" (10 millis).
var timezoneOffsetIndex = data.indexOf("+", "yyyy-MM-dd HH:mm:ss".length)
val timezoneHourOffset = if (timezoneOffsetIndex >= 0) {
-data.substring(timezoneOffsetIndex + 1).toInt
} else {
timezoneOffsetIndex = data.indexOf("-", "yyyy-MM-dd HH:mm:ss".length)
if (timezoneOffsetIndex < 0) {
throw new ParseException(s"""Unparseable date. Timezone data not recognized: "$data"""", 0)
}
data.substring(timezoneOffsetIndex + 1).toInt
}
val currentTimezone = TimeZone.getDefault
val of = Timestamp.valueOf(data.substring(0, timezoneOffsetIndex))
new Timestamp(of.getTime + timezoneHourOffset * 3600000 +
currentTimezone.getRawOffset + currentTimezone.getDSTSavings)
}
}
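The comment inside `parseTimestampTz` explains why a single `SimpleDateFormat` pattern is not enough; the standalone sketch below (not part of the diff) reproduces that pitfall with the same example value.

```scala
import java.text.SimpleDateFormat

// "SSS" parses the digits after the dot as a plain millisecond count, so ".10" becomes
// 10 ms instead of the 100 ms that a fractional-second reading would give.
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSX")
val parsed = sdf.parse("2014-02-28 22:00:00.10+00") // 2014-02-28 22:00:00.010 UTC, not .100
```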
@@ -229,7 +229,9 @@ private[redshift] class JDBCWrapper {
properties.setProperty("user", user)
properties.setProperty("password", password)
}
driver.connect(url, properties)
val connection = driver.connect(url, properties)
connection.setAutoCommit(false)
connection
}

/**
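Because `setAutoCommit(false)` changes the contract for anyone using connections obtained through this wrapper, here is a plain-JDBC sketch of the explicit commit that becomes necessary; the URL, credentials, and SQL are placeholders, and this is not the library's own API.

```scala
import java.sql.DriverManager
import java.util.Properties

// Plain-JDBC illustration: with autocommit off, work is only persisted on commit().
val url = "jdbc:redshift://examplecluster:5439/dev" // placeholder
val props = new Properties()
props.setProperty("user", "USER")     // placeholder credentials
props.setProperty("password", "PASS")

val connection = DriverManager.getConnection(url, props)
connection.setAutoCommit(false)
try {
  connection.createStatement().executeUpdate("CREATE TABLE example_table (x int)") // placeholder DDL
  connection.commit() // nothing is visible to other sessions until this point
} finally {
  connection.close()
}
```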
@@ -17,13 +17,12 @@
package com.databricks.spark.redshift

import java.sql.Timestamp
import java.util.Locale

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.scalatest.FunSuite
import java.util.{Locale, TimeZone}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.scalatest.FunSuite

/**
* Unit test for data type conversions
@@ -43,17 +42,20 @@ class ConversionsSuite extends FunSuite {
// scalastyle:on

val timestampWithMillis = "2014-03-01 00:00:01.123"
val timestampTzWithMillis = "2014-02-28 21:00:01.123-11"

val expectedDateMillis = TestUtils.toMillis(2015, 6, 1, 0, 0, 0)
val expectedTimestampMillis = TestUtils.toMillis(2014, 2, 1, 0, 0, 1, 123)
val expectedTimestampTzMillis = TestUtils.toMillis(2014, 2, 1, 9, 0, 1, 123,
TimeZone.getTimeZone("UTC"))

val convertedRow = convertRow(
Array("1", "t", "2015-07-01", doubleMin, "1.0", "42",
longMax, "23", unicodeString, timestampWithMillis))
longMax, "23", unicodeString, timestampWithMillis, timestampTzWithMillis))

val expectedRow = Row(1.asInstanceOf[Byte], true, new Timestamp(expectedDateMillis),
Double.MinValue, 1.0f, 42, Long.MaxValue, 23.toShort, unicodeString,
new Timestamp(expectedTimestampMillis))
new Timestamp(expectedTimestampMillis), new Timestamp(expectedTimestampTzMillis))

assert(convertedRow == expectedRow)
}
@@ -120,4 +122,39 @@
assert(convertRow(Array("inf")) === Row(Double.PositiveInfinity))
assert(convertRow(Array("-inf")) === Row(Double.NegativeInfinity))
}

test("timestamps with timezones have multiple formats") {
val schema = StructType(Seq(StructField("a", TimestampType)))
val convertRow = createRowConverter(schema)
val utc = TimeZone.getTimeZone("UTC")
Seq(
"2014-03-01 00:00:01+01" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 1000,
timezone = utc),
// daylight saving time
"2014-03-01 01:00:01.000+02" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 1000,
timezone = utc),
// no daylight saving time
"2014-04-01 01:00:01.000+01" -> TestUtils.toMillis(2014, 3, 1, 0, 0, 0, millis = 1000,
timezone = utc),
"2014-02-28 22:00:00.1-01" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 100,
timezone = utc),
"2014-03-01 01:00:00.10+02" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 100,
timezone = utc),
"2014-02-28 23:00:00.100-00" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 100,
timezone = utc),
"2014-02-28 14:00:00.01-09" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 10,
timezone = utc),
"2014-03-01 09:00:00.010+10" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 10,
timezone = utc),
"2014-03-01 09:00:00.001+10" -> TestUtils.toMillis(2014, 2, 1, 0, 0, 0, millis = 1,
timezone = utc)
).foreach { case (timestampString, expectedTime) =>
withClue(s"timestamp string is '$timestampString'") {
val convertedRow = convertRow(Array(timestampString))
val convertedTimestamp = convertedRow.get(0).asInstanceOf[Timestamp]
assert(convertedTimestamp === new Timestamp(expectedTime))
}
}
}

}
@@ -146,17 +146,17 @@ class RedshiftSourceSuite
// scalastyle:off
unloadedData =
"""
|1|t|2015-07-01|1234152.12312498|1.0|42|1239012341823719|23|Unicode's樂趣|2015-07-01 00:00:00.001
|1|f|2015-07-02|0|0.0|42|1239012341823719|-13|asdf|2015-07-02 00:00:00.0
|0||2015-07-03|0.0|-1.0|4141214|1239012341823719||f|2015-07-03 00:00:00
|0|f||-1234152.12312498|100000.0||1239012341823719|24|___\|_123|
||||||||||
|1|t|2015-07-01|1234152.12312498|1.0|42|1239012341823719|23|Unicode's樂趣|2015-07-01 00:00:00.001|2015-07-01 00:00:00.001+00
|1|f|2015-07-02|0|0.0|42|1239012341823719|-13|asdf|2015-07-02 00:00:00.0|2015-07-02 00:00:00.0+00
|0||2015-07-03|0.0|-1.0|4141214|1239012341823719||f|2015-07-03 00:00:00|2015-07-03 00:00:00+00
|0|f||-1234152.12312498|100000.0||1239012341823719|24|___\|_123||
|||||||||||
""".stripMargin.trim
// scalastyle:on
val expectedQuery = (
"UNLOAD \\('SELECT \"testbyte\", \"testbool\", \"testdate\", \"testdouble\"," +
" \"testfloat\", \"testint\", \"testlong\", \"testshort\", \"teststring\", " +
"\"testtimestamp\" " +
"\"testtimestamp\", \"testtimestamptz\" " +
"FROM \"PUBLIC\".\"test_table\" '\\) " +
"TO '.*' " +
"WITH CREDENTIALS 'aws_access_key_id=test1;aws_secret_access_key=test2' " +
28 changes: 17 additions & 11 deletions src/test/scala/com/databricks/spark/redshift/TestUtils.scala
@@ -17,7 +17,7 @@
package com.databricks.spark.redshift

import java.sql.{Date, Timestamp}
import java.util.{Calendar, Locale}
import java.util.{Calendar, Locale, TimeZone}

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
@@ -42,7 +42,8 @@ object TestUtils {
StructField("testlong", LongType),
StructField("testshort", ShortType),
StructField("teststring", StringType),
StructField("testtimestamp", TimestampType)))
StructField("testtimestamp", TimestampType),
StructField("testtimestamptz", TimestampType)))
}

// scalastyle:off
@@ -52,14 +53,17 @@
val expectedData: Seq[Row] = Seq(
Row(1.toByte, true, TestUtils.toDate(2015, 6, 1), 1234152.12312498,
1.0f, 42, 1239012341823719L, 23.toShort, "Unicode's樂趣",
TestUtils.toTimestamp(2015, 6, 1, 0, 0, 0, 1)),
TestUtils.toTimestamp(2015, 6, 1, 0, 0, 0, 1), TestUtils.toTimestamp(2015, 6, 1, 0, 0, 0, 1,
TimeZone.getTimeZone("UTC"))),
Row(1.toByte, false, TestUtils.toDate(2015, 6, 2), 0.0, 0.0f, 42,
1239012341823719L, -13.toShort, "asdf", TestUtils.toTimestamp(2015, 6, 2, 0, 0, 0, 0)),
1239012341823719L, -13.toShort, "asdf", TestUtils.toTimestamp(2015, 6, 2, 0, 0, 0, 0),
TestUtils.toTimestamp(2015, 6, 2, 0, 0, 0, 0, TimeZone.getTimeZone("UTC"))),
Row(0.toByte, null, TestUtils.toDate(2015, 6, 3), 0.0, -1.0f, 4141214,
1239012341823719L, null, "f", TestUtils.toTimestamp(2015, 6, 3, 0, 0, 0)),
1239012341823719L, null, "f", TestUtils.toTimestamp(2015, 6, 3, 0, 0, 0),
TestUtils.toTimestamp(2015, 6, 3, 0, 0, 0, timezone = TimeZone.getTimeZone("UTC"))),
Row(0.toByte, false, null, -1234152.12312498, 100000.0f, null, 1239012341823719L, 24.toShort,
"___|_123", null),
Row(List.fill(10)(null): _*))
"___|_123", null, null),
Row(List.fill(11)(null): _*))
// scalastyle:on

/**
@@ -84,8 +88,9 @@
hour: Int,
minutes: Int,
seconds: Int,
millis: Int = 0): Long = {
val calendar = Calendar.getInstance()
millis: Int = 0,
timezone: TimeZone = TimeZone.getDefault): Long = {
val calendar = Calendar.getInstance(timezone)
calendar.set(year, zeroBasedMonth, date, hour, minutes, seconds)
calendar.set(Calendar.MILLISECOND, millis)
calendar.getTime.getTime
@@ -101,8 +106,9 @@
hour: Int,
minutes: Int,
seconds: Int,
millis: Int = 0): Timestamp = {
new Timestamp(toMillis(year, zeroBasedMonth, date, hour, minutes, seconds, millis))
millis: Int = 0,
timezone: TimeZone = TimeZone.getDefault): Timestamp = {
new Timestamp(toMillis(year, zeroBasedMonth, date, hour, minutes, seconds, millis, timezone))
}

/**
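A short sketch of how the new `timezone` parameter changes expected values in tests; the arguments below are illustrative.

```scala
import java.util.TimeZone

// Pinning the zone makes the expected millis independent of the JVM's default time zone.
val utcMidnight   = TestUtils.toMillis(2015, 6, 1, 0, 0, 0, timezone = TimeZone.getTimeZone("UTC"))
val localMidnight = TestUtils.toMillis(2015, 6, 1, 0, 0, 0) // still uses TimeZone.getDefault
// The two agree only when the default zone happens to be UTC.
```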