"""
Read output from a corpus of text files
Create bigrams, trigrams, and frequency distributions
See documentation here: http://www.nltk.org/howto/collocations.html
Usage: bi_trigrams.py (-i DIR) (-o DIR) [-s FILE]
-h --help show this
-o --output DIR specify output directory [default: ./results]
-i --input DIR specify input directory [default: .]
-s --stopwords FILE specify file containing list of stop words [default: ./stopwords.txt]
Mike Widner <[email protected]>
"""
import os
import csv
import nltk
import string
import collections

from docopt import docopt
from nltk.corpus import PlaintextCorpusReader
from nltk.collocations import BigramAssocMeasures, BigramCollocationFinder, TrigramAssocMeasures, TrigramCollocationFinder

# how many n-grams to find
TOT_NGRAMS = 500
# how frequent must an n-gram be for inclusion
FREQ_FILTER = 3
# minimum character length of words to include
MIN_LENGTH = 4
string.punctuation += "…"
options = docopt(__doc__)
SOURCE = options['--input'] + '/' # added / to be safe
TARGET = options['--output'] + '/'
if not os.path.isdir(TARGET):
    os.makedirs(TARGET)
wordlists = PlaintextCorpusReader(SOURCE, r'.*\.txt$')
bigram_measures = BigramAssocMeasures()
trigram_measures = TrigramAssocMeasures()
if options['--stopwords']:
    with open(options['--stopwords'], 'r', encoding='utf-8') as fh:
        stopwords = set(fh.read().split())
else:
    stopwords = set()  # empty set so membership tests below still work
def write_results(results, prefix):
    if not os.path.isdir(TARGET):
        os.makedirs(TARGET)
    # Bigrams
    with open(prefix + '-bigrams.txt', 'w', encoding='utf-8') as fh:
        for bigram in results['bigrams']:
            fh.write(' '.join(bigram) + "\n")
    with open(prefix + '-bigram_prefix.csv', 'w', encoding='utf-8', newline='') as fh:
        writer = csv.writer(fh, dialect='excel')
        writer.writerow(['measure', 'first', 'second'])
        for first in results['b_prefix']:
            for second, score in results['b_prefix'][first]:
                writer.writerow([score, first, second])
    # Trigrams
    with open(prefix + '-trigrams.txt', 'w', encoding='utf-8') as fh:
        for trigram in results['trigrams']:
            fh.write(' '.join(trigram) + "\n")
    with open(prefix + '-trigram_prefix.csv', 'w', encoding='utf-8', newline='') as fh:
        writer = csv.writer(fh, dialect='excel')
        writer.writerow(['measure', 'first', 'second', 'third'])
        for first in results['t_prefix']:
            for second, third, score in results['t_prefix'][first]:
                writer.writerow([score, first, second, third])
    # Freq Dist
    with open(prefix + '-fdist.csv', 'w', encoding='utf-8', newline='') as fh:
        writer = csv.writer(fh, dialect='excel')
        writer.writerow(['word', 'raw_frequency'])
        for word, count in results['fdist'].items():
            writer.writerow([word, count])
def analyze_text(text, filename):
    print(len(text), "tokens in", filename)
    words = [w.lower() for w in text
             if w not in string.punctuation
             and w.lower() not in stopwords
             and len(w) >= MIN_LENGTH]
    fdist = nltk.FreqDist(words)
    # what follows could totally be generalized
    # Bigrams
    print("Generating bigrams from", filename)
    b_finder = BigramCollocationFinder.from_words(words)
    b_finder.apply_freq_filter(FREQ_FILTER)
    # if stopwords:
    #     b_finder.apply_word_filter(lambda w: w in stopwords)
    bigrams = b_finder.nbest(bigram_measures.pmi, TOT_NGRAMS)
    b_scored = b_finder.score_ngrams(bigram_measures.pmi)
    b_prefix_keys = collections.defaultdict(list)
    for key, score in b_scored:
        b_prefix_keys[key[0]].append((key[1], score))
    # Trigrams
    print("Generating trigrams from", filename)
    t_finder = TrigramCollocationFinder.from_words(words)
    t_finder.apply_freq_filter(FREQ_FILTER)
    # if stopwords:
    #     t_finder.apply_word_filter(lambda w: w in stopwords)
    trigrams = t_finder.nbest(trigram_measures.pmi, TOT_NGRAMS)
    t_scored = t_finder.score_ngrams(trigram_measures.pmi)
    t_prefix_keys = collections.defaultdict(list)
    for key, score in t_scored:
        t_prefix_keys[key[0]].append((key[1], key[2], score))
    return {'bigrams': bigrams, 'b_prefix': b_prefix_keys,
            'trigrams': trigrams, 't_prefix': t_prefix_keys,
            'fdist': fdist}
wc_file = open(TARGET + 'word_counts.csv', 'w', encoding='utf-8', newline='')
wc_fh = csv.writer(wc_file, dialect='excel')
wc_fh.writerow(['year', 'words'])
# Per-file results
for filename in wordlists.fileids():
    if len(wordlists.raw(fileids=[filename])) == 0:
        continue  # skip empty files
    slug = filename.partition(".")[0]  # filename without its extension
    wc_fh.writerow([slug, len(wordlists.words(fileids=[filename]))])
    results = analyze_text(wordlists.words(fileids=[filename]), filename)
    write_results(results, TARGET + slug)
# Cumulative results
# Note: chokes on empty files
wc_fh.writerow(['all', len(wordlists.words())])
results = analyze_text(wordlists.words(), "all")
write_results(results, TARGET + "all")
wc_file.close()