-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
257 lines (213 loc) · 8.19 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
"""Provide C&C capabilities for experiments
Basic workflow:
### TODO
"""
from __future__ import print_function, division
import collections
import copy
import glob
import os
import shutil
import warnings
import numpy as np
# Python 2 compatibility: alias ``input`` to ``raw_input`` where the latter
# exists, so that prompts return the typed text as a string on both versions.
try:
    raw_input
except NameError:
    # Python 3: there is no ``raw_input``; the builtin ``input`` is kept.
    pass
else:
    input = raw_input
def ensure_tracking(experiment_name, experimentfile, quiet=False):
    """Make sure a tracked copy of the experiment file exists.

    The tracked copy lives at ``simulations/01_runs/<experiment_name>``; the
    folder is created if necessary.  If no tracked copy exists yet,
    ``experimentfile`` is copied there.  If one exists already and ``quiet``
    is true, nothing happens; otherwise the user is asked on the terminal
    whether the existing copy should be overridden.

    Input:
    experiment_name: file name of the tracked copy
    experimentfile: path of the experiment file to copy
    quiet: if true, silently keep an already existing tracked copy

    Raises:
    SystemExit: if the user answers anything other than "y" or "n"
    """
    ensure_exist(os.path.join('simulations', '01_runs'))
    trackingFileName = os.path.join('simulations', '01_runs', experiment_name)

    if not os.path.isfile(trackingFileName):
        shutil.copy(experimentfile, trackingFileName)
        return
    if quiet:
        return

    if os.path.basename(experimentfile) != experiment_name:
        prompt = ("{} already exists and is named different "
                  "than your current file {} are you sure you "
                  "set experimentName correctly?\nDo you want "
                  "to override it? [y/n/a] ".format(
                      trackingFileName, experimentfile))
    else:
        prompt = ("{} already exists. Do you want to override "
                  "it? [y/n/a] ".format(trackingFileName))
    response = input(prompt)

    # Compare by value: ``is`` tests object identity, and a string read from
    # the terminal is not guaranteed to be the same object as a literal.
    if response == "n":
        return
    if response == "y":
        shutil.copy(experimentfile, trackingFileName)
        return

    print("Aborting")
    # ``exit()`` is injected by the ``site`` module for interactive use;
    # raising SystemExit directly works in every interpreter setup.
    raise SystemExit()
def ensure_exist(folder):
    """Create folder (including missing parent folders) if it is missing.

    An OSError raised by ``os.makedirs`` is swallowed when folder is a
    directory afterwards and re-raised otherwise.
    """
    try:
        os.makedirs(folder)
    except OSError:
        if os.path.isdir(folder):
            # The folder is there, which is all the caller asked for.
            return
        raise
def ensure_is_empty(folder):
    """Remove every entry matched by the glob pattern ``folder + '/*'``.

    Each match is deleted with ``os.remove``.
    NOTE(review): ``os.remove`` does not delete directories, so a subfolder
    inside folder makes this raise -- confirm that only files are expected.
    """
    pattern = folder + '/*'
    for entry in glob.glob(pattern):
        os.remove(entry)
def expanddict(dict_to_expand, expansions, rules):
    """Return a list of copies of dict_to_expand with a Cartesian product of
    all expansions.

    Every entry of dict_to_expand whose value equals an identifier from
    expansions is replaced by each of that identifier's values in turn.
    dict_to_expand itself is not modified.

    Input:
    dict_to_expand: dictionary with values to replace
    expansions: dictionary of {"identifier": values}, where values is either
                a sequence of replacement values or a function
                specification starting with 'func':
                    ['func', 'linspace', start, stop, num]
                    ['func', 'logspace', start, stop, num]
                    ['func', 'multiple', [spec, spec, ...]]
                Generated values are rounded to 7 decimals.  For 'multiple'
                each spec is indexed like the linspace/logspace forms and
                the sorted unique union of all generated values is used.
    rules: iterable of restrictions; each rule is indexed as
           (keys1, keys2, operator) and an expanded dictionary violating
           any rule is dropped from the result

    Output:
    expanded_dicts: list of dictionaries with all expansions due to the
                    Cartesian product of the modifier values, ordered by
                    the iteration order of expansions

    Raises:
    ValueError: if a function specification names an unknown function type

    >>> d = { 1: 'blub', 2: {3: 'blub', 4: 'hello'}}
    >>> e = collections.OrderedDict([('blub', ['a', 'b']), ('hello', [11, 12])])
    >>> expanddict(d, e, {})
    [{1: 'a', 2: {3: 'a', 4: 11}}, {1: 'a', 2: {3: 'a', 4: 12}}, {1: 'b', 2: {3: 'b', 4: 11}}, {1: 'b', 2: {3: 'b', 4: 12}}]
    >>> d
    {1: 'blub', 2: {3: 'blub', 4: 'hello'}}
    """
    expanded_dicts = [copy.deepcopy(dict_to_expand)]
    for ident, values in expansions.items():
        values = _resolve_expansion_values(values)
        # Search the untouched template so that values substituted for an
        # earlier identifier cannot hide or fake a later identifier.
        keypositions = _find_key_from_identifier(dict_to_expand, ident)
        tmp = []
        for d in expanded_dicts:
            for v in values:
                # Copy first, then substitute: no dictionary is shared
                # between results and the input stays as it was.
                expanded = copy.deepcopy(d)
                for kp in keypositions:
                    _update_dict(expanded, v, *kp)
                tmp.append(expanded)
        expanded_dicts = tmp
    return [exdict for exdict in expanded_dicts
            if not _violate_any(exdict, rules)]


def _resolve_expansion_values(values):
    """Return the concrete replacement values described by values."""
    is_function = len(values) > 0 and values[0] == 'func'
    if not is_function:
        return values
    kind = values[1]
    if kind == 'multiple':
        parts = [_spaced_values(vs[1], vs[2], vs[3], vs[4])
                 for vs in values[2]]
        return np.unique(np.concatenate(parts))
    return _spaced_values(kind, values[2], values[3], values[4])


def _spaced_values(kind, start, stop, num):
    """Return num values from np.linspace/np.logspace, rounded to 7 decimals."""
    if kind == 'linspace':
        return np.linspace(start, stop, num).round(decimals=7)
    if kind == 'logspace':
        return np.logspace(start, stop, num).round(decimals=7)
    raise ValueError("Didn't recognize {} as a function type".format(kind))
def generate_folder_template(replacement_dictionary, expanding_dictonary,
                             base='simulations', experimentname=''):
    """Return a format template for the folder name of one simulation.

    For every key of replacement_dictionary the flattened
    expanding_dictonary is searched for entries whose value equals that key;
    the flattened key of the first match becomes a ``{key}`` placeholder.
    The placeholders are joined with '_' and placed below
    ``base/experimentname``.
    """
    placeholders = []
    for replacement_id in replacement_dictionary:
        flat = _flatten_dictionary(expanding_dictonary)
        matches = _find_key_from_identifier(flat, replacement_id)
        # matches is a list of key paths; the first element of a path is the
        # key in the flattened dictionary, and only the first match is used.
        first_key = matches[0][0]
        placeholders.append('{' + first_key + '}')
    folder_name = "_".join(placeholders)
    return os.path.join(base, experimentname, folder_name)
def get_folders(ex_dicts, sim_folder_template):
    """Return one folder name per dictionary in ex_dicts.

    Each dictionary is flattened and its entries are passed as keyword
    arguments to ``sim_folder_template.format``.
    """
    folders = []
    for flat in map(_flatten_dictionary, ex_dicts):
        folders.append(sim_folder_template.format(**flat))
    return folders
def get_jobs(folders):
    """Return the paths of the 'job' files found in folders.

    A warning is issued for every folder without a 'job' entry; such folders
    contribute nothing to the result.
    """
    found = []
    for folder in folders:
        candidate = os.path.join(folder, 'job')
        if not os.path.exists(candidate):
            warnings.warn("Missing jobfile in {}".format(folder))
            continue
        found.append(candidate)
    return found
# ------ private functions ------
def _find_key_from_identifier(dict_to_expand, identifier):
"""Return a list of keys to all leaves of dict_to_expand whose values are
identifier.
>>> d = { 1: 'blub', 2: {3: 'blub', 4: 'hello'}}
>>> _find_key_from_identifier(d, 'blub')
[[1], [2, 3]]
"""
keypositions = []
for k, v in dict_to_expand.items():
if v == identifier:
keypositions.append([k])
elif isinstance(v, dict):
subkey_positions = _find_key_from_identifier(v, identifier)
for sk in subkey_positions:
keypositions.append([k] + sk)
elif isinstance(v, list):
for i, x in enumerate(v):
if x == identifier:
keypositions.append([k] + [i])
return keypositions
def _flatten_dictionary(d, parent_key='', sep='_'):
"""Return a flat dictionary with concatenated keys for a nested dictionary d.
>>> d = { 'a': {'aa': 1, 'ab': {'aba': 11}}, 'b': 2, 'c': {'cc': 3}}
>>> _flatten_dictionary(d)
{'a_aa': 1, 'b': 2, 'c_cc': 3, 'a_ab_aba': 11}
"""
items = []
for k, v in d.items():
new_key = parent_key + sep + k if parent_key else k
if isinstance(v, collections.MutableMapping):
items.extend(_flatten_dictionary(v, new_key, sep=sep).items())
else:
items.append((new_key, v))
return dict(items)
def _update_dict(dic, value, key, *keys):
"""Set dic[k0][k1]...[kn] = value for keys=[k0, k1, ..., kn].
>>> d = { 1: 'blub', 2: {3: 'blub', 4: 'hello'}}
>>> keys = _find_key_from_identifier(d, 'blub')
>>> keys
[[1], [2, 3]]
>>> _update_dict(d, 'ch', *keys[0])
>>> d
{1: 'ch', 2: {3: 'blub', 4: 'hello'}}
>>> _update_dict(d, 'ch', *keys[1])
>>> d
{1: 'ch', 2: {3: 'ch', 4: 'hello'}}
"""
if keys:
_update_dict(dic[key], value, *keys)
else:
dic[key] = value
def _violate_any(dic, rules):
for rule in rules:
factor1 = _get_from_dict(dic, *rule[0])
factor2 = _get_from_dict(dic, *rule[1])
operator = rule[2]
if _violate(operator, factor1, factor2):
return True
else:
return False
def _violate(operator, factor1, factor2):
if operator == '==':
return factor1 != factor2
elif operator == '-==':
return factor1 != -factor2
elif operator == '<':
return factor1 >= factor2
elif operator == '>':
return factor1 <= factor2
else:
raise NotImplementedError('No rule for {} implemented'
''.format(operator))
def _get_from_dict(dataDict, key, *keys):
"""Recursive get from nested dicts
>>> d = { 1: {2: 'blub', 3: 'argh'}, 4: {5: 'use'}}
>>> _get_from_dict(d, 1, 3)
'argh'
>>> _get_from_dict(d, 4, 5)
'use'
"""
if keys:
return _get_from_dict(dataDict[key], *keys)
else:
return dataDict[key]
if __name__ == '__main__':
    # Run the doctests of this module when it is executed as a script; the
    # process exit status is the number of failed examples.
    import doctest
    import sys
    outcome = doctest.testmod()
    print(outcome)
    sys.exit(outcome.failed)