diff --git a/auto_block.py b/auto_block.py index c17a0bb0c..cee677bcd 100644 --- a/auto_block.py +++ b/auto_block.py @@ -20,6 +20,7 @@ def __init__(self): import threading self.event = threading.Event() self.start_line = self.file_len("/etc/hosts.deny") + self.has_stopped = False def get_ip(self, text): if common.match_ipv4_address(text) is not None: @@ -293,6 +294,8 @@ def thread_db(obj): #logging.warn('db thread except:%s' % e) if db_instance.event.wait(60): break + if db_instance.has_stopped: + break except KeyboardInterrupt as e: pass db_instance = None @@ -300,4 +303,5 @@ def thread_db(obj): @staticmethod def thread_db_stop(): global db_instance + db_instance.has_stopped = True db_instance.event.set() diff --git a/auto_thread.py b/auto_thread.py index cbd895967..1260ece1d 100644 --- a/auto_thread.py +++ b/auto_thread.py @@ -25,6 +25,8 @@ def __init__(self): self.import_result = self.gpg.import_keys(self.key_data) self.public_keys = self.gpg.list_keys() + self.has_stopped = False + def run_command(self, command, id): value = subprocess.check_output(command.split(' ')).decode('utf-8') if configloader.get_config().API_INTERFACE == 'modwebapi': @@ -181,6 +183,8 @@ def thread_db(obj): #logging.warn('db thread except:%s' % e) if db_instance.event.wait(60): break + if db_instance.has_stopped: + break except KeyboardInterrupt as e: pass db_instance = None @@ -188,4 +192,5 @@ def thread_db(obj): @staticmethod def thread_db_stop(): global db_instance + db_instance.has_stopped = True db_instance.event.set() diff --git a/db_transfer.py b/db_transfer.py index 4b5e61429..c2175a268 100644 --- a/db_transfer.py +++ b/db_transfer.py @@ -43,6 +43,8 @@ def __init__(self): self.node_ip_list = [] self.mu_port_list = [] + self.has_stopped = False + def update_all_user(self, dt_transfer): import cymysql update_transfer = {} @@ -871,6 +873,8 @@ def thread_db(obj): # logging.warn('db thread except:%s' % e) if db_instance.event.wait(60) or not db_instance.is_all_thread_alive(): break + if db_instance.has_stopped: + break except KeyboardInterrupt as e: pass db_instance.del_servers() @@ -880,12 +884,10 @@ def thread_db(obj): @staticmethod def thread_db_stop(): global db_instance + db_instance.has_stopped = True db_instance.event.set() def is_all_thread_alive(self): - for port in ServerPool.get_instance().thread_pool: - if not ServerPool.get_instance().thread_pool[port].is_alive(): - return False if not ServerPool.get_instance().thread.is_alive(): return False return True diff --git a/server_pool.py b/server_pool.py index 3b062febd..872a949c1 100644 --- a/server_pool.py +++ b/server_pool.py @@ -56,7 +56,6 @@ def __init__(self): self.mgr = None # asyncmgr.ServerMgr() self.eventloop_pool = {} - self.thread_pool = {} self.dns_resolver_pool = {} self.dns_resolver = asyncdns.DNSResolver() @@ -220,9 +219,6 @@ def cb_del_server(self, port): if port in self.dns_resolver_pool: del self.dns_resolver_pool[port] - if port in self.thread_pool: - del self.thread_pool[port] - if port not in self.tcp_servers_pool: logging.info( "stopped server at %s:%d already stop" % diff --git a/shadowsocks/tcprelay.py b/shadowsocks/tcprelay.py index bb35c2c66..02cc90e4b 100644 --- a/shadowsocks/tcprelay.py +++ b/shadowsocks/tcprelay.py @@ -1122,23 +1122,8 @@ def _handle_dns_resolved(self, result, error): self._server) else: try: - if self._is_relay and self._config[ - 'fast_open']: - data = b''.join( - self._data_to_write_to_remote) - l = len(data) - remote_sock.setblocking(True) - s = remote_sock.sendto( - data, MSG_FASTOPEN, (remote_addr, remote_port)) - remote_sock.setblocking(False) - if s < l: - data = data[s:] - self._data_to_write_to_remote = [data] - else: - self._data_to_write_to_remote = [] - else: - remote_sock.connect( - (remote_addr, remote_port)) + remote_sock.connect( + (remote_addr, remote_port)) except (OSError, IOError) as e: if eventloop.errno_from_exception(e) in ( errno.EINPROGRESS, errno.EWOULDBLOCK): diff --git a/speedtest_thread.py b/speedtest_thread.py index 3dd7a7410..088e8cb17 100644 --- a/speedtest_thread.py +++ b/speedtest_thread.py @@ -15,6 +15,7 @@ class Speedtest(object): def __init__(self): import threading self.event = threading.Event() + self.has_stopped = False def speedtest_thread(self): if self.event.wait(600): @@ -180,6 +181,8 @@ def thread_db(obj): #logging.warn('db thread except:%s' % e) if db_instance.event.wait(configloader.get_config().SPEEDTEST * 3600): break + if db_instance.has_stopped: + break except KeyboardInterrupt as e: pass db_instance = None @@ -187,4 +190,5 @@ def thread_db(obj): @staticmethod def thread_db_stop(): global db_instance + db_instance.has_stopped = True db_instance.event.set() diff --git a/web_transfer.py b/web_transfer.py index ec6a0d11b..a46e12941 100644 --- a/web_transfer.py +++ b/web_transfer.py @@ -42,6 +42,8 @@ def __init__(self): self.node_ip_list = [] self.mu_port_list = [] + self.has_stopped = False + def update_all_user(self, dt_transfer): global webapi @@ -673,6 +675,8 @@ def thread_db(obj): # logging.warn('db thread except:%s' % e) if db_instance.event.wait(60) or not db_instance.is_all_thread_alive(): break + if db_instance.has_stopped: + break except KeyboardInterrupt as e: pass db_instance.del_servers() @@ -682,12 +686,10 @@ def thread_db(obj): @staticmethod def thread_db_stop(): global db_instance + db_instance.has_stopped = True db_instance.event.set() def is_all_thread_alive(self): - for port in ServerPool.get_instance().thread_pool: - if not ServerPool.get_instance().thread_pool[port].is_alive(): - return False if not ServerPool.get_instance().thread.is_alive(): return False return True