Skip to content

Commit

Permalink
spectator 1.8.0 and iep 5.0.27
Browse files Browse the repository at this point in the history
  • Loading branch information
brharrington committed Oct 11, 2024
1 parent b5e17f2 commit eda7a91
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ private[events] object DatapointConverter {
v * v
}

@scala.annotation.tailrec
private[events] def toDouble(value: Any, dflt: Any): Double = {
value match {
case v: Boolean => if (v) 1.0 else 0.0
Expand All @@ -118,17 +119,17 @@ private[events] object DatapointConverter {
}
}

private[events] def addNaN(value: AtomicDouble, amount: Double): Unit = {
private[events] def addNaN(now: Long, value: StepDouble, amount: Double): Unit = {
if (amount.isNaN)
return

var set = false
while (!set) {
val v = value.get()
val v = value.getCurrent(now)
if (v.isNaN) {
set = value.compareAndSet(v, amount)
set = value.compareAndSet(now, v, amount)
} else {
value.addAndGet(amount)
value.addAndGet(now, amount)
set = true
}
}
Expand All @@ -154,7 +155,7 @@ private[events] object DatapointConverter {

override def update(value: Double): Unit = {
if (value.isFinite && value >= 0.0) {
addNaN(buffer.getCurrent, value)
addNaN(params.clock.wallTime(), buffer, value)
}
}

Expand All @@ -169,7 +170,7 @@ private[events] object DatapointConverter {

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
buffer.getCurrent(now).isNaN && buffer.poll(now).isNaN
}
}

Expand All @@ -183,7 +184,7 @@ private[events] object DatapointConverter {
}

override def update(value: Double): Unit = {
addNaN(buffer.getCurrent, 1.0)
addNaN(params.clock.wallTime(), buffer, 1.0)
}

override def flush(timestamp: Long): Unit = {
Expand All @@ -197,7 +198,7 @@ private[events] object DatapointConverter {

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
buffer.getCurrent(now).isNaN && buffer.poll(now).isNaN
}
}

Expand All @@ -211,9 +212,7 @@ private[events] object DatapointConverter {
}

override def update(value: Double): Unit = {
if (value.isFinite) {
buffer.getCurrent.max(value)
}
buffer.max(params.clock.wallTime(), value)
}

override def flush(timestamp: Long): Unit = {
Expand All @@ -227,7 +226,7 @@ private[events] object DatapointConverter {

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
buffer.getCurrent(now).isNaN && buffer.poll(now).isNaN
}
}

Expand All @@ -241,9 +240,7 @@ private[events] object DatapointConverter {
}

override def update(value: Double): Unit = {
if (value.isFinite) {
min(buffer.getCurrent, value)
}
buffer.min(params.clock.wallTime(), value)
}

private def min(current: AtomicDouble, value: Double): Unit = {
Expand All @@ -270,7 +267,7 @@ private[events] object DatapointConverter {

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
buffer.getCurrent(now).isNaN && buffer.poll(now).isNaN
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,17 @@ import com.netflix.spectator.impl.StepLong
case class StreamMetadata(
streamId: String,
remoteAddress: String = "unknown",
clock: Clock = Clock.SYSTEM,
receivedMessages: StepLong = new StepLong(0, Clock.SYSTEM, 60_000),
droppedMessages: StepLong = new StepLong(0, Clock.SYSTEM, 60_000)
) extends JsonSupport {

def updateReceived(n: Int): Unit = {
receivedMessages.getCurrent.addAndGet(n)
receivedMessages.addAndGet(clock.wallTime(), n)
}

def updateDropped(n: Int): Unit = {
droppedMessages.getCurrent.addAndGet(n)
droppedMessages.addAndGet(clock.wallTime(), n)
}

override def hasCustomEncoding: Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ class StreamMetadataSuite extends FunSuite {
val clock = new ManualClock()
val step = 60_000L
val meta =
StreamMetadata("id", "addr", new StepLong(0, clock, step), new StepLong(0, clock, step))
StreamMetadata(
"id",
"addr",
clock,
new StepLong(0, clock, step),
new StepLong(0, clock, step)
)
meta.updateReceived(100)
meta.updateDropped(2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class SubscriptionManagerSuite extends FunSuite {

val sm = new SubscriptionManager[Integer](registry)
val meta =
StreamMetadata("a", "test", new StepLong(0, clock, step), new StepLong(0, clock, step))
StreamMetadata("a", "test", clock, new StepLong(0, clock, step), new StepLong(0, clock, step))
assert(sm.register(meta, 1))

val ok = Id.create("atlas.lwcapi.currentStreams").withTag("state", "ok")
Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ object Dependencies {
object Versions {
val pekko = "1.0.3"
val pekkoHttpV = "1.0.1"
val iep = "5.0.26"
val iep = "5.0.27"
val jackson = "2.18.0"
val log4j = "2.24.1"
val scala = "2.13.15"
val slf4j = "2.0.16"
val spectator = "1.7.19"
val spectator = "1.8.0"
val spring = "6.1.12"

val crossScala = Seq(scala, "3.4.1")
Expand Down

0 comments on commit eda7a91

Please sign in to comment.