diff --git a/gearpump-benchmarks/pom.xml b/gearpump-benchmarks/pom.xml
new file mode 100644
index 000000000..ab52b9401
--- /dev/null
+++ b/gearpump-benchmarks/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright 2015, Yahoo Inc.
+Licensed under the terms of the Apache License 2.0. Please see LICENSE file in the project root for terms.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>yahoo-low-latency-bechmarks</artifactId>
+    <groupId>com.yahoo.stream</groupId>
+    <version>0.1.0</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>gearpump-benchmarks</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.gearpump</groupId>
+      <artifactId>gearpump-streaming_2.11</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gearpump</groupId>
+      <artifactId>gearpump-core_2.11</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gearpump</groupId>
+      <artifactId>gearpump-external-kafka_2.11</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.json</groupId>
+      <artifactId>json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.yahoo.stream</groupId>
+      <artifactId>streaming-benchmark-common</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <version>3.2.2</version>
+        <executions>
+          <execution>
+            <id>eclipse-add-source</id>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-compile-first</id>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>scala-test-compile-first</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <scalaVersion>${gp.scala.version}</scalaVersion>
+          <recompileMode>incremental</recompileMode>
+          <useZincServer>true</useZincServer>
+          <args>
+            <arg>-unchecked</arg>
+            <arg>-deprecation</arg>
+            <arg>-feature</arg>
+          </args>
+          <jvmArgs>
+            <jvmArg>-Xms1024m</jvmArg>
+            <jvmArg>-Xmx1024m</jvmArg>
+          </jvmArgs>
+          <javacArgs>
+            <javacArg>-source</javacArg>
+            <javacArg>${java.version}</javacArg>
+            <javacArg>-target</javacArg>
+            <javacArg>${java.version}</javacArg>
+            <javacArg>-Xlint:all,-serial,-path</javacArg>
+          </javacArgs>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <createDependencyReducedPom>true</createDependencyReducedPom>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/gearpump-benchmarks/src/main/resources/reference.conf b/gearpump-benchmarks/src/main/resources/reference.conf
new file mode 100644
index 000000000..3a245f0f6
--- /dev/null
+++ b/gearpump-benchmarks/src/main/resources/reference.conf
@@ -0,0 +1,5 @@
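+# The DeserializeTask emits scala.Tuple7 messages between tasks, so the class is registered
+# in Gearpump's serializers config; an empty value leaves the serializer choice to Gearpump's default.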
+gearpump {
+  serializers {
+    "scala.Tuple7" = ""
+  }
+}
\ No newline at end of file
diff --git a/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala b/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala
new file mode 100644
index 000000000..470b2a4f0
--- /dev/null
+++ b/gearpump-benchmarks/src/main/scala/gearpump/benchmark/Advertising.scala
@@ -0,0 +1,147 @@
+package gearpump.benchmark
+
+import java.util.Properties
+
+import akka.actor.ActorSystem
+import benchmark.common.Utils
+import benchmark.common.advertising.{CampaignProcessorCommon, RedisAdCampaignCache}
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.partitioner.{UnicastPartitioner, HashPartitioner}
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.kafka.lib.source.StringMessageDecoder
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.kafka.KafkaSource
+import org.apache.gearpump.streaming.source.DataSourceProcessor
+import org.apache.gearpump.util.{Graph, AkkaApp}
+import org.apache.gearpump.util.Graph._
+import org.json.JSONObject
+import scala.collection.JavaConverters._
+
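+// Gearpump port of the Yahoo streaming advertising benchmark. The dataflow is:
+// Kafka source -> deserialize JSON -> filter "view" events -> project (ad_id, event_time)
+// -> join campaign_id from Redis -> per-campaign aggregation via CampaignProcessorCommon.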
+object Advertising extends AkkaApp {
+
+  def application(args: Array[String], system: ActorSystem): StreamApplication = {
+    implicit val actorSystem = system
+    val commonConfig = Utils.findAndReadConfigFile(args(0), true).asInstanceOf[java.util.Map[String, Any]]
+
+    val cores = commonConfig.get("process.cores").asInstanceOf[Int]
+    val topic = commonConfig.get("kafka.topic").asInstanceOf[String]
+    val partitions = commonConfig.get("kafka.partitions").asInstanceOf[Int]
+    val redisHost = commonConfig.get("redis.host").asInstanceOf[String]
+
+    val zookeeperHosts = commonConfig.get("zookeeper.servers").asInstanceOf[java.util.List[String]] match {
+      case l: java.util.List[String] => l.asScala.toSeq
+      case other => throw new ClassCastException(other + " not a List[String]")
+    }
+    val zookeeperPort = commonConfig.get("zookeeper.port").asInstanceOf[Int]
+    val zookeeperConnect = zookeeperHosts.map(_ + ":" + zookeeperPort).mkString(",")
+
+    val kafkaHosts = commonConfig.get("kafka.brokers").asInstanceOf[java.util.List[String]] match {
+      case l: java.util.List[String] => l.asScala.toSeq
+      case other => throw new ClassCastException(other + " not a List[String]")
+    }
+    val kafkaPort = commonConfig.get("kafka.port").asInstanceOf[Int]
+    val brokerList = kafkaHosts.map(_ + ":" + kafkaPort).mkString(",")
+
+    val appName = "Advertising"
+    val parallel = Math.max(1, cores / 7)
+    val props = new Properties
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
+    props.put(KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[StringMessageDecoder])
+    val gearConfig = UserConfig.empty.withString("redis.host", redisHost)
+    val source = new KafkaSource(topic, props)
+    val sourceProcessor = DataSourceProcessor(source, partitions)
+    val deserializer = Processor[DeserializeTask](parallel)
+    val filter = Processor[EventFilterTask](parallel)
+    val projection = Processor[EventProjectionTask](parallel)
+    val join = Processor[RedisJoinTask](parallel)
+    val campaign = Processor[CampaignProcessorTask](parallel * 2)
+    val partitioner = new AdPartitioner
+
+    val graph = Graph(sourceProcessor ~ new HashPartitioner ~> deserializer ~> filter ~> projection ~> join ~ partitioner ~> campaign)
+    StreamApplication(appName, graph, gearConfig)
+  }
+
+  override def main(akkaConf: Advertising.Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    context.submit(application(args, context.system))
+    context.close()
+  }
+
+  override def help: Unit = {}
+}
+
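+// Partitions joined (campaign_id, ad_id, event_time) tuples by campaign_id so every
+// event for a campaign is handled by the same CampaignProcessorTask instance.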
+class AdPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    (msg.msg.asInstanceOf[(String, String, String)]._1.hashCode & Integer.MAX_VALUE) % partitionNum
+  }
+}
+
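+// Parses the raw JSON event string from Kafka into a seven-field tuple.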
+class DeserializeTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  override def onNext(msg: Message): Unit = {
+    val jsonObj = new JSONObject(msg.msg.asInstanceOf[String])
+    val tuple = (
+      jsonObj.getString("user_id"),
+      jsonObj.getString("page_id"),
+      jsonObj.getString("ad_id"),
+      jsonObj.getString("ad_type"),
+      jsonObj.getString("event_type"),
+      jsonObj.getString("event_time"),
+      jsonObj.getString("ip_address")
+    )
+    taskContext.output(Message(tuple, msg.timestamp))
+  }
+}
+
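+// Drops everything except "view" events.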
+class EventFilterTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  override def onNext(msg: Message): Unit = {
+    val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)]
+    if (tuple._5 == "view") {
+      taskContext.output(msg)
+    }
+  }
+}
+
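+// Projects each event down to (ad_id, event_time).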
+class EventProjectionTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  override def onNext(msg: Message): Unit = {
+    val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)]
+    taskContext.output(Message((tuple._3, tuple._6), msg.timestamp))
+  }
+}
+
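+// Looks up the campaign_id for each ad_id in Redis and emits (campaign_id, ad_id, event_time);
+// events whose ad has no campaign mapping are dropped.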
+class RedisJoinTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  private val redisHost = conf.getString("redis.host").get
+  private val redisAdCampaignCache = new RedisAdCampaignCache(redisHost)
+
+  override def onStart(startTime: StartTime): Unit = {
+    redisAdCampaignCache.prepare()
+  }
+
+  override def onNext(msg: Message): Unit = {
+    val (ad_id, event_time) = msg.msg.asInstanceOf[(String, String)]
+    val campaign_id = redisAdCampaignCache.execute(ad_id)
+    if (campaign_id != null) {
+      val result = (campaign_id, ad_id, event_time)
+      taskContext.output(Message(result, msg.timestamp))
+    }
+  }
+}
+
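+// Hands (campaign_id, event_time) to the shared CampaignProcessorCommon, which maintains
+// the per-campaign, per-window counts in Redis.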
+class CampaignProcessorTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
+  private val redisHost = conf.getString("redis.host").get
+  private val campaignProcessorCommon = new CampaignProcessorCommon(redisHost)
+
+  override def onStart(startTime: StartTime): Unit = {
+    campaignProcessorCommon.prepare()
+  }
+
+  override def onNext(msg: Message): Unit = {
+    val (campaign_id, _, event_time) = msg.msg.asInstanceOf[(String, String, String)]
+    campaignProcessorCommon.execute(campaign_id, event_time)
+  }
+}
diff --git a/pom.xml b/pom.xml
index dec8e7fff..05e6acbdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,8 +19,10 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
    <kafka.version>0.8.2.1</kafka.version>
    <flink.version>0.10.1</flink.version>
    <storm.version>0.10.0</storm.version>
+    <gearpump.version>0.8.1</gearpump.version>
    <scala.binary.version>2.10</scala.binary.version>
    <scala.version>2.10.4</scala.version>
+    <gp.scala.version>2.11.8</gp.scala.version>
    <json.version>20141113</json.version>
2.4.2
1.2.2
@@ -65,6 +67,21 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
      </dependency>
+      <dependency>
+        <groupId>org.apache.gearpump</groupId>
+        <artifactId>gearpump-core_2.11</artifactId>
+        <version>${gearpump.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.gearpump</groupId>
+        <artifactId>gearpump-streaming_2.11</artifactId>
+        <version>${gearpump.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.gearpump</groupId>
+        <artifactId>gearpump-external-kafka_2.11</artifactId>
+        <version>${gearpump.version}</version>
+      </dependency>
      <dependency>
        <groupId>com.yahoo.stream</groupId>
        <artifactId>streaming-benchmark-common</artifactId>
@@ -128,5 +145,6 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in t
    <module>storm-benchmarks</module>
    <module>flink-benchmarks</module>
    <module>spark-benchmarks</module>
+    <module>gearpump-benchmarks</module>
diff --git a/stream-bench.sh b/stream-bench.sh
index d2cb3d57c..c38607605 100755
--- a/stream-bench.sh
+++ b/stream-bench.sh
@@ -18,12 +18,14 @@ SCALA_SUB_VERSION=${SCALA_SUB_VERSION:-"4"}
STORM_VERSION=${STORM_VERSION:-"0.10.0"}
FLINK_VERSION=${FLINK_VERSION:-"0.10.1"}
SPARK_VERSION=${SPARK_VERSION:-"1.5.1"}
+GEARPUMP_VERSION=${GEARPUMP_VERSION:-"0.8.1"}
STORM_DIR="apache-storm-$STORM_VERSION"
REDIS_DIR="redis-$REDIS_VERSION"
KAFKA_DIR="kafka_$SCALA_BIN_VERSION-$KAFKA_VERSION"
FLINK_DIR="flink-$FLINK_VERSION"
SPARK_DIR="spark-$SPARK_VERSION-bin-hadoop2.6"
+GEARPUMP_DIR="gearpump-2.11-$GEARPUMP_VERSION"
#Get one of the closet apache mirrors
APACHE_MIRROR=$(curl 'https://www.apache.org/dyn/closer.cgi' | grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -1)
@@ -150,6 +152,10 @@ run() {
SPARK_FILE="$SPARK_DIR.tgz"
fetch_untar_file "$SPARK_FILE" "$APACHE_MIRROR/spark/spark-$SPARK_VERSION/$SPARK_FILE"
+ #Fetch Gearpump
+ GEARPUMP_FILE="$GEARPUMP_DIR.tar.gz"
+ fetch_untar_file "$GEARPUMP_FILE" "https://github.com/gearpump/gearpump/releases/download/$GEARPUMP_VERSION/$GEARPUMP_FILE"
+
elif [ "START_ZK" = "$OPERATION" ];
then
start_if_needed dev_zookeeper ZooKeeper 10 "$STORM_DIR/bin/storm" dev-zookeeper
@@ -203,6 +209,20 @@ run() {
stop_if_needed org.apache.spark.deploy.master.Master SparkMaster
stop_if_needed org.apache.spark.deploy.worker.Worker SparkSlave
sleep 3
+
+ elif [ "START_GEARPUMP" = "$OPERATION" ];
+ then
+ start_if_needed org.apache.gearpump.cluster.main.Master GearpumpMaster 3 $GEARPUMP_DIR/bin/master -ip 127.0.0.1 -port 3000
+ start_if_needed org.apache.gearpump.cluster.main.Worker GearpumpWorker 3 $GEARPUMP_DIR/bin/worker
+ start_if_needed org.apache.gearpump.services.main.Services GearpumpDashboard 3 $GEARPUMP_DIR/bin/services
+ sleep 3
+ elif [ "STOP_GEARPUMP" = "$OPERATION" ];
+ then
+ stop_if_needed org.apache.gearpump.services.main.Services GearpumpDashboard
+ stop_if_needed org.apache.gearpump.cluster.main.Worker GearpumpWorker
+ stop_if_needed org.apache.gearpump.cluster.main.Master GearpumpMaster
+ sleep 3
+
elif [ "START_LOAD" = "$OPERATION" ];
then
cd data
@@ -214,6 +234,7 @@ run() {
cd data
$LEIN run -g --configPath ../$CONF_FILE || true
cd ..
+
elif [ "START_STORM_TOPOLOGY" = "$OPERATION" ];
then
"$STORM_DIR/bin/storm" jar ./storm-benchmarks/target/storm-benchmarks-0.1.0.jar storm.benchmark.AdvertisingTopology test-topo -conf $CONF_FILE
@@ -222,6 +243,7 @@ run() {
then
"$STORM_DIR/bin/storm" kill -w 0 test-topo || true
sleep 10
+
elif [ "START_SPARK_PROCESSING" = "$OPERATION" ];
then
"$SPARK_DIR/bin/spark-submit" --master spark://localhost:7077 --class spark.benchmark.KafkaRedisAdvertisingStream ./spark-benchmarks/target/spark-benchmarks-0.1.0.jar "$CONF_FILE" &
@@ -229,6 +251,7 @@ run() {
elif [ "STOP_SPARK_PROCESSING" = "$OPERATION" ];
then
stop_if_needed spark.benchmark.KafkaRedisAdvertisingStream "Spark Client Process"
+
elif [ "START_FLINK_PROCESSING" = "$OPERATION" ];
then
"$FLINK_DIR/bin/flink" run ./flink-benchmarks/target/flink-benchmarks-0.1.0.jar --confPath $CONF_FILE &
@@ -243,6 +266,22 @@ run() {
"$FLINK_DIR/bin/flink" cancel $FLINK_ID
sleep 3
fi
+
+ elif [ "START_GEARPUMP_APP" = "$OPERATION" ];
+ then
+ "$GEARPUMP_DIR/bin/gear" app -jar ./gearpump-benchmarks/target/gearpump-benchmarks-0.1.0.jar gearpump.benchmark.Advertising $CONF_FILE &
+ sleep 5
+ elif [ "STOP_GEARPUMP_APP" = "$OPERATION" ];
+ then
+ APP_ID=`"$GEARPUMP_DIR/bin/gear" info | grep "application:" | awk -F ',' '{print $1}' | awk '{print $2}'`
+ if [ "$APP_ID" == "" ];
+ then
+ echo "Could not find Gearpump application to kill"
+ else
+ "$GEARPUMP_DIR/bin/gear" kill -appid $APP_ID
+ sleep 5
+ fi
+
elif [ "STORM_TEST" = "$OPERATION" ];
then
run "START_ZK"
@@ -288,6 +327,21 @@ run() {
run "STOP_KAFKA"
run "STOP_REDIS"
run "STOP_ZK"
+ elif [ "GEARPUMP_TEST" = "$OPERATION" ];
+ then
+ run "START_ZK"
+ run "START_REDIS"
+ run "START_KAFKA"
+ run "START_GEARPUMP"
+ run "START_GEARPUMP_APP"
+ run "START_LOAD"
+ sleep $TEST_TIME
+ run "STOP_LOAD"
+ run "STOP_GEARPUMP_APP"
+ run "STOP_GEARPUMP"
+ run "STOP_KAFKA"
+ run "STOP_REDIS"
+ run "STOP_ZK"
elif [ "STOP_ALL" = "$OPERATION" ];
then
run "STOP_LOAD"
@@ -297,6 +351,8 @@ run() {
run "STOP_FLINK"
run "STOP_STORM_TOPOLOGY"
run "STOP_STORM"
+ run "STOP_GEARPUMP_APP"
+ run "STOP_GEARPUMP"
run "STOP_KAFKA"
run "STOP_REDIS"
run "STOP_ZK"
@@ -322,6 +378,8 @@ run() {
echo "STOP_FLINK: kill flink processes"
echo "START_SPARK: run spark processes"
echo "STOP_SPARK: kill spark processes"
+ echo "START_GEARPUMP: run gearpump processes"
+ echo "STOP_GEARPUMP: kill gearpump processes"
echo
echo "START_STORM_TOPOLOGY: run the storm test topology"
echo "STOP_STORM_TOPOLOGY: kill the storm test topology"
@@ -329,10 +387,13 @@ run() {
echo "STOP_FLINK_PROCESSSING: kill the flink test processing"
echo "START_SPARK_PROCESSING: run the spark test processing"
echo "STOP_SPARK_PROCESSSING: kill the spark test processing"
+ echo "START_GEARPUMP_PROCESSING: run the gearpump test processing"
+ echo "STOP_GEARPUMP_PROCESSSING: kill the gearpump test processing"
echo
echo "STORM_TEST: run storm test (assumes SETUP is done)"
echo "FLINK_TEST: run flink test (assumes SETUP is done)"
echo "SPARK_TEST: run spark test (assumes SETUP is done)"
+ echo "GEARPUMP_TEST: run gearpump test (assumes SETUP is done)"
echo "STOP_ALL: stop everything"
echo
echo "HELP: print out this message"