add AsyncBuffer for KafkaSink

beineng committed Oct 10, 2018
1 parent cab1a29 commit 4312495

Showing 4 changed files with 133 additions and 6 deletions.
10 changes: 10 additions & 0 deletions admin/frontend/src/lib/task-schema.ts
@@ -298,6 +298,16 @@ const sinkSchema: any = {
"title": "Kafka Producer Properties",
"properties": {
}
},
"use-async-buffer": {
"type": "boolean",
"default": false,
"required": true
},
"async-buffer-size": {
"type": "integer",
"default": 10000,
"required": true
}
}
}
3 changes: 3 additions & 0 deletions core/src/main/resources/reference.conf
@@ -94,6 +94,9 @@ task {
# can be defined in this configuration section.
properties {
}

use-async-buffer = false
async-buffer-size = 10000
}

tnc-topic-resolver {
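The two new keys mirror the useAsyncBuffer and asyncBufferSize fields added to KafkaSinkSettings below. A minimal standalone sketch (not part of this commit, using the same Ficus readers as KafkaSinkBuilder) of how an override of these defaults is read:

import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._

// Parse an override as KafkaSinkBuilder would receive it.
val config = ConfigFactory.parseString(
  """
    |use-async-buffer = true
    |async-buffer-size = 5000
  """.stripMargin
)
val useAsyncBuffer  = config.as[Boolean]("use-async-buffer") // true
val asyncBufferSize = config.as[Int]("async-buffer-size")    // 5000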
@@ -29,9 +29,9 @@ import com.thenetcircle.event_bus.context.{TaskBuildingContext, TaskRunningContext}
import com.thenetcircle.event_bus.event.EventImpl
import com.thenetcircle.event_bus.event.extractor.DataFormat.DataFormat
import com.thenetcircle.event_bus.event.extractor.{DataFormat, EventExtractingException, EventExtractorFactory}
import com.thenetcircle.event_bus.misc.Util
import com.thenetcircle.event_bus.interfaces.EventStatus.{Fail, Norm, SuccStatus, ToFB}
import com.thenetcircle.event_bus.interfaces.{Event, EventStatus, SourceTask, SourceTaskBuilder}
import com.thenetcircle.event_bus.misc.Util
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging
import net.ceedubs.ficus.Ficus._
@@ -17,13 +17,16 @@

package com.thenetcircle.event_bus.tasks.kafka

import java.util.concurrent.TimeUnit
import java.util
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

import akka.NotUsed
import akka.kafka.ProducerMessage.Message
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{Flow, GraphDSL, Sink}
import akka.stream.stage._
import akka.stream._
import com.thenetcircle.event_bus.context.{TaskBuildingContext, TaskRunningContext}
import com.thenetcircle.event_bus.interfaces.EventStatus.Norm
import com.thenetcircle.event_bus.interfaces.{Event, EventStatus, SinkTask, SinkTaskBuilder}
@@ -42,13 +45,17 @@ case class KafkaSinkSettings(
parallelism: Int = 100,
closeTimeout: FiniteDuration = 60.seconds,
useDispatcher: Option[String] = None,
properties: Map[String, String] = Map.empty
properties: Map[String, String] = Map.empty,
useAsyncBuffer: Boolean = false,
asyncBufferSize: Int = 10000
)

class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLogging {

require(settings.bootstrapServers.nonEmpty, "bootstrap servers are required.")

logger.info(s"Initializing KafkaSink with settings: $settings")

def getProducerSettings()(
implicit runningContext: TaskRunningContext
): ProducerSettings[ProducerKey, ProducerValue] = {
@@ -126,7 +133,10 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLogging {

// Note that the flow might be materialized multiple times,
// like from HttpSource(multiple connections), KafkaSource(multiple topicPartitions)
Flow[Event]
// TODO protect the stream from crashing on send failures
// TODO use Producer.flexiFlow
// TODO optimize logging
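// createMessage builds a ProducerMessage.Message that carries the event itself
// as passThrough, so each producer Result can be mapped back to (Norm, event).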
val producingFlow = Flow[Event]
.map(createMessage)
.via(Producer.flow(kafkaSettings, _kafkaProducer))
.map(result => {
@@ -138,6 +148,13 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLogging {

(Norm, result.message.passThrough)
})

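// When the async buffer is enabled, the returned flow acknowledges every event
// as Norm right away and produces to Kafka in the background; events may be
// dropped (and logged) if the buffer fills up. See KafkaSink.wrapAsyncBuffer.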
if (settings.useAsyncBuffer) {
logger.debug("wrapping async buffer")
KafkaSink.wrapAsyncBuffer(settings.asyncBufferSize, producingFlow)
} else {
producingFlow
}
}

override def shutdown()(implicit runningContext: TaskRunningContext): Unit = {
@@ -148,6 +165,101 @@ class KafkaSink(val settings: KafkaSinkSettings) extends SinkTask with StrictLogging {
}
}

object KafkaSink {

def wrapAsyncBuffer(bufferSize: Int, producingFlow: Flow[Event, _, _]): Flow[Event, (EventStatus, Event), NotUsed] =
Flow
.fromGraph(
GraphDSL
.create() { implicit builder =>
import GraphDSL.Implicits._
val buffer = builder.add(new AsyncBuffer(bufferSize))
buffer.out1 ~> producingFlow ~> Sink.ignore
FlowShape(buffer.in, buffer.out0)
}
)

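/**
 * A fan-out stage backed by a bounded in-memory queue.
 * out0 acknowledges events to the outer stream as (Norm, event) whenever it is
 * available; out1 drains the queue into the Kafka producing flow. When the
 * queue is full, new events are dropped and logged as missed.
 */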
class AsyncBuffer(bufferSize: Int) extends GraphStage[FanOutShape2[Event, (EventStatus, Event), Event]] {

val in = Inlet[Event]("AsyncBuffer.in")
val out0 = Outlet[(EventStatus, Event)]("AsyncBuffer.out0")
val out1 = Outlet[Event]("AsyncBuffer.out1")

val shape: FanOutShape2[Event, (EventStatus, Event), Event] = new FanOutShape2(in, out0, out1)

override def createLogic(
inheritedAttributes: Attributes
): GraphStageLogic = new GraphStageLogic(shape) with InHandler with StageLogging {
private val bufferImpl: util.Queue[Event] = new LinkedBlockingQueue(bufferSize)

private def handleMissedEvent(event: Event): Unit =
log.warning(s"[MISSED] ${event.body.data}")

override def onPush(): Unit = {
val event = grab(in)

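// Acknowledge on out0 only if it is currently pulled; an event that arrives
// while out0 is unavailable is never emitted on out0.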
if (isAvailable(out0)) {
push(out0, (Norm, event))
}

// If out1 is available, it has been pulled but no dequeued element has been delivered yet.
// That means the buffer is definitely empty at this moment,
// so we push the current element straight to out1, then pull.
if (isAvailable(out1)) {
push(out1, event)
} else {
if (!bufferImpl.offer(event)) {
// buffer is full, record log
log.warning("A event [" + Util.getBriefOfEvent(event) + "] is dropped since the AsyncBuffer is full.")
handleMissedEvent(event)
}
}

pull(in)
}

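// After upstream completes, keep the stage alive until out1 has drained the
// queue; out1's onPull below completes the stage once the queue is empty.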
override def onUpstreamFinish(): Unit =
if (bufferImpl.isEmpty) completeStage()

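// Anything still queued when the stage stops is reported as missed.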
override def postStop(): Unit =
while (!bufferImpl.isEmpty) {
handleMissedEvent(bufferImpl.poll())
}

setHandler(in, this)

// outlet for outside
setHandler(
out0,
new OutHandler {
override def onPull(): Unit =
if (!hasBeenPulled(in)) pull(in)

override def onDownstreamFinish(): Unit =
if (bufferImpl.isEmpty) completeStage()
}
)

// outlet for kafka producer
setHandler(
out1,
new OutHandler {
override def onPull(): Unit = {
if (!bufferImpl.isEmpty) push(out1, bufferImpl.poll())
if (isClosed(in)) {
if (bufferImpl.isEmpty) completeStage()
} else if (!hasBeenPulled(in)) {
pull(in)
}
}
}
)
}

}

}

class KafkaSinkBuilder() extends SinkTaskBuilder {
override def build(
configString: String
@@ -164,7 +276,9 @@ class KafkaSinkBuilder() extends SinkTaskBuilder {
config.as[Int]("parallelism"),
config.as[FiniteDuration]("close-timeout"),
config.as[Option[String]]("use-dispatcher"),
config.as[Map[String, String]]("properties")
config.as[Map[String, String]]("properties"),
config.as[Boolean]("use-async-buffer"),
config.as[Int]("async-buffer-size")
)

new KafkaSink(settings)
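
For reference, a minimal usage sketch (not part of this commit; the broker address is hypothetical) showing the new fields of KafkaSinkSettings:

val settings = KafkaSinkSettings(
  bootstrapServers = "localhost:9092", // hypothetical broker address
  useAsyncBuffer = true,               // wrap the producing flow in AsyncBuffer
  asyncBufferSize = 20000              // capacity of the bounded queue
)
val kafkaSink = new KafkaSink(settings)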
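The custom AsyncBuffer stage earns its keep by logging dropped events and draining the queue on shutdown. Ignoring those concerns, a rough approximation (a sketch under that assumption, not the commit's implementation) can be built from built-in operators alone:

import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink}

// Pass elements straight through to the downstream while a bounded,
// drop-on-overflow buffer feeds the producing flow in the background.
def wrapAsyncBufferApprox[A](bufferSize: Int, producing: Flow[A, _, _]): Flow[A, A, NotUsed] =
  Flow[A].alsoTo(
    Flow[A]
      .buffer(bufferSize, OverflowStrategy.dropNew) // drop new elements instead of backpressuring
      .via(producing)
      .to(Sink.ignore) // producer results are discarded, as in wrapAsyncBuffer
  )

Unlike AsyncBuffer, this version never skips acknowledgements when the downstream is slow (it backpressures instead), does not log what it drops, and emits plain elements rather than (EventStatus, Event) pairs.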
