From ab7f36a5afcd4b7be7282cd6e0995e0e287adbf9 Mon Sep 17 00:00:00 2001 From: Ameya Godbole Date: Tue, 6 Apr 2021 22:33:38 -0400 Subject: [PATCH] FIX ensemble RAM usage - Remove redundant objects made in ensemble model - Reduce heavy logging of scores --- .../models/multifacet_recommender/ensemble.py | 45 +++++++++---------- .../multifacet_recommender.py | 41 ++++++++++------- .../models/multifacet_recommender/specter.py | 40 +++++++++++------ 3 files changed, 74 insertions(+), 52 deletions(-) diff --git a/expertise/models/multifacet_recommender/ensemble.py b/expertise/models/multifacet_recommender/ensemble.py index 1ab99aa2..5073d9f8 100644 --- a/expertise/models/multifacet_recommender/ensemble.py +++ b/expertise/models/multifacet_recommender/ensemble.py @@ -62,32 +62,31 @@ def embed_publications(self, specter_publications_path=None, mfr_publications_pa def all_scores(self, specter_publications_path=None, mfr_publications_path=None, specter_submissions_path=None, mfr_submissions_path=None, scores_path=None): + preliminary_scores_map = {} print("SPECTER:") specter_scores_path = os.path.join(self.specter_predictor.work_dir, "specter_affinity.csv") - self.specter_predictor.all_scores(specter_publications_path, specter_submissions_path, specter_scores_path) + self.specter_predictor.all_scores(specter_publications_path, specter_submissions_path, specter_scores_path, + ensemble_mode=True, scores_map=preliminary_scores_map, + score_weight=self.merge_alpha) + print("SPECTER: Scores computed") print("MFR:") mfr_scores_path = os.path.join(self.mfr_predictor.work_dir, "mfr_affinity.csv") - self.mfr_predictor.all_scores(mfr_publications_path, mfr_submissions_path, mfr_scores_path) - - # Convert preliminary scores of SPECTER to a dictionary - csv_scores = [] - self.preliminary_scores = [] - specter_preliminary_scores_map = {} - for entry in self.specter_predictor.preliminary_scores: - specter_preliminary_scores_map[(entry[0], entry[1])] = entry[2] - - for entry in self.mfr_predictor.preliminary_scores: - new_score = specter_preliminary_scores_map[(entry[0], entry[1])] * self.merge_alpha + \ - entry[2] * (1 - self.merge_alpha) - csv_line = '{note_id},{reviewer},{score}'.format(note_id=entry[0], reviewer=entry[1], - score=new_score) - csv_scores.append(csv_line) - self.preliminary_scores.append((entry[0], entry[1], new_score)) - + self.mfr_predictor.all_scores(mfr_publications_path, mfr_submissions_path, mfr_scores_path, + ensemble_mode=True, scores_map=preliminary_scores_map, + score_weight=(1 - self.merge_alpha)) + print("MFR: Scores computed") + + self.preliminary_scores = [(score_key[0], score_key[1], score_val) + for score_key, score_val in preliminary_scores_map.items()] + del preliminary_scores_map if scores_path: + print("Writing scores") with open(scores_path, 'w') as f: - for csv_line in csv_scores: - f.write(csv_line + '\n') + for entry in self.preliminary_scores: + csv_line = '{note_id},{reviewer},{score}\n'.format(note_id=entry[0], reviewer=entry[1], + score=entry[2]) + f.write(csv_line) + print("Scores written") return self.preliminary_scores @@ -115,7 +114,7 @@ def sparse_scores(self, scores_path=None): print('Sorting...') self.preliminary_scores.sort(key=lambda x: (x[0], x[2]), reverse=True) - print('preliminary', self.preliminary_scores, len(self.preliminary_scores)) + print('Sort 1 complete') all_scores = set() # They are first sorted by note_id all_scores = self._sparse_scores_helper(all_scores, 0) @@ -123,6 +122,7 @@ def sparse_scores(self, scores_path=None): # Sort by profile_id print('Sorting...') self.preliminary_scores.sort(key=lambda x: (x[1], x[2]), reverse=True) + print('Sort 2 complete') all_scores = self._sparse_scores_helper(all_scores, 1) print('Final Sort...') @@ -131,6 +131,5 @@ def sparse_scores(self, scores_path=None): with open(scores_path, 'w') as f: for note_id, profile_id, score in all_scores: f.write('{0},{1},{2}\n'.format(note_id, profile_id, score)) - - print('ALL SCORES', all_scores) + print('Sparse scores written') return all_scores diff --git a/expertise/models/multifacet_recommender/multifacet_recommender.py b/expertise/models/multifacet_recommender/multifacet_recommender.py index 4ee2d0f5..95d09992 100644 --- a/expertise/models/multifacet_recommender/multifacet_recommender.py +++ b/expertise/models/multifacet_recommender/multifacet_recommender.py @@ -823,7 +823,8 @@ def embed_publications(self, publications_path=None): print('NOTE: Archive publication embeddings are not explicitly computed. ' 'Reviewer embeddings are computed and consumed with submission embeddings during score calculation') - def all_scores(self, publications_path=None, submissions_path=None, scores_path=None): + def all_scores(self, publications_path=None, submissions_path=None, scores_path=None, + ensemble_mode=False, scores_map=None, score_weight=1.0): print("Loading data...") seed_all_randomness(self.seed, self.use_cuda) idx2word_freq, user_idx2word_freq, tag_idx2word_freq, dataloader_train_arr, _,\ @@ -897,12 +898,17 @@ def all_scores(self, publications_path=None, submissions_path=None, scores_path= # Convert distances to scores num_special_token = 3 - csv_scores = [] - self.preliminary_scores = [] + if not ensemble_mode: + self.preliminary_scores = [] + else: + assert scores_map is not None + if scores_path: + fout = open(scores_path, 'w') + else: + fout = None dist_arr = np.loadtxt(os.path.join(self.work_dir, "reviewer_submission_dist_arr.txt")) assert dist_arr.shape[0] == len(self.submission_paper_ids_list) assert dist_arr.shape[1] == len(user_idx2word_freq) - sim_arr = 1. - dist_arr for j in range(num_special_token, len(user_idx2word_freq)): user_raw = user_idx2word_freq[j][0] user_name = user_raw @@ -910,16 +916,21 @@ def all_scores(self, publications_path=None, submissions_path=None, scores_path= suffix_start = user_raw.index('|') user_name = user_raw[:suffix_start] for i, paper_id in enumerate(self.submission_paper_ids_list): - csv_line = '{note_id},{reviewer},{score}'.format(note_id=paper_id, reviewer=user_name, - score=sim_arr[i, j]) - csv_scores.append(csv_line) - self.preliminary_scores.append((paper_id, user_name, sim_arr[i, j])) - - if scores_path: - with open(scores_path, 'w') as f: - for csv_line in csv_scores: - f.write(csv_line + '\n') + if fout is not None: + csv_line = '{note_id},{reviewer},{score}\n'.format(note_id=paper_id, reviewer=user_name, + score=1. - dist_arr[i, j]) + fout.write(csv_line) + if ensemble_mode: + scores_map[(paper_id, user_name)] = scores_map.get((paper_id, user_name), 0.0) + \ + score_weight * (1. - dist_arr[i, j]) + else: + self.preliminary_scores.append((paper_id, user_name, (1. - dist_arr[i, j]))) + if fout is not None: + fout.flush() + fout.close() + if ensemble_mode: + return return self.preliminary_scores def _sparse_scores_helper(self, all_scores, id_index): @@ -946,7 +957,7 @@ def sparse_scores(self, scores_path=None): print('Sorting...') self.preliminary_scores.sort(key=lambda x: (x[0], x[2]), reverse=True) - print('preliminary', self.preliminary_scores, len(self.preliminary_scores)) + print('Sort 1 complete') all_scores = set() # They are first sorted by note_id all_scores = self._sparse_scores_helper(all_scores, 0) @@ -954,6 +965,7 @@ def sparse_scores(self, scores_path=None): # Sort by profile_id print('Sorting...') self.preliminary_scores.sort(key=lambda x: (x[1], x[2]), reverse=True) + print('Sort 2 complete') all_scores = self._sparse_scores_helper(all_scores, 1) print('Final Sort...') @@ -963,5 +975,4 @@ def sparse_scores(self, scores_path=None): for note_id, profile_id, score in all_scores: f.write('{0},{1},{2}\n'.format(note_id, profile_id, score)) - print('ALL SCORES', all_scores) return all_scores diff --git a/expertise/models/multifacet_recommender/specter.py b/expertise/models/multifacet_recommender/specter.py index ff96b0e8..88ffbb61 100644 --- a/expertise/models/multifacet_recommender/specter.py +++ b/expertise/models/multifacet_recommender/specter.py @@ -281,7 +281,8 @@ def embed_publications(self, publications_path=None): False) manager.run() - def all_scores(self, publications_path=None, submissions_path=None, scores_path=None): + def all_scores(self, publications_path=None, submissions_path=None, scores_path=None, + ensemble_mode=False, scores_map=None, score_weight=1.0): def load_emb_file(emb_file): paper_emb_size_default = 768 id_list = [] @@ -323,8 +324,14 @@ def load_emb_file(emb_file): for i in range(paper_num_test): p2p_aff[i, :] = torch.sum(paper_emb_test[i, :].unsqueeze(dim=0) * paper_emb_train, dim=1) - csv_scores = [] - self.preliminary_scores = [] + if not ensemble_mode: + self.preliminary_scores = [] + else: + assert scores_map is not None + if scores_path: + fout = open(scores_path, 'w') + else: + fout = None for reviewer_id, train_note_id_list in self.pub_author_ids_to_note_id.items(): if len(train_note_id_list) == 0: continue @@ -339,16 +346,21 @@ def load_emb_file(emb_file): elif self.max_score: all_paper_aff = train_paper_aff_j.max(dim=1)[0] for j in range(paper_num_test): - csv_line = '{note_id},{reviewer},{score}'.format(note_id=test_id_list[j], reviewer=reviewer_id, - score=all_paper_aff[j].item()) - csv_scores.append(csv_line) - self.preliminary_scores.append((test_id_list[j], reviewer_id, all_paper_aff[j].item())) - - if scores_path: - with open(scores_path, 'w') as f: - for csv_line in csv_scores: - f.write(csv_line + '\n') + if fout is not None: + csv_line = '{note_id},{reviewer},{score}\n'.format(note_id=test_id_list[j], reviewer=reviewer_id, + score=all_paper_aff[j].item()) + fout.write(csv_line) + if ensemble_mode: + scores_map[(test_id_list[j], reviewer_id)] = scores_map.get((test_id_list[j], reviewer_id), 0.0) + \ + score_weight * all_paper_aff[j].item() + else: + self.preliminary_scores.append((test_id_list[j], reviewer_id, all_paper_aff[j].item())) + if fout is not None: + fout.flush() + fout.close() + if ensemble_mode: + return return self.preliminary_scores def _sparse_scores_helper(self, all_scores, id_index): @@ -375,7 +387,7 @@ def sparse_scores(self, scores_path=None): print('Sorting...') self.preliminary_scores.sort(key=lambda x: (x[0], x[2]), reverse=True) - print('preliminary', self.preliminary_scores, len(self.preliminary_scores)) + print('Sort 1 complete') all_scores = set() # They are first sorted by note_id all_scores = self._sparse_scores_helper(all_scores, 0) @@ -383,6 +395,7 @@ def sparse_scores(self, scores_path=None): # Sort by profile_id print('Sorting...') self.preliminary_scores.sort(key=lambda x: (x[1], x[2]), reverse=True) + print('Sort 2 complete') all_scores = self._sparse_scores_helper(all_scores, 1) print('Final Sort...') @@ -392,5 +405,4 @@ def sparse_scores(self, scores_path=None): for note_id, profile_id, score in all_scores: f.write('{0},{1},{2}\n'.format(note_id, profile_id, score)) - print('ALL SCORES', all_scores) return all_scores