Skip to content

Commit

Permalink
Testing for disconnection scenarios (#140)
Browse files Browse the repository at this point in the history
* coverage for disconnection scenarios

* fixing a few flows - fixing tests

* adding new tests, fixing some bugs detected

* some further tests and updates

* updating examples

* adding test for send_batch when the connection gets terminated by the server

* adding test for send_batch when the connection gets terminated by the server

* Handle disconnection

Signed-off-by: Gabriele Santomaggio <[email protected]>

* reformatting

* avoid raising the on_close callback for locator connections

---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
Co-authored-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
DanielePalaia and Gsantomaggio authored Oct 30, 2023
1 parent 6038781 commit b6fb1bc
Show file tree
Hide file tree
Showing 14 changed files with 454 additions and 175 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,56 +1,59 @@
import asyncio
import time
import signal

from rstream import (
AMQPMessage,
Consumer,
DisconnectionErrorInfo,
Producer,
MessageContext,
amqp_decoder,
)

STREAM = "my-test-stream"
MESSAGES = 10000000
COUNT = 0
connection_is_closed = False


async def publish():
async def consume():
async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
print(
"connection has been closed from stream: "
+ str(disconnection_info.streams)
+ " for reason: "
+ disconnection_info.reason
+ str(disconnection_info.reason)
)

global connection_is_closed
connection_is_closed = True

await producer.close()

# avoid to use async context in this case as we are closing the producer ourself in the callback
# in this case we avoid double closing
producer = Producer(
"localhost", username="guest", password="guest", connection_closed_handler=on_connection_closed
# avoid multiple simultaneous disconnection to call close multiple times
if connection_is_closed is False:
await consumer.close()
connection_is_closed = True

consumer = Consumer(
host="localhost",
port=5552,
vhost="/",
username="guest",
password="guest",
connection_closed_handler=on_connection_closed,
)

await producer.start()
# create a stream if it doesn't already exist
await producer.create_stream(STREAM, exists_ok=True)

# sending a million of messages in AMQP format
start_time = time.perf_counter()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
)
# send is asynchronous
if connection_is_closed is False:
await producer.send(stream=STREAM, message=amqp_message)
else:
break
async def on_message(msg: AMQPMessage, message_context: MessageContext):
stream = message_context.consumer.get_stream(message_context.subscriber_name)
offset = message_context.offset
global COUNT
COUNT = COUNT + 1
if COUNT % 1000000 == 0:
# print("Got message: {} from stream {}, offset {}".format(msg, stream, offset))
print("consumed 1 million messages")

end_time = time.perf_counter()
print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")
await consumer.start()
await consumer.subscribe(stream=STREAM, callback=on_message, decoder=amqp_decoder)
await consumer.run()


asyncio.run(publish())
asyncio.run(consume())
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
)

STREAM = "my-test-stream"
MESSAGES = 1000000
MESSAGES = 10000000
connection_is_closed = False


Expand All @@ -18,36 +18,42 @@ async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> No
"connection has been closed from stream: "
+ str(disconnection_info.streams)
+ " for reason: "
+ disconnection_info.reason
+ str(disconnection_info.reason)
)

# clean close or reconnect
await producer.close()
global connection_is_closed
connection_is_closed = True

print("creating Producer")
# producer will be closed at the end by the async context manager
# both if connection is still alive or not
async with Producer(
"localhost", username="guest", password="guest", connection_closed_handler=on_connection_closed
) as producer:

# create a stream if it doesn't already exist
await producer.create_stream(STREAM, exists_ok=True)

# sending a million of messages in AMQP format
start_time = time.perf_counter()
global connection_is_closed

print("Sending MESSAGES")
for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
)
# send is asynchronous
global connection_is_closed
if connection_is_closed is False:
await producer.send(stream=STREAM, message=amqp_message)
else:
break

end_time = time.perf_counter()
print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")
if i % 10000 == 0:
print("sent 10000 messages")

end_time = time.perf_counter()
print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")


asyncio.run(publish())
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)

count = 0
connection_is_closed = False


async def on_message(msg: AMQPMessage, message_context: MessageContext):
Expand All @@ -31,7 +32,12 @@ async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> No
+ " for reason: "
+ disconnection_info.reason
)
await consumer.close()

global connection_is_closed
if connection_is_closed is False:
connection_is_closed = True
# avoid multiple simultaneous disconnection to call close multiple times
await consumer.close()

consumer = SuperStreamConsumer(
host="localhost",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,38 @@ async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> No
global connection_is_closed
connection_is_closed = True

await super_stream_producer.close()

# avoiding using async context as we close the producer ourself in on_connection_closed callback
super_stream_producer = SuperStreamProducer(
# super_stream_producer will be closed by the async context manager
# both if connection is still alive or not
print("creating super_stream producer")
async with SuperStreamProducer(
"localhost",
username="guest",
password="guest",
routing_extractor=routing_extractor,
routing=RouteType.Hash,
connection_closed_handler=on_connection_closed,
super_stream=SUPER_STREAM,
)

await super_stream_producer.start()

# sending a million of messages in AMQP format
start_time = time.perf_counter()
global connection_is_closed
) as super_stream_producer:

for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
application_properties={"id": "{}".format(i)},
)
# sending a million of messages in AMQP format
start_time = time.perf_counter()
global connection_is_closed

# send is asynchronous
if connection_is_closed is False:
await super_stream_producer.send(message=amqp_message)
else:
break
print("sending messages")
for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
application_properties={"id": "{}".format(i)},
)

# send is asynchronous
if connection_is_closed is False:
await super_stream_producer.send(message=amqp_message)
else:
break

if i % 10000 == 0:
print("sent 10000 MESSAGES")

end_time = time.perf_counter()
print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")
Expand Down
Loading

0 comments on commit b6fb1bc

Please sign in to comment.