diff --git a/.ci/ubuntu/rabbitmq.conf b/.ci/ubuntu/rabbitmq.conf index 89b64f4..5ffa593 100644 --- a/.ci/ubuntu/rabbitmq.conf +++ b/.ci/ubuntu/rabbitmq.conf @@ -11,7 +11,7 @@ listeners.tcp.default = 5672 listeners.ssl.default = 5671 reverse_dns_lookups = false -deprecated_features.permit.amqp_address_v1 = false +# deprecated_features.permit.amqp_address_v1 = false ssl_options.cacertfile = /etc/rabbitmq/certs/ca_certificate.pem ssl_options.certfile = /etc/rabbitmq/certs/server_localhost_certificate.pem diff --git a/rabbitmq_amqp_python_client/management.py b/rabbitmq_amqp_python_client/management.py index 6ff7487..7a64f2f 100644 --- a/rabbitmq_amqp_python_client/management.py +++ b/rabbitmq_amqp_python_client/management.py @@ -125,19 +125,20 @@ def request( def _request( self, - id: str, + msg_id: str, body: Any, path: str, method: str, expected_response_codes: list[int], ) -> Message: amq_message = Message( - id=id, + id=msg_id, body=body, inferred=False, reply_to="$me", address=path, subject=method, + durable=False, ) if self._sender is not None: @@ -170,9 +171,7 @@ def declare_exchange( ValidationCodeException: If exchange already exists or other validation fails """ logger.debug("declare_exchange operation called") - body: dict[str, Any] = {} - body["auto_delete"] = exchange_specification.is_auto_delete - body["durable"] = exchange_specification.is_durable + body: dict[str, Any] = {"durable": exchange_specification.is_durable} if isinstance(exchange_specification, ExchangeSpecification): body["type"] = exchange_specification.exchange_type.value elif isinstance(exchange_specification, ExchangeCustomSpecification): diff --git a/rabbitmq_amqp_python_client/qpid/proton/_message.py b/rabbitmq_amqp_python_client/qpid/proton/_message.py index 7968447..05c1aea 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_message.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_message.py @@ -110,7 +110,7 @@ class Message(object): """ Default AMQP message priority""" def __init__( - self, body: Union[bytes, None] = None, inferred=True, **kwargs + self, body: Union[bytes, None] = None, inferred=True, durable=True, **kwargs ) -> None: # validate the types @@ -120,6 +120,7 @@ def __init__( self.properties = None self.body = body self.inferred = inferred + self.durable = durable for k, v in kwargs.items(): getattr(self, k) # Raise exception if it's not a valid attribute. diff --git a/tests/test_publisher.py b/tests/test_publisher.py index 0847717..02e337f 100644 --- a/tests/test_publisher.py +++ b/tests/test_publisher.py @@ -45,7 +45,6 @@ def test_validate_message_for_publishing(connection: Connection) -> None: def test_publish_queue(connection: Connection) -> None: - queue_name = "test-queue" management = connection.management() @@ -77,7 +76,6 @@ def test_publish_queue(connection: Connection) -> None: def test_publish_per_message(connection: Connection) -> None: - queue_name = "test-queue-1" queue_name_2 = "test-queue-2" management = connection.management() @@ -123,7 +121,6 @@ def test_publish_per_message(connection: Connection) -> None: def test_publish_ssl(connection_ssl: Connection) -> None: - queue_name = "test-queue" management = connection_ssl.management() @@ -148,7 +145,6 @@ def test_publish_ssl(connection_ssl: Connection) -> None: def test_publish_to_invalid_destination(connection: Connection) -> None: - queue_name = "test-queue" raised = False @@ -169,7 +165,6 @@ def test_publish_to_invalid_destination(connection: Connection) -> None: def test_publish_per_message_to_invalid_destination(connection: Connection) -> None: - queue_name = "test-queue-1" raised = False @@ -193,7 +188,6 @@ def test_publish_per_message_to_invalid_destination(connection: Connection) -> N def test_publish_per_message_both_address(connection: Connection) -> None: - queue_name = "test-queue-1" raised = False @@ -223,7 +217,6 @@ def test_publish_per_message_both_address(connection: Connection) -> None: def test_publish_exchange(connection: Connection) -> None: - exchange_name = "test-exchange" queue_name = "test-queue" management = connection.management() @@ -342,7 +335,6 @@ def test_disconnection_reconnection() -> None: def test_queue_info_for_stream_with_validations(connection: Connection) -> None: - stream_name = "test_stream_info_with_validation" messages_to_send = 200 @@ -361,7 +353,6 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None: def test_publish_per_message_exchange(connection: Connection) -> None: - exchange_name = "test-exchange-per-message" queue_name = "test-queue-per-message" management = connection.management() @@ -407,7 +398,6 @@ def test_publish_per_message_exchange(connection: Connection) -> None: def test_multiple_publishers(environment: Environment) -> None: - stream_name = "test_multiple_publisher_1" stream_name_2 = "test_multiple_publisher_2" connection = environment.connection() @@ -456,3 +446,40 @@ def test_multiple_publishers(environment: Environment) -> None: management.delete_queue(stream_name_2) management.close() + + +def test_durable_message(connection: Connection) -> None: + queue_name = "test_durable_message" + + management = connection.management() + management.declare_queue(QuorumQueueSpecification(name=queue_name)) + destination = AddressHelper.queue_address(queue_name) + publisher = connection.publisher(destination) + # message should be durable by default + status = publisher.publish( + Message( + body=Converter.string_to_bytes("durable"), + ) + ) + + assert status.remote_state == OutcomeState.ACCEPTED + # message should be not durable by setting the durable to False by the user + status = publisher.publish( + Message( + body=Converter.string_to_bytes("not durable"), + durable=False, + ) + ) + + assert status.remote_state == OutcomeState.ACCEPTED + + consumer = connection.consumer(destination) + should_be_durable = consumer.consume() + assert should_be_durable.durable is True + + should_be_not_durable = consumer.consume() + assert should_be_not_durable.durable is False + message_count = management.purge_queue(queue_name) + assert message_count == 0 + management.delete_queue(queue_name) + consumer.close()