Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Multicast poc #32

Open
wants to merge 10 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration/int_3Ri_2BhaRi2_3Re_2BhaRe3/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class Receiver(MessagingHandler, threading.Thread):
Receiver implementation of a Proton client that run as a thread.
"""
def __init__(self, url, message_count, timeout=0, container_id=None, durable=False, save_messages=False,
ignore_dups=False):
super(Receiver, self).__init__()
ignore_dups=False, auto_accept=True, auto_settle=True):
super(Receiver, self).__init__(auto_accept=auto_accept, auto_settle=auto_settle)
threading.Thread.__init__(self)
self.url = url
self.receiver = None
Expand Down
33 changes: 22 additions & 11 deletions integration/int_3Ri_2BhaRi2_3Re_2BhaRe3/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import math

from iqa_common.utils.timeout import TimeoutCallback
from proton import Message
from proton import Message, Delivery
from proton._handlers import MessagingHandler
from proton._reactor import Container, AtLeastOnce

Expand All @@ -22,18 +22,21 @@ class Sender(MessagingHandler, threading.Thread):
lock = threading.Lock()

def __init__(self, url, message_count, sender_id, message_size=1024, timeout=0,
user_id=None, proton_option=AtLeastOnce(), use_unique_body=False):
super(Sender, self).__init__()
user_id=None, proton_option=AtLeastOnce(), use_unique_body=False,
auto_accept=True, auto_settle=True):
super(Sender, self).__init__(auto_accept=auto_accept, auto_settle=auto_settle)
threading.Thread.__init__(self)
self.url = url
self.total = message_count
self.sender_id = sender_id
self.sender = None
self.connection = None
self.sent = 0
self.confirmed = 0
self.accepted = 0
self.released = 0
self.rejected = 0
self.modified = 0
self.settled = 0
self.container = None
self.message_size = message_size

Expand Down Expand Up @@ -91,6 +94,7 @@ def is_done_sending(self):
Returns True if all expected messages have been sent or if sender has timed out.
:return:
"""
#????
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a doubt or do you have concerns with the logic?

return self.stopped or (self.total > 0 and (self.sent - self.released - self.rejected == self.total))

def _generate_message_id_and_body(self) -> list:
Expand Down Expand Up @@ -134,21 +138,28 @@ def on_sendable(self, event):

def on_accepted(self, event):
"""
Increases the confirmed count (if delivery not yet in tracker list).
Increases the accepted count (if delivery not yet in tracker list).
:param event:
:return:
"""
if event.delivery not in self.tracker:
logging.debug('Ignoring confirmation for other deliveries - %s' % event.delivery.tag)
self.confirmed += 1
self.accepted += 1
self.verify_sender_done(event)

def on_modified(self, event):
# XXX verify if this has sense, it seems to be never called.
self.modified += 1

def on_settled(self, event):
self.settled += 1

def on_released(self, event):
"""
Increases the released count
:param event:
:return:
"""
# from qpid_dispatch system tests:
# for some reason Proton 'helpfully' calls on_released even though the
# delivery state is actually MODIFIED
if event.delivery.remote_state == Delivery.MODIFIED:
return self.on_modified(event)
self.released += 1
logging.debug('Message released - %s' % event.delivery.tag)

Expand Down
199 changes: 199 additions & 0 deletions integration/int_3Ri_2BhaRi2_3Re_2BhaRe3/test_multicast_basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import math
import hashlib
import logging
import ast
import time
import os
import random

import pytest
from messaging_abstract.message import Message
from messaging_components.brokers import Artemis
from pytest_iqa.instance import IQAInstance

from integration.int_3Ri_2BhaRi2_3Re_2BhaRe3.receiver import Receiver
from integration.int_3Ri_2BhaRi2_3Re_2BhaRe3.sender import Sender

class Outcome:
accept = "accept"
release = "release"
modify = "modify"
reject = "reject"

class Expected:
accepted = "accepted"
released = "released"
modified = "modified"
rejected = "rejected"

def idfn(outcome):
return outcome["test_id"]

