Skip to content

Commit

Permalink
Merge pull request #21 from aremm/handle-forced-closed-conns
Browse files Browse the repository at this point in the history
Fix 2 issues with reopening a channel and publishing confirmation
  • Loading branch information
rklingsberg authored Aug 8, 2019
2 parents d2c38ea + 24d1697 commit 3231a3a
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 7 deletions.
9 changes: 7 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
sudo: false
language: python
dist: trusty
dist: xenial
python:
- 3.5
- 3.6
- 3.7
services:
- rabbitmq
addons:
apt:
packages:
- rabbitmq-server
install:
- pip install -r requires/testing.txt -r docs/requirements.txt codecov
script:
Expand All @@ -20,6 +25,6 @@ deploy:
password:
secure: IJVu1MUk2NtRprWkYL+prPRbWrDdSiP+L06S6xERqYnu+fy1ez8/zODazkQGKagXAAujbJK8OwyCgoMzCGDNHV3/NfFtz9dirGVAD2rXZ6AVfHjtEh31L2b2YzXEK0EnBMRsYRjsqLva6q7tfxzjMWKFria25wsd9bN8VlofNDQ=
on:
python: 3.6
python: 3.7
tags: true
repo: sprockets/sprockets.mixins.amqp
5 changes: 3 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Python Compatibility
--------------------
- python 3.5
- python 3.6
- python 3.7 (currently untested in travis-ci).
- python 3.7

Requirements
------------
Expand Down Expand Up @@ -106,11 +106,12 @@ Source
Running Tests Locally
---------------------

You'll need to have python 3.7 installed, and RabbitMQ installed locally running on port 5672 of localhost.
You'll need to have python 3.7, Docker and Docker Compose installed.

-- $ python3.7 -m venv env
-- $ env/bin/activate
-- (env) $ pip install -r requires/testing.txt
-- (env) $ ./bootstrap.sh
-- (env) $ nosetests

License
Expand Down
23 changes: 23 additions & 0 deletions bootstrap.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env sh
set -e

TEST_HOST="${TEST_HOST:-127.0.0.1}"

get_exposed_port() {
docker-compose port "$@" | cut -d: -f2
}

rm -rf build && mkdir build

docker-compose down --volumes --remove-orphans
docker-compose pull -q
docker-compose up -d

echo "Environment variables (build/test-environment):"
tee build/test-environment << EOF
export AMQP_EXCHANGE=amq.topic
export AMQP_URL=amqp://guest:guest@$TEST_HOST:$(get_exposed_port rabbitmq 5672)/%2f
export RABBIMQ_URL=http://guest:guest@$TEST_HOST:$(get_exposed_port rabbitmq 15672)
EOF

echo 'Bootstrap complete'
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: "2.4"
services:
rabbitmq:
image: rabbitmq:management-alpine
ports:
- 5672:5672
- 15672:15672
10 changes: 9 additions & 1 deletion docs/history.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Version History
===============

`2.2.0`_ Aug 8, 2019
---------------------
- Fix issue opening a channel is not checking if the conn is still open
- Fix issue with publishing confirmation bookkeeping not reset when channel is reopened
- Add bootstrap and docker-compose instead of using local rabbitmq
- Update CI to run bootstrap before tests

`2.1.5`_ July 3, 2019
---------------------
- Remove official support for python versions less than 3.5
Expand Down Expand Up @@ -77,7 +84,8 @@ Version History
----------------------
- Initial implementation

.. _Next Release: https://github.com/sprockets/sprockets.amqp/compare/2.1.5...HEAD
.. _Next Release: https://github.com/sprockets/sprockets.amqp/compare/2.2.0...HEAD
.. _2.2.0: https://github.com/sprockets/sprockets.amqp/compare/2.1.5...2.2.0
.. _2.1.5: https://github.com/sprockets/sprockets.amqp/compare/2.1.4...2.1.5
.. _2.1.4: https://github.com/sprockets/sprockets.amqp/compare/2.1.3...2.1.4
.. _2.1.3: https://github.com/sprockets/sprockets.amqp/compare/2.1.2...2.1.3
Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
[bdist_wheel]
universal = 1

[coverage:report]
show_missing = 1

[nosetests]
cover-branches = 1
cover-erase = 1
Expand Down
10 changes: 8 additions & 2 deletions sprockets/mixins/amqp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
concurrent, ioloop, exceptions, pika = \
object(), object(), object(), object()

__version__ = '2.1.5'
__version__ = '2.2.0'

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -407,6 +407,9 @@ def _open_channel(self):
"""
LOGGER.debug('Creating a new channel')
if not self.connection.is_open:
LOGGER.info('Channel connection is closed, waiting for reconnect')
return
return self.connection.channel(self.on_channel_open)

def _reconnect(self):
Expand Down Expand Up @@ -548,6 +551,8 @@ def on_channel_open(self, channel):
LOGGER.debug('Channel opened')
self.channel = channel
if self.publisher_confirmations:
self.messages.clear()
self.message_number = 0
self.channel.confirm_delivery(self.on_delivery_confirmation)
self.channel.add_on_close_callback(self.on_channel_closed)
self.channel.add_on_flow_callback(self.on_channel_flow)
Expand All @@ -571,9 +576,10 @@ def on_channel_closed(self, channel, reply_code, reply_text):
:param str reply_text: The text reason the channel was closed
"""
LOGGER.debug('Channel is closed')
for future in self.messages.values():
future.set_exception(AMQPException(reply_code, reply_text))
self.messages = {}

if self.closing:
LOGGER.debug('Channel %s was intentionally closed (%s) %s',
channel, reply_code, reply_text)
Expand Down
13 changes: 13 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import os


def setup_module():
try:
with open('build/test-environment') as f:
for line in f:
if line.startswith('export '):
line = line[7:]
name, _, value = line.strip().partition('=')
os.environ[name] = value
except IOError:
pass
21 changes: 21 additions & 0 deletions tests/client_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pika import exceptions, frame, spec

from sprockets.mixins import amqp
from tornado import concurrent

from . import base

Expand Down Expand Up @@ -242,3 +243,23 @@ def test_on_connection_open_error(self):
exceptions.AMQPConnectionError('200', 'Error'))
reconnect.assert_called_once()
self.assertTrue(self.client.closed)

def test_open_channel_when_connection_is_opened(self):
self.client.connection.is_open = True
self.client._open_channel()
self.assertEqual(self.client.connection.channel.call_count, 1)

def test_open_channel_when_connection_is_not_opened(self):
self.client.connection.is_open = False
self.client._open_channel()
self.assertEqual(self.client.connection.channel.call_count, 0)

def test_on_channel_open_reset_confirmation_bookkeeping(self):
self.client.message_number = 2
self.client.messages = {1: concurrent.Future(), 2: concurrent.Future()}
self.client.state = amqp.Client.STATE_CONNECTING
self.assertTrue(self.client.connecting)
self.client.on_channel_open(self.client.channel)
self.assertEqual(self.client.message_number, 0)
self.assertEqual(self.client.messages, {})
self.assertEqual(self.client.channel.confirm_delivery.call_count, 1)

0 comments on commit 3231a3a

Please sign in to comment.