diff --git a/build.sbt b/build.sbt index 7c05621..4729d29 100644 --- a/build.sbt +++ b/build.sbt @@ -7,7 +7,7 @@ resolvers += "Maven Central Server" at "http://repo1.maven.org/maven2" val commonSettings = sonatypeSettings ++ Seq( organization := "org.zalando.reactivenakadi", startYear := Some(2016), - scalaVersion := "2.11.7", + scalaVersion := "2.11.8", test in assembly := {}, licenses := Seq("MIT license" -> url("https://opensource.org/licenses/MIT")), homepage := Some(url("https://github.com/zalando/reactive-nakadi")), @@ -28,7 +28,6 @@ val commonSettings = sonatypeSettings ++ Seq( ) libraryDependencies ++= Seq( - "joda-time" % "joda-time" % "2.3", "commons-logging" % "commons-logging" % "1.1.1", "com.typesafe.play" %% "play-json" % "2.5.4", "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, @@ -36,7 +35,7 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream" % akkaVersion, "com.typesafe.akka" %% "akka-http-core" % akkaVersion, "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.10.60", - "org.scalatest" %% "scalatest" % "2.2.4" % "test", + "org.scalatest" %% "scalatest" % "2.2.6" % "test", "org.scalamock" %% "scalamock-scalatest-support" % "3.2" % "test, it", "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % "test, it" ) diff --git a/src/it/scala/org/zalando/react/nakadi/InMemoryCommitCommitManager.scala b/src/it/scala/org/zalando/react/nakadi/InMemoryCommitCommitManager.scala index b34be65..109e4ce 100644 --- a/src/it/scala/org/zalando/react/nakadi/InMemoryCommitCommitManager.scala +++ b/src/it/scala/org/zalando/react/nakadi/InMemoryCommitCommitManager.scala @@ -1,8 +1,9 @@ package org.zalando.react.nakadi +import java.time.{ZoneId, OffsetDateTime} + import org.zalando.react.nakadi.commit.OffsetTracking import org.zalando.react.nakadi.commit.handlers.BaseCommitManager -import org.joda.time.{DateTime, DateTimeZone} import scala.concurrent.Future @@ -21,7 +22,7 @@ object InMemoryCommitCommitManager extends BaseCommitManager { checkpointId = value.split("-")(0), leaseHolder = value.split("-")(1), leaseCounter = Option(1), - leaseTimestamp = new DateTime(DateTimeZone.UTC), + leaseTimestamp =OffsetDateTime.now, leaseId = None ) } @@ -35,7 +36,7 @@ object InMemoryCommitCommitManager extends BaseCommitManager { checkpointId = value.split("-")(0), leaseHolder = value.split("-")(1), leaseCounter = Option(leaseCounter), - leaseTimestamp = new DateTime(DateTimeZone.UTC), + leaseTimestamp = OffsetDateTime.now, leaseId = None ) store.put(key, generateValue(offset.checkpointId, offset.leaseHolder, count)) @@ -57,10 +58,10 @@ object InMemoryCommitCommitManager extends BaseCommitManager { checkpointId = value.split("-")(0), leaseHolder = value.split("-")(1), leaseCounter = Option(1), - leaseTimestamp = new DateTime(DateTimeZone.UTC), + leaseTimestamp = OffsetDateTime.now, leaseId = None ) } } } -} \ No newline at end of file +} diff --git a/src/it/scala/org/zalando/react/nakadi/ReactiveNakadiSubscriberSpec.scala b/src/it/scala/org/zalando/react/nakadi/ReactiveNakadiSubscriberSpec.scala index 14fd473..7bc511e 100644 --- a/src/it/scala/org/zalando/react/nakadi/ReactiveNakadiSubscriberSpec.scala +++ b/src/it/scala/org/zalando/react/nakadi/ReactiveNakadiSubscriberSpec.scala @@ -1,15 +1,14 @@ package org.zalando.react.nakadi +import java.time.OffsetDateTime import java.util.UUID import akka.actor.ActorSystem import akka.stream.testkit.scaladsl.TestSource import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import org.joda.time.DateTime import play.api.libs.json.Json - import org.zalando.react.nakadi.client.models._ -import org.zalando.react.nakadi.NakadiMessages.{Cursor, Offset, ProducerMessage, ConsumerMessage} +import org.zalando.react.nakadi.NakadiMessages.{ConsumerMessage, Cursor, Offset, ProducerMessage} import scala.concurrent.Await import scala.concurrent.duration._ @@ -28,7 +27,7 @@ class ReactiveNakadiSubscriberSpec extends NakadiTest { data = Json.parse(s"""{"foo": "bar"}""").as[EventPayload], EventMetadata( eid = UUID.randomUUID().toString, - occurred_at = new DateTime(), + occurred_at = OffsetDateTime.now, flow_id = Option("my-test-flow-id") ) ) diff --git a/src/main/scala/org/zalando/react/nakadi/LeaseManager.scala b/src/main/scala/org/zalando/react/nakadi/LeaseManager.scala index e7706eb..e602015 100644 --- a/src/main/scala/org/zalando/react/nakadi/LeaseManager.scala +++ b/src/main/scala/org/zalando/react/nakadi/LeaseManager.scala @@ -1,9 +1,9 @@ package org.zalando.react.nakadi +import java.time.OffsetDateTime + import akka.event.LoggingAdapter import akka.actor.{ActorRef, ActorSystem} -import org.joda.time.{DateTime, DateTimeZone} - import org.zalando.react.nakadi.utils.IdGenerator import org.zalando.react.nakadi.properties.ConsumerProperties import org.zalando.react.nakadi.commit.handlers.{BaseCommitManager => BaseCommitHandler} @@ -42,8 +42,8 @@ class LeaseManagerImpl(override val leaseHolder: String, log: Option[LoggingAdapter], idGenerator: IdGenerator = IdGenerator) extends LeaseManager { - def now = new DateTime(DateTimeZone.UTC) - def newLeaseTimeout = now.plusSeconds(staleLeaseDelta.length.toInt) + def now = OffsetDateTime.now + def newLeaseTimeout = now.plusSeconds(staleLeaseDelta.length) // Key / value for partition id and lease counter override val counter: mutable.Map[String, Long] = mutable.Map.empty @@ -75,7 +75,7 @@ class LeaseManagerImpl(override val leaseHolder: String, def validate(currentOffset: OffsetTracking): Boolean = { val count = counter.getOrElse(currentOffset.partitionId, 0L) - currentOffset.leaseCounter.contains(count) || currentOffset.leaseTimestamp.isBeforeNow + currentOffset.leaseCounter.contains(count) || currentOffset.leaseTimestamp.isBefore(now) } override def requestLease(groupId: String, eventType: String, partitionId: String) diff --git a/src/main/scala/org/zalando/react/nakadi/client/models/JsonOps.scala b/src/main/scala/org/zalando/react/nakadi/client/models/JsonOps.scala index f997829..65a8294 100644 --- a/src/main/scala/org/zalando/react/nakadi/client/models/JsonOps.scala +++ b/src/main/scala/org/zalando/react/nakadi/client/models/JsonOps.scala @@ -1,20 +1,20 @@ package org.zalando.react.nakadi.client.models +import java.time.OffsetDateTime +import java.time.format.DateTimeFormatter + import play.api.libs.json._ import play.api.libs.functional.syntax._ import play.api.data.validation.ValidationError -import org.joda.time.{DateTimeZone, DateTime} -import org.joda.time.format.ISODateTimeFormat - import scala.util.control.Exception.nonFatalCatch object JsonOps { - implicit val jodaDateTimeReads = Reads[DateTime] { + implicit val OffsetDateTimeReads = Reads[OffsetDateTime] { _.validate[String].flatMap { dateStr => - nonFatalCatch.either(new DateTime(dateStr, DateTimeZone.UTC)).fold( + nonFatalCatch.either(OffsetDateTime.parse(dateStr)).fold( ex => JsError(Seq(JsPath() -> Seq(ValidationError(ex.getMessage)))), JsSuccess(_) ) @@ -24,8 +24,8 @@ object JsonOps { implicit val readsMetaData: Reads[EventMetadata] = ( (__ \ "eid").read[String] and (__ \ "event_type").readNullable[String] and - (__ \ "occurred_at").read[DateTime] and - (__ \ "received_at").readNullable[DateTime] and + (__ \ "occurred_at").read[OffsetDateTime] and + (__ \ "received_at").readNullable[OffsetDateTime] and (__ \ "parent_eids").readNullable[Seq[String]] and (__ \ "flow_id").readNullable[String] )(EventMetadata) @@ -33,8 +33,8 @@ object JsonOps { implicit val writesMetaData: Writes[EventMetadata] = ( (__ \ "eid").write[String] and (__ \ "event_type").writeNullable[String] and - (__ \ "occurred_at").write[String].contramap[DateTime](ISODateTimeFormat.dateTime().print) and - (__ \ "received_at").writeNullable[String].contramap[Option[DateTime]](_.map(ISODateTimeFormat.dateTime().print)) and + (__ \ "occurred_at").write[String].contramap[OffsetDateTime](_.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)) and + (__ \ "received_at").writeNullable[String].contramap[Option[OffsetDateTime]](_.map(_.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))) and (__ \ "parent_eids").writeNullable[Seq[String]] and (__ \ "flow_id").writeNullable[String] )(unlift(EventMetadata.unapply)) diff --git a/src/main/scala/org/zalando/react/nakadi/client/models/Models.scala b/src/main/scala/org/zalando/react/nakadi/client/models/Models.scala index c9e96ec..86676c4 100644 --- a/src/main/scala/org/zalando/react/nakadi/client/models/Models.scala +++ b/src/main/scala/org/zalando/react/nakadi/client/models/Models.scala @@ -1,13 +1,12 @@ package org.zalando.react.nakadi.client.models -import org.joda.time.DateTime - +import java.time.OffsetDateTime case class EventMetadata( eid: String, event_type: Option[String] = None, - occurred_at: DateTime, - received_at: Option[DateTime] = None, + occurred_at: OffsetDateTime, + received_at: Option[OffsetDateTime] = None, parent_eids: Option[Seq[String]] = None, flow_id: Option[String] = None ) diff --git a/src/main/scala/org/zalando/react/nakadi/commit/OffsetTracking.scala b/src/main/scala/org/zalando/react/nakadi/commit/OffsetTracking.scala index 9ef297d..138f601 100644 --- a/src/main/scala/org/zalando/react/nakadi/commit/OffsetTracking.scala +++ b/src/main/scala/org/zalando/react/nakadi/commit/OffsetTracking.scala @@ -1,12 +1,12 @@ package org.zalando.react.nakadi.commit -import org.joda.time.DateTime +import java.time.OffsetDateTime case class OffsetTracking( partitionId: String, checkpointId: String, leaseHolder: String, - leaseTimestamp: DateTime, + leaseTimestamp: OffsetDateTime, leaseCounter: Option[Long] = None, leaseId: Option[String] = None ) diff --git a/src/main/scala/org/zalando/react/nakadi/commit/handlers/aws/DynamoDBCommitManager.scala b/src/main/scala/org/zalando/react/nakadi/commit/handlers/aws/DynamoDBCommitManager.scala index 602b9e2..77dcdcb 100644 --- a/src/main/scala/org/zalando/react/nakadi/commit/handlers/aws/DynamoDBCommitManager.scala +++ b/src/main/scala/org/zalando/react/nakadi/commit/handlers/aws/DynamoDBCommitManager.scala @@ -1,5 +1,8 @@ package org.zalando.react.nakadi.commit.handlers.aws +import java.time.format.DateTimeFormatter +import java.time.{ZoneId, ZoneOffset, OffsetDateTime} + import akka.actor.ActorSystem import org.zalando.react.nakadi.commit.OffsetTracking import org.zalando.react.nakadi.commit.handlers.BaseCommitManager @@ -8,7 +11,6 @@ import com.amazonaws.services.dynamodbv2.document.{Item, Table} import com.amazonaws.services.dynamodbv2.document.utils.ValueMap import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec import org.zalando.react.nakadi.properties.CommitProperties -import org.joda.time.{DateTime, DateTimeZone} import scala.concurrent.Future import scala.collection.JavaConverters._ @@ -66,7 +68,7 @@ class DynamoDBCommitManager(system: ActorSystem, leaseProperties: CommitProperti val valueMap = new ValueMap() .withString(":cidval", offsetTracking.checkpointId) .withString(":lhval", offsetTracking.leaseHolder) - .withString(":ltsval", offsetTracking.leaseTimestamp.toDateTime.toString) + .withString(":ltsval", offsetTracking.leaseTimestamp.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)) var leaseIdKey = "" offsetTracking.leaseId.foreach { leaseId => @@ -107,7 +109,7 @@ class DynamoDBCommitManager(system: ActorSystem, leaseProperties: CommitProperti .withString(CheckpointIdKey, offsetTracking.checkpointId) .withString(LeaseHolderKey, offsetTracking.leaseHolder) .withNumber(LeaseCounterKey, 1) - .withString(LeaseTimestampKey, offsetTracking.leaseTimestamp.toDateTime.toString) + .withString(LeaseTimestampKey, offsetTracking.leaseTimestamp.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)) offsetTracking.leaseId.map(item.withString(LeaseIdKey, _)) table.putItem(item) toOffsetTracking(item) @@ -140,7 +142,7 @@ class DynamoDBCommitManager(system: ActorSystem, leaseProperties: CommitProperti checkpointId = item.getString(CheckpointIdKey), leaseHolder = item.getString(LeaseHolderKey), leaseCounter = Option(item.getLong(LeaseCounterKey)), - leaseTimestamp = new DateTime(item.getString(LeaseTimestampKey), DateTimeZone.UTC), + leaseTimestamp = OffsetDateTime.parse(item.getString(LeaseTimestampKey), DateTimeFormatter.ISO_OFFSET_DATE_TIME), leaseId = Option(item.getString(LeaseIdKey)) ) } diff --git a/src/test/scala/org/zalando/react/nakadi/LeaseManagerSpec.scala b/src/test/scala/org/zalando/react/nakadi/LeaseManagerSpec.scala index 4f30dc7..b2613ee 100644 --- a/src/test/scala/org/zalando/react/nakadi/LeaseManagerSpec.scala +++ b/src/test/scala/org/zalando/react/nakadi/LeaseManagerSpec.scala @@ -1,6 +1,7 @@ package org.zalando.react.nakadi -import org.joda.time.{DateTime, DateTimeZone} +import java.time.OffsetDateTime + import org.scalamock.scalatest.MockFactory import org.scalatest.concurrent.ScalaFutures import org.scalatest.{FlatSpec, Matchers} @@ -20,7 +21,7 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal val partitionId = "15" val groupId = "some-group-id" val eventType = "some-event-type" - val timestamp = new DateTime(DateTimeZone.UTC) + val timestamp = OffsetDateTime.now // Map of event-type-partition to offset count val offsetMap = OffsetMap(Map(EventTypePartition(eventType, partitionId).hash -> 10)) @@ -110,7 +111,7 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal it should "return true for valid validation condition" in { setupIdGenerator - val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.minus(400)) + val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.minusSeconds(400)) val leaseManager = createLeaseManager leaseManager.counter(partitionId) = 2 @@ -120,7 +121,7 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal it should "return true for validation if lease time stamp is after now but counter is the same" in { setupIdGenerator - val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.plus(400)) + val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.plusSeconds(400)) val leaseManager = createLeaseManager leaseManager.counter(partitionId) = 2 @@ -130,7 +131,7 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal it should "return true for validation if time is before now, but counter differ (i.e. stale lease)" in { setupIdGenerator - val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.minus(400)) + val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.minusSeconds(400)) val leaseManager = createLeaseManager leaseManager.counter(partitionId) = 5 @@ -140,12 +141,12 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal it should "return false for validation if lease counters dont match lease time stamp is after now" in { setupIdGenerator - val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.plus(400)) + val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.plusSeconds(400)) val leaseManager = createLeaseManager leaseManager.counter(partitionId) = 5 leaseManager.validate(offset) should === (false) } - private def now = new DateTime(DateTimeZone.UTC) + private def now = OffsetDateTime.now } diff --git a/src/test/scala/org/zalando/react/nakadi/client/models/JsonOpsSpec.scala b/src/test/scala/org/zalando/react/nakadi/client/models/JsonOpsSpec.scala index 8fa6028..ebe1be9 100644 --- a/src/test/scala/org/zalando/react/nakadi/client/models/JsonOpsSpec.scala +++ b/src/test/scala/org/zalando/react/nakadi/client/models/JsonOpsSpec.scala @@ -1,8 +1,7 @@ package org.zalando.react.nakadi.client.models -import org.joda.time.DateTime import org.scalatest.{Matchers, WordSpec} -import play.api.libs.json.{JsObject, JsString, JsValue, Json} +import play.api.libs.json.{JsObject, JsString, Json} class JsonOpsSpec extends WordSpec with Matchers { @@ -15,7 +14,7 @@ class JsonOpsSpec extends WordSpec with Matchers { |{ | "metadata": { | "eid": "7b7c7100-0559-11e6-a837-0800200c9a66", - | "occurred_at": "2016-04-01T13:42:16.000Z", + | "occurred_at": "2016-04-01T13:42:16Z", | "event_type": "order-created" | }, | "id": "504c91de-17c9-46d8-81ee-55135084d696" @@ -30,7 +29,7 @@ class JsonOpsSpec extends WordSpec with Matchers { | "data": {"some": "payload"}, | "metadata": { | "eid": "7b7c7100-0559-11e6-a837-0800200c9a66", - | "occurred_at": "2016-04-01T13:42:16.000Z", + | "occurred_at": "2016-04-01T13:42:16Z", | "event_type": "order-created" | } |} diff --git a/src/test/scala/org/zalando/react/nakadi/commit/OffsetMapSpec.scala b/src/test/scala/org/zalando/react/nakadi/commit/OffsetMapSpec.scala index 1f89d01..72d48fe 100644 --- a/src/test/scala/org/zalando/react/nakadi/commit/OffsetMapSpec.scala +++ b/src/test/scala/org/zalando/react/nakadi/commit/OffsetMapSpec.scala @@ -1,6 +1,5 @@ package org.zalando.react.nakadi.commit -import org.joda.time.{DateTimeZone, DateTime} import org.scalatest.{Matchers, FlatSpec}