From d25140d874aacb4fabb72df9e253b43add712924 Mon Sep 17 00:00:00 2001 From: Glyph Date: Mon, 7 Dec 2015 18:49:12 -0800 Subject: [PATCH 01/11] fanchat as unit test this isolates the problem away from the _full_ integration stack, because we are not touching Twisted at all here. --- tubes/test/test_chatter.py | 254 +++++++++++++++++++++++++++++++++++++ 1 file changed, 254 insertions(+) create mode 100644 tubes/test/test_chatter.py diff --git a/tubes/test/test_chatter.py b/tubes/test/test_chatter.py new file mode 100644 index 0000000..0a71517 --- /dev/null +++ b/tubes/test/test_chatter.py @@ -0,0 +1,254 @@ +# -*- test-case-name: tubes.test.test_listening -*- +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Integration test for L{tubes.routing} and L{tubes.fan} implementing a chat +server. +""" + +from unittest import TestCase + +from zope.interface.common.mapping import IMapping + +from tubes.routing import Router, Routed, to +from tubes.tube import series, tube, receiver +from tubes.fan import Out, In + +@tube +class Participant(object): + """ + A single participant in a chat system. + """ + outputType = Routed(IMapping) + + def __init__(self, hub, requestsFount, responsesDrain): + """ + Create a L{Participant}. + """ + self._hub = hub + self._in = In() + self._router = Router() + self._participating = {} + + # `self._in' is both commands from our own client and also messages + # from other clients. + requestsFount.flowTo(self._in.newDrain()) + self._in.fount.flowTo(series(self, self._router.drain)) + + self.client = self._router.newRoute() + self.client.flowTo(responsesDrain) + + + def received(self, item): + """ + An item was received. + + @param item: A dictionary featuring a 'type' indicating which command + it is. + + @return: a response routed to the router. + """ + kwargs = item.copy() + return getattr(self, "do_" + kwargs.pop("type"))(**kwargs) + + + def do_name(self, name): + """ + From client; set the name of this client. + + @param name: The nickname for this client. + """ + self.name = name + yield to(self.client, dict(named=name)) + + + def do_join(self, channel): + """ + From client; instruct this client to join a channel with the given + name. + + @param channel: the name of the channel to join. + """ + fountFromChannel, drainToChannel = ( + self._hub.channelNamed(channel).participate(self) + ) + fountFromChannel.flowTo(self._in.newDrain()) + fountToChannel = self._router.newRoute() + fountToChannel.flowTo(drainToChannel) + + self._participating[channel] = fountToChannel + yield to(self._participating[channel], + dict(type="joined")) + + + def do_speak(self, channel, message, id): + """ + From client; say something on the given channel. + + @param channel: the name of the channel + + @param message: the text of the message to relay + + @param id: a unique identifier for this message + """ + yield to(self._participating[channel], + dict(type="spoke", message=message, id=id)) + + + def do_joined(self, sender, channel): + """ + From Channels; say something on the given channel. + + @param sender: the name of the person joining + + @param channel: the name of the channel they joined + """ + yield to(self.client, dict(type="joined")) + + + def do_spoke(self, channel, sender, message, id): + """ + From Channels; a participant (possibly ourselves) spoke on the given + channel. + + @param channel: the name of the channel where the message was received + + @param sender: the name of the sender of the message + + @param message: the text of the message + + @param id: a unique ID for the message. + """ + yield to(self.client, + dict(type="spoke", channel=channel, + sender=sender.name, message=message, + id=id)) + + + +class Channel(object): + """ + A chat room. + """ + def __init__(self, name): + self._name = name + self._out = Out() + self._in = In() + self._in.fount.flowTo(self._out.drain) + + + def participate(self, participant): + """ + Create a new drain of messages going to this channel and a new fount of + messages coming from this channel, for the given participant. + + @param participant: the name of the participant joining. + + @return: a 2-tuple of (new fount, new drain) + """ + @receiver(IMapping, IMapping) + def addSender(item): + yield dict(item, sender=participant, channel=self._name) + + return (self._out.newFount(), + series(addSender, self._in.newDrain())) + + + +@tube +class OnStop(object): + """ + Utility class to hook 'stopped' with a callable. + """ + + def __init__(self, callback): + """ + Create an L{OnStop} with a callback. + """ + self.callback = callback + + + def received(self, item): + """ + We received a message; relay it on unmodified since we only care about + L{OnStop}. + + @param item: anything + """ + yield item + + + def stopped(self, reason): + """ + The flow stopped; invoke the given callback. + + @param reason: ignored. + + @return: no results (empty iterable) + """ + self.callback() + return () + + + +class Hub(object): + """ + A chat hub; the nexus object for a whole channel namespace (i.e.: server). + """ + def __init__(self): + self.participants = [] + self.channels = {} + + + def newParticipantFlow(self, flow): + """ + Create a flow for a new participant. + + @param flow: a L{Flow} with a drain and a fount for receiving commands; + JSON-style dictionaries with a 'type' key indicating which verb to + invoke on L{Participant}. + """ + commandFount = flow.fount.flowTo( + series(OnStop(lambda: self.participants.remove(participant))) + ) + commandDrain = flow.drain + participant = Participant(self, commandFount, commandDrain) + self.participants.append(participant) + + + def channelNamed(self, name): + """ + Retrieve a L{Channel} with the given name. + + @param name: the name of the channel. + + @return: a L{Channel}. + """ + if name not in self.channels: + self.channels[name] = Channel(name) + return self.channels[name] + + + +class ChatTest(TestCase): + """ + Integration test cases for putting together fan.In and fan.Out in a useful + configuration for pubsub or multi-user chat. + """ + + def test_joining(self): + """ + Test that we receive a response from joining. + """ + from ..listening import Flow + from .util import FakeFount, FakeDrain + h = Hub() + ff = FakeFount() + fd = FakeDrain() + h.newParticipantFlow(Flow(ff, fd)) + ff.drain.receive({"type": "name", "name": "bob"}) + self.assertEqual(fd.received.pop(0), {"named": "bob"}) + ff.drain.receive({"type": "join", "channel": "bobs"}) + self.assertEqual(fd.received.pop(0), {"type": "joined", + "channel": "bobs"}) From 1973a8f1e7a2b77ab6324ef725bd166b766748cc Mon Sep 17 00:00:00 2001 From: Glyph Date: Tue, 8 Dec 2015 01:27:06 -0800 Subject: [PATCH 02/11] strings for easier debugging plus missing parameters --- tubes/test/test_chatter.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tubes/test/test_chatter.py b/tubes/test/test_chatter.py index 0a71517..22c7d5c 100644 --- a/tubes/test/test_chatter.py +++ b/tubes/test/test_chatter.py @@ -36,7 +36,7 @@ def __init__(self, hub, requestsFount, responsesDrain): requestsFount.flowTo(self._in.newDrain()) self._in.fount.flowTo(series(self, self._router.drain)) - self.client = self._router.newRoute() + self.client = self._router.newRoute("client") self.client.flowTo(responsesDrain) @@ -74,7 +74,7 @@ def do_join(self, channel): self._hub.channelNamed(channel).participate(self) ) fountFromChannel.flowTo(self._in.newDrain()) - fountToChannel = self._router.newRoute() + fountToChannel = self._router.newRoute("->{}".format(channel)) fountToChannel.flowTo(drainToChannel) self._participating[channel] = fountToChannel @@ -104,7 +104,8 @@ def do_joined(self, sender, channel): @param channel: the name of the channel they joined """ - yield to(self.client, dict(type="joined")) + yield to(self.client, dict(type="joined", + sender=sender, channel=channel)) def do_spoke(self, channel, sender, message, id): @@ -147,9 +148,11 @@ def participate(self, participant): @return: a 2-tuple of (new fount, new drain) """ - @receiver(IMapping, IMapping) + @receiver(IMapping, IMapping, + name="->addSender({}, {})".format(participant.name, + self._name)) def addSender(item): - yield dict(item, sender=participant, channel=self._name) + yield dict(item, sender=participant.name, channel=self._name) return (self._out.newFount(), series(addSender, self._in.newDrain())) @@ -250,5 +253,6 @@ def test_joining(self): ff.drain.receive({"type": "name", "name": "bob"}) self.assertEqual(fd.received.pop(0), {"named": "bob"}) ff.drain.receive({"type": "join", "channel": "bobs"}) - self.assertEqual(fd.received.pop(0), {"type": "joined", - "channel": "bobs"}) + self.assertEqual(fd.received, [{"type": "joined", + "sender": "bob", + "channel": "bobs"}]) From dffe596a882e2077eb681f2c7f0ce0f2e15cb0cd Mon Sep 17 00:00:00 2001 From: Glyph Date: Tue, 8 Dec 2015 01:27:29 -0800 Subject: [PATCH 03/11] another string for easier debugging --- tubes/routing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubes/routing.py b/tubes/routing.py index 725d868..6d0da52 100644 --- a/tubes/routing.py +++ b/tubes/routing.py @@ -198,7 +198,7 @@ def receive(self, item): pass def flowStopped(self, reason): pass - self.newRoute().flowTo(NullDrain()) + self.newRoute("NULL").flowTo(NullDrain()) self.drain = self._out.drain From 8d3779b10d1cd6612a6bc8c991eea8eb88d79e62 Mon Sep 17 00:00:00 2001 From: Glyph Date: Tue, 8 Dec 2015 01:28:02 -0800 Subject: [PATCH 04/11] fix it not *entirely* sure why this does fix it, but it fixes an inconsistency between different states; _pauseBecausePauseCalled should not be None when we are paused regardless of whether we are in fact holding a pause for an upstream fount or not. --- tubes/_siphon.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tubes/_siphon.py b/tubes/_siphon.py index 665c901..881db32 100644 --- a/tubes/_siphon.py +++ b/tubes/_siphon.py @@ -169,8 +169,10 @@ def _actuallyPause(): fount = self._siphon._tdrain.fount self._siphon._pending.suspend() if fount is None: - return - self._siphon._pauseBecausePauseCalled = fount.pauseFlow() + pbpc = fount.pauseFlow() + else: + pbpc = NoPause() + self._siphon._pauseBecausePauseCalled = pbpc def _actuallyResume(): fp = self._siphon._pauseBecausePauseCalled From 836d6021f14ad923bb595dffb0a6ca39f0b884b5 Mon Sep 17 00:00:00 2001 From: Glyph Date: Tue, 8 Dec 2015 01:34:59 -0800 Subject: [PATCH 05/11] the conditional was backwards except now, um... --- tubes/_siphon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubes/_siphon.py b/tubes/_siphon.py index 881db32..fb0097f 100644 --- a/tubes/_siphon.py +++ b/tubes/_siphon.py @@ -168,7 +168,7 @@ def __init__(self, siphon): def _actuallyPause(): fount = self._siphon._tdrain.fount self._siphon._pending.suspend() - if fount is None: + if fount is not None: pbpc = fount.pauseFlow() else: pbpc = NoPause() From e286f708c758bb816f76c43a3e3a3309757c9eca Mon Sep 17 00:00:00 2001 From: Glyph Date: Tue, 8 Dec 2015 01:37:14 -0800 Subject: [PATCH 06/11] this test is wrong there's no drain on the end, of course it should be paused. --- tubes/test/test_tube.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubes/test/test_tube.py b/tubes/test/test_tube.py index 533069a..bd5a9cc 100644 --- a/tubes/test/test_tube.py +++ b/tubes/test/test_tube.py @@ -201,7 +201,7 @@ def started(self): srs = series(PassthruTube(), aStarter, PassthruTube()) nextFount = self.ff.flowTo(srs) - self.assertEqual(self.ff.flowIsPaused, 0) + self.assertEqual(self.ff.flowIsPaused, 1) nextFount.flowTo(self.fd) self.assertEqual(self.ff.flowIsPaused, 0) self.assertEqual(self.fd.received, ["greeting"]) From 306ef818a72feb063405db0172e540845cbe8670 Mon Sep 17 00:00:00 2001 From: Glyph Date: Tue, 8 Dec 2015 01:38:15 -0800 Subject: [PATCH 07/11] it's set consistently now --- tubes/_siphon.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tubes/_siphon.py b/tubes/_siphon.py index fb0097f..58b92f8 100644 --- a/tubes/_siphon.py +++ b/tubes/_siphon.py @@ -181,10 +181,7 @@ def _actuallyResume(): self._siphon._pending.resume() self._siphon._unbufferIterator() - # TODO: validate that the siphon's fount is always set consistently - # with _pauseBecausePauseCalled. - if fp is not None: - fp.unpause() + fp.unpause() self._pauser = Pauser(_actuallyPause, _actuallyResume) From 1b150a4d9e80949c878fe5776cb188c7562041b8 Mon Sep 17 00:00:00 2001 From: Glyph Date: Tue, 8 Dec 2015 03:06:21 -0800 Subject: [PATCH 08/11] eliminate cycles 'joined' and 'spoke' messages from Channels are fully cooked anyway; no need to push them through the Participant for command-processing. --- tubes/test/test_chatter.py | 38 ++++---------------------------------- 1 file changed, 4 insertions(+), 34 deletions(-) diff --git a/tubes/test/test_chatter.py b/tubes/test/test_chatter.py index 22c7d5c..a6c48d8 100644 --- a/tubes/test/test_chatter.py +++ b/tubes/test/test_chatter.py @@ -28,16 +28,17 @@ def __init__(self, hub, requestsFount, responsesDrain): """ self._hub = hub self._in = In() + self._in.fount.flowTo(responsesDrain) + self._router = Router() self._participating = {} # `self._in' is both commands from our own client and also messages # from other clients. - requestsFount.flowTo(self._in.newDrain()) - self._in.fount.flowTo(series(self, self._router.drain)) + requestsFount.flowTo(series(self, self._router.drain)) self.client = self._router.newRoute("client") - self.client.flowTo(responsesDrain) + self.client.flowTo(self._in.newDrain()) def received(self, item): @@ -96,37 +97,6 @@ def do_speak(self, channel, message, id): dict(type="spoke", message=message, id=id)) - def do_joined(self, sender, channel): - """ - From Channels; say something on the given channel. - - @param sender: the name of the person joining - - @param channel: the name of the channel they joined - """ - yield to(self.client, dict(type="joined", - sender=sender, channel=channel)) - - - def do_spoke(self, channel, sender, message, id): - """ - From Channels; a participant (possibly ourselves) spoke on the given - channel. - - @param channel: the name of the channel where the message was received - - @param sender: the name of the sender of the message - - @param message: the text of the message - - @param id: a unique ID for the message. - """ - yield to(self.client, - dict(type="spoke", channel=channel, - sender=sender.name, message=message, - id=id)) - - class Channel(object): """ From 77deb4b3d38ee6ae48c3a5a778fe6fd9da059187 Mon Sep 17 00:00:00 2001 From: Glyph Date: Tue, 8 Dec 2015 03:11:50 -0800 Subject: [PATCH 09/11] single letter for lint --- tubes/test/test_chatter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tubes/test/test_chatter.py b/tubes/test/test_chatter.py index a6c48d8..9be75bc 100644 --- a/tubes/test/test_chatter.py +++ b/tubes/test/test_chatter.py @@ -204,7 +204,7 @@ def channelNamed(self, name): -class ChatTest(TestCase): +class ChatTests(TestCase): """ Integration test cases for putting together fan.In and fan.Out in a useful configuration for pubsub or multi-user chat. From 15fa06407b25e2c47424a45214177253b0440508 Mon Sep 17 00:00:00 2001 From: Glyph Date: Sun, 17 Jan 2016 19:31:48 -0800 Subject: [PATCH 10/11] forget about py26, simplify coverage reporting --- .travis.yml | 2 +- tox.ini | 12 ++---------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6944375..b347a14 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,7 +26,7 @@ script: after_success: - - if [[ "${TOX_ENV:0:2}" == 'py' ]]; then coveralls; fi + - if [[ "${TOX_ENV:0:2}" == 'py' ]]; then coverage combine; coveralls; fi notifications: irc: diff --git a/tox.ini b/tox.ini index e689da0..6923316 100644 --- a/tox.ini +++ b/tox.ini @@ -1,19 +1,11 @@ [tox] -envlist = py26, py27, pypy, docs, lint, apidocs +envlist = py27, pypy, docs, lint, apidocs [testenv] deps = coverage commands = - coverage erase - coverage run --source=./tubes {envbindir}/trial --rterrors {posargs:tubes} - coverage report -m - -[testenv:py26] -commands = - coverage erase - coverage run --source=./tubes {envbindir}/trial --rterrors {posargs:tubes} - coverage report -m + coverage run -p --source=./tubes {envbindir}/trial --rterrors {posargs:tubes} [testenv:docs] deps = From 4e28c674d8d27aa5910f9a73ffffa5e8839e7766 Mon Sep 17 00:00:00 2001 From: Glyph Date: Sun, 17 Jan 2016 19:47:01 -0800 Subject: [PATCH 11/11] improve coverage; test providedBy --- tubes/test/test_routing.py | 29 ++++++++++++++++++++++++++++- tubes/test/util.py | 8 ++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/tubes/test/test_routing.py b/tubes/test/test_routing.py index f0b4225..258be60 100644 --- a/tubes/test/test_routing.py +++ b/tubes/test/test_routing.py @@ -10,7 +10,7 @@ from ..routing import Router, to, Routed from ..tube import series, receiver -from .util import FakeFount, FakeDrain, IFakeOutput, IFakeInput +from .util import FakeFount, FakeDrain, IFakeOutput, IFakeInput, FakeInput if 0: # Names used by PyDoctor. @@ -115,3 +115,30 @@ def test_ne(self): self.assertEqual(True, Routed(IFakeInput) != Routed(IFakeOutput)) self.assertEqual(True, Routed() != 7) + + def test_providedBy(self): + """ + L{Routed.providedBy} ensures that the given object is a L{to} and that + its payload provides the proper specification. + """ + router = Router() + route = router.newRoute() + self.assertEqual(False, + Routed(IFakeInput).providedBy(FakeInput())) + self.assertEqual(False, + Routed(IFakeInput).providedBy(to(route, object()))) + self.assertEqual(True, + Routed(IFakeInput).providedBy(to(route, FakeInput()))) + + + def test_providedByNone(self): + """ + L{Routed.providedBy} ensures that the given object is L{to} but makes + no assertions about its payload if given L{Routed} is given no + sub-specification. + """ + router = Router() + route = router.newRoute() + self.assertEqual(False, Routed().providedBy(object())) + self.assertEqual(True, Routed().providedBy(to(route, object()))) + diff --git a/tubes/test/util.py b/tubes/test/util.py index 0ebf41b..054a928 100644 --- a/tubes/test/util.py +++ b/tubes/test/util.py @@ -65,6 +65,14 @@ class IFakeInput(Interface): +@implementer(IFakeInput) +class FakeInput(object): + """ + An implementation of a sample interface. + """ + + + @implementer(IDrain) class FakeDrain(object): """