outcomes_config_list = [
{
"recv_outcomes": 4*[Outcome.accept],
"expected": Expected.accepted,
"test_id": "Expect accepted if all accept.",
},
{
## expect REJECTED if any reject:
"recv_outcomes": [Outcome.reject, Outcome.reject, Outcome.modify, Outcome.release, Outcome.reject],
"expected": Expected.rejected,
"test_id": "Expect rejected if any reject.",
},
{
"recv_outcomes": [Outcome.reject, Outcome.reject, Outcome.release, Outcome.reject],
"expected": Expected.rejected,
"test_id": "Expect rejected if any reject.",
},
{
"recv_outcomes": [Outcome.modify, Outcome.accept, Outcome.accept, Outcome.release],
"expected": Expected.accepted,
"test_id": "Expect accept if no rejects",
},
{
"recv_outcomes": 3*[Outcome.modify] + [Outcome.modify],
"expected": Expected.modified,
"test_id": "Expect modified over released",
},
{
"recv_outcomes": 4*[Outcome.modify],
"expected": Expected.modified,
"test_id": "Expected modify if all modify",
},
{
"recv_outcomes": 5*[Outcome.release],
"expected": Expected.released,
"test_id": "Release only if all released",
},
]

@pytest.fixture(params=outcomes_config_list, ids=idfn)
def outcomes(request):
return request.param

class _Receiver(Receiver):
def __init__(self, *args, settle=Outcome.accept, **kwargs):
super(_Receiver, self).__init__(*args, auto_accept=False, **kwargs)
self._settle = getattr(self, settle)

def modify(self, delivery):
super(_Receiver, self).release(delivery, delivered=True)

def release(self, delivery):
super(_Receiver, self).release(delivery, delivered=False)

def on_message(self, event):
self.last_received_id[event.message.user_id] = event.message.id
self.received += 1
self.messages.append(event.message.body)

logging.debug("settle = %s" % self._settle.__name__)
self._settle(event.delivery)

if self.is_done_receiving():
self.stop_receiver(event.receiver, event.connection)

class _Sender(Sender):
def is_done_sending(self):
done = (self.stopped or (self.total > 0 and self.sent == self.total))
logging.info("===== is done sending? %s", done)
return done

class TestMulticast:
MESSAGES_COUNT = 5
MESSAGE_SIZE = 128

TIMEOUT = 6 #why?
address = "multicast/bla"

@staticmethod
def _get_router_url(router, topic):
return "amqp://%s:%s/%s" % (router.node.get_ip(), router.port, topic)

def _sender(self, router, topic):
s = _Sender(url=self._get_router_url(router, topic),
message_count=self.MESSAGES_COUNT,
sender_id='sender-%s' % router.node.hostname,
timeout=self.TIMEOUT,
message_size=self.MESSAGE_SIZE,
use_unique_body=True,
auto_settle=True,
)

s.start()
return s

def _receiver(self, router, topic, settle=Outcome.modify):
r = _Receiver(url=self._get_router_url(router, topic),
message_count=self.MESSAGES_COUNT,
settle=settle,
timeout=self.TIMEOUT,
)
r.start()
return r

def launch_receivers(self, outcomes, iqa):
def _wait(receivers):
for r in receivers:
while not r.receiver:
time.sleep(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if a time.sleep here might cause issues to proton.
Maybe we can use some of the proton events to consider the receivers in the list as ready.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will change this as you suggest


_ROUTER_I3_INDEX = 2 #not using delayed router
all_routers = iqa.get_routers()
all_routers.pop(_ROUTER_I3_INDEX)

routers = random.sample(all_routers, len(outcomes))
receivers = []
for idx, router in enumerate(routers):
receivers.append(self._receiver(router, self.address,
settle=outcomes[idx]))
_wait(receivers)
return receivers

def test_base_multicast(self, iqa: IQAInstance, router, outcomes):

def _wait_for_all_process_to_terminate(threads):
for t in threads:
t.join()

def _assert_sender_expected_settlement(sender, expected):
assert sender.settled == self.MESSAGES_COUNT
for e in [Expected.accepted, Expected.released,
Expected.rejected, Expected.modified]:

outcome_count = getattr(sender, e)
if e == expected:
assert outcome_count == self.MESSAGES_COUNT
else:
assert outcome_count == 0

#router_send = router_e1
router_send = router
if router_send.name == "router-Dispatch-Router.I3":
logging.info("skipping bad bad delayed router I3")
return


receivers = self.launch_receivers(outcomes["recv_outcomes"], iqa)
sender = self._sender(router_send, self.address)

_wait_for_all_process_to_terminate(receivers + [sender])

logging.info("sender_id: {}".format(sender.sender_id))

logging.info("""sent: accepted: {}
rejected: {}
released: {}
modified: {}
settled {}
""".format(
sender.accepted,
sender.rejected,
sender.released,
sender.modified,
sender.settled))

assert sender.sent == self.MESSAGES_COUNT

_assert_sender_expected_settlement(sender, outcomes["expected"])