Add Scalafmt 1.5.1 (#489)
* Replace scalariform with Scalafmt
* Run Scalafmt check on Travis
* Scalafmt reformat
ennru authored Jun 7, 2018
1 parent ecf11dd commit 557927a
Showing 57 changed files with 1,773 additions and 1,165 deletions.
9 changes: 9 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,9 @@
style = defaultWithAlign

align.tokens = [off]
danglingParentheses = true
docstrings = JavaDoc
indentOperator = spray
maxColumn = 120
rewrite.rules = [RedundantBraces, RedundantParens, SortImports]
unindentTopLevelOperators = true
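
Note: this `.scalafmt.conf` only takes effect once a Scalafmt sbt plugin is on the build classpath. A minimal sketch of the wiring a commit like this typically adds (the plugin artifact and both version numbers are assumptions, not taken from this view of the diff):

```scala
// project/plugins.sbt — hypothetical plugin wiring for Scalafmt 1.x
addSbtPlugin("com.lucidchart" % "sbt-scalafmt" % "1.15")

// build.sbt — hypothetical pin of the Scalafmt core version from the commit title
scalafmtVersion in ThisBuild := "1.5.1"
```

With such a plugin enabled, `.scalafmt.conf` at the repository root is picked up by the `scalafmt`, `scalafmtCheck`, and `scalafmtSbt` tasks used in the Travis configuration below.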
10 changes: 10 additions & 0 deletions .travis.yml
@@ -6,6 +6,12 @@ services:

jobs:
include:
- stage: check
script: sbt scalafmtCheck || { echo "[error] Unformatted code found. Please run 'test:compile' and commit the reformatted code."; false; }
env: SCALAFMT_CHECK
- script: sbt scalafmtSbtCheck || { echo "[error] Unformatted sbt code found. Please run 'scalafmtSbt' and commit the reformatted code."; false; }
env: SCALAFMT_SBT_CHECK

- stage: test
script: sbt -jvm-opts .jvmopts-travis +test
jdk: oraclejdk8
@@ -29,6 +35,10 @@ jobs:
jdk: oraclejdk8

stages:
# runs on master commits and PRs
- name: check
if: NOT tag =~ ^v

# runs on master commits and PRs
- name: test
if: NOT tag =~ ^v
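
The check stage's error message tells contributors to run `test:compile`, which suggests the build reformats sources as part of compilation. A sketch of what that setting plausibly looks like (these exact keys are an assumption inferred from the message, not shown in this diff):

```scala
// build.sbt — hypothetical: hook Scalafmt into compilation so that
// `test:compile` rewrites unformatted sources in place.
scalafmtOnCompile := true     // format main sources when compiling
scalafmtTestOnCompile := true // format test sources on test:compile
```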
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -113,8 +113,8 @@ Example:
## How To Enforce These Guidelines?

1. [Travis CI](https://travis-ci.org/akka/reactive-kafka) automatically merges the code, builds it, runs the tests, and sets the Pull Request status in GitHub according to the results.
1. [Scalafmt](http://scalameta.org/scalafmt/) enforces some of the code style rules.
1. [sbt-header plugin](https://github.com/sbt/sbt-header) manages consistent copyright headers in every source file.
1. The [sbt-whitesource plugin](https://github.com/lightbend/sbt-whitesource) checks licensing models of all (transitive) dependencies.
1. A GitHub bot checks whether you've signed the Lightbend CLA.
1. [Scalariform](https://github.com/daniel-trinh/scalariform) enforces some of the code style rules.

30 changes: 21 additions & 9 deletions benchmarks/src/main/scala/akka.kafka.benchmarks/Benchmarks.scala
@@ -14,35 +14,47 @@ import scala.concurrent.duration._

object Benchmarks {

def run(cmd: RunTestCommand)(implicit actorSystem: ActorSystem, mat: Materializer): Unit = {

def run(cmd: RunTestCommand)(implicit actorSystem: ActorSystem, mat: Materializer): Unit =
cmd.testName match {
case "plain-consumer-nokafka" =>
runPerfTest(cmd, KafkaConsumerFixtures.noopFixtureGen(cmd), KafkaConsumerBenchmarks.consumePlainNoKafka)
case "akka-plain-consumer-nokafka" =>
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.noopFixtureGen(cmd), ReactiveKafkaConsumerBenchmarks.consumePlainNoKafka)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.noopFixtureGen(cmd),
ReactiveKafkaConsumerBenchmarks.consumePlainNoKafka)
case "plain-consumer" =>
runPerfTest(cmd, KafkaConsumerFixtures.filledTopics(cmd), KafkaConsumerBenchmarks.consumePlain)
case "akka-plain-consumer" =>
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.plainSources(cmd), ReactiveKafkaConsumerBenchmarks.consumePlain)
case "batched-consumer" =>
runPerfTest(cmd, KafkaConsumerFixtures.filledTopics(cmd), KafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
runPerfTest(cmd,
KafkaConsumerFixtures.filledTopics(cmd),
KafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
case "akka-batched-consumer" =>
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.commitableSources(cmd), ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.commitableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
case "at-most-once-consumer" =>
runPerfTest(cmd, KafkaConsumerFixtures.filledTopics(cmd), KafkaConsumerBenchmarks.consumeCommitAtMostOnce)
case "akka-at-most-once-consumer" =>
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.commitableSources(cmd), ReactiveKafkaConsumerBenchmarks.consumeCommitAtMostOnce)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.commitableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumeCommitAtMostOnce)
case "plain-producer" =>
runPerfTest(cmd, KafkaProducerFixtures.initializedProducer(cmd), KafkaProducerBenchmarks.plainFlow)
case "akka-plain-producer" =>
runPerfTest(cmd, ReactiveKafkaProducerFixtures.flowFixture(cmd), ReactiveKafkaProducerBenchmarks.plainFlow)
case "transactions" =>
runPerfTest(cmd, KafkaTransactionFixtures.initialize(cmd), KafkaTransactionBenchmarks.consumeTransformProduceTransaction(commitInterval = 100.milliseconds))
runPerfTest(cmd,
KafkaTransactionFixtures.initialize(cmd),
KafkaTransactionBenchmarks.consumeTransformProduceTransaction(commitInterval = 100.milliseconds))
case "akka-transactions" =>
runPerfTest(cmd, ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds), ReactiveKafkaTransactionBenchmarks.consumeTransformProduceTransaction)
runPerfTest(
cmd,
ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds),
ReactiveKafkaTransactionBenchmarks.consumeTransformProduceTransaction
)
case _ => Future.failed(new IllegalArgumentException(s"Unrecognized test name: ${cmd.testName}"))
}
}

}
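
`Benchmarks.run` is a plain dispatcher keyed on `cmd.testName`. A hypothetical invocation, for orientation (the `RunTestCommand` field names are assumptions; its definition is not part of this diff):

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}

implicit val system: ActorSystem = ActorSystem("benchmarks")
implicit val mat: Materializer = ActorMaterializer()

// Hypothetical: RunTestCommand field names are assumed for illustration.
Benchmarks.run(RunTestCommand("akka-plain-consumer", kafkaHost = "localhost:9092", msgCount = 1000000))
```

One observable quirk: the default case returns `Future.failed(...)` from a `Unit`-typed method, so an unrecognized test name is value-discarded rather than surfaced as an error.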
benchmarks/src/main/scala/akka.kafka.benchmarks/KafkaConsumerBenchmarks.scala
@@ -8,7 +8,7 @@ package akka.kafka.benchmarks
import java.util
import com.codahale.metrics.Meter
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.consumer.{OffsetCommitCallback, OffsetAndMetadata}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

import scala.annotation.tailrec
@@ -23,15 +23,14 @@ object KafkaConsumerBenchmarks extends LazyLogging {
def consumePlainNoKafka(fixture: KafkaConsumerTestFixture, meter: Meter): Unit = {

@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit)
readSoFar
else {
logger.debug(s"Polling")
meter.mark()
pollInLoop(readLimit, readSoFar + 1)
}
}
meter.mark()
pollInLoop(readLimit = fixture.msgCount)
}
@@ -43,7 +42,7 @@ object KafkaConsumerBenchmarks extends LazyLogging {
val consumer = fixture.consumer

@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit)
readSoFar
else {
@@ -55,7 +54,6 @@ object KafkaConsumerBenchmarks extends LazyLogging {
logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
pollInLoop(readLimit, readSoFar + recordCount)
}
}
meter.mark()
pollInLoop(readLimit = fixture.msgCount)
fixture.close()
@@ -77,15 +75,17 @@ object KafkaConsumerBenchmarks extends LazyLogging {
accumulatedMsgCount = 0
val offsetMap = Map(new TopicPartition(fixture.topic, 0) -> new OffsetAndMetadata(lastProcessedOffset))
logger.debug("Committing offset " + offsetMap.head._2.offset())
consumer.commitAsync(offsetMap.asJava, new OffsetCommitCallback {
override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
commitInProgress = false
consumer.commitAsync(
offsetMap.asJava,
new OffsetCommitCallback {
override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit =
commitInProgress = false
}
})
)
}

@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit)
readSoFar
else {
@@ -101,16 +101,14 @@ object KafkaConsumerBenchmarks extends LazyLogging {
if (!commitInProgress) {
commitInProgress = true
doCommit()
}
else // previous commit still in progress
} else // previous commit still in progress
consumer.pause(assignment)
}
}
val recordCount = records.count()
logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
pollInLoop(readLimit, readSoFar + recordCount)
}
}

pollInLoop(readLimit = fixture.msgCount)
fixture.close()
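
Because the collapsed hunks hide parts of this method, here is the at-least-once control flow condensed into one place (a paraphrase of the code above; the hidden pause/resume bookkeeping is summarized in comments, not reproduced):

```scala
// Sketch: commit a batch asynchronously; if the previous commit is still in
// flight when the next batch is full, pause fetching instead of blocking.
var accumulatedMsgCount = 0
var commitInProgress = false
var lastProcessedOffset = 0L
val assignment = consumer.assignment()

def commitBatch(): Unit = {
  accumulatedMsgCount = 0
  commitInProgress = true
  val offsetMap = Map(new TopicPartition(fixture.topic, 0) -> new OffsetAndMetadata(lastProcessedOffset))
  consumer.commitAsync(offsetMap.asJava, new OffsetCommitCallback {
    override def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit =
      commitInProgress = false // a later poll resumes the paused partitions
  })
}

// per record: accumulatedMsgCount += 1; lastProcessedOffset = record.offset()
// once accumulatedMsgCount >= batchSize:
//   if (!commitInProgress) commitBatch()
//   else consumer.pause(assignment) // previous commit still in progress
```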
@@ -123,7 +121,7 @@ object KafkaConsumerBenchmarks extends LazyLogging {
val consumer = fixture.consumer
val assignment = consumer.assignment()
@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit)
readSoFar
else {
@@ -136,18 +134,19 @@ object KafkaConsumerBenchmarks extends LazyLogging {
val offsetMap = Map(new TopicPartition(fixture.topic, 0) -> new OffsetAndMetadata(record.offset()))
consumer.pause(assignment)

consumer.commitAsync(offsetMap.asJava, new OffsetCommitCallback {
override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
consumer.resume(assignment)
consumer.commitAsync(
offsetMap.asJava,
new OffsetCommitCallback {
override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit =
consumer.resume(assignment)
}
})
)
}

val recordCount = records.count()
logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
pollInLoop(readLimit, readSoFar + recordCount)
}
}

pollInLoop(readLimit = fixture.msgCount)
fixture.close()
benchmarks/src/main/scala/akka.kafka.benchmarks/KafkaConsumerFixtures.scala
@@ -18,24 +18,27 @@ case class KafkaConsumerTestFixture(topic: String, msgCount: Int, consumer: Kafk
object KafkaConsumerFixtures extends PerfFixtureHelpers {

def noopFixtureGen(c: RunTestCommand) = FixtureGen[KafkaConsumerTestFixture](
c, msgCount => {
KafkaConsumerTestFixture("topic", msgCount, null)
}
c,
msgCount => {
KafkaConsumerTestFixture("topic", msgCount, null)
}
)

def filledTopics(c: RunTestCommand) = FixtureGen[KafkaConsumerTestFixture](
c, msgCount => {
val topic = randomId()
fillTopic(c.kafkaHost, topic, msgCount)
val consumerJavaProps = new java.util.Properties
consumerJavaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
consumerJavaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.GROUP_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val consumer = new KafkaConsumer[Array[Byte], String](consumerJavaProps, new ByteArrayDeserializer, new StringDeserializer)
consumer.subscribe(Set(topic).asJava)
KafkaConsumerTestFixture(topic, msgCount, consumer)
}
c,
msgCount => {
val topic = randomId()
fillTopic(c.kafkaHost, topic, msgCount)
val consumerJavaProps = new java.util.Properties
consumerJavaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, c.kafkaHost)
consumerJavaProps.put(ConsumerConfig.CLIENT_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.GROUP_ID_CONFIG, randomId())
consumerJavaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val consumer =
new KafkaConsumer[Array[Byte], String](consumerJavaProps, new ByteArrayDeserializer, new StringDeserializer)
consumer.subscribe(Set(topic).asJava)
KafkaConsumerTestFixture(topic, msgCount, consumer)
}
)
}
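
`FixtureGen` itself is not touched by this commit; from its call sites it is presumably just a command paired with a message-count-indexed factory, roughly (a hypothetical reconstruction, not code from this diff):

```scala
// Hypothetical shape inferred from the call sites above.
final case class FixtureGen[F](command: RunTestCommand, generate: Int => F)
```

The `noopFixtureGen` variant passes `null` for the consumer, which is what lets the `*-nokafka` benchmarks measure pure loop and stream overhead without a broker.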
benchmarks/src/main/scala/akka.kafka.benchmarks/KafkaProducerBenchmarks.scala
@@ -13,6 +13,7 @@ import scala.concurrent.duration._
object KafkaProducerBenchmarks extends LazyLogging {

val logStep = 100000

/**
* Streams generated numbers to a Kafka producer. Does not commit.
*/
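
The body of `plainFlow` is collapsed in this view. As a rough sketch of what a plain, non-committing producer benchmark of this shape usually does (an assumption, not the hidden code; `fixture.close()` and the record types are guessed from the sibling files):

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical sketch: send msgCount records, marking the meter per send.
def plainFlow(fixture: KafkaProducerTestFixture, meter: Meter): Unit = {
  for (i <- 1 to fixture.msgCount) {
    fixture.producer.send(new ProducerRecord[Array[Byte], String](fixture.topic, i.toString))
    meter.mark()
  }
  fixture.close()
}
```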
benchmarks/src/main/scala/akka.kafka.benchmarks/KafkaProducerFixtures.scala
@@ -15,16 +15,18 @@ case class KafkaProducerTestFixture(topic: String, msgCount: Int, producer: Kafk
object KafkaProducerFixtures extends PerfFixtureHelpers {

def noopFixtureGen(c: RunTestCommand) = FixtureGen[KafkaProducerTestFixture](
c, msgCount => {
KafkaProducerTestFixture("topic", msgCount, null)
}
c,
msgCount => {
KafkaProducerTestFixture("topic", msgCount, null)
}
)

def initializedProducer(c: RunTestCommand) = FixtureGen[KafkaProducerTestFixture](
c, msgCount => {
val topic = randomId()
val rawProducer = initTopicAndProducer(c.kafkaHost, topic)
KafkaProducerTestFixture(topic, msgCount, rawProducer)
}
c,
msgCount => {
val topic = randomId()
val rawProducer = initTopicAndProducer(c.kafkaHost, topic)
KafkaProducerTestFixture(topic, msgCount, rawProducer)
}
)
}
benchmarks/src/main/scala/akka.kafka.benchmarks/KafkaTransactionBenchmarks.scala
@@ -21,7 +21,8 @@ object KafkaTransactionBenchmarks extends LazyLogging {
/**
* Process records in a consume-transform-produce transactional workflow, committing at every commit interval.
*/
def consumeTransformProduceTransaction(commitInterval: FiniteDuration)(fixture: KafkaTransactionTestFixture, meter: Meter): Unit = {
def consumeTransformProduceTransaction(commitInterval: FiniteDuration)(fixture: KafkaTransactionTestFixture,
meter: Meter): Unit = {
val consumer = fixture.consumer
val producer = fixture.producer
val msgCount = fixture.msgCount
@@ -49,12 +50,11 @@ object KafkaTransactionBenchmarks extends LazyLogging {
}

@tailrec
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int = {
def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
if (readSoFar >= readLimit) {
doCommit()
readSoFar
}
else {
} else {
logger.debug("Polling")
val records = consumer.poll(pollTimeoutMs)
for (record <- records.iterator().asScala) {
@@ -64,7 +64,9 @@ object KafkaTransactionBenchmarks extends LazyLogging {
val producerRecord = new ProducerRecord(fixture.sinkTopic, record.partition(), record.key(), record.value())
producer.send(producerRecord)
if (lastProcessedOffset % loggedStep == 0)
logger.info(s"Transformed $lastProcessedOffset elements to Kafka (${100 * lastProcessedOffset / msgCount}%)")
logger.info(
s"Transformed $lastProcessedOffset elements to Kafka (${100 * lastProcessedOffset / msgCount}%)"
)

if (System.nanoTime() >= lastCommit + commitInterval.toNanos) {
doCommit()
@@ -75,7 +77,6 @@ object KafkaTransactionBenchmarks extends LazyLogging {
logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
pollInLoop(readLimit, readSoFar + recordCount)
}
}

meter.mark()
logger.debug("Initializing transactions")