diff --git a/blockchain-updates/build.sbt b/blockchain-updates/build.sbt index 86b418ba8fe..8381b0e1a9e 100644 --- a/blockchain-updates/build.sbt +++ b/blockchain-updates/build.sbt @@ -2,16 +2,19 @@ import WavesDockerKeys._ name := "blockchain-updates" -libraryDependencies ++= Dependencies.kafka +: Dependencies.protobuf.value +libraryDependencies ++= Dependencies.grpc extensionClasses += "com.wavesplatform.events.BlockchainUpdates" inConfig(Compile)( Seq( PB.protoSources in Compile := Seq(PB.externalIncludePath.value), - includeFilter in PB.generate := new SimpleFileFilter((f: File) => f.getName.endsWith(".proto") && f.getParent.replace('\\', '/').endsWith("waves/events")), + includeFilter in PB.generate := { (f: File) => + (** / "waves" / "events" / ** / "*.proto").matches(f.toPath) + }, PB.targets += scalapb.gen(flatPackage = true) -> sourceManaged.value - )) + ) +) enablePlugins(RunApplicationSettings, WavesExtensionDockerPlugin, ExtensionPackaging) @@ -19,8 +22,9 @@ docker := docker.dependsOn(LocalProject("node-it") / docker).value inTask(docker)( Seq( imageNames := Seq(ImageName("com.wavesplatform/blockchain-updates")), - exposedPorts := Set(6886), + exposedPorts := Set(6880, 6881), additionalFiles ++= Seq( (LocalProject("blockchain-updates") / Universal / stage).value ) - )) + ) +) diff --git a/blockchain-updates/src/main/resources/application.conf b/blockchain-updates/src/main/resources/application.conf index fa663261640..3055a4d2beb 100644 --- a/blockchain-updates/src/main/resources/application.conf +++ b/blockchain-updates/src/main/resources/application.conf @@ -1,20 +1,3 @@ blockchain-updates { - # ProducerConfig.BOOTSTRAP_SERVERS_CONFIG - # format: address:port,address2:port2,... - bootstrap-servers = "localhost:9092" - - # name of topic to send updates to - topic = "blockchain-updates" - - # ProducerConfig.CLIENT_ID_CONFIG - client-id = "waves-node" - - # Authentication - ssl { - enabled = no - - username = "" - - password = "" - } + grpc-port = 6881 } diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala index 513479d6930..6564c809916 100644 --- a/blockchain-updates/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala @@ -1,143 +1,145 @@ package com.wavesplatform.events -import java.time.{Duration => JDuration} -import java.util - +import java.net.InetSocketAddress +import java.util.concurrent.TimeUnit + +import com.wavesplatform.block.{Block, MicroBlock} +import com.wavesplatform.common.state.ByteStr +import com.wavesplatform.events.api.grpc.BlockchainUpdatesApiGrpcImpl +import com.wavesplatform.events.api.grpc.protobuf.BlockchainUpdatesApiGrpc +import com.wavesplatform.events.repo.UpdatesRepoImpl +import com.wavesplatform.events.settings.BlockchainUpdatesSettings import com.wavesplatform.extensions.{Context, Extension} +import com.wavesplatform.state.Blockchain +import com.wavesplatform.state.diffs.BlockDiffer +import com.wavesplatform.utils.ScorexLogging +import io.grpc.Server +import io.grpc.netty.NettyServerBuilder +import monix.execution.Scheduler import net.ceedubs.ficus.Ficus._ -import com.wavesplatform.events.settings.BlockchainUpdatesSettings -import com.wavesplatform.utils.{ScorexLogging, forceStopApplication} -import monix.execution.Ack -import monix.execution.Ack.Continue -import monix.reactive.Observer -import org.apache.kafka.clients.producer.{KafkaProducer, RecordMetadata} -import com.wavesplatform.events.kafka.{createProducer, createProducerRecord, createProperties} -import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} -import org.apache.kafka.common.serialization.Deserializer -import com.wavesplatform.events.protobuf.{BlockchainUpdated => PBBlockchainUpdated} -import org.apache.kafka.common.TopicPartition import scala.concurrent.Future -import scala.concurrent.duration._ +import scala.util.{Failure, Success} -class BlockchainUpdates(private val context: Context) extends Extension with ScorexLogging { - import monix.execution.Scheduler.Implicits.global +class BlockchainUpdates(private val context: Context) extends Extension with ScorexLogging with BlockchainUpdateTriggers { + implicit val scheduler: Scheduler = Scheduler(context.actorSystem.dispatcher) private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("blockchain-updates") - private[this] var maybeProducer: Option[KafkaProducer[Int, BlockchainUpdated]] = None - - private def getLastHeight(timeout: Duration = 10.seconds): Int = { - import scala.jdk.CollectionConverters._ - - val props = createProperties(settings) - props.put(ConsumerConfig.GROUP_ID_CONFIG, "admin") - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000") - props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1") - - val consumer = new KafkaConsumer[Unit, Int]( - props, - new Deserializer[Unit] { - override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {} - override def deserialize(topic: String, data: Array[Byte]): Unit = {} - override def close(): Unit = {} - }, - new Deserializer[Int] { // height of last event - override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {} - override def deserialize(topic: String, data: Array[Byte]): Int = - PBBlockchainUpdated.parseFrom(data).height - override def close(): Unit = {} + private[this] val repo = new UpdatesRepoImpl(s"${context.settings.directory}/blockchain-updates") + + private[this] var grpcServer: Server = null + + override def start(): Unit = { + log.info(s"BlockchainUpdates extension starting with settings $settings") + + // startup checks + val nodeHeight = context.blockchain.height + val extensionHeight = repo.height.get + if (extensionHeight < nodeHeight) { + val exception = new IllegalStateException(s"BlockchainUpdates at height $extensionHeight is lower than node at height $nodeHeight") + log.error("BlockchainUpdates startup check failed", exception) + throw exception + } else if (nodeHeight > 0) { + (repo.updateForHeight(nodeHeight), context.blockchain.blockHeader(nodeHeight)) match { + case (Success(Some(extensionBlockAtNodeHeight)), Some(lastNodeBlockHeader)) => + val lastNodeBlockId = lastNodeBlockHeader.id.value() + + // check if extension is on fork. Block ids must be equal at node height + if (extensionBlockAtNodeHeight.toId != lastNodeBlockId) { + val exception = new IllegalStateException( + s"BlockchainUpdates extension has forked: at node height $nodeHeight node block id is $lastNodeBlockId, extension's is ${extensionBlockAtNodeHeight.toId}" + ) + log.error("BlockchainUpdates startup check failed", exception) + throw exception + } + + // if not on fork, but extension moved higher than node, rollback the extension to recover + if (extensionHeight > nodeHeight) { + log.warn(s"BlockchainUpdates at height $extensionHeight is higher than node at height $nodeHeight, rolling back BlockchainUpdates") + repo + .rollback(RollbackCompleted(extensionBlockAtNodeHeight.toId, extensionBlockAtNodeHeight.toHeight)) + .recoverWith { case _: Throwable => Failure(new RuntimeException("BlockchainUpdates failed to rollback at startup")) } + .get + } + case (Success(None), Some(_)) => + val exception = new RuntimeException( + s"BlockchainUpdates has no block at height $nodeHeight, while node has one at startup. Extension height: $extensionHeight, node height: $nodeHeight" + ) + log.error("BlockchainUpdates startup check failed", exception) + throw exception + case (Failure(ex), _) => + val exception = new RuntimeException(s"BlockchainUpdates failed to get extension block info at node height at startup", ex) + log.error("BlockchainUpdates startup check failed", ex) + throw exception + case (Success(_), None) => + val exception = new RuntimeException(s"Incorrect node state: missing block at height $nodeHeight") + log.error("BlockchainUpdates startup check failed", exception) + throw exception + case _ => + val exception = new RuntimeException( + s"BlockchainUpdates failed to perform a startup check. Extension height: $extensionHeight, node height: $nodeHeight" + ) + log.error("BlockchainUpdates startup check failed", exception) + throw exception } - ) + } + log.info(s"BlockchainUpdates startup check successful at height $extensionHeight") - val partition = consumer.partitionsFor(settings.topic).asScala.head - val tp = new TopicPartition(settings.topic, partition.partition) + // starting gRPC API + val bindAddress = new InetSocketAddress("0.0.0.0", settings.grpcPort) - consumer.assign(util.Arrays.asList(tp)) + grpcServer = NettyServerBuilder + .forAddress(bindAddress) + .addService(BlockchainUpdatesApiGrpc.bindService(new BlockchainUpdatesApiGrpcImpl(repo)(scheduler), scheduler)) + .build() + .start() - val endOffset = consumer.endOffsets(util.Arrays.asList(tp)).asScala.apply(tp) + log.info(s"BlockchainUpdates extension started gRPC API on port ${settings.grpcPort}") + } - if (endOffset != 0) { - consumer.seek(tp, endOffset - 1) + override def shutdown(): Future[Unit] = Future { + log.info(s"BlockchainUpdates extension shutting down, last persisted height ${repo.height.get - 1}") - val records = consumer.poll(JDuration.ofMillis(timeout.toMillis)) + if (grpcServer != null) { + grpcServer.shutdown() + grpcServer.awaitTermination(10L, TimeUnit.SECONDS) + } - if (records.isEmpty) 0 - else records.records(tp).asScala.last.value - } else 0 + repo.shutdown() } - private[this] def startupCheck(): Unit = { - // if kafkaHeight <= (blockchainHeight + 1) — rollback node with event sending to Kafka. If rollback fails — fail - // if kafkaHeight > (blockchainHeight + 1) — fail. This should not happen - // if kafka is empty, but blockchain is further than genesis block — fail - // if both kafka and blockchain are empty — OK - - // Idea for better checks: maintain Kafka view of blocks with signatures and check for (and recover from) forks. - // The view can be maintained via transaction writes - - val blockchainHeight = context.blockchain.height - val kafkaHeight = getLastHeight() - - if (kafkaHeight == 0 && blockchainHeight > 1) - throw new IllegalStateException("No events in Kafka, but blockchain is neither empty nor on genesis block.") - - if (kafkaHeight > blockchainHeight + 1 || (kafkaHeight != 0 && blockchainHeight == 0)) - throw new IllegalStateException(s"""Node is behind kafka. Kafka is at $kafkaHeight, while node is at $blockchainHeight. - |This should never happen. Manual correction of even full system restart might be necessary.""".stripMargin) - - if (kafkaHeight != 0) { - val heightToRollbackTo = Math.max(kafkaHeight - 1, 1) - val sigToRollback = context.blockchain - .blockHeader(heightToRollbackTo) - .map(_.id()) - .get // guaranteed not to fail by previous checks on heights - - log.info(s"Kafka is at $kafkaHeight, while node is at $blockchainHeight. Rolling node back to $heightToRollbackTo") - context.rollbackTo(sigToRollback).runSyncUnsafe(10.second) match { - case Right(_) => - case Left(_) => - throw new IllegalStateException(s"Unable to rollback Node to Kafka state. Kafka is at $kafkaHeight, while node is at $blockchainHeight.") - } + override def onProcessBlock( + block: Block, + diff: BlockDiffer.DetailedDiff, + minerReward: Option[Long], + blockchainBeforeWithMinerReward: Blockchain + ): Unit = { + val newBlock = BlockAppended.from(block, diff, minerReward, blockchainBeforeWithMinerReward) + repo.appendBlock(newBlock).get + if (newBlock.toHeight % 100 == 0) { + log.debug(s"BlockchainUpdates appended blocks up to ${newBlock.toHeight}") } } - override def start(): Unit = { - maybeProducer = Some(createProducer(settings)) - maybeProducer foreach { producer => - log.info("Performing startup node/Kafka consistency check...") - - context.blockchainUpdated.subscribe(new Observer.Sync[BlockchainUpdated] { - override def onNext(elem: BlockchainUpdated): Ack = { - producer.send( - createProducerRecord(settings.topic, elem), - (_: RecordMetadata, exception: Exception) => - if (exception != null) { - log.error("Error sending blockchain updates", exception) - forceStopApplication() - } - ) - Continue - } - override def onError(ex: Throwable): Unit = { - log.error("Error sending blockchain updates", ex) - forceStopApplication() - } - override def onComplete(): Unit = { - log.error("Blockchain updates Observable complete") - forceStopApplication() // this should never happen, but just in case, explicit stop. - } - }) - - // startupCheck is after subscription, so that if the check makes a rollback, it would be handled - startupCheck() - log.info("Starting sending blockchain updates to Kafka") - } + override def onProcessMicroBlock( + microBlock: MicroBlock, + diff: BlockDiffer.DetailedDiff, + blockchainBeforeWithMinerReward: Blockchain, + totalBlockId: ByteStr, + totalTransactionsRoot: ByteStr + ): Unit = { + val newMicroBlock = MicroBlockAppended.from(microBlock, diff, blockchainBeforeWithMinerReward, totalBlockId, totalTransactionsRoot) + repo.appendMicroBlock(newMicroBlock).get } - override def shutdown(): Future[Unit] = Future { - log.info("Shutting down blockchain updates sending") - maybeProducer foreach (_.close()) + override def onRollback(toBlockId: ByteStr, toHeight: Int): Unit = { + val rollbackCompleted = RollbackCompleted(toBlockId, toHeight) + repo.rollback(rollbackCompleted).get + } + + override def onMicroBlockRollback(toBlockId: ByteStr, height: Int): Unit = { + val microBlockRollbackCompleted = MicroBlockRollbackCompleted(toBlockId, height) + repo.rollbackMicroBlock(microBlockRollbackCompleted).get } } diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/api/grpc/BlockchainUpdatesApiGrpcImpl.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/api/grpc/BlockchainUpdatesApiGrpcImpl.scala new file mode 100644 index 00000000000..21d2eba608d --- /dev/null +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/api/grpc/BlockchainUpdatesApiGrpcImpl.scala @@ -0,0 +1,53 @@ +package com.wavesplatform.events.api.grpc + +import com.wavesplatform.events.api.grpc.backpressure._ +import com.wavesplatform.events.api.grpc.protobuf._ +import com.wavesplatform.events.protobuf.serde._ +import com.wavesplatform.events.repo.UpdatesRepo +import com.wavesplatform.utils.ScorexLogging +import io.grpc.stub.StreamObserver +import io.grpc.{Status, StatusRuntimeException} +import monix.execution.Scheduler + +import scala.concurrent.Future +import scala.util.{Failure, Success} + +class BlockchainUpdatesApiGrpcImpl(repo: UpdatesRepo.Read with UpdatesRepo.Stream)(implicit sc: Scheduler) + extends BlockchainUpdatesApiGrpc.BlockchainUpdatesApi + with ScorexLogging { + override def getBlockUpdate(request: GetBlockUpdateRequest): Future[GetBlockUpdateResponse] = Future { + repo.updateForHeight(request.height) match { + case Success(Some(upd)) => GetBlockUpdateResponse(Some(upd.protobuf)) + case Success(None) => throw new StatusRuntimeException(Status.NOT_FOUND) + case Failure(e: IllegalArgumentException) => + throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(e.getMessage)) + case Failure(exception) => + log.error(s"BlockchainUpdates gRPC failed to get block update for height ${request.height}", exception) + throw new StatusRuntimeException(Status.INTERNAL) + } + } + + override def getBlockUpdatesRange(request: GetBlockUpdatesRangeRequest): Future[GetBlockUpdatesRangeResponse] = Future { + repo.updatesRange(request.fromHeight, request.toHeight) match { + case Success(updates) => GetBlockUpdatesRangeResponse(updates.map(_.protobuf)) + case Failure(e: IllegalArgumentException) => + throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(e.getMessage)) + case Failure(e) => + log.error(s"BlockchainUpdates gRPC failed to get block range updates for range ${request.fromHeight} to ${request.toHeight}", e) + throw new StatusRuntimeException(Status.INTERNAL) + } + } + + override def subscribe(request: SubscribeRequest, responseObserver: StreamObserver[SubscribeEvent]): Unit = { + if (request.fromHeight <= 0) { + responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("height must be a positive integer"))) + } else { + val updatesPB = repo + .stream(request.fromHeight) + .map(elem => SubscribeEvent(update = Some(elem.protobuf))) + + wrapObservable(updatesPB, responseObserver)(identity) + } + } + +} diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/api/grpc/backpressure.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/api/grpc/backpressure.scala new file mode 100644 index 00000000000..ab6a3626da9 --- /dev/null +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/api/grpc/backpressure.scala @@ -0,0 +1,168 @@ +package com.wavesplatform.events.api.grpc +import java.util.concurrent.LinkedBlockingQueue + +import com.wavesplatform.utils.ScorexLogging +import io.grpc.stub.{CallStreamObserver, ServerCallStreamObserver, StreamObserver} +import io.grpc.{Status, StatusRuntimeException} +import monix.eval.Task +import monix.execution.{Ack, AsyncQueue, Cancelable, Scheduler} +import monix.reactive.Observable + +//noinspection ScalaStyle +object backpressure extends ScorexLogging { + def wrapObservable[A, B](source: Observable[A], dest: StreamObserver[B])(f: A => B)(implicit s: Scheduler): Unit = dest match { + case cso: CallStreamObserver[B] @unchecked => + val csoHash = Integer.toHexString(System.identityHashCode(dest)) + log.info(s"[$csoHash] Connecting stream observer") + val queue = AsyncQueue.bounded[B](32) + + cso.setOnReadyHandler { () => + log.info(s"[$csoHash] Stream ready") + pushNext() + } + + def pushNext(): Unit = + cso.synchronized { + if (!cso.isReady) return + log.info(s"[$csoHash] Starting poll") + var exit = false + while (cso.isReady && !exit) queue.tryPoll() match { + case Some(elem) => + log.info(s"[$csoHash] Sending element: ${elem.getClass.getSimpleName}#${Integer.toHexString(System.identityHashCode(elem))}") + cso.onNext(elem) + case None => exit = true + } + log.info(s"[$csoHash] Ending poll, cso ready = ${cso.isReady}") + } + + val cancelable = source.subscribe( + (elem: A) => { + log.info(s"[$csoHash] Offering new element: ${elem.getClass.getSimpleName}#${Integer.toHexString(System.identityHashCode(elem))}") + queue + .offer(f(elem)) + .flatMap { _ => + //log.info(s"[$csoHash] Element added, starting poll: $elem") + pushNext() + Ack.Continue + } + }, + err => { + log.error(s"[$csoHash] Stream error", err) + cso.onError(err) + }, + () => { + log.info(s"[$csoHash] Stream completed") + cso.onCompleted() + } + ) + + cso match { + case scso: ServerCallStreamObserver[B] @unchecked => + scso.setOnCancelHandler(cancelable.cancel _) + + case _ => + log.warn("Couldn't bind onCancel handler") + } + + case _ => + log.warn(s"Connecting without back-pressure: $dest") + source.subscribe( + { (elem: A) => + dest.onNext(f(elem)); Ack.Continue + }, + dest.onError, + () => dest.onCompleted() + ) + + } + + implicit class StreamObserverMonixOps[T](streamObserver: StreamObserver[T])(implicit sc: Scheduler) extends ScorexLogging { + // TODO: More convenient back-pressure implementation + def toSubscriber: monix.reactive.observers.Subscriber[T] = { + import org.reactivestreams.{Subscriber, Subscription} + + val rxs = new Subscriber[T] with Cancelable { + private[this] val queue = new LinkedBlockingQueue[T](32) + + @volatile + private[this] var subscription: Subscription = _ + + private[this] val observerReadyFunc: () => Boolean = streamObserver match { + case callStreamObserver: CallStreamObserver[_] => + () => callStreamObserver.isReady + case _ => + () => true + } + + def isReady: Boolean = observerReadyFunc() + + override def onSubscribe(subscription: Subscription): Unit = { + this.subscription = subscription + + def pushElement(): Unit = Option(queue.peek()) match { + case Some(_) if this.isReady => + val qv = queue.poll() + streamObserver.onNext(qv) + subscription.request(1) + + case None if this.isReady => + subscription.request(1) + + case _ => + // Ignore + } + + subscription match { + case scso: ServerCallStreamObserver[T] => + scso.disableAutoInboundFlowControl() + scso.setOnCancelHandler(() => subscription.cancel()) + scso.setOnReadyHandler(() => pushElement()) + + case cso: CallStreamObserver[T] => + cso.disableAutoInboundFlowControl() + cso.setOnReadyHandler(() => pushElement()) + + case _ => + subscription.request(Long.MaxValue) + } + } + + override def onNext(t: T): Unit = { + queue.add(t) + if (isReady) { + val value = Option(queue.poll()) + value.foreach(streamObserver.onNext) + if (isReady) subscription.request(1) + } + } + + override def onError(t: Throwable): Unit = { + log.error("gRPC streaming error", t) + streamObserver.onError(toStatusException(t)) + } + override def onComplete(): Unit = streamObserver.onCompleted() + def cancel(): Unit = Option(subscription).foreach(_.cancel()) + } + + monix.reactive.observers.Subscriber.fromReactiveSubscriber(rxs, rxs) + } + + def completeWith(obs: Observable[T]): Cancelable = { + streamObserver match { + case _: CallStreamObserver[T] => + log.info("Subscribed with backpressure") + obs.subscribe(this.toSubscriber) + + case _ => // No back-pressure + log.warn("Subscribed without backpressure") + obs + .doOnError(exception => Task(streamObserver.onError(toStatusException(exception)))) + .doOnComplete(Task(streamObserver.onCompleted())) + .foreach(value => streamObserver.onNext(value)) + } + } + + def toStatusException(t: Throwable) = + new StatusRuntimeException(Status.INTERNAL.withDescription(t.getMessage)) + } +} diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/events.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/events.scala new file mode 100644 index 00000000000..c037ef35e21 --- /dev/null +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/events.scala @@ -0,0 +1,239 @@ +package com.wavesplatform.events + +import cats.Monoid +import cats.syntax.monoid._ +import com.wavesplatform.account.Address +import com.wavesplatform.block.{Block, MicroBlock} +import com.wavesplatform.common.state.ByteStr +import com.wavesplatform.state.DiffToStateApplier.PortfolioUpdates +import com.wavesplatform.state.diffs.BlockDiffer.DetailedDiff +import com.wavesplatform.state.reader.CompositeBlockchain +import com.wavesplatform.state.{AccountDataInfo, AssetDescription, AssetScriptInfo, Blockchain, DataEntry, Diff, DiffToStateApplier, LeaseBalance} +import com.wavesplatform.transaction.{Asset, GenesisTransaction, Transaction} +import com.wavesplatform.transaction.Asset.{IssuedAsset, Waves} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +final case class AssetStateUpdate( + asset: IssuedAsset, + decimals: Int, + name: ByteStr, + description: ByteStr, + reissuable: Boolean, + volume: BigInt, + scriptInfo: Option[AssetScriptInfo], + sponsorship: Option[Long], + nft: Boolean, + assetExistedBefore: Boolean +) + +final case class StateUpdate( + balances: Seq[(Address, Asset, Long)], + leases: Seq[(Address, LeaseBalance)], + dataEntries: Seq[(Address, DataEntry[_])], + assets: Seq[AssetStateUpdate] +) { + def isEmpty: Boolean = balances.isEmpty && leases.isEmpty && dataEntries.isEmpty && assets.isEmpty +} + +object StateUpdate { + implicit val monoid: Monoid[StateUpdate] = new Monoid[StateUpdate] { + override def empty: StateUpdate = StateUpdate(Seq.empty, Seq.empty, Seq.empty, Seq.empty) + + override def combine(x: StateUpdate, y: StateUpdate): StateUpdate = { + // merge balance updates, preserving order + val balancesMap = mutable.LinkedHashMap.empty[(Address, Asset), Long] + (x.balances ++ y.balances).foreach { + case (addr, asset, balance) => + balancesMap.remove((addr, asset)) + balancesMap.addOne(((addr, asset), balance)) + } + val balances = balancesMap.toList.map { case ((addr, asset), balance) => (addr, asset, balance) } + + // merge leases, preserving order + val leasesMap = mutable.LinkedHashMap.empty[Address, LeaseBalance] + (x.leases ++ y.leases).foreach { + case (addr, balance) => + leasesMap.remove(addr) + leasesMap.addOne((addr, balance)) + } + val leases = leasesMap.toList + + // merge data entries, preserving order + val dataEntriesMap = mutable.LinkedHashMap.empty[(Address, String), DataEntry[_]] + (x.dataEntries ++ y.dataEntries).foreach { + case (addr, entry) => + dataEntriesMap.remove((addr, entry.key)) + dataEntriesMap.addOne(((addr, entry.key), entry)) + } + val dataEntries = dataEntriesMap.toList.map { case ((addr, _), entry) => (addr, entry) } + + // merge asset state updates, preserving order + val assetsMap = mutable.LinkedHashMap.empty[IssuedAsset, AssetStateUpdate] + (x.assets ++ y.assets).foreach(a => { + assetsMap.remove(a.asset) + assetsMap.addOne((a.asset, a)) + }) + val assets = assetsMap.toList.map { case (_, upd) => upd } + + StateUpdate( + balances = balances, + leases = leases, + dataEntries = dataEntries, + assets = assets + ) + } + } + + def atomic(blockchainBeforeWithMinerReward: Blockchain, diff: Diff, byTransaction: Option[Transaction]): StateUpdate = { + val blockchainAfter = CompositeBlockchain(blockchainBeforeWithMinerReward, Some(diff)) + + val PortfolioUpdates(updatedBalances, updatedLeases) = DiffToStateApplier.portfolios(blockchainBeforeWithMinerReward, diff) + + val balances = ArrayBuffer.empty[(Address, Asset, Long)] + for ((address, assetMap) <- updatedBalances; (asset, balance) <- assetMap) balances += ((address, asset, balance)) + + val dataEntries = diff.accountData.toSeq.flatMap { + case (address, AccountDataInfo(data)) => + data.toSeq.map { case (_, entry) => (address, entry) } + } + + val assets: Seq[AssetStateUpdate] = for { + a <- (diff.issuedAssets.keySet ++ diff.updatedAssets.keySet ++ diff.assetScripts.keySet ++ diff.sponsorship.keySet).toSeq + AssetDescription( + _, + _, + name, + description, + decimals, + reissuable, + totalVolume, + _, + script, + sponsorship, + nft + ) <- blockchainAfter.assetDescription(a).toSeq + existedBefore = !diff.issuedAssets.isDefinedAt(a) + } yield AssetStateUpdate( + a, + decimals, + ByteStr(name.toByteArray), + ByteStr(description.toByteArray), + reissuable, + totalVolume, + script, + if (sponsorship == 0) None else Some(sponsorship), + nft, + existedBefore + ) + + StateUpdate(balances.toSeq, updatedLeases.toSeq, dataEntries, assets) + } + + def container( + blockchainBeforeWithMinerReward: Blockchain, + diff: DetailedDiff, + transactions: Seq[Transaction], + minerAddress: Address + ): (StateUpdate, Seq[StateUpdate]) = { + val DetailedDiff(parentDiff, txsDiffs) = diff + val parentStateUpdate = atomic(blockchainBeforeWithMinerReward, parentDiff, None) + + // miner reward is already in the blockchainBeforeWithMinerReward + // if miner balance has been changed in parentDiff, it is already included in balance updates + // if it has not, it needs to be manually requested from the blockchain and added to balance updates + val parentStateUpdateWithMinerReward = parentStateUpdate.balances.find(_._1 == minerAddress) match { + case Some(_) => parentStateUpdate + case None => + val minerBalance = blockchainBeforeWithMinerReward.balance(minerAddress, Waves) + parentStateUpdate.copy(balances = parentStateUpdate.balances :+ ((minerAddress, Waves, minerBalance))) + } + + val (txsStateUpdates, _) = txsDiffs + .zip(transactions) + .foldLeft((ArrayBuffer.empty[StateUpdate], parentDiff)) { + case ((updates, accDiff), (txDiff, tx)) => + ( + updates += atomic(CompositeBlockchain(blockchainBeforeWithMinerReward, Some(accDiff)), txDiff, Some(tx)), + accDiff.combine(txDiff) + ) + } + + (parentStateUpdateWithMinerReward, txsStateUpdates.toSeq) + } +} + +sealed trait BlockchainUpdated extends Product with Serializable { + def toId: ByteStr + def toHeight: Int +} + +final case class BlockAppended( + toId: ByteStr, + toHeight: Int, + block: Block, + updatedWavesAmount: Long, + blockStateUpdate: StateUpdate, + transactionStateUpdates: Seq[StateUpdate] +) extends BlockchainUpdated + +object BlockAppended { + def from(block: Block, diff: DetailedDiff, minerReward: Option[Long], blockchainBeforeWithMinerReward: Blockchain): BlockAppended = { + val (blockStateUpdate, txsStateUpdates) = + StateUpdate.container(blockchainBeforeWithMinerReward, diff, block.transactionData, block.sender.toAddress) + + // updatedWavesAmount can change as a result of either genesis transactions or miner rewards + val updatedWavesAmount = blockchainBeforeWithMinerReward.height match { + // genesis case + case 0 => block.transactionData.collect { case GenesisTransaction(_, amount, _, _, _) => amount }.sum + // miner reward case + case _ => blockchainBeforeWithMinerReward.wavesAmount(blockchainBeforeWithMinerReward.height).toLong + minerReward.getOrElse(0L) + } + + BlockAppended(block.id.value(), blockchainBeforeWithMinerReward.height + 1, block, updatedWavesAmount, blockStateUpdate, txsStateUpdates) + } +} + +final case class MicroBlockAppended( + toId: ByteStr, + toHeight: Int, + microBlock: MicroBlock, + microBlockStateUpdate: StateUpdate, + transactionStateUpdates: Seq[StateUpdate], + totalTransactionsRoot: ByteStr +) extends BlockchainUpdated + +object MicroBlockAppended { + def from( + microBlock: MicroBlock, + diff: DetailedDiff, + blockchainBeforeWithMinerReward: Blockchain, + totalBlockId: ByteStr, + totalTransactionsRoot: ByteStr + ): MicroBlockAppended = { + val (microBlockStateUpdate, txsStateUpdates) = + StateUpdate.container(blockchainBeforeWithMinerReward, diff, microBlock.transactionData, microBlock.sender.toAddress) + + MicroBlockAppended( + totalBlockId, + blockchainBeforeWithMinerReward.height, + microBlock, + microBlockStateUpdate, + txsStateUpdates, + totalTransactionsRoot + ) + } +} + +final case class RollbackCompleted(toId: ByteStr, toHeight: Int) extends BlockchainUpdated + +object RollbackCompleted { + def from(toBlockId: ByteStr, toHeight: Int): RollbackCompleted = RollbackCompleted(toBlockId, toHeight) +} + +final case class MicroBlockRollbackCompleted(toId: ByteStr, toHeight: Int) extends BlockchainUpdated + +object MicroBlockRollbackCompleted { + def from(toBlockId: ByteStr, height: Int): MicroBlockRollbackCompleted = MicroBlockRollbackCompleted(toBlockId, height) +} diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/kafka/package.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/kafka/package.scala deleted file mode 100644 index 3bb8af96497..00000000000 --- a/blockchain-updates/src/main/scala/com/wavesplatform/events/kafka/package.scala +++ /dev/null @@ -1,67 +0,0 @@ -package com.wavesplatform.events -import java.util - -import com.wavesplatform.events.protobuf.PBEvents -import com.wavesplatform.events.settings.BlockchainUpdatesSettings -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.serialization.{IntegerSerializer, Serializer} - -package object kafka { - private object BlockchainUpdatedSerializer extends Serializer[BlockchainUpdated] { - override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {} - override def close(): Unit = {} - - override def serialize(topic: String, data: BlockchainUpdated): Array[Byte] = - PBEvents.protobuf(data).toByteArray - } - - private object IntSerializer extends Serializer[Int] { - val integerSerializer = new IntegerSerializer - - override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = integerSerializer.configure(configs, isKey) - override def close(): Unit = integerSerializer.close() - - override def serialize(topic: String, data: Int): Array[Byte] = - integerSerializer.serialize(topic, data) - } - - def createProperties(settings: BlockchainUpdatesSettings): util.Properties = { - val props = new util.Properties() - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, settings.bootstrapServers) - props.put(ProducerConfig.CLIENT_ID_CONFIG, settings.clientId) - // props.put(ProducerConfig.RETRIES_CONFIG, "0") - - // SASL_SSL - if (settings.ssl.enabled) { - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL") - props.put(SaslConfigs.SASL_MECHANISM, "PLAIN") - props.put( - SaslConfigs.SASL_JAAS_CONFIG, - s"org.apache.kafka.common.security.plain.PlainLoginModule required username = '${settings.ssl.username}' password = '${settings.ssl.password}';" - ) - } - props - } - - def createProducerProperties(settings: BlockchainUpdatesSettings): util.Properties = { - val props = createProperties(settings) - props.put(ProducerConfig.ACKS_CONFIG, "all") - props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "10485760") // 10MB - props - } - - def createProducer(settings: BlockchainUpdatesSettings): KafkaProducer[Int, BlockchainUpdated] = - new KafkaProducer[Int, BlockchainUpdated](createProducerProperties(settings), IntSerializer, BlockchainUpdatedSerializer) - - def createProducerRecord(topic: String, event: BlockchainUpdated): ProducerRecord[Int, BlockchainUpdated] = { - val h = event match { - case ap: BlockAppended => ap.toHeight - case MicroBlockAppended(_, height, _, _, _) => height - case RollbackCompleted(_, height) => height - case MicroBlockRollbackCompleted(_, height) => height - } - new ProducerRecord[Int, BlockchainUpdated](topic, h, event) - } -} diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/protobuf/PBEvents.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/protobuf/PBEvents.scala index 0d165a81a9e..5091ba7ab7d 100644 --- a/blockchain-updates/src/main/scala/com/wavesplatform/events/protobuf/PBEvents.scala +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/protobuf/PBEvents.scala @@ -2,112 +2,281 @@ package com.wavesplatform.events.protobuf import java.nio.charset.StandardCharsets +import cats.kernel.Monoid import com.google.protobuf.ByteString +import com.wavesplatform.account.Address import com.wavesplatform.common.state.ByteStr -import com.wavesplatform.events -import com.wavesplatform.events.protobuf.BlockchainUpdated.{Append => PBAppend, Rollback => PBRollback} -import com.wavesplatform.events.protobuf.StateUpdate.{AssetStateUpdate, BalanceUpdate => PBBalanceUpdate, DataEntryUpdate => PBDataEntryUpdate, LeasingUpdate => PBLeasingUpdate} +import com.wavesplatform.events.protobuf.BlockchainUpdated.Append.Body +import com.wavesplatform.events.protobuf.BlockchainUpdated.Rollback.RollbackType +import com.wavesplatform.events.{ + AssetStateUpdate => VanillaAssetStateUpdate, + StateUpdate => VanillaStateUpdate, + BlockchainUpdated => VanillaBlockchainUpdated, + BlockAppended => VanillaBlockAppended, + MicroBlockAppended => VanillaMicroBlockAppended, + RollbackCompleted => VanillaRollbackCompleted, + MicroBlockRollbackCompleted => VanillaMicroBlockRollbackCompleted +} +import com.wavesplatform.events.protobuf.BlockchainUpdated.{Append, Rollback, Update} +import com.wavesplatform.events.protobuf.StateUpdate.AssetStateUpdate.AssetScriptInfo +import com.wavesplatform.events.protobuf.StateUpdate.{AssetStateUpdate, BalanceUpdate, DataEntryUpdate, LeasingUpdate} import com.wavesplatform.protobuf.block.{PBBlocks, PBMicroBlocks} -import com.wavesplatform.protobuf.transaction.PBTransactions +import com.wavesplatform.protobuf.transaction.{PBAmounts, PBTransactions} +import com.wavesplatform.state.{LeaseBalance, AssetScriptInfo => VanillaAssetScriptInfo} +import com.wavesplatform.transaction.Asset.IssuedAsset import com.wavesplatform.transaction.Transaction -object PBEvents { +import scala.util.{Failure, Try} + +trait Protobuf[A] { + def protobuf: A +} + +trait Vanilla[A] { + def vanilla: Try[A] +} + +object serde { + import com.wavesplatform.protobuf.utils.PBImplicitConversions._ - def protobuf(event: events.BlockchainUpdated): BlockchainUpdated = - event match { - case events.BlockAppended(sig, height, block, updatedWavesAmount, blockStateUpdate, transactionStateUpdates) => - val blockUpdate = Some(blockStateUpdate).filterNot(_.isEmpty).map(protobufStateUpdate) - val txsUpdates = transactionStateUpdates.map(protobufStateUpdate) - - BlockchainUpdated( - id = sig.toByteString, - height = height, - update = BlockchainUpdated.Update.Append( - PBAppend( - transactionIds = getIds(block.transactionData), - stateUpdate = blockUpdate, - transactionStateUpdates = txsUpdates, - body = PBAppend.Body.Block( - PBAppend.BlockAppend( - block = Some(PBBlocks.protobuf(block)), - updatedWavesAmount = updatedWavesAmount + implicit class BlockchainUpdatedProtobuf(self: VanillaBlockchainUpdated) extends Protobuf[BlockchainUpdated] { + + import BlockchainUpdatedProtobuf._ + + override def protobuf: BlockchainUpdated = + self match { + case VanillaBlockAppended(id, height, block, updatedWavesAmount, blockStateUpdate, transactionStateUpdates) => + val blockUpdate = Some(blockStateUpdate).filterNot(_.isEmpty).map(_.protobuf) + val txsUpdates = transactionStateUpdates.map(_.protobuf) + + BlockchainUpdated( + id = id.toByteString, + height = height, + update = BlockchainUpdated.Update.Append( + Append( + transactionIds = getIds(block.transactionData), + stateUpdate = blockUpdate, + transactionStateUpdates = txsUpdates, + body = Append.Body.Block( + Append.BlockAppend( + block = Some(PBBlocks.protobuf(block)), + updatedWavesAmount = updatedWavesAmount + ) ) ) ) ) - ) - case events.MicroBlockAppended(totalBlockId, height, microBlock, microBlockStateUpdate, transactionStateUpdates) => - val microBlockUpdate = Some(microBlockStateUpdate).filterNot(_.isEmpty).map(protobufStateUpdate) - val txsUpdates = transactionStateUpdates.map(protobufStateUpdate) - - BlockchainUpdated( - id = totalBlockId.toByteString, - height = height, - update = BlockchainUpdated.Update.Append( - PBAppend( - transactionIds = getIds(microBlock.transactionData), - stateUpdate = microBlockUpdate, - transactionStateUpdates = txsUpdates, - body = PBAppend.Body.MicroBlock( - PBAppend.MicroBlockAppend( - microBlock = Some(PBMicroBlocks.protobuf(microBlock, totalBlockId)) + case VanillaMicroBlockAppended(totalBlockId, height, microBlock, microBlockStateUpdate, transactionStateUpdates, totalTransactionsRoot) => + val microBlockUpdate = Some(microBlockStateUpdate).filterNot(_.isEmpty).map(_.protobuf) + val txsUpdates = transactionStateUpdates.map(_.protobuf) + + BlockchainUpdated( + id = totalBlockId.toByteString, + height = height, + update = BlockchainUpdated.Update.Append( + Append( + transactionIds = getIds(microBlock.transactionData), + stateUpdate = microBlockUpdate, + transactionStateUpdates = txsUpdates, + body = Append.Body.MicroBlock( + Append.MicroBlockAppend( + microBlock = Some(PBMicroBlocks.protobuf(microBlock, totalBlockId)), + updatedTransactionsRoot = totalTransactionsRoot.toByteString + ) ) ) ) ) - ) - case events.RollbackCompleted(to, height) => - BlockchainUpdated( - id = to.toByteString, - height = height, - update = BlockchainUpdated.Update.Rollback( - PBRollback(PBRollback.RollbackType.BLOCK) + case VanillaRollbackCompleted(to, height) => + BlockchainUpdated( + id = to.toByteString, + height = height, + update = BlockchainUpdated.Update.Rollback( + Rollback(Rollback.RollbackType.BLOCK) + ) ) - ) - case events.MicroBlockRollbackCompleted(toSig, height) => - BlockchainUpdated( - id = toSig.toByteString, - height = height, - update = BlockchainUpdated.Update.Rollback( - PBRollback(PBRollback.RollbackType.MICROBLOCK) + case VanillaMicroBlockRollbackCompleted(toSig, height) => + BlockchainUpdated( + id = toSig.toByteString, + height = height, + update = BlockchainUpdated.Update.Rollback( + Rollback(Rollback.RollbackType.MICROBLOCK) + ) ) - ) + } + } + + object BlockchainUpdatedProtobuf { + private def getIds(txs: Seq[Transaction]): Seq[ByteString] = txs.map(t => ByteString.copyFrom(t.id().arr)) + } + + implicit class BlockchainUpdatedVanilla(self: BlockchainUpdated) extends Vanilla[VanillaBlockchainUpdated] { + private[this] lazy val error: IllegalArgumentException = { + val base58Id = ByteStr(self.id.toByteArray).toString + new IllegalArgumentException(s"Invalid protobuf BlockchainUpdated at height ${self.height}, id $base58Id") } - private def toString(bytes: ByteStr): String = new String(bytes.arr, StandardCharsets.UTF_8) - - private def protobufAssetStateUpdate(a: events.AssetStateUpdate): AssetStateUpdate = - AssetStateUpdate( - assetId = a.asset.id.toByteString, - decimals = a.decimals, - name = toString(a.name), - description = toString(a.description), - reissuable = a.reissuable, - volume = a.volume.longValue, - script = PBTransactions.toPBScript(a.script.map(_.script)), - sponsorship = a.sponsorship.getOrElse(0), - nft = a.nft, - assetExistedBefore = a.assetExistedBefore, - safeVolume = ByteString.copyFrom(a.volume.toByteArray) + override def vanilla: Try[VanillaBlockchainUpdated] = + Try { + self.update match { + case Update.Append(append) => + append.body match { + case Body.Block(body) => + VanillaBlockAppended( + toId = ByteStr(self.id.toByteArray), + toHeight = self.height, + block = PBBlocks.vanilla(body.block.get, unsafe = true).get, + updatedWavesAmount = body.updatedWavesAmount, + blockStateUpdate = append.stateUpdate.map(_.vanilla.get).getOrElse(Monoid[VanillaStateUpdate].empty), + transactionStateUpdates = append.transactionStateUpdates.map(_.vanilla.get) + ) + case Body.MicroBlock(body) => + VanillaMicroBlockAppended( + toId = ByteStr(self.id.toByteArray), + toHeight = self.height, + microBlock = PBMicroBlocks.vanilla(body.microBlock.get, unsafe = true).get.microblock, + microBlockStateUpdate = append.stateUpdate.map(_.vanilla.get).getOrElse(Monoid[VanillaStateUpdate].empty), + transactionStateUpdates = append.transactionStateUpdates.map(_.vanilla.get), + totalTransactionsRoot = ByteStr(body.updatedTransactionsRoot.toByteArray) + ) + case Body.Empty => throw error + } + case Update.Rollback(rollback) => + rollback.`type` match { + case RollbackType.BLOCK => + VanillaRollbackCompleted( + toId = ByteStr(self.id.toByteArray), + toHeight = self.height + ) + case RollbackType.MICROBLOCK => + VanillaMicroBlockRollbackCompleted( + toId = ByteStr(self.id.toByteArray), + toHeight = self.height + ) + case RollbackType.Unrecognized(_) => throw error + } + case Update.Empty => throw error + } + } recoverWith { case _: Throwable => Failure(error) } + } + + implicit class AssetStateUpdateProtobuf(self: VanillaAssetStateUpdate) extends Protobuf[AssetStateUpdate] { + + import AssetStateUpdateProtobuf._ + + override def protobuf: AssetStateUpdate = AssetStateUpdate( + assetId = self.asset.id.toByteString, + decimals = self.decimals, + name = toStringUtf8(self.name), + description = toStringUtf8(self.description), + reissuable = self.reissuable, + volume = self.volume.longValue, + scriptInfo = self.scriptInfo.map(_.protobuf), + sponsorship = self.sponsorship.getOrElse(0), + nft = self.nft, + assetExistedBefore = self.assetExistedBefore, + safeVolume = ByteString.copyFrom(self.volume.toByteArray) ) + } - private def protobufStateUpdate(su: events.StateUpdate): StateUpdate = { - StateUpdate( - balances = su.balances.map { + object AssetStateUpdateProtobuf { + private def toStringUtf8(bytes: ByteStr): String = new String(bytes.arr, StandardCharsets.UTF_8) + } + + implicit class AssetStateUpdateVanilla(self: AssetStateUpdate) extends Vanilla[VanillaAssetStateUpdate] { + import AssetStateUpdateVanilla._ + override def vanilla: Try[VanillaAssetStateUpdate] = + Try { + PBAmounts.toVanillaAssetId(self.assetId) match { + case a: IssuedAsset => + VanillaAssetStateUpdate( + asset = a, + decimals = self.decimals, + name = ByteStr(self.name.getBytes()), + description = ByteStr(self.description.getBytes()), + reissuable = self.reissuable, + volume = BigInt(self.safeVolume.toByteArray), + scriptInfo = self.scriptInfo.map(_.vanilla.get), + sponsorship = if (self.sponsorship == 0) None else Some(self.sponsorship), + nft = self.nft, + assetExistedBefore = self.assetExistedBefore + ) + case _ => throw error + } + } recoverWith { case _: Throwable => Failure(error) } + } + + object AssetStateUpdateVanilla { + private lazy val error = new IllegalArgumentException(s"Invalid protobuf AssetStateUpdate") + } + + implicit class AssetScriptInfoProtobuf(self: VanillaAssetScriptInfo) extends Protobuf[AssetScriptInfo] { + override def protobuf: AssetScriptInfo = AssetScriptInfo( + script = PBTransactions.toPBScript(Some(self.script)), + complexity = self.complexity + ) + } + + implicit class AssetScriptInfoVanilla(self: AssetScriptInfo) extends Vanilla[VanillaAssetScriptInfo] { + import AssetScriptInfoVanilla._ + override def vanilla: Try[VanillaAssetScriptInfo] = + Try { + VanillaAssetScriptInfo( + script = PBTransactions.toVanillaScript(self.script).get, + complexity = self.complexity + ) + } recoverWith { case _: Throwable => Failure(error) } + } + + object AssetScriptInfoVanilla { + private lazy val error = new IllegalArgumentException(s"Invalid protobuf AssetScriptInfo") + } + + implicit class StateUpdateProtobuf(self: VanillaStateUpdate) extends Protobuf[StateUpdate] { + override def protobuf: StateUpdate = StateUpdate( + balances = self.balances.map { case (addr, assetId, amt) => - PBBalanceUpdate(address = addr, amount = Some((assetId, amt))) + BalanceUpdate(address = addr, amount = Some((assetId, amt))) }, - leases = su.leases.map { + leases = self.leases.map { case (addr, leaseBalance) => - PBLeasingUpdate(address = addr, in = leaseBalance.in, out = leaseBalance.out) + LeasingUpdate(address = addr, in = leaseBalance.in, out = leaseBalance.out) }, - dataEntries = su.dataEntries.map { - case (addr, entry) => PBDataEntryUpdate(address = addr, dataEntry = Some(PBTransactions.toPBDataEntry(entry))) + dataEntries = self.dataEntries.map { + case (addr, entry) => DataEntryUpdate(address = addr, dataEntry = Some(PBTransactions.toPBDataEntry(entry))) }, - assets = su.assets.map(protobufAssetStateUpdate) + assets = self.assets.map(_.protobuf) ) } - private def getIds(txs: Seq[Transaction]): Seq[ByteString] = txs.map(t => ByteString.copyFrom(t.id().arr)) + implicit class StateUpdateVanilla(self: StateUpdate) extends Vanilla[VanillaStateUpdate] { + import StateUpdateVanilla._ + override def vanilla: Try[VanillaStateUpdate] = + Try { + VanillaStateUpdate( + balances = self.balances.map { b => + val (asset, balance) = PBAmounts.toAssetAndAmount(b.amount.get) + val address = toAddress(b.address).get + (address, asset, balance) + }, + leases = self.leases.map { l => + val address = toAddress(l.address).get + (address, LeaseBalance(l.in, l.out)) + }, + dataEntries = self.dataEntries.map { + case DataEntryUpdate(addr, entry) => + (toAddress(addr).get, PBTransactions.toVanillaDataEntry(entry.get)) + }, + assets = self.assets.map(_.vanilla.get) + ) + } recoverWith { case _: Throwable => Failure(error) } + } + + object StateUpdateVanilla { + private lazy val error = new IllegalArgumentException(s"Invalid protobuf StateUpdate") + + def toAddress(bs: ByteString): Try[Address] = Address.fromBytes(bs.toByteArray).left.map(_ => error).toTry + } + } diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/LiquidState.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/LiquidState.scala new file mode 100644 index 00000000000..8bf644a41c1 --- /dev/null +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/LiquidState.scala @@ -0,0 +1,34 @@ +package com.wavesplatform.events.repo + +import cats.syntax.monoid._ +import com.wavesplatform.events.{BlockAppended, BlockchainUpdated, MicroBlockAppended} + +case class LiquidState( + keyBlock: BlockAppended, + microBlocks: Seq[MicroBlockAppended] +) { + def solidify(): BlockAppended = { + val toId = microBlocks.lastOption.fold(keyBlock.toId)(_.toId) + val signature = microBlocks.lastOption.fold(keyBlock.block.signature)(_.microBlock.totalResBlockSig) + val transactionsRoot = microBlocks.lastOption.fold(keyBlock.block.header.transactionsRoot)(_.totalTransactionsRoot) + + val transactionData = microBlocks.foldLeft(keyBlock.block.transactionData)((txs, mb) => txs ++ mb.microBlock.transactionData) + val blockStateUpdate = microBlocks.foldLeft(keyBlock.blockStateUpdate)((upd, mb) => upd.combine(mb.microBlockStateUpdate)) + val transactionStateUpdates = microBlocks.foldLeft(keyBlock.transactionStateUpdates)((upds, mb) => upds ++ mb.transactionStateUpdates) + + BlockAppended( + toId = toId, + toHeight = keyBlock.toHeight, + block = keyBlock.block.copy( + header = keyBlock.block.header.copy(transactionsRoot = transactionsRoot), + signature = signature, + transactionData = transactionData + ), + updatedWavesAmount = keyBlock.updatedWavesAmount, + blockStateUpdate = blockStateUpdate, + transactionStateUpdates = transactionStateUpdates + ) + } + + def toSeq: Seq[BlockchainUpdated] = Seq(keyBlock) ++ microBlocks +} diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/Lockable.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/Lockable.scala new file mode 100644 index 00000000000..da69a1a1003 --- /dev/null +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/Lockable.scala @@ -0,0 +1,25 @@ +package com.wavesplatform.events.repo + +import java.util.concurrent.locks.{ReadWriteLock, ReentrantReadWriteLock} + +trait Lockable { + private[this] val lock: ReadWriteLock = new ReentrantReadWriteLock() + + protected def readLock[A](a: => A): A = { + lock.readLock().lock() + try { + a + } finally { + lock.readLock().unlock() + } + } + + protected def writeLock[A](a: => A): A = { + lock.writeLock().lock() + try { + a + } finally { + lock.writeLock().unlock() + } + } +} diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/UpdatesRepo.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/UpdatesRepo.scala new file mode 100644 index 00000000000..837fb7f5b78 --- /dev/null +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/UpdatesRepo.scala @@ -0,0 +1,34 @@ +package com.wavesplatform.events.repo + +import com.wavesplatform.events.{BlockAppended, BlockchainUpdated, MicroBlockAppended, MicroBlockRollbackCompleted, RollbackCompleted} +import monix.reactive.Observable + +import scala.util.Try + +object UpdatesRepo { + trait Read { + def height: Try[Int] + + def updateForHeight(height: Int): Try[Option[BlockAppended]] + + // inclusive from both sides + def updatesRange(from: Int, to: Int): Try[Seq[BlockAppended]] + } + + trait Write { + // def dropLiquidState(afterId: Option[ByteStr] = None): Unit + + def appendMicroBlock(microBlockAppended: MicroBlockAppended): Try[Unit] + + def appendBlock(blockAppended: BlockAppended): Try[Unit] + + def rollback(rollback: RollbackCompleted): Try[Unit] + + def rollbackMicroBlock(microBlockRollback: MicroBlockRollbackCompleted): Try[Unit] + } + + trait Stream { + // inclusive + def stream(from: Int): Observable[BlockchainUpdated] + } +} diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/UpdatesRepoImpl.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/UpdatesRepoImpl.scala new file mode 100644 index 00000000000..5357d19a45a --- /dev/null +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/repo/UpdatesRepoImpl.scala @@ -0,0 +1,287 @@ +package com.wavesplatform.events.repo + +import java.nio.{ByteBuffer, ByteOrder} + +import cats.implicits._ +import com.wavesplatform.Shutdownable +import com.wavesplatform.database.openDB +import com.wavesplatform.events._ +import com.wavesplatform.events.protobuf.serde._ +import com.wavesplatform.events.protobuf.{BlockchainUpdated => PBBlockchainUpdated} +import com.wavesplatform.utils.ScorexLogging +import monix.eval.Task +import monix.execution.{Ack, Scheduler} +import monix.reactive.Observable +import monix.reactive.subjects.ConcurrentSubject + +import scala.annotation.tailrec +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.{Failure, Success, Try} + +class UpdatesRepoImpl(directory: String)(implicit val scheduler: Scheduler) + extends UpdatesRepo.Read + with UpdatesRepo.Write + with UpdatesRepo.Stream + with ScorexLogging + with Shutdownable + with Lockable { + import UpdatesRepoImpl._ + + private[this] val db = openDB(directory) + private[this] var liquidState: Option[LiquidState] = None + + log.info(s"BlockchainUpdates extension opened db at ${directory}") + + override def shutdown(): Unit = db.close() + + private[this] val realTimeUpdates = ConcurrentSubject.publish[BlockchainUpdated] + private[this] def sendRealTimeUpdate(ba: BlockchainUpdated): Try[Unit] = { + realTimeUpdates.onNext(ba) match { + case Ack.Continue => + Success(()) + case Ack.Stop => + Failure(new IllegalStateException("realTimeUpdates stream sent Ack.Stop")) + } + } + + // UpdatesRepo.Read impl + override def height: Try[Int] = Try { + readLock { + liquidState.map(_.keyBlock.toHeight).getOrElse { + val iter = db.iterator() + try { + iter.seekToLast() + if (iter.hasNext) { + val blockBytes = iter.next.getValue + val lastUpdate = PBBlockchainUpdated.parseFrom(blockBytes).vanilla.get + lastUpdate.toHeight + } else { + 0 + } + } finally { + iter.close() + } + } + } + } + + override def updateForHeight(height: Int): Try[Option[BlockAppended]] = + readLock { + liquidState match { + case Some(ls) if ls.keyBlock.toHeight == height => + log.debug(s"BlockchainUpdates extension requested liquid block at height $height") + Success(Some(ls.solidify())) + case Some(ls) if ls.keyBlock.toHeight < height => + log.warn(s"BlockchainUpdates extension requested non-existing block at height $height, current ${ls.keyBlock.toHeight}") + Success(None) + case _ if height <= 0 => + Failure(new IllegalArgumentException("BlockchainUpdates asked for an update at a non-positive height")) + case _ => + val bytes = db.get(key(height)) + if (bytes == null || bytes.isEmpty) { + Success(None) + } else { + for { + pbParseResult <- Try(PBBlockchainUpdated.parseFrom(bytes)) + vanillaUpdate <- pbParseResult.vanilla + blockAppended <- Try(vanillaUpdate.asInstanceOf[BlockAppended]) + } yield Some(blockAppended) + } + } + } + + override def updatesRange(from: Int, to: Int): Try[Seq[BlockAppended]] = + if (to - from > RANGE_REQUEST_MAX_SIZE) { + Failure(new IllegalArgumentException(s"Maximum range length of $RANGE_REQUEST_MAX_SIZE exceeded")) + } else { + readLock { + height.flatMap { h => + log.info(s"BlockchainUpdates request updatesRange from $from to $to") + val cnt = to - from + 1 + Try( + Await + .result( + stream(from).collect { case u: BlockAppended => u }.take(cnt).takeWhile(_.toHeight < h).bufferTumbling(cnt).runAsyncGetLast, + RANGE_REQUEST_TIMEOUT + ) + .get + ).flatMap { appends => + if (appends.nonEmpty && appends.length < (appends.last.toHeight - appends.head.toHeight + 1)) { + Failure(new IllegalStateException(s"Missing blocks found in range $from, $to")) + } else { + Success(appends) + } + } + } + } + } + + // UpdatesRepo.Write impl + override def appendBlock(blockAppended: BlockAppended): Try[Unit] = writeLock { + Try { + liquidState.foreach { ls => + val solidBlock = ls.solidify() + db.put( + key(solidBlock.toHeight), + solidBlock.protobuf.toByteArray + ) + } + liquidState = Some(LiquidState(blockAppended, Seq.empty)) + sendRealTimeUpdate(blockAppended) + } + } + + override def appendMicroBlock(microBlockAppended: MicroBlockAppended): Try[Unit] = writeLock { + liquidState match { + case Some(LiquidState(keyBlock, microBlocks)) => + liquidState = Some(LiquidState(keyBlock, microBlocks :+ microBlockAppended)) + sendRealTimeUpdate(microBlockAppended) + case None => + Failure(new IllegalStateException("BlockchainUpdates attempted to insert a microblock without a keyblock")) + } + } + + override def rollback(rollback: RollbackCompleted): Try[Unit] = + writeLock { + height + .flatMap { h => + if (rollback.toHeight > h) { + Failure(new IllegalArgumentException("BlockchainUpdates attempted to rollback to a height higher than current")) + } else if (rollback.toHeight <= 0) { + Failure(new IllegalArgumentException("BlockchainUpdates attempted to rollback to a non-positive height")) + } else if (rollback.toHeight == h) { + Failure(new IllegalArgumentException("BlockchainUpdates attempted to rollback to current height")) + } else if (rollback.toHeight == h - 1) { + liquidState = None + Success(()) + } else { + val iter = db.iterator() + val batch = db.createWriteBatch() + try { + iter.seek(key(rollback.toHeight)) + iter.next + while (iter.hasNext) { + batch.delete(iter.next.getKey) + } + db.write(batch) + Success(()) + } catch { + case t: Throwable => Failure(t) + } finally { + iter.close() + batch.close() + } + } + } + .flatMap(_ => sendRealTimeUpdate(rollback)) + } + + override def rollbackMicroBlock(microBlockRollback: MicroBlockRollbackCompleted): Try[Unit] = + writeLock { + height + .flatMap { h => + if (microBlockRollback.toHeight != h) { + Failure(new IllegalArgumentException("BlockchainUpdates microblock rollback height was not equal to current height")) + } else { + liquidState match { + case Some(ls) => + if (microBlockRollback.toId == ls.keyBlock.toId) { + liquidState = Some(ls.copy(microBlocks = Seq.empty)) + Success(()) + } else { + val remainingMicroBlocks = ls.microBlocks.reverse.dropWhile(_.toId != microBlockRollback.toId).reverse + if (remainingMicroBlocks.isEmpty) { + Failure(new IllegalArgumentException("BlockchainUpdates attempted to rollback a non-existing microblock")) + } else { + liquidState = Some(ls.copy(microBlocks = remainingMicroBlocks)) + Success(()) + } + } + case None => Failure(new IllegalStateException("BlockchainUpdates attempted to rollback microblock without liquid state present")) + } + } + } + .flatMap(_ => sendRealTimeUpdate(microBlockRollback)) + } + + // UpdatesRepo.Stream impl + override def stream(fromHeight: Int): Observable[BlockchainUpdated] = { + val realTimeUpdatesForCurrentSubscription = ConcurrentSubject.replay[BlockchainUpdated] + + /** + * reads from level db by synchronous batches each using one iterator + * each batch gets a read lock + * @param startingFrom batch start height + * @return Task to be consumed by Observable.unfoldEval + */ + def readBatch(startingFrom: Option[Int]): Task[Option[(Seq[BlockchainUpdated], Option[Int])]] = Task.eval { + startingFrom map { from => + readLock { + val res = Seq.newBuilder[BlockchainUpdated] + res.sizeHint(LEVELDB_READ_BATCH_SIZE) + + val iterator = db.iterator() + val isLastBatch = try { + iterator.seek(key(from)) + + @tailrec + def goUnsafe(remaining: Int): Boolean = { + if (remaining > 0 && iterator.hasNext) { + val next = iterator.next() + val blockBytes = next.getValue + res += PBBlockchainUpdated.parseFrom(blockBytes).vanilla.get + goUnsafe(remaining - 1) + } else { + !iterator.hasNext + } + } + + goUnsafe(LEVELDB_READ_BATCH_SIZE) + } catch { + case t: Throwable => + Task.raiseError(t) + true + } finally { + iterator.close() + } + + if (isLastBatch) { + realTimeUpdates.subscribe(realTimeUpdatesForCurrentSubscription) + val liquidUpdates = liquidState match { + case None => Seq.empty + case Some(LiquidState(keyBlock, microBlocks)) => Seq(keyBlock) ++ microBlocks + } + (res.result() ++ liquidUpdates, None) + } else { + val result = res.result() + val nextTickFrom = result.lastOption.map(_.toHeight + 1) + (result, nextTickFrom) + } + } + } + } + + Observable.fromTry(height).flatMap { h => + if (h < fromHeight) { + Observable.raiseError(new IllegalArgumentException("Requested start height exceeds current blockchain height")) + } else { + val historical = Observable + .unfoldEval(fromHeight.some)(readBatch) + .flatMap(Observable.fromIterable) + + historical ++ realTimeUpdatesForCurrentSubscription + } + } + } +} + +object UpdatesRepoImpl { + private val LEVELDB_READ_BATCH_SIZE = 1024 + + private val RANGE_REQUEST_MAX_SIZE = 1000 + private val RANGE_REQUEST_TIMEOUT = 1.minute + + private def key(height: Int): Array[Byte] = + ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putInt(height).array() +} diff --git a/blockchain-updates/src/main/scala/com/wavesplatform/events/settings/BlockchainUpdatesSettings.scala b/blockchain-updates/src/main/scala/com/wavesplatform/events/settings/BlockchainUpdatesSettings.scala index 452c0ac0e7d..1d178346de1 100644 --- a/blockchain-updates/src/main/scala/com/wavesplatform/events/settings/BlockchainUpdatesSettings.scala +++ b/blockchain-updates/src/main/scala/com/wavesplatform/events/settings/BlockchainUpdatesSettings.scala @@ -5,13 +5,7 @@ import net.ceedubs.ficus.readers.ValueReader import net.ceedubs.ficus.readers.ArbitraryTypeReader.arbitraryTypeValueReader import net.ceedubs.ficus.readers.namemappers.implicits.hyphenCase -case class SslSettings(enabled: Boolean, username: String, password: String) - -object SslSettings { - implicit val valueReader: ValueReader[SslSettings] = arbitraryTypeValueReader -} - -case class BlockchainUpdatesSettings(bootstrapServers: String, topic: String, clientId: String, ssl: SslSettings) +case class BlockchainUpdatesSettings(grpcPort: Int) object BlockchainUpdatesSettings { implicit val valueReader: ValueReader[BlockchainUpdatesSettings] = arbitraryTypeValueReader diff --git a/blockchain-updates/src/test/scala/com/wavesplatform/events/BlockchainUpdateTriggersSpec.scala b/blockchain-updates/src/test/scala/com/wavesplatform/events/BlockchainUpdateTriggersSpec.scala new file mode 100644 index 00000000000..85791bf7580 --- /dev/null +++ b/blockchain-updates/src/test/scala/com/wavesplatform/events/BlockchainUpdateTriggersSpec.scala @@ -0,0 +1,312 @@ +//package com.wavesplatform.events +// +//import com.wavesplatform.account.KeyPair +//import com.wavesplatform.block.MicroBlock +//import com.wavesplatform.common.state.ByteStr +//import com.wavesplatform.common.utils.EitherExt2 +//import com.wavesplatform.features.EstimatorProvider +//import com.wavesplatform.history.Domain.BlockchainUpdaterExt +//import com.wavesplatform.lagonaki.mocks.TestBlock +//import com.wavesplatform.lang.script.Script +//import com.wavesplatform.protobuf.utils.PBImplicitConversions.PBByteStringOps +//import com.wavesplatform.settings.{Constants, WavesSettings} +//import com.wavesplatform.state.diffs.ENOUGH_AMT +//import com.wavesplatform.state.{Blockchain, Diff, NewTransactionInfo} +//import com.wavesplatform.transaction.Asset.Waves +//import com.wavesplatform.transaction.assets.IssueTransaction +//import com.wavesplatform.transaction.transfer.TransferTransaction +//import com.wavesplatform.transaction.{BlockchainUpdater, DataTransaction, GenesisTransaction, Transaction} +//import com.wavesplatform.{BlockGen, TestHelpers, crypto, state} +//import org.scalacheck.Gen +//import org.scalatest.{FreeSpec, Matchers} +//import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks +// +//class BlockchainUpdateTriggersSpec extends FreeSpec with Matchers with BlockGen with ScalaCheckPropertyChecks with EventsHelpers { +// private val WAVES_AMOUNT = Constants.UnitsInWave * Constants.TotalWaves +// +// override protected def settings: WavesSettings = TestHelpers.enableNG(super.settings) +// +// // add a genesis block to the blockchain +// private val master: KeyPair = accountGen.sample.get +// private val rich: KeyPair = accountGen.sample.get +// private val initialAmount: Long = WAVES_AMOUNT / 2 +// private val genesis = TestBlock.create( +// 0, +// Seq( +// GenesisTransaction.create(master.toAddress, initialAmount, 0).explicitGet(), +// GenesisTransaction.create(rich.toAddress, initialAmount, 0).explicitGet() +// ), +// master +// ) +// override protected def initBlockchain(blockchainUpdater: Blockchain with BlockchainUpdater): Unit = { +// blockchainUpdater.processBlock(genesis).explicitGet() +// super.initBlockchain(blockchainUpdater) +// } +// +// private val sigGen: Gen[ByteStr] = bytes64gen.map(ByteStr.apply) +// private val heightGen: Gen[Int] = Gen.choose(1, 1000) +// private val assetAmtGen: Gen[Long] = Gen.oneOf(Gen.const[Long](1), Gen.choose[Long](2, ENOUGH_AMT)) +// +// private def microBlockGen(txs: Seq[Transaction], signer: KeyPair): Gen[MicroBlock] = +// for { +// sig <- byteArrayGen(crypto.SignatureLength).map(ByteStr.apply) +// mb = MicroBlock.buildAndSign(3.toByte, signer, txs, genesis.signature, sig).explicitGet() +// } yield mb +// +// /** +// * Tests the assertion both for transactions added in a block and in a microblock +// */ +// private def testTxsStateUpdates[A](txs: Seq[Transaction])(assertion: Seq[StateUpdate] => A): Unit = { +// val b = blockGen(txs, master).sample.get +// assertion(appendBlock(b).transactionStateUpdates) +// +// val mb = microBlockGen(txs, master).sample.get +// assertion(appendMicroBlock(mb).transactionStateUpdates) +// } +// +// private def isNFT(tx: IssueTransaction): Boolean = tx.quantity == 1 && tx.decimals == 0 && !tx.reissuable +// +// "updated Waves amount is calculated correctly for miner reward" in forAll { +// for { +// b <- blockGen(Seq.empty, master) +// reward <- Gen.option(Gen.choose(1L, 1000000L)) +// ba = appendBlock(b, reward) +// } yield (reward, ba) +// } { +// case (reward, BlockAppended(_, _, _, updatedWavesAmount, _, _)) => +// updatedWavesAmount shouldBe WAVES_AMOUNT + reward.getOrElse(0L) +// } +// +// "rollbacks correctly" - { +// "block" in forAll(sigGen, heightGen) { (sig, height) => +// produceEvent(_.onRollback(sig, height)) match { +// case RollbackCompleted(toId, toHeight) => +// toId shouldBe sig +// toHeight shouldBe height +// case _ => fail() +// } +// } +// +// "microblock" in forAll(sigGen, heightGen) { (sig, height) => +// produceEvent(_.onMicroBlockRollback(sig, height)) match { +// case MicroBlockRollbackCompleted(toId, toHeight) => +// toId shouldBe sig +// toHeight shouldBe height +// case _ => fail() +// } +// } +// } +// +// "appends correctly" - { +// "empty block" in forAll { +// for { +// b <- blockGen(Seq.empty, master) +// ba = appendBlock(b) +// } yield (b, ba) +// } { +// case (b, BlockAppended(toId, toHeight, block, _, _, _)) => +// toId shouldBe b.signature +// toHeight shouldBe blockchain.height + 1 +// +// block.signature shouldBe b.signature +// block.transactionData shouldBe b.transactionData +// } +// +// "microblock with one transaction" in forAll { +// for { +// tx <- dataTransactionGen(0, sender = Some(rich)) +// mb <- microBlockGen(Seq(tx), master) +// mba = appendMicroBlock(mb) +// } yield (mb, mba) +// } { +// case (mb, MicroBlockAppended(toId, toHeight, microBlock, _, _)) => +// toId shouldBe mb.totalResBlockSig +// toHeight shouldBe blockchain.height +// +// microBlock.signature shouldBe mb.signature +// microBlock.transactionData shouldBe mb.transactionData +// } +// +// "including correct miner rewards for" - { +// "block" in forAll { +// for { +// miner <- accountGen +// tx <- dataTransactionGen(0, sender = Some(rich)) +// mb <- blockGen(Seq(tx), miner) +// ba = appendBlock(mb) +// } yield (tx, miner, ba.blockStateUpdate) +// } { case (tx, miner, su) => su.balances.find(_._1 == miner.publicKey.toAddress).get._3 shouldBe 0.4 * tx.fee } +// +// "microblock, giving reward to a key block miner" in forAll { +// for { +// tx <- dataTransactionGen(0, sender = Some(rich)) +// mb <- microBlockGen(Seq(tx), master) +// mba = appendMicroBlock(mb) +// } yield (tx, mba.microBlockStateUpdate) +// } { +// case (tx, su) => +// su.balances.find(_._1 == master.publicKey.toAddress).get._3 shouldBe (0.4 * tx.fee + initialAmount) +// } +// } +// +// "block/microblock with balance updates from transfer txs" in forAll { +// for { +// sender <- accountGen +// recipient <- accountGen +// amount <- Gen.choose(1L, initialAmount - Constants.UnitsInWave) +// master2sender <- transferGeneratorPV2(1, master, sender.toAddress, amount) +// fee <- Gen.choose(1, master2sender.amount - 1) +// sender2recipient = TransferTransaction.selfSigned(2.toByte, sender, recipient.toAddress, Waves, master2sender.amount - fee, Waves, fee, ByteStr.empty, 2) +// .explicitGet() +// } yield (sender, recipient, master2sender, sender2recipient) +// } { +// case (sender, recipient, master2sender, sender2recipient) => +// testTxsStateUpdates(Seq(master2sender, sender2recipient)) { transactionStateUpdates => +// transactionStateUpdates.last.balances.find(_._1 == sender.publicKey.toAddress).get._3 shouldBe 0 +// +// transactionStateUpdates.last.balances.find(_._1 == recipient.publicKey.toAddress).get._3 shouldBe +// sender2recipient.amount +// } +// } +// +// "block/microblock with a data transaction" in forAll(dataTransactionGen(DataTransaction.MaxEntryCount, sender = Some(rich))) { tx => +// testTxsStateUpdates(Seq(tx)) { +// _.head.dataEntries.map(_._2).sortBy(_.key) shouldBe tx.data.sortBy(_.key) +// } +// } +// +// "blocks/microblocks with correct asset state updates by" - { +// "issue transaction" in forAll(issueV2TransactionGen(Gen.const(master))) { tx => +// testTxsStateUpdates(Seq(tx)) { upds => +// val AssetStateUpdate(asset, decimals, name, description, reissuable, volume, script, sponsorship, nft, assetExistedBefore) = +// upds.head.assets.head +// asset.id shouldBe tx.id() +// name shouldBe tx.name.byteStr +// description shouldBe tx.description.byteStr +// decimals shouldBe tx.decimals +// reissuable shouldBe tx.reissuable +// volume.toLong shouldBe tx.quantity +// script.map(_.script) shouldBe tx.script +// nft shouldBe isNFT(tx) +// sponsorship shouldBe None +// assetExistedBefore shouldBe false +// } +// } +// +// "reissue transaction" in forAll { +// for { +// issueAmt <- assetAmtGen +// reissueAmt <- assetAmtGen +// (issue, reissue, _) <- issueReissueBurnGeneratorP(issueAmt, reissueAmt, 1, master).suchThat(_._1.reissuable) +// } yield (issue, reissue) +// } { +// case (issue, reissue) => +// testTxsStateUpdates(Seq(issue, reissue)) { upds => +// val issueUpd = upds.head.assets.head +// val reissueUpd = upds.last.assets.head +// +// reissueUpd shouldBe issueUpd.copy( +// volume = BigInt(issue.quantity) + BigInt(reissue.quantity), +// reissuable = reissue.reissuable, +// assetExistedBefore = !issueUpd.assetExistedBefore +// ) +// } +// } +// +// "burn transaction" in forAll { +// for { +// issueAmt <- assetAmtGen +// burnAmt <- Gen.choose(1, issueAmt) +// (issue, _, burn) <- issueReissueBurnGeneratorP(issueAmt, 1, burnAmt, master) +// } yield (issue, burn) +// } { +// case (issue, burn) => +// testTxsStateUpdates(Seq(issue, burn)) { upds => +// val issueUpd = upds.head.assets.head +// val burnUpd = upds.last.assets.head +// +// burnUpd shouldBe issueUpd.copy( +// volume = BigInt(issue.quantity) - BigInt(burn.quantity), +// assetExistedBefore = !issueUpd.assetExistedBefore +// ) +// } +// } +// +// "set asset script transaction" in forAll(issueAndSetAssetScriptGen(master)) { +// case (issue, setAssetScript) => +// testTxsStateUpdates(Seq(issue, setAssetScript)) { upds => +// val issueUpd = upds.head.assets.head +// val scriptUpd = upds.last.assets.head +// +// scriptUpd shouldBe issueUpd.copy( +// script = setAssetScript.script.map( +// s => +// state.AssetScriptInfo( +// s, +// Script.estimate(s, EstimatorProvider.EstimatorBlockchainExt(blockchain).estimator, useContractVerifierLimit = false).explicitGet() +// ) +// ), +// assetExistedBefore = !issueUpd.assetExistedBefore +// ) +// } +// } +// +// "sponsor fee transaction " in forAll(sponsorFeeCancelSponsorFeeGen(master)) { +// case (issue, startSponsorship, _, cancelSponsorship) => +// testTxsStateUpdates(Seq(issue, startSponsorship, cancelSponsorship)) { upds => +// val issueUpd = upds.head.assets.head +// val startSponsorshipUpd = upds(1).assets.head +// val cancelSponsorshipUpd = upds(2).assets.head +// +// startSponsorshipUpd shouldBe issueUpd.copy( +// sponsorship = startSponsorship.minSponsoredAssetFee, +// assetExistedBefore = !issueUpd.assetExistedBefore +// ) +// +// cancelSponsorshipUpd shouldBe startSponsorshipUpd.copy( +// sponsorship = cancelSponsorship.minSponsoredAssetFee +// ) +// } +// } +// +// "invokeScript transaction (diff emulated by issue, reussue and burn txs)" in forAll { +// for { +// issueAmt <- assetAmtGen +// reissueAmt <- assetAmtGen +// (issue, reissue, _) <- issueReissueBurnGeneratorP(issueAmt, reissueAmt, 1, master).suchThat(_._1.reissuable) +// invoke <- invokeScriptGen(Gen.const(Seq.empty)) +// } yield (issue, reissue, invoke) +// } { +// case (issue, reissue, invoke) => +// // create a block with issue and reissue txs, getting their diffs +// val assetsDummyBlock = TestBlock.create(master, Seq(issue, reissue)) +// val assetsDummyBlockDiff = detailedDiffFromBlock(assetsDummyBlock) +// +// val invokeBlock = TestBlock.create(master, Seq(invoke)) +// // merge issue/reissue diffs as if they were produced by a single invoke +// val invokeTxDiff = assetsDummyBlockDiff.transactionDiffs +// .foldLeft(Diff.empty)(Diff.diffMonoid.combine) +// .copy(transactions = Map(invoke.id() -> NewTransactionInfo(invoke, Set(master.toAddress), true))) +// val invokeBlockDetailedDiff = assetsDummyBlockDiff.copy(transactionDiffs = Seq(invokeTxDiff)) +// +// produceEvent(_.onProcessBlock(invokeBlock, invokeBlockDetailedDiff, None, blockchain)) match { +// case ba: BlockAppended => +// val AssetStateUpdate(asset, decimals, name, description, reissuable, volume, script, sponsorship, nft, assetExistedBefore) = +// ba.transactionStateUpdates.head.assets.head +// +// asset.id shouldBe issue.assetId +// decimals shouldBe issue.decimals +// name shouldBe issue.name.byteStr +// description shouldBe issue.description.byteStr +// reissuable shouldBe reissue.reissuable +// volume shouldBe (BigInt(issue.quantity) + BigInt(reissue.quantity)) +// script shouldBe issue.script +// sponsorship shouldBe None +// nft shouldBe isNFT(issue) +// assetExistedBefore shouldBe false +// case _ => fail() +// } +// } +// } +// } +//} diff --git a/blockchain-updates/src/test/scala/com/wavesplatform/events/EventsHelpers.scala b/blockchain-updates/src/test/scala/com/wavesplatform/events/EventsHelpers.scala new file mode 100644 index 00000000000..ff241d297b2 --- /dev/null +++ b/blockchain-updates/src/test/scala/com/wavesplatform/events/EventsHelpers.scala @@ -0,0 +1,39 @@ +//package com.wavesplatform.events +// +//import com.wavesplatform.block.{Block, MicroBlock} +//import com.wavesplatform.common.utils.EitherExt2 +//import com.wavesplatform.mining.MiningConstraint +//import com.wavesplatform.state.diffs.BlockDiffer +//import com.wavesplatform.state.diffs.BlockDiffer.DetailedDiff +//import monix.execution.Scheduler.Implicits.global +//import monix.reactive.subjects.ReplaySubject +//import org.scalatest.Suite +// +//import scala.concurrent.duration._ +// +//private[events] trait EventsHelpers extends WithBlockchain { _: Suite => +// protected def produceEvent(useTrigger: BlockchainUpdateTriggers => Unit): BlockchainUpdated = { +// val evts = ReplaySubject[BlockchainUpdated]() +// val t = new BlockchainUpdateTriggersImpl(evts) +// useTrigger(t) +// evts.onComplete() +// evts.toListL.runSyncUnsafe(500.milliseconds).head +// } +// +// protected def detailedDiffFromBlock(b: Block): DetailedDiff = +// BlockDiffer.fromBlock(blockchain, None, b, MiningConstraint.Unlimited, verify = false).explicitGet().detailedDiff +// +// protected def appendBlock(b: Block, minerReward: Option[Long] = None): BlockAppended = +// produceEvent(_.onProcessBlock(b, detailedDiffFromBlock(b), minerReward, blockchain)) match { +// case ba: BlockAppended => ba +// case _ => fail() +// } +// +// protected def appendMicroBlock(mb: MicroBlock): MicroBlockAppended = { +// val dd = BlockDiffer.fromMicroBlock(blockchain, Some(0), mb, 1, MiningConstraint.Unlimited, verify = false).explicitGet().detailedDiff +// produceEvent(_.onProcessMicroBlock(mb, dd, blockchain, mb.totalResBlockSig)) match { +// case mba: MicroBlockAppended => mba +// case _ => fail() +// } +// } +//} diff --git a/blockchain-updates/src/test/scala/com/wavesplatform/events/GenesisBlockUpdateSpec.scala b/blockchain-updates/src/test/scala/com/wavesplatform/events/GenesisBlockUpdateSpec.scala new file mode 100644 index 00000000000..6301b70a6fb --- /dev/null +++ b/blockchain-updates/src/test/scala/com/wavesplatform/events/GenesisBlockUpdateSpec.scala @@ -0,0 +1,35 @@ +//package com.wavesplatform.events +// +//import com.wavesplatform.common.utils.EitherExt2 +//import com.wavesplatform.settings.WavesSettings +//import com.wavesplatform.state.diffs.ENOUGH_AMT +//import com.wavesplatform.transaction.GenesisTransaction +//import com.wavesplatform.{BlockGen, TestHelpers} +//import org.scalacheck.Gen +//import org.scalatest.{FreeSpec, Matchers} +//import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks +// +//class GenesisBlockUpdateSpec extends FreeSpec with Matchers with BlockGen with ScalaCheckPropertyChecks with EventsHelpers { +// override protected def settings: WavesSettings = TestHelpers.enableNG(super.settings) +// +// val genesisAppendWithWavesAmountGen: Gen[(BlockAppended, Long)] = for { +// master <- accountGen +// wavesAmount <- Gen.choose(1L, ENOUGH_AMT) +// gt = GenesisTransaction.create(master.toAddress, wavesAmount, 0).explicitGet() +// b <- blockGen(Seq(gt), master) +// ba = appendBlock(b) +// } yield (ba, wavesAmount) +// +// "on genesis block append" - { +// "master address balance gets correctly updated" in forAll(genesisAppendWithWavesAmountGen) { +// case (BlockAppended(_, _, _, _, _, upds), wavesAmount) => +// upds.head.balances.head._3 shouldBe wavesAmount +// } +// +// "updated Waves amount is calculated correctly" in forAll(genesisAppendWithWavesAmountGen) { +// case (BlockAppended(_, _, _, updatedWavesAmount, _, _), wavesAmount) => +// updatedWavesAmount shouldBe wavesAmount +// } +// } +// +//} diff --git a/blockchain-updates/src/test/scala/com/wavesplatform/events/WithBlockchain.scala b/blockchain-updates/src/test/scala/com/wavesplatform/events/WithBlockchain.scala new file mode 100644 index 00000000000..13e454c3373 --- /dev/null +++ b/blockchain-updates/src/test/scala/com/wavesplatform/events/WithBlockchain.scala @@ -0,0 +1,46 @@ +//package com.wavesplatform.events +// +//import java.nio.file.Files +// +//import com.wavesplatform.database.TestStorageFactory +//import com.wavesplatform.settings.{WavesSettings, loadConfig} +//import com.wavesplatform.state.Blockchain +//import com.wavesplatform.transaction.BlockchainUpdater +//import com.wavesplatform.{NTPTime, TestHelpers, database} +//import monix.reactive.Observer +//import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite} +// +//trait WithBlockchain extends BeforeAndAfterEach with BeforeAndAfterAll with NTPTime { _: Suite => +// protected def settings: WavesSettings = WavesSettings.fromRootConfig(loadConfig(None)) +// +// private val path = Files.createTempDirectory("leveldb-test") +// private val db = database.openDB(path.toAbsolutePath.toString) +// private val (bcu, _) = TestStorageFactory( +// settings, +// db, +// ntpTime, +// Observer.stopped, +// BlockchainUpdateTriggers.noop +// ) +// +// protected def blockchain: Blockchain = bcu +// +// /** +// * Override this method to do some initialization actions with +// * the blockchain before it becomes read-only +// * @param blockchainUpdater a blockchain to add something to (genesis, some blocks, etc.) +// */ +// protected def initBlockchain(blockchainUpdater: Blockchain with BlockchainUpdater): Unit = () +// +// override protected def beforeAll(): Unit = { +// initBlockchain(bcu) +// super.beforeAll() +// } +// +// override def afterAll(): Unit = { +// bcu.shutdown() +// db.close() +// TestHelpers.deleteRecursively(path) +// super.afterAll() +// } +//} diff --git a/build.sbt b/build.sbt index f846f552833..30cc3151d0d 100644 --- a/build.sbt +++ b/build.sbt @@ -161,7 +161,6 @@ checkPRRaw := Def .sequential( root / clean, Def.task { - (`lang-jvm` / Compile / PB.generate).value (Test / compile).value (`lang-tests` / Test / test).value (`lang-js` / Compile / fastOptJS).value diff --git a/node/src/main/scala/com/wavesplatform/Application.scala b/node/src/main/scala/com/wavesplatform/Application.scala index ed37bae42ef..6b9b18c31f6 100644 --- a/node/src/main/scala/com/wavesplatform/Application.scala +++ b/node/src/main/scala/com/wavesplatform/Application.scala @@ -19,11 +19,12 @@ import com.wavesplatform.api.http._ import com.wavesplatform.api.http.alias.AliasApiRoute import com.wavesplatform.api.http.assets.AssetsApiRoute import com.wavesplatform.api.http.leasing.LeaseApiRoute +import com.wavesplatform.block.{Block, MicroBlock} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.consensus.PoSSelector import com.wavesplatform.consensus.nxt.api.http.NxtConsensusApiRoute import com.wavesplatform.database.{DBExt, Keys, openDB} -import com.wavesplatform.events.{BlockchainUpdateTriggersImpl, BlockchainUpdated, UtxEvent} +import com.wavesplatform.events.{BlockchainUpdateTriggers, UtxEvent} import com.wavesplatform.extensions.{Context, Extension} import com.wavesplatform.features.EstimatorProvider._ import com.wavesplatform.features.api.ActivationApiRoute @@ -36,6 +37,7 @@ import com.wavesplatform.network.RxExtensionLoader.RxExtensionLoaderShutdownHook import com.wavesplatform.network._ import com.wavesplatform.settings.WavesSettings import com.wavesplatform.state.appender.{BlockAppender, ExtensionAppender, MicroblockAppender} +import com.wavesplatform.state.diffs.BlockDiffer import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height} import com.wavesplatform.transaction.smart.script.trace.TracedResult import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction} @@ -102,21 +104,40 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con private val historyRepliesScheduler = fixedPool(poolSize = 2, "history-replier", reporter = log.error("Error in History Replier", _)) private val minerScheduler = singleThread("block-miner", reporter = log.error("Error in Miner", _)) - private val blockchainUpdatesScheduler = singleThread("blockchain-updates", reporter = log.error("Error on sending blockchain updates", _)) - private val blockchainUpdated = ConcurrentSubject.publish[BlockchainUpdated](scheduler) - private val utxEvents = ConcurrentSubject.publish[UtxEvent](scheduler) - private val blockchainUpdateTriggers = new BlockchainUpdateTriggersImpl(blockchainUpdated) + private val utxEvents = ConcurrentSubject.publish[UtxEvent](scheduler) + + private var extensions = Seq.empty[Extension] + + // update triggers combined into one instance + private var triggers = Seq.empty[BlockchainUpdateTriggers] + private val triggersCombined = new BlockchainUpdateTriggers { + override def onProcessBlock(block: Block, diff: BlockDiffer.DetailedDiff, minerReward: Option[Long], blockchainBeforeWithMinerReward: Blockchain): Unit = + triggers.foreach(_.onProcessBlock(block, diff, minerReward, blockchainBeforeWithMinerReward)) + + override def onProcessMicroBlock( + microBlock: MicroBlock, + diff: BlockDiffer.DetailedDiff, + blockchainBeforeWithMinerReward: Blockchain, + totalBlockId: ByteStr, + totalTransactionsRoot: ByteStr + ): Unit = + triggers.foreach(_.onProcessMicroBlock(microBlock, diff, blockchainBeforeWithMinerReward, totalBlockId, totalTransactionsRoot)) + + override def onRollback(toBlockId: ByteStr, toHeight: Int): Unit = + triggers.foreach(_.onRollback(toBlockId, toHeight)) + + override def onMicroBlockRollback(toBlockId: ByteStr, height: Int): Unit = + triggers.foreach(_.onMicroBlockRollback(toBlockId, height)) + } private[this] var miner: Miner with MinerDebugInfo = Miner.Disabled private val (blockchainUpdater, levelDB) = - StorageFactory(settings, db, time, spendableBalanceChanged, blockchainUpdateTriggers, bc => miner.scheduleMining(bc)) + StorageFactory(settings, db, time, spendableBalanceChanged, triggersCombined, bc => miner.scheduleMining(bc)) private var rxExtensionLoaderShutdown: Option[RxExtensionLoaderShutdownHook] = None private var maybeUtx: Option[UtxPool] = None private var maybeNetwork: Option[NS] = None - private var extensions = Seq.empty[Extension] - def apiShutdown(): Unit = { for { u <- maybeUtx @@ -210,10 +231,9 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con override def utx: UtxPool = utxStorage override def broadcastTransaction(tx: Transaction): TracedResult[ValidationError, Boolean] = Await.result(utxSynchronizer.publish(tx), Duration.Inf) // TODO: Replace with async if possible - override def spendableBalanceChanged: Observable[(Address, Asset)] = app.spendableBalanceChanged - override def actorSystem: ActorSystem = app.actorSystem - override def utxEvents: Observable[UtxEvent] = app.utxEvents - override def blockchainUpdated: Observable[BlockchainUpdated] = app.blockchainUpdated + override def spendableBalanceChanged: Observable[(Address, Asset)] = app.spendableBalanceChanged + override def actorSystem: ActorSystem = app.actorSystem + override def utxEvents: Observable[UtxEvent] = app.utxEvents override val transactionsApi: CommonTransactionsApi = CommonTransactionsApi( blockchainUpdater.bestLiquidDiff.map(diff => Height(blockchainUpdater.height) -> diff), @@ -236,6 +256,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con log.info(s"Enable extension: $extensionClassName") ctor.newInstance(extensionContext) } + triggers ++= extensions.collect { case e: BlockchainUpdateTriggers => e } extensions.foreach(_.start()) // Node start @@ -406,12 +427,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con def shutdown(utx: UtxPool, network: NS): Unit = if (shutdownInProgress.compareAndSet(false, true)) { - - if (extensions.nonEmpty) { - log.info(s"Shutting down extensions") - Await.ready(Future.sequence(extensions.map(_.shutdown())), settings.extensionsShutdownTimeout) - } - spendableBalanceChanged.onComplete() utx.close() @@ -434,9 +449,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con log.info("Stopping network services") network.shutdown() - blockchainUpdated.onComplete() - - shutdownAndWait(blockchainUpdatesScheduler, "BlockchainUpdated") shutdownAndWait(minerScheduler, "Miner") shutdownAndWait(microblockSynchronizerScheduler, "MicroblockSynchronizer") shutdownAndWait(scoreObserverScheduler, "ScoreObserver") @@ -449,6 +461,12 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con log.info("Closing storage") db.close() + // extensions should be shut down last, after all node functionality, to guarantee no data loss + if (extensions.nonEmpty) { + log.info(s"Shutting down extensions") + Await.ready(Future.sequence(extensions.map(_.shutdown())), settings.extensionsShutdownTimeout) + } + time.close() log.info("Shutdown complete") } diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala index 3610e175ed9..c6bc932415e 100644 --- a/node/src/main/scala/com/wavesplatform/Importer.scala +++ b/node/src/main/scala/com/wavesplatform/Importer.scala @@ -13,7 +13,7 @@ import com.wavesplatform.block.{Block, BlockHeader} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.consensus.PoSSelector import com.wavesplatform.database.openDB -import com.wavesplatform.events.{BlockchainUpdateTriggersImpl, BlockchainUpdated, UtxEvent} +import com.wavesplatform.events.{BlockchainUpdateTriggers, UtxEvent} import com.wavesplatform.extensions.{Context, Extension} import com.wavesplatform.features.BlockchainFeatures import com.wavesplatform.history.StorageFactory @@ -105,7 +105,6 @@ object Importer extends ScorexLogging { appenderScheduler: Scheduler, extensionTime: Time, utxPool: UtxPool, - blockchainUpdatedObservable: Observable[BlockchainUpdated], db: DB, extensionActorSystem: ActorSystem ): Seq[Extension] = @@ -125,7 +124,6 @@ object Importer extends ScorexLogging { TracedResult.wrapE(Left(GenericError("Not implemented during import"))) override def spendableBalanceChanged: Observable[(Address, Asset)] = Observable.empty override def actorSystem: ActorSystem = extensionActorSystem - override def blockchainUpdated: Observable[BlockchainUpdated] = blockchainUpdatedObservable override def utxEvents: Observable[UtxEvent] = Observable.empty override def transactionsApi: CommonTransactionsApi = CommonTransactionsApi( @@ -250,17 +248,15 @@ object Importer extends ScorexLogging { val time = new NTP(settings.ntpServer) val actorSystem = ActorSystem("wavesplatform-import") - val blockchainUpdated = PublishSubject[BlockchainUpdated]() - val blockchainUpdateTriggers = new BlockchainUpdateTriggersImpl(blockchainUpdated) val db = openDB(settings.dbSettings.directory) val (blockchainUpdater, levelDb) = - StorageFactory(settings, db, time, Observer.empty, blockchainUpdateTriggers) + StorageFactory(settings, db, time, Observer.empty, BlockchainUpdateTriggers.noop) val utxPool = new UtxPoolImpl(time, blockchainUpdater, PublishSubject(), settings.utxSettings) val pos = PoSSelector(blockchainUpdater, settings.synchronizationSettings) val extAppender = BlockAppender(blockchainUpdater, time, utxPool, pos, scheduler, importOptions.verify) _ checkGenesis(settings, blockchainUpdater) - val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, blockchainUpdated, db, actorSystem) + val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, db, actorSystem) sys.addShutdownHook { quit = true diff --git a/node/src/main/scala/com/wavesplatform/events/BlockchainUpdateTriggers.scala b/node/src/main/scala/com/wavesplatform/events/BlockchainUpdateTriggers.scala index 8c7fa09fc4f..17860ae2282 100644 --- a/node/src/main/scala/com/wavesplatform/events/BlockchainUpdateTriggers.scala +++ b/node/src/main/scala/com/wavesplatform/events/BlockchainUpdateTriggers.scala @@ -6,17 +6,29 @@ import com.wavesplatform.state.Blockchain import com.wavesplatform.state.diffs.BlockDiffer.DetailedDiff trait BlockchainUpdateTriggers { - def onProcessBlock(block: Block, diff: DetailedDiff, minerReward: Option[Long], blockchainBefore: Blockchain): Unit - def onProcessMicroBlock(microBlock: MicroBlock, diff: DetailedDiff, blockchainBefore: Blockchain, totalBlockId: ByteStr): Unit + def onProcessBlock(block: Block, diff: DetailedDiff, minerReward: Option[Long], blockchainBeforeWithMinerReward: Blockchain): Unit + def onProcessMicroBlock( + microBlock: MicroBlock, + diff: DetailedDiff, + blockchainBeforeWithMinerReward: Blockchain, + totalBlockId: ByteStr, + totalTransactionsRoot: ByteStr + ): Unit def onRollback(toBlockId: ByteStr, toHeight: Int): Unit def onMicroBlockRollback(toBlockId: ByteStr, height: Int): Unit } object BlockchainUpdateTriggers { def noop: BlockchainUpdateTriggers = new BlockchainUpdateTriggers { - override def onProcessBlock(block: Block, diff: DetailedDiff, minerReward: Option[Long], blockchainBefore: Blockchain): Unit = {} - override def onProcessMicroBlock(microBlock: MicroBlock, diff: DetailedDiff, blockchainBefore: Blockchain, totalBlockId: ByteStr): Unit = {} - override def onRollback(toBlockId: ByteStr, toHeight: Int): Unit = {} + override def onProcessBlock(block: Block, diff: DetailedDiff, minerReward: Option[Long], blockchainBeforeWithMinerReward: Blockchain): Unit = {} + override def onProcessMicroBlock( + microBlock: MicroBlock, + diff: DetailedDiff, + blockchainBeforeWithMinerReward: Blockchain, + totalBlockId: ByteStr, + totalTransactionsRoot: ByteStr + ): Unit = {} + override def onRollback(toBlockId: ByteStr, toHeight: Int): Unit = {} override def onMicroBlockRollback(toBlockId: ByteStr, height: Int): Unit = {} } } diff --git a/node/src/main/scala/com/wavesplatform/events/BlockchainUpdateTriggersImpl.scala b/node/src/main/scala/com/wavesplatform/events/BlockchainUpdateTriggersImpl.scala deleted file mode 100644 index 624d4d40f85..00000000000 --- a/node/src/main/scala/com/wavesplatform/events/BlockchainUpdateTriggersImpl.scala +++ /dev/null @@ -1,107 +0,0 @@ -package com.wavesplatform.events - -import cats.syntax.monoid._ -import com.wavesplatform.account.Address -import com.wavesplatform.block.{Block, MicroBlock} -import com.wavesplatform.common.state.ByteStr -import com.wavesplatform.state.DiffToStateApplier.PortfolioUpdates -import com.wavesplatform.state.diffs.BlockDiffer.DetailedDiff -import com.wavesplatform.state.reader.CompositeBlockchain -import com.wavesplatform.state.{AccountDataInfo, AssetDescription, Blockchain, Diff, DiffToStateApplier} -import com.wavesplatform.transaction.{Asset, GenesisTransaction, Transaction} -import monix.reactive.Observer - -import scala.collection.mutable.ArrayBuffer - -class BlockchainUpdateTriggersImpl(private val events: Observer[BlockchainUpdated]) extends BlockchainUpdateTriggers { - - override def onProcessBlock(block: Block, diff: DetailedDiff, minerReward: Option[Long], blockchainBefore: Blockchain): Unit = { - val (blockStateUpdate, txsStateUpdates) = containerStateUpdate(blockchainBefore, diff, block.transactionData) - - // updatedWavesAmount can change as a result of either genesis transactions or miner rewards - val updatedWavesAmount = blockchainBefore.height match { - // genesis case - case 0 => block.transactionData.collect { case GenesisTransaction(_, amount, _, _, _) => amount }.sum - // miner reward case - case _ => blockchainBefore.wavesAmount(blockchainBefore.height).toLong + minerReward.getOrElse(0L) - } - - events.onNext(BlockAppended(block.signature, blockchainBefore.height + 1, block, updatedWavesAmount, blockStateUpdate, txsStateUpdates)) - } - - override def onProcessMicroBlock(microBlock: MicroBlock, diff: DetailedDiff, blockchainBefore: Blockchain, totalBlockId: ByteStr): Unit = { - val (microBlockStateUpdate, txsStateUpdates) = containerStateUpdate(blockchainBefore, diff, microBlock.transactionData) - events.onNext(MicroBlockAppended(totalBlockId, blockchainBefore.height, microBlock, microBlockStateUpdate, txsStateUpdates)) - } - - override def onRollback(toBlockId: ByteStr, toHeight: Int): Unit = events.onNext(RollbackCompleted(toBlockId, toHeight)) - - override def onMicroBlockRollback(toBlockId: ByteStr, height: Int): Unit = - events.onNext(MicroBlockRollbackCompleted(toBlockId, height)) - - private def atomicStateUpdate(blockchainBefore: Blockchain, diff: Diff, byTransaction: Option[Transaction]): StateUpdate = { - val blockchainAfter = CompositeBlockchain(blockchainBefore, Some(diff)) - - val PortfolioUpdates(updatedBalances, updatedLeases) = DiffToStateApplier.portfolios(blockchainBefore, diff) - - val balances = ArrayBuffer.empty[(Address, Asset, Long)] - for ((address, assetMap) <- updatedBalances; (asset, balance) <- assetMap) balances += ((address, asset, balance)) - - val dataEntries = diff.accountData.toSeq.flatMap { - case (address, AccountDataInfo(data)) => - data.toSeq.map { case (_, entry) => (address, entry) } - } - - val assets: Seq[AssetStateUpdate] = for { - a <- (diff.issuedAssets.keySet ++ diff.updatedAssets.keySet ++ diff.assetScripts.keySet ++ diff.sponsorship.keySet).toSeq - AssetDescription( - _, - _, - name, - description, - decimals, - reissuable, - totalVolume, - _, - script, - sponsorship, - nft - ) <- blockchainAfter.assetDescription(a).toSeq - existedBefore = !diff.issuedAssets.isDefinedAt(a) - } yield AssetStateUpdate( - a, - decimals, - ByteStr(name.toByteArray), - ByteStr(description.toByteArray), - reissuable, - totalVolume, - script, - if (sponsorship == 0) None else Some(sponsorship), - nft, - existedBefore - ) - - StateUpdate(balances.toSeq, updatedLeases.toSeq, dataEntries, assets) - } - - private def containerStateUpdate( - blockchainBefore: Blockchain, - diff: DetailedDiff, - transactions: Seq[Transaction] - ): (StateUpdate, Seq[StateUpdate]) = { - val DetailedDiff(parentDiff, txsDiffs) = diff - val parentStateUpdate = atomicStateUpdate(blockchainBefore, parentDiff, None) - - val (txsStateUpdates, _) = txsDiffs - .zip(transactions) - .foldLeft((ArrayBuffer.empty[StateUpdate], parentDiff)) { - case ((updates, accDiff), (txDiff, tx)) => - ( - updates += atomicStateUpdate(CompositeBlockchain(blockchainBefore, Some(accDiff)), txDiff, Some(tx)), - accDiff.combine(txDiff) - ) - } - - (parentStateUpdate, txsStateUpdates.toSeq) - } -} diff --git a/node/src/main/scala/com/wavesplatform/events/events.scala b/node/src/main/scala/com/wavesplatform/events/events.scala deleted file mode 100644 index ff5e5d1a250..00000000000 --- a/node/src/main/scala/com/wavesplatform/events/events.scala +++ /dev/null @@ -1,52 +0,0 @@ -package com.wavesplatform.events - -import com.wavesplatform.account.Address -import com.wavesplatform.block.{Block, MicroBlock} -import com.wavesplatform.common.state.ByteStr -import com.wavesplatform.state.{AssetScriptInfo, DataEntry, LeaseBalance} -import com.wavesplatform.transaction.Asset -import com.wavesplatform.transaction.Asset.IssuedAsset - -final case class AssetStateUpdate( - asset: IssuedAsset, - decimals: Int, - name: ByteStr, - description: ByteStr, - reissuable: Boolean, - volume: BigInt, - script: Option[AssetScriptInfo], - sponsorship: Option[Long], - nft: Boolean, - assetExistedBefore: Boolean -) - -final case class StateUpdate( - balances: Seq[(Address, Asset, Long)], - leases: Seq[(Address, LeaseBalance)], - dataEntries: Seq[(Address, DataEntry[_])], - assets: Seq[AssetStateUpdate] -) { - def isEmpty: Boolean = balances.isEmpty && leases.isEmpty && dataEntries.isEmpty && assets.isEmpty -} - -sealed trait BlockchainUpdated extends Product with Serializable { - def toId: ByteStr - def toHeight: Int -} -final case class BlockAppended( - toId: ByteStr, - toHeight: Int, - block: Block, - updatedWavesAmount: Long, - blockStateUpdate: StateUpdate, - transactionStateUpdates: Seq[StateUpdate] -) extends BlockchainUpdated -final case class MicroBlockAppended( - toId: ByteStr, - toHeight: Int, - microBlock: MicroBlock, - microBlockStateUpdate: StateUpdate, - transactionStateUpdates: Seq[StateUpdate] -) extends BlockchainUpdated -final case class RollbackCompleted(toId: ByteStr, toHeight: Int) extends BlockchainUpdated -final case class MicroBlockRollbackCompleted(toId: ByteStr, toHeight: Int) extends BlockchainUpdated diff --git a/node/src/main/scala/com/wavesplatform/extensions/Context.scala b/node/src/main/scala/com/wavesplatform/extensions/Context.scala index 9568268db1d..d43d5531751 100644 --- a/node/src/main/scala/com/wavesplatform/extensions/Context.scala +++ b/node/src/main/scala/com/wavesplatform/extensions/Context.scala @@ -4,7 +4,7 @@ import akka.actor.ActorSystem import com.wavesplatform.account.Address import com.wavesplatform.api.common._ import com.wavesplatform.common.state.ByteStr -import com.wavesplatform.events.{BlockchainUpdated, UtxEvent} +import com.wavesplatform.events.UtxEvent import com.wavesplatform.lang.ValidationError import com.wavesplatform.settings.WavesSettings import com.wavesplatform.state.Blockchain @@ -31,7 +31,6 @@ trait Context { def broadcastTransaction(tx: Transaction): TracedResult[ValidationError, Boolean] def spendableBalanceChanged: Observable[(Address, Asset)] - def blockchainUpdated: Observable[BlockchainUpdated] def utxEvents: Observable[UtxEvent] def actorSystem: ActorSystem } diff --git a/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala b/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala index 94b7fd50739..276fbb57825 100644 --- a/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala +++ b/node/src/main/scala/com/wavesplatform/state/BlockchainUpdaterImpl.scala @@ -202,17 +202,20 @@ class BlockchainUpdaterImpl( val height = lastBlockId.fold(0)(leveldb.unsafeHeightOf) val miningConstraints = MiningConstraints(leveldb, height) val reward = nextReward() + + val referencedBlockchain = CompositeBlockchain(leveldb, carry = leveldb.carryFee, reward = reward) BlockDiffer .fromBlock( - CompositeBlockchain(leveldb, carry = leveldb.carryFee, reward = reward), + referencedBlockchain, leveldb.lastBlock, block, miningConstraints.total, verify ) .map { r => - val refBlockchain = CompositeBlockchain(leveldb, Some(r.diff), Some(block), r.carry, reward, Some(hitSource)) - miner.scheduleMining(Some(refBlockchain)) + val updatedBlockchain = CompositeBlockchain(leveldb, Some(r.diff), Some(block), r.carry, reward, Some(hitSource)) + miner.scheduleMining(Some(updatedBlockchain)) + blockchainUpdateTriggers.onProcessBlock(block, r.detailedDiff, reward, referencedBlockchain) Option((r, Nil, reward, hitSource)) } } @@ -222,11 +225,12 @@ class BlockchainUpdaterImpl( val height = leveldb.unsafeHeightOf(ng.base.header.reference) val miningConstraints = MiningConstraints(leveldb, height) - blockchainUpdateTriggers.onMicroBlockRollback(block.header.reference, this.height) + blockchainUpdateTriggers.onRollback(ng.base.header.reference, leveldb.height) + val referencedBlockchain = CompositeBlockchain(leveldb, carry = leveldb.carryFee, reward = ng.reward) BlockDiffer .fromBlock( - CompositeBlockchain(leveldb, carry = leveldb.carryFee, reward = ng.reward), + referencedBlockchain, leveldb.lastBlock, block, miningConstraints.total, @@ -238,6 +242,7 @@ class BlockchainUpdaterImpl( ) val (mbs, diffs) = ng.allDiffs.unzip log.trace(s"Discarded microblocks = $mbs, diffs = ${diffs.map(_.hashString)}") + blockchainUpdateTriggers.onProcessBlock(block, r.detailedDiff, ng.reward, referencedBlockchain) Some((r, diffs, ng.reward, hitSource)) } } else if (areVersionsOfSameBlock(block, ng.base)) { @@ -249,17 +254,21 @@ class BlockchainUpdaterImpl( val height = leveldb.unsafeHeightOf(ng.base.header.reference) val miningConstraints = MiningConstraints(leveldb, height) - blockchainUpdateTriggers.onMicroBlockRollback(block.header.reference, this.height) + blockchainUpdateTriggers.onRollback(ng.base.header.reference, leveldb.height) + val referencedBlockchain = CompositeBlockchain(leveldb, carry = leveldb.carryFee, reward = ng.reward) BlockDiffer .fromBlock( - CompositeBlockchain(leveldb, carry = leveldb.carryFee, reward = ng.reward), + referencedBlockchain, leveldb.lastBlock, block, miningConstraints.total, verify ) - .map(r => Some((r, Nil, ng.reward, hitSource))) + .map { r => + blockchainUpdateTriggers.onProcessBlock(block, r.detailedDiff, ng.reward, referencedBlockchain) + Some((r, Nil, ng.reward, hitSource)) + } } } else Left( @@ -294,7 +303,8 @@ class BlockchainUpdaterImpl( val liquidDiffWithCancelledLeases = ng.cancelExpiredLeases(referencedLiquidDiff) - val referencedBlockchain = CompositeBlockchain(leveldb, Some(liquidDiffWithCancelledLeases), Some(referencedForgedBlock), carry, reward) + val referencedBlockchain = + CompositeBlockchain(leveldb, Some(liquidDiffWithCancelledLeases), Some(referencedForgedBlock), carry, reward) val maybeDiff = BlockDiffer .fromBlock( referencedBlockchain, @@ -304,19 +314,29 @@ class BlockchainUpdaterImpl( verify ) - maybeDiff.map { differResult => - val tempBlockchain = CompositeBlockchain(referencedBlockchain, Some(differResult.diff), Some(block), differResult.carry, reward, Some(hitSource)) - miner.scheduleMining(Some(tempBlockchain)) - - leveldb.append(liquidDiffWithCancelledLeases, carry, totalFee, prevReward, prevHitSource, referencedForgedBlock) - BlockStats.appended(referencedForgedBlock, referencedLiquidDiff.scriptsComplexity) - TxsInBlockchainStats.record(ng.transactions.size) - val (discardedMbs, discardedDiffs) = discarded.unzip - if (discardedMbs.nonEmpty) { - log.trace(s"Discarded microblocks: $discardedMbs") - } - - Some((differResult, discardedDiffs, reward, hitSource)) + maybeDiff.map { + differResult => + val tempBlockchain = CompositeBlockchain( + referencedBlockchain, + Some(differResult.diff), + Some(block), + differResult.carry, + reward, + Some(hitSource) + ) + miner.scheduleMining(Some(tempBlockchain)) + + blockchainUpdateTriggers.onProcessBlock(block, differResult.detailedDiff, reward, referencedBlockchain) + + leveldb.append(liquidDiffWithCancelledLeases, carry, totalFee, prevReward, prevHitSource, referencedForgedBlock) + BlockStats.appended(referencedForgedBlock, referencedLiquidDiff.scriptsComplexity) + TxsInBlockchainStats.record(ng.transactions.size) + val (discardedMbs, discardedDiffs) = discarded.unzip + if (discardedMbs.nonEmpty) { + log.trace(s"Discarded microblocks: $discardedMbs") + } + + Some((differResult, discardedDiffs, reward, hitSource)) } } else { val errorText = s"Forged block has invalid signature. Base: ${ng.base}, requested reference: ${block.header.reference}" @@ -326,7 +346,7 @@ class BlockchainUpdaterImpl( } }).map { _ map { - case (BlockDiffer.Result(newBlockDiff, carry, totalFee, updatedTotalConstraint, detailedDiff), discDiffs, reward, hitSource) => + case (BlockDiffer.Result(newBlockDiff, carry, totalFee, updatedTotalConstraint, _), discDiffs, reward, hitSource) => val newHeight = leveldb.height + 1 val prevNgState = ngState @@ -351,8 +371,6 @@ class BlockchainUpdaterImpl( log.info(s"New height: $newHeight") } - blockchainUpdateTriggers.onProcessBlock(block, detailedDiff, reward, leveldb) - discDiffs } getOrElse Nil } @@ -472,8 +490,12 @@ class BlockchainUpdaterImpl( val BlockDiffer.Result(diff, carry, totalFee, updatedMdConstraint, detailedDiff) = blockDifferResult restTotalConstraint = updatedMdConstraint val blockId = ng.createBlockId(microBlock) - blockchainUpdateTriggers.onProcessMicroBlock(microBlock, detailedDiff, this, blockId) + + val transactionsRoot = ng.createTransactionsRoot(microBlock) + blockchainUpdateTriggers.onProcessMicroBlock(microBlock, detailedDiff, this, blockId, transactionsRoot) + ng.append(microBlock, diff, carry, totalFee, System.currentTimeMillis, Some(blockId)) + log.info(s"${microBlock.stringRepr(blockId)} appended, diff=${diff.hashString}") internalLastBlockInfo.onNext(LastBlockInfo(blockId, height, score, ready = true)) diff --git a/node/src/main/scala/com/wavesplatform/state/NgState.scala b/node/src/main/scala/com/wavesplatform/state/NgState.scala index 9d5449fe21f..fae7240809e 100644 --- a/node/src/main/scala/com/wavesplatform/state/NgState.scala +++ b/node/src/main/scala/com/wavesplatform/state/NgState.scala @@ -142,17 +142,21 @@ class NgState( baseBlockCarry + microDiffs.values.map(_.carryFee).sum def createBlockId(microBlock: MicroBlock): BlockId = { - val newTransactions = this.transactions ++ microBlock.transactionData - val transactionsRoot = block.mkTransactionsRoot(base.header.version, newTransactions) + val newTransactions = this.transactions ++ microBlock.transactionData val fullBlock = base.copy( transactionData = newTransactions, signature = microBlock.totalResBlockSig, - header = base.header.copy(transactionsRoot = transactionsRoot) + header = base.header.copy(transactionsRoot = createTransactionsRoot(microBlock)) ) fullBlock.id() } + def createTransactionsRoot(microBlock: MicroBlock): ByteStr = { + val newTransactions = this.transactions ++ microBlock.transactionData + block.mkTransactionsRoot(base.header.version, newTransactions) + } + private[this] def forgeBlock(blockId: BlockId): Option[(Block, DiscardedMicroBlocks)] = internalCaches.forgedBlockCache.get( blockId, { () => diff --git a/node/src/test/scala/com/wavesplatform/events/BlockchainUpdateTriggersSpec.scala b/node/src/test/scala/com/wavesplatform/events/BlockchainUpdateTriggersSpec.scala deleted file mode 100644 index 1e4e5c3cc46..00000000000 --- a/node/src/test/scala/com/wavesplatform/events/BlockchainUpdateTriggersSpec.scala +++ /dev/null @@ -1,312 +0,0 @@ -package com.wavesplatform.events - -import com.wavesplatform.account.KeyPair -import com.wavesplatform.block.MicroBlock -import com.wavesplatform.common.state.ByteStr -import com.wavesplatform.common.utils.EitherExt2 -import com.wavesplatform.features.EstimatorProvider -import com.wavesplatform.history.Domain.BlockchainUpdaterExt -import com.wavesplatform.lagonaki.mocks.TestBlock -import com.wavesplatform.lang.script.Script -import com.wavesplatform.protobuf.utils.PBImplicitConversions.PBByteStringOps -import com.wavesplatform.settings.{Constants, WavesSettings} -import com.wavesplatform.state.diffs.ENOUGH_AMT -import com.wavesplatform.state.{Blockchain, Diff, NewTransactionInfo} -import com.wavesplatform.transaction.Asset.Waves -import com.wavesplatform.transaction.assets.IssueTransaction -import com.wavesplatform.transaction.transfer.TransferTransaction -import com.wavesplatform.transaction.{BlockchainUpdater, DataTransaction, GenesisTransaction, Transaction} -import com.wavesplatform.{BlockGen, TestHelpers, crypto, state} -import org.scalacheck.Gen -import org.scalatest.{FreeSpec, Matchers} -import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks - -class BlockchainUpdateTriggersSpec extends FreeSpec with Matchers with BlockGen with ScalaCheckPropertyChecks with EventsHelpers { - private val WAVES_AMOUNT = Constants.UnitsInWave * Constants.TotalWaves - - override protected def settings: WavesSettings = TestHelpers.enableNG(super.settings) - - // add a genesis block to the blockchain - private val master: KeyPair = accountGen.sample.get - private val rich: KeyPair = accountGen.sample.get - private val initialAmount: Long = WAVES_AMOUNT / 2 - private val genesis = TestBlock.create( - 0, - Seq( - GenesisTransaction.create(master.toAddress, initialAmount, 0).explicitGet(), - GenesisTransaction.create(rich.toAddress, initialAmount, 0).explicitGet() - ), - master - ) - override protected def initBlockchain(blockchainUpdater: Blockchain with BlockchainUpdater): Unit = { - blockchainUpdater.processBlock(genesis).explicitGet() - super.initBlockchain(blockchainUpdater) - } - - private val sigGen: Gen[ByteStr] = bytes64gen.map(ByteStr.apply) - private val heightGen: Gen[Int] = Gen.choose(1, 1000) - private val assetAmtGen: Gen[Long] = Gen.oneOf(Gen.const[Long](1), Gen.choose[Long](2, ENOUGH_AMT)) - - private def microBlockGen(txs: Seq[Transaction], signer: KeyPair): Gen[MicroBlock] = - for { - sig <- byteArrayGen(crypto.SignatureLength).map(ByteStr.apply) - mb = MicroBlock.buildAndSign(3.toByte, signer, txs, genesis.signature, sig).explicitGet() - } yield mb - - /** - * Tests the assertion both for transactions added in a block and in a microblock - */ - private def testTxsStateUpdates[A](txs: Seq[Transaction])(assertion: Seq[StateUpdate] => A): Unit = { - val b = blockGen(txs, master).sample.get - assertion(appendBlock(b).transactionStateUpdates) - - val mb = microBlockGen(txs, master).sample.get - assertion(appendMicroBlock(mb).transactionStateUpdates) - } - - private def isNFT(tx: IssueTransaction): Boolean = tx.quantity == 1 && tx.decimals == 0 && !tx.reissuable - - "updated Waves amount is calculated correctly for miner reward" in forAll { - for { - b <- blockGen(Seq.empty, master) - reward <- Gen.option(Gen.choose(1L, 1000000L)) - ba = appendBlock(b, reward) - } yield (reward, ba) - } { - case (reward, BlockAppended(_, _, _, updatedWavesAmount, _, _)) => - updatedWavesAmount shouldBe WAVES_AMOUNT + reward.getOrElse(0L) - } - - "rollbacks correctly" - { - "block" in forAll(sigGen, heightGen) { (sig, height) => - produceEvent(_.onRollback(sig, height)) match { - case RollbackCompleted(toId, toHeight) => - toId shouldBe sig - toHeight shouldBe height - case _ => fail() - } - } - - "microblock" in forAll(sigGen, heightGen) { (sig, height) => - produceEvent(_.onMicroBlockRollback(sig, height)) match { - case MicroBlockRollbackCompleted(toId, toHeight) => - toId shouldBe sig - toHeight shouldBe height - case _ => fail() - } - } - } - - "appends correctly" - { - "empty block" in forAll { - for { - b <- blockGen(Seq.empty, master) - ba = appendBlock(b) - } yield (b, ba) - } { - case (b, BlockAppended(toId, toHeight, block, _, _, _)) => - toId shouldBe b.signature - toHeight shouldBe blockchain.height + 1 - - block.signature shouldBe b.signature - block.transactionData shouldBe b.transactionData - } - - "microblock with one transaction" in forAll { - for { - tx <- dataTransactionGen(0, sender = Some(rich)) - mb <- microBlockGen(Seq(tx), master) - mba = appendMicroBlock(mb) - } yield (mb, mba) - } { - case (mb, MicroBlockAppended(toId, toHeight, microBlock, _, _)) => - toId shouldBe mb.totalResBlockSig - toHeight shouldBe blockchain.height - - microBlock.signature shouldBe mb.signature - microBlock.transactionData shouldBe mb.transactionData - } - - "including correct miner rewards for" - { - "block" in forAll { - for { - miner <- accountGen - tx <- dataTransactionGen(0, sender = Some(rich)) - mb <- blockGen(Seq(tx), miner) - ba = appendBlock(mb) - } yield (tx, miner, ba.blockStateUpdate) - } { case (tx, miner, su) => su.balances.find(_._1 == miner.publicKey.toAddress).get._3 shouldBe 0.4 * tx.fee } - - "microblock, giving reward to a key block miner" in forAll { - for { - tx <- dataTransactionGen(0, sender = Some(rich)) - mb <- microBlockGen(Seq(tx), master) - mba = appendMicroBlock(mb) - } yield (tx, mba.microBlockStateUpdate) - } { - case (tx, su) => - su.balances.find(_._1 == master.publicKey.toAddress).get._3 shouldBe (0.4 * tx.fee + initialAmount) - } - } - - "block/microblock with balance updates from transfer txs" in forAll { - for { - sender <- accountGen - recipient <- accountGen - amount <- Gen.choose(1L, initialAmount - Constants.UnitsInWave) - master2sender <- transferGeneratorPV2(1, master, sender.toAddress, amount) - fee <- Gen.choose(1, master2sender.amount - 1) - sender2recipient = TransferTransaction.selfSigned(2.toByte, sender, recipient.toAddress, Waves, master2sender.amount - fee, Waves, fee, ByteStr.empty, 2) - .explicitGet() - } yield (sender, recipient, master2sender, sender2recipient) - } { - case (sender, recipient, master2sender, sender2recipient) => - testTxsStateUpdates(Seq(master2sender, sender2recipient)) { transactionStateUpdates => - transactionStateUpdates.last.balances.find(_._1 == sender.publicKey.toAddress).get._3 shouldBe 0 - - transactionStateUpdates.last.balances.find(_._1 == recipient.publicKey.toAddress).get._3 shouldBe - sender2recipient.amount - } - } - - "block/microblock with a data transaction" in forAll(dataTransactionGen(DataTransaction.MaxEntryCount, sender = Some(rich))) { tx => - testTxsStateUpdates(Seq(tx)) { - _.head.dataEntries.map(_._2).sortBy(_.key) shouldBe tx.data.sortBy(_.key) - } - } - - "blocks/microblocks with correct asset state updates by" - { - "issue transaction" in forAll(issueV2TransactionGen(Gen.const(master))) { tx => - testTxsStateUpdates(Seq(tx)) { upds => - val AssetStateUpdate(asset, decimals, name, description, reissuable, volume, script, sponsorship, nft, assetExistedBefore) = - upds.head.assets.head - asset.id shouldBe tx.id() - name shouldBe tx.name.byteStr - description shouldBe tx.description.byteStr - decimals shouldBe tx.decimals - reissuable shouldBe tx.reissuable - volume.toLong shouldBe tx.quantity - script.map(_.script) shouldBe tx.script - nft shouldBe isNFT(tx) - sponsorship shouldBe None - assetExistedBefore shouldBe false - } - } - - "reissue transaction" in forAll { - for { - issueAmt <- assetAmtGen - reissueAmt <- assetAmtGen - (issue, reissue, _) <- issueReissueBurnGeneratorP(issueAmt, reissueAmt, 1, master).suchThat(_._1.reissuable) - } yield (issue, reissue) - } { - case (issue, reissue) => - testTxsStateUpdates(Seq(issue, reissue)) { upds => - val issueUpd = upds.head.assets.head - val reissueUpd = upds.last.assets.head - - reissueUpd shouldBe issueUpd.copy( - volume = BigInt(issue.quantity) + BigInt(reissue.quantity), - reissuable = reissue.reissuable, - assetExistedBefore = !issueUpd.assetExistedBefore - ) - } - } - - "burn transaction" in forAll { - for { - issueAmt <- assetAmtGen - burnAmt <- Gen.choose(1, issueAmt) - (issue, _, burn) <- issueReissueBurnGeneratorP(issueAmt, 1, burnAmt, master) - } yield (issue, burn) - } { - case (issue, burn) => - testTxsStateUpdates(Seq(issue, burn)) { upds => - val issueUpd = upds.head.assets.head - val burnUpd = upds.last.assets.head - - burnUpd shouldBe issueUpd.copy( - volume = BigInt(issue.quantity) - BigInt(burn.quantity), - assetExistedBefore = !issueUpd.assetExistedBefore - ) - } - } - - "set asset script transaction" in forAll(issueAndSetAssetScriptGen(master)) { - case (issue, setAssetScript) => - testTxsStateUpdates(Seq(issue, setAssetScript)) { upds => - val issueUpd = upds.head.assets.head - val scriptUpd = upds.last.assets.head - - scriptUpd shouldBe issueUpd.copy( - script = setAssetScript.script.map( - s => - state.AssetScriptInfo( - s, - Script.estimate(s, EstimatorProvider.EstimatorBlockchainExt(blockchain).estimator, useContractVerifierLimit = false).explicitGet() - ) - ), - assetExistedBefore = !issueUpd.assetExistedBefore - ) - } - } - - "sponsor fee transaction " in forAll(sponsorFeeCancelSponsorFeeGen(master)) { - case (issue, startSponsorship, _, cancelSponsorship) => - testTxsStateUpdates(Seq(issue, startSponsorship, cancelSponsorship)) { upds => - val issueUpd = upds.head.assets.head - val startSponsorshipUpd = upds(1).assets.head - val cancelSponsorshipUpd = upds(2).assets.head - - startSponsorshipUpd shouldBe issueUpd.copy( - sponsorship = startSponsorship.minSponsoredAssetFee, - assetExistedBefore = !issueUpd.assetExistedBefore - ) - - cancelSponsorshipUpd shouldBe startSponsorshipUpd.copy( - sponsorship = cancelSponsorship.minSponsoredAssetFee - ) - } - } - - "invokeScript transaction (diff emulated by issue, reussue and burn txs)" in forAll { - for { - issueAmt <- assetAmtGen - reissueAmt <- assetAmtGen - (issue, reissue, _) <- issueReissueBurnGeneratorP(issueAmt, reissueAmt, 1, master).suchThat(_._1.reissuable) - invoke <- invokeScriptGen(Gen.const(Seq.empty)) - } yield (issue, reissue, invoke) - } { - case (issue, reissue, invoke) => - // create a block with issue and reissue txs, getting their diffs - val assetsDummyBlock = TestBlock.create(master, Seq(issue, reissue)) - val assetsDummyBlockDiff = detailedDiffFromBlock(assetsDummyBlock) - - val invokeBlock = TestBlock.create(master, Seq(invoke)) - // merge issue/reissue diffs as if they were produced by a single invoke - val invokeTxDiff = assetsDummyBlockDiff.transactionDiffs - .foldLeft(Diff.empty)(Diff.diffMonoid.combine) - .copy(transactions = Map(invoke.id() -> NewTransactionInfo(invoke, Set(master.toAddress), true))) - val invokeBlockDetailedDiff = assetsDummyBlockDiff.copy(transactionDiffs = Seq(invokeTxDiff)) - - produceEvent(_.onProcessBlock(invokeBlock, invokeBlockDetailedDiff, None, blockchain)) match { - case ba: BlockAppended => - val AssetStateUpdate(asset, decimals, name, description, reissuable, volume, script, sponsorship, nft, assetExistedBefore) = - ba.transactionStateUpdates.head.assets.head - - asset.id shouldBe issue.assetId - decimals shouldBe issue.decimals - name shouldBe issue.name.byteStr - description shouldBe issue.description.byteStr - reissuable shouldBe reissue.reissuable - volume shouldBe (BigInt(issue.quantity) + BigInt(reissue.quantity)) - script shouldBe issue.script - sponsorship shouldBe None - nft shouldBe isNFT(issue) - assetExistedBefore shouldBe false - case _ => fail() - } - } - } - } -} diff --git a/node/src/test/scala/com/wavesplatform/events/EventsHelpers.scala b/node/src/test/scala/com/wavesplatform/events/EventsHelpers.scala deleted file mode 100644 index 1b7895292b1..00000000000 --- a/node/src/test/scala/com/wavesplatform/events/EventsHelpers.scala +++ /dev/null @@ -1,39 +0,0 @@ -package com.wavesplatform.events - -import com.wavesplatform.block.{Block, MicroBlock} -import com.wavesplatform.common.utils.EitherExt2 -import com.wavesplatform.mining.MiningConstraint -import com.wavesplatform.state.diffs.BlockDiffer -import com.wavesplatform.state.diffs.BlockDiffer.DetailedDiff -import monix.execution.Scheduler.Implicits.global -import monix.reactive.subjects.ReplaySubject -import org.scalatest.Suite - -import scala.concurrent.duration._ - -private[events] trait EventsHelpers extends WithBlockchain { _: Suite => - protected def produceEvent(useTrigger: BlockchainUpdateTriggers => Unit): BlockchainUpdated = { - val evts = ReplaySubject[BlockchainUpdated]() - val t = new BlockchainUpdateTriggersImpl(evts) - useTrigger(t) - evts.onComplete() - evts.toListL.runSyncUnsafe(500.milliseconds).head - } - - protected def detailedDiffFromBlock(b: Block): DetailedDiff = - BlockDiffer.fromBlock(blockchain, None, b, MiningConstraint.Unlimited, verify = false).explicitGet().detailedDiff - - protected def appendBlock(b: Block, minerReward: Option[Long] = None): BlockAppended = - produceEvent(_.onProcessBlock(b, detailedDiffFromBlock(b), minerReward, blockchain)) match { - case ba: BlockAppended => ba - case _ => fail() - } - - protected def appendMicroBlock(mb: MicroBlock): MicroBlockAppended = { - val dd = BlockDiffer.fromMicroBlock(blockchain, Some(0), mb, 1, MiningConstraint.Unlimited, verify = false).explicitGet().detailedDiff - produceEvent(_.onProcessMicroBlock(mb, dd, blockchain, mb.totalResBlockSig)) match { - case mba: MicroBlockAppended => mba - case _ => fail() - } - } -} diff --git a/node/src/test/scala/com/wavesplatform/events/GenesisBlockUpdateSpec.scala b/node/src/test/scala/com/wavesplatform/events/GenesisBlockUpdateSpec.scala deleted file mode 100644 index 852f7017d30..00000000000 --- a/node/src/test/scala/com/wavesplatform/events/GenesisBlockUpdateSpec.scala +++ /dev/null @@ -1,35 +0,0 @@ -package com.wavesplatform.events - -import com.wavesplatform.settings.WavesSettings -import com.wavesplatform.state.diffs.ENOUGH_AMT -import com.wavesplatform.transaction.GenesisTransaction -import com.wavesplatform.{BlockGen, TestHelpers} -import org.scalacheck.Gen -import org.scalatest.{FreeSpec, Matchers} -import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks -import com.wavesplatform.common.utils.EitherExt2 - -class GenesisBlockUpdateSpec extends FreeSpec with Matchers with BlockGen with ScalaCheckPropertyChecks with EventsHelpers { - override protected def settings: WavesSettings = TestHelpers.enableNG(super.settings) - - val genesisAppendWithWavesAmountGen: Gen[(BlockAppended, Long)] = for { - master <- accountGen - wavesAmount <- Gen.choose(1L, ENOUGH_AMT) - gt = GenesisTransaction.create(master.toAddress, wavesAmount, 0).explicitGet() - b <- blockGen(Seq(gt), master) - ba = appendBlock(b) - } yield (ba, wavesAmount) - - "on genesis block append" - { - "master address balance gets correctly updated" in forAll(genesisAppendWithWavesAmountGen) { - case (BlockAppended(_, _, _, _, _, upds), wavesAmount) => - upds.head.balances.head._3 shouldBe wavesAmount - } - - "updated Waves amount is calculated correctly" in forAll(genesisAppendWithWavesAmountGen) { - case (BlockAppended(_, _, _, updatedWavesAmount, _, _), wavesAmount) => - updatedWavesAmount shouldBe wavesAmount - } - } - -} diff --git a/node/src/test/scala/com/wavesplatform/events/WithBlockchain.scala b/node/src/test/scala/com/wavesplatform/events/WithBlockchain.scala deleted file mode 100644 index 9c18c33434f..00000000000 --- a/node/src/test/scala/com/wavesplatform/events/WithBlockchain.scala +++ /dev/null @@ -1,46 +0,0 @@ -package com.wavesplatform.events - -import java.nio.file.Files - -import com.wavesplatform.database.TestStorageFactory -import com.wavesplatform.settings.{WavesSettings, loadConfig} -import com.wavesplatform.state.Blockchain -import com.wavesplatform.transaction.BlockchainUpdater -import com.wavesplatform.{NTPTime, TestHelpers, database} -import monix.reactive.Observer -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Suite} - -trait WithBlockchain extends BeforeAndAfterEach with BeforeAndAfterAll with NTPTime { _: Suite => - protected def settings: WavesSettings = WavesSettings.fromRootConfig(loadConfig(None)) - - private val path = Files.createTempDirectory("leveldb-test") - private val db = database.openDB(path.toAbsolutePath.toString) - private val (bcu, _) = TestStorageFactory( - settings, - db, - ntpTime, - Observer.stopped, - BlockchainUpdateTriggers.noop - ) - - protected def blockchain: Blockchain = bcu - - /** - * Override this method to do some initialization actions with - * the blockchain before it becomes read-only - * @param blockchainUpdater a blockchain to add something to (genesis, some blocks, etc.) - */ - protected def initBlockchain(blockchainUpdater: Blockchain with BlockchainUpdater): Unit = () - - override protected def beforeAll(): Unit = { - initBlockchain(bcu) - super.beforeAll() - } - - override def afterAll(): Unit = { - bcu.shutdown() - db.close() - TestHelpers.deleteRecursively(path) - super.afterAll() - } -} diff --git a/node/src/test/scala/com/wavesplatform/state/BlockchainUpdaterImplSpec.scala b/node/src/test/scala/com/wavesplatform/state/BlockchainUpdaterImplSpec.scala index ac5d23a6f19..3b86cc42c4f 100644 --- a/node/src/test/scala/com/wavesplatform/state/BlockchainUpdaterImplSpec.scala +++ b/node/src/test/scala/com/wavesplatform/state/BlockchainUpdaterImplSpec.scala @@ -219,7 +219,7 @@ class BlockchainUpdaterImplSpec // microblock 1 (triggersMock.onProcessMicroBlock _) - .expects(where { (microBlock, diff, bc, _) => + .expects(where { (microBlock, diff, bc, _, _) => bc.height == 1 && microBlock.transactionData.length == 2 && // miner reward, no NG — all txs fees @@ -230,7 +230,7 @@ class BlockchainUpdaterImplSpec // microblock 2 (triggersMock.onProcessMicroBlock _) - .expects(where { (microBlock, diff, bc, _) => + .expects(where { (microBlock, diff, bc, _, _) => bc.height == 1 && microBlock.transactionData.length == 1 && // miner reward, no NG — all txs fees @@ -256,7 +256,7 @@ class BlockchainUpdaterImplSpec // microblock 3 (triggersMock.onProcessMicroBlock _) - .expects(where { (microBlock, _, bc, _) => + .expects(where { (microBlock, _, bc, _, _) => bc.height == 2 && microBlock.reference == block2.signature }) .once() diff --git a/project/Dependencies.scala b/project/Dependencies.scala index adb168f7d4c..d88d1d8e753 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -164,7 +164,7 @@ object Dependencies { ) private[this] val protoSchemasLib = - "com.wavesplatform" % "protobuf-schemas" % "1.2.6" classifier "proto" + "com.wavesplatform" % "protobuf-schemas" % "1.2.8" classifier "proto" lazy val scalapbRuntime = Def.setting { val version = scalapb.compiler.Version.scalapbVersion