From eb535f3026750fd07fb0d9e11e42a187f6489f32 Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Wed, 19 Jul 2023 14:11:38 +0800 Subject: [PATCH] fix sync_replicas_optimizer que runners --- .../python/compat/sync_replicas_optimizer.py | 24 +- easy_rec/python/main.py | 3 +- easy_rec/python/test/train_eval_test.py | 9 + easy_rec/python/train_eval.py | 6 +- easy_rec/python/utils/test_utils.py | 12 +- .../multi_tower_on_taobao_unblanace.config | 304 ++++++++++++++++++ 6 files changed, 343 insertions(+), 15 deletions(-) create mode 100644 samples/model_config/multi_tower_on_taobao_unblanace.config diff --git a/easy_rec/python/compat/sync_replicas_optimizer.py b/easy_rec/python/compat/sync_replicas_optimizer.py index 4f1d97d4b..24c2921ba 100644 --- a/easy_rec/python/compat/sync_replicas_optimizer.py +++ b/easy_rec/python/compat/sync_replicas_optimizer.py @@ -135,6 +135,8 @@ class SyncReplicasOptimizer(optimizer.Optimizer): ``` """ + sync_que_id = -1 + def __init__(self, opt, replicas_to_aggregate, @@ -299,15 +301,24 @@ def apply_gradients(self, grads_and_vars, global_step=None, name=None): update_op = self._opt.apply_gradients(aggregated_grads_and_vars, global_step) + def _get_token_qname(): + SyncReplicasOptimizer.sync_que_id += 1 + if SyncReplicasOptimizer.sync_que_id == 0: + return 'sync_token_q' + else: + return 'sync_token_q_' + str(SyncReplicasOptimizer.sync_que_id) + # Create token queue. + token_qname = _get_token_qname() + logging.info('create sync_token_queue[%s]' % token_qname) with ops.device(global_step.device), ops.name_scope(''): sync_token_queue = ( data_flow_ops.FIFOQueue( -1, global_step.dtype.base_dtype, shapes=(), - name='sync_token_q', - shared_name='sync_token_q')) + name=token_qname, + shared_name=token_qname)) self._sync_token_queue = sync_token_queue self._is_sync_que_closed = sync_token_queue.is_closed() self._close_sync_que = sync_token_queue.close( @@ -342,6 +353,8 @@ def apply_gradients(self, grads_and_vars, global_step=None, name=None): self._chief_queue_runner = queue_runner.QueueRunner( dummy_queue, [sync_op]) + ops.add_to_collection(ops.GraphKeys.QUEUE_RUNNERS, + self._chief_queue_runner) for accum, dev in self._accumulator_list: with ops.device(dev): chief_init_ops.append( @@ -479,14 +492,12 @@ def begin(self): self._local_init_op = self._sync_optimizer.chief_init_op self._ready_for_local_init_op = ( self._sync_optimizer.ready_for_local_init_op) - self._q_runner = self._sync_optimizer.get_chief_queue_runner() self._init_tokens_op = self._sync_optimizer.get_init_tokens_op( self._num_tokens) else: self._local_init_op = self._sync_optimizer.local_step_init_op self._ready_for_local_init_op = ( self._sync_optimizer.ready_for_local_init_op) - self._q_runner = None self._init_tokens_op = None def after_create_session(self, session, coord): @@ -500,11 +511,10 @@ def after_create_session(self, session, coord): 'local_init. Init op: %s, error: %s' % (self._local_init_op.name, msg)) session.run(self._local_init_op) + is_closed = session.run(self._sync_optimizer._is_sync_que_closed) + assert not is_closed, 'sync_que is closed' if self._init_tokens_op is not None: session.run(self._init_tokens_op) - if self._q_runner is not None: - self._q_runner.create_threads( - session, coord=coord, daemon=True, start=True) def end(self, session): try: diff --git a/easy_rec/python/main.py b/easy_rec/python/main.py index eb760ac2e..2569afaff 100644 --- a/easy_rec/python/main.py +++ b/easy_rec/python/main.py @@ -312,7 +312,7 @@ def _train_and_evaluate_impl(pipeline_config, f.write(easy_rec.__version__ + '\n') train_steps = None - if train_config.HasField('num_steps'): + if train_config.HasField('num_steps') and train_config.num_steps > 0: train_steps = train_config.num_steps assert train_steps is not None or data_config.num_epochs > 0, ( 'either num_steps and num_epochs must be set to an integer > 0.') @@ -348,6 +348,7 @@ def _train_and_evaluate_impl(pipeline_config, estimator_train.train_and_evaluate(estimator, train_spec, eval_spec) logging.info('Train and evaluate finish') if fit_on_eval and (not estimator_utils.is_evaluator()): + tf.reset_default_graph() logging.info('Start continue training on eval data') eval_input_fn = _get_input_fn(data_config, feature_configs, eval_data, **input_fn_kwargs) diff --git a/easy_rec/python/test/train_eval_test.py b/easy_rec/python/test/train_eval_test.py index 6b59bcdeb..f66e5ead3 100644 --- a/easy_rec/python/test/train_eval_test.py +++ b/easy_rec/python/test/train_eval_test.py @@ -722,6 +722,15 @@ def test_fit_on_eval(self): fit_on_eval=True) self.assertTrue(self._success) + def test_unbalance_data(self): + self._success = test_utils.test_distributed_train_eval( + 'samples/model_config/multi_tower_on_taobao_unblanace.config', + self._test_dir, + total_steps=0, + num_epoch=1, + num_evaluator=1) + self.assertTrue(self._success) + def test_train_with_ps_worker_with_evaluator(self): self._success = test_utils.test_distributed_train_eval( 'samples/model_config/multi_tower_on_taobao.config', diff --git a/easy_rec/python/train_eval.py b/easy_rec/python/train_eval.py index 7b2dead63..a96d0c58e 100644 --- a/easy_rec/python/train_eval.py +++ b/easy_rec/python/train_eval.py @@ -68,12 +68,12 @@ default=None, help='eval data input path') parser.add_argument( - 'fit_on_eval', - type=bool, + '--fit_on_eval', + action='store_true', default=False, help='Fit evaluation data after fitting and evaluating train data') parser.add_argument( - 'fit_on_eval_steps', + '--fit_on_eval_steps', type=int, default=None, help='Fit evaluation data steps') diff --git a/easy_rec/python/utils/test_utils.py b/easy_rec/python/utils/test_utils.py index be987efae..a54591ce5 100644 --- a/easy_rec/python/utils/test_utils.py +++ b/easy_rec/python/utils/test_utils.py @@ -159,7 +159,10 @@ def _replace_data_for_test(data_path): return data_path -def _load_config_for_test(pipeline_config_path, test_dir, total_steps=50): +def _load_config_for_test(pipeline_config_path, + test_dir, + total_steps=50, + num_epochs=0): pipeline_config = config_util.get_configs_from_pipeline_file( pipeline_config_path) train_config = pipeline_config.train_config @@ -171,7 +174,7 @@ def _load_config_for_test(pipeline_config_path, test_dir, total_steps=50): pipeline_config.model_dir = os.path.join(test_dir, 'train') logging.info('test_model_dir %s' % pipeline_config.model_dir) eval_config.num_examples = max(10, data_config.batch_size) - data_config.num_epochs = 0 + data_config.num_epochs = num_epochs return pipeline_config @@ -672,10 +675,11 @@ def test_distributed_train_eval(pipeline_config_path, num_evaluator=0, edit_config_json=None, use_hvd=False, - fit_on_eval=False): + fit_on_eval=False, + num_epoch=0): logging.info('testing pipeline config %s' % pipeline_config_path) pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, - total_steps) + total_steps, num_epoch) if edit_config_json is not None: config_util.edit_config(pipeline_config, edit_config_json) diff --git a/samples/model_config/multi_tower_on_taobao_unblanace.config b/samples/model_config/multi_tower_on_taobao_unblanace.config new file mode 100644 index 000000000..8105483c2 --- /dev/null +++ b/samples/model_config/multi_tower_on_taobao_unblanace.config @@ -0,0 +1,304 @@ +train_input_path: "data/test/tb_data/taobao_train_data,data/test/tb_data/taobao_test_data" +eval_input_path: "data/test/tb_data/taobao_test_data" +model_dir: "experiments/multi_tower_taobao_ckpt" + +train_config { + log_step_count_steps: 100 + optimizer_config: { + adam_optimizer: { + learning_rate: { + exponential_decay_learning_rate { + initial_learning_rate: 0.001 + decay_steps: 1000 + decay_factor: 0.5 + min_learning_rate: 0.00001 + } + } + } + use_moving_average: false + } + save_checkpoints_steps: 100 + sync_replicas: True + num_steps: 200 +} + +eval_config { + metrics_set: { + auc {} + } +} + +data_config { + file_shard: true + input_fields { + input_name:'clk' + input_type: INT32 + } + input_fields { + input_name:'buy' + input_type: INT32 + } + input_fields { + input_name: 'pid' + input_type: STRING + } + input_fields { + input_name: 'adgroup_id' + input_type: STRING + } + input_fields { + input_name: 'cate_id' + input_type: STRING + } + input_fields { + input_name: 'campaign_id' + input_type: STRING + } + input_fields { + input_name: 'customer' + input_type: STRING + } + input_fields { + input_name: 'brand' + input_type: STRING + } + input_fields { + input_name: 'user_id' + input_type: STRING + } + input_fields { + input_name: 'cms_segid' + input_type: STRING + } + input_fields { + input_name: 'cms_group_id' + input_type: STRING + } + input_fields { + input_name: 'final_gender_code' + input_type: STRING + } + input_fields { + input_name: 'age_level' + input_type: STRING + } + input_fields { + input_name: 'pvalue_level' + input_type: STRING + } + input_fields { + input_name: 'shopping_level' + input_type: STRING + } + input_fields { + input_name: 'occupation' + input_type: STRING + } + input_fields { + input_name: 'new_user_class_level' + input_type: STRING + } + input_fields { + input_name: 'tag_category_list' + input_type: STRING + } + input_fields { + input_name: 'tag_brand_list' + input_type: STRING + } + input_fields { + input_name: 'price' + input_type: INT32 + } + + label_fields: 'clk' + batch_size: 4096 + num_epochs: 1 + prefetch_size: 32 + input_type: CSVInput +} + +feature_config: { + features: { + input_names: 'pid' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'adgroup_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'cate_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10000 + } + features: { + input_names: 'campaign_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'customer' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'brand' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'user_id' + feature_type: IdFeature + embedding_dim: 32 + hash_bucket_size: 100000 + } + features: { + input_names: 'cms_segid' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100 + } + features: { + input_names: 'cms_group_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100 + } + features: { + input_names: 'final_gender_code' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'age_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'pvalue_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'shopping_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'occupation' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'new_user_class_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'tag_category_list' + feature_type: TagFeature + separator: '|' + hash_bucket_size: 100000 + embedding_dim: 16 + } + features: { + input_names: 'tag_brand_list' + feature_type: TagFeature + separator: '|' + hash_bucket_size: 100000 + embedding_dim: 16 + } + features: { + input_names: 'price' + feature_type: IdFeature + embedding_dim: 16 + num_buckets: 50 + } +} +model_config: { + model_class: 'MultiTower' + feature_groups: { + group_name: 'user' + feature_names: 'user_id' + feature_names: 'cms_segid' + feature_names: 'cms_group_id' + feature_names: 'age_level' + feature_names: 'pvalue_level' + feature_names: 'shopping_level' + feature_names: 'occupation' + feature_names: 'new_user_class_level' + wide_deep: DEEP + } + feature_groups: { + group_name: 'item' + feature_names: 'adgroup_id' + feature_names: 'cate_id' + feature_names: 'campaign_id' + feature_names: 'customer' + feature_names: 'brand' + feature_names: 'price' + wide_deep: DEEP + } + feature_groups: { + group_name: 'combo' + feature_names: 'pid' + feature_names: 'tag_category_list' + feature_names: 'tag_brand_list' + wide_deep: DEEP + } + losses { + loss_type: F1_REWEIGHTED_LOSS + weight: 1.0 + f1_reweighted_loss { + f1_beta_square: 2.25 + } + } + losses { + loss_type: PAIR_WISE_LOSS + weight: 1.0 + } + multi_tower { + towers { + input: "user" + dnn { + hidden_units: [256, 128, 96, 64] + } + } + towers { + input: "item" + dnn { + hidden_units: [256, 128, 96, 64] + } + } + towers { + input: "combo" + dnn { + hidden_units: [128, 96, 64, 32] + } + } + final_dnn { + hidden_units: [128, 96, 64, 32, 16] + } + l2_regularization: 1e-6 + } + embedding_regularization: 1e-4 +} + +export_config { + multi_placeholder: false +}