forked from muhammet-mucahit/Yemek-Yemek-REST-API
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapriori.py
executable file
·258 lines (226 loc) · 8.59 KB
/
apriori.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
from collections import defaultdict
from itertools import combinations
from sys import stdout
class cached_property(object):
    """Descriptor that computes a property once and caches it on the instance.

    On first access the wrapped function is called with the instance and the
    result is stored in the instance ``__dict__`` under the function's name.
    Because this class defines no ``__set__``, that instance attribute takes
    precedence on later lookups, so the function is not called again for the
    same instance.
    """

    def __init__(self, func):
        """Wrap ``func``, a one-argument callable taking the instance."""
        self.func = func
        # Mirror the wrapped function's metadata so that introspection of the
        # property shows its own docstring/name rather than the descriptor's.
        self.__doc__ = getattr(func, "__doc__", None)
        self.__name__ = getattr(func, "__name__", None)

    def __get__(self, obj, cls=None):
        """Return the cached value, computing and storing it on first access.

        Accessing the attribute on the class itself (``obj is None``) returns
        the descriptor object.
        """
        if obj is None:
            return self
        value = self.func(obj)
        # Store under the function's name so the instance attribute is found
        # before this descriptor on subsequent lookups.
        obj.__dict__[self.func.__name__] = value
        return value
class Base(object):
    """A base workflow for the Apriori algorithm.

    Subclasses must implement ``generate_frequent_itemset()`` and
    ``generate_rule()``; the ``_before_*`` / ``_after_*`` hooks are optional
    no-ops that ``run()`` calls around each of those two steps.
    """

    def _before_generate_frequent_itemset(self):
        """Hook invoked before generate_frequent_itemset(); no-op by default."""

    def _after_generate_frequent_itemset(self):
        """Hook invoked after generate_frequent_itemset(); no-op by default."""

    def generate_frequent_itemset(self):
        """Generate and return frequent itemset.

        :raises NotImplementedError: always, unless overridden by a subclass
        """
        raise NotImplementedError(
            "generate_frequent_itemset(self) need to be implemented.")

    def _before_generate_rule(self):
        """Hook invoked before generate_rule(); no-op by default."""

    def _after_generate_rule(self):
        """Hook invoked after generate_rule(); no-op by default."""

    def generate_rule(self):
        """Generate and return rule.

        :raises NotImplementedError: always, unless overridden by a subclass
        """
        raise NotImplementedError(
            "generate_rule(self) need to be implemented.")

    def run(self):
        """Run the Apriori workflow and return the rules.

        Calls, in order: the before-hook, ``generate_frequent_itemset()`` and
        the after-hook for the itemset step, then the before-hook,
        ``generate_rule()`` and the after-hook for the rule step.

        :return: the value returned by ``generate_rule()``
        """
        # generate frequent itemset
        self._before_generate_frequent_itemset()
        self.generate_frequent_itemset()
        self._after_generate_frequent_itemset()
        # generate rule
        self._before_generate_rule()
        rules = self.generate_rule()
        self._after_generate_rule()
        return rules
