Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit 1433553

Browse files
authored
Merge pull request #55 from annatisch/eh_scenarios
New send settings + auth timeout
2 parents 97137ac + c8db793 commit 1433553

16 files changed

+354
-147
lines changed

HISTORY.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,18 @@
33
Release History
44
===============
55

6+
1.0.0 (2018-08-22)
7+
++++++++++++++++++
8+
9+
- API stable.
10+
- Renamed internal `_async` module to `async_ops` for docs generation.
11+
- Added optional `auth_timeout` parameter to `EventHubClient` and `EventHubClientAsync` to configure how long to allow for token
12+
negotiation to complete. Default is 60 seconds.
13+
- Added optional `send_timeout` parameter to `EventHubClient.add_sender` and `EventHubClientAsync.add_async_sender` to determine the
14+
timeout for Events to be successfully sent. Default value is 60 seconds.
15+
- Reformatted logging for performance.
16+
17+
618
0.2.0 (2018-08-06)
719
++++++++++++++++++
820

README.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ Python 2.7 support
2626
The uAMQP library currently only supports Python 3.4 and above. Python 2.7 support is planned for a future release.
2727

2828

29+
Documentation
30+
+++++++++++++
31+
Reference documentation is available at `docs.microsoft.com/python/api/azure-eventhub <https://docs.microsoft.com/python/api/azure-eventhub>`__.
32+
33+
2934
Examples
3035
+++++++++
3136

azure/eventhub/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
# Licensed under the MIT License. See License.txt in the project root for license information.
44
# --------------------------------------------------------------------------------------------
55

6-
__version__ = "0.2.0"
6+
__version__ = "1.0.0"
77

88
from azure.eventhub.common import EventData, EventHubError, Offset
99
from azure.eventhub.client import EventHubClient
1010
from azure.eventhub.sender import Sender
1111
from azure.eventhub.receiver import Receiver
1212

1313
try:
14-
from azure.eventhub._async import (
14+
from azure.eventhub.async_ops import (
1515
EventHubClientAsync,
1616
AsyncSender,
1717
AsyncReceiver)

azure/eventhub/_async/__init__.py renamed to azure/eventhub/async_ops/__init__.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def _create_auth(self, username=None, password=None): # pylint: disable=no-self
5858
return authentication.SASLPlain(
5959
self.address.hostname, username, password, http_proxy=self.http_proxy)
6060
return authentication.SASTokenAsync.from_shared_access_key(
61-
self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy)
61+
self.auth_uri, username, password, timeout=self.auth_timeout, http_proxy=self.http_proxy)
6262

6363
async def _close_clients_async(self):
6464
"""
@@ -77,8 +77,9 @@ async def _start_client_async(self, client):
7777
try:
7878
await client.open_async()
7979
except Exception as exp: # pylint: disable=broad-except
80-
log.info("Encountered error while starting handler: {}".format(exp))
80+
log.info("Encountered error while starting handler: %r", exp)
8181
await client.close_async(exception=exp)
82+
log.info("Finished closing failed handler")
8283

8384
async def _handle_redirect(self, redirects):
8485
if len(redirects) != len(self.clients):
@@ -104,17 +105,17 @@ async def run_async(self):
104105
105106
:rtype: list[~azure.eventhub.common.EventHubError]
106107
"""
107-
log.info("{}: Starting {} clients".format(self.container_id, len(self.clients)))
108+
log.info("%r: Starting %r clients", self.container_id, len(self.clients))
108109
tasks = [self._start_client_async(c) for c in self.clients]
109110
try:
110111
await asyncio.gather(*tasks)
111112
redirects = [c.redirected for c in self.clients if c.redirected]
112113
failed = [c.error for c in self.clients if c.error]
113114
if failed and len(failed) == len(self.clients):
114-
log.warning("{}: All clients failed to start.".format(self.container_id))
115+
log.warning("%r: All clients failed to start.", self.container_id)
115116
raise failed[0]
116117
elif failed:
117-
log.warning("{}: {} clients failed to start.".format(self.container_id, len(failed)))
118+
log.warning("%r: %r clients failed to start.", self.container_id, len(failed))
118119
elif redirects:
119120
await self._handle_redirect(redirects)
120121
except EventHubError:
@@ -129,7 +130,7 @@ async def stop_async(self):
129130
"""
130131
Stop the EventHubClient and all its Sender/Receiver clients.
131132
"""
132-
log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients)))
133+
log.info("%r: Stopping %r clients", self.container_id, len(self.clients))
133134
self.stopped = True
134135
await self._close_clients_async()
135136

