-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimpsons_paradox.py
303 lines (254 loc) · 8.34 KB
/
simpsons_paradox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
from dataclasses import dataclass
from itertools import product
from functools import cached_property
import pandas as pd
from tqdm import tqdm
def get_nice_ratios():
    """Build the set of "nice" ratios in the half-open interval [0, 1).

    For every denominator in (1, 2, 4, 5, 10) the fractions
    ``0/d, 1/d, ..., (d-1)/d`` are included as floats.
    """
    denominators = (1, 2, 4, 5, 10)
    return {
        numerator / denominator
        for denominator in denominators
        for numerator in range(denominator)
    }
# NOTE(review): this `i` is not read by any code visible in this file; it looks
# like a leftover assignment — confirm it is unused before removing it.
i = 1
class multiples_gen:
    """Iterator over octets (lists of 8 integers) that share a common base.

    Each octet is ``[base * f for f in factors]`` where ``factors`` is one
    8-tuple taken from ``product(factor_set, repeat=8)``. Once every factor
    tuple has been used, ``base`` is incremented and the factor tuples start
    over from the beginning.

    Args:
        limit: Maximum number of octets to yield before the iterator is
            exhausted. ``limit <= 0`` yields nothing.
    """

    factor_set = [1, 3, 4, 9]

    def __init__(self, limit):
        self.limit = limit
        self.base = 1
        self.l = 0  # number of octets yielded so far
        self.factors = product(self.factor_set, repeat=8)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next octet, or raise ``StopIteration`` after ``limit`` octets."""
        # `>=` (not `>`): with `>` the iterator produced limit + 1 octets.
        if self.l >= self.limit:
            raise StopIteration
        try:
            new_factors = next(self.factors)
        except StopIteration:
            # All factor combinations used for this base: restart them with
            # the next base value.
            self.factors = product(self.factor_set, repeat=8)
            new_factors = next(self.factors)
            self.base += 1
        self.l += 1
        return [self.base * f for f in new_factors]
@dataclass
class Table:
    """Counts of an experiment with two events, two priors and two outcomes.

    The eight fields are the raw counts for event ``A``/``B``, prior
    ``x``/``y`` and outcome ``pos``/``neg``. ``__post_init__`` derives the
    per-group sums, the per-event totals and the positive rates from them;
    a rate is ``0`` when its group is empty.

    Note: the derived attributes are computed once in ``__post_init__``.
    If a field is reassigned afterwards, ``__post_init__()`` has to be
    called again to refresh them.
    """

    Ax_pos: int
    Ax_neg: int
    Ay_pos: int
    Ay_neg: int
    Bx_pos: int
    Bx_neg: int
    By_pos: int
    By_neg: int

    # Shared lookup set; it has no annotation, so it is a plain class
    # attribute rather than a dataclass field.
    nice_ratios = get_nice_ratios()

    @property
    def columns(self):
        """
        Column names for two events, two priors and two outcomes
        ["Ax_pos", "Ax_neg", "Ay_pos", "Ay_neg", "Bx_pos", "Bx_neg", "By_pos", "By_neg"]
        """
        return [
            f"{event}{prior}_{outcome}"
            for event, prior in product(["A", "B"], ["x", "y"])
            for outcome in ("pos", "neg")
        ]

    def __post_init__(self):
        """Compute group sums, event totals and positive rates from the counts."""
        self.Ax_sum = self.Ax_pos + self.Ax_neg
        self.Ay_sum = self.Ay_pos + self.Ay_neg
        self.A_pos = self.Ax_pos + self.Ay_pos
        self.A_sum = self.Ax_sum + self.Ay_sum

        self.Bx_sum = self.Bx_pos + self.Bx_neg
        self.By_sum = self.By_pos + self.By_neg
        self.B_pos = self.Bx_pos + self.By_pos
        self.B_sum = self.Bx_sum + self.By_sum

        # An empty group gets rate 0 instead of raising ZeroDivisionError.
        self.Ax_rate = 0 if not self.Ax_sum else self.Ax_pos / self.Ax_sum
        self.Ay_rate = 0 if not self.Ay_sum else self.Ay_pos / self.Ay_sum
        self.Bx_rate = 0 if not self.Bx_sum else self.Bx_pos / self.Bx_sum
        self.By_rate = 0 if not self.By_sum else self.By_pos / self.By_sum
        self.A_rate = 0 if not self.A_sum else self.A_pos / self.A_sum
        self.B_rate = 0 if not self.B_sum else self.B_pos / self.B_sum

    def is_paradoxical(self):
        """Return True when one event wins in both priors but loses overall.

        Both directions are accepted: A ahead of B in ``x`` and ``y`` but
        behind in total, or the mirror image.
        """
        a_leads_groups = (
            self.Ax_rate > self.Bx_rate
            and self.Ay_rate > self.By_rate
            and self.A_rate < self.B_rate
        )
        b_leads_groups = (
            self.Ax_rate < self.Bx_rate
            and self.Ay_rate < self.By_rate
            and self.A_rate > self.B_rate
        )
        return a_leads_groups or b_leads_groups

    def has_null(self):
        """Return True when at least one of the eight raw counts is zero."""
        return 0 in [
            self.Ax_pos,
            self.Ay_pos,
            self.Ax_neg,
            self.Ay_neg,
            self.Bx_pos,
            self.By_pos,
            self.Bx_neg,
            self.By_neg,
        ]

    def as_df(self):
        """Return the table as a DataFrame with rows ``x``, ``y``, ``all``.

        Columns are the positive count, group size and positive rate for
        event A followed by the same three values for event B.
        """
        df = pd.DataFrame(
            [
                [
                    self.Ax_pos,
                    self.Ax_sum,
                    self.Ax_rate,
                    self.Bx_pos,
                    self.Bx_sum,
                    self.Bx_rate,
                ],
                [
                    self.Ay_pos,
                    self.Ay_sum,
                    self.Ay_rate,
                    self.By_pos,
                    self.By_sum,
                    self.By_rate,
                ],
                [
                    self.A_pos,
                    self.A_sum,
                    self.A_rate,
                    self.B_pos,
                    self.B_sum,
                    self.B_rate,
                ],
            ],
            columns=["A_pos", "A_sum", "A_rate", "B_pos", "B_sum", "B_rate"],
            index=["x", "y", "all"],
        )
        return df

    def has_nice_ratios(self):
        """Return True when all six rates are members of ``nice_ratios``."""
        return all(
            rate in self.nice_ratios
            for rate in (
                self.Ax_rate,
                self.Ay_rate,
                self.Bx_rate,
                self.By_rate,
                self.A_rate,
                self.B_rate,
            )
        )

    def has_ballanced_events(self):
        """Return True when events A and B have the same total count.

        The method name keeps its original spelling so existing callers
        continue to work.
        """
        # Previously referenced bare `A_sum` / `B_sum`, which raised NameError.
        return self.A_sum == self.B_sum
def values_paradoxical(Ax_pos, Ax_neg, Ay_pos, Ay_neg, Bx_pos, Bx_neg, By_pos, By_neg):
    """Check eight raw counts for Simpson's paradox without building a table.

    A rate is ``positives / total`` and is taken as ``0`` when the total is
    zero. The counts are paradoxical when one event has the strictly higher
    rate in both priors ``x`` and ``y`` while having the strictly lower rate
    overall (in either direction).
    """

    def rate(positives, total):
        return positives / total if total else 0

    ax_total = Ax_pos + Ax_neg
    ay_total = Ay_pos + Ay_neg
    bx_total = Bx_pos + Bx_neg
    by_total = By_pos + By_neg

    ax_rate = rate(Ax_pos, ax_total)
    ay_rate = rate(Ay_pos, ay_total)
    bx_rate = rate(Bx_pos, bx_total)
    by_rate = rate(By_pos, by_total)
    a_rate = rate(Ax_pos + Ay_pos, ax_total + ay_total)
    b_rate = rate(Bx_pos + By_pos, bx_total + by_total)

    a_ahead_in_groups = all((ax_rate > bx_rate, ay_rate > by_rate, a_rate < b_rate))
    b_ahead_in_groups = all((ax_rate < bx_rate, ay_rate < by_rate, a_rate > b_rate))
    return a_ahead_in_groups or b_ahead_in_groups
class SimpsonFinder:
    """
    Find pretty Simpson's paradox examples

    Every ``find_*`` method walks candidate octets of counts, shows a tqdm
    progress bar sized ``rng**8`` and returns the first accepted table as a
    DataFrame (``Table.as_df()``), or ``None`` when the candidates run out.
    A table is accepted when it is paradoxical, has no zero count and, if
    ``only_nice_ratios`` is set, all of its rates are nice ratios.
    """

    @staticmethod
    def _is_solution(t, only_nice_ratios):
        """Return True when table *t* meets the acceptance criteria."""
        return (
            t.is_paradoxical()
            and not t.has_null()
            and (not only_nice_ratios or t.has_nice_ratios())
        )

    def find_dc(self, rng, only_nice_ratios=False):
        """
        Use dataclass.

        Builds a fresh ``Table`` for every octet in ``range(rng) ** 8``.
        """
        # The context manager closes the progress bar on every exit path.
        with tqdm(total=rng**8) as progress_bar:
            for values in product(range(rng), repeat=8):
                progress_bar.update()
                t = Table(*values)
                if self._is_solution(t, only_nice_ratios):
                    return t.as_df()
        return None

    def find_rdc(self, rng, only_nice_ratios=False):
        """
        Recycle dataclass.

        Reuses a single ``Table`` instance and overwrites its fields for
        every octet in ``range(rng) ** 8``.
        """
        t = Table(*range(8))  # init class for recycling
        field_names = tuple(t.__annotations__)
        with tqdm(total=rng**8) as progress_bar:
            for values in product(range(rng), repeat=8):
                progress_bar.update()
                # recycle Table
                for atr, val in zip(field_names, values):
                    setattr(t, atr, val)
                # The sums and rates are only computed in __post_init__;
                # without this call the checks below would keep judging the
                # values of the initial Table(*range(8)).
                t.__post_init__()
                if self._is_solution(t, only_nice_ratios):
                    return t.as_df()
        return None

    def find_f(self, rng, only_nice_ratios=False):
        """
        Check in func.

        Filters octets with ``values_paradoxical`` first so that a ``Table``
        is only built for candidates that already look paradoxical.
        """
        with tqdm(total=rng**8) as progress_bar:
            for values in product(range(rng), repeat=8):
                progress_bar.update()
                if not values_paradoxical(*values):
                    continue
                t = Table(*values)
                if self._is_solution(t, only_nice_ratios):
                    return t.as_df()
        return None

    def find_m(self, rng, only_nice_ratios=False):
        """
        Use only multiples.

        Takes candidate octets from ``multiples_gen(limit=rng**8)`` and
        stores the most recently examined table in ``self.last``.
        """
        self.last = None
        m_gen = multiples_gen(limit=rng**8)
        with tqdm(total=rng**8) as progress_bar:
            for values in m_gen:
                progress_bar.update()
                t = Table(*values)
                self.last = t
                if self._is_solution(t, only_nice_ratios):
                    return t.as_df()
        return None
if __name__ == "__main__":
    # Script entry point: search the multiples-based candidates with rng=12
    # and print the first table found (or None).
    print(SimpsonFinder().find_m(rng=12, only_nice_ratios=False))