-
Notifications
You must be signed in to change notification settings - Fork 0
/
treereduction_program.py
388 lines (313 loc) · 16.3 KB
/
treereduction_program.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
import base64
import logging
import threading
import time
import sys
import cloudpickle
from wukongdnc.wukong.wukong_problem import WukongProblem, WukongResult, UserProgram
from wukongdnc.server.util import decode_base64
# import wukong.memoization.memoization_controller as memoization_controller
import redis
import logging
from wukongdnc.constants import REDIS_IP_PRIVATE
from logging import handlers
# ---------------------------------------------------------------------------
# Module-level setup: logging configuration, the shared Redis client and the
# problem/result IDs used by the program class defined in this module.
# ---------------------------------------------------------------------------
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')

# A stdout handler is built and formatted, but it is NOT attached to the
# logger (the attach call below is commented out).
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(formatter)
#logger.addHandler(ch)

# Redis connection used to fetch the problem input (key "input").
redis_client = redis.Redis(host = REDIS_IP_PRIVATE, port = 6379)

# Apply the formatter to every handler that is already installed on this
# module's logger and on the root logger. Iterating an empty handler list is
# a no-op, so no explicit emptiness check is required.
root = logging.getLogger()
for _configured_logger in (logger, root):
    for handler in _configured_logger.handlers:
        handler.setFormatter(formatter)

debug_lock = threading.Lock()

# IDs copied onto each program instance in its constructor.
root_problem_id = "[0,1]"
final_result_id = "[1,1]"
class ProblemType(WukongProblem):
    """
    A tree-reduction (sub)problem: the numbers to reduce plus the inclusive index
    range [from_idx, to_idx] that the (sub)problem covers.

    Attributes:
    ----------
        numbers (list):  the values held by this (sub)problem.
        from_idx (int):  first index covered by this (sub)problem (inclusive).
        to_idx (int):    last index covered by this (sub)problem (inclusive).
        value (int):     unused here; kept "just to keep Fibonacci happy".
    """
    # The threshold at which we switch to a sequential algorithm.
    SEQUENTIAL_THRESHOLD = 128

    # Get input arrays when the level reaches the INPUT_THRESHOLD, e.g., don't grab the initial 256MB array,
    # wait until you reach level, say, 1, when there are two subproblems each half as big.
    # To input at the start of the problem (root), use 0; the stack has 0 elements on it for the root problem.
    # To input the first two subProblems, use 1. Etc.
    INPUT_THRESHOLD = 1

    def __init__(self, numbers = None, from_idx = -1, to_idx = -1, value = -1, UserProgram = None):
        """
        Arguments:
        ----------
            numbers (list or None): values for this problem. When omitted (or None), a fresh
                empty list is created for this instance.
            from_idx (int): first covered index (inclusive); defaults to -1.
            to_idx (int): last covered index (inclusive); defaults to -1.
            value (int): unused placeholder; defaults to -1.
            UserProgram: forwarded unchanged to the WukongProblem constructor.
        """
        super(ProblemType, self).__init__(UserProgram = UserProgram)
        # A literal `[]` default would be created once and shared by every instance
        # constructed without `numbers`; build a new list per instance instead.
        self.numbers = [] if numbers is None else numbers
        self.from_idx = from_idx
        self.to_idx = to_idx
        self.value = value # just to keep Fibonacci happy.

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return "ProblemType(from=" + str(self.from_idx) + ", to=" + str(self.to_idx) + ", numbers=" + str(self.numbers) + ")"

    @property
    def memoize(self):
        """Always False: this problem type is not memoized."""
        return False
class ResultType(WukongResult):
    """
    Result of a tree-reduction (sub)problem.

    If type is 1, ResultType is a normal result.
    If type is 0, ResultType is a stopResult.
    If type is -1, ResultType is a nullResult.

    Attributes:
    ----------
        numbers (list):  the result values.
        from_idx (int):  first index covered by this result.
        to_idx (int):    last index covered by this result.
        type (int):      one of -1, 0, 1 (see above).
        value (int):     unused here; kept "just to keep Fibonacci happy".
    """
    def __init__(self, numbers = None, from_idx = -1, to_idx = -1, result_type = 0, value = -1):
        """
        Arguments:
        ----------
            numbers (list or None): result values. When omitted (or None), a fresh empty
                list is created for this instance.
            from_idx (int): defaults to -1.
            to_idx (int): defaults to -1.
            result_type (int): -1, 0 or 1; defaults to 0.
            value (int): unused placeholder; defaults to -1.

        Raises:
        -------
            AssertionError: if result_type is outside [-1, 1] (when assertions are enabled).
        """
        super(ResultType, self).__init__()
        # A literal `[]` default would be shared between all instances constructed
        # without `numbers`; build a new list per instance instead.
        self.numbers = [] if numbers is None else numbers
        self.from_idx = from_idx
        self.to_idx = to_idx
        self.type = result_type
        self.value = value # just to keep Fibonacci happy.
        assert(self.type >= -1 and self.type <= 1)

    def __repr__(self):
        return self.__str__()

    def copy(self):
        """
        Return a new ResultType carrying the same fields as this one.

        The result type is carried over as well; previously it was dropped, so every
        copy silently became type 0. The `numbers` list is shared with the original
        (no element copy is made).
        """
        return ResultType(
            value = self.value,
            from_idx = self.from_idx,
            to_idx = self.to_idx,
            numbers = self.numbers,
            result_type = self.type)

    def __str__(self):
        return "ResultType(from=" + str(self.from_idx) + ", to=" + str(self.to_idx) + ", numbers=" + str(self.numbers) + ")"
class MergesortProgram(UserProgram):
    """
    User program for a tree reduction: the input array is split in halves until a
    (sub)problem is small enough to be summed sequentially, and sibling results are
    combined by element-wise addition.

    NOTE: the class keeps the name `MergesortProgram`, but the methods below compute
    sums, not a sort.
    """

    def __init__(self):
        super(MergesortProgram, self).__init__()
        # The module-level IDs are only read here, so no `global` declaration is needed.
        self.root_problem_id = root_problem_id
        self.final_result_id = final_result_id

    def __repr__(self):
        # NOTE(review): this class defines no __str__ of its own; this relies on the
        # one inherited from UserProgram -- confirm that the base class provides it.
        return self.__str__()

    def base_case(self, problem: ProblemType):
        """
        Return True when the problem covers at most ProblemType.SEQUENTIAL_THRESHOLD
        elements, i.e., when it should be solved by sequential().
        """
        size = problem.to_idx - problem.from_idx + 1
        return size <= ProblemType.SEQUENTIAL_THRESHOLD

    def preprocess(self, problem: ProblemType):
        """
        Some problems may use a pre-processing step before anything else is done.
        Nothing is done for this program.
        """
        pass

    def trim_problem(self, problem: ProblemType):
        """
        A problem P is by default passed to the executors that will be executing the child subproblems of P. One of these
        executors will execute method combine() for these subproblems. In some cases, it is easier to write combine() when
        the subproblem's parent problem data is available; however, this parent data will also be sent and retrieved from
        storage if the parent data is part of the ProblemType data. (That is, problem P's parent data will be sent
        as part of problem P since the parent data will be on the stack of subProblems (representing the call stack) for P.)
        So if some of the parent data is not needed for combine() it should be deleted from the parent problem.

        The field of a problem that will definitely be used is the problemID. Do not trim the problemID.
        The FanInStack (in parent class WukongProblem) is not needed by the DivideandConquer framework and
        can/should be trimmed.

        One option is to create a trimProblem() method in class WukongProblem and always call this method (in method
        Fanout) in addition to calling User.trimProblem(), where User.trimProblem() may be empty.

        Here both `numbers` and `fan_in_stack` of the problem are set to None.
        """
        problem.numbers = None
        problem.fan_in_stack = None

    def problem_labeler(self,
        subproblem : ProblemType,
        childID : int,
        parent_problem : ProblemType,
        subproblems : list
    ) -> str:
        """
        User must specify how subproblems are labeled. The problem label is used as a key into Wukong Storage,
        where the value in the key-value pair is (eventually) the problem's result. Also used for Serverless
        Networking pairing names.

        The label is built from the index range of the half of the parent that the child covers:
        child 0 is the right half (mid+1 .. to) and any other child is the left half (from .. mid).
        A range of a single index is labeled "<index>"; otherwise the label is "<first>x<last>".

        Note: we could have used level-order IDs like:
            1            2**0 children starting with child 2**0 (at level 0)
            2 3          2**1 children starting with child 2**1 (at level 1)
            4 5 6 7      2**2 children starting with child 2**2 (at level 2)
        This simplifies things since it is easy to compute the parent ID from the child's ID. With this ID scheme
        we would not need to stack the problem IDs as we recurse; however, the stack makes it easy to get the level
        since the level is just the size of the stack.
        We'll have to see if this scheme would work in general,
        i.e., for different numbers of children, when the number of children may vary for parent nodes, etc. There has to be a
        way to recover the parent ID from a child, either by using a formula like above, or by stacking the IDs as we recurse.

        Arguments:
        ----------
            subproblem (ProblemType): not used by this labeler.
            childID (int): position of the child; 0 selects the right half.
            parent_problem (ProblemType): supplies from_idx / to_idx.
            subproblems (arraylike of ProblemType): not used by this labeler.

        Returns:
        --------
            str
        """
        mid = parent_problem.from_idx + ((parent_problem.to_idx - parent_problem.from_idx) // 2)

        if childID == 0:
            # Right half: mid+1 .. to
            id_str = str(mid + 1)
            if (mid + 1) < parent_problem.to_idx:
                id_str += "x" + str(parent_problem.to_idx)
            return id_str

        # Left half: from .. mid
        id_str = str(parent_problem.from_idx)
        if parent_problem.from_idx < mid:
            id_str += "x" + str(mid)
        return id_str

    def memoize_IDLabeler(self, problem : ProblemType) -> str:
        return None # TreeReduction is not memoized

    def divide(self,
        problem : ProblemType,
        subproblems : list
    ):
        """
        Divide the problem into (a list of) subproblems.

        Two ProblemType objects are appended to `subproblems`: first the right half
        (mid+1 .. to_idx), then the left half (from_idx .. mid). Both are created with the
        parent's `numbers` object.

        Arguments:
        ----------
            problem (ProblemType)
            subproblems (arraylike of ProblemType): output list, appended to in place.

        Returns:
        --------
            Nothing
        """
        # Lazy %-style arguments avoid stringifying whole arrays unless the record is emitted.
        logger.debug("Divide: treereduction run: from: %s, to: %s", problem.from_idx, problem.to_idx)
        logger.debug("Divide: problemID: %s", problem.problem_id)
        logger.debug("Divide: FanInStack: %s", problem.fan_in_stack)

        mid = problem.from_idx + ((problem.to_idx - problem.from_idx) // 2)

        logger.debug("Divide: ID: %s, mid: %s, mid+1: %s, to: %s", problem.problem_id, mid, mid + 1, problem.to_idx)

        # At some point, we'll want to stop passing the entire array around, as the subproblems only work with a sub-array.
        right_problem = ProblemType(
            numbers = problem.numbers,
            from_idx = mid + 1,
            to_idx = problem.to_idx)
        left_problem = ProblemType(
            numbers = problem.numbers,
            from_idx = problem.from_idx,
            to_idx = mid)

        logger.debug("Divide: ID: %s: RightProblem: %s", problem.problem_id, right_problem)
        logger.debug("Divide: ID: %s: LeftProblem: %s", problem.problem_id, left_problem)

        # Order matters: index 0 is the right half, index 1 the left half
        # (problem_labeler and computeInputsOfSubproblems use the same order).
        subproblems.append(right_problem)
        subproblems.append(left_problem)

    def combine(self, subproblem_results: list, problem_result: ResultType, problem_id: str):
        """
        Combine the subproblem results by element-wise addition.

        The `numbers` arrays of the first two entries of `subproblem_results` are added
        position by position. If one array is longer than the other, the missing positions
        of the shorter one count as 0, so the tail of the longer array is carried over
        unchanged. The from/to values of the subproblems are not used for the addition.

        Arguments:
        ----------
            subproblem_results (list of ResultType): at least two results.
            problem_result (ResultType): output; its numbers, from_idx and to_idx are set.
            problem_id (str): not used by this method.

        Returns:
        --------
            Nothing
        """
        from itertools import zip_longest

        first_result = subproblem_results[0]
        second_result = subproblem_results[1]

        first_array = first_result.numbers
        second_array = second_result.numbers

        logger.debug("combine: values.length for merged arrays: %s", len(first_array) + len(second_array))
        logger.debug("first array: %s", first_array)
        logger.debug("second array: %s", second_array)

        # Pair-wise sum; positions missing from the shorter array contribute 0.
        values = [arg1 + arg2 for arg1, arg2 in zip_longest(first_array, second_array, fillvalue = 0)]

        logger.debug("combine result: values.length: %s, values: ", len(values))

        problem_result.numbers = values
        # The combined range starts at the lower from_idx and ends at the to_idx of the other result.
        if first_result.from_idx < second_result.from_idx:
            problem_result.from_idx = first_result.from_idx
            problem_result.to_idx = second_result.to_idx
        else:
            problem_result.from_idx = second_result.from_idx
            problem_result.to_idx = first_result.to_idx

    def computeInputsOfSubproblems(self, problem: ProblemType, subproblems: list):
        """
        User provides method to generate subproblem values, e.g., the sub-arrays to be reduced, from problems.
        The Problem Labels identify a (sub)problem, but we still need a way to generate the subproblem
        data values. For example, if the parent array has values for 0-14, the left array has values for 0-7 and the
        right array has values for 8-14. The right array will be passed (as a Lambda invocation argument or
        written to Wukong storage) to a new executor for this subproblem.

        `problem.numbers` is split at its middle; subproblems[0] receives the right part and
        subproblems[1] the left part. The process exits with status 1 if the split fails or
        if the problem's index range does not match len(problem.numbers).
        """
        problem_size = problem.to_idx - problem.from_idx + 1
        midArray = 0 + ((len(problem.numbers) - 1) // 2)

        # The two cases differ only in the relation shown in the debug message.
        relation = ">=" if len(problem.fan_in_stack) >= WukongProblem.INPUT_THRESHOLD else "<"
        logger.debug("computeInputsOfSubproblems ( %s INPUT_THRESHOLD): ID: %s, midArray: %s, to: %s",
            relation, problem.problem_id, midArray, problem.to_idx)

        left_array = []
        right_array = []

        try:
            logger.debug("computeInputsOfSubproblems: problem.numbers: %s", problem.numbers)
            logger.debug("computeInputsOfSubproblems: ID: %s, len(numbers): %s, numbers: %s",
                problem.problem_id, len(problem.numbers), problem.numbers)

            # Assuming that inputNumbers returns the problem's actual from-to subsegment of the complete input.
            # Copies are made from the parent problem's sub-segment of the input array, are a prefix of parent's copy, and start with 0.
            logger.debug("computeInputsOfSubproblems: ID: %s size < threshold, make left copy: from: 0 midArray+1 %s",
                problem.problem_id, midArray + 1)
            left_array = problem.numbers[0:midArray + 1]

            logger.debug("computeInputsOfSubproblems: ID: %s size < threshold, make right copy: midArray+1: %s to+1 %s",
                problem.problem_id, midArray + 1, len(problem.numbers))
            right_array = problem.numbers[midArray + 1: len(problem.numbers)]

            # Assert: the index range must describe exactly the numbers held by the problem.
            if problem_size != len(problem.numbers):
                logger.error("Internal Error: computeInput: size != len(numbers)-1")
                logger.error("computeInputsOfSubproblems: size: %s problem.numbers.length-1: %s",
                    problem_size, len(problem.numbers) - 1)
                # SystemExit is not an Exception subclass, so it is not caught by the handler below.
                sys.exit(1)
        except Exception as ex:
            logger.error("Exception encountered during 'computeInputsOfSubproblems()': %s", ex)
            sys.exit(1)

        # Same order as divide(): index 0 is the right half, index 1 the left half.
        subproblems[0].numbers = right_array
        subproblems[1].numbers = left_array

    def input_problem(self, problem: ProblemType):
        """
        Alias for 'inputProblem()'
        """
        self.inputProblem(problem)

    def inputProblem(self, problem: ProblemType):
        """
        The problem data must be obtained from Wukong storage. Here, we are getting a specific subsegment of the input array,
        which is the segment from-to.

        The complete input is read from the Redis key "input", base64-decoded and unpickled.
        If its length differs from the size of the problem's index range, only the slice
        [from_idx, to_idx] (inclusive) is kept. The outcome is stored in `problem.numbers`.
        """
        logger.debug("inputNumbers")

        problem_input_encoded = redis_client.get("input")
        problem_input_serialized = decode_base64(problem_input_encoded)
        # SECURITY NOTE: unpickling executes arbitrary code contained in the payload;
        # the Redis instance holding "input" must be trusted.
        numbers = cloudpickle.loads(problem_input_serialized)

        problem_size = problem.to_idx - problem.from_idx + 1

        if problem_size != len(numbers):
            # Slice (':'), not a tuple index (','): indexing a list with a tuple raises TypeError.
            numbers = numbers[problem.from_idx:problem.to_idx + 1]

        problem.numbers = numbers

    def sequential(self, problem: ProblemType, result: ResultType):
        """
        User provides method to sequentially solve a problem.

        The numbers of the problem are summed. Both `problem.numbers` and `result.numbers`
        are replaced by (separate) one-element lists holding that sum, and the result's
        index range is set to from_idx = 0, to_idx = 1.
        """
        total = sum(problem.numbers)

        # The problem's array is replaced by its sum as well, as before.
        problem.numbers = [total]
        result.numbers = [total]
        result.from_idx = 0 #problem.from_idx
        result.to_idx = 1 #problem.to_idx
# Module-level sentinel results: type -1 marks the null result, type 0 the stop result.
NullResult = ResultType(value = -1, result_type = -1)
StopResult = ResultType(value = -1, result_type = 0)