Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #65 from annatisch/eh_scenarios
Browse files Browse the repository at this point in the history
Issue fixes
  • Loading branch information
annatisch authored Sep 24, 2018
2 parents c5be0bb + da94d06 commit 7d89780
Show file tree
Hide file tree
Showing 24 changed files with 261 additions and 89 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pip-delete-this-directory.txt
azure/storage/
azure/common/
azure/profiles/
*.log.1
*.log.2
*.log.3

htmlcov/
.tox/
Expand Down
13 changes: 10 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
language: python
cache: pip
python:
- "3.6"
# command to install dependencies
dist: xenial
sudo: required
matrix:
include:
- os: linux
python: "3.5"
- os: linux
python: "3.6"
- os: linux
python: "3.7"
install:
- pip install -r dev_requirements.txt
- pip install -e .
Expand Down
15 changes: 15 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@
Release History
===============

1.1.0 (2018-09-21)
++++++++++++++++++

- Changes to `AzureStorageCheckpointLeaseManager` parameters to support other connection options (issue #61):

- The `storage_account_name`, `storage_account_key` and `lease_container_name` arguments are now optional keyword arguments.
- Added a `sas_token` argument that must be specified with `storage_account_name` in place of `storage_account_key`.
- Added an `endpoint_suffix` argument to support storage endpoints in National Clouds.
- Added a `connection_string` argument that, if specified, overrides all other endpoint arguments.
- The `lease_container_name` argument now defaults to `"eph-leases"` if not specified.

- Fix for clients failing to start if run called multipled times (issue #64).
- Added convenience methods `body_as_str` and `body_as_json` to EventData object for easier processing of message data.


1.0.0 (2018-08-22)
++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.0.0"
__version__ = "1.1.0"

from azure.eventhub.common import EventData, EventHubError, Offset
from azure.eventhub.client import EventHubClient
Expand Down
3 changes: 2 additions & 1 deletion azure/eventhub/async_ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ async def _wait_for_client(self, client):

async def _start_client_async(self, client):
try:
await client.open_async()
if not client.running:
await client.open_async()
except Exception as exp: # pylint: disable=broad-except
log.info("Encountered error while starting handler: %r", exp)
await client.close_async(exception=exp)
Expand Down
32 changes: 17 additions & 15 deletions azure/eventhub/async_ops/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__( # pylint: disable=super-init-not-called
:param loop: An event loop.
"""
self.loop = loop or asyncio.get_event_loop()
self.running = False
self.client = client
self.source = source
self.offset = offset
Expand Down Expand Up @@ -81,6 +82,7 @@ async def open_async(self):
:type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync
"""
# pylint: disable=protected-access
self.running = True
if self.redirected:
self.source = self.redirected.address
source = Source(self.source)
Expand Down Expand Up @@ -171,12 +173,11 @@ async def has_started(self):
timeout, auth_in_progress = await self._handler._auth.handle_token_async()
if timeout:
raise EventHubError("Authorization timeout.")
elif auth_in_progress:
if auth_in_progress:
return False
elif not await self._handler._client_ready_async():
if not await self._handler._client_ready_async():
return False
else:
return True
return True

async def close_async(self, exception=None):
"""
Expand All @@ -188,9 +189,10 @@ async def close_async(self, exception=None):
due to an error.
:type exception: Exception
"""
self.running = False
if self.error:
return
elif isinstance(exception, errors.LinkRedirect):
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
Expand All @@ -216,6 +218,8 @@ async def receive(self, max_batch_size=None, timeout=None):
"""
if self.error:
raise self.error
if not self.running:
raise ValueError("Unable to receive until client has been started.")
data_batch = []
try:
timeout_ms = 1000 * timeout if timeout else 0
Expand All @@ -232,21 +236,19 @@ async def receive(self, max_batch_size=None, timeout=None):
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
return data_batch
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
return data_batch
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receive failed: {}".format(e))
Expand Down
16 changes: 11 additions & 5 deletions azure/eventhub/async_ops/sender_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__( # pylint: disable=super-init-not-called
:param loop: An event loop. If not specified the default event loop will be used.
"""
self.loop = loop or asyncio.get_event_loop()
self.running = False
self.client = client
self.target = target
self.partition = partition
Expand Down Expand Up @@ -82,6 +83,7 @@ async def open_async(self):
:param connection: The underlying client shared connection.
:type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync
"""
self.running = True
if self.redirected:
self.target = self.redirected.address
self._handler = SendClientAsync(
Expand Down Expand Up @@ -156,12 +158,11 @@ async def has_started(self):
timeout, auth_in_progress = await self._handler._auth.handle_token_async()
if timeout:
raise EventHubError("Authorization timeout.")
elif auth_in_progress:
if auth_in_progress:
return False
elif not await self._handler._client_ready_async():
if not await self._handler._client_ready_async():
return False
else:
return True
return True

async def close_async(self, exception=None):
"""
Expand All @@ -173,9 +174,10 @@ async def close_async(self, exception=None):
due to an error.
:type exception: Exception
"""
self.running = False
if self.error:
return
elif isinstance(exception, errors.LinkRedirect):
if isinstance(exception, errors.LinkRedirect):
self.redirected = exception
elif isinstance(exception, EventHubError):
self.error = exception
Expand All @@ -199,6 +201,8 @@ async def send(self, event_data):
"""
if self.error:
raise self.error
if not self.running:
raise ValueError("Unable to send until client has been started.")
if event_data.partition_key and self.partition:
raise ValueError("EventData partition key cannot be used with a partition sender.")
event_data.message.on_send_complete = self._on_outcome
Expand Down Expand Up @@ -238,6 +242,8 @@ async def wait_async(self):
"""
if self.error:
raise self.error
if not self.running:
raise ValueError("Unable to send until client has been started.")
try:
await self._handler.wait_async()
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
Expand Down
5 changes: 2 additions & 3 deletions azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ def _close_clients(self):
def _start_clients(self):
for client in self.clients:
try:
client.open()
if not client.running:
client.open()
except Exception as exp: # pylint: disable=broad-except
client.close(exception=exp)

Expand Down Expand Up @@ -329,8 +330,6 @@ def get_eventhub_info(self):
output['partition_count'] = eh_info[b'partition_count']
output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']]
return output
except:
raise
finally:
mgmt_client.close()

Expand Down
52 changes: 45 additions & 7 deletions azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import datetime
import time
import json

from uamqp import Message, BatchMessage
from uamqp import types, constants, errors
Expand All @@ -31,13 +32,13 @@ def _error_handler(error):
"""
if error.condition == b'com.microsoft:server-busy':
return errors.ErrorAction(retry=True, backoff=4)
elif error.condition == b'com.microsoft:timeout':
if error.condition == b'com.microsoft:timeout':
return errors.ErrorAction(retry=True, backoff=2)
elif error.condition == b'com.microsoft:operation-cancelled':
if error.condition == b'com.microsoft:operation-cancelled':
return errors.ErrorAction(retry=True)
elif error.condition == b"com.microsoft:container-close":
if error.condition == b"com.microsoft:container-close":
return errors.ErrorAction(retry=True, backoff=4)
elif error.condition in _NO_RETRY_ERRORS:
if error.condition in _NO_RETRY_ERRORS:
return errors.ErrorAction(retry=False)
return errors.ErrorAction(retry=True)

Expand Down Expand Up @@ -88,7 +89,6 @@ def __init__(self, body=None, batch=None, to_device=None, message=None):
else:
self.message = Message(body, properties=self.msg_properties)


@property
def sequence_number(self):
"""
Expand Down Expand Up @@ -188,7 +188,45 @@ def body(self):
:rtype: bytes or Generator[bytes]
"""
return self.message.get_data()
try:
return self.message.get_data()
except TypeError:
raise ValueError("Message data empty.")

def body_as_str(self, encoding='UTF-8'):
"""
The body of the event data as a string if the data is of a
compatible type.
:param encoding: The encoding to use for decoding message data.
Default is 'UTF-8'
:rtype: str
"""
data = self.body
try:
return "".join(b.decode(encoding) for b in data)
except TypeError:
return str(data)
except: # pylint: disable=bare-except
pass
try:
return data.decode(encoding)
except Exception as e:
raise TypeError("Message data is not compatible with string type: {}".format(e))

def body_as_json(self, encoding='UTF-8'):
"""
The body of the event loaded as a JSON object is the data is compatible.
:param encoding: The encoding to use for decoding message data.
Default is 'UTF-8'
:rtype: dict
"""
data_str = self.body_as_str(encoding=encoding)
try:
return json.loads(data_str)
except Exception as e:
raise TypeError("Event data is not compatible with JSON type: {}".format(e))


class Offset(object):
Expand Down Expand Up @@ -231,7 +269,7 @@ def selector(self):
if isinstance(self.value, datetime.datetime):
timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000)
return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8')
elif isinstance(self.value, int):
if isinstance(self.value, int):
return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8')
return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8')

Expand Down
Loading

0 comments on commit 7d89780

Please sign in to comment.