openConnection/closeConnection not thread safe #85

Open
eddiew opened this issue Oct 16, 2018 · 1 comment

Comments

eddiew commented Oct 16, 2018

I noticed that if multiple connections to the same AMQP server are opened on separate threads, one thread calling closeConnection causes all of the connections to stop working.

I don't plan to have per-thread connections in production (this arose when running tests in parallel) but it strikes me as something that would be good to fix, or at least document.

hreinhardt (Owner) commented

That's strange, since there's no global state in the AMQP library; therefore, connections should be totally independent.

I tried hacking together a simple test-case, but it's not displaying any erroneous behaviour:

{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP

import Control.Concurrent
import qualified Data.ByteString.Lazy.Char8 as BL

-- thread1: opens its own connection, consumes from myQueue, and forks thread2
main :: IO ()
main = do
    putStrLn "(thread1) opening connection"
    conn <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan <- openChannel conn
    declareQueue chan newQueue {queueName = "myQueue"}
    declareExchange chan newExchange {exchangeName = "myExchange", exchangeType = "direct"}
    bindQueue chan "myQueue" "myExchange" "myKey"
    consumeMsgs chan "myQueue" Ack $ \(msg, env) -> do
        putStrLn $ "(thread1) received message: "++(BL.unpack $ msgBody msg)
        ackEnv env

    forkIO thread2
    threadDelay 500000

    putStrLn "(thread1) publishing message"
    publishMsg chan "myExchange" "myKey"
        (newMsg {msgBody = (BL.pack "hello world (from thread1)"),
                 msgDeliveryMode = Just NonPersistent}
                )

    threadDelay 500000
    closeConnection conn
    putStrLn "(thread1) connection closed"


-- thread2: opens a second, independent connection, publishes, and closes it
thread2 :: IO ()
thread2 = do
    putStrLn "(thread2) opening connection"
    conn2 <- openConnection "127.0.0.1" "/" "guest" "guest"
    chan2 <- openChannel conn2

    putStrLn "(thread2) publishing message"
    publishMsg chan2 "myExchange" "myKey"
        (newMsg {msgBody = (BL.pack "hello world (from thread2)"),
                 msgDeliveryMode = Just NonPersistent}
                )
    threadDelay 100000
    closeConnection conn2
    putStrLn "(thread2) connection closed"

For me it prints:

(thread1) opening connection
(thread2) opening connection
(thread2) publishing message
(thread1) received message: hello world (from thread2)
(thread2) connection closed
(thread1) publishing message
(thread1) received message: hello world (from thread1)
(thread1) connection closed

So thread1 can still use its own connection, even after thread2 has opened and closed a separate one.

It would be great if you could produce a test-case that reproduces the problem. But I also wouldn't rule out that the problem might actually lie elsewhere, so you probably want to take a look at the RabbitMQ log-file first and see if there's anything unusual.
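
A reproduction closer to the original report (tests running in parallel) could fork several workers at once and collect each thread's outcome, rather than sequencing the threads with threadDelay. The following is only a minimal sketch of such a harness using base alone; `runWorkers` and `worker` are hypothetical names, and the worker here is a placeholder that simulates a failure instead of talking to a broker.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (SomeException, try)
import Control.Monad (forM)

-- | Fork one thread per worker id, run the action in each, and wait for all
-- of them to finish. An exception in a worker is captured as a Left value
-- for that worker instead of being lost with the thread.
runWorkers :: Int -> (Int -> IO a) -> IO [Either SomeException a]
runWorkers n action = do
    slots <- forM [1 .. n] $ \i -> do
        done <- newEmptyMVar
        _ <- forkIO (try (action i) >>= putMVar done)
        return done
    -- Block until every worker has reported its result.
    mapM takeMVar slots

-- | Placeholder worker (assumption): worker 2 fails to simulate a broken
-- connection. A real reproduction would open, use, and close its own
-- AMQP connection here.
worker :: Int -> IO Int
worker i
    | i == 2    = ioError (userError "simulated connection failure")
    | otherwise = return (i * 10)

main :: IO ()
main = do
    results <- runWorkers 3 worker
    mapM_ (putStrLn . describe) (zip [1 :: Int ..] results)
  where
    describe (i, Left err) = "(thread" ++ show i ++ ") failed: " ++ show err
    describe (i, Right v)  = "(thread" ++ show i ++ ") ok: " ++ show v
```

To reproduce the report, `worker` would be replaced by an action that calls openConnection, publishes a message, and calls closeConnection on its own connection, as thread2 does above. Any thread whose connection stops working unexpectedly should then show up as a failed entry in the results.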
