Skip to content

Commit

Permalink
BlockchainUpdates API extension (#3226)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvshur authored Oct 1, 2020
1 parent 855eb6a commit c41982f
Show file tree
Hide file tree
Showing 32 changed files with 1,769 additions and 953 deletions.
14 changes: 9 additions & 5 deletions blockchain-updates/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,29 @@ import WavesDockerKeys._

name := "blockchain-updates"

libraryDependencies ++= Dependencies.kafka +: Dependencies.protobuf.value
libraryDependencies ++= Dependencies.grpc

extensionClasses += "com.wavesplatform.events.BlockchainUpdates"

inConfig(Compile)(
Seq(
PB.protoSources in Compile := Seq(PB.externalIncludePath.value),
includeFilter in PB.generate := new SimpleFileFilter((f: File) => f.getName.endsWith(".proto") && f.getParent.replace('\\', '/').endsWith("waves/events")),
includeFilter in PB.generate := { (f: File) =>
(** / "waves" / "events" / ** / "*.proto").matches(f.toPath)
},
PB.targets += scalapb.gen(flatPackage = true) -> sourceManaged.value
))
)
)

enablePlugins(RunApplicationSettings, WavesExtensionDockerPlugin, ExtensionPackaging)

docker := docker.dependsOn(LocalProject("node-it") / docker).value
inTask(docker)(
Seq(
imageNames := Seq(ImageName("com.wavesplatform/blockchain-updates")),
exposedPorts := Set(6886),
exposedPorts := Set(6880, 6881),
additionalFiles ++= Seq(
(LocalProject("blockchain-updates") / Universal / stage).value
)
))
)
)
19 changes: 1 addition & 18 deletions blockchain-updates/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,20 +1,3 @@
blockchain-updates {
# ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
# format: address:port,address2:port2,...
bootstrap-servers = "localhost:9092"

# name of topic to send updates to
topic = "blockchain-updates"

# ProducerConfig.CLIENT_ID_CONFIG
client-id = "waves-node"

# Authentication
ssl {
enabled = no

username = ""

password = ""
}
grpc-port = 6881
}
Original file line number Diff line number Diff line change
@@ -1,143 +1,145 @@
package com.wavesplatform.events

import java.time.{Duration => JDuration}
import java.util

import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit

import com.wavesplatform.block.{Block, MicroBlock}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.events.api.grpc.BlockchainUpdatesApiGrpcImpl
import com.wavesplatform.events.api.grpc.protobuf.BlockchainUpdatesApiGrpc
import com.wavesplatform.events.repo.UpdatesRepoImpl
import com.wavesplatform.events.settings.BlockchainUpdatesSettings
import com.wavesplatform.extensions.{Context, Extension}
import com.wavesplatform.state.Blockchain
import com.wavesplatform.state.diffs.BlockDiffer
import com.wavesplatform.utils.ScorexLogging
import io.grpc.Server
import io.grpc.netty.NettyServerBuilder
import monix.execution.Scheduler
import net.ceedubs.ficus.Ficus._
import com.wavesplatform.events.settings.BlockchainUpdatesSettings
import com.wavesplatform.utils.{ScorexLogging, forceStopApplication}
import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.reactive.Observer
import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata}
import com.wavesplatform.events.kafka.{createProducer, createProducerRecord, createProperties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.Deserializer
import com.wavesplatform.events.protobuf.{BlockchainUpdated => PBBlockchainUpdated}
import org.apache.kafka.common.TopicPartition

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

class BlockchainUpdates(private val context: Context) extends Extension with ScorexLogging {
import monix.execution.Scheduler.Implicits.global
class BlockchainUpdates(private val context: Context) extends Extension with ScorexLogging with BlockchainUpdateTriggers {
implicit val scheduler: Scheduler = Scheduler(context.actorSystem.dispatcher)

private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("blockchain-updates")

private[this] var maybeProducer: Option[KafkaProducer[Int, BlockchainUpdated]] = None

private def getLastHeight(timeout: Duration = 10.seconds): Int = {
import scala.jdk.CollectionConverters._

val props = createProperties(settings)
props.put(ConsumerConfig.GROUP_ID_CONFIG, "admin")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000")
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1")

val consumer = new KafkaConsumer[Unit, Int](
props,
new Deserializer[Unit] {
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}
override def deserialize(topic: String, data: Array[Byte]): Unit = {}
override def close(): Unit = {}
},
new Deserializer[Int] { // height of last event
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}
override def deserialize(topic: String, data: Array[Byte]): Int =
PBBlockchainUpdated.parseFrom(data).height
override def close(): Unit = {}
private[this] val repo = new UpdatesRepoImpl(s"${context.settings.directory}/blockchain-updates")

private[this] var grpcServer: Server = null

override def start(): Unit = {
log.info(s"BlockchainUpdates extension starting with settings $settings")

// startup checks
val nodeHeight = context.blockchain.height
val extensionHeight = repo.height.get
if (extensionHeight < nodeHeight) {
val exception = new IllegalStateException(s"BlockchainUpdates at height $extensionHeight is lower than node at height $nodeHeight")
log.error("BlockchainUpdates startup check failed", exception)
throw exception
} else if (nodeHeight > 0) {
(repo.updateForHeight(nodeHeight), context.blockchain.blockHeader(nodeHeight)) match {
case (Success(Some(extensionBlockAtNodeHeight)), Some(lastNodeBlockHeader)) =>
val lastNodeBlockId = lastNodeBlockHeader.id.value()

// check if extension is on fork. Block ids must be equal at node height
if (extensionBlockAtNodeHeight.toId != lastNodeBlockId) {
val exception = new IllegalStateException(
s"BlockchainUpdates extension has forked: at node height $nodeHeight node block id is $lastNodeBlockId, extension's is ${extensionBlockAtNodeHeight.toId}"
)
log.error("BlockchainUpdates startup check failed", exception)
throw exception
}

// if not on fork, but extension moved higher than node, rollback the extension to recover
if (extensionHeight > nodeHeight) {
log.warn(s"BlockchainUpdates at height $extensionHeight is higher than node at height $nodeHeight, rolling back BlockchainUpdates")
repo
.rollback(RollbackCompleted(extensionBlockAtNodeHeight.toId, extensionBlockAtNodeHeight.toHeight))
.recoverWith { case _: Throwable => Failure(new RuntimeException("BlockchainUpdates failed to rollback at startup")) }
.get
}
case (Success(None), Some(_)) =>
val exception = new RuntimeException(
s"BlockchainUpdates has no block at height $nodeHeight, while node has one at startup. Extension height: $extensionHeight, node height: $nodeHeight"
)
log.error("BlockchainUpdates startup check failed", exception)
throw exception
case (Failure(ex), _) =>
val exception = new RuntimeException(s"BlockchainUpdates failed to get extension block info at node height at startup", ex)
log.error("BlockchainUpdates startup check failed", ex)
throw exception
case (Success(_), None) =>
val exception = new RuntimeException(s"Incorrect node state: missing block at height $nodeHeight")
log.error("BlockchainUpdates startup check failed", exception)
throw exception
case _ =>
val exception = new RuntimeException(
s"BlockchainUpdates failed to perform a startup check. Extension height: $extensionHeight, node height: $nodeHeight"
)
log.error("BlockchainUpdates startup check failed", exception)
throw exception
}
)
}
log.info(s"BlockchainUpdates startup check successful at height $extensionHeight")

val partition = consumer.partitionsFor(settings.topic).asScala.head
val tp = new TopicPartition(settings.topic, partition.partition)
// starting gRPC API
val bindAddress = new InetSocketAddress("0.0.0.0", settings.grpcPort)

consumer.assign(util.Arrays.asList(tp))
grpcServer = NettyServerBuilder
.forAddress(bindAddress)
.addService(BlockchainUpdatesApiGrpc.bindService(new BlockchainUpdatesApiGrpcImpl(repo)(scheduler), scheduler))
.build()
.start()

val endOffset = consumer.endOffsets(util.Arrays.asList(tp)).asScala.apply(tp)
log.info(s"BlockchainUpdates extension started gRPC API on port ${settings.grpcPort}")
}

if (endOffset != 0) {
consumer.seek(tp, endOffset - 1)
override def shutdown(): Future[Unit] = Future {
log.info(s"BlockchainUpdates extension shutting down, last persisted height ${repo.height.get - 1}")

val records = consumer.poll(JDuration.ofMillis(timeout.toMillis))
if (grpcServer != null) {
grpcServer.shutdown()
grpcServer.awaitTermination(10L, TimeUnit.SECONDS)
}

if (records.isEmpty) 0
else records.records(tp).asScala.last.value
} else 0
repo.shutdown()
}

private[this] def startupCheck(): Unit = {
// if kafkaHeight <= (blockchainHeight + 1) — rollback node with event sending to Kafka. If rollback fails — fail
// if kafkaHeight > (blockchainHeight + 1) — fail. This should not happen
// if kafka is empty, but blockchain is further than genesis block — fail
// if both kafka and blockchain are empty — OK

// Idea for better checks: maintain Kafka view of blocks with signatures and check for (and recover from) forks.
// The view can be maintained via transaction writes

val blockchainHeight = context.blockchain.height
val kafkaHeight = getLastHeight()

if (kafkaHeight == 0 && blockchainHeight > 1)
throw new IllegalStateException("No events in Kafka, but blockchain is neither empty nor on genesis block.")

if (kafkaHeight > blockchainHeight + 1 || (kafkaHeight != 0 && blockchainHeight == 0))
throw new IllegalStateException(s"""Node is behind kafka. Kafka is at $kafkaHeight, while node is at $blockchainHeight.
|This should never happen. Manual correction of even full system restart might be necessary.""".stripMargin)

if (kafkaHeight != 0) {
val heightToRollbackTo = Math.max(kafkaHeight - 1, 1)
val sigToRollback = context.blockchain
.blockHeader(heightToRollbackTo)
.map(_.id())
.get // guaranteed not to fail by previous checks on heights

log.info(s"Kafka is at $kafkaHeight, while node is at $blockchainHeight. Rolling node back to $heightToRollbackTo")
context.rollbackTo(sigToRollback).runSyncUnsafe(10.second) match {
case Right(_) =>
case Left(_) =>
throw new IllegalStateException(s"Unable to rollback Node to Kafka state. Kafka is at $kafkaHeight, while node is at $blockchainHeight.")
}
override def onProcessBlock(
block: Block,
diff: BlockDiffer.DetailedDiff,
minerReward: Option[Long],
blockchainBeforeWithMinerReward: Blockchain
): Unit = {
val newBlock = BlockAppended.from(block, diff, minerReward, blockchainBeforeWithMinerReward)
repo.appendBlock(newBlock).get
if (newBlock.toHeight % 100 == 0) {
log.debug(s"BlockchainUpdates appended blocks up to ${newBlock.toHeight}")
}
}

override def start(): Unit = {
maybeProducer = Some(createProducer(settings))
maybeProducer foreach { producer =>
log.info("Performing startup node/Kafka consistency check...")

context.blockchainUpdated.subscribe(new Observer.Sync[BlockchainUpdated] {
override def onNext(elem: BlockchainUpdated): Ack = {
producer.send(
createProducerRecord(settings.topic, elem),
(_: RecordMetadata, exception: Exception) =>
if (exception != null) {
log.error("Error sending blockchain updates", exception)
forceStopApplication()
}
)
Continue
}
override def onError(ex: Throwable): Unit = {
log.error("Error sending blockchain updates", ex)
forceStopApplication()
}
override def onComplete(): Unit = {
log.error("Blockchain updates Observable complete")
forceStopApplication() // this should never happen, but just in case, explicit stop.
}
})

// startupCheck is after subscription, so that if the check makes a rollback, it would be handled
startupCheck()
log.info("Starting sending blockchain updates to Kafka")
}
override def onProcessMicroBlock(
microBlock: MicroBlock,
diff: BlockDiffer.DetailedDiff,
blockchainBeforeWithMinerReward: Blockchain,
totalBlockId: ByteStr,
totalTransactionsRoot: ByteStr
): Unit = {
val newMicroBlock = MicroBlockAppended.from(microBlock, diff, blockchainBeforeWithMinerReward, totalBlockId, totalTransactionsRoot)
repo.appendMicroBlock(newMicroBlock).get
}

override def shutdown(): Future[Unit] = Future {
log.info("Shutting down blockchain updates sending")
maybeProducer foreach (_.close())
override def onRollback(toBlockId: ByteStr, toHeight: Int): Unit = {
val rollbackCompleted = RollbackCompleted(toBlockId, toHeight)
repo.rollback(rollbackCompleted).get
}

override def onMicroBlockRollback(toBlockId: ByteStr, height: Int): Unit = {
val microBlockRollbackCompleted = MicroBlockRollbackCompleted(toBlockId, height)
repo.rollbackMicroBlock(microBlockRollbackCompleted).get
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.wavesplatform.events.api.grpc

import com.wavesplatform.events.api.grpc.backpressure._
import com.wavesplatform.events.api.grpc.protobuf._
import com.wavesplatform.events.protobuf.serde._
import com.wavesplatform.events.repo.UpdatesRepo
import com.wavesplatform.utils.ScorexLogging
import io.grpc.stub.StreamObserver
import io.grpc.{Status, StatusRuntimeException}
import monix.execution.Scheduler

import scala.concurrent.Future
import scala.util.{Failure, Success}

class BlockchainUpdatesApiGrpcImpl(repo: UpdatesRepo.Read with UpdatesRepo.Stream)(implicit sc: Scheduler)
extends BlockchainUpdatesApiGrpc.BlockchainUpdatesApi
with ScorexLogging {
override def getBlockUpdate(request: GetBlockUpdateRequest): Future[GetBlockUpdateResponse] = Future {
repo.updateForHeight(request.height) match {
case Success(Some(upd)) => GetBlockUpdateResponse(Some(upd.protobuf))
case Success(None) => throw new StatusRuntimeException(Status.NOT_FOUND)
case Failure(e: IllegalArgumentException) =>
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(e.getMessage))
case Failure(exception) =>
log.error(s"BlockchainUpdates gRPC failed to get block update for height ${request.height}", exception)
throw new StatusRuntimeException(Status.INTERNAL)
}
}

override def getBlockUpdatesRange(request: GetBlockUpdatesRangeRequest): Future[GetBlockUpdatesRangeResponse] = Future {
repo.updatesRange(request.fromHeight, request.toHeight) match {
case Success(updates) => GetBlockUpdatesRangeResponse(updates.map(_.protobuf))
case Failure(e: IllegalArgumentException) =>
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(e.getMessage))
case Failure(e) =>
log.error(s"BlockchainUpdates gRPC failed to get block range updates for range ${request.fromHeight} to ${request.toHeight}", e)
throw new StatusRuntimeException(Status.INTERNAL)
}
}

override def subscribe(request: SubscribeRequest, responseObserver: StreamObserver[SubscribeEvent]): Unit = {
if (request.fromHeight <= 0) {
responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("height must be a positive integer")))
} else {
val updatesPB = repo
.stream(request.fromHeight)
.map(elem => SubscribeEvent(update = Some(elem.protobuf)))

wrapObservable(updatesPB, responseObserver)(identity)
}
}

}
Loading

0 comments on commit c41982f

Please sign in to comment.