# helper.py

import os
import random
from collections import Counter

import numpy as np


def load_books():
    """Concatenate every text file in ./final_data into one corpus string."""
    corpus = ''
    books_list = os.listdir('./final_data')
    for book in books_list:
        with open(os.path.join('./final_data', book)) as f:
            corpus += f.read()
    return corpus


def preprocess(text):
    """Lowercase the text, replace punctuation with word-like tokens, and
    drop rare words, returning the corpus as a list of tokens."""
    text = text.lower()
    text = text.replace('.', ' <PERIOD> ')
    text = text.replace(',', ' <COMMA> ')
    text = text.replace('"', ' <QUOTATION_MARK> ')
    text = text.replace(';', ' <SEMICOLON> ')
    text = text.replace('!', ' <EXCLAMATION_MARK> ')
    text = text.replace('?', ' <QUESTION_MARK> ')
    text = text.replace('(', ' <LEFT_PAREN> ')
    text = text.replace(')', ' <RIGHT_PAREN> ')
    text = text.replace('--', ' <HYPHENS> ')
    text = text.replace('\n', ' <NEW_LINE> ')
    text = text.replace(':', ' <COLON> ')
    words = text.split()

    # Drop words that occur five times or fewer, to filter out typos
    # and very rare tokens.
    word_counts = Counter(words)
    corpus = [word for word in words if word_counts[word] > 5]
    return corpus
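
# A quick illustration of the tokenization (hypothetical input, not from the
# corpus): preprocess('Hello, world. Hello again.') first yields the tokens
# ['hello', '<COMMA>', 'world', '<PERIOD>', 'hello', 'again', '<PERIOD>'];
# the rare-word filter then keeps only tokens seen more than five times, so
# on an input this small the function returns [].

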
def create_dict(corpus):
    """Build lookup tables between words and integer ids, with ids assigned
    in descending order of frequency (0 = most frequent word)."""
    word_counts = Counter(corpus)
    sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
    int_to_vocab = {ii: word for ii, word in enumerate(sorted_vocab)}
    vocab_to_int = {word: ii for ii, word in int_to_vocab.items()}
    return vocab_to_int, int_to_vocab
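
# For instance, if 'the' is the most frequent token in the corpus, then
# vocab_to_int['the'] == 0 and int_to_vocab[0] == 'the'; the two dicts
# are exact inverses of each other.

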
def sub_sampling(encoded_corpus):
    """Subsample frequent words as in Mikolov et al. (2013): each word w is
    discarded with probability p_drop(w) = 1 - sqrt(threshold / freq(w)),
    so very common words are aggressively thinned out."""
    threshold = 1e-5
    word_counts = Counter(encoded_corpus)
    total_count = len(encoded_corpus)
    freqs = {word: count / total_count for word, count in word_counts.items()}
    p_drop = {word: 1 - np.sqrt(threshold / freqs[word]) for word in word_counts}
    sampled_encoded_corpus = [word for word in encoded_corpus
                              if random.random() < (1 - p_drop[word])]
    return sampled_encoded_corpus
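
# Worked example (illustrative numbers): a word with frequency 1e-2 gets
# p_drop = 1 - sqrt(1e-5 / 1e-2) ~= 0.968, so it is kept only about 3% of
# the time, while a word at or below the 1e-5 threshold is always kept.

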
def get_target(words, idx, window_size=5):
    """Return the context words around index idx. The window radius R is
    drawn uniformly from [1, window_size], so nearer words are sampled more
    often, as in the original skip-gram formulation."""
    R = np.random.randint(1, window_size + 1)
    start = max(idx - R, 0)
    stop = idx + R
    target_words = set(words[start:idx] + words[idx + 1:stop + 1])
    return list(target_words)
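
# For example, with words = [0, 1, 2, 3, 4, 5, 6], idx = 3 and a drawn
# R = 2, the slices cover words[1:3] + words[4:6], so the context is
# {1, 2, 4, 5} (a set, so duplicates collapse and order is not preserved).

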
def get_batches(words, batch_size, window_size=5):
    """Yield (inputs, targets) batches for skip-gram training. Each input
    word is repeated once per context word returned by get_target, so the
    two lists always have equal length (which varies from batch to batch)."""
    n_batches = len(words) // batch_size

    # Drop the tail of the corpus so only full batches are produced.
    words = words[:n_batches * batch_size]
    for idx in range(0, len(words), batch_size):
        x, y = [], []
        batch = words[idx:idx + batch_size]
        for ii in range(len(batch)):
            batch_x = batch[ii]
            batch_y = get_target(batch, ii, window_size)
            y.extend(batch_y)
            x.extend([batch_x] * len(batch_y))
        yield x, y

# Unused alternative: fixed-length sequence batching, with targets shifted
# one step ahead of the inputs.
# def get_batches(int_text, batch_size, seq_length):
#     """
#     Return batches of input and target
#     :param int_text: Text with the words replaced by their ids
#     :param batch_size: The size of batch
#     :param seq_length: The length of sequence
#     :return: A list where each item is a tuple of (batch of input, batch of target).
#     """
#     n_batches = int(len(int_text) / (batch_size * seq_length))
#
#     # Drop the last few characters to make only full batches
#     xdata = np.array(int_text[: n_batches * batch_size * seq_length])
#     ydata = np.array(int_text[1: n_batches * batch_size * seq_length + 1])
#
#     x_batches = np.split(xdata.reshape(batch_size, -1), n_batches, 1)
#     y_batches = np.split(ydata.reshape(batch_size, -1), n_batches, 1)
#
#     return list(zip(x_batches, y_batches))
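

if __name__ == '__main__':
    # Minimal end-to-end sketch of the intended pipeline. It assumes a
    # ./final_data directory of plain-text books exists; batch_size=512 is
    # an illustrative choice, not a value taken from any training script.
    corpus = preprocess(load_books())
    vocab_to_int, int_to_vocab = create_dict(corpus)
    encoded_corpus = [vocab_to_int[word] for word in corpus]
    train_words = sub_sampling(encoded_corpus)
    inputs, targets = next(get_batches(train_words, batch_size=512))
    print(f'{len(vocab_to_int)} words in vocabulary; '
          f'first batch holds {len(inputs)} (input, target) pairs')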