Commit

Initial commit

blublinsky committed Jul 27, 2017
1 parent 836b2dc commit e334b19
Showing 71 changed files with 4,169 additions and 182 deletions.
@@ -1,10 +1,10 @@
package com.lightbend.modelserver.store;

import com.lightbend.model.Model;
import com.lightbend.model.ModelFactory;
import com.lightbend.model.java.Model;
import com.lightbend.model.java.ModelFactory;
import com.lightbend.model.Modeldescriptor;
import com.lightbend.model.PMML.PMMLModelFactory;
import com.lightbend.model.tensorflow.TensorflowModelFactory;
import com.lightbend.model.java.PMML.PMMLModelFactory;
import com.lightbend.model.java.tensorflow.TensorflowModelFactory;
import com.lightbend.queriablestate.ModelServingInfo;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
@@ -1,6 +1,6 @@
package com.lightbend.modelserver.store;

import com.lightbend.model.Model;
import com.lightbend.model.java.Model;
import com.lightbend.queriablestate.ModelServingInfo;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -1,6 +1,6 @@
package com.lightbend.modelserver.store;

import com.lightbend.model.Model;
import com.lightbend.model.java.Model;
import com.lightbend.queriablestate.ModelServingInfo;

/**
@@ -1,8 +1,8 @@
package com.lightbend.modelserver.withstore;

import com.lightbend.model.DataConverter;
import com.lightbend.model.Winerecord;
import com.lightbend.modelserver.store.ModelStateStore;
import com.lightbend.modelserver.support.java.DataConverter;
import com.lightbend.queriablestate.ModelServingInfo;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -1,9 +1,13 @@
package com.lightbend.modelserver.withstore;

import com.lightbend.model.*;
import com.lightbend.model.PMML.PMMLModelFactory;
import com.lightbend.model.tensorflow.TensorflowModelFactory;
import com.lightbend.model.Modeldescriptor;
import com.lightbend.model.java.Model;
import com.lightbend.model.java.ModelFactory;
import com.lightbend.model.java.PMML.PMMLModelFactory;
import com.lightbend.model.java.tensorflow.TensorflowModelFactory;
import com.lightbend.modelserver.store.ModelStateStore;
import com.lightbend.modelserver.support.java.DataConverter;
import com.lightbend.modelserver.support.java.ModelToServe;
import com.lightbend.queriablestate.ModelServingInfo;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -30,11 +30,11 @@ public class ModelProcessorWithStore extends AbstractProcessor<byte[], byte[]> {
@Override
public void process(byte[] key, byte[] value) {

Optional<CurrentModelDescriptor> descriptor = DataConverter.convertModel(value);
Optional<ModelToServe> descriptor = DataConverter.convertModel(value);
if(!descriptor.isPresent()){
return; // Bad record
}
CurrentModelDescriptor model = descriptor.get();
ModelToServe model = descriptor.get();
System.out.println("New scoring model " + model);
if(model.getModelData() == null) {
System.out.println("Location based model is not yet supported");
@@ -64,7 +64,6 @@ public List<HostStoreInfo> streamsMetadataForStore(@PathParam("storeName") String
@Path("{storeName}/value")
@Produces(MediaType.APPLICATION_JSON)
public ModelServingInfo servingInfo(@PathParam("storeName") final String storeName) {
// return ModelState.getInstance().getCurrentServingInfo();
// Get the Store
final ReadableModelStateStore store = streams.store(storeName, new ModelStateStore.ModelStateStoreType());
if (store == null) {
64 changes: 59 additions & 5 deletions README.md
@@ -1,8 +1,62 @@
# Kafka Streams Model server
# Model serving

This is a simple implementation of model serving using Kafka Streams
The basic idea behind this implementation is fairly straightforward - there are two streams:
This is an umbrella project for all things model serving, comprising multiple projects:

-**Data Stream** - Kafka stream delivering data record as protobuf buffer (example, modeldescriptor.proto)
-**akkaserver** - implementation of model scoring and statistics serving using Akka streams and Akka HTTP

-**Model Stream** - Kafka stream delivering models as protobuf buffer (example, modeldescriptor.proto)

-**flinkserver** - implementation of model scoring and queryable state using Flink. Both
key-based and partition-based approach are implemented here

-**kafkaclient** - generic client used for testing all implementations (except serving samples).
Reads data files, splits them into records, converts them to protobuf implementations and publishes them to Kafka
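
A minimal sketch of what such a publisher can look like - the topic name, file path, and byte-level record conversion below are illustrative placeholders, not the project's actual code (the real client builds protobuf messages from each line first):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object DataPublisherSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
    // Read a data file line by line and publish each record to the data topic.
    // Raw UTF-8 bytes stand in for protobuf serialization to keep the sketch self-contained.
    scala.io.Source.fromFile("data/winequality_red.csv").getLines().foreach { line =>
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("wine-data", line.getBytes("UTF-8")))
    }
    producer.close()
  }
}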

-**kafkaconfiguration** - simple module containing a class with Kafka definitions - server location,
topics, etc. - used by all applications

-**kafkastreamserver** - implementation of model scoring and queryable state using Kafka Streams.
Also includes an implementation of a custom Kafka Streams store.
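
For orientation, here is a minimal sketch of the queryable-state pattern using a built-in key-value store; the project substitutes its own ModelStateStoreType (see the streams.store(...) call in the REST resource above), and the store and key names below are assumptions:

object StoreQuerySketch {
  import org.apache.kafka.streams.KafkaStreams
  import org.apache.kafka.streams.state.QueryableStoreTypes

  // `streams` is assumed to be an already-started KafkaStreams instance and
  // "modelStore" a hypothetical store name registered in the topology.
  def currentModelBytes(streams: KafkaStreams): Option[Array[Byte]] = {
    val store = streams.store("modelStore",
      QueryableStoreTypes.keyValueStore[Array[Byte], Array[Byte]]())
    Option(store.get("currentModel".getBytes))
  }
}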

-**model** - implementation of support classes representing models and model factories used by all applications.
Because Kafka Streams is Java and the rest of the implementations are Scala, there are two versions of these
classes - Java and Scala
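
The method names below are an assumption about what these support classes plausibly expose, based on how the servers in this commit use them; treat it as a sketch, not the module's actual API:

// Hypothetical Scala rendering of the model abstraction.
trait Model {
  def score(input: AnyRef): AnyRef   // score a single data record
  def cleanup(): Unit                // release model resources (e.g. a TensorFlow session)
}

trait ModelFactory {
  // Restore a model from the bytes carried in the model stream;
  // None signals an unparseable or unsupported payload.
  def create(input: Array[Byte]): Option[Model]
}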

-**serving samples** - This module contains simple implementations of model scoring using PMML and
TensorFlow model definitions. It does not use any streaming framework - just straight Scala code

-**protobufs** - a module containing the protobufs used by all streaming frameworks.
These protobufs describe the model and data definitions in the stream. Because Kafka Streams is Java
and the rest of the implementations are Scala, both Java and Scala implementations of the protobufs are
generated
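
As a quick illustration of the generated Scala side, the round trip below uses the WineRecord message referenced elsewhere in this commit; the no-argument constructor assumes proto3-style defaults and stands in for real field values:

import com.lightbend.model.winerecord.WineRecord

object ProtoRoundTripSketch extends App {
  val record = WineRecord()                     // defaults only; real records carry wine features
  val bytes: Array[Byte] = record.toByteArray   // ScalaPB serialization
  val parsed: WineRecord = WineRecord.parseFrom(bytes)
  println(parsed)
}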


-**sparkML** - examples of using SparkML for machine learning and exporting results to PMML
using the JPMML evaluator for Spark - https://github.com/jpmml/jpmml-evaluator-spark

-**sparkserver** - implementation of model scoring using Spark

-**utils** - a module containing some utility code. Most importantly, it contains an embedded Kafka implementation
which can be used for testing in the absence of a Kafka server. To use it, just add these
lines to your code:


// Create embedded Kafka and topics
EmbeddedSingleNodeKafkaCluster.start() // Create and start the cluster
EmbeddedSingleNodeKafkaCluster.createTopic(DATA_TOPIC) // Add topic
EmbeddedSingleNodeKafkaCluster.createTopic(MODELS_TOPIC) // Add topic

If you are using both server and client, add embedded Kafka only to the server and start it before the client.
In addition to embedded Kafka, this module contains some utility classes used by all applications.
Because Kafka Streams is Java and the rest of the implementations are Scala, there are two versions of these
classes - Java and Scala

-**data** - a directory of data files used as sources for all applications

Not included in this project are:

-**Beam implementation** - the Beam Flink runner is still on Scala 2.10, so it lives in its own
separate project - ???

-**Python/Tensorflow/Keras** - as it is Python, it lives in its own
separate project - ???

@@ -0,0 +1,104 @@
package com.lightbend.modelserver

import akka.actor.ActorSystem
import akka.http.scaladsl.server.Route
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Sink, Source}
import akka.util.Timeout

import scala.concurrent.duration._
import com.lightbend.configuration.kafka.ApplicationKafkaParameters
import com.lightbend.configuration.kafka.ApplicationKafkaParameters.{DATA_GROUP, LOCAL_KAFKA_BROKER, MODELS_GROUP}
import com.lightbend.model.winerecord.WineRecord
import com.lightbend.modelserver.kafka.EmbeddedSingleNodeKafkaCluster
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import akka.http.scaladsl.Http
import com.lightbend.modelserver.modelServer.ReadableModelStateStore
import com.lightbend.modelserver.queriablestate.QueriesAkkaHttpResource
import com.lightbend.modelserver.support.scala.{DataReader, ModelToServe}

/**
  * Created by boris on 7/21/17.
  */
object AkkaModelServer {

  implicit val system = ActorSystem("ModelServing")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val dataConsumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(LOCAL_KAFKA_BROKER)
    .withGroupId(DATA_GROUP)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  val modelConsumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(LOCAL_KAFKA_BROKER)
    .withGroupId(MODELS_GROUP)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  def main(args: Array[String]): Unit = {

    import ApplicationKafkaParameters._

    // Create embedded Kafka and topics
    // EmbeddedSingleNodeKafkaCluster.start()
    // EmbeddedSingleNodeKafkaCluster.createTopic(DATA_TOPIC)
    // EmbeddedSingleNodeKafkaCluster.createTopic(MODELS_TOPIC)

    val modelStream: Source[ModelToServe, Consumer.Control] =
      Consumer.atMostOnceSource(modelConsumerSettings, Subscriptions.topics(MODELS_TOPIC))
        .map(record => ModelToServe.fromByteArray(record.value())).filter(_.isSuccess).map(_.get)

    val dataStream: Source[WineRecord, Consumer.Control] =
      Consumer.atMostOnceSource(dataConsumerSettings, Subscriptions.topics(DATA_TOPIC))
        .map(record => DataReader.fromByteArray(record.value())).filter(_.isSuccess).map(_.get)

    val model = new ModelStage()

    def keepModelMaterializedValue[M1, M2, M3](m1: M1, m2: M2, m3: M3): M3 = m3

    val modelPredictions: Source[Option[Double], ReadableModelStateStore] = Source.fromGraph(
      GraphDSL.create(dataStream, modelStream, model)(keepModelMaterializedValue) {
        implicit builder => (d, m, w) =>
          import GraphDSL.Implicits._

          // wire together the input streams with the model stage (2 in, 1 out)
          /*
          dataStream  --> |       |
                          | model | -> predictions
          modelStream --> |       |
          */

          d ~> w.dataRecordIn
          m ~> w.modelRecordIn
          SourceShape(w.scoringResultOut)
      }
    )

    val materializedReadableModelStateStore: ReadableModelStateStore =
      modelPredictions
        .map(println(_))
        .to(Sink.ignore)  // we do not read the results directly
        .run()            // we run the stream, materializing the stage's StateStore

    startRest(materializedReadableModelStateStore)
  }

  def startRest(service: ReadableModelStateStore): Unit = {

    implicit val timeout = Timeout(10 seconds)
    val host = "localhost"
    val port = 5000
    val routes: Route = QueriesAkkaHttpResource.storeRoutes(service)

    Http().bindAndHandle(routes, host, port) map
      { binding => println(s"REST interface bound to ${binding.localAddress}") } recover { case ex =>
        println(s"REST interface could not bind to $host:$port: ${ex.getMessage}")
      }
  }
}
@@ -1,13 +1,14 @@
package com.lightbend.modelServer.modelServer
package com.lightbend.modelserver

import akka.stream._
import akka.stream.stage.{ GraphStageLogicWithLogging, _ }
import akka.stream.stage.{GraphStageLogicWithLogging, _}
import com.lightbend.model.modeldescriptor.ModelDescriptor
import com.lightbend.model.winerecord.WineRecord
import com.lightbend.modelServer.model.Model
import com.lightbend.modelServer.model.PMML.PMMLModel
import com.lightbend.modelServer.model.tensorflow.TensorFlowModel
import com.lightbend.modelServer.{ ModelToServe, ModelToServeStats }
import com.lightbend.model.scala.PMML.PMMLModel
import com.lightbend.model.scala.tensorflow.TensorFlowModel
import com.lightbend.modelserver.modelServer.ReadableModelStateStore
import com.lightbend.model.scala.Model
import com.lightbend.modelserver.support.scala.{ModelToServe, ModelToServeStats}

import scala.collection.immutable

@@ -0,0 +1,12 @@
package com.lightbend.modelserver.modelServer

import com.lightbend.modelserver.support.scala.ModelToServeStats

/**
  * Created by boris on 7/21/17.
  */
trait ReadableModelStateStore {
  def getCurrentServingInfo: ModelToServeStats
}

@@ -0,0 +1,19 @@
package com.lightbend.modelserver.queriablestate

import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.Directives._
import com.lightbend.modelserver.modelServer.ReadableModelStateStore
import com.lightbend.modelserver.support.scala.ModelToServeStats
import de.heikoseeberger.akkahttpjackson.JacksonSupport

object QueriesAkkaHttpResource extends JacksonSupport {

  def storeRoutes(predictions: ReadableModelStateStore): Route =
    get {
      path("stats") {
        val info: ModelToServeStats = predictions.getCurrentServingInfo
        complete(info)
      }
    }
}
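
Once the server is up, the "stats" route above can be exercised with any HTTP client; here is a small Akka HTTP sketch, with host and port taken from AkkaModelServer above:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.stream.ActorMaterializer

object StatsClientSketch extends App {
  implicit val system = ActorSystem("StatsClient")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // GET the current model serving statistics and print the raw response.
  Http().singleRequest(HttpRequest(uri = "http://localhost:5000/stats"))
    .foreach(response => println(s"Serving stats: $response"))
}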
59 changes: 44 additions & 15 deletions build.sbt
@@ -1,37 +1,66 @@

name := "KafkaStreamsModelServer"
name := "ModelServing"

version := "1.0"

scalaVersion in ThisBuild := "2.11.11"


lazy val protobufs = (project in file("./protobufs"))
.settings(
PB.targets in Compile := Seq(
PB.gens.java -> (sourceManaged in Compile).value,
scalapb.gen(javaConversions=true) -> (sourceManaged in Compile).value
)
.settings(
PB.targets in Compile := Seq(
PB.gens.java -> (sourceManaged in Compile).value,
scalapb.gen(javaConversions=true) -> (sourceManaged in Compile).value
)
)

lazy val client = (project in file("./client"))
lazy val kafkaclient = (project in file("./kafkaclient"))
.settings(libraryDependencies ++= Dependencies.kafkabaseDependencies)
.dependsOn(protobufs, configuration)
.dependsOn(protobufs, kafkaconfiguration)

lazy val model = (project in file("./model"))
.settings(libraryDependencies ++= Dependencies.modelsDependencies)
.dependsOn(protobufs)
.dependsOn(protobufs, utils)


lazy val server = (project in file("./server"))
lazy val kafkastreamsserver = (project in file("./Kafkastreamsserver"))
.settings(libraryDependencies ++= Dependencies.kafkaDependencies ++ Dependencies.webDependencies)
.dependsOn(model, configuration)
.dependsOn(model, kafkaconfiguration, utils)

lazy val akkaServer = (project in file("./akkaserver"))
.settings(libraryDependencies ++= Dependencies.kafkaDependencies ++ Dependencies.akkaServerDependencies
++ Dependencies.modelsDependencies ++ Seq(Dependencies.curator))
.dependsOn(protobufs, configuration)
++ Dependencies.modelsDependencies)
.dependsOn(model, kafkaconfiguration, utils)

lazy val flinkserver = (project in file("./flinkserver"))
.settings(libraryDependencies ++= Dependencies.flinkDependencies ++ Seq(Dependencies.joda, Dependencies.akkaslf))
.settings(dependencyOverrides += "com.typesafe.akka" % "akka-actor-2.11" % "2.3")
.dependsOn(model, kafkaconfiguration, utils)

lazy val sparkserver = (project in file("./sparkserver"))
.settings(libraryDependencies ++= Dependencies.sparkDependencies)
.settings(dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.8.9")
.settings(dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.9")
.settings(dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.9")
.dependsOn(model, kafkaconfiguration, utils)

lazy val sparkML = (project in file("./sparkML"))
.settings(libraryDependencies ++= Dependencies.sparkMLDependencies)
.settings(dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" % "2.8.9")
.settings(dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.8.9")
.settings(dependencyOverrides += "com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.9")

lazy val configuration = (project in file("./configuration"))

lazy val servingsamples = (project in file("./servingsamples"))
.settings(libraryDependencies ++= Dependencies.modelsDependencies ++ Seq(Dependencies.tensorflowProto))


lazy val kafkaconfiguration = (project in file("./kafkaconfiguration"))

lazy val utils = (project in file("./utils"))
.settings(libraryDependencies ++= Dependencies.kafkaDependencies ++ Seq(Dependencies.curator))
.dependsOn(protobufs)

lazy val root = (project in file(".")).
aggregate(protobufs, client, model, configuration, server, akkaServer)
aggregate(protobufs, kafkaclient, model, utils, kafkaconfiguration, kafkastreamsserver, akkaServer, sparkserver)
