Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PublishSubcribe example doesn't work #52

Open
ebracci opened this issue Jan 11, 2019 · 3 comments
Open

PublishSubcribe example doesn't work #52

ebracci opened this issue Jan 11, 2019 · 3 comments

Comments

@ebracci
Copy link

ebracci commented Jan 11, 2019

Hi, I am trying to test the following example but it doesn't work.

object PublishSubscribe extends App {
  implicit val system = ActorSystem()
  val factory = new ConnectionFactory()
  val connection = system.actorOf(ConnectionActor.props(factory), "akka-rabbitmq")
  val exchange = "amq.fanout"


  def setupPublisher(channel: Channel, self: ActorRef) {
    val queue = channel.queueDeclare().getQueue
    println(channel)
    channel.queueBind(queue, exchange, "")
  }

  connection ! CreateChannel(ChannelActor.props(setup), Some("publisher"))

  def setupSubscriber(channel: Channel, self: ActorRef) {
    val queue = channel.queueDeclare().getQueue
    println(channel)
    channel.queueBind(queue, exchange, "")
    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) {
        println("received: " + fromBytes(body))
      }
    }
    channel.basicConsume(queue, true, consumer)
  }

  def setup(channel: Channel, self: ActorRef): Unit = {
    setupPublisher(channel, self)
    setupSubscriber(channel, self)
  }

  connection ! CreateChannel(ChannelActor.props(setup), Some("subscriber"))

  Future {
    def loop(n: Long) {
      val publisher = system.actorSelection("/user/rabbitmq/publisher")

      def publish(channel: Channel) {
        println("publish")
        channel.basicPublish(exchange, "", null, toBytes(n))
      }

      publisher ! ChannelMessage(publish, dropIfNoChannel = false)

      Thread.sleep(1000)
      loop(n + 1)
    }

    loop(0)
  }

  def fromBytes(x: Array[Byte]) = new String(x, "UTF-8")

  def toBytes(x: Long) = x.toString.getBytes("UTF-8")
}

The publish method is not called by

publisher ! ChannelMessage(publish, dropIfNoChannel = false)

@SergeKireev
Copy link

Not working for me neither, ChannelMessages get sent to dead letter

@RonaldKruizinga
Copy link

Try replacing
val publisher = system.actorSelection("/user/rabbitmq/publisher")

with

val publisher = system.actorSelection("/user/akka-rabbitmq/publisher")

That fixed it for me

@gertjana
Copy link
Member

Indeed the connection Actor is created with the name "akka-rabbitmq" at the top but referenced as 'rabbitmq' below.

i've update the example. thx for reporting and providing the fix ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants