Skip to content

Commit

Permalink
Merge pull request #2 from k4rth33k/MPThreads
Browse files Browse the repository at this point in the history
Added Threading and other refactoring
  • Loading branch information
k4rth33k authored Jun 13, 2020
2 parents 2b4671e + 92813fb commit 394d2c6
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
Binary file modified devTools/__pycache__/format.cpython-36.pyc
Binary file not shown.
24 changes: 12 additions & 12 deletions pyMR/main.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# root/pyMR/main.py

from .chunk import Chunks
import concurrent.futures
import concurrent.futures as cf
from .utils import Queue


Expand All @@ -15,7 +15,8 @@ def __init__(self,
num_workers,
split_ratio=0.75,
granularity=3,
verbose=True):
verbose=True,
threaded=False):
"""
Constructor for Master, master orchestrates the execution of workers
Expand All @@ -41,6 +42,8 @@ def __init__(self,
self.queue = Queue()

self.validate = None
self.executor = cf.ThreadPoolExecutor(
) if threaded else cf.ProcessPoolExecutor()

def create_job(self, data, map_fn, red_fn):
"""
Expand Down Expand Up @@ -110,18 +113,15 @@ def start(self):
A, B = self.queue.dequeue(), self.queue.dequeue()
worker.set_data([A, B])

with concurrent.futures.ProcessPoolExecutor() as executor:
results = [
executor.submit(worker.run)
for worker in self.__workers[:count]
]
futures = [
self.executor.submit(worker.run)
for worker in self.__workers[:count]
]

for f in concurrent.futures.as_completed(results):
f.add_done_callback(self.__take_result)
for f in cf.as_completed(futures):
self.queue.enqueue(f.result())

def __take_result(self, future):
result = future.result()
self.queue.enqueue(result)
self.executor.shutdown()

def __verify(self):
if self.queue.size() == 1:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from setuptools import setup
setup(name='pyParallelMR',
version='0.1',
version='0.1.2',
description='A python package to convert CPU-bound tasks ' +
'to parallel mapReduce jobs.',
url='https://github.com/k4rth33k/pyMR',
Expand Down

0 comments on commit 394d2c6

Please sign in to comment.