diff --git a/docs/examples/reliable_client/BestPracticesClient.py b/docs/examples/reliable_client/BestPracticesClient.py
index 77a94ce..b82d7a4 100644
--- a/docs/examples/reliable_client/BestPracticesClient.py
+++ b/docs/examples/reliable_client/BestPracticesClient.py
@@ -60,6 +60,8 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer:
     vhost = rabbitmq_data["Virtualhost"]
     load_balancer = bool(rabbitmq_data["LoadBalancer"])
     stream_name = rabbitmq_data["StreamName"]
+    max_publishers_by_connection = rabbitmq_data["MaxPublishersByConnection"]
+    producers = rabbitmq_data["Producers"]
 
     if bool(rabbitmq_data["SuperStream"]) is False:
         producer = Producer(
@@ -69,16 +71,18 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer:
             port=port,
             vhost=vhost,
             load_balancer_mode=load_balancer,
+            max_publishers_by_connection=max_publishers_by_connection,
         )
     else:
-        super_stream_creation_opt = SuperStreamCreationOption(n_partitions=3)
+        super_stream_creation_opt = SuperStreamCreationOption(n_partitions=int(producers))
         producer = SuperStreamProducer(  # type: ignore
             host=host,
             username=username,
             password=password,
             port=port,
             vhost=vhost,
+            max_publishers_by_connection=max_publishers_by_connection,
             load_balancer_mode=load_balancer,
             super_stream=stream_name,
             super_stream_creation_option=super_stream_creation_opt,
@@ -128,6 +132,8 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:
     vhost = rabbitmq_data["Virtualhost"]
     load_balancer = bool(rabbitmq_data["LoadBalancer"])
     stream_name = rabbitmq_data["StreamName"]
+    n_producers = rabbitmq_data["Producers"]
+    max_subscribers_by_connection = rabbitmq_data["MaxSubscribersByConnection"]
 
     if bool(rabbitmq_data["SuperStream"]) is False:
         consumer = Consumer(
@@ -138,10 +144,11 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:
             vhost=vhost,
             load_balancer_mode=load_balancer,
             on_close_handler=on_close_connection,
+            max_subscribers_by_connection=max_subscribers_by_connection,
         )
     else:
-        super_stream_creation_opt = SuperStreamCreationOption(n_partitions=3)
+        super_stream_creation_opt = SuperStreamCreationOption(n_partitions=int(n_producers))
         consumer = SuperStreamConsumer(  # type: ignore
             host=host,
             username=username,
@@ -152,6 +159,7 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:
             super_stream=stream_name,
             super_stream_creation_option=super_stream_creation_opt,
             on_close_handler=on_close_connection,
+            max_subscribers_by_connection=max_subscribers_by_connection,
         )
 
     return consumer
diff --git a/docs/examples/reliable_client/appsettings.json b/docs/examples/reliable_client/appsettings.json
index 49f44a9..fdf3386 100644
--- a/docs/examples/reliable_client/appsettings.json
+++ b/docs/examples/reliable_client/appsettings.json
@@ -1,17 +1,19 @@
 {
   "RabbitMQ": {
-    "Host": "localhost",
-    "Username": "guest",
-    "Password": "guest",
+    "Host": "34.147.210.193",
+    "Username": "default_user_4RVgi_YtwKGw7xlGPIi",
+    "Password": "3urSCJfSnaZal6d_VsCSJRrUwar_iTOA",
     "Port": 5552,
     "Virtualhost": "/",
     "LoadBalancer": true,
-    "SuperStream": false,
+    "SuperStream": true,
+    "MaxPublishersByConnection": 256,
+    "MaxSubscribersByConnection": 256,
     "Producers": 3,
     "Consumers": 3,
     "DelayDuringSendMs":0,
-    "MessagesToSend": 100000,
-    "StreamName": "PythonClientTest",
+    "MessagesToSend": 10000000,
+    "StreamName": "invoices",
     "Logging": ""
   }
 }
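Note: the example client above simply threads the two new `appsettings.json` keys through to the clients. A minimal sketch of that wiring, assuming the JSON file sits in the working directory (host and credentials here are placeholders, not the values from the patch):

```python
# Sketch only: mirrors the docs/examples/reliable_client wiring; the config
# path and broker coordinates are assumptions for illustration.
import asyncio
import json

from rstream import Producer


async def main() -> None:
    with open("appsettings.json") as f:
        config = json.load(f)["RabbitMQ"]

    producer = Producer(
        host=config["Host"],
        username=config["Username"],
        password=config["Password"],
        port=config["Port"],
        vhost=config["Virtualhost"],
        load_balancer_mode=bool(config["LoadBalancer"]),
        # new in 0.19.0: cap on publishers multiplexed over one TCP connection
        max_publishers_by_connection=config["MaxPublishersByConnection"],
    )
    await producer.start()
    await producer.close()


asyncio.run(main())
```

The consumer side is symmetric via `MaxSubscribersByConnection`.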
diff --git a/pyproject.toml b/pyproject.toml
index b988a92..45b41c0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "rstream"
-version = "0.18.0"
+version = "0.19.0"
 description = "A python client for RabbitMQ Streams"
 authors = ["George Fortunatov ", "Daniele Palaia "]
 readme = "README.md"
diff --git a/rstream/client.py b/rstream/client.py
index c5f221f..976507a 100644
--- a/rstream/client.py
+++ b/rstream/client.py
@@ -69,6 +69,7 @@ def __init__(
         frame_max: int,
         heartbeat: int,
         connection_name: Optional[str] = "",
+        max_clients_by_connections: int = 256,
         connection_closed_handler: Optional[CB[OnClosedErrorInfo]] = None,
         sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
     ):
@@ -103,10 +104,12 @@ def __init__(
 
         self._frames: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
         self._is_not_closed: bool = True
+        self._max_clients_by_connections = max_clients_by_connections
 
         self._streams: list[str] = []
-        self._available_ids: list[bool] = [True for i in range(257)]
-        self._current_id = 1
+        # used to assign publish_ids and subscribe_ids
+        self._available_client_ids: list[bool] = [True for i in range(max_clients_by_connections)]
+        self._current_id = 0
 
     def start_task(self, name: str, coro: Awaitable[None]) -> None:
         assert name not in self._tasks
@@ -161,22 +164,22 @@ async def remove_stream(self, stream: str):
             self._streams.remove(stream)
 
     async def get_available_id(self) -> int:
-        if self._current_id <= 256:
-            publishing_id = self._current_id
-            self._available_ids[publishing_id] = False
-            self._current_id = self._current_id + 1
-            return publishing_id
-        else:
-            self._current_id = 1
-            for publishing_id in range(self._current_id, 257):
-                if self._available_ids[publishing_id] is True:
-                    self._available_ids[publishing_id] = False
-                    self._current_id = publishing_id
-                    return publishing_id
-        return self._current_id
-
-    async def free_available_id(self, publishing_id):
-        self._available_ids[publishing_id] = True
+        for publishing_subscribing_id in range(0, self._max_clients_by_connections):
+            if self._available_client_ids[publishing_subscribing_id] is True:
+                self._available_client_ids[publishing_subscribing_id] = False
+                self._current_id = publishing_subscribing_id
+                return publishing_subscribing_id
+        return self._current_id
+
+    async def get_count_available_ids(self) -> int:
+        count = 0
+        for slot in self._available_client_ids:
+            if slot is True:
+                count = count + 1
+        return count
+
+    async def free_available_id(self, publishing_subscribing_id: int):
+        self._available_client_ids[publishing_subscribing_id] = True
 
     async def send_publish_frame(self, frame: schema.Publish, version: int = 1) -> None:
         logger.debug("Sending frame: %s", frame)
@@ -712,7 +715,7 @@ def __init__(
         self._frame_max = frame_max
         self._heartbeat = heartbeat
 
-        self._clients: dict[Addr, Client] = {}
+        self._clients: dict[Addr, list[Client]] = defaultdict(list)
 
     async def get(
@@ -721,6 +724,7 @@ async def get(
         connection_name: Optional[str] = "",
         connection_closed_handler: Optional[CB[OnClosedErrorInfo]] = None,
         stream: Optional[str] = None,
         sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
+        max_clients_by_connections: int = 256,
     ) -> Client:
         """Get a client according to `addr` parameter
@@ -732,26 +736,40 @@ async def get(
         """
         desired_addr = addr or self.addr
 
-        if desired_addr not in self._clients or self._clients[desired_addr].is_connection_alive() is False:
-            if addr and self.load_balancer_mode:
-                self._clients[desired_addr] = await self._resolve_broker(
+        # check if at least one client of desired_addr is connected
+        if desired_addr in self._clients:
+            for client in self._clients[desired_addr]:
+                if client.is_connection_alive() is True and await client.get_count_available_ids() > 0:
+                    if stream is not None:
+                        client.add_stream(stream)
+                    return client
+
+        if addr and self.load_balancer_mode:
+            self._clients[desired_addr].append(
+                await self._resolve_broker(
                     addr=desired_addr,
                     connection_closed_handler=connection_closed_handler,
                     connection_name=connection_name,
                     sasl_configuration_mechanism=sasl_configuration_mechanism,
+                    max_clients_by_connections=max_clients_by_connections,
                 )
-            else:
-                self._clients[desired_addr] = await self.new(
+            )
+        else:
+            self._clients[desired_addr].append(
+                await self.new(
                     addr=desired_addr,
                     connection_closed_handler=connection_closed_handler,
                     connection_name=connection_name,
                     sasl_configuration_mechanism=sasl_configuration_mechanism,
+                    max_clients_by_connections=max_clients_by_connections,
                 )
+            )
 
         if stream is not None:
-            self._clients[desired_addr].add_stream(stream)
-        assert self._clients[desired_addr].is_started
-        return self._clients[desired_addr]
+            self._clients[desired_addr][-1].add_stream(stream)
+
+        assert self._clients[desired_addr][-1].is_started
+        return self._clients[desired_addr][-1]
 
     async def _resolve_broker(
@@ -759,6 +777,7 @@ async def _resolve_broker(
         self,
         addr: Addr,
         connection_closed_handler: Optional[CB[OnClosedErrorInfo]] = None,
         sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
+        max_clients_by_connections: int = 256,
     ) -> Client:
         desired_host, desired_port = addr.host, str(addr.port)
@@ -770,6 +789,7 @@ async def _resolve_broker(
             connection_closed_handler=connection_closed_handler,
             connection_name=connection_name,
             sasl_configuration_mechanism=sasl_configuration_mechanism,
+            max_clients_by_connections=max_clients_by_connections,
         )
 
         assert client.server_properties is not None
@@ -795,6 +815,7 @@ async def new(
         addr: Addr,
         connection_closed_handler: Optional[CB[OnClosedErrorInfo]] = None,
         sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
+        max_clients_by_connections: int = 256,
     ) -> Client:
         host, port = addr
         client = Client(
@@ -806,6 +827,7 @@ async def new(
             connection_name=connection_name,
             connection_closed_handler=connection_closed_handler,
             sasl_configuration_mechanism=sasl_configuration_mechanism,
+            max_clients_by_connections=max_clients_by_connections,
         )
         await client.start()
         await client.authenticate(
@@ -816,7 +838,8 @@ async def new(
         return client
 
     async def close(self) -> None:
-        for client in list(self._clients.values()):
-            await client.close()
+        for clients in self._clients.values():
+            for client in clients:
+                await client.close()
 
         self._clients.clear()
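The rewritten `get_available_id` is a first-fit scan over a fixed pool of boolean slots, one per publishing/subscribing id. A self-contained model of the same scheme (illustrative names, not library API; note that unlike the patched method, which falls back to returning `self._current_id` when the pool is exhausted, this sketch raises):

```python
# Illustrative model of the slot-based id allocator in rstream/client.py.
class IdSlots:
    def __init__(self, size: int = 256) -> None:
        # slot i is True while publishing/subscribing id i is free
        self._free = [True] * size

    def acquire(self) -> int:
        # first-fit scan, mirroring get_available_id()
        for i, free in enumerate(self._free):
            if free:
                self._free[i] = False
                return i
        raise RuntimeError("no free ids on this connection")

    def release(self, i: int) -> None:
        # mirrors free_available_id()
        self._free[i] = True

    def available(self) -> int:
        # mirrors get_count_available_ids()
        return sum(self._free)


slots = IdSlots(4)
assert [slots.acquire() for _ in range(3)] == [0, 1, 2]
slots.release(1)
assert slots.acquire() == 1  # freed slots are reused before new ones
```

The pool's `get` above checks `get_count_available_ids() > 0` before reusing a live connection, so a fresh connection is opened exactly when every slot on the existing ones is taken.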
diff --git a/rstream/consumer.py b/rstream/consumer.py
index 1f248a6..3786601 100644
--- a/rstream/consumer.py
+++ b/rstream/consumer.py
@@ -80,6 +80,7 @@ def __init__(
         heartbeat: int = 60,
         load_balancer_mode: bool = False,
         max_retries: int = 20,
+        max_subscribers_by_connection: int = 256,
         on_close_handler: Optional[CB_CONN[OnClosedErrorInfo]] = None,
         connection_name: str = None,
         sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
@@ -108,6 +109,7 @@ def __init__(
         self._sasl_configuration_mechanism = sasl_configuration_mechanism
         if self._connection_name is None:
             self._connection_name = "rstream-consumer"
+        self._max_subscribers_by_connection = max_subscribers_by_connection
 
     @property
     async def default_client(self) -> Client:
@@ -127,6 +129,7 @@ async def start(self) -> None:
             connection_closed_handler=self._on_close_handler,
             connection_name=self._connection_name,
             sasl_configuration_mechanism=self._sasl_configuration_mechanism,
+            max_clients_by_connections=self._max_subscribers_by_connection,
         )
 
     def stop(self) -> None:
@@ -159,6 +162,7 @@ async def _get_or_create_client(self, stream: str) -> Client:
             self._default_client = await self._pool.get(
                 connection_closed_handler=self._on_close_handler,
                 connection_name=self._connection_name,
+                max_clients_by_connections=self._max_subscribers_by_connection,
             )
 
         leader, replicas = await (await self.default_client).query_leader_and_replicas(stream)
@@ -169,6 +173,7 @@ async def _get_or_create_client(self, stream: str) -> Client:
                 connection_closed_handler=self._on_close_handler,
                 connection_name=self._connection_name,
                 stream=stream,
+                max_clients_by_connections=self._max_subscribers_by_connection,
             )
 
         await self._close_locator_connection()
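Usage-wise, the new cap is just a constructor argument. A minimal consumer sketch against a local broker (host, credentials and stream name are placeholders):

```python
import asyncio

from rstream import AMQPMessage, Consumer, MessageContext, amqp_decoder


async def on_message(msg: AMQPMessage, message_context: MessageContext) -> None:
    print("got message:", msg)


async def main() -> None:
    consumer = Consumer(
        host="localhost",
        username="guest",
        password="guest",
        # new in 0.19.0: at most 100 subscribers share one TCP connection
        max_subscribers_by_connection=100,
    )
    await consumer.start()
    await consumer.subscribe(stream="mystream", callback=on_message, decoder=amqp_decoder)
    await consumer.run()


asyncio.run(main())
```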
diff --git a/rstream/producer.py b/rstream/producer.py
index 0b08ff6..47985a9 100644
--- a/rstream/producer.py
+++ b/rstream/producer.py
@@ -82,10 +82,10 @@ def __init__(
         heartbeat: int = 60,
         load_balancer_mode: bool = False,
         max_retries: int = 20,
+        max_publishers_by_connection: int = 256,
         default_batch_publishing_delay: float = 3,
         default_context_switch_value: int = 1000,
         connection_name: str = None,
-        # on_close_handler: Optional[CB[OnClosedErrorInfo]] = None,
         sasl_configuration_mechanism: SlasMechanism = SlasMechanism.MechanismPlain,
         filter_value_extractor: Optional[CB_F[Any]] = None,
     ):
@@ -123,6 +123,7 @@ def __init__(
         self._connection_name = connection_name
         self._filter_value_extractor: Optional[CB_F[Any]] = filter_value_extractor
         self.publisher_id = 0
+        self._max_publishers_by_connection = max_publishers_by_connection
 
         if self._connection_name is None:
             self._connection_name = "rstream-producer"
@@ -146,6 +147,7 @@ async def start(self) -> None:
             connection_closed_handler=self._on_connection_closed,
             connection_name=self._connection_name,
             sasl_configuration_mechanism=self._sasl_configuration_mechanism,
+            max_clients_by_connections=self._max_publishers_by_connection,
         )
 
         # Check if the filtering is supported by the server
@@ -201,6 +203,7 @@ async def _get_or_create_client(self, stream: str) -> Client:
             self._default_client = await self._pool.get(
                 connection_closed_handler=self._on_connection_closed,
                 connection_name=self._connection_name,
+                max_clients_by_connections=self._max_publishers_by_connection,
             )
 
         leader, _ = await (await self.default_client).query_leader_and_replicas(stream)
@@ -211,6 +214,7 @@ async def _get_or_create_client(self, stream: str) -> Client:
                 connection_closed_handler=self._on_connection_closed,
                 stream=stream,
                 sasl_configuration_mechanism=self._sasl_configuration_mechanism,
+                max_clients_by_connections=self._max_publishers_by_connection,
             )
 
         await self._close_locator_connection()
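The producer side takes the same cap; once `max_publishers_by_connection` publishers share a connection, the pool's `get` opens a new one. A minimal sketch (local broker and stream name assumed):

```python
import asyncio

from rstream import AMQPMessage, Producer


async def main() -> None:
    async with Producer(
        host="localhost",
        username="guest",
        password="guest",
        # new in 0.19.0: a new connection is opened once 50 publishers share this one
        max_publishers_by_connection=50,
    ) as producer:
        await producer.create_stream("mystream", exists_ok=True)
        for i in range(100):
            await producer.send(
                stream="mystream",
                message=AMQPMessage(body=bytes("hello {}".format(i), "utf-8")),
            )


asyncio.run(main())
```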
diff --git a/rstream/superstream_consumer.py b/rstream/superstream_consumer.py
index 3b45be0..34466e1 100644
--- a/rstream/superstream_consumer.py
+++ b/rstream/superstream_consumer.py
@@ -52,6 +52,7 @@ def __init__(
         heartbeat: int = 60,
         load_balancer_mode: bool = False,
         max_retries: int = 20,
+        max_subscribers_by_connection: int = 256,
         super_stream: str,
         super_stream_creation_option: Optional[SuperStreamCreationOption] = None,
         connection_name: str = None,
@@ -83,7 +84,7 @@ def __init__(
         self.heartbeat = heartbeat
         self.load_balancer_mode = load_balancer_mode
         self.max_retries = max_retries
-        self._consumers: dict[str, Consumer] = {}
+        self._consumer: Optional[Consumer] = None
         self._stop_event = asyncio.Event()
         self._subscribers: dict[str, str] = defaultdict(str)
         self._on_close_handler = on_close_handler
@@ -94,11 +95,15 @@ def __init__(
         self.super_stream_creation_option = super_stream_creation_option
         # is containing partitions name for every stream in case of CREATE/DELETE superstream
         self._partitions: list = []
+        self._max_subscribers_by_connection = max_subscribers_by_connection
 
     @property
     async def default_client(self) -> Client:
         if self._default_client is None:
-            self._default_client = await self._pool.get(connection_name="rstream-locator")
+            self._default_client = await self._pool.get(
+                connection_name="rstream-locator",
+                max_clients_by_connections=self._max_subscribers_by_connection,
+            )
         return self._default_client
 
     async def __aenter__(self) -> SuperStreamConsumer:
@@ -123,9 +128,8 @@ def stop(self) -> None:
         self._stop_event.set()
 
     async def close(self) -> None:
-        for partition in self._consumers:
-            consumer = self._consumers[partition]
-            await consumer.close()
+        if self._consumer is not None:
+            await self._consumer.close()
 
         self.stop()
         await self._pool.close()
@@ -144,13 +148,11 @@ async def _get_or_create_client(self, stream: str) -> Client:
                 connection_closed_handler=self._on_close_handler,
                 connection_name=self._connection_name,
                 stream=stream,
+                max_clients_by_connections=self._max_subscribers_by_connection,
             )
 
         return self._clients[stream]
 
-    async def get_consumer(self, partition: str):
-        return self._consumers[partition]
-
     async def subscribe(
         self,
         callback: Callable[[AMQPMessage, MessageContext], Union[None, Awaitable[None]]],
@@ -176,12 +178,10 @@ async def subscribe(
         self._super_stream_metadata = DefaultSuperstreamMetadata(self.super_stream, self._default_client)
         partitions = await self._super_stream_metadata.partitions()
 
+        if self._consumer is None:
+            self._consumer = await self._create_consumer()
         for partition in partitions:
-            if partition not in self._consumers.keys():
-                consumer = await self._create_consumer()
-                self._consumers[partition] = consumer
-
-            consumer_partition: Optional[Consumer] = self._consumers.get(partition)
+            consumer_partition: Optional[Consumer] = self._consumer
             if consumer_partition is None:
                 return
 
@@ -214,6 +214,7 @@ async def _create_consumer(self) -> Consumer:
             max_retries=self.max_retries,
             on_close_handler=self._on_close_handler,
             connection_name=self._connection_name,
+            max_subscribers_by_connection=self._max_subscribers_by_connection,
         )
 
         await consumer.start()
@@ -224,17 +225,18 @@ async def unsubscribe(self) -> None:
         logger.debug("unsubscribe(): unsubscribe superstream consumer unsubscribe all consumers")
         partitions = await self._super_stream_metadata.partitions()
         for partition in partitions:
-            if self._consumers[partition] is None:
-                self._consumers[partition] = await self._create_consumer()
-
-            consumer = self._consumers[partition]
-            await consumer.unsubscribe(self._subscribers[partition])
+            consumer = self._consumer
+            if consumer is not None:
+                await consumer.unsubscribe(self._subscribers[partition])
 
     async def reconnect_stream(self, stream: str, offset: Optional[int] = None) -> None:
-        await self._consumers[stream].reconnect_stream(stream, offset)
+        if self._consumer is not None:
+            await self._consumer.reconnect_stream(stream, offset)
 
     async def stream_exists(self, stream: str) -> bool:
-        stream_exist = await self._consumers[stream].stream_exists(stream)
+        stream_exist = False
+        if self._consumer is not None:
+            stream_exist = await self._consumer.stream_exists(stream)
         return stream_exist
 
     async def create_super_stream(
@@ -277,10 +279,10 @@ async def delete_super_stream(self, super_stream: str, missing_ok: bool = False)
         # clean up the subscribers
         if super_stream == self.super_stream:
             for partition in self._partitions:
-                if partition in self._consumers:
-                    consumer = self._consumers[partition]
+                consumer = self._consumer
+                if consumer is not None:
                     await consumer.clean_up_subscribers(partition)
-            self._partitions = []
+        self._partitions = []
 
         try:
             await (await self.default_client).delete_super_stream(super_stream)
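After this refactor a single internal `Consumer` serves every partition of the super stream instead of one `Consumer` per partition, which is what makes the connection cap meaningful here. A minimal usage sketch (placeholder broker and super-stream name):

```python
import asyncio

from rstream import AMQPMessage, MessageContext, SuperStreamConsumer, amqp_decoder


async def on_message(msg: AMQPMessage, message_context: MessageContext) -> None:
    print("partition message:", msg)


async def main() -> None:
    consumer = SuperStreamConsumer(
        host="localhost",
        username="guest",
        password="guest",
        super_stream="invoices",
        # new in 0.19.0: shared cap for the per-partition subscribers
        max_subscribers_by_connection=256,
    )
    await consumer.start()
    # one subscribe call attaches the single internal Consumer to every partition
    await consumer.subscribe(callback=on_message, decoder=amqp_decoder)
    await consumer.run()


asyncio.run(main())
```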
diff --git a/rstream/superstream_producer.py b/rstream/superstream_producer.py
index b08ef6f..0595ae2 100644
--- a/rstream/superstream_producer.py
+++ b/rstream/superstream_producer.py
@@ -57,6 +57,7 @@ def __init__(
         heartbeat: int = 60,
         load_balancer_mode: bool = False,
         max_retries: int = 20,
+        max_publishers_by_connection: int = 256,
         default_batch_publishing_delay: float = 0.2,
         connection_name: str = None,
         filter_value_extractor: Optional[CB_F[Any]] = None,
@@ -97,6 +98,7 @@ def __init__(
         self.super_stream_creation_option = super_stream_creation_option
         # is containing partitions name for every stream in case of CREATE/DELETE superstream (to clean up publishers)
         self._partitions: list = []
+        self._max_publishers_by_connection = max_publishers_by_connection
 
     async def _get_producer(self) -> Producer:
         logger.debug("_get_producer() Making or getting a producer")
@@ -112,9 +114,9 @@ async def _get_producer(self) -> Producer:
                 heartbeat=self.heartbeat,
                 load_balancer_mode=self.load_balancer_mode,
                 default_batch_publishing_delay=self.default_batch_publishing_delay,
-                # on_close_handler=self._on_close_handler,
                 connection_name=self._connection_name,
                 filter_value_extractor=self._filter_value_extractor,
+                max_publishers_by_connection=self._max_publishers_by_connection,
             )
             await producer.start()
             self._producer = producer
@@ -154,7 +156,9 @@ async def start(self) -> None:
                 self.super_stream_creation_option.arguments,
                 True,
             )
-        self._default_client = await self._pool.get(connection_name="rstream-locator")
+        self._default_client = await self._pool.get(
+            connection_name="rstream-locator", max_clients_by_connections=self._max_publishers_by_connection
+        )
         self.super_stream_metadata = DefaultSuperstreamMetadata(self.super_stream, self._default_client)
         if self.routing == RouteType.Hash:
             self._routing_strategy = HashRoutingMurmurStrategy(self.routing_extractor)
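The super-stream producer forwards the same cap to its internal `Producer`. A hash-routed usage sketch to close out (placeholder broker; routing on an `id` application property follows the library's README pattern and is an assumption here):

```python
import asyncio

from rstream import AMQPMessage, RouteType, SuperStreamProducer


async def routing_extractor(message: AMQPMessage) -> str:
    # route on an application property; the "id" key is illustrative
    return message.application_properties["id"]


async def main() -> None:
    producer = SuperStreamProducer(
        host="localhost",
        username="guest",
        password="guest",
        super_stream="invoices",
        routing=RouteType.Hash,
        routing_extractor=routing_extractor,
        # new in 0.19.0
        max_publishers_by_connection=256,
    )
    await producer.start()
    message = AMQPMessage(
        body=bytes("invoice data", "utf-8"),
        application_properties={"id": "42"},
    )
    await producer.send(message)
    await producer.close()


asyncio.run(main())
```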