From 72fc34857cfb0a62bb8dcb9fdf5cd08968646d57 Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 20 Sep 2018 09:51:48 -0700 Subject: [PATCH 01/10] Fix for repeat startup --- azure/eventhub/async_ops/__init__.py | 3 ++- azure/eventhub/async_ops/receiver_async.py | 3 +++ azure/eventhub/async_ops/sender_async.py | 3 +++ azure/eventhub/client.py | 3 ++- azure/eventhub/receiver.py | 3 +++ azure/eventhub/sender.py | 3 +++ 6 files changed, 16 insertions(+), 2 deletions(-) diff --git a/azure/eventhub/async_ops/__init__.py b/azure/eventhub/async_ops/__init__.py index 7774724..7c62b3d 100644 --- a/azure/eventhub/async_ops/__init__.py +++ b/azure/eventhub/async_ops/__init__.py @@ -75,7 +75,8 @@ async def _wait_for_client(self, client): async def _start_client_async(self, client): try: - await client.open_async() + if not client.running: + await client.open_async() except Exception as exp: # pylint: disable=broad-except log.info("Encountered error while starting handler: %r", exp) await client.close_async(exception=exp) diff --git a/azure/eventhub/async_ops/receiver_async.py b/azure/eventhub/async_ops/receiver_async.py index ad04520..066915f 100644 --- a/azure/eventhub/async_ops/receiver_async.py +++ b/azure/eventhub/async_ops/receiver_async.py @@ -40,6 +40,7 @@ def __init__( # pylint: disable=super-init-not-called :param loop: An event loop. """ self.loop = loop or asyncio.get_event_loop() + self.running = False self.client = client self.source = source self.offset = offset @@ -81,6 +82,7 @@ async def open_async(self): :type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync """ # pylint: disable=protected-access + self.running = True if self.redirected: self.source = self.redirected.address source = Source(self.source) @@ -188,6 +190,7 @@ async def close_async(self, exception=None): due to an error. :type exception: Exception """ + self.running = False if self.error: return elif isinstance(exception, errors.LinkRedirect): diff --git a/azure/eventhub/async_ops/sender_async.py b/azure/eventhub/async_ops/sender_async.py index 098c026..be53b1d 100644 --- a/azure/eventhub/async_ops/sender_async.py +++ b/azure/eventhub/async_ops/sender_async.py @@ -47,6 +47,7 @@ def __init__( # pylint: disable=super-init-not-called :param loop: An event loop. If not specified the default event loop will be used. """ self.loop = loop or asyncio.get_event_loop() + self.running = False self.client = client self.target = target self.partition = partition @@ -82,6 +83,7 @@ async def open_async(self): :param connection: The underlying client shared connection. :type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync """ + self.running = True if self.redirected: self.target = self.redirected.address self._handler = SendClientAsync( @@ -173,6 +175,7 @@ async def close_async(self, exception=None): due to an error. 
         :type exception: Exception
         """
+        self.running = False
         if self.error:
             return
         elif isinstance(exception, errors.LinkRedirect):
diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py
index 43c3b65..73abad5 100644
--- a/azure/eventhub/client.py
+++ b/azure/eventhub/client.py
@@ -233,7 +233,8 @@ def _close_clients(self):

     def _start_clients(self):
         for client in self.clients:
             try:
-                client.open()
+                if not client.running:
+                    client.open()
             except Exception as exp:  # pylint: disable=broad-except
                 client.close(exception=exp)
diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py
index 0b7b8a9..ebbb2b1 100644
--- a/azure/eventhub/receiver.py
+++ b/azure/eventhub/receiver.py
@@ -35,6 +35,7 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, keep_a
         :param epoch: An optional epoch value.
         :type epoch: int
         """
+        self.running = False
         self.client = client
         self.source = source
         self.offset = offset
@@ -75,6 +76,7 @@ def open(self):
         :type: connection: ~uamqp.connection.Connection
         """
         # pylint: disable=protected-access
+        self.running = True
         if self.redirected:
             self.source = self.redirected.address
         source = Source(self.source)
@@ -185,6 +187,7 @@ def close(self, exception=None):
         due to an error.
         :type exception: Exception
         """
+        self.running = False
         if self.error:
             return
         elif isinstance(exception, errors.LinkRedirect):
diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py
index b7fef5e..3f23885 100644
--- a/azure/eventhub/sender.py
+++ b/azure/eventhub/sender.py
@@ -40,6 +40,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N
         Default value is `True`.
         :type auto_reconnect: bool
         """
+        self.running = False
         self.client = client
         self.target = target
         self.partition = partition
@@ -74,6 +75,7 @@ def open(self):
         :param connection: The underlying client shared connection.
         :type: connection: ~uamqp.connection.Connection
         """
+        self.running = True
         if self.redirected:
             self.target = self.redirected.address
         self._handler = SendClient(
@@ -169,6 +171,7 @@ def close(self, exception=None):
         due to an error.
         :type exception: Exception
         """
+        self.running = False
         if self.error:
             return
         elif isinstance(exception, errors.LinkRedirect):

From 01b89986fe9b7fb08401b2fa3fb538822148b05e Mon Sep 17 00:00:00 2001
From: annatisch
Date: Thu, 20 Sep 2018 11:56:11 -0700
Subject: [PATCH 02/10] Added more storage connect options to EPH

---
 .../azure_storage_checkpoint_manager.py | 36 ++++++++++++++++---
 1 file changed, 32 insertions(+), 4 deletions(-)

diff --git a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py
index 8ac3abe..a749bf2 100644
--- a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py
+++ b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py
@@ -28,14 +28,39 @@ class AzureStorageCheckpointLeaseManager(AbstractCheckpointManager, AbstractLeas
     Manages checkpoints and lease with azure storage blobs. In this implementation,
     checkpoints are data that's actually in the lease blob, so checkpoint operations
     turn into lease operations under the covers.
+
+    :param str storage_account_name: The storage account name. This is used to
+     authenticate requests signed with an account key and to construct the storage
+     endpoint. It is required unless a connection string is given.
+    :param str storage_account_key: The storage account key. This is used for shared key
+     authentication. If neither account key nor sas token is specified, anonymous access
+     will be used.
+ :param str lease_container_name: The name of the container that will be used to store + leases. If it does not already exist it will be created. Default value is 'eph-leases'. + :param int lease_renew_interval: The interval in seconds at which EPH will attempt to + renew the lease of a particular partition. Default value is 10. + :param int lease_duration: The duration in seconds of a lease on a partition. + Default value is 30. + :param str sas_token: A shared access signature token to use to authenticate requests + instead of the account key. If account key and sas token are both specified, + account key will be used to sign. If neither are specified, anonymous access will be used. + :param str endpoint_suffix: The host base component of the url, minus the account name. + Defaults to Azure (core.windows.net). Override this to use a National Cloud. + :param str connection_string: If specified, this will override all other endpoint parameters. + See http://azure.microsoft.com/en-us/documentation/articles/storage-configure-connection-string/ + for the connection string format. """ - def __init__(self, storage_account_name, storage_account_key, lease_container_name, - storage_blob_prefix=None, lease_renew_interval=10, lease_duration=30): + def __init__(self, storage_account_name=None, storage_account_key=None, lease_container_name="eph-leases", + storage_blob_prefix=None, lease_renew_interval=10, lease_duration=30, + sas_token=None, endpoint_suffix="core.windows.net", connection_string=None): AbstractCheckpointManager.__init__(self) AbstractLeaseManager.__init__(self, lease_renew_interval, lease_duration) self.storage_account_name = storage_account_name self.storage_account_key = storage_account_key + self.storage_sas_token = sas_token + self.endpoint_suffix = endpoint_suffix + self.connection_string = connection_string self.lease_container_name = lease_container_name self.storage_blob_prefix = storage_blob_prefix self.storage_client = None @@ -47,8 +72,8 @@ def __init__(self, storage_account_name, storage_account_key, lease_container_na self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=32) # Validate storage inputs - if not self.storage_account_name or not self.storage_account_key: - raise ValueError("Need a valid storage account name and key") + if not self.storage_account_name and not self.connection_string: + raise ValueError("Need a valid storage account name or connection string.") if not re.compile(r"^[a-z0-9](([a-z0-9\-[^\-])){1,61}[a-z0-9]$").match(self.lease_container_name): raise ValueError("Azure Storage lease container name is invalid.\ Please check naming conventions at\ @@ -68,6 +93,9 @@ def initialize(self, host): self.host = host self.storage_client = BlockBlobService(account_name=self.storage_account_name, account_key=self.storage_account_key, + sas_token=self.storage_sas_token, + endpoint_suffix=self.endpoint_suffix, + connection_string=self.connection_string, request_session=self.request_session) self.consumer_group_directory = self.storage_blob_prefix + self.host.eh_config.consumer_group From d33c244e5da2a22fad0b20cdc11a737dce9c260c Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 20 Sep 2018 11:56:38 -0700 Subject: [PATCH 03/10] Bumped version --- HISTORY.rst | 14 ++++++++++++++ azure/eventhub/__init__.py | 2 +- setup.py | 3 ++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 6e90e55..1d5a4d0 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,20 @@ Release History =============== +1.1.0 (2018-09-21) 
+++++++++++++++++++
+
+- Changes to `AzureStorageCheckpointLeaseManager` parameters to support other connection options (issue #61):
+
+  - The `storage_account_name`, `storage_account_key` and `lease_container_name` arguments are now optional keyword arguments.
+  - Added a `sas_token` argument that must be specified with `storage_account_name` in place of `storage_account_key`.
+  - Added an `endpoint_suffix` argument to support storage endpoints in National Clouds.
+  - Added a `connection_string` argument that, if specified, overrides all other endpoint arguments.
+  - The `lease_container_name` argument now defaults to `"eph-leases"` if not specified.
+
+- Fix for clients failing to start if `run` is called multiple times (issue #64).
+
+
 1.0.0 (2018-08-22)
 ++++++++++++++++++

diff --git a/azure/eventhub/__init__.py b/azure/eventhub/__init__.py
index 3cde06c..e07a603 100644
--- a/azure/eventhub/__init__.py
+++ b/azure/eventhub/__init__.py
@@ -3,7 +3,7 @@
 # Licensed under the MIT License. See License.txt in the project root for license information.
 # --------------------------------------------------------------------------------------------

-__version__ = "1.0.0"
+__version__ = "1.1.0"

 from azure.eventhub.common import EventData, EventHubError, Offset
 from azure.eventhub.client import EventHubClient
diff --git a/setup.py b/setup.py
index 891a80e..d95ac21 100644
--- a/setup.py
+++ b/setup.py
@@ -44,12 +44,13 @@
     author_email='azpysdkhelp@microsoft.com',
     url='https://github.com/Azure/azure-event-hubs-python',
     classifiers=[
-        'Development Status :: 3 - Alpha',
+        'Development Status :: 5 - Production/Stable',
         'Programming Language :: Python',
         'Programming Language :: Python :: 3',
         'Programming Language :: Python :: 3.4',
         'Programming Language :: Python :: 3.5',
         'Programming Language :: Python :: 3.6',
+        'Programming Language :: Python :: 3.7',
         'License :: OSI Approved :: MIT License',
     ],
     zip_safe=False,

From 514f8264b3c41ee2ac7cb3dda5df5a39eaa6baff Mon Sep 17 00:00:00 2001
From: annatisch
Date: Fri, 21 Sep 2018 11:32:57 -0700
Subject: [PATCH 04/10] Handler blocked until client started

---
 azure/eventhub/async_ops/receiver_async.py | 2 ++
 azure/eventhub/async_ops/sender_async.py | 4 ++++
 azure/eventhub/receiver.py | 2 ++
 azure/eventhub/sender.py | 6 ++++++
 tests/__init__.py | 2 +-
 tests/test_eph.py | 17 -----------------
 tests/test_reconnect.py | 11 ++++++++++-
 7 files changed, 25 insertions(+), 19 deletions(-)
 delete mode 100644 tests/test_eph.py

diff --git a/azure/eventhub/async_ops/receiver_async.py b/azure/eventhub/async_ops/receiver_async.py
index 066915f..c69681e 100644
--- a/azure/eventhub/async_ops/receiver_async.py
+++ b/azure/eventhub/async_ops/receiver_async.py
@@ -219,6 +219,8 @@ async def receive(self, max_batch_size=None, timeout=None):
         """
         if self.error:
             raise self.error
+        if not self.running:
+            raise ValueError("Unable to receive until client has been started.")
         data_batch = []
         try:
             timeout_ms = 1000 * timeout if timeout else 0
diff --git a/azure/eventhub/async_ops/sender_async.py b/azure/eventhub/async_ops/sender_async.py
index be53b1d..4579602 100644
--- a/azure/eventhub/async_ops/sender_async.py
+++ b/azure/eventhub/async_ops/sender_async.py
@@ -202,6 +202,8 @@ async def send(self, event_data):
         """
         if self.error:
             raise self.error
+        if not self.running:
+            raise ValueError("Unable to send until client has been started.")
         if event_data.partition_key and self.partition:
             raise ValueError("EventData partition key cannot be used with a partition sender.")
event_data.message.on_send_complete = self._on_outcome @@ -241,6 +243,8 @@ async def wait_async(self): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") try: await self._handler.wait_async() except (errors.LinkDetach, errors.ConnectionClose) as shutdown: diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index ebbb2b1..f7724a7 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -226,6 +226,8 @@ def receive(self, max_batch_size=None, timeout=None): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to receive until client has been started.") data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index 3f23885..a73dfdd 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -198,6 +198,8 @@ def send(self, event_data): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") if event_data.partition_key and self.partition: raise ValueError("EventData partition key cannot be used with a partition sender.") event_data.message.on_send_complete = self._on_outcome @@ -242,6 +244,8 @@ def transfer(self, event_data, callback=None): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") if event_data.partition_key and self.partition: raise ValueError("EventData partition key cannot be used with a partition sender.") if callback: @@ -254,6 +258,8 @@ def wait(self): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") try: self._handler.wait() except (errors.LinkDetach, errors.ConnectionClose) as shutdown: diff --git a/tests/__init__.py b/tests/__init__.py index 7ec7d3b..7b7c91a 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,7 +12,7 @@ def get_logger(filename, level=logging.INFO): azure_logger = logging.getLogger("azure") azure_logger.setLevel(level) uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.DEBUG) + uamqp_logger.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') console_handler = logging.StreamHandler(stream=sys.stdout) diff --git a/tests/test_eph.py b/tests/test_eph.py deleted file mode 100644 index c1d43e7..0000000 --- a/tests/test_eph.py +++ /dev/null @@ -1,17 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. 
-# ----------------------------------------------------------------------------------- - -import asyncio -import pytest - - -def test_eph_start(eph): - """ - Test that the processing host starts correctly - """ - pytest.skip("Not working yet") - loop = asyncio.get_event_loop() - loop.run_until_complete(eph.open_async()) - loop.run_until_complete(eph.close_async()) diff --git a/tests/test_reconnect.py b/tests/test_reconnect.py index a6aa0ce..bd10bbd 100644 --- a/tests/test_reconnect.py +++ b/tests/test_reconnect.py @@ -85,6 +85,15 @@ def test_send_with_forced_conn_close_sync(connection_str, receivers): assert list(received[0].body)[0] == b"A single event" +def pump(receiver): + messages = [] + batch = receiver.receive(timeout=1) + messages.extend(batch) + while batch: + batch = receiver.receive(timeout=1) + messages.extend(batch) + return messages + @pytest.mark.asyncio async def test_send_with_forced_conn_close_async(connection_str, receivers): #pytest.skip("long running") @@ -106,7 +115,7 @@ async def test_send_with_forced_conn_close_async(connection_str, receivers): received = [] for r in receivers: - received.extend(r.receive(timeout=1)) + received.extend(pump(r)) assert len(received) == 5 assert list(received[0].body)[0] == b"A single event" From 2fa535a121fb8e7df034e33d0552cd9849eb485e Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 21 Sep 2018 12:58:04 -0700 Subject: [PATCH 05/10] Added event data methods --- .gitignore | 3 +++ HISTORY.rst | 1 + azure/eventhub/common.py | 42 +++++++++++++++++++++++++++-- examples/recv.py | 14 ++++++---- tests/test_negative.py | 58 +++++++++++++++++++++++++++++++++++++++- tests/test_receive.py | 10 +++++++ 6 files changed, 120 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 6b8bf2d..0785980 100644 --- a/.gitignore +++ b/.gitignore @@ -39,6 +39,9 @@ pip-delete-this-directory.txt azure/storage/ azure/common/ azure/profiles/ +*.log.1 +*.log.2 +*.log.3 htmlcov/ .tox/ diff --git a/HISTORY.rst b/HISTORY.rst index 1d5a4d0..c77b3c7 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -15,6 +15,7 @@ Release History - The `lease_container_name` argument now defaults to `"eph-leases"` if not specified. - Fix for clients failing to start if run called multipled times (issue #64). +- Added convenience methods `body_as_str` and `body_as_json` to EventData object for easier processing of message data. 1.0.0 (2018-08-22) diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index af4db4e..1c7d176 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -5,6 +5,7 @@ import datetime import time +import json from uamqp import Message, BatchMessage from uamqp import types, constants, errors @@ -88,7 +89,6 @@ def __init__(self, body=None, batch=None, to_device=None, message=None): else: self.message = Message(body, properties=self.msg_properties) - @property def sequence_number(self): """ @@ -188,7 +188,45 @@ def body(self): :rtype: bytes or Generator[bytes] """ - return self.message.get_data() + try: + return self.message.get_data() + except TypeError: + raise ValueError("Message data empty.") + + def body_as_str(self, encoding='UTF-8'): + """ + The body of the event data as a string if the data is of a + compatible type. + + :param encoding: The encoding to use for decoding message data. 
+         Default is 'UTF-8'
+        :rtype: str
+        """
+        data = self.body
+        try:
+            return "".join(b.decode(encoding) for b in data)
+        except TypeError:
+            return str(data)
+        except:
+            pass
+        try:
+            return data.decode(encoding)
+        except Exception as e:
+            raise TypeError("Message data is not compatible with string type: {}".format(e))
+
+    def body_as_json(self, encoding='UTF-8'):
+        """
+        The body of the event loaded as a JSON object, if the data is compatible.
+
+        :param encoding: The encoding to use for decoding message data.
+         Default is 'UTF-8'
+        :rtype: dict
+        """
+        data_str = self.body_as_str()
+        try:
+            return json.loads(data_str)
+        except Exception as e:
+            raise TypeError("Event data is not compatible with JSON type: {}".format(e))


 class Offset(object):
diff --git a/examples/recv.py b/examples/recv.py
index d2fbdf7..f43d03b 100644
--- a/examples/recv.py
+++ b/examples/recv.py
@@ -38,11 +38,15 @@
     receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
     client.run()
     start_time = time.time()
-    for event_data in receiver.receive(timeout=100):
-        last_offset = event_data.offset
-        last_sn = event_data.sequence_number
-        print("Received: {}, {}".format(last_offset.value, last_sn))
-        total += 1
+    batch = receiver.receive(timeout=5000)
+    while batch:
+        for event_data in batch:
+            last_offset = event_data.offset
+            last_sn = event_data.sequence_number
+            print("Received: {}, {}".format(last_offset.value, last_sn))
+            print(event_data.body_as_str())
+            total += 1
+        batch = receiver.receive(timeout=5000)
     end_time = time.time()
     client.stop()
diff --git a/tests/test_negative.py b/tests/test_negative.py
index dbc8096..bdfcbfd 100644
--- a/tests/test_negative.py
+++ b/tests/test_negative.py
@@ -7,6 +7,7 @@
 import os
 import asyncio
 import pytest
+import time

 from azure import eventhub
 from azure.eventhub import (
@@ -303,4 +304,59 @@ async def test_max_receivers_async(connection_str, senders):
         assert len(failed) == 1
         print(failed[0].message)
     finally:
-        await client.stop_async()
\ No newline at end of file
+        await client.stop_async()
+
+
+def test_message_body_types(connection_str, senders):
+    client = EventHubClient.from_connection_string(connection_str, debug=False)
+    receiver = client.add_receiver("$default", "0", offset=Offset('@latest'))
+    try:
+        client.run()
+
+        received = receiver.receive(timeout=5)
+        assert len(received) == 0
+        senders[0].send(EventData(b"Bytes Data"))
+        time.sleep(1)
+        received = receiver.receive(timeout=5)
+        assert len(received) == 1
+        assert list(received[0].body) == [b'Bytes Data']
+        assert received[0].body_as_str() == "Bytes Data"
+        with pytest.raises(TypeError):
+            received[0].body_as_json()
+
+        senders[0].send(EventData("Str Data"))
+        time.sleep(1)
+        received = receiver.receive(timeout=5)
+        assert len(received) == 1
+        assert list(received[0].body) == [b'Str Data']
+        assert received[0].body_as_str() == "Str Data"
+        with pytest.raises(TypeError):
+            received[0].body_as_json()
+
+        senders[0].send(EventData(b'{"test_value": "JSON bytes data", "key1": true, "key2": 42}'))
+        time.sleep(1)
+        received = receiver.receive(timeout=5)
+        assert len(received) == 1
+        assert list(received[0].body) == [b'{"test_value": "JSON bytes data", "key1": true, "key2": 42}']
+        assert received[0].body_as_str() == '{"test_value": "JSON bytes data", "key1": true, "key2": 42}'
+        assert received[0].body_as_json() == {"test_value": "JSON bytes data", "key1": True, "key2": 42}
+
+        senders[0].send(EventData('{"test_value": "JSON str data", "key1": true, "key2": 42}'))
+        time.sleep(1)
+        received =
receiver.receive(timeout=5) + assert len(received) == 1 + assert list(received[0].body) == [b'{"test_value": "JSON str data", "key1": true, "key2": 42}'] + assert received[0].body_as_str() == '{"test_value": "JSON str data", "key1": true, "key2": 42}' + assert received[0].body_as_json() == {"test_value": "JSON str data", "key1": True, "key2": 42} + + senders[0].send(EventData(42)) + time.sleep(1) + received = receiver.receive(timeout=5) + assert len(received) == 1 + assert received[0].body_as_str() == "42" + with pytest.raises(ValueError): + received[0].body + except: + raise + finally: + client.stop() \ No newline at end of file diff --git a/tests/test_receive.py b/tests/test_receive.py index 1b7480e..1bdbe86 100644 --- a/tests/test_receive.py +++ b/tests/test_receive.py @@ -24,6 +24,7 @@ def test_receive_end_of_stream(connection_str, senders): received = receiver.receive(timeout=5) assert len(received) == 1 + assert received[0].body_as_str() == "Receiving only a single event" assert list(received[-1].body)[0] == b"Receiving only a single event" except: raise @@ -48,6 +49,9 @@ def test_receive_with_offset_sync(connection_str, senders): assert len(received) == 1 offset = received[0].offset + assert list(received[0].body) == [b'Data'] + assert received[0].body_as_str() == "Data" + offset_receiver = client.add_receiver("$default", "0", offset=offset) client.run() received = offset_receiver.receive(timeout=5) @@ -75,6 +79,9 @@ def test_receive_with_inclusive_offset(connection_str, senders): assert len(received) == 1 offset = received[0].offset + assert list(received[0].body) == [b'Data'] + assert received[0].body_as_str() == "Data" + offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset.value, inclusive=True)) client.run() received = offset_receiver.receive(timeout=5) @@ -101,6 +108,9 @@ def test_receive_with_datetime(connection_str, senders): assert len(received) == 1 offset = received[0].enqueued_time + assert list(received[0].body) == [b'Data'] + assert received[0].body_as_str() == "Data" + offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset)) client.run() received = offset_receiver.receive(timeout=5) From 551d47db4a874d69165fbfc011a2dbfee912cfaa Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 21 Sep 2018 13:06:12 -0700 Subject: [PATCH 06/10] Fix pylint --- .travis.yml | 10 +++++++++- azure/eventhub/common.py | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 17f7c2e..5b2b8ea 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,15 @@ language: python cache: pip python: - "3.6" -# command to install dependencies +dist: xenial +matrix: + include: + - os: linux + python: "3.5" + - os: linux + python: "3.6" + - os: linux + python: "3.7" install: - pip install -r dev_requirements.txt - pip install -e . 
diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index 1c7d176..e63b639 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -207,7 +207,7 @@ def body_as_str(self, encoding='UTF-8'): return "".join(b.decode(encoding) for b in data) except TypeError: return str(data) - except: + except: # pylint: disable=bare-except pass try: return data.decode(encoding) @@ -222,7 +222,7 @@ def body_as_json(self, encoding='UTF-8'): Default is 'UTF-8' :rtype: dict """ - data_str = self.body_as_str() + data_str = self.body_as_str(encoding=encoding) try: return json.loads(data_str) except Exception as e: From 0a4cc2eae0f01c33c7203036de7fa2ccd389b105 Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 21 Sep 2018 13:09:40 -0700 Subject: [PATCH 07/10] Fix 3.7 CI --- .travis.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5b2b8ea..ac5bef4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,5 @@ language: python cache: pip -python: - - "3.6" dist: xenial matrix: include: From 9afbdbfbf561d458039ec45951b73c77d128a737 Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 21 Sep 2018 13:28:52 -0700 Subject: [PATCH 08/10] Fix 3.7 CI --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index ac5bef4..6addbf5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,7 @@ language: python cache: pip dist: xenial +sudo: required matrix: include: - os: linux From b3910455851e953f1f58a23faa7ed880eebc4ec8 Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 21 Sep 2018 13:46:34 -0700 Subject: [PATCH 09/10] Updated pylint version --- dev_requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev_requirements.txt b/dev_requirements.txt index 3cbeb9a..31a0ba7 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -2,5 +2,5 @@ pytest>=3.4.1 pytest-asyncio>=0.8.0 docutils>=0.14 pygments>=2.2.0 -pylint==1.8.4 +pylint==2.1.1 behave==1.2.6 \ No newline at end of file From da94d065bba10412bf6afd4cb7fbaac9662ff279 Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 21 Sep 2018 14:51:16 -0700 Subject: [PATCH 10/10] Pylint fixes --- azure/eventhub/async_ops/receiver_async.py | 27 +++++++++---------- azure/eventhub/async_ops/sender_async.py | 9 +++---- azure/eventhub/client.py | 2 -- azure/eventhub/common.py | 10 +++---- azure/eventhub/receiver.py | 23 +++++++--------- azure/eventhub/sender.py | 9 +++---- azure/eventprocessorhost/eh_partition_pump.py | 4 +-- azure/eventprocessorhost/partition_context.py | 2 +- azure/eventprocessorhost/partition_pump.py | 2 +- pylintrc | 2 +- 10 files changed, 40 insertions(+), 50 deletions(-) diff --git a/azure/eventhub/async_ops/receiver_async.py b/azure/eventhub/async_ops/receiver_async.py index c69681e..814adff 100644 --- a/azure/eventhub/async_ops/receiver_async.py +++ b/azure/eventhub/async_ops/receiver_async.py @@ -173,12 +173,11 @@ async def has_started(self): timeout, auth_in_progress = await self._handler._auth.handle_token_async() if timeout: raise EventHubError("Authorization timeout.") - elif auth_in_progress: + if auth_in_progress: return False - elif not await self._handler._client_ready_async(): + if not await self._handler._client_ready_async(): return False - else: - return True + return True async def close_async(self, exception=None): """ @@ -193,7 +192,7 @@ async def close_async(self, exception=None): self.running = False if self.error: return - elif isinstance(exception, errors.LinkRedirect): + if isinstance(exception, errors.LinkRedirect): 
self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception @@ -237,21 +236,19 @@ async def receive(self, max_batch_size=None, timeout=None): log.info("AsyncReceiver detached. Attempting reconnect.") await self.reconnect_async() return data_batch - else: - log.info("AsyncReceiver detached. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: log.info("AsyncReceiver detached. Attempting reconnect.") await self.reconnect_async() return data_batch - else: - log.info("AsyncReceiver detached. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except Exception as e: log.info("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Receive failed: {}".format(e)) diff --git a/azure/eventhub/async_ops/sender_async.py b/azure/eventhub/async_ops/sender_async.py index 4579602..9f46fdd 100644 --- a/azure/eventhub/async_ops/sender_async.py +++ b/azure/eventhub/async_ops/sender_async.py @@ -158,12 +158,11 @@ async def has_started(self): timeout, auth_in_progress = await self._handler._auth.handle_token_async() if timeout: raise EventHubError("Authorization timeout.") - elif auth_in_progress: + if auth_in_progress: return False - elif not await self._handler._client_ready_async(): + if not await self._handler._client_ready_async(): return False - else: - return True + return True async def close_async(self, exception=None): """ @@ -178,7 +177,7 @@ async def close_async(self, exception=None): self.running = False if self.error: return - elif isinstance(exception, errors.LinkRedirect): + if isinstance(exception, errors.LinkRedirect): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 73abad5..06508df 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -330,8 +330,6 @@ def get_eventhub_info(self): output['partition_count'] = eh_info[b'partition_count'] output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']] return output - except: - raise finally: mgmt_client.close() diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index e63b639..b4a1755 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -32,13 +32,13 @@ def _error_handler(error): """ if error.condition == b'com.microsoft:server-busy': return errors.ErrorAction(retry=True, backoff=4) - elif error.condition == b'com.microsoft:timeout': + if error.condition == b'com.microsoft:timeout': return errors.ErrorAction(retry=True, backoff=2) - elif error.condition == b'com.microsoft:operation-cancelled': + if error.condition == b'com.microsoft:operation-cancelled': return errors.ErrorAction(retry=True) - elif error.condition == b"com.microsoft:container-close": + if error.condition == b"com.microsoft:container-close": return errors.ErrorAction(retry=True, backoff=4) - elif error.condition in _NO_RETRY_ERRORS: + if error.condition in _NO_RETRY_ERRORS: return errors.ErrorAction(retry=False) return 
errors.ErrorAction(retry=True) @@ -269,7 +269,7 @@ def selector(self): if isinstance(self.value, datetime.datetime): timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000) return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8') - elif isinstance(self.value, int): + if isinstance(self.value, int): return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8') return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8') diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index f7724a7..6822149 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -170,12 +170,11 @@ def has_started(self): timeout, auth_in_progress = self._handler._auth.handle_token() if timeout: raise EventHubError("Authorization timeout.") - elif auth_in_progress: + if auth_in_progress: return False - elif not self._handler._client_ready(): + if not self._handler._client_ready(): return False - else: - return True + return True def close(self, exception=None): """ @@ -190,7 +189,7 @@ def close(self, exception=None): self.running = False if self.error: return - elif isinstance(exception, errors.LinkRedirect): + if isinstance(exception, errors.LinkRedirect): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception @@ -243,18 +242,16 @@ def receive(self, max_batch_size=None, timeout=None): if shutdown.action.retry and self.auto_reconnect: self.reconnect() return data_batch - else: - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: self.reconnect() return data_batch - else: - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error except Exception as e: error = EventHubError("Receive failed: {}".format(e)) self.close(exception=error) diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index a73dfdd..b4ed3b7 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -154,12 +154,11 @@ def has_started(self): timeout, auth_in_progress = self._handler._auth.handle_token() if timeout: raise EventHubError("Authorization timeout.") - elif auth_in_progress: + if auth_in_progress: return False - elif not self._handler._client_ready(): + if not self._handler._client_ready(): return False - else: - return True + return True def close(self, exception=None): """ @@ -174,7 +173,7 @@ def close(self, exception=None): self.running = False if self.error: return - elif isinstance(exception, errors.LinkRedirect): + if isinstance(exception, errors.LinkRedirect): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception diff --git a/azure/eventprocessorhost/eh_partition_pump.py b/azure/eventprocessorhost/eh_partition_pump.py index 4ebd6a9..e0aa25d 100644 --- a/azure/eventprocessorhost/eh_partition_pump.py +++ b/azure/eventprocessorhost/eh_partition_pump.py @@ -36,8 +36,8 @@ async def on_open_async(self): _opened_ok = True except Exception as err: # pylint: disable=broad-except _logger.warning( - "%r,%r PartitionPumpWarning: Failure creating client or receiver, " + - "retrying: %r", self.host.guid, self.partition_context.partition_id, 
err) + "%r,%r PartitionPumpWarning: Failure creating client or receiver, retrying: %r", + self.host.guid, self.partition_context.partition_id, err) last_exception = err _retry_count += 1 diff --git a/azure/eventprocessorhost/partition_context.py b/azure/eventprocessorhost/partition_context.py index 510fdd6..b33099e 100644 --- a/azure/eventprocessorhost/partition_context.py +++ b/azure/eventprocessorhost/partition_context.py @@ -121,7 +121,7 @@ async def persist_checkpoint_async(self, checkpoint): self.lease.offset = checkpoint.offset self.lease.sequence_number = checkpoint.sequence_number else: - _logger.error( + _logger.error( # pylint: disable=logging-not-lazy "Ignoring out of date checkpoint with offset %r/sequence number %r because " + "current persisted checkpoint has higher offset %r/sequence number %r", checkpoint.offset, diff --git a/azure/eventprocessorhost/partition_pump.py b/azure/eventprocessorhost/partition_pump.py index be8be04..cc2dcdc 100644 --- a/azure/eventprocessorhost/partition_pump.py +++ b/azure/eventprocessorhost/partition_pump.py @@ -143,7 +143,7 @@ async def process_events_async(self, events): # CloseAsync are protected by synchronizing too. try: last = events[-1] - if last != None: + if last is not None: self.partition_context.set_offset_and_sequence_number(last) await self.processor.process_events_async(self.partition_context, events) except Exception as err: # pylint: disable=broad-except diff --git a/pylintrc b/pylintrc index 7b3f956..6e495c8 100644 --- a/pylintrc +++ b/pylintrc @@ -6,7 +6,7 @@ reports=no # For all codes, run 'pylint --list-msgs' or go to 'https://pylint.readthedocs.io/en/latest/reference_guide/features.html' # locally-disabled: Warning locally suppressed using disable-msg # cyclic-import: because of https://github.com/PyCQA/pylint/issues/850 -disable=raising-bad-type,missing-docstring,locally-disabled,fixme,cyclic-import,too-many-arguments,invalid-name,duplicate-code,logging-format-interpolation,too-many-instance-attributes,too-few-public-methods +disable=useless-object-inheritance,raising-bad-type,missing-docstring,locally-disabled,fixme,cyclic-import,too-many-arguments,invalid-name,duplicate-code,logging-format-interpolation,too-many-instance-attributes,too-few-public-methods [FORMAT] max-line-length=120
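
Usage sketch (not part of the patch series): the snippet below illustrates the new
`AzureStorageCheckpointLeaseManager` keyword arguments from PATCH 02 and the
`EventData.body_as_str()`/`body_as_json()` helpers from PATCH 05. It is a minimal,
hypothetical example -- the account name, SAS token and connection string are
placeholders, and the storage manager would normally be handed to an
EventProcessorHost rather than used on its own.

# Illustrative only; placeholder credentials.
from azure.eventhub import EventHubClient, Offset
from azure.eventprocessorhost import AzureStorageCheckpointLeaseManager

# PATCH 02: authenticate the lease manager with a SAS token (or a connection
# string) instead of an account key; lease_container_name defaults to "eph-leases".
storage_manager = AzureStorageCheckpointLeaseManager(
    storage_account_name="mystorageaccount",    # hypothetical account
    sas_token="?sv=2018-03-28&sig=...",         # hypothetical; used in place of storage_account_key
    endpoint_suffix="core.windows.net")         # override for National Clouds

# PATCH 05: decode received event bodies without joining the raw byte fragments.
client = EventHubClient.from_connection_string("<eventhub connection string>", debug=False)
receiver = client.add_receiver("$default", "0", offset=Offset('@latest'))
client.run()
try:
    for event_data in receiver.receive(timeout=5):
        print(event_data.body_as_str())    # raises TypeError if the body is not string-compatible
        # event_data.body_as_json() parses JSON payloads (raises TypeError otherwise)
finally:
    client.stop()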