Docs for subscribe_to
Bob Gregory committed Sep 14, 2018
1 parent e131e2a commit f45ecdb
Showing 2 changed files with 46 additions and 2 deletions.
4 changes: 2 additions & 2 deletions README.rst
@@ -226,7 +226,7 @@ Eventstore will send each event to one consumer at a time. When you have handled
Volatile Subscriptions
~~~~~~~~~~~~~~~~~~~~~~
-In a volatile subscription, state is stored by the client. When your application restarts, you must re-subscribe to the stream. There is no support in Eventstore for competing consumers to a volatile subscription. Volatile subscriptions can run against any node in a cluster.
+In a Volatile Subscription, state is stored by the client. When your application restarts, you must re-subscribe to the stream. There is no support in Eventstore for competing consumers to a volatile subscription. Volatile subscriptions can run against any node in a cluster.
Volatile subscriptions do not support event acknowledgement.
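To make the re-subscribe requirement concrete, here is a minimal sketch of a client that opens a fresh volatile subscription each time it starts. It mirrors the ``subscribe_to`` docstring examples added in this commit; the import path, stream name, and handler are illustrative assumptions rather than part of the documented API::

    import asyncio

    from photonpump import connection  # assumed import path for the connection helper

    async def watch_prices():
        # State lives on the client, so every restart begins again at the end
        # of the stream; only events published from now on are delivered.
        async with connection() as conn:
            subs = await conn.subscribe_to("price-changes")
            async for event in subs.events:
                # Volatile subscriptions do not support event acknowledgement,
                # so we simply handle each event and move on.
                print(event)

    asyncio.get_event_loop().run_until_complete(watch_prices())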
@@ -256,4 +256,4 @@ If you provide both a `host` and `discovery_host`, photonpump will prefer discov
Debugging
~~~~~~~~~
-If you want to step through code that uses photonpump, it's helpful to be aware that Event Store's TCP API (which photonpump uses) makes use of a 'heartbeat' to ensure that connections are not left open. This means that if you're sitting at a debugger (e.g. pdb) prompt -- and therefore not running the event loop for tens of seconds at a time -- you'll find that you get disconnected. To prevent that, you can run it with Event Store's heartbeat timeouts set to high values -- e.g. with a `Dockerfile` `like this <http://github.com/jlee1-made/resting-eventstore>`_.
+If you want to step through code that uses photonpump, it's helpful to be aware that Event Store's TCP API (which photonpump uses) makes use of a 'heartbeat' to ensure that connections are not left open. This means that if you're sitting at a debugger (e.g. pdb) prompt -- and therefore not running the event loop for tens of seconds at a time -- you'll find that you get disconnected. To prevent that, you can run it with Event Store's heartbeat timeouts set to high values -- e.g. with a `Dockerfile` `like this <http://github.com/jlee1-made/resting-eventstore>`_.
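A minimal sketch of such a Dockerfile might look like the following. The base image tag is illustrative, and the ``EVENTSTORE_EXT_TCP_HEARTBEAT_*`` environment variables are an assumption based on Event Store's convention of mapping ``EVENTSTORE_*`` variables onto its configuration options (values are in milliseconds); check the Event Store documentation for your version before relying on them::

    FROM eventstore/eventstore:release-4.1.1

    # Make heartbeats very lax so that a connection paused at a pdb prompt
    # is not dropped while you step through photonpump code.
    ENV EVENTSTORE_EXT_TCP_HEARTBEAT_INTERVAL=3600000 \
        EVENTSTORE_EXT_TCP_HEARTBEAT_TIMEOUT=7200000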
44 changes: 44 additions & 0 deletions photonpump/connection.py
@@ -823,16 +823,59 @@ async def connect_subscription(
conversation_id=conversation_id,
)
future = await self.dispatcher.start_conversation(cmd)

return await future

async def subscribe_to(self, stream, resolve_link_tos=True, start_from=-1):

"""
Subscribe to receive notifications when a new event is published
to a stream.
Args:
stream: The name of the stream.
start_from: The first event to read.
This parameter defaults to the magic value -1 which is treated
as meaning "from the end of the stream". If this value is used,
no historical events will be returned.
For any other value, photonpump will read all events from
start_from until the end of the stream in pages of max_size
before subscribing to receive new events as they arrive.
resolve_link_tos (optional): True if eventstore should
automatically resolve Link Events, otherwise False.
Returns:
A VolatileSubscription.
Examples:
>>> async with connection() as conn:
>>> # Subscribe only to NEW events on the price-changes stream
>>> subs = await conn.subscribe_to("price-changes")
>>> async for event in subs.events:
>>> print(event)
>>> async with connection() as conn:
>>> # Read all historical events and then receive updates as they
>>> # arrive.
>>> subs = await conn.subscribe_to("price-changes", start_from=0)
>>> async for event in subs.events:
>>> print(event)
"""

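# start_from == -1 is the magic "end of the stream" value described in the
# docstring: subscribe for new events only. Any other value starts a
# catch-up subscription that replays history before switching to live events.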
if start_from == -1:
cmd = convo.SubscribeToStream(stream, resolve_link_tos)
else:
cmd = convo.CatchupSubscription(stream, start_from)

future = await self.dispatcher.start_conversation(cmd)

return await future

async def __aenter__(self):
@@ -929,6 +972,7 @@ async def stop(self):
self.dispatch_loop,
self.heartbeat_loop,
loop=self.loop,
return_exceptions=True,
)
self.transport.close()
