Skip to content

Commit

Permalink
add AsyncBuffer of KafkaSink to handle Kafka brokers failure cases, w…
Browse files Browse the repository at this point in the history
…hich avoids blocking the sender, and records failed events to logs via MissedEventHandler
  • Loading branch information
beineng committed Oct 18, 2018
1 parent 4312495 commit 46df10d
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 69 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ target/
/zkdata/
/native/
/integration-test/native/
hs_err_*.log
hs_err_*.log
.ensime*
25 changes: 23 additions & 2 deletions admin/frontend/src/lib/task-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,17 +296,38 @@ const sinkSchema: any = {
"properties": {
"type": "object",
"title": "Kafka Producer Properties",
"required": true,
"properties": {
"acks": {
"type": "string",
"default": "all",
"required": true
},
"retries": {
"type": "integer",
"default": 30,
"required": true
},
"max.in.flight.requests.per.connection": {
"type": "integer",
"default": 5,
"required": true
},
"enable.idempotence": {
"type": "boolean",
"default": true,
"required": true
}
}
},
"use-async-buffer": {
"type": "boolean",
"default": false,
"default": true,
"required": true
},
"async-buffer-size": {
"type": "integer",
"default": 10000,
"default": 100,
"required": true
}
}
Expand Down
56 changes: 41 additions & 15 deletions core/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,43 +1,69 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${EB_LOGFILE:-./log/default.log}</file>
<shutdownHook/>

<!-- appenders -->
<appender name="ROLLING" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${EB_LOGFILE:-./log/default}.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${EB_LOGFILE:-./log/default.log}.%d{yyyy-MM-dd}.%i</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<!-- or whenever the file size reaches 50MB -->
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!-- keep 30 days' worth of history -->
<fileNamePattern>${EB_LOGFILE:-./log/default}.%d{yyyy-MM-dd}.log</fileNamePattern>
<maxHistory>30</maxHistory>
<totalSizeCap>3GB</totalSizeCap>
</rollingPolicy>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} %X{sourceThread} %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<appender name="ASYNC_ROLLING" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="ROLLING" />
</appender>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} %X{sourceThread} %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>

<appender name="Sentry" class="io.sentry.logback.SentryAppender">
<appender name="SENTRY" class="io.sentry.logback.SentryAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>WARN</level>
</filter>
</appender>

<appender name="MISSED" class="ch.qos.logback.core.FileAppender">
<file>${EB_MISSED_EVENTS_FILE:-./log/missed_events}</file>
<append>true</append>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>

<appender name="ASYNC_MISSED" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="MISSED" />
<queueSize>256</queueSize>
<maxFlushTime>0</maxFlushTime>
<discardingThreshold>0</discardingThreshold>
<includeCallerData>false</includeCallerData>
<neverBlock>true</neverBlock>
</appender>


<!-- loggers -->
<root level="${EB_LOGLEVEL:-DEBUG}">
<appender-ref ref="${EB_LOGREF:-CONSOLE}" />
<appender-ref ref="SENTRY" />
</root>

<logger name="org.apache.kafka" level="INFO" />
<logger name="org.apache.zookeeper" level="INFO" />
<logger name="com.datastax.driver.core.Connection" level="INFO" />

<root level="${EB_LOGLEVEL:-DEBUG}">
<appender-ref ref="${EB_LOGREF:-STDOUT}" />
<appender-ref ref="Sentry" />
</root>
<logger name="com.thenetcircle.event_bus.misc.MissedEventHandler" additivity="false">
<appender-ref ref="ASYNC_MISSED" />
</logger>

</configuration>
8 changes: 6 additions & 2 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ task {
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
properties {
acks = all
retries = 30
"max.in.flight.requests.per.connection" = 5
"enable.idempotence" = true
}

use-async-buffer = false
async-buffer-size = 10000
use-async-buffer = true
async-buffer-size = 100
}