class Apriori(Base):
    """A simple implementation of the Apriori algorithm.

    Example:
        dataset = [
            ['bread', 'milk'],
            ['bread', 'diaper', 'beer', 'egg'],
            ['milk', 'diaper', 'beer', 'cola'],
            ['bread', 'milk', 'diaper', 'beer'],
            ['bread', 'milk', 'diaper', 'cola'],
        ]
        minsup = minconf = 0.6
        apriori = Apriori(dataset, minsup, minconf)
        apriori.run()
        rules = apriori.print_rule()

    Results (the order of the list is not guaranteed):
        ['milk ==> bread', 'bread ==> milk',
         'diaper ==> bread', 'bread ==> diaper',
         'beer ==> diaper', 'diaper ==> beer',
         'diaper ==> milk', 'milk ==> diaper']

    After a run, ``self.rule`` holds ``((head, tail), confidence)`` tuples,
    ``self.frequent_itemset`` maps itemset size to a set of frozensets and
    ``self.frequent_itemset_support`` maps each frequent itemset to its
    support.
    """

    def __init__(self, transaction_list, minsup, minconf, selected_items=None):
        """Initialization

        :param transaction_list: an iterable of transactions, each an
            iterable of hashable items
        :param minsup: minimum support
        :param minconf: minimum confidence
        :param selected_items: items every reported frequent itemset must
            contain; `None` or an empty collection disables the filter
        """
        # Freeze each transaction up front: gives cheap subset tests and
        # lets any iterable (not only sized sequences) be passed in.
        self.transaction_list = [frozenset(transaction)
                                 for transaction in transaction_list]
        self.transaction_list_full_length = len(self.transaction_list)
        self.minsup = minsup
        self.minconf = minconf
        # An empty selection means "no filter". (The previous
        # `selected_items is not []` identity test was always true.)
        selected = None
        if selected_items is not None:
            selected = frozenset(selected_items) or None
        self.selected_items = selected
        # itemset size -> set of frequent itemsets of that size
        self.frequent_itemset = dict()
        # support for every frequent itemset
        self.frequent_itemset_support = defaultdict(float)
        self.rule = []

    def set_selected_items(self, selected_items):
        """Set selected items

        :param selected_items: iterable of items stored as a frozenset
        """
        self.selected_items = frozenset(selected_items)

    @cached_property
    def items(self):
        """Return the set of all items in self.transaction_list"""
        return set().union(*self.transaction_list)

    def filter_with_minsup(self, itemsets):
        """Return subset of itemsets which satisfies minsup
        and record their support

        :param itemsets: iterable of frozensets to test
        :return: set of the itemsets whose support is >= ``self.minsup``
        """
        total = self.transaction_list_full_length
        result = set()
        for itemset in itemsets:
            count = sum(1 for transaction in self.transaction_list
                        if itemset.issubset(transaction))
            # Itemsets that occur in no transaction are never frequent
            # (this also avoids dividing by zero for an empty dataset).
            if not count:
                continue
            support = float(count) / total
            if support >= self.minsup:
                result.add(itemset)
                self.frequent_itemset_support[itemset] = support
        return result

    def _after_generate_frequent_itemset(self):
        """Filter frequent itemset with selected items

        Removes, in place, every frequent itemset that does not contain all
        of ``self.selected_items``. Does nothing when no items are selected.
        """
        if self.selected_items is None:
            return
        for itemsets in self.frequent_itemset.values():
            # Collect first, then remove, so the set is not mutated while
            # it is being iterated.
            rejected = [itemset for itemset in itemsets
                        if not self.selected_items.issubset(itemset)]
            itemsets.difference_update(rejected)

    def generate_frequent_itemset(self):
        """Generate and return frequent itemset

        :return: ``self.frequent_itemset`` (size -> set of frozensets)
        """
        def _apriori_gen(itemsets, length):
            """Return candidate itemset with given itemset and length
            """
            # simply use F(k-1) x F(k-1) (itemset + itemset)
            return {x | y for x in itemsets for y in itemsets
                    if len(x | y) == length}

        k = 1
        # generate 1-frequent_itemset
        candidates = {frozenset([item]) for item in self.items}
        current_itemset = self.filter_with_minsup(candidates)
        self.frequent_itemset[k] = current_itemset
        # generate k-frequent_itemset from the frequent (k-1)-itemsets only;
        # any superset of an infrequent itemset is itself infrequent, so
        # nothing is lost by not combining the rejected candidates.
        while current_itemset:
            k += 1
            current_itemset = self.filter_with_minsup(
                _apriori_gen(current_itemset, k))
            if current_itemset:
                self.frequent_itemset[k] = current_itemset
        return self.frequent_itemset

    def _generate_rule(self, itemset, frequent_itemset_k):
        """Generate rule with F(k) in DFS style

        Each rule has the form ``head -> frequent_itemset_k - head`` with
        confidence ``support(frequent_itemset_k) / support(head)``.

        :param itemset: the current head candidate; each call tries the
            heads obtained by dropping one of its elements
        :param frequent_itemset_k: the frequent itemset the rules are
            derived from
        """
        # make sure the itemset has at least two element to generate the rule
        if len(itemset) < 2:
            return
        full_support = self.frequent_itemset_support.get(
            frequent_itemset_k, 0.0)
        for element in itemset:
            rule_head = itemset - frozenset([element])
            head_support = self.frequent_itemset_support.get(rule_head, 0.0)
            # No recorded support for the head: confidence is undefined, so
            # skip instead of dividing by zero.
            if not head_support:
                continue
            confidence = full_support / head_support
            if confidence >= self.minconf:
                # The tail must be taken from the full frequent itemset so
                # that it matches the confidence computed above.
                rule = ((rule_head, frequent_itemset_k - rule_head),
                        confidence)
                # if rule not in self.rule, add and recall _generate_rule()
                # in DFS
                if rule not in self.rule:
                    self.rule.append(rule)
                    self._generate_rule(rule_head, frequent_itemset_k)

    def generate_rule(self):
        """Generate and return rule

        :return: ``self.rule``, a list of ``((head, tail), confidence)``
        """
        # generate frequent itemset if not generated
        if not self.frequent_itemset:
            self.generate_frequent_itemset()
        # generate in DFS style
        for size, itemsets in self.frequent_itemset.items():
            # single-item itemsets cannot be split into head and tail
            if size == 1:
                continue
            for itemset in itemsets:
                self._generate_rule(itemset, itemset)
        return self.rule

    def print_frequent_itemset(self):
        """Print out frequent itemset

        Writes every frequent itemset and its support (rounded to three
        decimals) to stdout. Items inside an itemset are sorted so the
        output is stable between runs.
        """
        stdout.write(
            '======================================================\n')
        stdout.write('Frequent itemset:\n')
        for itemsets in self.frequent_itemset.values():
            for itemset in itemsets:
                stdout.write('(')
                stdout.write(', '.join(sorted(itemset)))
                stdout.write(')')
                stdout.write(' support = {0}\n'.format(
                    round(self.frequent_itemset_support[itemset], 3)))
        stdout.write(
            '======================================================\n')

    def print_rule(self):
        """Return the rules formatted as strings

        Despite its name this method writes nothing to stdout; it builds one
        ``'head ==> tail'`` string per entry of ``self.rule``. Items on each
        side are sorted and joined with ``', '``; the confidence is omitted.

        :return: list of formatted rule strings
        """
        return [
            ', '.join(sorted(head)) + ' ==> ' + ', '.join(sorted(tail))
            for (head, tail), _confidence in self.rule
        ]