Skip to content

Commit

Permalink
Merge pull request #46 from twisted/fanchat-as-test
Browse files Browse the repository at this point in the history
fanchat as unit test - fix some flow control bugs
  • Loading branch information
glyph committed Jan 18, 2016
2 parents 04c639a + 4e28c67 commit 8522684
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ script:


after_success:
- if [[ "${TOX_ENV:0:2}" == 'py' ]]; then coveralls; fi
- if [[ "${TOX_ENV:0:2}" == 'py' ]]; then coverage combine; coveralls; fi

notifications:
irc:
Expand Down
12 changes: 2 additions & 10 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
[tox]
envlist = py26, py27, pypy, docs, lint, apidocs
envlist = py27, pypy, docs, lint, apidocs

[testenv]
deps =
coverage
commands =
coverage erase
coverage run --source=./tubes {envbindir}/trial --rterrors {posargs:tubes}
coverage report -m

[testenv:py26]
commands =
coverage erase
coverage run --source=./tubes {envbindir}/trial --rterrors {posargs:tubes}
coverage report -m
coverage run -p --source=./tubes {envbindir}/trial --rterrors {posargs:tubes}

[testenv:docs]
deps =
Expand Down
13 changes: 6 additions & 7 deletions tubes/_siphon.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,11 @@ def __init__(self, siphon):
def _actuallyPause():
fount = self._siphon._tdrain.fount
self._siphon._pending.suspend()
if fount is None:
return
self._siphon._pauseBecausePauseCalled = fount.pauseFlow()
if fount is not None:
pbpc = fount.pauseFlow()
else:
pbpc = NoPause()
self._siphon._pauseBecausePauseCalled = pbpc

def _actuallyResume():
fp = self._siphon._pauseBecausePauseCalled
Expand All @@ -179,10 +181,7 @@ def _actuallyResume():
self._siphon._pending.resume()
self._siphon._unbufferIterator()

# TODO: validate that the siphon's fount is always set consistently
# with _pauseBecausePauseCalled.
if fp is not None:
fp.unpause()
fp.unpause()

self._pauser = Pauser(_actuallyPause, _actuallyResume)

Expand Down
2 changes: 1 addition & 1 deletion tubes/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def receive(self, item):
pass
def flowStopped(self, reason):
pass
self.newRoute().flowTo(NullDrain())
self.newRoute("NULL").flowTo(NullDrain())
self.drain = self._out.drain


