Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Haohao/fix halt problem #40

Closed
wants to merge 12 commits into from
134 changes: 82 additions & 52 deletions iottalkpy/dai.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import atexit
import logging
import os.path
import platform
import re
import signal
import sys
import time
import traceback

from multiprocessing import Process, Manager
from threading import Thread, Event
from multiprocessing import Event as MultiprocessingEvent
from multiprocessing import Process
from threading import Thread
from uuid import UUID

from iottalkpy.color import DAIColor
Expand All @@ -25,6 +26,8 @@
except ImportError:
pass

# Module-level multiprocessing event used as the termination flag:
# ``DAI.__init__`` stores it as ``self._event``, ``signal_handler`` sets it,
# and ``DAI.run`` waits on it before calling the finalizer.
terminate_event = MultiprocessingEvent()


class DAI(Process):
daemon = True
Expand All @@ -37,10 +40,8 @@ def __init__(self, api_url, device_model, device_addr=None,
push_interval=1, interval=None, device_features=None):
super(DAI, self).__init__()

# Do not make the ``Manager`` object as an attribute of DAI object,
# since the attribute in DAI need to be picklable on Windows.
# The underlying implementation of multiprocessing requires that.
self._event = Manager().Event() # create Event proxy object at main process
# Use the Event object provided by the multiprocessing package
self._event = terminate_event

self.api_url = api_url
self.device_model = device_model
Expand Down Expand Up @@ -73,23 +74,23 @@ def push_data(self, df_name):
self.dan.push(df_name, _data)
time.sleep(self.interval.get(df_name, self.push_interval))

def on_signal(self, signal, df_list):
log.info('Receive signal: \033[1;33m%s\033[0m, %s', signal, df_list)
if 'CONNECT' == signal:
def on_signal(self, signal_, df_list):
log.info('Receive signal: \033[1;33m%s\033[0m, %s', signal_, df_list)
if 'CONNECT' == signal_:
for df_name in df_list:
# race condition
if not self.flags.get(df_name):
self.flags[df_name] = True
t = Thread(target=self.push_data, args=(df_name,))
t.daemon = True
t.start()
elif 'DISCONNECT' == signal:
elif 'DISCONNECT' == signal_:
for df_name in df_list:
self.flags[df_name] = False
elif 'SUSPEND' == signal:
elif 'SUSPEND' == signal_:
# Not use
pass
elif 'RESUME' == signal:
elif 'RESUME' == signal_:
# Not use
pass
return True
Expand Down Expand Up @@ -142,14 +143,21 @@ def finalizer(self):
except Exception as e:
log.warning('dai process cleanup exception: %s', e)

def start(self, *args, **kwargs):
ret = super(DAI, self).start(*args, **kwargs)
# conduct deregistration properly,
# if one doesn't stop process before main process ends
atexit.register(self.terminate)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這個被拔掉後,main thread 結束,沒有送出 deregister message

In [5]: m = dai.load_module('./sa.py')

In [6]: m
Out[6]: <module 'sa' from '/home/iblis/git/iottalk-py/tmp/sa.py'>

In [7]: da = dai.module_to_sa(m)

In [8]: da
Out[8]: <DAI(DAI-1, initial daemon)>

In [9]: da.start
Out[9]: <bound method BaseProcess.start of <DAI(DAI-1, initial daemon)>>
                                                                                                                                                                             
In [10]: da.start()
                                                                                                                                                                             
In [11]: INFO:DAN:Successfully connect to http://panettone.iottalk.tw:11150/csm.
INFO:DAN:Device ID: ff73f56c-4439-46e4-81eb-03372e08e18d.
INFO:DAN:Device name: 00.Dummy_Device.
register successfully
INFO:DAI:Press Ctrl+C to exit DAI.
In [11]: 
                                                                                                                                                                             
In [11]: 
                                                                                                                                                                             
