Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support azure-sdk-for-python ServicebusClient #70

Open
jvanegmond opened this issue Nov 20, 2024 · 2 comments
Open

Support azure-sdk-for-python ServicebusClient #70

jvanegmond opened this issue Nov 20, 2024 · 2 comments

Comments

@jvanegmond
Copy link

jvanegmond commented Nov 20, 2024

azure-sdk-for-python uses its own _pyamqp implementation that is specifically targeting the behavior of Azure Servicebus instead of a generic implementation of AMQP 1.0. LocalSandbox does not closely enough emulate the real Azure ServiceBus on a protocol frame level, missing multiple optional fields that real ServiceBus does send, leading to azure-sdk-for-python not working with LocalSandbox.

The following AMQP frames returned by LocalSandbox lack fields that the real Azure ServiceBus does send: open, begin, attach (and possibly more). Notably the open frame has 10 fields as returned by Azure Servicebus whereas LocalSandbox sends the open frame with 1 field. Even though the 10 fields sent are mostly empty, this leads to an error in azure-sdk-for-python in file sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_connection_async.py method _incoming_open where those Amqp frame fields are accessed via array subscription without checking the length of the fields array. This leads to the Amqp session not being able to be established.

To reproduce:
Run the following Python script:

import asyncio
import logging
import sys

# Disable TLS. Workaround for https://github.com/Azure/azure-sdk-for-python/issues/34273
from azure.servicebus._pyamqp import AMQPClient
org_init = AMQPClient.__init__
def new_init(self, hostname, **kwargs):
    kwargs["use_tls"] = False
    org_init(self, hostname, **kwargs)
AMQPClient.__init__ = new_init

# Set up logging
handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger('azure.servicebus')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

from azure.servicebus import ServiceBusMessage, TransportType
from azure.servicebus.aio import ServiceBusClient

queue_name = "default"

async def main():

    client = ServiceBusClient.from_connection_string(
        conn_str="Endpoint=sb://default.default.default.localhost.localsandbox.sh;SharedAccessKeyName=1234;SharedAccessKey=password;UseDevelopmentEmulator=true",
        transport_type=TransportType.Amqp,
        retry_total=0,
    )
    async with client:
        
        async with client.get_queue_sender(queue_name) as sender:
            # Sending a single message
            single_message = ServiceBusMessage("Hello, world!")
            await sender.send_messages(single_message)

        # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
        # Default is None; to receive forever.
        async with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
            async for msg in receiver:  # ServiceBusReceiver instance is a generator.
                print(str(msg))

print()

asyncio.run(main())

This prints

Handler failed: list index out of range.
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\aio\_base_handler_async.py", line 235, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\aio\_servicebus_sender_async.py", line 214, in _open
    while not await self._handler.client_ready_async():
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_client_async.py", line 318, in client_ready_async
    if not await self.auth_complete_async():
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_client_async.py", line 305, in auth_complete_async
    await self._connection.listen(wait=self._socket_timeout)
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_connection_async.py", line 744, in listen
    if await self._read_frame(wait=wait, **kwargs):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_connection_async.py", line 269, in _read_frame
    return await self._process_incoming_frame(*new_frame)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_connection_async.py", line 606, in _process_incoming_frame
    await self._incoming_open(channel, fields)
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_connection_async.py", line 419, in _incoming_open
    if frame[4]:
       ~~~~~^^^

An excerpt of the offending piece of code:

    def _incoming_open(self, channel: int, frame) -> None:
        """Process incoming Open frame to finish the connection negotiation.

        The incoming frame format is::

            - frame[0]: container_id (str)
            - frame[1]: hostname (str)
            - frame[2]: max_frame_size (int)
            - frame[3]: channel_max (int)
            - frame[4]: idle_timeout (Optional[int])
            - frame[5]: outgoing_locales (Optional[List[bytes]])
            - frame[6]: incoming_locales (Optional[List[bytes]])
            - frame[7]: offered_capabilities (Optional[List[bytes]])
            - frame[8]: desired_capabilities (Optional[List[bytes]])
            - frame[9]: properties (Optional[Dict[bytes, bytes]])

        :param int channel: The incoming channel number.
        :param frame: The incoming Open frame.
        :type frame: Tuple[Any, ...]
        :rtype: None
        """
        # .... error checking cut in example ....
        if frame[4]:
            self._remote_idle_timeout = cast(float, frame[4] / 1000)  # Convert to seconds
            self._remote_idle_timeout_send_frame = self._idle_timeout_empty_frame_send_ratio * self._remote_idle_timeout

        if frame[2] < 512:
            # Max frame size is less than supported minimum.
            # If any of the values in the received open frame are invalid then the connection shall be closed.
            # The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.
            self.close(
                error=AMQPError(
                    condition=ErrorCondition.InvalidField,
                    description="Failed parsing OPEN frame: Max frame size is less than supported minimum.",
                )
            )
            _LOGGER.error(
                "Failed parsing OPEN frame: Max frame size is less than supported minimum.",
                extra=self._network_trace_params,
            )
            return
        self._remote_max_frame_size = frame[2]
        self._remote_properties = frame[9]

Additional context:
Considering that Microsoft is the maintainer of azure-sdk-for-python and specifically targets Azure Servicebus, it is not reasonable to change this on the azure-sdk-for-python ServiceBusClient side. LocalSandbox should aim to emulate the real Azure ServiceBus as much as possible in order to support azure-sdk-for-python.

Copy link

linear bot commented Nov 20, 2024

@jvanegmond
Copy link
Author

jvanegmond commented Nov 27, 2024

So aside from the TLS issue on the azure-sdk-for-python side, which is most likely fixed by a side-effect of the changes made for Azure/azure-sdk-for-python#34273 , after investigation, I've found that at least one of rhea or azure-sdk-for-python needs further changes. The attach frame can be changed in rhea by setting containeroptions, but the begin frame is basically hardcoded in rhea and cannot be changed. So this means that either azure-sdk-for-python needs to support the optional fields better or rhea needs to be modified to allow for changing the begin frame. As a result, this is looking a lot harder than it seems at first glance because now it needs synchronous changes across multiple repos with different maintainers. A possible workaround for that problem is to fork rhea and change it for the emulator use case specifically.

Fortunately, while I was working on this, an official Microsoft emulator also came out: https://learn.microsoft.com/en-us/azure/service-bus-messaging/overview-emulator

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant