Skip to content

Commit

Permalink
add more stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
hearues-zueke-github committed Nov 20, 2020
1 parent 8173f98 commit ad1e057
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 97 deletions.
1 change: 1 addition & 0 deletions matrix_modulo/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
objs/*
config_file.py
77 changes: 4 additions & 73 deletions matrix_modulo/vector_matrix_modulo.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from memory_tempfile import MemoryTempfile
from shutil import copyfile

import config_file

sys.path.append('..')
from utils import mkdirs
from utils_multiprocessing_manager import MultiprocessingManager
Expand Down Expand Up @@ -120,53 +122,17 @@ def calc_new_cycles(n, modulo):


if __name__ == '__main__':
# for file, d_n_mod_x_len in l_values:
# for n, d_mod_x_len in d_n_mod_x_len.items():
# for modulo, d_x_len in d_mod_x_len.items():
# print("n: {}, modulo: {}".format(n, modulo))
# max_cycle_length = max([v[0] for k, v in d_x_len.items()])

# print("l_values: {}".format(l_values))

# sys.exit()


# n = int(sys.argv[1])
# if n > 10:
# n = 10
# if n < 1:
# n = 1

# modulo = int(sys.argv[2])
# if modulo > 10:
# modulo = 10
# if modulo < 1:
# modulo = 1

# n = 4
# modulo = 3

mult_proc_manag = MultiprocessingManager(cpu_count=4)
mult_proc_manag.define_new_func(name='calc_new_cycles', func=calc_new_cycles)
# l_func_args = [
# (2, 2), (2, 3), (2, 4),
# (3, 2), (3, 3), (3, 4),
# (4, 2), (4, 3), (4, 4),
# ]
l_func_args = [
(n, modulo) for n in range(2, 8) for modulo in range(2, 6)
]

l_func_args = config_file.l_func_args
l_func_args = [l_func_args[i] for i in np.random.permutation(np.arange(0, len(l_func_args)))]
l_func_name = ['calc_new_cycles'] * len(l_func_args)
# mult_proc_manag.do_new_jobs(l_func_name=['calc_new_cycles']*, l_func_args)
mult_proc_manag.do_new_jobs(l_func_name=l_func_name, l_func_args=l_func_args)
# calc_new_cycles(n=n, modulo=modulo)

del mult_proc_manag

# if n == 1 and modulo == 1:
l_files = [os.path.join(root, file) for root, dirs, files in os.walk(OBJS_DIR_PATH) for file in files]
# sys.exit()

l_values_d = [
(file, get_pkl_gz_obj(create_d_n_mod_x_len, file))
Expand All @@ -184,38 +150,3 @@ def calc_new_cycles(n, modulo):
l_values_sort = sorted(l_values)
print("l_values_sort: {}".format(l_values_sort))
sys.exit()

# if n == 1 and modulo == 1:
# l_values = [
# (n, modulo, max([v[0] for k, v in d_x_len.items()]))
# for n, d_mod_x_len in d_n_mod_x_len.items()
# for modulo, d_x_len in d_mod_x_len.items()
# # for x, (cycle_length, A_t) in d_x_len.items()
# ]
# print("l_values: {}".format(l_values))

# y = np.dot(A, x) % modulo

# print("x:\n{}".format(x))
# print("A:\n{}".format(A))
# print("y:\n{}".format(y))
# # sys.exit()

# # a = np.random.randint(0, 2, (n, ))
# # b = np.random.randint(0, 2, (n, ))

# # C = np.add.outer(a, b) % modulo

# # an1 = np.dot(C, a) % modulo
# # bn1 = np.dot(C, b) % modulo

# # an2 = np.dot(a, C) % modulo
# # bn2 = np.dot(b, C) % modulo

# print("a:\n{}".format(a))
# print("b:\n{}".format(b))
# print("C:\n{}".format(C))
# print("an1:\n{}".format(an1))
# print("bn1:\n{}".format(bn1))
# print("an2:\n{}".format(an2))
# print("bn2:\n{}".format(bn2))
1 change: 1 addition & 0 deletions test_programs/.gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.mypy_cache
objs
*/__pycache__
config_file.py
17 changes: 4 additions & 13 deletions test_programs/try_multiprocessing_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from memory_tempfile import MemoryTempfile
from shutil import copyfile

import config_file

sys.path.append('..')
from utils import mkdirs
from utils_multiprocessing_manager import MultiprocessingManager
Expand All @@ -43,19 +45,8 @@ def do_simple_loop(n):

