From 682ecb30af5c3f18d5fbb16f744db7114a254af1 Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 26 Sep 2019 17:01:32 +1000 Subject: [PATCH] Bring the template up to date The Scala g8 template was way out of date. --- src/main/g8/backend/build.sbt | 34 +++--- src/main/g8/backend/project/plugins.sbt | 2 +- .../$deviceType__Camel$Reading.scala | 56 +++++---- .../$deviceType__Camel$MetaFilter.scala | 115 ------------------ .../$deviceType__Camel$Transformer.scala | 34 +++--- .../ui/$deviceType__Camel$Server.scala | 46 ++++--- .../$deviceType__Camel$ReadingTest.scala | 26 ++-- .../$deviceType__Camel$TransformerTest.scala | 2 +- .../ui/$deviceType__Camel$ServiceTest.scala | 2 +- .../ui/EndDeviceServiceTest.scala | 2 +- .../ui/LatestEndDeviceEventsTest.scala | 10 +- .../ui/LatestReadingsTest.scala | 3 +- .../ui/ServerSentEventMarshallerTest.scala | 5 +- 13 files changed, 121 insertions(+), 216 deletions(-) delete mode 100644 src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$MetaFilter.scala diff --git a/src/main/g8/backend/build.sbt b/src/main/g8/backend/build.sbt index 1ee0d99..91f2e29 100644 --- a/src/main/g8/backend/build.sbt +++ b/src/main/g8/backend/build.sbt @@ -11,18 +11,16 @@ lazy val root = .settings(settings) .settings( libraryDependencies ++= Seq( + library.akkaHttpSprayJson, library.fs, library.loraControlPlane, library.loraStreams, library.ioxSss, - library.jaegerTracing, library.sprayJson, library.streambedCore, library.streambedDurableQueueRemoteClient, library.streambedHttp, - library.streambedIdentity, library.akkaTestkit % Test, - library.scalaCheck % Test, library.streambedTestKit % Test, library.utest % Test ), @@ -45,25 +43,23 @@ lazy val root = lazy val library = new { object Version { - val akka = "2.5.19" - val loraSdk = "0.12.0" + val akka = "2.5.23" + val akkaHttp = "10.1.8" val scalaCheck = "1.14.0" val sprayJson = "1.3.5" - val streambed = "0.24.8" - val utest = "0.6.4" + val streambed = "0.41.1" + val utest = "0.6.9" } - val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % Version.akka - val fs = "com.cisco.streambed" %% "fs" % Version.streambed - val loraControlPlane = "com.cisco.streambed.lora" %% "lora-control-plane" % Version.loraSdk - val loraStreams = "com.cisco.streambed.lora" %% "lora-streams" % Version.loraSdk - val ioxSss = "com.cisco.streambed" %% "iox-sss" % Version.streambed - val jaegerTracing = "com.cisco.streambed" %% "jaeger-tracing" % Version.streambed - val scalaCheck = "org.scalacheck" %% "scalacheck" % Version.scalaCheck - val sprayJson = "io.spray" %% "spray-json" % Version.sprayJson - val streambedCore = "com.cisco.streambed" %% "streambed-core" % Version.streambed + val akkaHttpSprayJson = "com.typesafe.akka" %% "akka-http-spray-json" % Version.akkaHttp + val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % Version.akka + val fs = "com.cisco.streambed" %% "fs" % Version.streambed + val loraControlPlane = "com.cisco.streambed" %% "lora-control-plane" % Version.streambed + val loraStreams = "com.cisco.streambed" %% "lora-streams" % Version.streambed + val ioxSss = "com.cisco.streambed" %% "iox-sss" % Version.streambed + val sprayJson = "io.spray" %% "spray-json" % Version.sprayJson + val streambedCore = "com.cisco.streambed" %% "streambed-core" % Version.streambed val streambedDurableQueueRemoteClient = "com.cisco.streambed" %% "streambed-durable-queue-remote-client" % Version.streambed val streambedHttp = "com.cisco.streambed" %% "streambed-http" % Version.streambed - val streambedIdentity = "com.cisco.streambed" %% "streambed-identity" % Version.streambed val streambedTestKit = "com.cisco.streambed" %% "streambed-testkit" % Version.streambed val utest = "com.lihaoyi" %% "utest" % Version.utest } @@ -74,11 +70,11 @@ lazy val library = lazy val settings = Seq( - scalaVersion := "2.12.8", + scalaVersion := "2.12.9", organization := "$organization;format="package"$", organizationName := "$organizationName$", startYear := Some(2018), - headerLicense := Some(HeaderLicense.Custom("Copyright (c) $organizationName$, 2018")), + headerLicense := Some(HeaderLicense.Custom("Copyright (c) $organizationName$, 2019")), scalacOptions ++= Seq( "-unchecked", "-deprecation", diff --git a/src/main/g8/backend/project/plugins.sbt b/src/main/g8/backend/project/plugins.sbt index aa4a175..d015984 100644 --- a/src/main/g8/backend/project/plugins.sbt +++ b/src/main/g8/backend/project/plugins.sbt @@ -1,5 +1,5 @@ addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6") -addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.4.0") +addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.0") addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.9.3") addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "4.1.0") diff --git a/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/$deviceType__Camel$Reading.scala b/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/$deviceType__Camel$Reading.scala index 78f9e62..5e999df 100644 --- a/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/$deviceType__Camel$Reading.scala +++ b/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/$deviceType__Camel$Reading.scala @@ -9,11 +9,9 @@ import java.util.function import akka.NotUsed import akka.stream.javadsl.{ Flow => JFlow } import akka.stream.scaladsl.Flow -import akka.stream.{ ActorAttributes, Supervision } import akka.util.ByteString import com.cisco.streambed.durablequeue.DurableQueue -import com.cisco.streambed.identity.Principal -import com.cisco.streambed.identity.streams.Streams +import com.cisco.streambed.identity.{Crypto, Principal} import spray.json._ import scala.compat.java8.FunctionConverters._ @@ -77,19 +75,27 @@ object $deviceType;format="Camel"$Reading { implicit ec: ExecutionContext ): Flow[DurableQueue.Received, ($deviceType;format="Camel"$Reading, Long), NotUsed] = Flow[DurableQueue.Received] - .map { + .mapAsync(1) { case DurableQueue.Received(_, encryptedData, o, _, _) => - ((getSecret($deviceType;format="Camel"$Key), encryptedData), o) + Crypto + .decrypt(getSecret($deviceType;format="Camel"$Key), encryptedData) + .filter(!_.left.exists(_ == Principal.Unauthenticated)) + .map(_ -> o) } - .via(Streams.decrypter) - .map { - case (data, o) => - import $deviceType;format="Camel"$ReadingJsonProtocol._ - (data.utf8String.parseJson.convertTo[$deviceType;format="Camel"$Reading], o) + .collect { + case (Right(data), o) => + try { + import $deviceType;format="Camel"$ReadingJsonProtocol._ + Some((data.utf8String.parseJson.convertTo[$deviceType;format="Camel"$Reading], o)) + } catch { + case _: DeserializationException | _: JsonParser.ParsingException | + _: NumberFormatException => + None + } + } + .collect { + case Some(e) => e } - .withAttributes( - ActorAttributes.supervisionStrategy(Supervision.resumingDecider) - ) /** * Conveniently tail, decrypt and decode readings. Yields the reading and its offset. @@ -118,21 +124,19 @@ object $deviceType;format="Camel"$Reading { (reading.nwkAddr, reading.toJson.compactPrint, carry) } - .map { + .mapAsync(1) { case (nwkAddr, decryptedData, carry) => - ((getSecret($deviceType;format="Camel"$Key), ByteString(decryptedData)), - (nwkAddr, carry)) + Crypto + .encrypt(getSecret($deviceType;format="Camel"$Key), ByteString(decryptedData)) + .collect { + case Right(bytes) => + DurableQueue.CommandRequest( + DurableQueue.Send(nwkAddr, bytes, $deviceType;format="Camel"$DataUpJsonTopic), + carry + ) + } } - .via(Streams.encrypter) - .map { - case (bytes, (nwkAddr, carry)) => - DurableQueue.CommandRequest( - DurableQueue.Send(nwkAddr, bytes, $deviceType;format="Camel"$DataUpJsonTopic), - carry - ) - } - - + /** * A convenience function for encoding `$deviceType;format="Camel"$Reading` instances, encrypting them and then * publishing to a queue. diff --git a/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$MetaFilter.scala b/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$MetaFilter.scala deleted file mode 100644 index a596c98..0000000 --- a/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$MetaFilter.scala +++ /dev/null @@ -1,115 +0,0 @@ -package $organization;format="package"$.$deviceType;format="camel"$ -package transformer - -import akka.NotUsed -import akka.stream.scaladsl.{ Flow, Source } -import akka.stream.{ ActorAttributes, Materializer, Supervision } -import com.cisco.streambed.UuidOps -import com.cisco.streambed.HexString -import com.cisco.streambed.durablequeue.DurableQueue -import com.cisco.streambed.durablequeue.opentracing.Headers -import com.cisco.streambed.identity.Principal -import com.cisco.streambed.identity.streams.Streams -import com.cisco.streambed.lora.controlplane.EndDeviceEvents -import io.opentracing.{ References, Span, Tracer } -import spray.json._ - -/** - * Run the filtering process to filter through end-device-events - * meta data for $deviceType$ events only - the events get published to a - * new topic and encrypted under the $deviceType$ key. - */ -object $deviceType;format="Camel"$MetaFilter { - - /** - * The topic to publish end device events pertaining to $deviceType$ - */ - val $deviceType;format="Camel"$EventsTopic: DurableQueue.Topic = - "$deviceType;format="norm"$-events" - - /** - * Provides a source to perform the meta data filtering. - */ - def source( - durableQueue: DurableQueue, - getSecret: Principal.GetSecret, - tracer: Tracer - )(implicit mat: Materializer): Source[Span, NotUsed] = { - import mat.executionContext - val metaFilter = Flow[DurableQueue.Event] - .named("$deviceType;format="norm"$-meta") - .log("$deviceType;format="norm"$-meta", identity) - .map { - case DurableQueue.Received(key, data, _, headers, _) => ((key, data), headers) - } - .map { case (data, headers) => (data, Headers.spanContext(headers, tracer)) } - .map { - case (received, spanContext) => - val span = { - val scope = - tracer - .buildSpan("$deviceType;format="norm"$-event-filtering") - .addReference(References.FOLLOWS_FROM, spanContext) - .startActive(false) - try { - scope.span() - } finally { - scope.close() - } - } - received -> span - } - .map { - case ((nwkAddr, data), span) => - val path = EndDeviceEvents.EventKey + "." + HexString.intToHex(nwkAddr.toInt) - ((getSecret(path), data), (nwkAddr, span)) - } - .via(Streams.decrypter) - .filter { - case (decryptedData, _) => - decryptedData.headOption - .contains('{') // If we cannot decrypt then the sensor doesn't belong to us - } - .filter { - case (decryptedData, _) => - // We only publish certain events - other data could also be quite sensitive e.g. secrets, counters etc. - import DefaultJsonProtocol._ - decryptedData.utf8String.toJson.asJsObject - .getFields("type") - .headOption match { - case Some(eventType) - if eventType == EndDeviceEvents.EventJsonProtocol.BatteryLevelUpdatedField || - eventType == EndDeviceEvents.EventJsonProtocol.NameUpdatedField || - eventType == EndDeviceEvents.EventJsonProtocol.NwkAddrRemovedField || - eventType == EndDeviceEvents.EventJsonProtocol.NwkAddrUpdatedField || - eventType == EndDeviceEvents.EventJsonProtocol.PositionUpdatedField => - true - case _ => false - } - } - .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) - .map { - case (decryptedData, carry) => - ((getSecret($deviceType;format="Camel"$Reading.$deviceType;format="Camel"$Key), decryptedData), carry) - } - .via(Streams.encrypter) - .map { - case (encryptedData, (key, span)) => - DurableQueue.CommandRequest(DurableQueue.Send(key, - encryptedData, - $deviceType;format="Camel"$EventsTopic, - Headers.headers(span.context(), - tracer)), - span) - } - .via(durableQueue.flow) - .collect { case DurableQueue.CommandReply(DurableQueue.SendAck, Some(span)) => span } - .wireTap(span => tracer.scopeManager().activate(span, true).close()) - durableQueue - .resumableSource( - EndDeviceEvents.EventTopic, - UuidOps.v5($deviceType;format="Camel"$MetaFilter.getClass), - metaFilter - ) - } -} diff --git a/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$Transformer.scala b/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$Transformer.scala index 1caf036..e11051d 100644 --- a/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$Transformer.scala +++ b/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$Transformer.scala @@ -5,7 +5,7 @@ import java.time.Instant import akka.NotUsed import akka.stream.Materializer -import akka.stream.scaladsl.{ Flow, Source } +import akka.stream.scaladsl.Source import com.cisco.streambed.UuidOps import com.cisco.streambed.durablequeue.DurableQueue import com.cisco.streambed.durablequeue.opentracing.Headers @@ -37,11 +37,20 @@ object $deviceType;format="Camel"$Transformer { implicit mat: Materializer ): Source[Span, NotUsed] = { import mat.executionContext - val transform = Flow[DurableQueue.Event] + val topic = $deviceType;format="Camel"$DataUpMacPayloadTopic + val id = UuidOps.v5($deviceType;format="Camel"$Transformer.getClass) + Source + .fromFuture(durableQueue.offset(topic, id)) + .flatMapConcat { offset => + durableQueue + .source(topic, offset, finite = false) + .dropWhile(r => offset.contains(r.offset)) + } .named("$deviceType;format="norm"$-transformer") .log("$deviceType;format="norm"$-transformer", identity) - .map { case DurableQueue.Received(_, data, _, headers, _) => data -> headers } - .map { case (data, headers) => data -> Headers.spanContext(headers, tracer) } + .map { received => + received -> Headers.spanContext(received.headers, tracer) + } .map { case (received, spanContext) => val span = { @@ -56,24 +65,19 @@ object $deviceType;format="Camel"$Transformer { scope.close() } } - received -> span + received.data -> (received, span) } .via(LoRaStreams.dataUpDecoder(getSecret)) .map { - case ((nwkAddr, payload), span) => + case ((nwkAddr, _, payload), carry) => ($deviceType;format="Camel"$Reading(Instant.now(), nwkAddr, payload.toArray), - span) + carry) } - .collect { case (Some(reading), span) => (reading, span) } + .collect { case (Some(reading), carry) => (reading, carry) } .via($deviceType;format="Camel"$Reading.appender(getSecret)) .via(durableQueue.flow) - .collect { case DurableQueue.CommandReply(DurableQueue.SendAck, Some(span)) => span } + .collect { case DurableQueue.CommandReply(DurableQueue.SendAck, Some(carry)) => carry } + .via(durableQueue.commit(id)) .wireTap(span => tracer.scopeManager().activate(span, true).close()) - durableQueue - .resumableSource( - $deviceType;format="Camel"$DataUpMacPayloadTopic, - UuidOps.v5($deviceType;format="Camel"$MetaFilter.getClass), - transform - ) } } diff --git a/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/ui/$deviceType__Camel$Server.scala b/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/ui/$deviceType__Camel$Server.scala index 732066c..1dea59c 100644 --- a/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/ui/$deviceType__Camel$Server.scala +++ b/src/main/g8/backend/src/main/scala/$organization__packaged$/$deviceType__camel$/ui/$deviceType__Camel$Server.scala @@ -4,12 +4,14 @@ import akka.actor.ActorSystem import akka.stream.Materializer import akka.stream.scaladsl.{RestartSource, Sink} import $organization;format="package"$.$deviceType;format="camel"$.transformer._ +import au.com.titanclass.streams.telemetry.{MetricsReporter, TracingReporter} +import com.cisco.streambed.telemetry.{TelemetryReporter, TracerConfig} +import com.codahale.metrics.MetricFilter import com.cisco.streambed.durablequeue.remote.DurableQueueProvider import com.cisco.streambed.http.HttpServerConfig import com.cisco.streambed.http.identity.UserIdentityService import com.cisco.streambed.identity.iox.SecretStoreProvider import com.cisco.streambed.storage.fs.RawStorageProvider -import com.cisco.streambed.tracing.jaeger.TracerConfig import com.cisco.streambed.{Application, ApplicationContext, ApplicationProcess} import java.util.concurrent.TimeUnit import scala.concurrent.ExecutionContext @@ -48,7 +50,32 @@ object $deviceType;format="Camel"$Server implicit val mat: Materializer = context.mat implicit val system: ActorSystem = context.system - val tracer = TracerConfig.tracer(context.config) + val metricsReporter = + new MetricsReporter(applicationContext.metricRegistry, + MetricFilter.ALL, + TimeUnit.HOURS, + TimeUnit.MILLISECONDS, + Some(applicationContext.reportingExecutor)) + + metricsReporter.start(1, TimeUnit.MINUTES) + + val tracingReporter = new TracingReporter(1) + val tracer = TracerConfig.tracer(applicationContext.config, + _.withReporter(tracingReporter)) + + { + val _ = + TelemetryReporter + .report(applicationContext.config, + metricsReporter.source, + tracingReporter.source) + .foreach { bs => + bs.foreach { b => + context.system.log.info("Telemetry listening on {}", b) + } + } + } + val userIdentityService = UserIdentityService(context)(context.system) val maxSensors = context.config.getInt( @@ -71,21 +98,12 @@ object $deviceType;format="Camel"$Server context.config.getDouble( "$deviceType;format="norm"$.backoff-random-factor") - { - val _ = RestartSource - .withBackoff(minBackoff, maxBackoff, backoffRandomFactor)( - () => - $deviceType;format="Camel"$MetaFilter - .source(context.durableQueue, context.getSecret, tracer)) - .runWith(Sink.ignore) - } - { val _ = RestartSource .withBackoff(minBackoff, maxBackoff, backoffRandomFactor)( () => $deviceType;format="Camel"$Transformer - .source(context.durableQueue, context.getSecret, tracer)) + .source(context.durableQueue, context.principal.getSecret, tracer)) .runWith(Sink.ignore) } @@ -95,7 +113,7 @@ object $deviceType;format="Camel"$Server () => $deviceType;format="Camel"$Service .latestReadings(context.durableQueue, - context.getSecret, + context.principal.getSecret, context.storage, finite = false, maxSensors, @@ -109,7 +127,7 @@ object $deviceType;format="Camel"$Server () => EndDeviceService .events(context.durableQueue, - context.getSecret, + context.principal.getSecret, context.storage, finite = false, maxSensors, diff --git a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/$deviceType__Camel$ReadingTest.scala b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/$deviceType__Camel$ReadingTest.scala index 2b28c9b..b3dae97 100644 --- a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/$deviceType__Camel$ReadingTest.scala +++ b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/$deviceType__Camel$ReadingTest.scala @@ -2,15 +2,13 @@ package $organization;format="package"$.$deviceType;format="camel"$ import java.time.Instant -import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl.{ Sink, Source } import akka.stream.{ ActorMaterializer, Materializer } import akka.util.ByteString import com.cisco.streambed.HexString import com.cisco.streambed.durablequeue.DurableQueue -import com.cisco.streambed.identity.Principal -import com.cisco.streambed.identity.streams.Streams +import com.cisco.streambed.identity.{Crypto, Principal} import spray.json._ import utest._ @@ -74,17 +72,19 @@ object $deviceType;format="Camel"$ReadingTest extends TestSuite { .single( s"""{"time":"1970-01-01T00:00:00Z","nwkAddr":1,"temperature":\$temp,"moisturePercentage":\$moisture}""" ) - .map { data => - ((getSecret($deviceType;format="Camel"$Reading.$deviceType;format="Camel"$Key), ByteString(data)), NotUsed) + .mapAsync(1) { decryptedData => + Crypto + .encrypt(getSecret($deviceType;format="Camel"$Reading.$deviceType;format="Camel"$Key), ByteString(decryptedData)) + .collect { + case Right(bytes) => bytes + } } - .via(Streams.encrypter) - .map { - case (encryptedData, _) => - DurableQueue.Received(nwkAddr, - encryptedData, - 0, - DurableQueue.EmptyHeaders, - $deviceType;format="Camel"$Reading.$deviceType;format="Camel"$DataUpJsonTopic) + .map { encryptedData => + DurableQueue.Received(nwkAddr, + encryptedData, + 0, + DurableQueue.EmptyHeaders, + $deviceType;format="Camel"$Reading.$deviceType;format="Camel"$DataUpJsonTopic) } .via($deviceType;format="Camel"$Reading.tailer(getSecret)) .runWith(Sink.head) diff --git a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$TransformerTest.scala b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$TransformerTest.scala index e4cfa33..5999992 100644 --- a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$TransformerTest.scala +++ b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/transformer/$deviceType__Camel$TransformerTest.scala @@ -33,7 +33,7 @@ object $deviceType;format="Camel"$TransformerTest extends TestSuite { val tests = Tests { 'transform - { - val durableQueue = InMemoryQueue.queue() + val durableQueue = InMemoryQueue.queue("some-namespace") val encryptionKey = "2B7E151628AED2A6ABF7158809CF4F3C" // Used for encrypting/decrypting everything val getSecret: Principal.GetSecret = { _ => Future.successful( diff --git a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/$deviceType__Camel$ServiceTest.scala b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/$deviceType__Camel$ServiceTest.scala index b08fcef..65d5534 100644 --- a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/$deviceType__Camel$ServiceTest.scala +++ b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/$deviceType__Camel$ServiceTest.scala @@ -40,7 +40,7 @@ object $deviceType;format="Camel"$ServiceTest extends TestSuite { ) } - val durableQueue = InMemoryQueue.queue() + val durableQueue = InMemoryQueue.queue("some-namespace") val storage = StorageOps.storage() diff --git a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/EndDeviceServiceTest.scala b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/EndDeviceServiceTest.scala index 668bda5..2d9fbcc 100644 --- a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/EndDeviceServiceTest.scala +++ b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/EndDeviceServiceTest.scala @@ -41,7 +41,7 @@ object EndDeviceServiceTest extends TestSuite { ) } - val durableQueue = InMemoryQueue.queue() + val durableQueue = InMemoryQueue.queue("some-namespace") val storage = StorageOps.storage() diff --git a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/LatestEndDeviceEventsTest.scala b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/LatestEndDeviceEventsTest.scala index 07958fb..33d7207 100644 --- a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/LatestEndDeviceEventsTest.scala +++ b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/LatestEndDeviceEventsTest.scala @@ -34,8 +34,7 @@ object LatestEndDeviceEventsTest extends TestSuite with TestKitBase { 'encodeDecode - { 'one - testStateCodec( LatestEndDeviceEvents, - LatestEndDeviceEvents(None, List.empty), - ByteString("""{"events":[]}""") + LatestEndDeviceEvents(None, List.empty) ) 'two - testStateCodec( @@ -46,14 +45,12 @@ object LatestEndDeviceEventsTest extends TestSuite with TestKitBase { NwkAddrUpdated(1, DevEUI(1)), PositionUpdated(1, Instant.ofEpochSecond(0), LatLng(-10, 10, None)) ) - ), - ByteString( - """{"offset":45,"events":[{"nwkAddr":1,"devEUI":1,"type":"NwkAddrUpdated"},{"nwkAddr":1,"time":"1970-01-01T00:00:00Z","position":{"lat":-10,"lng":10},"type":"PositionUpdated"}]}""") + ) ) } } - def testStateCodec[T](codec: StateCodec[T], value: T, expected: ByteString)( + def testStateCodec[T](codec: StateCodec[T], value: T)( implicit ec: ExecutionContext, mat: Materializer): Future[Unit] = for { @@ -61,7 +58,6 @@ object LatestEndDeviceEventsTest extends TestSuite with TestKitBase { encodedData <- encodedSource.runFold(ByteString.empty)(_ ++ _) decoded <- codec.decode(Source.single(encodedData)) } yield { - encodedData ==> expected decoded ==> value } } diff --git a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/LatestReadingsTest.scala b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/LatestReadingsTest.scala index 8270385..e9930a3 100644 --- a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/LatestReadingsTest.scala +++ b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/LatestReadingsTest.scala @@ -10,6 +10,7 @@ import com.cisco.streambed.storage.StateCodec import java.time.Instant import scala.concurrent.{ExecutionContext, Future} import utest._ +import spray.json._ object LatestReadingsTest extends TestSuite with TestKitBase { override implicit lazy val system: ActorSystem = @@ -41,7 +42,7 @@ object LatestReadingsTest extends TestSuite with TestKitBase { ) ), ByteString( - """{"offset":45,"readings":[{"time":"1970-01-01T00:00:00Z","nwkAddr":1,"temperature":5.5,"moisturePercentage":1.5},{"time":"1970-01-01T00:00:00Z","nwkAddr":2,"temperature":8.5,"moisturePercentage":2.5}]}""") + """{"offset":45,"readings":[{"time":"1970-01-01T00:00:00Z","nwkAddr":1,"temperature":5.5,"moisturePercentage":1.5},{"time":"1970-01-01T00:00:00Z","nwkAddr":2,"temperature":8.5,"moisturePercentage":2.5}]}""".parseJson.compactPrint) ) } } diff --git a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/ServerSentEventMarshallerTest.scala b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/ServerSentEventMarshallerTest.scala index 615d45f..d44c676 100644 --- a/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/ServerSentEventMarshallerTest.scala +++ b/src/main/g8/backend/src/test/scala/$organization__packaged$/$deviceType__camel$/ui/ServerSentEventMarshallerTest.scala @@ -5,6 +5,7 @@ import akka.http.scaladsl.model.sse.ServerSentEvent import com.cisco.streambed.lora.controlplane.EndDeviceEvents._ import java.time.Instant import utest._ +import spray.json._ object ServerSentEventMarshallerTest extends TestSuite { val tests = Tests { @@ -14,7 +15,7 @@ object ServerSentEventMarshallerTest extends TestSuite { ServerSentEventMarshaller($deviceType;format="Camel"$Reading(now, 1, BigDecimal("5.5"), BigDecimal("1.5")), 123) ==> ServerSentEvent( - """{"time":"1970-01-01T00:00:00Z","nwkAddr":1,"temperature":5.5,"moisturePercentage":1.5}""", + """{"time":"1970-01-01T00:00:00Z","nwkAddr":1,"temperature":5.5,"moisturePercentage":1.5}""".parseJson.compactPrint, "$deviceType;format="Camel"$Reading", "123") } @@ -25,7 +26,7 @@ object ServerSentEventMarshallerTest extends TestSuite { ServerSentEventMarshaller( PositionUpdated(1, now, LatLng(BigDecimal(12), BigDecimal(34), None)), 123) ==> ServerSentEvent( - """{"nwkAddr":1,"time":"1970-01-01T00:00:00Z","position":{"lat":12,"lng":34},"type":"PositionUpdated"}""", + """{"nwkAddr":1,"time":"1970-01-01T00:00:00Z","position":{"lat":12,"lng":34},"type":"PositionUpdated"}""".parseJson.compactPrint, "PositionUpdated", "123" )