Skip to content

Commit 31d3c1e

Browse files
committed
Add code
1 parent b0bff70 commit 31d3c1e

File tree

249 files changed

+18812
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

249 files changed

+18812
-0
lines changed

kafka-streams/PROJECT

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
owners:
2+
- csl-team:ldap
3+
- messaging-review:ldap
4+
- scosenza
5+
- dbress
6+
- adams
7+
watchers:
8+
9+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package com.twitter.finatra.kafkastreams.prerestore
2+
3+
import com.twitter.conversions.DurationOps._
4+
import com.twitter.finatra.annotations.Experimental
5+
import com.twitter.finatra.kafkastreams.KafkaStreamsTwitterServer
6+
import com.twitter.finatra.kafkastreams.internal.utils.ReflectionUtils
7+
import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning
8+
import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils
9+
10+
import java.util.Properties
11+
import java.util.concurrent.TimeUnit
12+
import org.apache.kafka.clients.consumer.Consumer
13+
import org.apache.kafka.common.Metric
14+
import org.apache.kafka.common.utils.Utils
15+
import org.apache.kafka.streams.processor.internals.StreamThread
16+
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
17+
import org.joda.time.DateTimeUtils
18+
import scala.collection.JavaConverters._
19+
import scala.util.control.NonFatal
20+
21+
@Experimental
22+
trait PreRestoreState extends KafkaStreamsTwitterServer with StaticPartitioning {
23+
24+
private val preRestoreState = flag("kafka.prerestore.state", true, "Pre-Restore state")
25+
private val preRestoreDurationInitialDelay =
26+
flag("kafka.prerestore.duration", 2.minutes, "Pre-Restore min delay")
27+
28+
/* Protected */
29+
30+
final override protected[finatra] def createAndStartKafkaStreams(): Unit = {
31+
if (preRestoreState()) {
32+
val copiedProperties = properties.clone().asInstanceOf[Properties]
33+
val preRestoreProperties = configurePreRestoreProperties(copiedProperties)
34+
val preRestoreKafkaStreams =
35+
new KafkaStreams(topology, preRestoreProperties, kafkaStreamsClientSupplier)
36+
setExceptionHandler(preRestoreKafkaStreams)
37+
preRestoreKafkaStreams.start()
38+
startWaitForPreRestoredThread(preRestoreKafkaStreams)
39+
} else {
40+
super.createAndStartKafkaStreams()
41+
}
42+
}
43+
44+
/* Private */
45+
46+
private def startWaitForPreRestoredThread(preRestoreKafkaStreams: KafkaStreams): Unit = {
47+
new Thread("wait-for-pre-restoring-server-thread") {
48+
override def run(): Unit = {
49+
try {
50+
waitForPreRestoreFinished(preRestoreKafkaStreams)
51+
52+
info(s"Closing pre-restoring server")
53+
preRestoreKafkaStreams.close(1, TimeUnit.MINUTES)
54+
info(s"Pre-restore complete.")
55+
56+
//Reset the thread id and start Kafka Streams as if we weren't using pre-restore mode
57+
CompatibleUtils.resetStreamThreadId()
58+
PreRestoreState.super.createAndStartKafkaStreams()
59+
} catch {
60+
case NonFatal(e) =>
61+
error("PreRestore error", e)
62+
close(defaultCloseGracePeriod)
63+
}
64+
}
65+
}.start()
66+
}
67+
68+
// Note: 10000 is somewhat arbitrary. The goal is to get close to the head of the changelog, before exiting pre-restore mode and taking active ownership of the pre-restoring tasks
69+
private def waitForPreRestoreFinished(preRestoreKafkaStreams: KafkaStreams): Unit = {
70+
info(
71+
s"Waiting for Total Restore Lag to be less than 1000 after an initial wait period of ${preRestoreDurationInitialDelay()}"
72+
)
73+
val minTimeToFinish = DateTimeUtils
74+
.currentTimeMillis() + preRestoreDurationInitialDelay().inMillis
75+
var totalRestoreLag = Double.MaxValue
76+
while (totalRestoreLag >= 10000 || DateTimeUtils.currentTimeMillis() < minTimeToFinish) {
77+
totalRestoreLag = findTotalRestoreLag(preRestoreKafkaStreams)
78+
Thread.sleep(1000)
79+
}
80+
}
81+
82+
private def findTotalRestoreLag(preRestoreKafkaStreams: KafkaStreams): Double = {
83+
val lagMetrics = findRestoreConsumerLagMetrics(preRestoreKafkaStreams)
84+
val totalRestoreLag = lagMetrics.map(_.metricValue.asInstanceOf[Double]).sum
85+
info(s"Total Restore Lag: $totalRestoreLag")
86+
totalRestoreLag
87+
}
88+
89+
private def configurePreRestoreProperties(properties: Properties) = {
90+
val applicationServerConfigHost = Utils.getHost(applicationServerConfig())
91+
properties.put(
92+
StreamsConfig.APPLICATION_SERVER_CONFIG,
93+
s"$applicationServerConfigHost:${StaticPartitioning.PreRestoreSignalingPort}"
94+
)
95+
96+
// During prerestore we set poll_ms to 0 to prevent activeTask.polling from slowing down the standby tasks
97+
// See https://github.com/apache/kafka/blob/b532ee218e01baccc0ff8c4b1df586577637de50/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L832
98+
properties.put(StreamsConfig.POLL_MS_CONFIG, "0")
99+
100+
properties
101+
}
102+
103+
private def findRestoreConsumerLagMetrics(kafkaStreams: KafkaStreams): Seq[Metric] = {
104+
for {
105+
thread <- getThreads(kafkaStreams).toSeq
106+
restoreConsumer = getRestoreConsumer(thread)
107+
(name, recordsLag) <- findConsumerLagMetric(restoreConsumer)
108+
} yield {
109+
recordsLag
110+
}
111+
}
112+
113+
private def findConsumerLagMetric(restoreConsumer: Consumer[Array[Byte], Array[Byte]]) = {
114+
restoreConsumer.metrics().asScala.find {
115+
case (metricName, metric) => metricName.name() == "records-lag"
116+
}
117+
}
118+
119+
private def getThreads(kafkaStreams: KafkaStreams) = {
120+
ReflectionUtils.getFinalField[Array[StreamThread]](anyRef = kafkaStreams, fieldName = "threads")
121+
}
122+
123+
private def getRestoreConsumer(thread: StreamThread) = {
124+
ReflectionUtils.getFinalField[Consumer[Array[Byte], Array[Byte]]](thread, "restoreConsumer")
125+
}
126+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
<configuration>
2+
<appender name="TEST" class="ch.qos.logback.core.ConsoleAppender">
3+
<encoder>
4+
<pattern>%date %.-3level %-25logger{0} %msg%n</pattern>
5+
</encoder>
6+
</appender>
7+
8+
<appender name="MAIN" class="ch.qos.logback.core.ConsoleAppender">
9+
<encoder>
10+
<pattern>%red(%date [%thread] %.-3level %-25logger{0} %msg%n)</pattern>
11+
</encoder>
12+
</appender>
13+
<appender name="MAIN_SERVER" class="ch.qos.logback.core.ConsoleAppender">
14+
<encoder>
15+
<pattern>%red(%date [%thread] %.-3level %-25logger{0} %msg%n)</pattern>
16+
</encoder>
17+
</appender>
18+
19+
<appender name="KAFKA" class="ch.qos.logback.core.ConsoleAppender">
20+
<encoder>
21+
<pattern>%blue(%date [%thread] %.-3level %-25logger{0} %msg%n)</pattern>
22+
</encoder>
23+
</appender>
24+
25+
<appender name="STATE" class="ch.qos.logback.core.ConsoleAppender">
26+
<encoder>
27+
<pattern>%green(%date [%thread] %.-3level %-25logger{0} %msg%n)</pattern>
28+
</encoder>
29+
</appender>
30+
31+
<logger name="com.twitter.finatra.kafkastreams.integration"
32+
additivity="false">
33+
<appender-ref ref="TEST"/>
34+
</logger>
35+
36+
<logger name="com.twitter.finatra.kafka.test"
37+
level="debug"
38+
additivity="false">
39+
<appender-ref ref="TEST"/>
40+
</logger>
41+
42+
<logger name="ch.qos.logback"
43+
level="error"
44+
additivity="false">
45+
<appender-ref ref="TEST"/>
46+
</logger>
47+
48+
<logger name="com.twitter.finatra.kafkastreams.integration.wordcount_windowed.PreRestoreWindowedWordCountServer"
49+
level="info"
50+
additivity="false">
51+
<appender-ref ref="MAIN_SERVER"/>
52+
</logger>
53+
54+
<logger name="kafka"
55+
additivity="false">
56+
<appender-ref ref="KAFKA"/>
57+
</logger>
58+
59+
<logger name="org.apache.kafka.streams.KafkaStreams"
60+
additivity="false">
61+
<appender-ref ref="STATE"/>
62+
</logger>
63+
64+
<!-- Main Twitter Config -->
65+
66+
<logger name="com.twitter" level="info"/>
67+
<logger name="com.twitter.finatra" level="info"/>
68+
<logger name="com.twitter.finatra.http.filters.AccessLoggingFilter" level="warn"/>
69+
<logger name="com.twitter.finatra.messaging" level="debug"/>
70+
<logger name="com.twitter.finatra.kafka" level="debug"/>
71+
<logger name="com.twitter.finatra.kafkastreams" level="debug"/>
72+
<logger name="com.twitter.finatra.streams" level="debug"/>
73+
<logger name="com.twitter.finagle.tracing.DefaultTracer$" level="warn"/>
74+
<logger name="com.twitter.finagle.tracing.DefaultTracer" level="warn"/>
75+
<logger name="com.twitter.finagle.toggle.TwitterToggleMap" level="error"/>
76+
77+
<logger name="kafka" level="warn"/>
78+
<logger name="kafka.server.BrokerMetadataCheckpoint" level="error"/>
79+
<logger name="kafka.server.KafkaConfig" level="warn"/>
80+
<logger name="logger" level="warn"/>
81+
<logger name="org.apache.kafka" level="warn"/>
82+
<logger name="org.apache.kafka.clients.NetworkClient" level="error"/>
83+
<logger name="org.apache.kafka.streams" level="info"/>
84+
<logger name="org.apache.zookeeper" level="error"/>
85+
86+
<!-- Root Logger -->
87+
<root level="warn">
88+
<appender-ref ref="MAIN"/>
89+
</root>
90+
</configuration>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.twitter.finatra.kafkastreams.integration.wordcount
2+
3+
import com.twitter.finatra.kafka.serde.ScalaSerdes
4+
import com.twitter.finatra.kafkastreams.KafkaStreamsTwitterServer
5+
import com.twitter.finatra.kafkastreams.prerestore.PreRestoreState
6+
import org.apache.kafka.common.serialization.Serdes
7+
import org.apache.kafka.streams.StreamsBuilder
8+
import org.apache.kafka.streams.kstream.{Consumed, Materialized, Produced}
9+
import org.apache.kafka.streams.scala.kstream.Grouped
10+
11+
class PreRestoreWordCountRocksDbServer extends KafkaStreamsTwitterServer with PreRestoreState {
12+
13+
override val name = "wordcount"
14+
private val countStoreName = "CountsStore"
15+
flag("hack_to_allow_explicit_http_port_below", "hack", "hack")
16+
17+
override protected def configureKafkaStreams(builder: StreamsBuilder): Unit = {
18+
builder.asScala
19+
.stream("TextLinesTopic")(Consumed.`with`(Serdes.Bytes, Serdes.String))
20+
.flatMapValues(_.split(' '))
21+
.groupBy((_, word) => word)(Grouped.`with`(Serdes.String, Serdes.String))
22+
.count()(Materialized.as(countStoreName))
23+
.toStream
24+
.to("WordsWithCountsTopic")(Produced.`with`(Serdes.String, ScalaSerdes.Long))
25+
}
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package com.twitter.finatra.kafkastreams.integration.wordcount
2+
3+
import com.twitter.conversions.DurationOps._
4+
import com.twitter.finatra.http.EmbeddedHttpServer
5+
import com.twitter.finatra.kafka.serde.ScalaSerdes
6+
import com.twitter.finatra.kafkastreams.internal.utils.CompatibleUtils
7+
import com.twitter.finatra.kafkastreams.test.KafkaStreamsMultiServerFeatureTest
8+
import com.twitter.util.{Await, Duration}
9+
import org.apache.kafka.common.serialization.Serdes
10+
import org.scalatest.Ignore
11+
12+
@Ignore
13+
class PreRestoreWordCountServerFeatureTest extends KafkaStreamsMultiServerFeatureTest {
14+
15+
private def createServer(preRestore: Boolean): EmbeddedHttpServer = {
16+
new EmbeddedHttpServer(
17+
new PreRestoreWordCountRocksDbServer,
18+
flags = kafkaStreamsFlags ++ Map(
19+
"kafka.application.num.instances" -> "1",
20+
"kafka.prerestore.state" -> s"$preRestore",
21+
"kafka.prerestore.duration" -> "1000.milliseconds",
22+
"kafka.application.server" -> "0.foo.scosenza.service.smf1.twitter.com:12345",
23+
"kafka.application.id" -> "wordcount-prod"
24+
)
25+
)
26+
}
27+
28+
override protected def kafkaCommitInterval: Duration = 1.second
29+
30+
private val textLinesTopic =
31+
kafkaTopic(ScalaSerdes.Long, Serdes.String, "TextLinesTopic", logPublish = false)
32+
private val countsChangelogTopic = kafkaTopic(
33+
Serdes.String,
34+
Serdes.Long,
35+
"wordcount-prod-CountsStore-changelog",
36+
autoCreate = false
37+
)
38+
private val wordsWithCountsTopic = kafkaTopic(Serdes.String, Serdes.Long, "WordsWithCountsTopic")
39+
40+
test("word count") {
41+
testInitialStartupWithoutPrerestore()
42+
testRestartWithoutPrerestore()
43+
testRestartWithoutPrerestore()
44+
testRestartWithPrerestore()
45+
}
46+
47+
private def testInitialStartupWithoutPrerestore(): Unit = {
48+
val server = createServer(preRestore = false)
49+
//val countsStore = kafkaStateStore[String, Long]("CountsStore", server)
50+
server.start()
51+
val serverStats = server.inMemoryStats
52+
53+
textLinesTopic.publish(1L -> "hello world hello")
54+
/*countsStore.queryKeyValueUntilValue("hello", 2L)
55+
countsStore.queryKeyValueUntilValue("world", 1L)*/
56+
57+
textLinesTopic.publish(1L -> "world world")
58+
//countsStore.queryKeyValueUntilValue("world", 3L)
59+
serverStats.gauges.waitFor("kafka/thread1/restore_consumer/records_consumed_total", 0.0f)
60+
61+
for (i <- 1 to 1000) {
62+
textLinesTopic.publish(1L -> s"foo$i")
63+
}
64+
65+
server.close()
66+
Await.result(server.mainResult)
67+
server.clearStats()
68+
CompatibleUtils.resetStreamThreadId()
69+
}
70+
71+
private def testRestartWithoutPrerestore(): Unit = {
72+
val server = createServer(preRestore = false)
73+
/*val countsStore = kafkaStateStore[String, Long]("CountsStore", server)
74+
server.start()
75+
val serverStats = InMemoryStatsUtil(server.injector)
76+
77+
countsStore.queryKeyValueUntilValue("hello", 2L)
78+
countsStore.queryKeyValueUntilValue("world", 3L)
79+
server.printStats()
80+
//TODO byte consumed is >0 but records consumed is 0 :-/ serverStats.waitForGauge("kafka/thread1/restore_consumer/records_consumed_total", 2)*/
81+
82+
server.close()
83+
Await.result(server.mainResult)
84+
server.clearStats()
85+
resetStreamThreadId()
86+
}
87+
88+
private def testRestartWithPrerestore(): Unit = {
89+
val server = createServer(preRestore = true)
90+
/*val countsStore = kafkaStateStore[String, Long]("CountsStore", server)
91+
server.start()
92+
val serverStats = InMemoryStatsUtil(server.injector)
93+
94+
countsStore.queryKeyValueUntilValue("hello", 2L)
95+
countsStore.queryKeyValueUntilValue("world", 3L)
96+
serverStats.waitForGauge("kafka/thread1/restore_consumer/records_consumed_total", 0)*/
97+
98+
server.close()
99+
Await.result(server.mainResult)
100+
server.clearStats()
101+
resetStreamThreadId()
102+
}
103+
}

0 commit comments

Comments
 (0)