tnc-topic-resolver {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Beineng Ma <[email protected]>
*/

package com.thenetcircle.event_bus.misc

import com.thenetcircle.event_bus.interfaces.Event
import com.typesafe.scalalogging.StrictLogging

/** Central sink for events that could not be delivered downstream.
  *
  * Writes the raw body of each missed event at WARN level; the logback
  * configuration routes this logger to a dedicated missed-events appender
  * (see the `com.thenetcircle.event_bus.misc.MissedEventHandler` logger entry).
  */
object MissedEventHandler extends StrictLogging {

  /** Record a single undeliverable event by logging its raw payload. */
  def handle(event: Event): Unit = {
    val payload = event.body.data
    logger.warn(payload)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import java.util
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

import akka.NotUsed
import akka.kafka.ProducerMessage.Message
import akka.kafka.ProducerMessage.{Envelope, Message, Result}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Sink}
import akka.stream.stage._
import akka.stream._
import com.thenetcircle.event_bus.context.{TaskBuildingContext, TaskRunningContext}
import com.thenetcircle.event_bus.interfaces.EventStatus.Norm
import com.thenetcircle.event_bus.interfaces.{Event, EventStatus, SinkTask, SinkTaskBuilder}
import com.thenetcircle.event_bus.misc.Util
import com.thenetcircle.event_bus.misc.{MissedEventHandler, Util}
import com.thenetcircle.event_bus.tasks.kafka.extended.{EventSerializer, KafkaKey, KafkaKeySerializer, KafkaPartitioner}
import com.typesafe.scalalogging.StrictLogging
import net.ceedubs.ficus.Ficus._
Expand Down Expand Up @@ -81,9 +81,9 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog
// .withProperty("client.id", clientId)
}

def createMessage(event: Event)(
def createEnvelope(event: Event)(
implicit runningContext: TaskRunningContext
): Message[ProducerKey, ProducerValue, Event] = {
): Envelope[ProducerKey, ProducerValue, Event] = {
val record = createProducerRecord(event)
logger.debug(s"new kafka record $record is created")
Message(record, event)
Expand Down Expand Up @@ -129,25 +129,25 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog
kafkaProducer.get
})

// TODO issue when send to new topics, check here https://github.com/akka/reactive-kafka/issues/163

// Note that the flow might be materialized multiple times,
// like from HttpSource(multiple connections), KafkaSource(multiple topicPartitions)
// TODO protect that the stream crashed by sending failure
// TODO use Producer.flexiFlow
// TODO optimize logging
// DONE issue when send to new topics, check here https://github.com/akka/reactive-kafka/issues/163
// DONE protects that the stream crashed by sending failure
// DONE use Producer.flexiFlow
// DONE optimize logging
val producingFlow = Flow[Event]
.map(createMessage)
.via(Producer.flow(kafkaSettings, _kafkaProducer))
.map(result => {
val eventBrief = Util.getBriefOfEvent(result.message.passThrough)
val kafkaBrief =
s"topic: ${result.metadata.topic()}, partition: ${result.metadata.partition()}, offset: ${result.metadata
.offset()}, key: ${Option(result.message.record.key()).map(_.rawData).getOrElse("")}"
logger.info(s"sending event [$eventBrief] to kafka [$kafkaBrief] succeeded.")

(Norm, result.message.passThrough)
})
.map(createEnvelope)
.via(Producer.flexiFlow(kafkaSettings, _kafkaProducer))
.map {
case Result(metadata, message) =>
val eventBrief = Util.getBriefOfEvent(message.passThrough)
val kafkaBrief =
s"topic: ${metadata.topic()}, partition: ${metadata.partition()}, offset: ${metadata
.offset()}, key: ${Option(message.record.key()).map(_.rawData).getOrElse("")}"
logger.info(s"sending event [$eventBrief] to kafka [$kafkaBrief] succeeded.")

(Norm, message.passThrough)
}

