Skip to content

Commit

Permalink
Merge branch 'fix-1178' into 'develop'
Browse files Browse the repository at this point in the history
Refactor TaurusPollingTimer to avoid deadlock

Closes taurus-org#1178

See merge request taurus-org/taurus!1181
  • Loading branch information
Carlos Pascual committed Mar 17, 2021
2 parents 7f7933f + 3d149a4 commit 7055d27
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 23 deletions.
1 change: 0 additions & 1 deletion lib/taurus/core/tango/test/test_tangofactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,5 @@ def test_cleanup_after_polling(nodb_dev, attrname):
# TODO: remove the gc.collect() when PyTango !412 is fixed
gc.collect() # avoid PyTango bug 412
time.sleep(polling_period * 2)
gc.collect() # avoid PyTango bug 412
assert sorted(f.tango_attrs) == old_attrs
assert sorted(f.tango_devs) == old_devs
66 changes: 44 additions & 22 deletions lib/taurus/core/tauruspollingtimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@

import weakref
import threading
import time

from .util.log import Logger
from .util.containers import CaselessWeakValueDict, CaselessDict
from .util.timer import Timer


__all__ = ["TaurusPollingTimer"]

__docformat__ = "restructuredtext"



class TaurusPollingTimer(Logger):
""" Polling timer manages a list of attributes that have to be polled in
the same period """
Expand All @@ -48,19 +50,45 @@ def __init__(self, period, parent=None):
:param parent: (Logger) parent object (default is None)
"""
name = "TaurusPollingTimer[%d]" % period
self.call__init__(Logger, name, parent)
super(TaurusPollingTimer, self).__init__(name=name, parent=parent)
self._period = period / 1000. # we store it internally in seconds
self.dev_dict = {}
self.attr_nb = 0
self.timer = Timer(period / 1000.0, self._pollAttributes, self)
self.lock = threading.RLock()
self.timer = None
self.lock = threading.Lock()
self.__thread = threading.Thread(target=self.__run, name=name)
self.__thread.setDaemon(True)
self._started = True
self.__thread.start()

def __run(self):
""" Private Thread Function """
next_time = time.time() + self._period
while True:
if not self._started:
# emulate stopped
time.sleep(self._period)
continue
self._pollAttributes()
curr_time = time.time()
nap = max(0, next_time - curr_time)
if curr_time > next_time:
self.warning(
"loop function took more than loop interval (%ss)",
self._period
)
next_time += self._period
time.sleep(nap)

def start(self):
""" Starts the polling timer """
self.timer.start()
self.deprecated("TaurusPollingTimer.start()", rel="4.7.1")
self._started = True

def stop(self, sync=False):
""" Stop the polling timer"""
self.timer.stop(sync=sync)
self.deprecated("TaurusPollingTimer.stop()", rel="4.7.1")
self._started = False

def containsAttribute(self, attribute):
"""Determines if the polling timer already contains this attribute
Expand All @@ -71,12 +99,9 @@ def containsAttribute(self, attribute):
False otherwise
"""
dev, attr_name = attribute.getParentObj(), attribute.getSimpleName()
self.lock.acquire()
try:
with self.lock:
attr_dict = self.dev_dict.get(dev)
return attr_dict and attr_name in attr_dict
finally:
self.lock.release()

def getAttributeCount(self):
"""Returns the number of attributes registered for polling
Expand All @@ -85,14 +110,19 @@ def getAttributeCount(self):
"""
return self.attr_nb

def addAttribute(self, attribute, auto_start=True):
def addAttribute(self, attribute, auto_start=None):
"""Registers the attribute in this polling.
:param attribute: (taurus.core.taurusattribute.TaurusAttribute) the attribute to be added
:param auto_start: (bool) if True (default) it tells the polling timer
that it should startup as soon as there is at least
one attribute registered.
"""
if auto_start == False:
self.deprecated(
"TaurusPollingTimer.addAttribute auto_start argument",
rel="4.7.1")

with self.lock:
dev, attr_name = attribute.getParentObj(), attribute.getSimpleName()
attr_dict = self.dev_dict.get(dev)
Expand All @@ -104,11 +134,6 @@ def addAttribute(self, attribute, auto_start=True):
if attr_name not in attr_dict:
attr_dict[attr_name] = attribute
self.attr_nb += 1
if self.attr_nb == 1 and auto_start:
self.start()
else:
import taurus
taurus.Manager().enqueueJob(attribute.poll)

def removeAttribute(self, attribute):
"""Unregisters the attribute from this polling. If the number of registered
Expand All @@ -127,16 +152,14 @@ def removeAttribute(self, attribute):
self.attr_nb -= 1
if not attr_dict:
del self.dev_dict[dev]
if not self.dev_dict:
self.stop(sync=True)

def _pollAttributes(self):
"""Polls the registered attributes. This method is called by the timer
when it is time to poll. Do not call this method directly
"""
req_ids = {}
dev_dict = {}
with self.lock:
req_ids = {}
dev_dict = {}
for dev, attrs in self.dev_dict.items():
if dev.factory().caseSensitive:
dev_dict[dev] = dict(attrs)
Expand All @@ -155,5 +178,4 @@ def _pollAttributes(self):
try:
dev.poll(attrs, req_id=req_id)
except Exception as e:
self.error("poll_reply error %r", e)

self.error("poll_reply error %r", e)
61 changes: 61 additions & 0 deletions lib/taurus/core/test/test_pollingtimer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@

from taurus.core import TaurusPollingTimer
import taurus
import sys
import pytest
import signal
import time


class _Timeout:
"""Timeout context. Inspired by https://stackoverflow.com/a/59997890 """
def __init__(self, seconds=1, msg='Timeout'):
self.seconds = seconds
self.msg = msg

def handle_timeout(self, signum, frame):
pytest.fail(self.msg)

def __enter__(self):
signal.signal(signal.SIGALRM, self.handle_timeout)
signal.alarm(self.seconds)

def __exit__(self, type, value, traceback):
signal.alarm(0)


def test_polling():
period = 50
a = taurus.Attribute("eval:'test_polling'")

class _Listener:
def __init__(self):
self.count = 0

def cb(self, *_):
self.count += 1

listener = _Listener()
a.addListener(listener.cb)
pt = TaurusPollingTimer(period=period)
pt.addAttribute(a)
time.sleep(period * 4 / 1000.)
assert listener.count > 0
assert listener.count < 7


def test_bug_1178():
"""
Test if we are affected by https://github.com/taurus-org/taurus/issues/1178
"""
a = taurus.Attribute("eval:1")
pt = TaurusPollingTimer(period=500) # poll every 500ms
with _Timeout(seconds=3, msg="Deadlock in TaurusPollingTimer (see #1178)"):
for i in range(100): # this should finish in << 1s
pt.addAttribute(a)
print(i, end=" ")
sys.stdout.flush()
pt.removeAttribute(a)
assert i == 99


0 comments on commit 7055d27

Please sign in to comment.