@@ -182,7 +183,7 @@ def add_async_receiver(
182183
:operation: An optional operation to be appended to the hostname in the source URL.
183184
The value must start with `/` character.
184185
:type operation: str
185-
:rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync
186+
:rtype: ~azure.eventhub.async_ops.receiver_async.ReceiverAsync
186187
"""
187188
path = self.address.path + operation if operation else self.address.path
188189
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
@@ -213,7 +214,7 @@ def add_async_epoch_receiver(
213214
:operation: An optional operation to be appended to the hostname in the source URL.
214215
The value must start with `/` character.
215216
:type operation: str
216-
:rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync
217+
:rtype: ~azure.eventhub.async_ops.receiver_async.ReceiverAsync
217218
"""
218219
path = self.address.path + operation if operation else self.address.path
219220
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
@@ -224,7 +225,9 @@ def add_async_epoch_receiver(
224225
self.clients.append(handler)
225226
return handler
226227

227-
def add_async_sender(self, partition=None, operation=None, keep_alive=30, auto_reconnect=True, loop=None):
228+
def add_async_sender(
229+
self, partition=None, operation=None, send_timeout=60,
230+
keep_alive=30, auto_reconnect=True, loop=None):
228231
"""
229232
Add an async sender to the client to send ~azure.eventhub.common.EventData object
230233
to an EventHub.
@@ -236,13 +239,23 @@ def add_async_sender(self, partition=None, operation=None, keep_alive=30, auto_r
236239
:operation: An optional operation to be appended to the hostname in the target URL.
237240
The value must start with `/` character.
238241
:type operation: str
239-
:rtype: ~azure.eventhub._async.sender_async.SenderAsync
242+
:param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is
243+
queued. Default value is 60 seconds. If set to 0, there will be no timeout.
244+
:type send_timeout: int
245+
:param keep_alive: The time interval in seconds between pinging the connection to keep it alive during
246+
periods of inactivity. The default value is 30 seconds. If set to `None`, the connection will not
247+
be pinged.
248+
:type keep_alive: int
249+
:param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs.
250+
Default value is `True`.
251+
:type auto_reconnect: bool
252+
:rtype: ~azure.eventhub.async_ops.sender_async.SenderAsync
240253
"""
241254
target = "amqps://{}{}".format(self.address.hostname, self.address.path)
242255
if operation:
243256
target = target + operation
244257
handler = AsyncSender(
245-
self, target, partition=partition, keep_alive=keep_alive,
258+
self, target, partition=partition, send_timeout=send_timeout, keep_alive=keep_alive,
246259
auto_reconnect=auto_reconnect, loop=loop)
247260
self.clients.append(handler)
248261
return handler

azure/eventhub/_async/receiver_async.py renamed to azure/eventhub/async_ops/receiver_async.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def __init__( # pylint: disable=super-init-not-called
2929
Instantiate an async receiver.
3030
3131
:param client: The parent EventHubClientAsync.
32-
:type client: ~azure.eventhub._async.EventHubClientAsync
32+
:type client: ~azure.eventhub.async_ops.EventHubClientAsync
3333
:param source: The source EventHub from which to receive events.
3434
:type source: ~uamqp.address.Source
3535
:param prefetch: The number of events to prefetch from the service
@@ -78,7 +78,7 @@ async def open_async(self):
7878
context will be used to create a new handler before opening it.
7979
8080
:param connection: The underlying client shared connection.
81-
:type: connection: ~uamqp._async.connection_async.ConnectionAsync
81+
:type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync
8282
"""
8383
# pylint: disable=protected-access
8484
if self.redirected:
@@ -128,9 +128,33 @@ async def reconnect_async(self):
128128
client_name=self.name,
129129
properties=self.client.create_properties(),
130130
loop=self.loop)
131-
await self._handler.open_async()
132-
while not await self.has_started():
133-
await self._handler._connection.work_async()
131+
try:
132+
await self._handler.open_async()
133+
while not await self.has_started():
134+
await self._handler._connection.work_async()
135+
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
136+
if shutdown.action.retry and self.auto_reconnect:
137+
log.info("AsyncReceiver detached. Attempting reconnect.")
138+
await self.reconnect_async()
139+
else:
140+
log.info("AsyncReceiver detached. Shutting down.")
141+
error = EventHubError(str(shutdown), shutdown)
142+
await self.close_async(exception=error)
143+
raise error
144+
except errors.MessageHandlerError as shutdown:
145+
if self.auto_reconnect:
146+
log.info("AsyncReceiver detached. Attempting reconnect.")
147+
await self.reconnect_async()
148+
else:
149+
log.info("AsyncReceiver detached. Shutting down.")
150+
error = EventHubError(str(shutdown), shutdown)
151+
await self.close_async(exception=error)
152+
raise error
153+
except Exception as e:
154+
log.info("Unexpected error occurred (%r). Shutting down.", e)
155+
error = EventHubError("Receiver reconnect failed: {}".format(e))
156+
await self.close_async(exception=error)
157+
raise error
134158

135159
async def has_started(self):
136160
"""
@@ -224,7 +248,7 @@ async def receive(self, max_batch_size=None, timeout=None):
224248
await self.close_async(exception=error)
225249
raise error
226250
except Exception as e:
227-
log.info("Unexpected error occurred ({}). Shutting down.".format(e))
251+
log.info("Unexpected error occurred (%r). Shutting down.", e)
228252
error = EventHubError("Receive failed: {}".format(e))
229253
await self.close_async(exception=error)
230254
raise error

0 commit comments

Comments
 (0)