116 changes: 116 additions & 0 deletions gearpump-benchmarks/pom.xml
@@ -0,0 +1,116 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2015, Yahoo Inc.
Licensed under the terms of the Apache License 2.0. Please see LICENSE file in the project root for terms.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yahoo-low-latency-bechmarks</artifactId>
<groupId>com.yahoo.stream</groupId>
<version>0.1.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>gearpump-benchmarks</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-streaming_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-core_2.11</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-external-kafka_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</dependency>
<dependency>
<groupId>com.yahoo.stream</groupId>
<artifactId>streaming-benchmark-common</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>eclipse-add-source</id>
<goals>
<goal>add-source</goal>
</goals>
</execution>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${gp.scala.version}</scalaVersion>
<recompileMode>incremental</recompileMode>
<useZincServer>true</useZincServer>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
<arg>-feature</arg>
</args>
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
<javacArgs>
<javacArg>-source</javacArg>
<javacArg>${java.version}</javacArg>
<javacArg>-target</javacArg>
<javacArg>${java.version}</javacArg>
<javacArg>-Xlint:all,-serial,-path</javacArg>
</javacArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<minimizeJar>false</minimizeJar>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
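Note on scopes: gearpump-core and gearpump-streaming are marked provided because the Gearpump cluster supplies them at run time, so the shaded jar that maven-shade-plugin builds during the package phase only bundles the Kafka connector, org.json, and the shared streaming-benchmark-common code.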
5 changes: 5 additions & 0 deletions gearpump-benchmarks/src/main/resources/reference.conf
@@ -0,0 +1,5 @@
gearpump {
serializers {
"scala.Tuple7" = ""
}
}
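Why register scala.Tuple7: the DeserializeTask further down in this diff emits a seven-field tuple between tasks, and mapping the class to an empty string here appears to leave its serialization to Gearpump's default (Kryo-based) serializer rather than a custom one. As a reference for the shape that has to be serializable, a minimal sketch with a hypothetical type alias (not part of the benchmark code):

// Hypothetical alias for the event tuple emitted by DeserializeTask; the field order mirrors the JSON parsed below.
type AdEvent = (String, String, String, String, String, String, String)
// fields: user_id, page_id, ad_id, ad_type, event_type, event_time, ip_address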
@@ -0,0 +1,147 @@
package gearpump.benchmark

import java.util.Properties

import akka.actor.ActorSystem
import benchmark.common.Utils
import benchmark.common.advertising.{CampaignProcessorCommon, RedisAdCampaignCache}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.partitioner.{UnicastPartitioner, HashPartitioner}
import org.apache.gearpump.streaming.StreamApplication
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.kafka.lib.source.StringMessageDecoder
import org.apache.gearpump.streaming.kafka.util.KafkaConfig
import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
import org.apache.gearpump.streaming.kafka.KafkaSource
import org.apache.gearpump.streaming.source.DataSourceProcessor
import org.apache.gearpump.util.{Graph, AkkaApp}
import org.apache.gearpump.util.Graph._
import org.json.JSONObject
import scala.collection.JavaConverters._

object Advertising extends AkkaApp {

def application(args: Array[String], system: ActorSystem) : StreamApplication = {
implicit val actorSystem = system
val commonConfig = Utils.findAndReadConfigFile(args(0), true).asInstanceOf[java.util.Map[String, Any]]

val cores = commonConfig.get("process.cores").asInstanceOf[Int]
val topic = commonConfig.get("kafka.topic").asInstanceOf[String]
val partitions = commonConfig.get("kafka.partitions").asInstanceOf[Int]
val redisHost = commonConfig.get("redis.host").asInstanceOf[String]

val zookeeperHosts = commonConfig.get("zookeeper.servers").asInstanceOf[java.util.List[String]] match {
case l: java.util.List[String] => l.asScala.toSeq
case other => throw new ClassCastException(other + " not a List[String]")
}
val zookeeperPort = commonConfig.get("zookeeper.port").asInstanceOf[Int]
val zookeeperConnect = zookeeperHosts.map(_ + ":" + zookeeperPort).mkString(",")

val kafkaHosts = commonConfig.get("kafka.brokers").asInstanceOf[java.util.List[String]] match {
case l: java.util.List[String] => l.asScala.toSeq
case other => throw new ClassCastException(other + " not a List[String]")
}
val kafkaPort = commonConfig.get("kafka.port").asInstanceOf[Int]
val brokerList = kafkaHosts.map(_ + ":" + kafkaPort).mkString(",")

val appName = "Advertising"
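// Per-stage task parallelism derived from the configured core budget, never less than one.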
val parallel = Math.max(1, cores / 7)
val props = new Properties
props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeperConnect)
props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG, appName)
props.put(KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[StringMessageDecoder])
val gearConfig = UserConfig.empty.withString("redis.host", redisHost)
val source = new KafkaSource(topic, props)
val sourceProcessor = DataSourceProcessor(source, partitions)
val deserializer = Processor[DeserializeTask](parallel)
val filter = Processor[EventFilterTask](parallel)
val projection = Processor[EventProjectionTask](parallel)
val join = Processor[RedisJoinTask](parallel)
val campaign = Processor[CampaignProcessorTask](parallel * 2)
val partitioner = new AdPartitioner

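// Pipeline: Kafka source -> deserialize JSON -> keep only "view" events -> project (ad_id, event_time)
// -> look up campaign_id in Redis -> route by campaign_id to the campaign processors.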
val graph = Graph(sourceProcessor ~ new HashPartitioner ~> deserializer ~> filter ~> projection ~> join ~ partitioner ~> campaign)
StreamApplication(appName, graph, gearConfig)
}

override def main(akkaConf: Advertising.Config, args: Array[String]): Unit = {
val context = ClientContext(akkaConf)
context.submit(application(args, context.system))
context.close()
}

override def help: Unit = {}
}

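// Routes the joined (campaign_id, ad_id, event_time) tuples by campaign_id so that every event for a
// given campaign is handled by the same CampaignProcessorTask instance.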
class AdPartitioner extends UnicastPartitioner {
override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
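// Mask off the sign bit so a negative hashCode cannot produce a negative partition index.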
(msg.msg.asInstanceOf[(String, String, String)]._1.hashCode & Integer.MAX_VALUE) % partitionNum
}
}

class DeserializeTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
override def onNext(msg : Message) : Unit = {
val jsonObj = new JSONObject(msg.msg.asInstanceOf[String])
val tuple = (
jsonObj.getString("user_id"),
jsonObj.getString("page_id"),
jsonObj.getString("ad_id"),
jsonObj.getString("ad_type"),
jsonObj.getString("event_type"),
jsonObj.getString("event_time"),
jsonObj.getString("ip_address")
)
taskContext.output(Message(tuple, msg.timestamp))
}
}

class EventFilterTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
override def onNext(msg: Message): Unit = {
val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)]
if(tuple._5 == "view") {
taskContext.output(msg)
}
}
}

class EventProjectionTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
override def onNext(msg: Message): Unit = {
val tuple = msg.msg.asInstanceOf[(String, String, String, String, String, String, String)]
taskContext.output(Message((tuple._3, tuple._6), msg.timestamp))
}
}

class RedisJoinTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
private val redisHost = conf.getString("redis.host").get
private val redisAdCampaignCache = new RedisAdCampaignCache(redisHost)

override def onStart(startTime : StartTime) : Unit = {
redisAdCampaignCache.prepare()
}

override def onNext(msg: Message): Unit = {
val (ad_id, event_time) = msg.msg.asInstanceOf[(String, String)]
val campaign_id = redisAdCampaignCache.execute(ad_id)
if(campaign_id != null) {
val result = (campaign_id, ad_id, event_time)
taskContext.output(Message(result, msg.timestamp))
}
}
}

class CampaignProcessorTask(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
private val redisHost = conf.getString("redis.host").get
private val campaignProcessorCommon = new CampaignProcessorCommon(redisHost)

override def onStart(startTime : StartTime) : Unit = {
campaignProcessorCommon.prepare()
}

override def onNext(msg: Message): Unit = {
val (campaign_id, _, event_time) = msg.msg.asInstanceOf[(String, String, String)]
campaignProcessorCommon.execute(campaign_id, event_time)
}
}
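A minimal submission sketch, assuming Gearpump's standard gear CLI, a running cluster, and a shaded jar named after this module's artifactId and version (the config path is a placeholder for the benchmark's settings file):

# hypothetical invocation from the Gearpump installation directory
bin/gear app -jar gearpump-benchmarks-0.1.0.jar gearpump.benchmark.Advertising /path/to/benchmark-conf.yaml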
18 changes: 18 additions & 0 deletions pom.xml
@@ -19,8 +19,10 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in the project root for terms.
<kafka.version>0.8.2.1</kafka.version>
<flink.version>0.10.1</flink.version>
<storm.version>0.10.0</storm.version>
<gearpump.version>0.8.1</gearpump.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<gp.scala.version>2.11.8</gp.scala.version>
<json.version>20141113</json.version>
<jedis.version>2.4.2</jedis.version>
<sedis.version>1.2.2</sedis.version>
@@ -65,6 +67,21 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in the project root for terms.
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-core_2.11</artifactId>
<version>${gearpump.version}</version>
</dependency>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-streaming_2.11</artifactId>
<version>${gearpump.version}</version>
</dependency>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-external-kafka_2.11</artifactId>
<version>${gearpump.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.stream</groupId>
<artifactId>streaming-benchmark-common</artifactId>
@@ -128,5 +145,6 @@ Licensed under the terms of the Apache License 2.0. Please see LICENSE file in the project root for terms.
<module>storm-benchmarks</module>
<module>flink-benchmarks</module>
<module>spark-benchmarks</module>
<module>gearpump-benchmarks</module>
</modules>
</project>