From 2ab57132a1d516a23915c058237a1ea1554ce57b Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Sat, 15 Jul 2023 22:07:59 +0800 Subject: [PATCH 1/4] add support for continue train on evaluation data --- docs/source/quick_start/local_tutorial.md | 3 ++- easy_rec/python/compat/estimator_train.py | 5 ++++ easy_rec/python/layers/input_layer.py | 2 +- .../python/layers/sequence_feature_layer.py | 1 + easy_rec/python/main.py | 26 ++++++++++++++++--- easy_rec/python/test/train_eval_test.py | 8 ++++++ easy_rec/python/train_eval.py | 18 +++++++++++-- easy_rec/python/utils/estimator_utils.py | 8 ++++++ easy_rec/python/utils/test_utils.py | 20 +++++++++++--- examples/readme.md | 3 ++- 10 files changed, 81 insertions(+), 13 deletions(-) diff --git a/docs/source/quick_start/local_tutorial.md b/docs/source/quick_start/local_tutorial.md index 1254655f8..8074c1218 100644 --- a/docs/source/quick_start/local_tutorial.md +++ b/docs/source/quick_start/local_tutorial.md @@ -1,6 +1,7 @@ # Local Tutorial ### 安装EasyRec + 我们提供了`本地Anaconda安装`和`Docker镜像启动`两种方式。 #### 本地Anaconda安装 @@ -45,7 +46,7 @@ sudo docker run -td --network host -v /local_path:/docker_path mybigpai-registry sudo docker exec -it bash ``` -注:需匹配当前EasyRec版本。 +注:\需匹配当前EasyRec版本。 ### 输入数据: diff --git a/easy_rec/python/compat/estimator_train.py b/easy_rec/python/compat/estimator_train.py index 99f65380f..1ec71e491 100644 --- a/easy_rec/python/compat/estimator_train.py +++ b/easy_rec/python/compat/estimator_train.py @@ -109,3 +109,8 @@ def train_and_evaluate(estimator, train_spec, eval_spec): fout.write('Train Done.') return result + + +def estimator_train_done(estimator): + train_done_file = os.path.join(estimator.model_dir, 'ESTIMATOR_TRAIN_DONE') + return gfile.Exists(train_done_file) diff --git a/easy_rec/python/layers/input_layer.py b/easy_rec/python/layers/input_layer.py index 92ed236db..318650317 100644 --- a/easy_rec/python/layers/input_layer.py +++ b/easy_rec/python/layers/input_layer.py @@ -102,7 +102,7 @@ def __call__(self, features, group_name, is_combine=True, is_dict=False): with conditional(self._is_predicting and place_on_cpu, ops.device('/CPU:0')): concat_features, group_features = self.single_call_input_layer( - features, group_name, feature_name_to_output_tensors) + features, group_name, feature_name_to_output_tensors) if group_name in self._group_name_to_seq_features: # for target attention group_seq_arr = self._group_name_to_seq_features[group_name] diff --git a/easy_rec/python/layers/sequence_feature_layer.py b/easy_rec/python/layers/sequence_feature_layer.py index bb948f785..fd01b5b2c 100644 --- a/easy_rec/python/layers/sequence_feature_layer.py +++ b/easy_rec/python/layers/sequence_feature_layer.py @@ -3,6 +3,7 @@ import tensorflow as tf from tensorflow.python.framework import ops + from easy_rec.python.compat import regularizers from easy_rec.python.layers import dnn from easy_rec.python.layers import seq_input_layer diff --git a/easy_rec/python/main.py b/easy_rec/python/main.py index 218452e79..91e70d671 100644 --- a/easy_rec/python/main.py +++ b/easy_rec/python/main.py @@ -9,6 +9,7 @@ import logging import math import os +import time import six import tensorflow as tf @@ -279,7 +280,9 @@ def train_and_evaluate(pipeline_config_path, continue_train=False): def _train_and_evaluate_impl(pipeline_config, continue_train=False, - check_mode=False): + check_mode=False, + fit_on_eval=False, + fit_on_eval_steps=None): train_config = pipeline_config.train_config data_config = pipeline_config.data_config feature_configs = 
config_util.get_compatible_feature_configs(pipeline_config) @@ -301,15 +304,12 @@ def _train_and_evaluate_impl(pipeline_config, estimator, run_config = _create_estimator( pipeline_config, distribution=distribution, params=params) - master_stat_file = os.path.join(pipeline_config.model_dir, 'master.stat') version_file = os.path.join(pipeline_config.model_dir, 'version') if estimator_utils.is_chief(): _check_model_dir(pipeline_config.model_dir, continue_train) config_util.save_pipeline_config(pipeline_config, pipeline_config.model_dir) with gfile.GFile(version_file, 'w') as f: f.write(easy_rec.__version__ + '\n') - if gfile.Exists(master_stat_file): - gfile.Remove(master_stat_file) train_steps = None if train_config.HasField('num_steps'): @@ -347,6 +347,24 @@ def _train_and_evaluate_impl(pipeline_config, from easy_rec.python.compat import estimator_train estimator_train.train_and_evaluate(estimator, train_spec, eval_spec) logging.info('Train and evaluate finish') + if fit_on_eval and (not estimator_utils.is_evaluator()): + logging.info('Start continue training on eval data') + eval_input_fn = _get_input_fn(data_config, feature_configs, eval_data, + **input_fn_kwargs) + if fit_on_eval_steps is not None: + # wait estimator train done to get the correct train_steps + while not estimator_train.estimator_train_done(estimator): + time.sleep(1) + train_steps = estimator_utils.get_trained_steps(estimator.model_dir) + logging.info('\ttrain_steps=%d fit_on_eval_steps=%d' % + (train_steps, fit_on_eval_steps)) + fit_on_eval_steps += train_steps + estimator.train( + input_fn=eval_input_fn, + max_steps=fit_on_eval_steps, + hooks=list(train_spec.hooks), + saving_listeners=train_spec.saving_listeners) + logging.info('Finished training on eval data') def evaluate(pipeline_config, diff --git a/easy_rec/python/test/train_eval_test.py b/easy_rec/python/test/train_eval_test.py index 777dfd21a..6b59bcdeb 100644 --- a/easy_rec/python/test/train_eval_test.py +++ b/easy_rec/python/test/train_eval_test.py @@ -714,6 +714,14 @@ def test_train_with_ps_worker(self): 'samples/model_config/multi_tower_on_taobao.config', self._test_dir) self.assertTrue(self._success) + def test_fit_on_eval(self): + self._success = test_utils.test_distributed_train_eval( + 'samples/model_config/multi_tower_on_taobao.config', + self._test_dir, + num_evaluator=1, + fit_on_eval=True) + self.assertTrue(self._success) + def test_train_with_ps_worker_with_evaluator(self): self._success = test_utils.test_distributed_train_eval( 'samples/model_config/multi_tower_on_taobao.config', diff --git a/easy_rec/python/train_eval.py b/easy_rec/python/train_eval.py index cfe592b6f..7b2dead63 100644 --- a/easy_rec/python/train_eval.py +++ b/easy_rec/python/train_eval.py @@ -67,6 +67,16 @@ nargs='*', default=None, help='eval data input path') + parser.add_argument( + 'fit_on_eval', + type=bool, + default=False, + help='Fit evaluation data after fitting and evaluating train data') + parser.add_argument( + 'fit_on_eval_steps', + type=int, + default=None, + help='Fit evaluation data steps') parser.add_argument( '--fine_tune_checkpoint', type=str, @@ -165,7 +175,11 @@ has_evaluator=False) else: config_util.auto_expand_share_feature_configs(pipeline_config) - _train_and_evaluate_impl(pipeline_config, args.continue_train, - args.check_mode) + _train_and_evaluate_impl( + pipeline_config, + args.continue_train, + args.check_mode, + fit_on_eval=args.fit_on_eval, + fit_on_eval_steps=args.fit_on_eval_steps) else: raise ValueError('pipeline_config_path should not be empty 
when training!') diff --git a/easy_rec/python/utils/estimator_utils.py b/easy_rec/python/utils/estimator_utils.py index 0525f84e6..3d42b410a 100644 --- a/easy_rec/python/utils/estimator_utils.py +++ b/easy_rec/python/utils/estimator_utils.py @@ -922,6 +922,14 @@ def latest_checkpoint(model_dir): return None +def get_trained_steps(model_dir): + ckpt_path = latest_checkpoint(model_dir) + if ckpt_path is not None: + return int(ckpt_path.split('-')[-1]) + else: + return 0 + + def master_to_chief(): if 'TF_CONFIG' in os.environ: tf_config = json.loads(os.environ['TF_CONFIG']) diff --git a/easy_rec/python/utils/test_utils.py b/easy_rec/python/utils/test_utils.py index 46e47e520..be987efae 100644 --- a/easy_rec/python/utils/test_utils.py +++ b/easy_rec/python/utils/test_utils.py @@ -529,7 +529,9 @@ def _get_ports(num_worker): def _ps_worker_train(pipeline_config_path, test_dir, num_worker, - num_evaluator=0): + num_evaluator=0, + fit_on_eval=False, + fit_on_eval_steps=None): gpus = get_available_gpus() # not enough gpus, run on cpu only if len(gpus) < num_worker: @@ -547,6 +549,10 @@ def _ps_worker_train(pipeline_config_path, os.environ['TF_CONFIG'] = json.dumps(tf_config) set_gpu_id(gpus[0]) train_cmd = 'python -m easy_rec.python.train_eval --pipeline_config_path %s' % pipeline_config_path + if fit_on_eval: + train_cmd += ' --fit_on_eval' + if fit_on_eval_steps is not None: + train_cmd += ' --fit_on_eval_steps ' + str(int(fit_on_eval_steps)) procs[chief_or_master] = run_cmd( train_cmd, '%s/log_%s.txt' % (test_dir, chief_or_master)) tf_config['task'] = {'type': 'ps', 'index': 0} @@ -665,7 +671,8 @@ def test_distributed_train_eval(pipeline_config_path, total_steps=50, num_evaluator=0, edit_config_json=None, - use_hvd=False): + use_hvd=False, + fit_on_eval=False): logging.info('testing pipeline config %s' % pipeline_config_path) pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, total_steps) @@ -687,8 +694,13 @@ def test_distributed_train_eval(pipeline_config_path, return _multi_worker_hvd_train(test_pipeline_config_path, test_dir, 2) if train_config.train_distribute == DistributionStrategy.NoStrategy: num_worker = 2 - procs = _ps_worker_train(test_pipeline_config_path, test_dir, num_worker, - num_evaluator) + procs = _ps_worker_train( + test_pipeline_config_path, + test_dir, + num_worker, + num_evaluator, + fit_on_eval, + fit_on_eval_steps=int(total_steps // 2)) elif train_config.train_distribute == DistributionStrategy.MultiWorkerMirroredStrategy: num_worker = 2 procs = _multi_worker_mirror_train(test_pipeline_config_path, test_dir, diff --git a/examples/readme.md b/examples/readme.md index 6a9ed63d3..9688ba29d 100644 --- a/examples/readme.md +++ b/examples/readme.md @@ -5,6 +5,7 @@ 这些Demo包含了在公开数据集上针对不同模型做的多种实验,涵盖了推荐系统中的召回任务和排序任务,主要包括数据集下载、预处理、模型配置、训练及评估等过程。 # 安装EasyRec + 我们提供了`本地Anaconda安装`和`Docker镜像启动`两种方式。 ## 本地Anaconda安装 @@ -49,7 +50,7 @@ sudo docker run -td --network host -v /local_path:/docker_path mybigpai-registry sudo docker exec -it bash ``` -注:需匹配当前EasyRec版本。 +注:\需匹配当前EasyRec版本。 # 准备数据集 From 0b6bf368d65e6fed1a58a760c45c3a6e5a437289 Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Mon, 17 Jul 2023 13:48:53 +0800 Subject: [PATCH 2/4] add support for continue train on evaluation data --- easy_rec/python/main.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/easy_rec/python/main.py b/easy_rec/python/main.py index 91e70d671..eb760ac2e 100644 --- a/easy_rec/python/main.py +++ b/easy_rec/python/main.py @@ -359,12 +359,16 @@ def 
_train_and_evaluate_impl(pipeline_config, logging.info('\ttrain_steps=%d fit_on_eval_steps=%d' % (train_steps, fit_on_eval_steps)) fit_on_eval_steps += train_steps + # Do not use estimator_train.train_and_evaluate as it starts tf.Server, + # which is redundant and reports port not available error. estimator.train( input_fn=eval_input_fn, max_steps=fit_on_eval_steps, hooks=list(train_spec.hooks), saving_listeners=train_spec.saving_listeners) logging.info('Finished training on eval data') + # return estimator for custom training using estimator.train + return estimator def evaluate(pipeline_config, From eb535f3026750fd07fb0d9e11e42a187f6489f32 Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Wed, 19 Jul 2023 14:11:38 +0800 Subject: [PATCH 3/4] fix sync_replicas_optimizer que runners --- .../python/compat/sync_replicas_optimizer.py | 24 +- easy_rec/python/main.py | 3 +- easy_rec/python/test/train_eval_test.py | 9 + easy_rec/python/train_eval.py | 6 +- easy_rec/python/utils/test_utils.py | 12 +- .../multi_tower_on_taobao_unblanace.config | 304 ++++++++++++++++++ 6 files changed, 343 insertions(+), 15 deletions(-) create mode 100644 samples/model_config/multi_tower_on_taobao_unblanace.config diff --git a/easy_rec/python/compat/sync_replicas_optimizer.py b/easy_rec/python/compat/sync_replicas_optimizer.py index 4f1d97d4b..24c2921ba 100644 --- a/easy_rec/python/compat/sync_replicas_optimizer.py +++ b/easy_rec/python/compat/sync_replicas_optimizer.py @@ -135,6 +135,8 @@ class SyncReplicasOptimizer(optimizer.Optimizer): ``` """ + sync_que_id = -1 + def __init__(self, opt, replicas_to_aggregate, @@ -299,15 +301,24 @@ def apply_gradients(self, grads_and_vars, global_step=None, name=None): update_op = self._opt.apply_gradients(aggregated_grads_and_vars, global_step) + def _get_token_qname(): + SyncReplicasOptimizer.sync_que_id += 1 + if SyncReplicasOptimizer.sync_que_id == 0: + return 'sync_token_q' + else: + return 'sync_token_q_' + str(SyncReplicasOptimizer.sync_que_id) + # Create token queue. + token_qname = _get_token_qname() + logging.info('create sync_token_queue[%s]' % token_qname) with ops.device(global_step.device), ops.name_scope(''): sync_token_queue = ( data_flow_ops.FIFOQueue( -1, global_step.dtype.base_dtype, shapes=(), - name='sync_token_q', - shared_name='sync_token_q')) + name=token_qname, + shared_name=token_qname)) self._sync_token_queue = sync_token_queue self._is_sync_que_closed = sync_token_queue.is_closed() self._close_sync_que = sync_token_queue.close( @@ -342,6 +353,8 @@ def apply_gradients(self, grads_and_vars, global_step=None, name=None): self._chief_queue_runner = queue_runner.QueueRunner( dummy_queue, [sync_op]) + ops.add_to_collection(ops.GraphKeys.QUEUE_RUNNERS, + self._chief_queue_runner) for accum, dev in self._accumulator_list: with ops.device(dev): chief_init_ops.append( @@ -479,14 +492,12 @@ def begin(self): self._local_init_op = self._sync_optimizer.chief_init_op self._ready_for_local_init_op = ( self._sync_optimizer.ready_for_local_init_op) - self._q_runner = self._sync_optimizer.get_chief_queue_runner() self._init_tokens_op = self._sync_optimizer.get_init_tokens_op( self._num_tokens) else: self._local_init_op = self._sync_optimizer.local_step_init_op self._ready_for_local_init_op = ( self._sync_optimizer.ready_for_local_init_op) - self._q_runner = None self._init_tokens_op = None def after_create_session(self, session, coord): @@ -500,11 +511,10 @@ def after_create_session(self, session, coord): 'local_init. 
Init op: %s, error: %s' % (self._local_init_op.name, msg)) session.run(self._local_init_op) + is_closed = session.run(self._sync_optimizer._is_sync_que_closed) + assert not is_closed, 'sync_que is closed' if self._init_tokens_op is not None: session.run(self._init_tokens_op) - if self._q_runner is not None: - self._q_runner.create_threads( - session, coord=coord, daemon=True, start=True) def end(self, session): try: diff --git a/easy_rec/python/main.py b/easy_rec/python/main.py index eb760ac2e..2569afaff 100644 --- a/easy_rec/python/main.py +++ b/easy_rec/python/main.py @@ -312,7 +312,7 @@ def _train_and_evaluate_impl(pipeline_config, f.write(easy_rec.__version__ + '\n') train_steps = None - if train_config.HasField('num_steps'): + if train_config.HasField('num_steps') and train_config.num_steps > 0: train_steps = train_config.num_steps assert train_steps is not None or data_config.num_epochs > 0, ( 'either num_steps and num_epochs must be set to an integer > 0.') @@ -348,6 +348,7 @@ def _train_and_evaluate_impl(pipeline_config, estimator_train.train_and_evaluate(estimator, train_spec, eval_spec) logging.info('Train and evaluate finish') if fit_on_eval and (not estimator_utils.is_evaluator()): + tf.reset_default_graph() logging.info('Start continue training on eval data') eval_input_fn = _get_input_fn(data_config, feature_configs, eval_data, **input_fn_kwargs) diff --git a/easy_rec/python/test/train_eval_test.py b/easy_rec/python/test/train_eval_test.py index 6b59bcdeb..f66e5ead3 100644 --- a/easy_rec/python/test/train_eval_test.py +++ b/easy_rec/python/test/train_eval_test.py @@ -722,6 +722,15 @@ def test_fit_on_eval(self): fit_on_eval=True) self.assertTrue(self._success) + def test_unbalance_data(self): + self._success = test_utils.test_distributed_train_eval( + 'samples/model_config/multi_tower_on_taobao_unblanace.config', + self._test_dir, + total_steps=0, + num_epoch=1, + num_evaluator=1) + self.assertTrue(self._success) + def test_train_with_ps_worker_with_evaluator(self): self._success = test_utils.test_distributed_train_eval( 'samples/model_config/multi_tower_on_taobao.config', diff --git a/easy_rec/python/train_eval.py b/easy_rec/python/train_eval.py index 7b2dead63..a96d0c58e 100644 --- a/easy_rec/python/train_eval.py +++ b/easy_rec/python/train_eval.py @@ -68,12 +68,12 @@ default=None, help='eval data input path') parser.add_argument( - 'fit_on_eval', - type=bool, + '--fit_on_eval', + action='store_true', default=False, help='Fit evaluation data after fitting and evaluating train data') parser.add_argument( - 'fit_on_eval_steps', + '--fit_on_eval_steps', type=int, default=None, help='Fit evaluation data steps') diff --git a/easy_rec/python/utils/test_utils.py b/easy_rec/python/utils/test_utils.py index be987efae..a54591ce5 100644 --- a/easy_rec/python/utils/test_utils.py +++ b/easy_rec/python/utils/test_utils.py @@ -159,7 +159,10 @@ def _replace_data_for_test(data_path): return data_path -def _load_config_for_test(pipeline_config_path, test_dir, total_steps=50): +def _load_config_for_test(pipeline_config_path, + test_dir, + total_steps=50, + num_epochs=0): pipeline_config = config_util.get_configs_from_pipeline_file( pipeline_config_path) train_config = pipeline_config.train_config @@ -171,7 +174,7 @@ def _load_config_for_test(pipeline_config_path, test_dir, total_steps=50): pipeline_config.model_dir = os.path.join(test_dir, 'train') logging.info('test_model_dir %s' % pipeline_config.model_dir) eval_config.num_examples = max(10, data_config.batch_size) - data_config.num_epochs = 
0 + data_config.num_epochs = num_epochs return pipeline_config @@ -672,10 +675,11 @@ def test_distributed_train_eval(pipeline_config_path, num_evaluator=0, edit_config_json=None, use_hvd=False, - fit_on_eval=False): + fit_on_eval=False, + num_epoch=0): logging.info('testing pipeline config %s' % pipeline_config_path) pipeline_config = _load_config_for_test(pipeline_config_path, test_dir, - total_steps) + total_steps, num_epoch) if edit_config_json is not None: config_util.edit_config(pipeline_config, edit_config_json) diff --git a/samples/model_config/multi_tower_on_taobao_unblanace.config b/samples/model_config/multi_tower_on_taobao_unblanace.config new file mode 100644 index 000000000..8105483c2 --- /dev/null +++ b/samples/model_config/multi_tower_on_taobao_unblanace.config @@ -0,0 +1,304 @@ +train_input_path: "data/test/tb_data/taobao_train_data,data/test/tb_data/taobao_test_data" +eval_input_path: "data/test/tb_data/taobao_test_data" +model_dir: "experiments/multi_tower_taobao_ckpt" + +train_config { + log_step_count_steps: 100 + optimizer_config: { + adam_optimizer: { + learning_rate: { + exponential_decay_learning_rate { + initial_learning_rate: 0.001 + decay_steps: 1000 + decay_factor: 0.5 + min_learning_rate: 0.00001 + } + } + } + use_moving_average: false + } + save_checkpoints_steps: 100 + sync_replicas: True + num_steps: 200 +} + +eval_config { + metrics_set: { + auc {} + } +} + +data_config { + file_shard: true + input_fields { + input_name:'clk' + input_type: INT32 + } + input_fields { + input_name:'buy' + input_type: INT32 + } + input_fields { + input_name: 'pid' + input_type: STRING + } + input_fields { + input_name: 'adgroup_id' + input_type: STRING + } + input_fields { + input_name: 'cate_id' + input_type: STRING + } + input_fields { + input_name: 'campaign_id' + input_type: STRING + } + input_fields { + input_name: 'customer' + input_type: STRING + } + input_fields { + input_name: 'brand' + input_type: STRING + } + input_fields { + input_name: 'user_id' + input_type: STRING + } + input_fields { + input_name: 'cms_segid' + input_type: STRING + } + input_fields { + input_name: 'cms_group_id' + input_type: STRING + } + input_fields { + input_name: 'final_gender_code' + input_type: STRING + } + input_fields { + input_name: 'age_level' + input_type: STRING + } + input_fields { + input_name: 'pvalue_level' + input_type: STRING + } + input_fields { + input_name: 'shopping_level' + input_type: STRING + } + input_fields { + input_name: 'occupation' + input_type: STRING + } + input_fields { + input_name: 'new_user_class_level' + input_type: STRING + } + input_fields { + input_name: 'tag_category_list' + input_type: STRING + } + input_fields { + input_name: 'tag_brand_list' + input_type: STRING + } + input_fields { + input_name: 'price' + input_type: INT32 + } + + label_fields: 'clk' + batch_size: 4096 + num_epochs: 1 + prefetch_size: 32 + input_type: CSVInput +} + +feature_config: { + features: { + input_names: 'pid' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'adgroup_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'cate_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10000 + } + features: { + input_names: 'campaign_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'customer' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 
'brand' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100000 + } + features: { + input_names: 'user_id' + feature_type: IdFeature + embedding_dim: 32 + hash_bucket_size: 100000 + } + features: { + input_names: 'cms_segid' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100 + } + features: { + input_names: 'cms_group_id' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 100 + } + features: { + input_names: 'final_gender_code' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'age_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'pvalue_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'shopping_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'occupation' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'new_user_class_level' + feature_type: IdFeature + embedding_dim: 16 + hash_bucket_size: 10 + } + features: { + input_names: 'tag_category_list' + feature_type: TagFeature + separator: '|' + hash_bucket_size: 100000 + embedding_dim: 16 + } + features: { + input_names: 'tag_brand_list' + feature_type: TagFeature + separator: '|' + hash_bucket_size: 100000 + embedding_dim: 16 + } + features: { + input_names: 'price' + feature_type: IdFeature + embedding_dim: 16 + num_buckets: 50 + } +} +model_config: { + model_class: 'MultiTower' + feature_groups: { + group_name: 'user' + feature_names: 'user_id' + feature_names: 'cms_segid' + feature_names: 'cms_group_id' + feature_names: 'age_level' + feature_names: 'pvalue_level' + feature_names: 'shopping_level' + feature_names: 'occupation' + feature_names: 'new_user_class_level' + wide_deep: DEEP + } + feature_groups: { + group_name: 'item' + feature_names: 'adgroup_id' + feature_names: 'cate_id' + feature_names: 'campaign_id' + feature_names: 'customer' + feature_names: 'brand' + feature_names: 'price' + wide_deep: DEEP + } + feature_groups: { + group_name: 'combo' + feature_names: 'pid' + feature_names: 'tag_category_list' + feature_names: 'tag_brand_list' + wide_deep: DEEP + } + losses { + loss_type: F1_REWEIGHTED_LOSS + weight: 1.0 + f1_reweighted_loss { + f1_beta_square: 2.25 + } + } + losses { + loss_type: PAIR_WISE_LOSS + weight: 1.0 + } + multi_tower { + towers { + input: "user" + dnn { + hidden_units: [256, 128, 96, 64] + } + } + towers { + input: "item" + dnn { + hidden_units: [256, 128, 96, 64] + } + } + towers { + input: "combo" + dnn { + hidden_units: [128, 96, 64, 32] + } + } + final_dnn { + hidden_units: [128, 96, 64, 32, 16] + } + l2_regularization: 1e-6 + } + embedding_regularization: 1e-4 +} + +export_config { + multi_placeholder: false +} From a380eee3951eba421d4e3dfbb453274c10ed986f Mon Sep 17 00:00:00 2001 From: chengmengli06 Date: Wed, 19 Jul 2023 15:57:48 +0800 Subject: [PATCH 4/4] fix bug --- easy_rec/python/main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/easy_rec/python/main.py b/easy_rec/python/main.py index 2569afaff..f2b1ad719 100644 --- a/easy_rec/python/main.py +++ b/easy_rec/python/main.py @@ -366,7 +366,8 @@ def _train_and_evaluate_impl(pipeline_config, input_fn=eval_input_fn, max_steps=fit_on_eval_steps, hooks=list(train_spec.hooks), - saving_listeners=train_spec.saving_listeners) + saving_listeners=train_spec.saving_listeners if hasattr( + train_spec, 
'saving_listeners') else None) logging.info('Finished training on eval data') # return estimator for custom training using estimator.train return estimator
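
---

Notes on the series (the sketches below are illustrative, not part of the patches):

Driving the new fit-on-eval path programmatically, based on the `_train_and_evaluate_impl` signature from PATCH 1/4 and the estimator return value added in PATCH 2/4. The config path is the sample config used by the new tests; module locations are assumed to follow the files touched in this series. The same behavior is reachable from the command line via the `--fit_on_eval` and `--fit_on_eval_steps` flags (made proper optional flags in PATCH 3/4) of `easy_rec.python.train_eval`.

```python
# Sketch only: exercising fit_on_eval through the Python API. Assumes
# the usual easy_rec module layout; the config path comes from the
# tests added in this series.
from easy_rec.python.main import _train_and_evaluate_impl
from easy_rec.python.utils import config_util

pipeline_config = config_util.get_configs_from_pipeline_file(
    'samples/model_config/multi_tower_on_taobao.config')

# Run the normal train/evaluate cycle, then keep training on
# eval_input_path for 100 additional steps.
estimator = _train_and_evaluate_impl(
    pipeline_config,
    continue_train=False,
    check_mode=False,
    fit_on_eval=True,
    fit_on_eval_steps=100)

# PATCH 2/4 returns the estimator so callers can continue with custom
# estimator.train(...) calls afterwards.
```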
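One subtlety in the PATCH 1/4 change to main.py: `Estimator.train(max_steps=N)` treats `N` as an absolute global step, not as an extra budget. That is why the worker first waits for the chief's `ESTIMATOR_TRAIN_DONE` marker and only then adds the already-trained steps onto `fit_on_eval_steps`. A self-contained sketch of that handshake, using only the standard library (`os.path` standing in for `gfile`):

```python
import os
import time


def get_trained_steps(model_dir):
    # Checkpoints are written as model.ckpt-<global_step>.*; the numeric
    # suffix of the newest one is the global step reached so far
    # (mirrors estimator_utils.get_trained_steps from PATCH 1/4).
    steps = [
        int(name.split('-')[1].split('.')[0])
        for name in os.listdir(model_dir)
        if name.startswith('model.ckpt-')
    ]
    return max(steps) if steps else 0


def absolute_max_steps(model_dir, fit_on_eval_steps):
    # Wait until the chief drops ESTIMATOR_TRAIN_DONE so the final
    # checkpoint, and hence the trained-step count, is in place.
    while not os.path.exists(os.path.join(model_dir, 'ESTIMATOR_TRAIN_DONE')):
        time.sleep(1)
    # max_steps is absolute: continue from the current global step up to
    # trained_steps + fit_on_eval_steps.
    return get_trained_steps(model_dir) + fit_on_eval_steps
```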
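PATCH 3/4 addresses what the second training phase exposes: after `tf.reset_default_graph()` the graph is rebuilt and `SyncReplicasOptimizer.apply_gradients` runs a second time in the same ps/worker job, so a fixed `shared_name='sync_token_q'` would reattach to the first phase's token queue on the parameter server, which may already be closed (hence the new `assert not is_closed` in `after_create_session`). The class-level counter keeps the historical name for the first call and numbers subsequent ones; the same patch also registers the chief queue runner in `GraphKeys.QUEUE_RUNNERS` so the session, rather than the hook, starts its threads. A stripped-down sketch of just the naming scheme:

```python
class SyncQueueNamer:
    """Sketch of the token-queue naming introduced in PATCH 3/4."""

    sync_que_id = -1  # class attribute, so it survives graph resets

    @classmethod
    def next_token_qname(cls):
        cls.sync_que_id += 1
        # The first call keeps the historical name so single-phase jobs
        # are unaffected; later calls get unique, non-colliding names.
        if cls.sync_que_id == 0:
            return 'sync_token_q'
        return 'sync_token_q_%d' % cls.sync_que_id


assert SyncQueueNamer.next_token_qname() == 'sync_token_q'
assert SyncQueueNamer.next_token_qname() == 'sync_token_q_1'
```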