diff --git a/src/it/scala/org/zalando/react/nakadi/InMemoryCommitCommitManager.scala b/src/it/scala/org/zalando/react/nakadi/InMemoryCommitCommitManager.scala index f8b434a..109e4ce 100644 --- a/src/it/scala/org/zalando/react/nakadi/InMemoryCommitCommitManager.scala +++ b/src/it/scala/org/zalando/react/nakadi/InMemoryCommitCommitManager.scala @@ -1,6 +1,6 @@ package org.zalando.react.nakadi -import java.time.{ZoneId, ZonedDateTime} +import java.time.{ZoneId, OffsetDateTime} import org.zalando.react.nakadi.commit.OffsetTracking import org.zalando.react.nakadi.commit.handlers.BaseCommitManager @@ -22,7 +22,7 @@ object InMemoryCommitCommitManager extends BaseCommitManager { checkpointId = value.split("-")(0), leaseHolder = value.split("-")(1), leaseCounter = Option(1), - leaseTimestamp =ZonedDateTime.now, + leaseTimestamp =OffsetDateTime.now, leaseId = None ) } @@ -36,7 +36,7 @@ object InMemoryCommitCommitManager extends BaseCommitManager { checkpointId = value.split("-")(0), leaseHolder = value.split("-")(1), leaseCounter = Option(leaseCounter), - leaseTimestamp = ZonedDateTime.now, + leaseTimestamp = OffsetDateTime.now, leaseId = None ) store.put(key, generateValue(offset.checkpointId, offset.leaseHolder, count)) @@ -58,7 +58,7 @@ object InMemoryCommitCommitManager extends BaseCommitManager { checkpointId = value.split("-")(0), leaseHolder = value.split("-")(1), leaseCounter = Option(1), - leaseTimestamp = ZonedDateTime.now, + leaseTimestamp = OffsetDateTime.now, leaseId = None ) } diff --git a/src/it/scala/org/zalando/react/nakadi/ReactiveNakadiSubscriberSpec.scala b/src/it/scala/org/zalando/react/nakadi/ReactiveNakadiSubscriberSpec.scala index e701872..7bc511e 100644 --- a/src/it/scala/org/zalando/react/nakadi/ReactiveNakadiSubscriberSpec.scala +++ b/src/it/scala/org/zalando/react/nakadi/ReactiveNakadiSubscriberSpec.scala @@ -1,6 +1,6 @@ package org.zalando.react.nakadi -import java.time.ZonedDateTime +import java.time.OffsetDateTime import java.util.UUID import akka.actor.ActorSystem @@ -27,7 +27,7 @@ class ReactiveNakadiSubscriberSpec extends NakadiTest { data = Json.parse(s"""{"foo": "bar"}""").as[EventPayload], EventMetadata( eid = UUID.randomUUID().toString, - occurred_at = ZonedDateTime.now, + occurred_at = OffsetDateTime.now, flow_id = Option("my-test-flow-id") ) ) diff --git a/src/main/scala/org/zalando/react/nakadi/LeaseManager.scala b/src/main/scala/org/zalando/react/nakadi/LeaseManager.scala index 0fe3344..e602015 100644 --- a/src/main/scala/org/zalando/react/nakadi/LeaseManager.scala +++ b/src/main/scala/org/zalando/react/nakadi/LeaseManager.scala @@ -1,6 +1,6 @@ package org.zalando.react.nakadi -import java.time.ZonedDateTime +import java.time.OffsetDateTime import akka.event.LoggingAdapter import akka.actor.{ActorRef, ActorSystem} @@ -42,7 +42,7 @@ class LeaseManagerImpl(override val leaseHolder: String, log: Option[LoggingAdapter], idGenerator: IdGenerator = IdGenerator) extends LeaseManager { - def now = ZonedDateTime.now + def now = OffsetDateTime.now def newLeaseTimeout = now.plusSeconds(staleLeaseDelta.length) // Key / value for partition id and lease counter diff --git a/src/main/scala/org/zalando/react/nakadi/client/models/JsonOps.scala b/src/main/scala/org/zalando/react/nakadi/client/models/JsonOps.scala index 281e12b..65a8294 100644 --- a/src/main/scala/org/zalando/react/nakadi/client/models/JsonOps.scala +++ b/src/main/scala/org/zalando/react/nakadi/client/models/JsonOps.scala @@ -1,6 +1,6 @@ package org.zalando.react.nakadi.client.models -import java.time.ZonedDateTime +import java.time.OffsetDateTime import java.time.format.DateTimeFormatter import play.api.libs.json._ @@ -12,9 +12,9 @@ import scala.util.control.Exception.nonFatalCatch object JsonOps { - implicit val zonedDateTimeReads = Reads[ZonedDateTime] { + implicit val OffsetDateTimeReads = Reads[OffsetDateTime] { _.validate[String].flatMap { dateStr => - nonFatalCatch.either(ZonedDateTime.parse(dateStr)).fold( + nonFatalCatch.either(OffsetDateTime.parse(dateStr)).fold( ex => JsError(Seq(JsPath() -> Seq(ValidationError(ex.getMessage)))), JsSuccess(_) ) @@ -24,8 +24,8 @@ object JsonOps { implicit val readsMetaData: Reads[EventMetadata] = ( (__ \ "eid").read[String] and (__ \ "event_type").readNullable[String] and - (__ \ "occurred_at").read[ZonedDateTime] and - (__ \ "received_at").readNullable[ZonedDateTime] and + (__ \ "occurred_at").read[OffsetDateTime] and + (__ \ "received_at").readNullable[OffsetDateTime] and (__ \ "parent_eids").readNullable[Seq[String]] and (__ \ "flow_id").readNullable[String] )(EventMetadata) @@ -33,8 +33,8 @@ object JsonOps { implicit val writesMetaData: Writes[EventMetadata] = ( (__ \ "eid").write[String] and (__ \ "event_type").writeNullable[String] and - (__ \ "occurred_at").write[String].contramap[ZonedDateTime](_.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)) and - (__ \ "received_at").writeNullable[String].contramap[Option[ZonedDateTime]](_.map(_.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))) and + (__ \ "occurred_at").write[String].contramap[OffsetDateTime](_.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)) and + (__ \ "received_at").writeNullable[String].contramap[Option[OffsetDateTime]](_.map(_.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))) and (__ \ "parent_eids").writeNullable[Seq[String]] and (__ \ "flow_id").writeNullable[String] )(unlift(EventMetadata.unapply)) diff --git a/src/main/scala/org/zalando/react/nakadi/client/models/Models.scala b/src/main/scala/org/zalando/react/nakadi/client/models/Models.scala index d8e5714..86676c4 100644 --- a/src/main/scala/org/zalando/react/nakadi/client/models/Models.scala +++ b/src/main/scala/org/zalando/react/nakadi/client/models/Models.scala @@ -1,12 +1,12 @@ package org.zalando.react.nakadi.client.models -import java.time.ZonedDateTime +import java.time.OffsetDateTime case class EventMetadata( eid: String, event_type: Option[String] = None, - occurred_at: ZonedDateTime, - received_at: Option[ZonedDateTime] = None, + occurred_at: OffsetDateTime, + received_at: Option[OffsetDateTime] = None, parent_eids: Option[Seq[String]] = None, flow_id: Option[String] = None ) diff --git a/src/main/scala/org/zalando/react/nakadi/commit/OffsetTracking.scala b/src/main/scala/org/zalando/react/nakadi/commit/OffsetTracking.scala index d97aa89..138f601 100644 --- a/src/main/scala/org/zalando/react/nakadi/commit/OffsetTracking.scala +++ b/src/main/scala/org/zalando/react/nakadi/commit/OffsetTracking.scala @@ -1,12 +1,12 @@ package org.zalando.react.nakadi.commit -import java.time.ZonedDateTime +import java.time.OffsetDateTime case class OffsetTracking( partitionId: String, checkpointId: String, leaseHolder: String, - leaseTimestamp: ZonedDateTime, + leaseTimestamp: OffsetDateTime, leaseCounter: Option[Long] = None, leaseId: Option[String] = None ) diff --git a/src/main/scala/org/zalando/react/nakadi/commit/handlers/aws/DynamoDBCommitManager.scala b/src/main/scala/org/zalando/react/nakadi/commit/handlers/aws/DynamoDBCommitManager.scala index a2190a8..77dcdcb 100644 --- a/src/main/scala/org/zalando/react/nakadi/commit/handlers/aws/DynamoDBCommitManager.scala +++ b/src/main/scala/org/zalando/react/nakadi/commit/handlers/aws/DynamoDBCommitManager.scala @@ -1,7 +1,7 @@ package org.zalando.react.nakadi.commit.handlers.aws import java.time.format.DateTimeFormatter -import java.time.{ZoneId, ZoneOffset, ZonedDateTime} +import java.time.{ZoneId, ZoneOffset, OffsetDateTime} import akka.actor.ActorSystem import org.zalando.react.nakadi.commit.OffsetTracking @@ -142,7 +142,7 @@ class DynamoDBCommitManager(system: ActorSystem, leaseProperties: CommitProperti checkpointId = item.getString(CheckpointIdKey), leaseHolder = item.getString(LeaseHolderKey), leaseCounter = Option(item.getLong(LeaseCounterKey)), - leaseTimestamp = ZonedDateTime.parse(item.getString(LeaseTimestampKey), DateTimeFormatter.ISO_OFFSET_DATE_TIME), + leaseTimestamp = OffsetDateTime.parse(item.getString(LeaseTimestampKey), DateTimeFormatter.ISO_OFFSET_DATE_TIME), leaseId = Option(item.getString(LeaseIdKey)) ) } diff --git a/src/test/scala/org/zalando/react/nakadi/LeaseManagerSpec.scala b/src/test/scala/org/zalando/react/nakadi/LeaseManagerSpec.scala index fe0266a..b2613ee 100644 --- a/src/test/scala/org/zalando/react/nakadi/LeaseManagerSpec.scala +++ b/src/test/scala/org/zalando/react/nakadi/LeaseManagerSpec.scala @@ -1,6 +1,6 @@ package org.zalando.react.nakadi -import java.time.ZonedDateTime +import java.time.OffsetDateTime import org.scalamock.scalatest.MockFactory import org.scalatest.concurrent.ScalaFutures @@ -21,7 +21,7 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal val partitionId = "15" val groupId = "some-group-id" val eventType = "some-event-type" - val timestamp = ZonedDateTime.now + val timestamp = OffsetDateTime.now // Map of event-type-partition to offset count val offsetMap = OffsetMap(Map(EventTypePartition(eventType, partitionId).hash -> 10)) @@ -148,5 +148,5 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal leaseManager.validate(offset) should === (false) } - private def now = ZonedDateTime.now + private def now = OffsetDateTime.now }