Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bring the template up to date #8

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions src/main/g8/backend/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,16 @@ lazy val root =
.settings(settings)
.settings(
libraryDependencies ++= Seq(
library.akkaHttpSprayJson,
library.fs,
library.loraControlPlane,
library.loraStreams,
library.ioxSss,
library.jaegerTracing,
library.sprayJson,
library.streambedCore,
library.streambedDurableQueueRemoteClient,
library.streambedHttp,
library.streambedIdentity,
library.akkaTestkit % Test,
library.scalaCheck % Test,
library.streambedTestKit % Test,
library.utest % Test
),
Expand All @@ -45,25 +43,23 @@ lazy val root =
lazy val library =
new {
object Version {
val akka = "2.5.19"
val loraSdk = "0.12.0"
val akka = "2.5.23"
val akkaHttp = "10.1.8"
val scalaCheck = "1.14.0"
val sprayJson = "1.3.5"
val streambed = "0.24.8"
val utest = "0.6.4"
val streambed = "0.41.1"
val utest = "0.6.9"
}
val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % Version.akka
val fs = "com.cisco.streambed" %% "fs" % Version.streambed
val loraControlPlane = "com.cisco.streambed.lora" %% "lora-control-plane" % Version.loraSdk
val loraStreams = "com.cisco.streambed.lora" %% "lora-streams" % Version.loraSdk
val ioxSss = "com.cisco.streambed" %% "iox-sss" % Version.streambed
val jaegerTracing = "com.cisco.streambed" %% "jaeger-tracing" % Version.streambed
val scalaCheck = "org.scalacheck" %% "scalacheck" % Version.scalaCheck
val sprayJson = "io.spray" %% "spray-json" % Version.sprayJson
val streambedCore = "com.cisco.streambed" %% "streambed-core" % Version.streambed
val akkaHttpSprayJson = "com.typesafe.akka" %% "akka-http-spray-json" % Version.akkaHttp
val akkaTestkit = "com.typesafe.akka" %% "akka-testkit" % Version.akka
val fs = "com.cisco.streambed" %% "fs" % Version.streambed
val loraControlPlane = "com.cisco.streambed" %% "lora-control-plane" % Version.streambed
val loraStreams = "com.cisco.streambed" %% "lora-streams" % Version.streambed
val ioxSss = "com.cisco.streambed" %% "iox-sss" % Version.streambed
val sprayJson = "io.spray" %% "spray-json" % Version.sprayJson
val streambedCore = "com.cisco.streambed" %% "streambed-core" % Version.streambed
val streambedDurableQueueRemoteClient = "com.cisco.streambed" %% "streambed-durable-queue-remote-client" % Version.streambed
val streambedHttp = "com.cisco.streambed" %% "streambed-http" % Version.streambed
val streambedIdentity = "com.cisco.streambed" %% "streambed-identity" % Version.streambed
val streambedTestKit = "com.cisco.streambed" %% "streambed-testkit" % Version.streambed
val utest = "com.lihaoyi" %% "utest" % Version.utest
}
Expand All @@ -74,11 +70,11 @@ lazy val library =

lazy val settings =
Seq(
scalaVersion := "2.12.8",
scalaVersion := "2.12.9",
organization := "$organization;format="package"$",
organizationName := "$organizationName$",
startYear := Some(2018),
headerLicense := Some(HeaderLicense.Custom("Copyright (c) $organizationName$, 2018")),
headerLicense := Some(HeaderLicense.Custom("Copyright (c) $organizationName$, 2019")),
scalacOptions ++= Seq(
"-unchecked",
"-deprecation",
Expand Down
2 changes: 1 addition & 1 deletion src/main/g8/backend/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.4.0")
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.9.3")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.6")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "4.1.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ import java.util.function
import akka.NotUsed
import akka.stream.javadsl.{ Flow => JFlow }
import akka.stream.scaladsl.Flow
import akka.stream.{ ActorAttributes, Supervision }
import akka.util.ByteString
import com.cisco.streambed.durablequeue.DurableQueue
import com.cisco.streambed.identity.Principal
import com.cisco.streambed.identity.streams.Streams
import com.cisco.streambed.identity.{Crypto, Principal}
import spray.json._

import scala.compat.java8.FunctionConverters._
Expand Down Expand Up @@ -77,19 +75,27 @@ object $deviceType;format="Camel"$Reading {
implicit ec: ExecutionContext
): Flow[DurableQueue.Received, ($deviceType;format="Camel"$Reading, Long), NotUsed] =
Flow[DurableQueue.Received]
.map {
.mapAsync(1) {
case DurableQueue.Received(_, encryptedData, o, _, _) =>
((getSecret($deviceType;format="Camel"$Key), encryptedData), o)
Crypto
.decrypt(getSecret($deviceType;format="Camel"$Key), encryptedData)
.filter(!_.left.exists(_ == Principal.Unauthenticated))
.map(_ -> o)
}
.via(Streams.decrypter)
.map {
case (data, o) =>
import $deviceType;format="Camel"$ReadingJsonProtocol._
(data.utf8String.parseJson.convertTo[$deviceType;format="Camel"$Reading], o)
.collect {
case (Right(data), o) =>
try {
import $deviceType;format="Camel"$ReadingJsonProtocol._
Some((data.utf8String.parseJson.convertTo[$deviceType;format="Camel"$Reading], o))
} catch {
case _: DeserializationException | _: JsonParser.ParsingException |
_: NumberFormatException =>
None
}
}
.collect {
case Some(e) => e
}
.withAttributes(
ActorAttributes.supervisionStrategy(Supervision.resumingDecider)
)

/**
* Conveniently tail, decrypt and decode readings. Yields the reading and its offset.
Expand Down Expand Up @@ -118,21 +124,19 @@ object $deviceType;format="Camel"$Reading {

(reading.nwkAddr, reading.toJson.compactPrint, carry)
}
.map {
.mapAsync(1) {
case (nwkAddr, decryptedData, carry) =>
((getSecret($deviceType;format="Camel"$Key), ByteString(decryptedData)),
(nwkAddr, carry))
Crypto
.encrypt(getSecret($deviceType;format="Camel"$Key), ByteString(decryptedData))
.collect {
case Right(bytes) =>
DurableQueue.CommandRequest(
DurableQueue.Send(nwkAddr, bytes, $deviceType;format="Camel"$DataUpJsonTopic),
carry
)
}
}
.via(Streams.encrypter)
.map {
case (bytes, (nwkAddr, carry)) =>
DurableQueue.CommandRequest(
DurableQueue.Send(nwkAddr, bytes, $deviceType;format="Camel"$DataUpJsonTopic),
carry
)
}



/**
* A convenience function for encoding `$deviceType;format="Camel"$Reading` instances, encrypting them and then
* publishing to a queue.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.time.Instant

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.{ Flow, Source }
import akka.stream.scaladsl.Source
import com.cisco.streambed.UuidOps
import com.cisco.streambed.durablequeue.DurableQueue
import com.cisco.streambed.durablequeue.opentracing.Headers
Expand Down Expand Up @@ -37,11 +37,20 @@ object $deviceType;format="Camel"$Transformer {
implicit mat: Materializer
): Source[Span, NotUsed] = {
import mat.executionContext
val transform = Flow[DurableQueue.Event]
val topic = $deviceType;format="Camel"$DataUpMacPayloadTopic
val id = UuidOps.v5($deviceType;format="Camel"$Transformer.getClass)
Source
.fromFuture(durableQueue.offset(topic, id))
.flatMapConcat { offset =>
durableQueue
.source(topic, offset, finite = false)
.dropWhile(r => offset.contains(r.offset))
}
.named("$deviceType;format="norm"$-transformer")
.log("$deviceType;format="norm"$-transformer", identity)
.map { case DurableQueue.Received(_, data, _, headers, _) => data -> headers }
.map { case (data, headers) => data -> Headers.spanContext(headers, tracer) }
.map { received =>
received -> Headers.spanContext(received.headers, tracer)
}
.map {
case (received, spanContext) =>
val span = {
Expand All @@ -56,24 +65,19 @@ object $deviceType;format="Camel"$Transformer {
scope.close()
}
}
received -> span
received.data -> (received, span)
}
.via(LoRaStreams.dataUpDecoder(getSecret))
.map {
case ((nwkAddr, payload), span) =>
case ((nwkAddr, _, payload), carry) =>
($deviceType;format="Camel"$Reading(Instant.now(), nwkAddr, payload.toArray),
span)
carry)
}
.collect { case (Some(reading), span) => (reading, span) }
.collect { case (Some(reading), carry) => (reading, carry) }
.via($deviceType;format="Camel"$Reading.appender(getSecret))
.via(durableQueue.flow)
.collect { case DurableQueue.CommandReply(DurableQueue.SendAck, Some(span)) => span }
.collect { case DurableQueue.CommandReply(DurableQueue.SendAck, Some(carry)) => carry }
.via(durableQueue.commit(id))
.wireTap(span => tracer.scopeManager().activate(span, true).close())
durableQueue
.resumableSource(
$deviceType;format="Camel"$DataUpMacPayloadTopic,
UuidOps.v5($deviceType;format="Camel"$MetaFilter.getClass),
transform
)
}
}
Loading