diff --git a/easy_rec/python/compat/estimator_train.py b/easy_rec/python/compat/estimator_train.py
index 076878a3e..832d35a8d 100644
--- a/easy_rec/python/compat/estimator_train.py
+++ b/easy_rec/python/compat/estimator_train.py
@@ -8,6 +8,8 @@
 from tensorflow.python.estimator.training import _assert_eval_spec
 from tensorflow.python.estimator.training import _ContinuousEvalListener
 from tensorflow.python.estimator.training import _TrainingExecutor
+from tensorflow.python.util import compat
 
+from easy_rec.python.compat.exporter import FinalExporter
 from easy_rec.python.utils import estimator_utils
 
@@ -17,7 +19,6 @@
   tf = tf.compat.v1
 gfile = tf.gfile
 
-
 class TrainDoneListener(_ContinuousEvalListener):
   """Interface for listeners that take action before or after evaluation."""
 
@@ -80,6 +81,27 @@ def train_and_evaluate(estimator, train_spec, eval_spec):
         '(with task id 0). Given task id {}'.format(config.task_id))
 
   result = executor.run()
+
+  # fix for the bug evaluator fails to export in case num_epoch is reached
+  # before num_steps is reached or num_steps is set to infinite
+  if estimator_utils.is_evaluator():
+    export_dir_base = os.path.join(
+        compat.as_str_any(estimator.model_dir),
+        compat.as_str_any('export'))
+    for exporter in eval_spec.exporters:
+      if isinstance(exporter, FinalExporter):
+        export_path = os.path.join(compat.as_str_any(export_dir_base),
+                                   compat.as_str_any(exporter.name))
+        # avoid duplicate export
+        if gfile.IsDirectory(export_path + '/'):
+          continue
+        exporter.export(
+            estimator=estimator,
+            export_path=export_path,
+            checkpoint_path=estimator_utils.latest_checkpoint(estimator.model_dir),
+            eval_result=None,
+            is_the_final_export=True)
+
   if estimator_utils.is_chief():
     with gfile.GFile(train_done_listener.train_done_file, 'w') as fout:
       fout.write('Train Done.')
diff --git a/easy_rec/python/input/hive_input.py b/easy_rec/python/input/hive_input.py
index e4a978e74..b81a4ae76 100644
--- a/easy_rec/python/input/hive_input.py
+++ b/easy_rec/python/input/hive_input.py
@@ -4,7 +4,11 @@
 import numpy as np
 import tensorflow as tf
-from pyhive import hive
+
+try:
+  from pyhive import hive
+except ImportError:
+  logging.warning('pyhive is not installed.')
 
 from easy_rec.python.input.input import Input
 from easy_rec.python.utils import odps_util
 
diff --git a/easy_rec/python/test/train_eval_test.py b/easy_rec/python/test/train_eval_test.py
index a3078e090..f60be60fe 100644
--- a/easy_rec/python/test/train_eval_test.py
+++ b/easy_rec/python/test/train_eval_test.py
@@ -3,6 +3,7 @@
 
 import logging
 import os
+import glob
 import unittest
 from distutils.version import LooseVersion
 
@@ -474,6 +475,17 @@ def test_train_with_ps_worker(self):
         'samples/model_config/multi_tower_on_taobao.config', self._test_dir)
     self.assertTrue(self._success)
 
+  def test_train_with_ps_worker_with_evaluator(self):
+    self._success = test_utils.test_distributed_train_eval(
+        'samples/model_config/multi_tower_on_taobao.config', self._test_dir,
+        num_evaluator=1)
+    self.assertTrue(self._success)
+    final_export_dir = os.path.join(self._test_dir, 'train/export/final')
+    all_saved_files = glob.glob(final_export_dir + '/*/saved_model.pb')
+    logging.info('final_export_dir=%s all_saved_files=%s' % (final_export_dir,
+                 ','.join(all_saved_files)))
+    self.assertTrue(len(all_saved_files) == 1)
+
   def test_train_with_ps_worker_chief_redundant(self):
     self._success = test_utils.test_distributed_train_eval(
         'samples/model_config/multi_tower_on_taobao_chief_redundant.config',
diff --git a/easy_rec/python/utils/estimator_utils.py b/easy_rec/python/utils/estimator_utils.py
index a12880113..e406d1c73 100644
--- a/easy_rec/python/utils/estimator_utils.py
+++ b/easy_rec/python/utils/estimator_utils.py
@@ -626,3 +626,18 @@ def is_chief():
   if 'task' in tf_config:
     return tf_config['task']['type'] in ['chief', 'master']
   return True
+
+
+def is_master():
+  if 'TF_CONFIG' in os.environ:
+    tf_config = json.loads(os.environ['TF_CONFIG'])
+    if 'task' in tf_config:
+      return tf_config['task']['type'] == 'master'
+  return True
+
+def is_evaluator():
+  if 'TF_CONFIG' in os.environ:
+    tf_config = json.loads(os.environ['TF_CONFIG'])
+    if 'task' in tf_config:
+      return tf_config['task']['type'] == 'evaluator'
+  return False
diff --git a/easy_rec/python/utils/test_utils.py b/easy_rec/python/utils/test_utils.py
index 9d719507d..269b174df 100644
--- a/easy_rec/python/utils/test_utils.py
+++ b/easy_rec/python/utils/test_utils.py
@@ -385,25 +385,27 @@ def _get_ports(num_worker):
     logging.info('ports %s in use, retry...' % ports)
 
 
-def _ps_worker_train(pipeline_config_path, test_dir, num_worker):
+def _ps_worker_train(pipeline_config_path, test_dir, num_worker, num_evaluator=0):
   gpus = get_available_gpus()
   # not enough gpus, run on cpu only
   if len(gpus) < num_worker:
     gpus = [None] * num_worker
   ports = _get_ports(num_worker + 1)
+  chief_or_master = 'master' if num_evaluator == 0 else 'chief'
+  cluster = {
+      chief_or_master: ['localhost:%d' % ports[0]],
+      'worker': ['localhost:%d' % ports[i] for i in range(1, num_worker)],
+      'ps': ['localhost:%d' % ports[-1]]
+  }
   tf_config = {
-      'cluster': {
-          'master': ['localhost:%d' % ports[0]],
-          'worker': ['localhost:%d' % ports[i] for i in range(1, num_worker)],
-          'ps': ['localhost:%d' % ports[-1]]
-      }
+      'cluster': cluster
   }
   procs = {}
-  tf_config['task'] = {'type': 'master', 'index': 0}
+  tf_config['task'] = {'type': chief_or_master, 'index': 0}
   os.environ['TF_CONFIG'] = json.dumps(tf_config)
   set_gpu_id(gpus[0])
   train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % pipeline_config_path
-  procs['master'] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'master'))
+  procs[chief_or_master] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, chief_or_master))
   tf_config['task'] = {'type': 'ps', 'index': 0}
   os.environ['TF_CONFIG'] = json.dumps(tf_config)
   set_gpu_id('')
@@ -416,6 +418,12 @@ def _ps_worker_train(pipeline_config_path, test_dir, num_worker):
     worker_name = 'worker_%d' % idx
     procs[worker_name] = run_cmd(train_cmd,
                                  '%s/log_%s.txt' % (test_dir, worker_name))
+  if num_evaluator > 0:
+    tf_config['task'] = {'type':'evaluator', 'index':0}
+    os.environ['TF_CONFIG'] = json.dumps(tf_config)
+    set_gpu_id('')
+    procs['evaluator'] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'evaluator'))
+
   return procs
 
 
@@ -442,7 +450,7 @@ def _multi_worker_mirror_train(pipeline_config_path, test_dir, num_worker):
   return procs
 
 
-def test_distributed_train_eval(pipeline_config_path, test_dir, total_steps=50):
+def test_distributed_train_eval(pipeline_config_path, test_dir, total_steps=50, num_evaluator=0):
   logging.info('testing pipeline config %s' % pipeline_config_path)
   pipeline_config = _load_config_for_test(pipeline_config_path, test_dir,
                                           total_steps)
@@ -455,7 +463,7 @@ def test_distributed_train_eval(pipeline_config_path, test_dir, total_steps=50):
   try:
     if train_config.train_distribute == DistributionStrategy.NoStrategy:
       num_worker = 2
-      procs = _ps_worker_train(test_pipeline_config_path, test_dir, num_worker)
+      procs = _ps_worker_train(test_pipeline_config_path, test_dir, num_worker, num_evaluator)
     elif train_config.train_distribute == DistributionStrategy.MultiWorkerMirroredStrategy:
       num_worker = 2
       procs = _multi_worker_mirror_train(test_pipeline_config_path, test_dir,