if (settings.useAsyncBuffer) {
logger.debug("wrapping async buffer")
Expand All @@ -167,17 +167,22 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLog

object KafkaSink {

def wrapAsyncBuffer(bufferSize: Int, producingFlow: Flow[Event, _, _]): Flow[Event, (EventStatus, Event), NotUsed] =
def wrapAsyncBuffer(bufferSize: Int, producingFlow: Flow[Event, _, _]): Flow[Event, (EventStatus, Event), NotUsed] = {
val producingSink: Sink[Event, _] = producingFlow
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.to(Sink.ignore)

Flow
.fromGraph(
GraphDSL
.create() { implicit builder =>
import GraphDSL.Implicits._
val buffer = builder.add(new AsyncBuffer(bufferSize))
buffer.out1 ~> producingFlow ~> Sink.ignore
buffer.out1 ~> producingSink
FlowShape(buffer.in, buffer.out0)
}
)
}

class AsyncBuffer(bufferSize: Int) extends GraphStage[FanOutShape2[Event, (EventStatus, Event), Event]] {

Expand All @@ -190,10 +195,12 @@ object KafkaSink {
override def createLogic(
inheritedAttributes: Attributes
): GraphStageLogic = new GraphStageLogic(shape) with InHandler with StageLogging {
private val bufferImpl: util.Queue[Event] = new LinkedBlockingQueue(bufferSize)
private val buffer: util.Queue[Event] = new LinkedBlockingQueue(bufferSize)

private def handleMissedEvent(event: Event): Unit =
log.warning(s"[MISSED] ${event.body.data}")
private def flushBuffer(): Unit =
while (!buffer.isEmpty) {
MissedEventHandler.handle(buffer.poll())
}

override def onPush(): Unit = {
val event = grab(in)
Expand All @@ -202,29 +209,20 @@ object KafkaSink {
push(out0, (Norm, event))
}

// If out1 is available, then it has been pulled but no dequeued element has been delivered.
// It means the buffer at this moment is definitely empty,
// so we just push the current element to out, then pull.
if (isAvailable(out1)) {
if (buffer.isEmpty && isAvailable(out1)) {
push(out1, event)
} else {
if (!bufferImpl.offer(event)) {
// buffer is full, record log
if (!buffer.offer(event)) { // if the buffer is full
log.warning("A event [" + Util.getBriefOfEvent(event) + "] is dropped since the AsyncBuffer is full.")
handleMissedEvent(event)
MissedEventHandler.handle(event)
}
}

pull(in)
}

override def onUpstreamFinish(): Unit =
if (bufferImpl.isEmpty) completeStage()
if (buffer.isEmpty) completeStage()

override def postStop(): Unit =
while (!bufferImpl.isEmpty) {
handleMissedEvent(bufferImpl.poll())
}
override def postStop(): Unit = flushBuffer()

setHandler(in, this)

Expand All @@ -233,10 +231,14 @@ object KafkaSink {
out0,
new OutHandler {
override def onPull(): Unit =
if (!hasBeenPulled(in)) pull(in)
if (isClosed(in)) {
if (buffer.isEmpty) completeStage()
} else if (!hasBeenPulled(in)) {
pull(in)
}

override def onDownstreamFinish(): Unit =
if (bufferImpl.isEmpty) completeStage()
if (buffer.isEmpty) completeStage()
}
)

Expand All @@ -245,12 +247,13 @@ object KafkaSink {
out1,
new OutHandler {
override def onPull(): Unit = {
if (!bufferImpl.isEmpty) push(out1, bufferImpl.poll())
if (isClosed(in)) {
if (bufferImpl.isEmpty) completeStage()
} else if (!hasBeenPulled(in)) {
pull(in)
}
if (!buffer.isEmpty) push(out1, buffer.poll())
if (isClosed(in) && buffer.isEmpty) completeStage()
}

override def onDownstreamFinish(): Unit = {
flushBuffer()
super.onDownstreamFinish()
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class KafkaSinkBuilderTest extends TestBase {
| "use-dispatcher": "test-dispatcher",
| "properties": {
| "batch.size": 1024,
| "connections.max.idle.ms": 100
| "connections.max.idle.ms": 100,
| "max.in.flight.requests.per.connection": 10
| }
|}""".stripMargin)

Expand All @@ -50,6 +51,13 @@ class KafkaSinkBuilderTest extends TestBase {
settings.parallelism shouldEqual 50
settings.useDispatcher.get shouldEqual "test-dispatcher"

settings.useAsyncBuffer shouldEqual true
settings.asyncBufferSize shouldEqual 100
producerSettings.properties("acks") shouldEqual "all"
producerSettings.properties("retries") shouldEqual "30"
producerSettings.properties("max.in.flight.requests.per.connection") shouldEqual "10"
producerSettings.properties("enable.idempotence") shouldEqual "true"

producerSettings.properties("batch.size") shouldEqual "1024"
producerSettings.properties("connections.max.idle.ms") shouldEqual "100"

Expand Down

0 comments on commit 46df10d

Please sign in to comment.