Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dropIfNoChannel problem #48

Open
RashadAnsari opened this issue Sep 17, 2018 · 3 comments
Open

dropIfNoChannel problem #48

RashadAnsari opened this issue Sep 17, 2018 · 3 comments

Comments

@RashadAnsari
Copy link

Hi
When I send many ChannelMessage with a high rate to ChannelActor with dropIfNoChannel = false if I stop rabbitmq and after a duration of time like 1 minute start rabbitmq again, the ChannelActor blocked for process many messages and can't process other message and throw heap space error.
I write this code:

import akka.actor.{ActorRef, ActorSystem}
import akka.pattern._
import akka.util.Timeout
import com.newmotion.akka.rabbitmq.{ChannelActor, ChannelCreated, ChannelMessage, ConnectionActor, ConnectionFactory, CreateChannel}
import com.rabbitmq.client.MessageProperties

import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Exam1 extends App {

  val system = ActorSystem()

  implicit val timeout: Timeout = Timeout(2 second)
  implicit val executionContext: ExecutionContext = system.dispatcher

  val unconfirmed = mutable.Set.empty[Long]

  val connFactory = new ConnectionFactory()
  connFactory.setHost("127.0.0.1")
  connFactory.setPort(5672)
  connFactory.setUsername("amqp")
  connFactory.setPassword("pass")

  val pubConnActor = system.actorOf(ConnectionActor.props(connFactory))

  Thread.sleep(5000)

  @tailrec
  def produce(chActor: ActorRef): Unit = {
    chActor ! ChannelMessage({
      ch =>
        println("------------------------")
        ch.basicPublish("amq.direct", "me-topic", MessageProperties.PERSISTENT_BASIC, "message".getBytes("UTF-8"))
    }, dropIfNoChannel = false)
    produce(chActor)
  }

  (pubConnActor ? CreateChannel(ChannelActor.props().withMailbox("bounded-mailbox"))).mapTo[ChannelCreated] map {
    case ChannelCreated(chActor) =>
      produce(chActor)
  }

}

When I run my code and after a duration of time stop rabbitmq and after a duration of time start again I get heap space error and stop my program.
I think this is for loop function in ChannelActor.
Please fix it and choose another way.

@sbmpost
Copy link
Contributor

sbmpost commented Oct 2, 2018

If rabbitmq goes offline for a short moment of time (small network glitch for instance), the fallback mechanism is to queue messages in memory for as long as it is offline. But there are limits to the number of messages that can be queued. Is this why you are using a bounded-mailbox?

Note: I am not the maintainer of this library, but more curious about the behaviour you are describing

@RashadAnsari
Copy link
Author

Thank you.
I just use bounded-mailbox for control message rate.
When I send many messages to channel actor if I don't use a bounded-mailbox, channel actor crash.

@sbmpost
Copy link
Contributor

sbmpost commented Dec 1, 2018

A version 5.0.4-beta has been released. While the changes aren't directly related to your problem, it might change the behaviour you are seeing. But as mentioned before, akka-rabbitmq can only queue messages for a limited period of time (that is until the heap space runs out). If the connection to the server is restored within that time, then you should be fine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants