[bugfix] fix bug where the final exporter does not work (#133)
* fix bug where the final exporter does not work

* add test cases for final exporter check

Co-authored-by: 杨熙 <[email protected]>
chengmengli06 and 杨熙 committed Mar 10, 2022
1 parent dfa375b commit 2e7d388
Showing 5 changed files with 73 additions and 12 deletions.
24 changes: 23 additions & 1 deletion easy_rec/python/compat/estimator_train.py
@@ -8,6 +8,8 @@
 from tensorflow.python.estimator.training import _assert_eval_spec
 from tensorflow.python.estimator.training import _ContinuousEvalListener
 from tensorflow.python.estimator.training import _TrainingExecutor
+from tensorflow.python.util import compat
+from easy_rec.python.compat.exporter import FinalExporter
 
 from easy_rec.python.utils import estimator_utils
 
@@ -17,7 +19,6 @@
   tf = tf.compat.v1
 gfile = tf.gfile
 
-
 class TrainDoneListener(_ContinuousEvalListener):
   """Interface for listeners that take action before or after evaluation."""
 
@@ -80,6 +81,27 @@ def train_and_evaluate(estimator, train_spec, eval_spec):
         '(with task id 0). Given task id {}'.format(config.task_id))
 
   result = executor.run()
+
+  # Fix for the bug where the evaluator fails to export when num_epoch is
+  # reached before num_steps, or when num_steps is unset (infinite).
+  if estimator_utils.is_evaluator():
+    export_dir_base = os.path.join(
+        compat.as_str_any(estimator.model_dir),
+        compat.as_str_any('export'))
+    for exporter in eval_spec.exporters:
+      if isinstance(exporter, FinalExporter):
+        export_path = os.path.join(compat.as_str_any(export_dir_base),
+                                   compat.as_str_any(exporter.name))
+        # avoid a duplicate export
+        if gfile.IsDirectory(export_path + '/'):
+          continue
+        exporter.export(
+            estimator=estimator,
+            export_path=export_path,
+            checkpoint_path=estimator_utils.latest_checkpoint(estimator.model_dir),
+            eval_result=None,
+            is_the_final_export=True)
+
   if estimator_utils.is_chief():
     with gfile.GFile(train_done_listener.train_done_file, 'w') as fout:
       fout.write('Train Done.')
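For context, the FinalExporter handled in the block above is the one attached to the EvalSpec that train_and_evaluate() receives. Below is a minimal sketch of that wiring, assuming easy_rec's compat FinalExporter mirrors tf.estimator.FinalExporter's constructor; the toy input_fn, serving_input_fn and LinearClassifier are illustrative stand-ins, not part of this commit:

# Illustrative sketch only: how a FinalExporter ends up in eval_spec.exporters.
import tensorflow as tf

from easy_rec.python.compat.exporter import FinalExporter

if tf.__version__ >= '2.0':
  tf = tf.compat.v1


def input_fn():
  # Toy dataset standing in for the configured train/eval inputs.
  ds = tf.data.Dataset.from_tensor_slices(({'x': [[1.0], [2.0]]}, [0, 1]))
  return ds.batch(2)


def serving_input_fn():
  features = {'x': tf.placeholder(tf.float32, [None, 1], name='x')}
  return tf.estimator.export.ServingInputReceiver(features, features)


estimator = tf.estimator.LinearClassifier(
    feature_columns=[tf.feature_column.numeric_column('x')])
train_spec = tf.estimator.TrainSpec(input_fn=input_fn, max_steps=10)
eval_spec = tf.estimator.EvalSpec(
    input_fn=input_fn,
    steps=1,
    exporters=[FinalExporter('final', serving_input_fn)])
# The patched train_and_evaluate() above then guarantees an
# export/final/<timestamp>/saved_model.pb even when training stops because
# num_epoch is exhausted before num_steps.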
6 changes: 5 additions & 1 deletion easy_rec/python/input/hive_input.py
@@ -4,7 +4,11 @@
 
 import numpy as np
 import tensorflow as tf
-from pyhive import hive
 
+try:
+  from pyhive import hive
+except ImportError:
+  logging.warning('pyhive is not installed.')
+
 from easy_rec.python.input.input import Input
 from easy_rec.python.utils import odps_util
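The guarded import above only logs a warning at module load time, so importing hive_input no longer fails on machines without pyhive; the error surfaces only when Hive access is actually used. A small sketch of the same optional-dependency pattern (setting hive to None and the _check_hive_available helper are illustrative additions, not code from this commit):

import logging

try:
  from pyhive import hive
except ImportError:
  hive = None
  logging.warning('pyhive is not installed.')


def _check_hive_available():
  # Fail with a clear message only when a Hive connection is actually requested.
  if hive is None:
    raise RuntimeError(
        'HiveInput requires pyhive; install it with `pip install pyhive`.')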
12 changes: 12 additions & 0 deletions easy_rec/python/test/train_eval_test.py
@@ -3,6 +3,7 @@
 
 import logging
 import os
+import glob
 import unittest
 from distutils.version import LooseVersion
 
@@ -474,6 +475,17 @@ def test_train_with_ps_worker(self):
         'samples/model_config/multi_tower_on_taobao.config', self._test_dir)
     self.assertTrue(self._success)
 
+  def test_train_with_ps_worker_with_evaluator(self):
+    self._success = test_utils.test_distributed_train_eval(
+        'samples/model_config/multi_tower_on_taobao.config', self._test_dir,
+        num_evaluator=1)
+    self.assertTrue(self._success)
+    final_export_dir = os.path.join(self._test_dir, 'train/export/final')
+    all_saved_files = glob.glob(final_export_dir + '/*/saved_model.pb')
+    logging.info('final_export_dir=%s all_saved_files=%s' %
+                 (final_export_dir, ','.join(all_saved_files)))
+    self.assertTrue(len(all_saved_files) == 1)
+
   def test_train_with_ps_worker_chief_redundant(self):
     self._success = test_utils.test_distributed_train_eval(
         'samples/model_config/multi_tower_on_taobao_chief_redundant.config',
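The new test asserts that the dedicated evaluator wrote exactly one timestamped SavedModel through the final exporter. Roughly, the layout it checks looks like this (the timestamp and variable shard names are illustrative):

# Expected layout under the test directory (timestamp is illustrative):
#
#   <test_dir>/train/export/final/1646899200/
#       saved_model.pb
#       variables/
#           variables.data-00000-of-00001
#           variables.index
#
# so glob.glob(final_export_dir + '/*/saved_model.pb') matches exactly one file.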
15 changes: 15 additions & 0 deletions easy_rec/python/utils/estimator_utils.py
@@ -626,3 +626,18 @@ def is_chief():
     if 'task' in tf_config:
       return tf_config['task']['type'] in ['chief', 'master']
   return True
+
+
+def is_master():
+  if 'TF_CONFIG' in os.environ:
+    tf_config = json.loads(os.environ['TF_CONFIG'])
+    if 'task' in tf_config:
+      return tf_config['task']['type'] == 'master'
+  return True
+
+def is_evaluator():
+  if 'TF_CONFIG' in os.environ:
+    tf_config = json.loads(os.environ['TF_CONFIG'])
+    if 'task' in tf_config:
+      return tf_config['task']['type'] == 'evaluator'
+  return False
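These helpers only inspect the TF_CONFIG environment variable that the distributed launcher sets for each process. A quick sketch of the value an evaluator process sees (ports are made up; per the usual Estimator convention, the evaluator appears only in the task entry, not in the cluster spec):

import json
import os

# Illustrative TF_CONFIG for an evaluator process; ports are made up.
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'chief': ['localhost:2000'],
        'worker': ['localhost:2001'],
        'ps': ['localhost:2002']
    },
    'task': {'type': 'evaluator', 'index': 0}
})

from easy_rec.python.utils import estimator_utils

assert estimator_utils.is_evaluator()
assert not estimator_utils.is_master()
assert not estimator_utils.is_chief()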
28 changes: 18 additions & 10 deletions easy_rec/python/utils/test_utils.py
@@ -385,25 +385,27 @@ def _get_ports(num_worker):
     logging.info('ports %s in use, retry...' % ports)
 
 
-def _ps_worker_train(pipeline_config_path, test_dir, num_worker):
+def _ps_worker_train(pipeline_config_path, test_dir, num_worker, num_evaluator=0):
   gpus = get_available_gpus()
   # not enough gpus, run on cpu only
   if len(gpus) < num_worker:
     gpus = [None] * num_worker
   ports = _get_ports(num_worker + 1)
+  chief_or_master = 'master' if num_evaluator == 0 else 'chief'
+  cluster = {
+      chief_or_master: ['localhost:%d' % ports[0]],
+      'worker': ['localhost:%d' % ports[i] for i in range(1, num_worker)],
+      'ps': ['localhost:%d' % ports[-1]]
+  }
   tf_config = {
-      'cluster': {
-          'master': ['localhost:%d' % ports[0]],
-          'worker': ['localhost:%d' % ports[i] for i in range(1, num_worker)],
-          'ps': ['localhost:%d' % ports[-1]]
-      }
+      'cluster': cluster
   }
   procs = {}
-  tf_config['task'] = {'type': 'master', 'index': 0}
+  tf_config['task'] = {'type': chief_or_master, 'index': 0}
   os.environ['TF_CONFIG'] = json.dumps(tf_config)
   set_gpu_id(gpus[0])
   train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % pipeline_config_path
-  procs['master'] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'master'))
+  procs[chief_or_master] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, chief_or_master))
   tf_config['task'] = {'type': 'ps', 'index': 0}
   os.environ['TF_CONFIG'] = json.dumps(tf_config)
   set_gpu_id('')
@@ -416,6 +418,12 @@ def _ps_worker_train(pipeline_config_path, test_dir, num_worker):
     worker_name = 'worker_%d' % idx
     procs[worker_name] = run_cmd(train_cmd,
                                  '%s/log_%s.txt' % (test_dir, worker_name))
+  if num_evaluator > 0:
+    tf_config['task'] = {'type': 'evaluator', 'index': 0}
+    os.environ['TF_CONFIG'] = json.dumps(tf_config)
+    set_gpu_id('')
+    procs['evaluator'] = run_cmd(train_cmd, '%s/log_%s.txt' % (test_dir, 'evaluator'))
+
   return procs
 

@@ -442,7 +450,7 @@ def _multi_worker_mirror_train(pipeline_config_path, test_dir, num_worker):
   return procs
 
 
-def test_distributed_train_eval(pipeline_config_path, test_dir, total_steps=50):
+def test_distributed_train_eval(pipeline_config_path, test_dir, total_steps=50, num_evaluator=0):
   logging.info('testing pipeline config %s' % pipeline_config_path)
   pipeline_config = _load_config_for_test(pipeline_config_path, test_dir,
                                           total_steps)
@@ -455,7 +463,7 @@ def test_distributed_train_eval(pipeline_config_path, test_dir, total_steps=50):
   try:
     if train_config.train_distribute == DistributionStrategy.NoStrategy:
       num_worker = 2
-      procs = _ps_worker_train(test_pipeline_config_path, test_dir, num_worker)
+      procs = _ps_worker_train(test_pipeline_config_path, test_dir, num_worker, num_evaluator)
     elif train_config.train_distribute == DistributionStrategy.MultiWorkerMirroredStrategy:
       num_worker = 2
       procs = _multi_worker_mirror_train(test_pipeline_config_path, test_dir,
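Taken together, _ps_worker_train now starts one extra process when num_evaluator > 0 and switches the coordinating task from the legacy 'master' type to 'chief', since in Estimator's task semantics a 'master' also runs evaluation while a 'chief' leaves it to the dedicated evaluator. A rough sketch of the resulting process set for num_worker=2, num_evaluator=1 (ports and the exact worker index are illustrative):

# Processes started by _ps_worker_train(config, test_dir, num_worker=2, num_evaluator=1):
#
#   procs['chief']      task {'type': 'chief', 'index': 0}      trains, no eval
#   procs['worker_1']   task {'type': 'worker', 'index': ...}   trains
#   procs['ps']         task {'type': 'ps', 'index': 0}         parameter server
#   procs['evaluator']  task {'type': 'evaluator', 'index': 0}  evaluates + final export
#
# With num_evaluator=0 the first process keeps the old 'master' task type and
# no evaluator is launched, preserving the previous behavior.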
