From 4b656cffa80544ddec0a2c5a568662373bf608e2 Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 23 Jun 2020 17:36:58 +0800 Subject: [PATCH 1/7] brpc multi client thread Signed-off-by: WangXi --- python/paddle_edl/distill/distill_reader.py | 2 +- python/paddle_edl/distill/distill_worker.py | 135 ++++++++++++++++---- 2 files changed, 111 insertions(+), 26 deletions(-) diff --git a/python/paddle_edl/distill/distill_reader.py b/python/paddle_edl/distill/distill_reader.py index 55ef0c33..0a3826b6 100644 --- a/python/paddle_edl/distill/distill_reader.py +++ b/python/paddle_edl/distill/distill_reader.py @@ -211,7 +211,7 @@ def _init_args(self): self._reader_out_queue = mps.Queue() self._reader_stop_event = mps.Event() self._reader_cond = mps.Condition() - self._task_semaphore = mps.Semaphore(2 * self._require_num + 2) + self._task_semaphore = mps.Semaphore(4 * self._require_num) # predict self._predict_server_queue = mps.Queue(self._require_num) diff --git a/python/paddle_edl/distill/distill_worker.py b/python/paddle_edl/distill/distill_worker.py index 9c9691d5..e5fe8e85 100644 --- a/python/paddle_edl/distill/distill_worker.py +++ b/python/paddle_edl/distill/distill_worker.py @@ -20,6 +20,7 @@ import sys import time +from concurrent import futures from paddle_serving_client import Client from six.moves import queue from six.moves import reduce @@ -357,28 +358,92 @@ def predict_signal_handle(signum, frame): six.reraise(*sys.exc_info()) -def predict_loop(server_item, working_predict_count, in_queue, out_queue, - feeds, fetchs, conf_file, stop_events, predict_lock, - global_finished_task, predict_cond): +def predict_loop(server_item, + working_predict_count, + in_queue, + out_queue, + feeds, + fetchs, + conf_file, + stop_events, + predict_lock, + global_finished_task, + predict_cond, + max_concurrent=3): logger.info('connect server={}'.format(server_item.server)) predict_server = PaddlePredictServer if _NOP_PREDICT_TEST is False else _TestNopPaddlePredictServer - client = predict_server(server_item.server, conf_file, feeds, fetchs) - if not client.connect(): - return False + idx = 0 + clients = [] + for _ in range(max_concurrent): + client = predict_server(server_item.server, conf_file, feeds, fetchs) + if not client.connect(): + return False + clients.append(client) + + tasks = [None for _ in range(max_concurrent)] stop_event = stop_events[server_item.stop_event_id] with predict_lock: working_predict_count.value += 1 + thread_pool = futures.ThreadPoolExecutor( + max_concurrent, thread_name_prefix=server_item.server) + time_line = _TimeLine() finished_task = 0 # predict loop while not stop_event.is_set(): + if tasks[idx] is not None: + task = tasks[idx] + tasks[idx] = None + + success, out_data = task.result() + if not success: + failed_datas = [out_data, ] + finished_task += process_remain_predict_data( + idx, tasks, max_concurrent, failed_datas, out_queue) + + with predict_lock: + global_finished_task.value += finished_task + for failed_data in failed_datas: + in_queue.put( + failed_data) # write back failed task data + # last process + if working_predict_count.value == 1: + # NOTE. need notify other predict worker, or maybe deadlock + with predict_cond: + predict_cond.notify_all() + working_predict_count.value -= 1 + return False + + out_queue.put(out_data) + finished_task += 1 + data = in_queue.get() time_line.record('get_data') # Poison if isinstance(data, _PoisonPill): + failed_datas = [] + finished_task += process_remain_predict_data( + idx, tasks, max_concurrent, failed_datas, out_queue) + + if len(failed_datas) != 0: + failed_datas.append(data) + + with predict_lock: + global_finished_task.value += finished_task + for failed_data in failed_datas: + in_queue.put( + failed_data) # write back failed task data + # last process + if working_predict_count.value == 1: + # NOTE. need notify other predict worker, or maybe deadlock + with predict_cond: + predict_cond.notify_all() + working_predict_count.value -= 1 + return False + poison_pill = data all_worker_done = False @@ -430,24 +495,26 @@ def predict_loop(server_item, working_predict_count, in_queue, out_queue, working_predict_count.value += 1 continue - success, out_data = client_predict(client, data) - time_line.record('predict') - - if not success: - with predict_lock: - global_finished_task.value += finished_task - in_queue.put(data) # write back failed task data - # last process - if working_predict_count.value == 1: - # NOTE. need notify other predict worker, or maybe deadlock - with predict_cond: - predict_cond.notify_all() - working_predict_count.value -= 1 - return False - - out_queue.put(out_data) - finished_task += 1 - time_line.record('put_data') + client = clients[idx] + tasks[idx] = thread_pool.submit(client_predict, client, data) + idx = (idx + 1) % max_concurrent + + idx = (idx + max_concurrent - 1) % max_concurrent + failed_datas = [] + finished_task += process_remain_predict_data(idx, tasks, max_concurrent, + failed_datas, out_queue) + if len(failed_datas) != 0: + with predict_lock: + global_finished_task.value += finished_task + for failed_data in failed_datas: + in_queue.put(failed_data) # write back failed task data + # last process + if working_predict_count.value == 1: + # NOTE. need notify other predict worker, or maybe deadlock + with predict_cond: + predict_cond.notify_all() + working_predict_count.value -= 1 + return False # disconnect with server with predict_lock: @@ -461,6 +528,24 @@ def predict_loop(server_item, working_predict_count, in_queue, out_queue, return True +def process_remain_predict_data(idx, tasks, max_concurrent, failed_datas, + out_queue): + finished_task = 0 + for i in range(max_concurrent): + next_idx = (idx + i + 1) % max_concurrent + if tasks[next_idx] is None: + break + next_task = tasks[next_idx] + tasks[next_idx] = None + next_success, next_out_data = next_task.result() + if not next_success: + failed_datas.append(next_out_data) + else: + out_queue.put(next_out_data) + finished_task += 1 + return finished_task + + def client_predict(client, data): # read_data format e.g. [(img, label, img1, label1), (img, label, img1, label1)] # predict_data format e.g. [(predict0, predict1), (predict0, predict1)] @@ -470,7 +555,7 @@ def client_predict(client, data): task, read_data = data success, predict_data = client.predict(read_data) if not success: - return False, None + return False, data out_data = read_data for i in range(len(out_data)): From 20c95032b4af6527a0a9d5df7d28b79e3356a380 Mon Sep 17 00:00:00 2001 From: WangXi Date: Wed, 24 Jun 2020 01:00:39 +0800 Subject: [PATCH 2/7] avoid hang Signed-off-by: WangXi --- python/paddle_edl/distill/distill_worker.py | 87 +++++++++++++++------ 1 file changed, 62 insertions(+), 25 deletions(-) diff --git a/python/paddle_edl/distill/distill_worker.py b/python/paddle_edl/distill/distill_worker.py index e5fe8e85..8340ac3c 100644 --- a/python/paddle_edl/distill/distill_worker.py +++ b/python/paddle_edl/distill/distill_worker.py @@ -20,6 +20,7 @@ import sys import time +from collections import deque from concurrent import futures from paddle_serving_client import Client from six.moves import queue @@ -380,7 +381,7 @@ def predict_loop(server_item, return False clients.append(client) - tasks = [None for _ in range(max_concurrent)] + tasks = deque(maxlen=max_concurrent) stop_event = stop_events[server_item.stop_event_id] with predict_lock: @@ -391,17 +392,17 @@ def predict_loop(server_item, time_line = _TimeLine() finished_task = 0 + delay = 0.0005 # 500us # predict loop while not stop_event.is_set(): - if tasks[idx] is not None: - task = tasks[idx] - tasks[idx] = None - + if len(tasks) == max_concurrent: + # full, sync wait + task = tasks.popleft() success, out_data = task.result() if not success: failed_datas = [out_data, ] finished_task += process_remain_predict_data( - idx, tasks, max_concurrent, failed_datas, out_queue) + tasks, failed_datas, out_queue) with predict_lock: global_finished_task.value += finished_task @@ -416,17 +417,57 @@ def predict_loop(server_item, working_predict_count.value -= 1 return False + logger.debug('task_id={}'.format(out_data[0].task_id)) out_queue.put(out_data) finished_task += 1 + elif len(tasks) > 0: + # not full, query left + task = tasks.popleft() + if task.done(): + success, out_data = task.result() + if not success: + failed_datas = [out_data, ] + finished_task += process_remain_predict_data( + tasks, failed_datas, out_queue) + + with predict_lock: + global_finished_task.value += finished_task + for failed_data in failed_datas: + in_queue.put( + failed_data) # write back failed task data + # last process + if working_predict_count.value == 1: + # NOTE. need notify other predict worker, or maybe deadlock + with predict_cond: + predict_cond.notify_all() + working_predict_count.value -= 1 + return False + + logger.debug('task_id={}'.format(out_data[0].task_id)) + out_queue.put(out_data) + finished_task += 1 + else: + # not done, write back left + tasks.appendleft(task) + + if len(tasks) == 0: + data = in_queue.get() + else: + # avoid hang + try: + data = in_queue.get(timeout=delay) + delay = 0.0005 # 500us + except queue.Empty: + delay = min(delay * 2, 0.032) # max 32ms + continue - data = in_queue.get() time_line.record('get_data') # Poison if isinstance(data, _PoisonPill): failed_datas = [] - finished_task += process_remain_predict_data( - idx, tasks, max_concurrent, failed_datas, out_queue) + finished_task += process_remain_predict_data(tasks, failed_datas, + out_queue) if len(failed_datas) != 0: failed_datas.append(data) @@ -496,13 +537,13 @@ def predict_loop(server_item, continue client = clients[idx] - tasks[idx] = thread_pool.submit(client_predict, client, data) idx = (idx + 1) % max_concurrent + future = thread_pool.submit(client_predict, client, data) + tasks.append(future) - idx = (idx + max_concurrent - 1) % max_concurrent failed_datas = [] - finished_task += process_remain_predict_data(idx, tasks, max_concurrent, - failed_datas, out_queue) + finished_task += process_remain_predict_data(tasks, failed_datas, + out_queue) if len(failed_datas) != 0: with predict_lock: global_finished_task.value += finished_task @@ -528,20 +569,16 @@ def predict_loop(server_item, return True -def process_remain_predict_data(idx, tasks, max_concurrent, failed_datas, - out_queue): +def process_remain_predict_data(tasks, failed_datas, out_queue): finished_task = 0 - for i in range(max_concurrent): - next_idx = (idx + i + 1) % max_concurrent - if tasks[next_idx] is None: - break - next_task = tasks[next_idx] - tasks[next_idx] = None - next_success, next_out_data = next_task.result() - if not next_success: - failed_datas.append(next_out_data) + remain_task_count = len(tasks) + for i in range(remain_task_count): + task = tasks.popleft() + success, out_data = task.result() + if not success: + failed_datas.append(out_data) else: - out_queue.put(next_out_data) + out_queue.put(out_data) finished_task += 1 return finished_task From 91cdab220b3779650fd9f6af02be8330a0a30187 Mon Sep 17 00:00:00 2001 From: WangXi Date: Wed, 24 Jun 2020 01:14:25 +0800 Subject: [PATCH 3/7] with thread pool Signed-off-by: WangXi --- python/paddle_edl/distill/distill_worker.py | 39 +++++++++++---------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/python/paddle_edl/distill/distill_worker.py b/python/paddle_edl/distill/distill_worker.py index 8340ac3c..305befca 100644 --- a/python/paddle_edl/distill/distill_worker.py +++ b/python/paddle_edl/distill/distill_worker.py @@ -336,22 +336,25 @@ def predict_signal_handle(signum, frame): signal.signal(signal.SIGTERM, predict_signal_handle) try: - while True: - # get server - server_item = server_queue.get() - if server_item is None: - server_result_queue.put(None) - return - - # predict - success = predict_loop(server_item, working_predict_count, - in_queue, out_queue, feeds, fetchs, - conf_file, stop_events, predict_lock, - global_finished_task, predict_cond) - - server_item.state = ServerItem.FINISHED if success else ServerItem.ERROR - server_result_queue.put(server_item) - logger.info('Stopped server={}'.format(server_item.server)) + max_concurrent = 3 + with futures.ThreadPoolExecutor(max_concurrent) as thread_pool: + while True: + # get server + server_item = server_queue.get() + if server_item is None: + server_result_queue.put(None) + return + + # predict + success = predict_loop(server_item, working_predict_count, + in_queue, out_queue, feeds, fetchs, + conf_file, stop_events, predict_lock, + global_finished_task, predict_cond, + thread_pool, max_concurrent) + + server_item.state = ServerItem.FINISHED if success else ServerItem.ERROR + server_result_queue.put(server_item) + logger.info('Stopped server={}'.format(server_item.server)) except Exception as e: if signal_exit[0] is True: pass @@ -370,6 +373,7 @@ def predict_loop(server_item, predict_lock, global_finished_task, predict_cond, + thread_pool, max_concurrent=3): logger.info('connect server={}'.format(server_item.server)) predict_server = PaddlePredictServer if _NOP_PREDICT_TEST is False else _TestNopPaddlePredictServer @@ -387,9 +391,6 @@ def predict_loop(server_item, with predict_lock: working_predict_count.value += 1 - thread_pool = futures.ThreadPoolExecutor( - max_concurrent, thread_name_prefix=server_item.server) - time_line = _TimeLine() finished_task = 0 delay = 0.0005 # 500us From dd40940a8b8dadc955d5e27509eedf52fb783404 Mon Sep 17 00:00:00 2001 From: WangXi Date: Wed, 24 Jun 2020 01:16:00 +0800 Subject: [PATCH 4/7] fix dict item in py3 Signed-off-by: WangXi --- python/paddle_edl/distill/distill_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle_edl/distill/distill_reader.py b/python/paddle_edl/distill/distill_reader.py index 0a3826b6..d23b581e 100644 --- a/python/paddle_edl/distill/distill_reader.py +++ b/python/paddle_edl/distill/distill_reader.py @@ -343,7 +343,7 @@ def print_config(self): 'teacher_service_name': self._service_name, 'reader_type': self._reader_type, } - for config, value in print_config.iteritems(): + for config, value in print_config.items(): print("%s: %s" % (config, value)) print("------------------------------------------------") From 9995dd1903801ac316ec2ed15896181c13abfa68 Mon Sep 17 00:00:00 2001 From: WangXi Date: Sun, 28 Jun 2020 11:18:21 +0800 Subject: [PATCH 5/7] some bug with serving client release Signed-off-by: WangXi --- python/paddle_edl/distill/distill_worker.py | 26 +++++++-------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/python/paddle_edl/distill/distill_worker.py b/python/paddle_edl/distill/distill_worker.py index 305befca..0cd723d6 100644 --- a/python/paddle_edl/distill/distill_worker.py +++ b/python/paddle_edl/distill/distill_worker.py @@ -187,7 +187,7 @@ def predict(self, feed_data): class PaddlePredictServer(PredictServer): - def __init__(self, server, config_file, feeds, fetchs, max_failed_times=3): + def __init__(self, server, config_file, feeds, fetchs, max_failed_times=2): self._server = server self._config_file = config_file self._predict_feed_idxs = [] @@ -295,14 +295,15 @@ def predict(self, feed_data): def __del__(self): try: # avoid serving exit bug when hasn't predict - if self.client is not None and self._has_predict: - self.client.release() + #if self.client is not None and self._has_predict: + # self.client.release() + pass except Exception as e: logger.critical('Release client failed with server={}, ' 'there may be an unknown error'.format( self._server)) logger.critical('Exception:\n{}'.format(str(e))) - logger.warning('Stopped predict server={}'.format(self._server)) + #logger.warning('Stopped predict server={}'.format(self._server)) class _TestNopPaddlePredictServer(PaddlePredictServer): @@ -362,19 +363,10 @@ def predict_signal_handle(signum, frame): six.reraise(*sys.exc_info()) -def predict_loop(server_item, - working_predict_count, - in_queue, - out_queue, - feeds, - fetchs, - conf_file, - stop_events, - predict_lock, - global_finished_task, - predict_cond, - thread_pool, - max_concurrent=3): +def predict_loop(server_item, working_predict_count, in_queue, out_queue, + feeds, fetchs, conf_file, stop_events, predict_lock, + global_finished_task, predict_cond, thread_pool, + max_concurrent): logger.info('connect server={}'.format(server_item.server)) predict_server = PaddlePredictServer if _NOP_PREDICT_TEST is False else _TestNopPaddlePredictServer idx = 0 From c29214f0a74658bf94604839d55dca81bdef4d96 Mon Sep 17 00:00:00 2001 From: WangXi Date: Sun, 28 Jun 2020 14:37:08 +0800 Subject: [PATCH 6/7] Add futures requirement, test=develop Signed-off-by: WangXi --- python/requirements.txt | 1 + python/setup.py.in | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/python/requirements.txt b/python/requirements.txt index b1505f5e..686da794 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -1,2 +1,3 @@ flask pathlib2 +futures; python_version == "2.7" diff --git a/python/setup.py.in b/python/setup.py.in index 74a55c95..b6602eae 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -41,7 +41,8 @@ if os.getenv("PADDLE_EDL_VERSION"): max_version, mid_version, min_version = python_version() REQUIRED_PACKAGES = [ - 'six >= 1.10.0', 'protobuf >= 3.1.0', "flask", "pathlib2" + 'six >= 1.10.0', 'protobuf >= 3.1.0', "flask", "pathlib2", + 'futures; python_version == "2.7"' ] packages=['paddle_edl', From 0752ea1fb1250925c52a6f34cac5599f399025a8 Mon Sep 17 00:00:00 2001 From: WangXi Date: Tue, 21 Jul 2020 15:36:59 +0800 Subject: [PATCH 7/7] fix reader batch --- python/paddle_edl/distill/distill_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle_edl/distill/distill_worker.py b/python/paddle_edl/distill/distill_worker.py index 0cd723d6..dc0885a3 100644 --- a/python/paddle_edl/distill/distill_worker.py +++ b/python/paddle_edl/distill/distill_worker.py @@ -736,7 +736,7 @@ def read_batch(reader, teacher_batch_size, out_queue, task_semaphore): for i in range(batch_size): slot_data = tuple() for j in range(slot_size): - slot_data += (read_data[j][i], ) + slot_data += (np.asarray(read_data[j][i]), ) send_data.append(slot_data) sample_size += 1