Skip to content
This repository has been archived by the owner on May 5, 2022. It is now read-only.

Commit

Permalink
Merge pull request #26 from lenalebt/master
Browse files Browse the repository at this point in the history
removed dependency on joda-time
  • Loading branch information
dr4ke616 committed Aug 10, 2016
2 parents b5ad614 + 1a65d5c commit 768c2ea
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 48 deletions.
5 changes: 2 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ resolvers += "Maven Central Server" at "http://repo1.maven.org/maven2"
val commonSettings = sonatypeSettings ++ Seq(
organization := "org.zalando.reactivenakadi",
startYear := Some(2016),
scalaVersion := "2.11.7",
scalaVersion := "2.11.8",
test in assembly := {},
licenses := Seq("MIT license" -> url("https://opensource.org/licenses/MIT")),
homepage := Some(url("https://github.com/zalando/reactive-nakadi")),
Expand All @@ -28,15 +28,14 @@ val commonSettings = sonatypeSettings ++ Seq(
)

libraryDependencies ++= Seq(
"joda-time" % "joda-time" % "2.3",
"commons-logging" % "commons-logging" % "1.1.1",
"com.typesafe.play" %% "play-json" % "2.5.4",
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-http-core" % akkaVersion,
"com.amazonaws" % "aws-java-sdk-dynamodb" % "1.10.60",
"org.scalatest" %% "scalatest" % "2.2.4" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test",
"org.scalamock" %% "scalamock-scalatest-support" % "3.2" % "test, it",
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % "test, it"
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.zalando.react.nakadi

import java.time.{ZoneId, OffsetDateTime}

import org.zalando.react.nakadi.commit.OffsetTracking
import org.zalando.react.nakadi.commit.handlers.BaseCommitManager
import org.joda.time.{DateTime, DateTimeZone}

import scala.concurrent.Future

Expand All @@ -21,7 +22,7 @@ object InMemoryCommitCommitManager extends BaseCommitManager {
checkpointId = value.split("-")(0),
leaseHolder = value.split("-")(1),
leaseCounter = Option(1),
leaseTimestamp = new DateTime(DateTimeZone.UTC),
leaseTimestamp =OffsetDateTime.now,
leaseId = None
)
}
Expand All @@ -35,7 +36,7 @@ object InMemoryCommitCommitManager extends BaseCommitManager {
checkpointId = value.split("-")(0),
leaseHolder = value.split("-")(1),
leaseCounter = Option(leaseCounter),
leaseTimestamp = new DateTime(DateTimeZone.UTC),
leaseTimestamp = OffsetDateTime.now,
leaseId = None
)
store.put(key, generateValue(offset.checkpointId, offset.leaseHolder, count))
Expand All @@ -57,10 +58,10 @@ object InMemoryCommitCommitManager extends BaseCommitManager {
checkpointId = value.split("-")(0),
leaseHolder = value.split("-")(1),
leaseCounter = Option(1),
leaseTimestamp = new DateTime(DateTimeZone.UTC),
leaseTimestamp = OffsetDateTime.now,
leaseId = None
)
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package org.zalando.react.nakadi

import java.time.OffsetDateTime
import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.testkit.scaladsl.TestSource
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import org.joda.time.DateTime
import play.api.libs.json.Json

import org.zalando.react.nakadi.client.models._
import org.zalando.react.nakadi.NakadiMessages.{Cursor, Offset, ProducerMessage, ConsumerMessage}
import org.zalando.react.nakadi.NakadiMessages.{ConsumerMessage, Cursor, Offset, ProducerMessage}

import scala.concurrent.Await
import scala.concurrent.duration._
Expand All @@ -28,7 +27,7 @@ class ReactiveNakadiSubscriberSpec extends NakadiTest {
data = Json.parse(s"""{"foo": "bar"}""").as[EventPayload],
EventMetadata(
eid = UUID.randomUUID().toString,
occurred_at = new DateTime(),
occurred_at = OffsetDateTime.now,
flow_id = Option("my-test-flow-id")
)
)
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/org/zalando/react/nakadi/LeaseManager.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.zalando.react.nakadi

import java.time.OffsetDateTime

import akka.event.LoggingAdapter
import akka.actor.{ActorRef, ActorSystem}
import org.joda.time.{DateTime, DateTimeZone}

import org.zalando.react.nakadi.utils.IdGenerator
import org.zalando.react.nakadi.properties.ConsumerProperties
import org.zalando.react.nakadi.commit.handlers.{BaseCommitManager => BaseCommitHandler}
Expand Down Expand Up @@ -42,8 +42,8 @@ class LeaseManagerImpl(override val leaseHolder: String,
log: Option[LoggingAdapter],
idGenerator: IdGenerator = IdGenerator) extends LeaseManager {

def now = new DateTime(DateTimeZone.UTC)
def newLeaseTimeout = now.plusSeconds(staleLeaseDelta.length.toInt)
def now = OffsetDateTime.now
def newLeaseTimeout = now.plusSeconds(staleLeaseDelta.length)

// Key / value for partition id and lease counter
override val counter: mutable.Map[String, Long] = mutable.Map.empty
Expand Down Expand Up @@ -75,7 +75,7 @@ class LeaseManagerImpl(override val leaseHolder: String,

def validate(currentOffset: OffsetTracking): Boolean = {
val count = counter.getOrElse(currentOffset.partitionId, 0L)
currentOffset.leaseCounter.contains(count) || currentOffset.leaseTimestamp.isBeforeNow
currentOffset.leaseCounter.contains(count) || currentOffset.leaseTimestamp.isBefore(now)
}

override def requestLease(groupId: String, eventType: String, partitionId: String)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package org.zalando.react.nakadi.client.models

import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter

import play.api.libs.json._
import play.api.libs.functional.syntax._
import play.api.data.validation.ValidationError

import org.joda.time.{DateTimeZone, DateTime}
import org.joda.time.format.ISODateTimeFormat

import scala.util.control.Exception.nonFatalCatch


object JsonOps {

implicit val jodaDateTimeReads = Reads[DateTime] {
implicit val OffsetDateTimeReads = Reads[OffsetDateTime] {
_.validate[String].flatMap { dateStr =>
nonFatalCatch.either(new DateTime(dateStr, DateTimeZone.UTC)).fold(
nonFatalCatch.either(OffsetDateTime.parse(dateStr)).fold(
ex => JsError(Seq(JsPath() -> Seq(ValidationError(ex.getMessage)))),
JsSuccess(_)
)
Expand All @@ -24,17 +24,17 @@ object JsonOps {
implicit val readsMetaData: Reads[EventMetadata] = (
(__ \ "eid").read[String] and
(__ \ "event_type").readNullable[String] and
(__ \ "occurred_at").read[DateTime] and
(__ \ "received_at").readNullable[DateTime] and
(__ \ "occurred_at").read[OffsetDateTime] and
(__ \ "received_at").readNullable[OffsetDateTime] and
(__ \ "parent_eids").readNullable[Seq[String]] and
(__ \ "flow_id").readNullable[String]
)(EventMetadata)

implicit val writesMetaData: Writes[EventMetadata] = (
(__ \ "eid").write[String] and
(__ \ "event_type").writeNullable[String] and
(__ \ "occurred_at").write[String].contramap[DateTime](ISODateTimeFormat.dateTime().print) and
(__ \ "received_at").writeNullable[String].contramap[Option[DateTime]](_.map(ISODateTimeFormat.dateTime().print)) and
(__ \ "occurred_at").write[String].contramap[OffsetDateTime](_.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)) and
(__ \ "received_at").writeNullable[String].contramap[Option[OffsetDateTime]](_.map(_.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))) and
(__ \ "parent_eids").writeNullable[Seq[String]] and
(__ \ "flow_id").writeNullable[String]
)(unlift(EventMetadata.unapply))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package org.zalando.react.nakadi.client.models

import org.joda.time.DateTime

import java.time.OffsetDateTime

case class EventMetadata(
eid: String,
event_type: Option[String] = None,
occurred_at: DateTime,
received_at: Option[DateTime] = None,
occurred_at: OffsetDateTime,
received_at: Option[OffsetDateTime] = None,
parent_eids: Option[Seq[String]] = None,
flow_id: Option[String] = None
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.zalando.react.nakadi.commit

import org.joda.time.DateTime
import java.time.OffsetDateTime

case class OffsetTracking(
partitionId: String,
checkpointId: String,
leaseHolder: String,
leaseTimestamp: DateTime,
leaseTimestamp: OffsetDateTime,
leaseCounter: Option[Long] = None,
leaseId: Option[String] = None
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.zalando.react.nakadi.commit.handlers.aws

import java.time.format.DateTimeFormatter
import java.time.{ZoneId, ZoneOffset, OffsetDateTime}

import akka.actor.ActorSystem
import org.zalando.react.nakadi.commit.OffsetTracking
import org.zalando.react.nakadi.commit.handlers.BaseCommitManager
Expand All @@ -8,7 +11,6 @@ import com.amazonaws.services.dynamodbv2.document.{Item, Table}
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap
import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec
import org.zalando.react.nakadi.properties.CommitProperties
import org.joda.time.{DateTime, DateTimeZone}

import scala.concurrent.Future
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -66,7 +68,7 @@ class DynamoDBCommitManager(system: ActorSystem, leaseProperties: CommitProperti
val valueMap = new ValueMap()
.withString(":cidval", offsetTracking.checkpointId)
.withString(":lhval", offsetTracking.leaseHolder)
.withString(":ltsval", offsetTracking.leaseTimestamp.toDateTime.toString)
.withString(":ltsval", offsetTracking.leaseTimestamp.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))

var leaseIdKey = ""
offsetTracking.leaseId.foreach { leaseId =>
Expand Down Expand Up @@ -107,7 +109,7 @@ class DynamoDBCommitManager(system: ActorSystem, leaseProperties: CommitProperti
.withString(CheckpointIdKey, offsetTracking.checkpointId)
.withString(LeaseHolderKey, offsetTracking.leaseHolder)
.withNumber(LeaseCounterKey, 1)
.withString(LeaseTimestampKey, offsetTracking.leaseTimestamp.toDateTime.toString)
.withString(LeaseTimestampKey, offsetTracking.leaseTimestamp.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
offsetTracking.leaseId.map(item.withString(LeaseIdKey, _))
table.putItem(item)
toOffsetTracking(item)
Expand Down Expand Up @@ -140,7 +142,7 @@ class DynamoDBCommitManager(system: ActorSystem, leaseProperties: CommitProperti
checkpointId = item.getString(CheckpointIdKey),
leaseHolder = item.getString(LeaseHolderKey),
leaseCounter = Option(item.getLong(LeaseCounterKey)),
leaseTimestamp = new DateTime(item.getString(LeaseTimestampKey), DateTimeZone.UTC),
leaseTimestamp = OffsetDateTime.parse(item.getString(LeaseTimestampKey), DateTimeFormatter.ISO_OFFSET_DATE_TIME),
leaseId = Option(item.getString(LeaseIdKey))
)
}
Expand Down
15 changes: 8 additions & 7 deletions src/test/scala/org/zalando/react/nakadi/LeaseManagerSpec.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.zalando.react.nakadi

import org.joda.time.{DateTime, DateTimeZone}
import java.time.OffsetDateTime

import org.scalamock.scalatest.MockFactory
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FlatSpec, Matchers}
Expand All @@ -20,7 +21,7 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal
val partitionId = "15"
val groupId = "some-group-id"
val eventType = "some-event-type"
val timestamp = new DateTime(DateTimeZone.UTC)
val timestamp = OffsetDateTime.now

// Map of event-type-partition to offset count
val offsetMap = OffsetMap(Map(EventTypePartition(eventType, partitionId).hash -> 10))
Expand Down Expand Up @@ -110,7 +111,7 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal
it should "return true for valid validation condition" in {
setupIdGenerator

val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.minus(400))
val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.minusSeconds(400))

val leaseManager = createLeaseManager
leaseManager.counter(partitionId) = 2
Expand All @@ -120,7 +121,7 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal
it should "return true for validation if lease time stamp is after now but counter is the same" in {
setupIdGenerator

val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.plus(400))
val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.plusSeconds(400))

val leaseManager = createLeaseManager
leaseManager.counter(partitionId) = 2
Expand All @@ -130,7 +131,7 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal
it should "return true for validation if time is before now, but counter differ (i.e. stale lease)" in {
setupIdGenerator

val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.minus(400))
val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.minusSeconds(400))

val leaseManager = createLeaseManager
leaseManager.counter(partitionId) = 5
Expand All @@ -140,12 +141,12 @@ class LeaseManagerSpec extends FlatSpec with Matchers with MockFactory with Scal
it should "return false for validation if lease counters dont match lease time stamp is after now" in {
setupIdGenerator

val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.plus(400))
val offset = offsetTracking.copy(leaseCounter = Option(2), leaseTimestamp = now.plusSeconds(400))

val leaseManager = createLeaseManager
leaseManager.counter(partitionId) = 5
leaseManager.validate(offset) should === (false)
}

private def now = new DateTime(DateTimeZone.UTC)
private def now = OffsetDateTime.now
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package org.zalando.react.nakadi.client.models

import org.joda.time.DateTime
import org.scalatest.{Matchers, WordSpec}
import play.api.libs.json.{JsObject, JsString, JsValue, Json}
import play.api.libs.json.{JsObject, JsString, Json}


class JsonOpsSpec extends WordSpec with Matchers {
Expand All @@ -15,7 +14,7 @@ class JsonOpsSpec extends WordSpec with Matchers {
|{
| "metadata": {
| "eid": "7b7c7100-0559-11e6-a837-0800200c9a66",
| "occurred_at": "2016-04-01T13:42:16.000Z",
| "occurred_at": "2016-04-01T13:42:16Z",
| "event_type": "order-created"
| },
| "id": "504c91de-17c9-46d8-81ee-55135084d696"
Expand All @@ -30,7 +29,7 @@ class JsonOpsSpec extends WordSpec with Matchers {
| "data": {"some": "payload"},
| "metadata": {
| "eid": "7b7c7100-0559-11e6-a837-0800200c9a66",
| "occurred_at": "2016-04-01T13:42:16.000Z",
| "occurred_at": "2016-04-01T13:42:16Z",
| "event_type": "order-created"
| }
|}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.zalando.react.nakadi.commit

import org.joda.time.{DateTimeZone, DateTime}
import org.scalatest.{Matchers, FlatSpec}


Expand Down

0 comments on commit 768c2ea

Please sign in to comment.