From 7ddb1d932e01c109fe85a4cba4cf8f350f07f24e Mon Sep 17 00:00:00 2001 From: Yu-Hao Chang Date: Fri, 4 Dec 2020 14:41:08 +0800 Subject: [PATCH 1/9] Remove the functions that do not need to be overrided --- iottalkpy/dai.py | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index ea3d2dd..a2e36c7 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -143,13 +143,6 @@ def finalizer(self): except Exception as e: log.warning('dai process cleanup exception: %s', e) - def start(self, *args, **kwargs): - ret = super(DAI, self).start(*args, **kwargs) - # conduct deregistration properly, - # if one doesn't stop process before main process ends - atexit.register(self.terminate) - return ret - def run(self): # this function will be executed in child process self._check_parameter() @@ -211,21 +204,6 @@ def wait(self): except KeyboardInterrupt: self.join() # wait for deregistration - def terminate(self, *args, **kwargs): - ''' - Terminate DAI. - - This is a blocking call. - ''' - try: - self._event.set() - except Exception: - # this is triggered if the ``run`` function ended already. - pass - - self.join() - return super(DAI, self).terminate(*args, **kwargs) - def parse_df_profile(sa, typ): def f(p): From 1fa43a5016882da75bcbc9240d1493c0a5b0d3ca Mon Sep 17 00:00:00 2001 From: Yu-Hao Chang Date: Fri, 4 Dec 2020 14:44:42 +0800 Subject: [PATCH 2/9] Remove atexit package --- iottalkpy/dai.py | 1 - 1 file changed, 1 deletion(-) diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index a2e36c7..49e28aa 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -1,4 +1,3 @@ -import atexit import logging import os.path import platform From f2a4c16acbd1500767d4ecf2a0a976b34a3428f6 Mon Sep 17 00:00:00 2001 From: Yu-Hao Chang Date: Fri, 4 Dec 2020 14:46:17 +0800 Subject: [PATCH 3/9] Replace manager object with event object --- iottalkpy/dai.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index 49e28aa..6468a2e 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -7,7 +7,8 @@ import time import traceback -from multiprocessing import Process, Manager +from multiprocessing import Event as multiprocessingEvent +from multiprocessing import Process from threading import Thread, Event from uuid import UUID @@ -25,6 +26,8 @@ except ImportError: pass +terminate_event = multiprocessingEvent() + class DAI(Process): daemon = True @@ -37,10 +40,8 @@ def __init__(self, api_url, device_model, device_addr=None, push_interval=1, interval=None, device_features=None): super(DAI, self).__init__() - # Do not make the ``Manager`` object as an attribute of DAI object, - # since the attribute in DAI need to be picklable on Windows. - # The underlying implementation of multiprocessing requires that. - self._event = Manager().Event() # create Event proxy object at main process + # Use the Event object provided by the multiprocessing package + self._event = terminate_event self.api_url = api_url self.device_model = device_model From e3a470d3cac3c5ac468b9a403dcf29060c583b6e Mon Sep 17 00:00:00 2001 From: Yu-Hao Chang Date: Fri, 4 Dec 2020 14:48:08 +0800 Subject: [PATCH 4/9] Fix naming conflicts --- iottalkpy/dai.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index 6468a2e..b5b7f20 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -74,9 +74,9 @@ def push_data(self, df_name): self.dan.push(df_name, _data) time.sleep(self.interval.get(df_name, self.push_interval)) - def on_signal(self, signal, df_list): - log.info('Receive signal: \033[1;33m%s\033[0m, %s', signal, df_list) - if 'CONNECT' == signal: + def on_signal(self, signal_, df_list): + log.info('Receive signal: \033[1;33m%s\033[0m, %s', signal_, df_list) + if 'CONNECT' == signal_: for df_name in df_list: # race condition if not self.flags.get(df_name): @@ -84,13 +84,13 @@ def on_signal(self, signal, df_list): t = Thread(target=self.push_data, args=(df_name,)) t.daemon = True t.start() - elif 'DISCONNECT' == signal: + elif 'DISCONNECT' == signal_: for df_name in df_list: self.flags[df_name] = False - elif 'SUSPEND' == signal: + elif 'SUSPEND' == signal_: # Not use pass - elif 'RESUME' == signal: + elif 'RESUME' == signal_: # Not use pass return True From d1f45b3a652e8da53b08da6f826ba2a936d0a99b Mon Sep 17 00:00:00 2001 From: Yu-Hao Chang Date: Fri, 4 Dec 2020 15:01:54 +0800 Subject: [PATCH 5/9] Refactor the termination procedure --- iottalkpy/dai.py | 54 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index b5b7f20..b2eec43 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -144,6 +144,17 @@ def finalizer(self): log.warning('dai process cleanup exception: %s', e) def run(self): # this function will be executed in child process + ''' + Child process ignores the two signals listed below: + + 1. SIGINT + 2. SIGTERM + + The child process will be asked to terminate by the parent process, + so ignoring these signals prevents it from affecting by the signals. + ''' + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) self._check_parameter() self.dan = Client() @@ -186,23 +197,12 @@ def f(): ) log.info('Press Ctrl+C to exit DAI.') - try: - self._event.wait() - except KeyboardInterrupt: - pass - finally: - self.finalizer() - def wait(self): - try: - if platform.system() == 'Windows' or sys.version_info.major == 2: - # workaround for https://bugs.python.org/issue35935 - while True: - time.sleep(86400) - else: - Event().wait() - except KeyboardInterrupt: - self.join() # wait for deregistration + # Wait the termination event + self._event.wait() + + # Start the clean-up procedure + self.finalizer() def parse_df_profile(sa, typ): @@ -304,9 +304,29 @@ def __init__(self, d): return App(d) +def signal_handler(signal_number, _): + log.warning('Received signal: %s', signal_number) + ''' + Set the termination event so the child process will start + the termination process. + ''' + terminate_event.set() + + def main(dai): + ''' + Add the signal handler for the two signals listed below: + + 1. SIGINT + 2. SIGTERM + + According to the documentation, both of the signals are available on + both of the Unix-like and the Windows platforms. + ''' + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) dai.start() - dai.wait() + dai.join() if __name__ == '__main__': From 57f25ed5f39061a621e0a643ef06ae28ca437282 Mon Sep 17 00:00:00 2001 From: Yu-Hao Chang Date: Sat, 5 Dec 2020 13:24:28 +0800 Subject: [PATCH 6/9] Remove unused module --- iottalkpy/dai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index 7b4b40b..44f38ff 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -8,7 +8,7 @@ from multiprocessing import Event as multiprocessingEvent from multiprocessing import Process -from threading import Thread, Event +from threading import Thread from uuid import UUID from iottalkpy.color import DAIColor From 14314a662f4ed9ee6e93f900551466d5cd7f8c31 Mon Sep 17 00:00:00 2001 From: Yu-Hao Chang Date: Sat, 5 Dec 2020 13:41:49 +0800 Subject: [PATCH 7/9] Resolve the signal handler halt issue on the Windows platform --- iottalkpy/dai.py | 44 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index 44f38ff..150538d 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -2,6 +2,7 @@ import os.path import platform import re +import signal import sys import time import traceback @@ -144,14 +145,17 @@ def finalizer(self): def run(self): # this function will be executed in child process ''' - Child process ignores the two signals listed below: + Child process ignores the signals listed below: 1. SIGINT 2. SIGTERM + 3. SIGBREAK (Only available on the Windows platform) The child process will be asked to terminate by the parent process, so ignoring these signals prevents it from affecting by the signals. ''' + if sys.platform.startswith('win'): + signal.signal(signal.SIGBREAK, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) self._check_parameter() @@ -195,7 +199,10 @@ def f(): on_disconnect=f ) - log.info('Press Ctrl+C to exit DAI.') + if sys.platform.startswith('win'): + log.info('Press Ctrl+C or Ctrl+BREAK to exit DAI.') + else: + log.info('Press Ctrl+C to exit DAI.') # Wait the termination event self._event.wait() @@ -314,18 +321,43 @@ def signal_handler(signal_number, _): def main(dai): ''' - Add the signal handler for the two signals listed below: + Add the signal handler for the signals listed below: 1. SIGINT 2. SIGTERM + 3. SIGBREAK (Only available on the Windows platform) + + According to the documentation, the former two signals are available on + both of the Unix-like and the Windows platforms. The SIGBREAK is only available + on the Windows platform, this signal can be emitted by pressing CTRL plus BREAK. - According to the documentation, both of the signals are available on - both of the Unix-like and the Windows platforms. + Ref: + 1. https://docs.microsoft.com/en-us/windows/console/ctrl-c-and-ctrl-break-signals + 2. https://docs.python.org/3/library/signal.html ''' + if sys.platform.startswith('win'): + signal.signal(signal.SIGBREAK, signal_handler) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) dai.start() - dai.join() + + if sys.platform.startswith('win'): + ''' + Since the underlying implementation of multiprocessing.Process.join does not use + alertable waits on the Windows platform, directly calling join causes the signal + handler will not be executed when the signal arrives. + A doable way adopted here is periodically sleeping, joining the subprocess + (timeout is set to 1 second) and then checking the exitcode of the subprocess + until it actually exits. + + Ref: + 1. https://stackoverflow.com/questions/43092371/ignore-sigint-in-python-multiprocessing-subprocess # noqa: E501 + ''' + while dai.exitcode is None: + time.sleep(1) + dai.join(1) + else: + dai.join() if __name__ == '__main__': From c9d2519ef2ed6811ac7c9598167c7e2f2df34f8a Mon Sep 17 00:00:00 2001 From: Yu-Hao Chang Date: Wed, 9 Dec 2020 14:43:25 +0800 Subject: [PATCH 8/9] Fix naming issue --- iottalkpy/dai.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index 150538d..42bc8d9 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -7,7 +7,7 @@ import time import traceback -from multiprocessing import Event as multiprocessingEvent +from multiprocessing import Event as MultiprocessingEvent from multiprocessing import Process from threading import Thread from uuid import UUID @@ -26,7 +26,7 @@ except ImportError: pass -terminate_event = multiprocessingEvent() +terminate_event = MultiprocessingEvent() class DAI(Process): From 7b59cc25908107d44be3e5070be233d6e3000749 Mon Sep 17 00:00:00 2001 From: Yu-Hao Chang Date: Wed, 9 Dec 2020 14:43:44 +0800 Subject: [PATCH 9/9] Update reference link --- iottalkpy/dai.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iottalkpy/dai.py b/iottalkpy/dai.py index 42bc8d9..109cd75 100644 --- a/iottalkpy/dai.py +++ b/iottalkpy/dai.py @@ -351,7 +351,7 @@ def main(dai): until it actually exits. Ref: - 1. https://stackoverflow.com/questions/43092371/ignore-sigint-in-python-multiprocessing-subprocess # noqa: E501 + 1. https://stackoverflow.com/a/43095532/8997651 ''' while dai.exitcode is None: time.sleep(1)