In [11]:                                                                                                                                                                      
Do you really want to exit ([y]/n)? y
                                                                                                                                                                             
                                                                                                                                                                             
                                                                                                                                                                             
                                                                                                                                                                             

^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我先把 PR 改成 Draft,之後來研究有沒有好的做法

return ret

def run(self): # this function will be executed in child process
'''
Child process ignores the signals listed below:

1. SIGINT
2. SIGTERM
3. SIGBREAK (Only available on the Windows platform)

The child process will be asked to terminate by the parent process,
so ignoring these signals prevents it from being affected by them.
'''
if sys.platform.startswith('win'):
signal.signal(signal.SIGBREAK, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
self._check_parameter()

self.dan = Client()
Expand Down Expand Up @@ -191,39 +199,16 @@ def f():
on_disconnect=f
)

log.info('Press Ctrl+C to exit DAI.')
try:
self._event.wait()
except KeyboardInterrupt:
pass
finally:
self.finalizer()

def wait(self):
try:
if platform.system() == 'Windows' or sys.version_info.major == 2:
# workaround for https://bugs.python.org/issue35935
while True:
time.sleep(86400)
else:
Event().wait()
except KeyboardInterrupt:
self.join() # wait for deregistration

def terminate(self, *args, **kwargs):
'''
Terminate DAI.
if sys.platform.startswith('win'):
log.info('Press Ctrl+C or Ctrl+BREAK to exit DAI.')
else:
log.info('Press Ctrl+C to exit DAI.')

This is a blocking call.
'''
try:
self._event.set()
except Exception:
# this is triggered if the ``run`` function ended already.
pass
# Wait the termination event
self._event.wait()

self.join()
return super(DAI, self).terminate(*args, **kwargs)
# Start the clean-up procedure
self.finalizer()


def parse_df_profile(sa, typ):
Expand Down Expand Up @@ -325,9 +310,54 @@ def __init__(self, d):
return App(d)


def signal_handler(signal_number, _):
    '''
    Set the termination event so the child process will start
    the termination process.

    The received signal number is logged at warning level before the
    event is set.

    :param signal_number: the number of the signal that was received.
    :param _: the second argument passed to a signal handler; unused here.
    '''
    log.warning('Received signal: %s', signal_number)
    terminate_event.set()


def main(dai):
    '''
    Start ``dai`` and block until its process has exited.

    Before starting it, register ``signal_handler`` for the signals
    listed below:

    1. SIGINT
    2. SIGTERM
    3. SIGBREAK (Only available on the Windows platform)

    According to the documentation, the former two signals are available on
    both Unix-like and Windows platforms. SIGBREAK is only available on the
    Windows platform; it can be emitted by pressing CTRL plus BREAK.

    :param dai: the object to run. Its ``start()`` and ``join()`` methods are
                called and, on Windows, its ``exitcode`` attribute is polled.

    Ref:
        1. https://docs.microsoft.com/en-us/windows/console/ctrl-c-and-ctrl-break-signals
        2. https://docs.python.org/3/library/signal.html
    '''
    if sys.platform.startswith('win'):
        signal.signal(signal.SIGBREAK, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    dai.start()

    # Wait for the child by joining it. Termination is driven by
    # ``signal_handler`` (installed above), not by a KeyboardInterrupt in
    # this process, so nothing here may block waiting for that exception.
    if sys.platform.startswith('win'):
        # The underlying implementation of multiprocessing.Process.join does
        # not use alertable waits on the Windows platform, so directly
        # calling join prevents the signal handler from being executed when
        # a signal arrives.
        # The workaround adopted here is to periodically sleep, join the
        # subprocess (timeout set to 1 second) and then check the exitcode
        # of the subprocess until it actually exits.
        #
        # Ref:
        #     1. https://stackoverflow.com/a/43095532/8997651
        while dai.exitcode is None:
            time.sleep(1)
            dai.join(1)
    else:
        dai.join()


if __name__ == '__main__':
Expand Down