Skip to content

Commit

Permalink
improving stability and cleanup in case of disconnection / improving …
Browse files Browse the repository at this point in the history
…report in case of disconnection (#131)

* improving disconnection information report

* new modifications after rebase

* implementing code review suggestions

* implementing code review suggestions: adding timeout in close()

* formatting examples

Signed-off-by: Gabriele Santomaggio <[email protected]>

* formatting examples

Signed-off-by: Gabriele Santomaggio <[email protected]>

* replacing exception report to string

---------

Signed-off-by: Gabriele Santomaggio <[email protected]>
Co-authored-by: DanielePalaia <[email protected]>
Co-authored-by: Daniele Palaia <[email protected]>
Co-authored-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
4 people authored Oct 19, 2023
1 parent 11da180 commit cd50fcb
Show file tree
Hide file tree
Showing 12 changed files with 349 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@
from rstream import (
AMQPMessage,
Consumer,
DisconnectionErrorInfo,
MessageContext,
amqp_decoder,
)

STREAM = "my-test-stream"


async def on_connection_closed(reason: Exception) -> None:
print("connection has been closed for reason: " + str(reason))
async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
    """Report which streams were affected by a disconnection and why."""
    print(
        "connection has been closed from stream: "
        + str(disconnection_info.streams)
        + " for reason: "
        # str() guards against a non-string reason (e.g. an Exception object),
        # which would otherwise raise TypeError on string concatenation.
        + str(disconnection_info.reason)
    )


async def consume():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
import time

from rstream import (
AMQPMessage,
DisconnectionErrorInfo,
Producer,
)

STREAM = "my-test-stream"
MESSAGES = 10000000
connection_is_closed = False


async def publish():
    """Send messages to STREAM until finished or until the connection closes."""

    async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
        # str() guards against a non-string reason (e.g. an Exception object),
        # which would otherwise raise TypeError on string concatenation.
        print(
            "connection has been closed from stream: "
            + str(disconnection_info.streams)
            + " for reason: "
            + str(disconnection_info.reason)
        )

        # Signal the send loop to stop before tearing the producer down.
        global connection_is_closed
        connection_is_closed = True

        await producer.close()

    # Avoid the async context manager here: we close the producer ourselves in
    # the callback above, and the context manager would close it a second time.
    producer = Producer(
        "localhost", username="guest", password="guest", connection_closed_handler=on_connection_closed
    )

    await producer.start()
    # create a stream if it doesn't already exist
    await producer.create_stream(STREAM, exists_ok=True)

    # sending MESSAGES (ten million) messages in AMQP format
    start_time = time.perf_counter()

    for i in range(MESSAGES):
        amqp_message = AMQPMessage(
            body="hello: {}".format(i),
        )
        # send is asynchronous; stop as soon as the connection is reported closed
        if connection_is_closed is False:
            await producer.send(stream=STREAM, message=amqp_message)
        else:
            break

    end_time = time.perf_counter()
    print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")


asyncio.run(publish())
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
import asyncio
import time

from rstream import AMQPMessage, Producer
from rstream import (
AMQPMessage,
DisconnectionErrorInfo,
Producer,
)

STREAM = "my-test-stream"
MESSAGES = 1000000


async def on_connection_closed(reason: Exception) -> None:
print("connection has been closed for reason: " + str(reason))
connection_is_closed = False


async def publish():
async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
print(
"connection has been closed from stream: "
+ str(disconnection_info.streams)
+ " for reason: "
+ disconnection_info.reason
)

# clean close or reconnect
await producer.close()
global connection_is_closed
connection_is_closed = True

async with Producer(
"localhost", username="guest", password="guest", connection_closed_handler=on_connection_closed
Expand All @@ -21,13 +34,17 @@ async def publish():

# sending a million of messages in AMQP format
start_time = time.perf_counter()
global connection_is_closed

for i in range(MESSAGES):
amqp_message = AMQPMessage(
body="hello: {}".format(i),
)
# send is asynchronous
await producer.send(stream=STREAM, message=amqp_message)
if connection_is_closed is False:
await producer.send(stream=STREAM, message=amqp_message)
else:
break

end_time = time.perf_counter()
print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
import signal

from rstream import (
AMQPMessage,
ConsumerOffsetSpecification,
DisconnectionErrorInfo,
MessageContext,
OffsetType,
SuperStreamConsumer,
amqp_decoder,
)

count = 0


async def on_message(msg: AMQPMessage, message_context: MessageContext):
    """Count consumed messages, logging progress once every 100000 of them."""
    global count
    count += 1
    # Only log on round-number milestones; return early otherwise.
    if count % 100000 != 0:
        return
    stream = await message_context.consumer.stream(message_context.subscriber_name)
    offset = message_context.offset
    print("Received message: {} from stream: {} - message offset: {}".format(msg, stream, offset))


async def consume():
    """Consume the 'invoices' super stream from its first offset, closing cleanly on disconnect."""

    async def handle_disconnect(disconnection_info: DisconnectionErrorInfo) -> None:
        # Report the affected streams, then release the consumer's resources.
        print(
            "connection has been closed from stream: "
            + str(disconnection_info.streams)
            + " for reason: "
            + disconnection_info.reason
        )
        await consumer.close()

    consumer = SuperStreamConsumer(
        host="localhost",
        port=5552,
        vhost="/",
        username="guest",
        password="guest",
        super_stream="invoices",
        connection_closed_handler=handle_disconnect,
    )

    # Ctrl-C triggers an orderly shutdown instead of a raw KeyboardInterrupt.
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(consumer.close()))

    # Start reading from the very first offset of each partition.
    offset_specification = ConsumerOffsetSpecification(OffsetType.FIRST, None)

    await consumer.start()
    await consumer.subscribe(
        callback=on_message, decoder=amqp_decoder, offset_specification=offset_specification
    )
    await consumer.run()


asyncio.run(consume())
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import asyncio
import time

from rstream import (
AMQPMessage,
DisconnectionErrorInfo,
RouteType,
SuperStreamProducer,
)

SUPER_STREAM = "invoices"
MESSAGES = 10000000

connection_is_closed = False


async def publish():
    """Send hash-routed messages to the super stream until finished or disconnected."""

    # this value will be hashed using the murmur3 hashing algorithm to decide
    # the partition resolution for the message
    async def routing_extractor(message: AMQPMessage) -> str:
        return message.application_properties["id"]

    async def on_connection_closed(disconnection_info: DisconnectionErrorInfo) -> None:
        # str() guards against a non-string reason (e.g. an Exception object),
        # which would otherwise raise TypeError on string concatenation.
        print(
            "connection has been closed from stream: "
            + str(disconnection_info.streams)
            + " for reason: "
            + str(disconnection_info.reason)
        )

        # Signal the send loop to stop before tearing the producer down.
        global connection_is_closed
        connection_is_closed = True

        await super_stream_producer.close()

    # Avoid the async context manager here: we close the producer ourselves in
    # the on_connection_closed callback, and the context manager would close it twice.
    super_stream_producer = SuperStreamProducer(
        "localhost",
        username="guest",
        password="guest",
        routing_extractor=routing_extractor,
        routing=RouteType.Hash,
        connection_closed_handler=on_connection_closed,
        super_stream=SUPER_STREAM,
    )

    await super_stream_producer.start()

    # sending MESSAGES (ten million) messages in AMQP format
    # (no `global` declaration needed here: the flag is only read in this scope)
    start_time = time.perf_counter()

    for i in range(MESSAGES):
        amqp_message = AMQPMessage(
            body="hello: {}".format(i),
            application_properties={"id": "{}".format(i)},
        )

        # send is asynchronous; stop as soon as the connection is reported closed
        if connection_is_closed is False:
            await super_stream_producer.send(message=amqp_message)
        else:
            break

    end_time = time.perf_counter()
    print(f"Sent {MESSAGES} messages in {end_time - start_time:0.4f} seconds")


asyncio.run(publish())
3 changes: 3 additions & 0 deletions rstream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

from importlib import metadata

from .utils import DisconnectionErrorInfo

try:
__version__ = metadata.version(__package__)
__license__ = metadata.metadata(__package__)["license"]
Expand Down Expand Up @@ -59,4 +61,5 @@
"StreamDoesNotExist",
"OffsetSpecification",
"EventContext",
"DisconnectionErrorInfo",
]
Loading

0 comments on commit cd50fcb

Please sign in to comment.