Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zeromq publisher for unconfirmed transactions and new blocks #2016

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-parsing" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"org.bitlet" % "weupnp" % "0.1.4",

"org.zeromq" % "jeromq" % "0.5.3",

// api dependencies
"io.circe" %% "circe-core" % circeVersion,
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/org/ergoplatform/ErgoApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.ergoplatform.mining.ErgoMiner.StartMining
import org.ergoplatform.network.{ErgoNodeViewSynchronizer, ErgoSyncTracker}
import org.ergoplatform.nodeView.history.ErgoSyncInfoMessageSpec
import org.ergoplatform.nodeView.history.extra.ExtraIndexer
import org.ergoplatform.nodeView.{ErgoNodeViewRef, ErgoReadersHolderRef}
import org.ergoplatform.nodeView.{ErgoEventPublisherRef, ErgoNodeViewRef, ErgoReadersHolderRef}
import org.ergoplatform.settings.{Args, ErgoSettings, NetworkType}
import scorex.core.api.http._
import scorex.core.app.ScorexContext
Expand Down Expand Up @@ -98,6 +98,8 @@ class ErgoApp(args: Args) extends ScorexLogging {

private val readersHolderRef: ActorRef = ErgoReadersHolderRef(nodeViewHolderRef)

ErgoEventPublisherRef()

// Create an instance of ErgoMiner actor if "mining = true" in config
private val minerRefOpt: Option[ActorRef] =
if (ergoSettings.nodeSettings.mining) {
Expand Down
42 changes: 42 additions & 0 deletions src/main/scala/org/ergoplatform/nodeView/ErgoEventPublisher.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.ergoplatform.nodeView

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.ergoplatform.modifiers.history.header.Header
import org.zeromq.SocketType
import org.zeromq.ZMQ
import org.zeromq.ZContext
import org.ergoplatform.modifiers.mempool.UnconfirmedTransaction
import org.ergoplatform.network.ErgoNodeViewSynchronizer.ReceivableMessages.{FullBlockApplied, SuccessfulTransaction}
import scorex.util.ScorexLogging

class ErgoEventPublisher(socket: ZMQ.Socket) extends Actor with ScorexLogging {

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[SuccessfulTransaction])
context.system.eventStream.subscribe(self, classOf[FullBlockApplied])
}

@SuppressWarnings(Array("IsInstanceOf"))
override def receive: Receive = {
case SuccessfulTransaction(transaction: UnconfirmedTransaction) =>
socket.send(s"utx${transaction.id}")

case FullBlockApplied(header: Header) =>
socket.send(s"blk${header.id}${header.height.toString}")

case a: Any => log.warn(s"ErgoEventPublisher got improper input: $a")
}
}

object ErgoEventPublisherRef {

def apply()
(implicit context: ActorRefFactory): ActorRef = {
val zContext: ZContext = new ZContext()
val socket = zContext.createSocket(SocketType.PUB)
socket.bind("tcp://127.0.0.1:5555")
val props = Props(new ErgoEventPublisher(socket))
context.actorOf(props)
}

}