Ivars Karpics edited this page May 12, 2016 · 1 revision

Poller module

The Hardware Repository package from the mxCuBE v2 branch brings a new Poller module.

The Poller module can be used to poll control system objects (such as Taco or Tango servers). The goal is to provide a well-integrated facility for this task instead of reinventing the wheel for every control system. It also uses multi-threading to make sure that calls to the control system object do not block the mxCuBE graphical interface (provided that the Python GIL is released and re-acquired appropriately by the underlying control system library).

How to use the Poller module

The Poller module comes with a "poll" function, taking 6 arguments:

def poll(polled_call, 
         polled_call_args = (),
         polling_period = 1000,
         value_changed_callback = None, 
         error_callback = None, 
         compare = True)

"polled_call" is the callable object (e.g a Python function or method) that will be regularly called in a separate thread. The second argument is a tuple with the arguments to use for the call. The polling period is in milliseconds. When the polled call returns, if "compare" is True (by default) "value_changed_callback" is called. It is always called if compare is False. In case of exception, "error_callback" is called.

No reference is kept to the callable object - if the callable object is garbage collected, polling stops (without raising an error).
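One common way to avoid keeping a callable alive is a weak reference. The sketch below uses only the standard "weakref" module and is an assumption about how such behaviour can be obtained, not a copy of the Poller implementation; "Motor" and "make_weak_callable" are hypothetical names.

```python
import gc
import inspect
import weakref


class Motor:
    """Hypothetical object whose method would be polled."""

    def read_position(self):
        return 42.0


def make_weak_callable(func):
    # A bound method is a short-lived object, so it needs WeakMethod;
    # plain functions can be wrapped with weakref.ref directly.
    if inspect.ismethod(func):
        return weakref.WeakMethod(func)
    return weakref.ref(func)


motor = Motor()
polled_call_ref = make_weak_callable(motor.read_position)

target = polled_call_ref()
print(target())                    # 42.0: the target is still alive

del target, motor                  # drop the last strong references
gc.collect()
print(polled_call_ref() is None)   # True: a polling loop would stop here
```

A polling loop built this way can check the dereferenced value before each call and exit quietly when it is None.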

The "poll" function returns a Poller object. If not needed, you can ignore it. Internally, the Poller module keeps track of all Poller objects. Keeping the Poller object is useful, though, if you want to call its methods:

stop() : stops polling
restart(delay=0) : restarts polling after delay milliseconds have passed
get_polling_period()
set_polling_period(milliseconds)
get_id() : returns a unique id for the Poller object

In case of exception when executing the polled call, polling is not restarted automatically: the error callback is fired, and polling has to be restarted using the "restart" call.

When using the "restart" call, a new Poller object is returned.

The error callback receives the original exception that occurred at polling time and the poller id, so you can use the same error callback for different Poller objects.

Here is a typical example of an error callback, which also shows its signature:

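import logging

# assumes the Poller module has already been imported as "Poller"
def example_error_callback(exception, poller_id):
  try:
    raise exception
  except Exception:
    # logging.exception records the traceback of the exception being handled
    logging.exception("Uncaught exception while polling")

  # reinitialize/reconnect to underlying control system ?
  # then restart polling after a delay
  poller = Poller.get_poller(poller_id)
  logging.info("restarting polling in 1 second")
  # restart() takes the delay in milliseconds and returns a new Poller object
  poller.restart(1000)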
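The stop-on-error behaviour described in this section can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the Poller module: "MiniPoller" and "flaky_read" are hypothetical, the poller id is simply id(self), and the error callback runs in the polling thread here, whereas the real module runs callbacks in the main thread.

```python
import threading


class MiniPoller(threading.Thread):
    """Hypothetical, simplified poller; callbacks run in the polling thread."""

    def __init__(self, polled_call, polling_period=1000, error_callback=None):
        super().__init__(daemon=True)
        self.polled_call = polled_call
        self.polling_period = polling_period  # milliseconds
        self.error_callback = error_callback
        self.stop_event = threading.Event()
        self.results = []

    def stop(self):
        self.stop_event.set()

    def run(self):
        while not self.stop_event.is_set():
            try:
                self.results.append(self.polled_call())
            except Exception as exc:
                if self.error_callback is not None:
                    self.error_callback(exc, id(self))
                break  # polling is not resumed automatically
            # wait() returns early if stop() is called during the pause
            self.stop_event.wait(self.polling_period / 1000.0)


calls = {"count": 0}


def flaky_read():
    # Succeeds twice, then simulates a lost connection.
    calls["count"] += 1
    if calls["count"] > 2:
        raise RuntimeError("connection lost")
    return calls["count"]


errors = []
poller = MiniPoller(
    flaky_read,
    polling_period=10,
    error_callback=lambda exc, pid: errors.append((type(exc).__name__, pid)),
)
poller.start()
poller.join(timeout=2)
print(poller.results, errors[0][0], poller.is_alive())  # [1, 2] RuntimeError False
```

Because the loop exits on the first exception, the decision to reconnect and restart stays entirely in the error callback.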

How does the Poller ensure thread safety

The Poller calls the polled callable object in a separate thread, but the callbacks (value changed and on error) are executed in the main mxCuBE thread.

Starting from mxCuBE v2, 2 loops are running in the main thread:

Qt's loop
gevent loop

(more precisely, there is only one loop, the Qt one, which also runs gevent processing in the current implementation)

The Poller interacts with the gevent loop, as it is not part of the GUI part of mxCuBE. gevent has a mechanism to deal with multiple threads called the "asynchronous watcher". Each Poller object creates an async watcher in its constructor:

import threading
import gevent

class _Poller(threading.Thread):
    def __init__(self, polled_call, polled_call_args=(), polling_period=1000, value_changed_callback=None, error_callback=None, compare=True):
        threading.Thread.__init__(self)
        ...
        # "async" is a reserved word since Python 3.7; gevent >= 1.3 provides loop.async_() instead
        self.async_watcher = gevent.get_hub().loop.async()

The watcher is associated with a callback as soon as the Poller thread starts, using the "start" method of the async watcher object:

self.async_watcher.start(self.new_event)

From the Poller thread, it is enough to call "self.async_watcher.send()" to have the "self.new_event" callback executed by the gevent loop, as soon as it can, in a thread-safe manner. The other thread is not blocked while the "new_event" callback is executed by the gevent loop, so the way data is passed between the threads has to be thread-safe too; a Queue object can be used for this.

Here is a schematic view of the whole thing:

         THREAD A (gevent loop)             |               THREAD B
                                            |
aw = gevent.get_hub().loop.async()          | self.aw = aw
q = Queue.Queue()                           | self.q = q
def my_thread_safe_callback():              |
  ...                                       |
aw.start(my_thread_safe_callback)           |
                                            | self.run() => now in thread body
                                            | ...
                                            | from thread body: self.q.put(my_data)
                                            |                   self.aw.send()
=> in my_thread_safe_callback:              |
   my_data = q.get()                        |

Real world examples of Poller module usage within mxCuBE

The Taco Command module
The Tango Command module

In addition to using the Poller, the Tango Command module also uses gevent async watcher objects for Tango events.

Indeed, Tango events are received in a callback called from the Tango threads, so the final "user" callback has to be executed within the gevent loop; the same trick as described above is used. In general, any thread-safety issue between a gevent loop and a thread can be solved as described above.