mult_proc_manag = MultiprocessingManager(cpu_count=4)
mult_proc_manag.define_new_func(name='do_simple_loop', func=do_simple_loop)
n_smaller = 6
n_bigger = 8
i_s = 10**n_smaller
i_b = 10**n_bigger
l_func_args = [
(i_s, ), (i_s, ),
(i_b, ), (i_s, ),
(i_b, ), (i_s, ), (i_s, ), (i_s, ), (i_s, ),
(i_b, ), (i_s, ), (i_s, ), (i_s, ),
(i_b, ), (i_s, ), (i_s, ), (i_s, ), (i_s, ), (i_s, ), (i_s, ),
(i_b, ), (i_s, ), (i_s, ),
(i_b, ), (i_s, ), (i_s, ), (i_s, ), (i_s, ), (i_s, ), (i_s, ),
]

l_func_args = config_file.l_func_args
l_func_name = ['do_simple_loop'] * len(l_func_args)
mult_proc_manag.do_new_jobs(l_func_name=l_func_name, l_func_args=l_func_args)

Expand Down
39 changes: 28 additions & 11 deletions utils_multiprocessing_manager.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import dill
import time

from collections import deque
from multiprocessing import Process, Pipe

WORKER_SLEEP_TIME = 0.02
MANAGER_SLEEP_TIME = 0.02

class MultiprocessingManager(Exception):
def __init__(self, cpu_count):
self.cpu_count = cpu_count
Expand Down Expand Up @@ -40,7 +44,7 @@ def worker_thread(self, worker_nr, pipe_in, pipe_out):
pipe_out.send(ret_tpl)
elif name == 'test_ret':
pipe_out.send('IS WORKING!')
time.sleep(0.1)
time.sleep(WORKER_SLEEP_TIME)


def define_new_func(self, name, func):
Expand All @@ -65,16 +69,24 @@ def test_worker_threads_response(self):


def do_new_jobs(self, l_func_name, l_func_args):
amount = len(l_func_name)
len_l_func_name = len(l_func_name)

l_ret = []
if amount <= self.worker_amount:
if len_l_func_name <= self.worker_amount:
for worker_nr, (pipe_send, func_name, func_args) in enumerate(zip(self.pipes_send_main, l_func_name, l_func_args), 0):
pipe_send.send(('func_def_exec', (func_name, func_args)))
print("Doing job: worker_nr: {}".format(worker_nr))

for pipe_recv in self.pipes_recv_main:
# TODO: make a poll in a loop, until everything is done!

dq_pipe_i = deque(range(0, len_l_func_name))
while len(dq_pipe_i) > 0:
pipe_i = dq_pipe_i.popleft()

pipe_recv = self.pipes_recv_main[pipe_i]
if not pipe_recv.poll():
dq_pipe_i.append(pipe_i)
time.sleep(MANAGER_SLEEP_TIME)
continue

ret_tpl = pipe_recv.recv()
worker_nr, ret_val = ret_tpl
l_ret.append(ret_val)
Expand All @@ -85,13 +97,12 @@ def do_new_jobs(self, l_func_name, l_func_args):
pipe_send.send(('func_def_exec', (func_name, func_args)))
print("Doing job: worker_nr: {}".format(worker_nr))

# TODO: create a loop, where every single worker is checked if the worker is done or not!
pipe_i = 0
for func_name, func_args in zip(l_func_name[self.worker_amount:], l_func_args[self.worker_amount:]):
while True:
pipe_recv = self.pipes_recv_main[pipe_i]
if not pipe_recv.poll():
time.sleep(0.02)
time.sleep(MANAGER_SLEEP_TIME)
pipe_i = (pipe_i+1) % self.worker_amount
continue
break
Expand All @@ -109,16 +120,22 @@ def do_new_jobs(self, l_func_name, l_func_args):

pipe_i = (pipe_i+1) % self.worker_amount

for _ in range(0, self.worker_amount):
dq_pipe_i = deque(range(0, self.worker_amount))
while len(dq_pipe_i) > 0:
pipe_i = dq_pipe_i.popleft()

pipe_recv = self.pipes_recv_main[pipe_i]
if not pipe_recv.poll():
dq_pipe_i.append(pipe_i)
time.sleep(MANAGER_SLEEP_TIME)
continue

ret_tpl = pipe_recv.recv()
worker_nr, ret_val = ret_tpl
l_ret.append(ret_val)

print("Finished: worker_nr: {}".format(worker_nr))

pipe_i = (pipe_i+1) % self.worker_amount

return l_ret


Expand Down

0 comments on commit ad1e057

Please sign in to comment.