diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index 8eacc99..93748a1 100644 Binary files a/devTools/__pycache__/format.cpython-36.pyc and b/devTools/__pycache__/format.cpython-36.pyc differ diff --git a/pyMR/main.py b/pyMR/main.py index bf9e6f0..9c80b1e 100644 --- a/pyMR/main.py +++ b/pyMR/main.py @@ -1,7 +1,7 @@ # root/pyMR/main.py from .chunk import Chunks -import concurrent.futures +import concurrent.futures as cf from .utils import Queue @@ -15,7 +15,8 @@ def __init__(self, num_workers, split_ratio=0.75, granularity=3, - verbose=True): + verbose=True, + threaded=False): """ Constructor for Master, master orchestrates the execution of workers @@ -41,6 +42,8 @@ def __init__(self, self.queue = Queue() self.validate = None + self.executor = cf.ThreadPoolExecutor( + ) if threaded else cf.ProcessPoolExecutor() def create_job(self, data, map_fn, red_fn): """ @@ -110,18 +113,15 @@ def start(self): A, B = self.queue.dequeue(), self.queue.dequeue() worker.set_data([A, B]) - with concurrent.futures.ProcessPoolExecutor() as executor: - results = [ - executor.submit(worker.run) - for worker in self.__workers[:count] - ] + futures = [ + self.executor.submit(worker.run) + for worker in self.__workers[:count] + ] - for f in concurrent.futures.as_completed(results): - f.add_done_callback(self.__take_result) + for f in cf.as_completed(futures): + self.queue.enqueue(f.result()) - def __take_result(self, future): - result = future.result() - self.queue.enqueue(result) + self.executor.shutdown() def __verify(self): if self.queue.size() == 1: diff --git a/setup.py b/setup.py index 42efb80..cb0f27b 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup setup(name='pyParallelMR', - version='0.1', + version='0.1.2', description='A python package to convert CPU-bound tasks ' + 'to parallel mapReduce jobs.', url='https://github.com/k4rth33k/pyMR',