Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(recipe): add ExistingDataWatch class #648

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/api/recipe/watchers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ Public API

.. automethod:: __call__

.. autoclass:: ExistingDataWatch
:members:

.. automethod:: __init__

.. automethod:: __call__

.. autoclass:: ChildrenWatch
:members:
Expand Down
3 changes: 2 additions & 1 deletion kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
from kazoo.recipe.partitioner import SetPartitioner
from kazoo.recipe.party import Party, ShallowParty
from kazoo.recipe.queue import Queue, LockingQueue
from kazoo.recipe.watchers import ChildrenWatch, DataWatch
from kazoo.recipe.watchers import ChildrenWatch, DataWatch, ExistingDataWatch


string_types = six.string_types
Expand Down Expand Up @@ -352,6 +352,7 @@ def _retry(*args, **kwargs):
self.DoubleBarrier = partial(DoubleBarrier, self)
self.ChildrenWatch = partial(ChildrenWatch, self)
self.DataWatch = partial(DataWatch, self)
self.ExistingDataWatch = partial(ExistingDataWatch, self)
self.Election = partial(Election, self)
self.NonBlockingLease = partial(NonBlockingLease, self)
self.MultiNonBlockingLease = partial(MultiNonBlockingLease, self)
Expand Down
61 changes: 61 additions & 0 deletions kazoo/recipe/watchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,67 @@ def _session_watcher(self, state):
self._client.handler.spawn(self._get_data)


class ExistingDataWatch(DataWatch):
"""Watches a node for data updates and calls the specified
function each time it changes

Similar to :class:`~kazoo.recipes.watchers.DataWatch`, but it does
not operate on nodes which do not exist.

The function will also be called the very first time its
registered to get the data.

Returning `False` from the registered function will disable future
data change calls. If the client connection is closed (using the
close command), the DataWatch will no longer get updates.

If the function supplied takes three arguments, then the third one
will be a :class:`~kazoo.protocol.states.WatchedEvent`. It will
only be set if the change to the data occurs as a result of the
server notifying the watch that there has been a change. Events
like reconnection or the first call will not include an event.

If the node does not exist on creation then the function will be
called with ``None`` for all values and no futher callbacks will
occur. If the node is deleted after the watch is created, the
function will be called with the event argument indicating a
delete event and no further callbacks will occur.
"""

@_ignore_closed
def _get_data(self, event=None):
# Ensure this runs one at a time, possible because the session
# watcher may trigger a run
with self._run_lock:
if self._stopped:
return

initial_version = self._version

try:
data, stat = self._retry(self._client.get,
self._path, self._watcher)
except NoNodeError:
data = stat = None

# No node data, clear out version
if stat is None:
self._version = None
else:
self._version = stat.mzxid

# Call our function if its the first time ever, or if the
# version has changed
if initial_version != self._version or not self._ever_called:
self._log_func_exception(data, stat, event)

# If the node doesn't exist, we won't be watching any more
if stat is None:
self._stopped = True
self._func = None
self._client.remove_listener(self._session_watcher)


class ChildrenWatch(object):
"""Watches a node for children updates and calls the specified
function each time it changes
Expand Down
76 changes: 76 additions & 0 deletions kazoo/tests/test_watchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,82 @@ def changed(val, stat):
assert b is False


class KazooExistingDataWatcherTests(KazooTestCase):
def setUp(self):
super(KazooExistingDataWatcherTests, self).setUp()
self.path = "/" + uuid.uuid4().hex
self.client.ensure_path(self.path)

def test_data_watcher_non_existent_path(self):
update = threading.Event()
data = [True]

# Make it a non-existent path
self.path += 'f'

@self.client.ExistingDataWatch(self.path)
def changed(d, stat):
data.pop()
data.append(d)
update.set()

update.wait(10)
assert data == [None]
update.clear()

# We should not get an update
self.client.create(self.path, b'fred')
update.wait(0.2)
assert data == [None]
update.clear()

def test_data_watcher_existing_path(self):
update = threading.Event()
data = [True]

# Make it an existing path
self.path += 'f'
self.client.create(self.path, b'fred')

@self.client.ExistingDataWatch(self.path)
def changed(d, stat):
data.pop()
data.append(d)
update.set()

update.wait(10)
assert data[0] == b'fred'
update.clear()

def test_data_watcher_delete(self):
update = threading.Event()
data = [True]

# Make it an existing path
self.path += 'f'
self.client.create(self.path, b'fred')

@self.client.ExistingDataWatch(self.path)
def changed(d, stat):
data.pop()
data.append(d)
update.set()

update.wait(10)
assert data[0] == b'fred'
update.clear()

self.client.delete(self.path)
update.wait(10)
assert data == [None]
update.clear()

self.client.create(self.path, b'ginger')
update.wait(0.2)
assert data == [None]
update.clear()


class KazooChildrenWatcherTests(KazooTestCase):
def setUp(self):
super(KazooChildrenWatcherTests, self).setUp()
Expand Down