From 4d88343b8d7b3d05dc4358816c0a1a6c6c97f0f5 Mon Sep 17 00:00:00 2001 From: Artyom Sayadyan Date: Tue, 17 May 2022 18:44:06 +0300 Subject: [PATCH 1/3] NODE-2329 Reestabilish input stream on unexpected read bytes size --- .../scala/com/wavesplatform/Importer.scala | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala index 5ad6ee136e3..1d8369dd38a 100644 --- a/node/src/main/scala/com/wavesplatform/Importer.scala +++ b/node/src/main/scala/com/wavesplatform/Importer.scala @@ -1,12 +1,5 @@ package com.wavesplatform -import java.io._ -import java.net.{MalformedURLException, URL} - -import scala.concurrent.{Await, Future} -import scala.concurrent.duration._ -import scala.util.{Failure, Success, Try} - import akka.actor.ActorSystem import com.google.common.io.ByteStreams import com.google.common.primitives.Ints @@ -16,7 +9,7 @@ import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi, CommonB import com.wavesplatform.block.{Block, BlockHeader} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.consensus.PoSSelector -import com.wavesplatform.database.{openDB, DBExt, KeyTags} +import com.wavesplatform.database.{DBExt, KeyTags, openDB} import com.wavesplatform.events.{BlockchainUpdateTriggers, UtxEvent} import com.wavesplatform.extensions.{Context, Extension} import com.wavesplatform.features.BlockchainFeatures @@ -25,12 +18,12 @@ import com.wavesplatform.lang.ValidationError import com.wavesplatform.mining.Miner import com.wavesplatform.protobuf.block.PBBlocks import com.wavesplatform.settings.WavesSettings -import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height} import com.wavesplatform.state.appender.BlockAppender -import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction} +import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height} import com.wavesplatform.transaction.TxValidationError.GenericError import com.wavesplatform.transaction.smart.script.trace.TracedResult -import com.wavesplatform.utils._ +import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction} +import com.wavesplatform.utils.* import com.wavesplatform.utx.{UtxPool, UtxPoolImpl} import com.wavesplatform.wallet.Wallet import kamon.Kamon @@ -40,6 +33,12 @@ import monix.reactive.{Observable, Observer} import org.iq80.leveldb.DB import scopt.OParser +import java.io.* +import java.net.{MalformedURLException, URL} +import scala.concurrent.duration.* +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success, Try} + object Importer extends ScorexLogging { import monix.execution.Scheduler.Implicits.global @@ -59,7 +58,7 @@ object Importer extends ScorexLogging { import scopt.OParser val builder = OParser.builder[ImportOptions] - import builder._ + import builder.* OParser.sequence( programName("waves import"), @@ -167,12 +166,13 @@ object Importer extends ScorexLogging { } } - @volatile private var quit = false - private val lock = new Object + @volatile private var quit = false + @volatile private var inputStream: InputStream = null + private val lock = new Object - //noinspection UnstableApiUsage + // noinspection UnstableApiUsage def startImport( - inputStream: BufferedInputStream, + getInputStream: () => InputStream, blockchain: Blockchain, appendBlock: AppendBlock, importOptions: ImportOptions, @@ -189,13 +189,15 @@ object Importer extends ScorexLogging { if (blocksToSkip > 0) log.info(s"Skipping $blocksToSkip block(s)") sys.addShutdownHook { - import scala.concurrent.duration._ + import scala.concurrent.duration.* val millis = (System.nanoTime() - start).nanos.toMillis log.info( s"Imported $counter block(s) from $startHeight to ${startHeight + counter} in ${humanReadableDuration(millis)}" ) } + inputStream = getInputStream() + while (!quit && counter < blocksToApply) lock.synchronized { val s1 = ByteStreams.read(inputStream, lenBytes, 0, Ints.BYTES) if (s1 == Ints.BYTES) { @@ -236,7 +238,9 @@ object Importer extends ScorexLogging { } } else { log.info(s"$factReadSize != expected $blockSize") - quit = true + log.info(s"reestablishing input stream") + inputStream.close() + inputStream = getInputStream() } } else { if (inputStream.available() > 0) log.info(s"Expecting to read ${Ints.BYTES} but got $s1 (${inputStream.available()})") @@ -286,7 +290,7 @@ object Importer extends ScorexLogging { val extensions = initExtensions(settings, blockchainUpdater, scheduler, time, utxPool, db, actorSystem) checkGenesis(settings, blockchainUpdater, Miner.Disabled) - val importFileOffset = + def importFileOffset() = if (importOptions.dryRun) 0 else importOptions.format match { @@ -304,7 +308,9 @@ object Importer extends ScorexLogging { case _ => 0L } - val inputStream = new BufferedInputStream(initFileStream(importOptions.blockchainFile, importFileOffset), 2 * 1024 * 1024) + + def establishInputStream() = + new BufferedInputStream(initFileStream(importOptions.blockchainFile, importFileOffset()), 2 * 1024 * 1024) if (importOptions.dryRun) { def readNextBlock(): Future[Option[Block]] = Future.successful(None) @@ -353,10 +359,10 @@ object Importer extends ScorexLogging { levelDb.close() db.close() } - inputStream.close() + if (inputStream != null) inputStream.close() } - startImport(inputStream, blockchainUpdater, extAppender, importOptions, importFileOffset == 0) + startImport(establishInputStream, blockchainUpdater, extAppender, importOptions, importFileOffset() == 0) Await.result(Kamon.stopModules(), 10.seconds) } } From 3218b6af752b3322773547dce96cd418c4040d05 Mon Sep 17 00:00:00 2001 From: Artyom Sayadyan Date: Tue, 17 May 2022 18:52:36 +0300 Subject: [PATCH 2/3] Fix checkPR error --- node/src/main/scala/com/wavesplatform/Importer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala index 1d8369dd38a..7b20cefa571 100644 --- a/node/src/main/scala/com/wavesplatform/Importer.scala +++ b/node/src/main/scala/com/wavesplatform/Importer.scala @@ -362,7 +362,7 @@ object Importer extends ScorexLogging { if (inputStream != null) inputStream.close() } - startImport(establishInputStream, blockchainUpdater, extAppender, importOptions, importFileOffset() == 0) + startImport(() => establishInputStream(), blockchainUpdater, extAppender, importOptions, importFileOffset() == 0) Await.result(Kamon.stopModules(), 10.seconds) } } From 626e27b4de426763baa00b1024024642263335bf Mon Sep 17 00:00:00 2001 From: Artyom Sayadyan Date: Mon, 6 Jun 2022 17:57:56 +0300 Subject: [PATCH 3/3] Catch reading bytes exceptions --- .../scala/com/wavesplatform/Importer.scala | 87 ++++++++++--------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala index 7b20cefa571..d491f1041c3 100644 --- a/node/src/main/scala/com/wavesplatform/Importer.scala +++ b/node/src/main/scala/com/wavesplatform/Importer.scala @@ -37,6 +37,7 @@ import java.io.* import java.net.{MalformedURLException, URL} import scala.concurrent.duration.* import scala.concurrent.{Await, Future} +import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} object Importer extends ScorexLogging { @@ -199,52 +200,58 @@ object Importer extends ScorexLogging { inputStream = getInputStream() while (!quit && counter < blocksToApply) lock.synchronized { - val s1 = ByteStreams.read(inputStream, lenBytes, 0, Ints.BYTES) - if (s1 == Ints.BYTES) { - val blockSize = Ints.fromByteArray(lenBytes) - - lazy val blockBytes = new Array[Byte](blockSize) - val factReadSize = - if (blocksToSkip > 0) { - // File IO optimization - ByteStreams.skipFully(inputStream, blockSize) - blockSize - } else { - ByteStreams.read(inputStream, blockBytes, 0, blockSize) - } + try { + val s1 = ByteStreams.read(inputStream, lenBytes, 0, Ints.BYTES) + if (s1 == Ints.BYTES) { + val blockSize = Ints.fromByteArray(lenBytes) + + lazy val blockBytes = new Array[Byte](blockSize) + val factReadSize = + if (blocksToSkip > 0) { + // File IO optimization + ByteStreams.skipFully(inputStream, blockSize) + blockSize + } else { + ByteStreams.read(inputStream, blockBytes, 0, blockSize) + } - if (factReadSize == blockSize) { - if (blocksToSkip > 0) { - blocksToSkip -= 1 - } else { - val blockV5 = blockchain.isFeatureActivated( - BlockchainFeatures.BlockV5, - blockchain.height + 1 - ) - val block = - (if (importOptions.format == Formats.Binary && !blockV5) Block.parseBytes(blockBytes) - else PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true)).get - if (blockchain.lastBlockId.contains(block.header.reference)) { - Await.result(appendBlock(block).runAsyncLogErr, Duration.Inf) match { - case Left(ve) => - log.error(s"Error appending block: $ve") - quit = true - case _ => - counter = counter + 1 - } + if (factReadSize == blockSize) { + if (blocksToSkip > 0) { + blocksToSkip -= 1 } else { - log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}") + val blockV5 = blockchain.isFeatureActivated( + BlockchainFeatures.BlockV5, + blockchain.height + 1 + ) + val block = + (if (importOptions.format == Formats.Binary && !blockV5) Block.parseBytes(blockBytes) + else PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true)).get + if (blockchain.lastBlockId.contains(block.header.reference)) { + Await.result(appendBlock(block).runAsyncLogErr, Duration.Inf) match { + case Left(ve) => + log.error(s"Error appending block: $ve") + quit = true + case _ => + counter = counter + 1 + } + } else { + log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}") + } } + } else { + log.info(s"$factReadSize != expected $blockSize") + log.info(s"reestablishing input stream") + inputStream.close() + inputStream = getInputStream() } } else { - log.info(s"$factReadSize != expected $blockSize") - log.info(s"reestablishing input stream") - inputStream.close() - inputStream = getInputStream() + if (inputStream.available() > 0) log.info(s"Expecting to read ${Ints.BYTES} but got $s1 (${inputStream.available()})") + quit = true } - } else { - if (inputStream.available() > 0) log.info(s"Expecting to read ${Ints.BYTES} but got $s1 (${inputStream.available()})") - quit = true + } catch { + case NonFatal(e) => + log.error(s"Error reading bytes: $e") + quit = true } } }