Skip to content

Commit

Permalink
implementing maxProducersByConnection and maxConsumerByConnection (#188)
Browse files Browse the repository at this point in the history
* implementing maxProducersByConnection and maxConsumerByConnection

* fixing indexes issue in client

* improving connection management in superstream_consumer

* adding a sleep before reconnecting

* fixing bug in available slot algorithm

* fixing bug in available slot algorithm

* bumping version for releasing
  • Loading branch information
DanielePalaia authored Apr 23, 2024
1 parent 35b8328 commit da80481
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 64 deletions.
12 changes: 10 additions & 2 deletions docs/examples/reliable_client/BestPracticesClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer:
vhost = rabbitmq_data["Virtualhost"]
load_balancer = bool(rabbitmq_data["LoadBalancer"])
stream_name = rabbitmq_data["StreamName"]
max_publishers_by_connection = rabbitmq_data["MaxPublishersByConnection"]
producers = rabbitmq_data["Producers"]

if bool(rabbitmq_data["SuperStream"]) is False:
producer = Producer(
Expand All @@ -69,16 +71,18 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer:
port=port,
vhost=vhost,
load_balancer_mode=load_balancer,
max_publishers_by_connection=max_publishers_by_connection,
)

else:
super_stream_creation_opt = SuperStreamCreationOption(n_partitions=3)
super_stream_creation_opt = SuperStreamCreationOption(n_partitions=int(producers))
producer = SuperStreamProducer( # type: ignore
host=host,
username=username,
password=password,
port=port,
vhost=vhost,
max_publishers_by_connection=max_publishers_by_connection,
load_balancer_mode=load_balancer,
super_stream=stream_name,
super_stream_creation_option=super_stream_creation_opt,
Expand Down Expand Up @@ -128,6 +132,8 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:
vhost = rabbitmq_data["Virtualhost"]
load_balancer = bool(rabbitmq_data["LoadBalancer"])
stream_name = rabbitmq_data["StreamName"]
n_producers = rabbitmq_data["Producers"]
max_subscribers_by_connection = rabbitmq_data["MaxSubscribersByConnection"]

if bool(rabbitmq_data["SuperStream"]) is False:
consumer = Consumer(
Expand All @@ -138,10 +144,11 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:
vhost=vhost,
load_balancer_mode=load_balancer,
on_close_handler=on_close_connection,
max_subscribers_by_connection=max_subscribers_by_connection,
)

else:
super_stream_creation_opt = SuperStreamCreationOption(n_partitions=3)
super_stream_creation_opt = SuperStreamCreationOption(n_partitions=int(n_producers))
consumer = SuperStreamConsumer( # type: ignore
host=host,
username=username,
Expand All @@ -152,6 +159,7 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:
super_stream=stream_name,
super_stream_creation_option=super_stream_creation_opt,
on_close_handler=on_close_connection,
max_subscribers_by_connection=max_subscribers_by_connection,
)

return consumer
Expand Down
14 changes: 8 additions & 6 deletions docs/examples/reliable_client/appsettings.json
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
{
"RabbitMQ": {
"Host": "localhost",
"Username": "guest",
"Password": "guest",
"Host": "<your-rabbitmq-host>",
"Username": "<your-username>",
"Password": "<your-password>",
"Port": 5552,
"Virtualhost": "/",
"LoadBalancer": true,
"SuperStream": false,
"SuperStream": true,
"MaxPublishersByConnection": 256,
"MaxSubscribersByConnection": 256,
"Producers": 3,
"Consumers": 3,
"DelayDuringSendMs":0,
"MessagesToSend": 100000,
"StreamName": "PythonClientTest",
"MessagesToSend": 10000000,
"StreamName": "invoices",
"Logging": ""
}
}
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "rstream"
version = "0.18.0"
version = "0.19.0"
description = "A python client for RabbitMQ Streams"
authors = ["George Fortunatov <[email protected]>", "Daniele Palaia <[email protected]>"]
readme = "README.md"
Expand Down
81 changes: 52 additions & 29 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
frame_max: int,
heartbeat: int,
connection_name: Optional[str] = "",
max_clients_by_connections=256,
connection_closed_handler: Optional[CB[OnClosedErrorInfo]] = None,
sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
):
Expand Down Expand Up @@ -103,10 +104,12 @@ def __init__(

self._frames: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
self._is_not_closed: bool = True
self._max_clients_by_connections = max_clients_by_connections

self._streams: list[str] = []
self._available_ids: list[bool] = [True for i in range(257)]
self._current_id = 1
# used to assign publish_ids and subscribe_ids
self._available_client_ids: list[bool] = [True for i in range(max_clients_by_connections)]
self._current_id = 0

def start_task(self, name: str, coro: Awaitable[None]) -> None:
assert name not in self._tasks
Expand Down Expand Up @@ -161,22 +164,22 @@ async def remove_stream(self, stream: str):
self._streams.remove(stream)

async def get_available_id(self) -> int:
if self._current_id <= 256:
publishing_id = self._current_id
self._available_ids[publishing_id] = False
self._current_id = self._current_id + 1
return publishing_id
else:
self._current_id = 1
for publishing_id in range(self._current_id, 257):
if self._available_ids[publishing_id] is True:
self._available_ids[publishing_id] = False
self._current_id = publishing_id
return publishing_id
return self._current_id

async def free_available_id(self, publishing_id):
self._available_ids[publishing_id] = True
for publishing_subscribing_id in range(0, self._max_clients_by_connections):
if self._available_client_ids[publishing_subscribing_id] is True:
self._available_client_ids[publishing_subscribing_id] = False
self._current_id = publishing_subscribing_id
return publishing_subscribing_id
return self._current_id

async def get_count_available_ids(self):
count = 0
for slot in self._available_client_ids:
if slot is True:
count = count + 1
return count

async def free_available_id(self, publishing_subscribing_id):
self._available_client_ids[publishing_subscribing_id] = True

async def send_publish_frame(self, frame: schema.Publish, version: int = 1) -> None:
logger.debug("Sending frame: %s", frame)
Expand Down Expand Up @@ -712,7 +715,7 @@ def __init__(

self._frame_max = frame_max
self._heartbeat = heartbeat
self._clients: dict[Addr, Client] = {}
self._clients: dict[Addr, list[Client]] = defaultdict(list)

async def get(
self,
Expand All @@ -721,6 +724,7 @@ async def get(
connection_closed_handler: Optional[CB[OnClosedErrorInfo]] = None,
stream: Optional[str] = None,
sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
max_clients_by_connections: int = 256,
) -> Client:
"""Get a client according to `addr` parameter
Expand All @@ -732,33 +736,48 @@ async def get(
"""
desired_addr = addr or self.addr

if desired_addr not in self._clients or self._clients[desired_addr].is_connection_alive() is False:
if addr and self.load_balancer_mode:
self._clients[desired_addr] = await self._resolve_broker(
# check if at least one client of desired_addr is connected
if desired_addr in self._clients:
for client in self._clients[desired_addr]:
if client.is_connection_alive() is True and await client.get_count_available_ids() > 0:
if stream is not None:
client.add_stream(stream)
return client

if addr and self.load_balancer_mode:
self._clients[desired_addr].append(
await self._resolve_broker(
addr=desired_addr,
connection_closed_handler=connection_closed_handler,
connection_name=connection_name,
sasl_configuration_mechanism=sasl_configuration_mechanism,
max_clients_by_connections=max_clients_by_connections,
)
else:
self._clients[desired_addr] = await self.new(
)
else:
self._clients[desired_addr].append(
await self.new(
addr=desired_addr,
connection_closed_handler=connection_closed_handler,
connection_name=connection_name,
sasl_configuration_mechanism=sasl_configuration_mechanism,
max_clients_by_connections=max_clients_by_connections,
)
)

if stream is not None:
self._clients[desired_addr].add_stream(stream)
assert self._clients[desired_addr].is_started
return self._clients[desired_addr]
self._clients[desired_addr][len(self._clients[desired_addr]) - 1].add_stream(stream)

assert self._clients[desired_addr][len(self._clients[desired_addr]) - 1].is_started
return self._clients[desired_addr][len(self._clients[desired_addr]) - 1]

async def _resolve_broker(
self,
connection_name: Optional[str],
addr: Addr,
connection_closed_handler: Optional[CB[OnClosedErrorInfo]] = None,
sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
max_clients_by_connections: int = 256,
) -> Client:
desired_host, desired_port = addr.host, str(addr.port)

Expand All @@ -770,6 +789,7 @@ async def _resolve_broker(
connection_closed_handler=connection_closed_handler,
connection_name=connection_name,
sasl_configuration_mechanism=sasl_configuration_mechanism,
max_clients_by_connections=max_clients_by_connections,
)

assert client.server_properties is not None
Expand All @@ -795,6 +815,7 @@ async def new(
addr: Addr,
connection_closed_handler: Optional[CB[OnClosedErrorInfo]] = None,
sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
max_clients_by_connections: int = 256,
) -> Client:
host, port = addr
client = Client(
Expand All @@ -806,6 +827,7 @@ async def new(
connection_name=connection_name,
connection_closed_handler=connection_closed_handler,
sasl_configuration_mechanism=sasl_configuration_mechanism,
max_clients_by_connections=max_clients_by_connections,
)
await client.start()
await client.authenticate(
Expand All @@ -816,7 +838,8 @@ async def new(
return client

async def close(self) -> None:
for client in list(self._clients.values()):
await client.close()
for addr in self._clients.values():
for client in addr:
await client.close()

self._clients.clear()
5 changes: 5 additions & 0 deletions rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(
heartbeat: int = 60,
load_balancer_mode: bool = False,
max_retries: int = 20,
max_subscribers_by_connection: int = 256,
on_close_handler: Optional[CB_CONN[OnClosedErrorInfo]] = None,
connection_name: str = None,
sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
Expand Down Expand Up @@ -108,6 +109,7 @@ def __init__(
self._sasl_configuration_mechanism = sasl_configuration_mechanism
if self._connection_name is None:
self._connection_name = "rstream-consumer"
self._max_subscribers_by_connection = max_subscribers_by_connection

@property
async def default_client(self) -> Client:
Expand All @@ -127,6 +129,7 @@ async def start(self) -> None:
connection_closed_handler=self._on_close_handler,
connection_name=self._connection_name,
sasl_configuration_mechanism=self._sasl_configuration_mechanism,
max_clients_by_connections=self._max_subscribers_by_connection,
)

def stop(self) -> None:
Expand Down Expand Up @@ -159,6 +162,7 @@ async def _get_or_create_client(self, stream: str) -> Client:
self._default_client = await self._pool.get(
connection_closed_handler=self._on_close_handler,
connection_name=self._connection_name,
max_clients_by_connections=self._max_subscribers_by_connection,
)

leader, replicas = await (await self.default_client).query_leader_and_replicas(stream)
Expand All @@ -169,6 +173,7 @@ async def _get_or_create_client(self, stream: str) -> Client:
connection_closed_handler=self._on_close_handler,
connection_name=self._connection_name,
stream=stream,
max_clients_by_connections=self._max_subscribers_by_connection,
)

await self._close_locator_connection()
Expand Down
6 changes: 5 additions & 1 deletion rstream/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ def __init__(
heartbeat: int = 60,
load_balancer_mode: bool = False,
max_retries: int = 20,
max_publishers_by_connection=256,
default_batch_publishing_delay: float = 3,
default_context_switch_value: int = 1000,
connection_name: str = None,
# on_close_handler: Optional[CB[OnClosedErrorInfo]] = None,
sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
filter_value_extractor: Optional[CB_F[Any]] = None,
):
Expand Down Expand Up @@ -123,6 +123,7 @@ def __init__(
self._connection_name = connection_name
self._filter_value_extractor: Optional[CB_F[Any]] = filter_value_extractor
self.publisher_id = 0
self._max_publishers_by_connection = max_publishers_by_connection

if self._connection_name is None:
self._connection_name = "rstream-producer"
Expand All @@ -146,6 +147,7 @@ async def start(self) -> None:
connection_closed_handler=self._on_connection_closed,
connection_name=self._connection_name,
sasl_configuration_mechanism=self._sasl_configuration_mechanism,
max_clients_by_connections=self._max_publishers_by_connection,
)

# Check if the filtering is supported by the server
Expand Down Expand Up @@ -201,6 +203,7 @@ async def _get_or_create_client(self, stream: str) -> Client:
self._default_client = await self._pool.get(
connection_closed_handler=self._on_connection_closed,
connection_name=self._connection_name,
max_clients_by_connections=self._max_publishers_by_connection,
)
leader, _ = await (await self.default_client).query_leader_and_replicas(stream)

Expand All @@ -211,6 +214,7 @@ async def _get_or_create_client(self, stream: str) -> Client:
connection_closed_handler=self._on_connection_closed,
stream=stream,
sasl_configuration_mechanism=self._sasl_configuration_mechanism,
max_clients_by_connections=self._max_publishers_by_connection,
)

await self._close_locator_connection()
Expand Down
Loading

0 comments on commit da80481

Please sign in to comment.