-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtiming.py
219 lines (178 loc) · 6.21 KB
/
timing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import core
import timeit
import random
import sys
import argparse
import time
# =================
# Benchmark classes
# =================
class Benchmark:
def __init__(self):
with open('core.py') as f:
self.setup = f.read()
self.setup += """\nfrom copy import deepcopy\n"""
self.load()
self.build_info()
self.extend_setup()
def load(self):
raise NotImplementedError
def build_info(self):
size = 0
num = 0
max = 0
for lst in self.lsts:
size += len(lst)
if len(lst) > max:
max = len(lst)
num += 1
self.info = "{} lists, average size {}, max size {}".format(num, size//num, max)
def extend_setup(self):
self.setup += ''
raise NotImplementedError
class Niklas(Benchmark):
def __init__(self, filename):
self.filename = filename
super().__init__()
def load(self):
self.lsts = []
with open(self.filename, "r") as f:
for line in f:
lst = [int(x) for x in line.split()]
self.lsts.append(lst)
def build_info(self):
super().build_info()
self.info += '\n(from file: {})'.format(self.filename)
def extend_setup(self):
self.setup += """
lsts = []
size = 0
num = 0
max = 0
for line in open("{0}", "r"):
lst = [int(x) for x in line.split()]
size += len(lst)
if len(lst) > max: max = len(lst)
num += 1
lsts.append(lst)
""".format(self.filename)
class Sven(Benchmark):
def load(self):
import json
with open('./lists/sven_list.txt') as f:
self.lsts = json.loads(f.read())
def extend_setup(self):
self.setup += """
import json
with open('./lists/sven_list.txt') as f:
lsts = json.loads(f.read())
"""
class Agf(Benchmark):
def load(self):
import random
tenk = range(10000)
self.lsts = [random.sample(tenk, random.randint(0, 500)) for _ in range(2000)]
def extend_setup(self):
self.setup += """\nlsts = {}""".format(repr(self.lsts))
# ======================================
# Function for building Nik's test lists
# ======================================
def build_timing_list(filename,
class_count=50,
class_size=1000,
list_count_per_class=10,
large_list_sizes = (100, 1000),
small_list_sizes = (0, 100),
large_list_probability = 0.5):
large_list_sizes = list(range(*large_list_sizes))
small_list_sizes = list(range(*small_list_sizes))
with open(filename, "w") as f:
lists = []
classes = [list(range(class_size*i, class_size*(i+1))) for i in range(class_count)]
for c in classes:
# distribute each class across ~300 lists
for i in range(list_count_per_class):
lst = []
if random.random() < large_list_probability:
size = random.choice(large_list_sizes)
else:
size = random.choice(small_list_sizes)
nums = set(c)
for j in range(size):
x = random.choice(list(nums))
lst.append(x)
nums.remove(x)
random.shuffle(lst)
lists.append(lst)
random.shuffle(lists)
for lst in lists:
f.write(" ".join(str(x) for x in lst) + "\n")
# ===============
# Timing function
# ===============
def timing(bench, number):
print('\nTiming with: >> {} << Benchmark'.format(bench.__class__.__name__))
print('Info: {}'.format(bench.info))
setup = bench.setup
print('-- Press Ctrl-C to skip a test --\n')
times = []
for name, value in vars(core).items():
if name.endswith('_merge'):
print('timing: {} '.format(value.__doc__), end='')
sys.stdout.flush()
try:
t = timeit.timeit("{}(deepcopy(lsts))".format(name),
setup=setup,
number=number)
except KeyboardInterrupt:
print(' skipped.')
try:
time.sleep(0.2)
except KeyboardInterrupt:
print('Two fast Ctrl-C - exiting')
sys.exit(0)
else:
times.append((t, value.__doc__))
print(' -- {:0.4f} -- '.format(t))
print('\nTiming Results:')
for t,name in sorted(times):
print('{:0.3f} -- {}'.format(t,name))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Timing merge solutions.')
list_file = parser.add_mutually_exclusive_group()
list_file.add_argument('--new', action='store_true',
help='build a new test list')
args = parser.parse_args()
if args.new:
print('building test list (for Nik test) ... ', end='')
sys.stdout.flush()
param = dict(class_count = 50,
class_size = 1000,
list_count_per_class = 100,
large_list_sizes = (100, 1000),
small_list_sizes = (0, 100),
large_list_probability = 0.5,
filename = './lists/timing_1.txt')
build_timing_list(**param)
param = dict(class_count = 15,
class_size = 1000,
list_count_per_class = 300,
large_list_sizes = (100, 1000),
small_list_sizes = (0, 100),
large_list_probability = 0.5,
filename = './lists/timing_2.txt')
build_timing_list(**param)
param = dict(class_count = 15,
class_size = 1000,
list_count_per_class = 300,
large_list_sizes = (100, 1000),
small_list_sizes = (0, 100),
large_list_probability = 0.1,
filename = './lists/timing_3.txt')
build_timing_list(**param)
print('done')
timing(Niklas('./lists/timing_1.txt'), number=3)
timing(Niklas('./lists/timing_2.txt'), number=3)
timing(Niklas('./lists/timing_3.txt'), number=3)
timing(Sven(), number=500)
timing(Agf(), number=10)