From a90de957e5f348e9d52f8812f1380198fe06daff Mon Sep 17 00:00:00 2001 From: Robert Pieter van Leeuwen Date: Wed, 12 Jul 2023 11:08:50 +0200 Subject: [PATCH] Add zeromq publisher for unconfirmed transactions and new blocks --- build.sbt | 2 + src/main/scala/org/ergoplatform/ErgoApp.scala | 4 +- .../nodeView/ErgoEventPublisher.scala | 42 +++++++++++++++++++ 3 files changed, 47 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/org/ergoplatform/nodeView/ErgoEventPublisher.scala diff --git a/build.sbt b/build.sbt index e880b0fa32..f2a04e2bc6 100644 --- a/build.sbt +++ b/build.sbt @@ -62,6 +62,8 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-parsing" % akkaHttpVersion, "com.typesafe.akka" %% "akka-stream" % akkaVersion, "org.bitlet" % "weupnp" % "0.1.4", + + "org.zeromq" % "jeromq" % "0.5.3", // api dependencies "io.circe" %% "circe-core" % circeVersion, diff --git a/src/main/scala/org/ergoplatform/ErgoApp.scala b/src/main/scala/org/ergoplatform/ErgoApp.scala index f83eba0a56..1b381fe934 100644 --- a/src/main/scala/org/ergoplatform/ErgoApp.scala +++ b/src/main/scala/org/ergoplatform/ErgoApp.scala @@ -12,7 +12,7 @@ import org.ergoplatform.mining.ErgoMiner.StartMining import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker} import org.ergoplatform.nodeView.history.ErgoSyncInfoMessageSpec import org.ergoplatform.nodeView.history.extra.ExtraIndexer -import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef} +import org.ergoplatform.nodeView.{ErgoEventPublisherRef, ErgoNodeViewRef, ErgoReadersHolderRef} import org.ergoplatform.settings.{Args, ErgoSettings, NetworkType} import scorex.core.api.http._ import scorex.core.app.ScorexContext @@ -98,6 +98,8 @@ class ErgoApp(args: Args) extends ScorexLogging { private val readersHolderRef: ActorRef = ErgoReadersHolderRef(nodeViewHolderRef) + ErgoEventPublisherRef() + // Create an instance of ErgoMiner actor if "mining = true" in config private val minerRefOpt: Option[ActorRef] = if (ergoSettings.nodeSettings.mining) { diff --git a/src/main/scala/org/ergoplatform/nodeView/ErgoEventPublisher.scala b/src/main/scala/org/ergoplatform/nodeView/ErgoEventPublisher.scala new file mode 100644 index 0000000000..26ff06e384 --- /dev/null +++ b/src/main/scala/org/ergoplatform/nodeView/ErgoEventPublisher.scala @@ -0,0 +1,42 @@ +package org.ergoplatform.nodeView + +import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} +import org.ergoplatform.modifiers.history.header.Header +import org.zeromq.SocketType +import org.zeromq.ZMQ +import org.zeromq.ZContext +import org.ergoplatform.modifiers.mempool.UnconfirmedTransaction +import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.{FullBlockApplied, SuccessfulTransaction} +import scorex.util.ScorexLogging + +class ErgoEventPublisher(socket: ZMQ.Socket) extends Actor with ScorexLogging { + + override def preStart(): Unit = { + context.system.eventStream.subscribe(self, classOf[SuccessfulTransaction]) + context.system.eventStream.subscribe(self, classOf[FullBlockApplied]) + } + + @SuppressWarnings(Array("IsInstanceOf")) + override def receive: Receive = { + case SuccessfulTransaction(transaction: UnconfirmedTransaction) => + socket.send(s"utx${transaction.id}") + + case FullBlockApplied(header: Header) => + socket.send(s"blk${header.id}${header.height.toString}") + + case a: Any => log.warn(s"ErgoEventPublisher got improper input: $a") + } +} + +object ErgoEventPublisherRef { + + def apply() + (implicit context: ActorRefFactory): ActorRef = { + val zContext: ZContext = new ZContext() + val socket = zContext.createSocket(SocketType.PUB) + socket.bind("tcp://127.0.0.1:5555") + val props = Props(new ErgoEventPublisher(socket)) + context.actorOf(props) + } + +}