Expand Down
228 changes: 228 additions & 0 deletions tubes/test/test_chatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
# -*- test-case-name: tubes.test.test_listening -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Integration test for L{tubes.routing} and L{tubes.fan} implementing a chat
server.
"""

from unittest import TestCase

from zope.interface.common.mapping import IMapping

from tubes.routing import Router, Routed, to
from tubes.tube import series, tube, receiver
from tubes.fan import Out, In

@tube
class Participant(object):
"""
A single participant in a chat system.
"""
outputType = Routed(IMapping)

def __init__(self, hub, requestsFount, responsesDrain):
"""
Create a L{Participant}.
"""
self._hub = hub
self._in = In()
self._in.fount.flowTo(responsesDrain)

self._router = Router()
self._participating = {}

# `self._in' is both commands from our own client and also messages
# from other clients.
requestsFount.flowTo(series(self, self._router.drain))

self.client = self._router.newRoute("client")
self.client.flowTo(self._in.newDrain())


def received(self, item):
"""
An item was received.
@param item: A dictionary featuring a 'type' indicating which command
it is.
@return: a response routed to the router.
"""
kwargs = item.copy()
return getattr(self, "do_" + kwargs.pop("type"))(**kwargs)


def do_name(self, name):
"""
From client; set the name of this client.
@param name: The nickname for this client.
"""
self.name = name
yield to(self.client, dict(named=name))


def do_join(self, channel):
"""
From client; instruct this client to join a channel with the given
name.
@param channel: the name of the channel to join.
"""
fountFromChannel, drainToChannel = (
self._hub.channelNamed(channel).participate(self)
)
fountFromChannel.flowTo(self._in.newDrain())
fountToChannel = self._router.newRoute("->{}".format(channel))
fountToChannel.flowTo(drainToChannel)

self._participating[channel] = fountToChannel
yield to(self._participating[channel],
dict(type="joined"))


def do_speak(self, channel, message, id):
"""
From client; say something on the given channel.
@param channel: the name of the channel
@param message: the text of the message to relay
@param id: a unique identifier for this message
"""
yield to(self._participating[channel],
dict(type="spoke", message=message, id=id))



class Channel(object):
"""
A chat room.
"""
def __init__(self, name):
self._name = name
self._out = Out()
self._in = In()
self._in.fount.flowTo(self._out.drain)


def participate(self, participant):
"""
Create a new drain of messages going to this channel and a new fount of
messages coming from this channel, for the given participant.
@param participant: the name of the participant joining.
@return: a 2-tuple of (new fount, new drain)
"""
@receiver(IMapping, IMapping,
name="->addSender({}, {})".format(participant.name,
self._name))
def addSender(item):
yield dict(item, sender=participant.name, channel=self._name)

return (self._out.newFount(),
series(addSender, self._in.newDrain()))



@tube
class OnStop(object):
"""
Utility class to hook 'stopped' with a callable.
"""

def __init__(self, callback):
"""
Create an L{OnStop} with a callback.
"""
self.callback = callback


def received(self, item):
"""
We received a message; relay it on unmodified since we only care about
L{OnStop}.
@param item: anything
"""
yield item


def stopped(self, reason):
"""
The flow stopped; invoke the given callback.
@param reason: ignored.
@return: no results (empty iterable)
"""
self.callback()
return ()



class Hub(object):
"""
A chat hub; the nexus object for a whole channel namespace (i.e.: server).
"""
def __init__(self):
self.participants = []
self.channels = {}


def newParticipantFlow(self, flow):
"""
Create a flow for a new participant.
@param flow: a L{Flow} with a drain and a fount for receiving commands;
JSON-style dictionaries with a 'type' key indicating which verb to
invoke on L{Participant}.
"""
commandFount = flow.fount.flowTo(
series(OnStop(lambda: self.participants.remove(participant)))
)
commandDrain = flow.drain
participant = Participant(self, commandFount, commandDrain)
self.participants.append(participant)


def channelNamed(self, name):
"""
Retrieve a L{Channel} with the given name.
@param name: the name of the channel.
@return: a L{Channel}.
"""
if name not in self.channels:
self.channels[name] = Channel(name)
return self.channels[name]



class ChatTests(TestCase):
"""
Integration test cases for putting together fan.In and fan.Out in a useful
configuration for pubsub or multi-user chat.
"""

def test_joining(self):
"""
Test that we receive a response from joining.
"""
from ..listening import Flow
from .util import FakeFount, FakeDrain
h = Hub()
ff = FakeFount()
fd = FakeDrain()
h.newParticipantFlow(Flow(ff, fd))
ff.drain.receive({"type": "name", "name": "bob"})
self.assertEqual(fd.received.pop(0), {"named": "bob"})
ff.drain.receive({"type": "join", "channel": "bobs"})
self.assertEqual(fd.received, [{"type": "joined",
"sender": "bob",
"channel": "bobs"}])
29 changes: 28 additions & 1 deletion tubes/test/test_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from ..routing import Router, to, Routed
from ..tube import series, receiver
from .util import FakeFount, FakeDrain, IFakeOutput, IFakeInput
from .util import FakeFount, FakeDrain, IFakeOutput, IFakeInput, FakeInput

if 0:
# Names used by PyDoctor.
Expand Down Expand Up @@ -115,3 +115,30 @@ def test_ne(self):
self.assertEqual(True, Routed(IFakeInput) != Routed(IFakeOutput))
self.assertEqual(True, Routed() != 7)


def test_providedBy(self):
"""
L{Routed.providedBy} ensures that the given object is a L{to} and that
its payload provides the proper specification.
"""
router = Router()
route = router.newRoute()
self.assertEqual(False,
Routed(IFakeInput).providedBy(FakeInput()))
self.assertEqual(False,
Routed(IFakeInput).providedBy(to(route, object())))
self.assertEqual(True,
Routed(IFakeInput).providedBy(to(route, FakeInput())))


def test_providedByNone(self):
"""
L{Routed.providedBy} ensures that the given object is L{to} but makes
no assertions about its payload if given L{Routed} is given no
sub-specification.
"""
router = Router()
route = router.newRoute()
self.assertEqual(False, Routed().providedBy(object()))
self.assertEqual(True, Routed().providedBy(to(route, object())))

2 changes: 1 addition & 1 deletion tubes/test/test_tube.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def started(self):
srs = series(PassthruTube(), aStarter,
PassthruTube())
nextFount = self.ff.flowTo(srs)
self.assertEqual(self.ff.flowIsPaused, 0)
self.assertEqual(self.ff.flowIsPaused, 1)
nextFount.flowTo(self.fd)
self.assertEqual(self.ff.flowIsPaused, 0)
self.assertEqual(self.fd.received, ["greeting"])
Expand Down
8 changes: 8 additions & 0 deletions tubes/test/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ class IFakeInput(Interface):



@implementer(IFakeInput)
class FakeInput(object):
"""
An implementation of a sample interface.
"""



@implementer(IDrain)
class FakeDrain(object):
"""
Expand Down

0 comments on commit 8522684

Please sign in to comment.