From 2df3f07b47ad7fc726c711454ef79c69d4c8a647 Mon Sep 17 00:00:00 2001 From: KARTHEEK Date: Wed, 10 Jun 2020 22:35:50 +0530 Subject: [PATCH 01/10] Queue and batch based execution --- build/lib/pyMR/main.py | 35 +- build/lib/pyMR/utils.py | 1 + devTools/__pycache__/format.cpython-36.pyc | Bin 326 -> 326 bytes devTools/pg.py | 75 ++ pyMR.egg-info/PKG-INFO | 11 + pyMR.egg-info/SOURCES.txt | 11 + pyMR.egg-info/dependency_links.txt | 1 + pyMR.egg-info/not-zip-safe | 1 + pyMR.egg-info/top_level.txt | 1 + pyMR/__init__.py | 1 + pyMR/chunk.py | 83 +- pyMR/main.py | 255 +++--- pyMR/utils.py | 30 + tests/queue_test.py | 29 + tests/simple_test.py | 79 +- tests/test.txt | 984 --------------------- tests/test_files/test_small.txt | 47 + 17 files changed, 426 insertions(+), 1218 deletions(-) create mode 100644 devTools/pg.py create mode 100644 pyMR.egg-info/PKG-INFO create mode 100644 pyMR.egg-info/SOURCES.txt create mode 100644 pyMR.egg-info/dependency_links.txt create mode 100644 pyMR.egg-info/not-zip-safe create mode 100644 pyMR.egg-info/top_level.txt create mode 100644 tests/queue_test.py delete mode 100644 tests/test.txt create mode 100644 tests/test_files/test_small.txt diff --git a/build/lib/pyMR/main.py b/build/lib/pyMR/main.py index 619466a..782ef74 100644 --- a/build/lib/pyMR/main.py +++ b/build/lib/pyMR/main.py @@ -8,12 +8,13 @@ class Master(object): """Master class for creating and running workers""" - def __init__(self, num_workers, split_ratio=0.8): + def __init__(self, num_workers, split_ratio=0.8, load_each=20): super(Master, self).__init__() self.data_gen = None self.num_workers = num_workers self.workers = [] + self.load_each = load_each self.map_results = [] self.reduce_stack = Stack(self) @@ -26,11 +27,14 @@ def create_job(self, data, map_fn, red_fn): self.map_fn = map_fn self.red_fn = red_fn - self.data_gen = Chunks(data=data, - num_chunks=self.num_workers).get_chunks_gen() + # self.data_gen = Chunks(data=data, + # num_chunks=self.num_workers * self.load_each).get_chunks_gen() - # self.data_gen = iter(Chunks2(data=data, - # num_chunks=self.map_workers)) + self.data_gen = iter( + Chunks2(data=data, num_chunks=self.num_workers * self.load_each)) + print(next(self.data_gen)) + print(next(self.data_gen)) + print(len(next(self.data_gen))) # self.workers = [Worker(id=_, data_gen=self.data_gen, # job=map_fn, type='MAP', @@ -50,17 +54,20 @@ def start(self): processes = [] # while (generator is not empty): - self.workers = [ - Worker(id=_, data=next(self.data_gen), job=self.map_fn, type='MAP') - for _ in range(1, self.num_workers) - ] - - with concurrent.futures.ThreadPoolExecutor() as executor: - results = [ - executor.submit(worker.run) - for worker in self.workers[:self.map_workers] + for __ in range(self.load_each): + self.workers = [ + Worker(id=_, + data=next(self.data_gen), + job=self.map_fn, + type='MAP') for _ in range(1, self.num_workers) ] + with concurrent.futures.ThreadPoolExecutor() as executor: + results = [ + executor.submit(worker.run) + for worker in self.workers[:self.map_workers] + ] + # self.map_results = [f.result() # for f in concurrent.futures.as_completed(results)] # self.map_results = [f.add_done_callback(self.take_result) diff --git a/build/lib/pyMR/utils.py b/build/lib/pyMR/utils.py index 7d8838e..6ca1896 100644 --- a/build/lib/pyMR/utils.py +++ b/build/lib/pyMR/utils.py @@ -5,6 +5,7 @@ class Stack(object): def __init__(self, master): super(Stack, self).__init__() self._ = [] + self.master = master def push(self, elem): self._.append(elem) diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index 2c1909183b15d020eb5a4fc75698268aab063033..0ef04764bd670efaab53f3bc788670c0d00b2963 100644 GIT binary patch delta 16 XcmX@cbc~7Jn3tD}#cSP0c6&wuBi;k! delta 16 XcmX@cbc~7Jn3tDpm&M$T?DmWRC>I2F diff --git a/devTools/pg.py b/devTools/pg.py new file mode 100644 index 0000000..f5ac8bb --- /dev/null +++ b/devTools/pg.py @@ -0,0 +1,75 @@ +import concurrent.futures +import multiprocessing + + +def gen(n): + while n > 0: + yield n + n -= 1 + + +def sq(x): + return x**2 + + +def does_nothing(): + print('YAY') + + +class Worker(object): + """Worker class for running the job""" + def __init__(self, id, job): + """ + Constructor for Worker class + Takes in: + ID -> Number + job -> a function + type -> 'MAP' or 'REDUCE' + """ + + super(Worker, self).__init__() + + self.type_dict = {0: 'MAP', 1: 'REDUCE'} + self.job = job # Function + self.id = id + self.type = type + + def set_map_data(self, data): + """ + To set data before parallel execution + """ + + self.data = data + + def run(self): + + print(f'{self.type} Worker {self.id} is working!!', end='\r') + + if self.type == 'MAP': + result = self.job(self.data) + else: + result = self.job(A, B) + + def __str__(self): + return f'' + + def __repr__(self): + return str(self) + + +def main(): + + iter_ = gen(10) + + cpus = multiprocessing.cpu_count() + print(cpus) + + with concurrent.futures.ProcessPoolExecutor() as executor: + futures = [executor.submit(does_nothing) for _ in range(5)] + + for future in futures: + print(future.result()) + + +if __name__ == '__main__': + main() diff --git a/pyMR.egg-info/PKG-INFO b/pyMR.egg-info/PKG-INFO new file mode 100644 index 0000000..5ccccf9 --- /dev/null +++ b/pyMR.egg-info/PKG-INFO @@ -0,0 +1,11 @@ +Metadata-Version: 1.2 +Name: pyMR +Version: 0.1 +Summary: A python package to convert CPU-bound tasks to parallel mapReduce jobs. +Home-page: https://github.com/k4rth33k/pyMR +Author: k4rth33k +Author-email: kartheek2000mike@gmail.com +License: MIT +Description: UNKNOWN +Platform: UNKNOWN +Requires-Python: >=3.2S diff --git a/pyMR.egg-info/SOURCES.txt b/pyMR.egg-info/SOURCES.txt new file mode 100644 index 0000000..2c7ac5c --- /dev/null +++ b/pyMR.egg-info/SOURCES.txt @@ -0,0 +1,11 @@ +README.md +setup.py +pyMR/__init__.py +pyMR/chunk.py +pyMR/main.py +pyMR/utils.py +pyMR.egg-info/PKG-INFO +pyMR.egg-info/SOURCES.txt +pyMR.egg-info/dependency_links.txt +pyMR.egg-info/not-zip-safe +pyMR.egg-info/top_level.txt \ No newline at end of file diff --git a/pyMR.egg-info/dependency_links.txt b/pyMR.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/pyMR.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/pyMR.egg-info/not-zip-safe b/pyMR.egg-info/not-zip-safe new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/pyMR.egg-info/not-zip-safe @@ -0,0 +1 @@ + diff --git a/pyMR.egg-info/top_level.txt b/pyMR.egg-info/top_level.txt new file mode 100644 index 0000000..88ce918 --- /dev/null +++ b/pyMR.egg-info/top_level.txt @@ -0,0 +1 @@ +pyMR diff --git a/pyMR/__init__.py b/pyMR/__init__.py index fa6b0bd..f194699 100644 --- a/pyMR/__init__.py +++ b/pyMR/__init__.py @@ -1,3 +1,4 @@ from .main import Master from .main import Worker from .chunk import Chunks +from .utils import Queue diff --git a/pyMR/chunk.py b/pyMR/chunk.py index 59c6c06..9790b46 100644 --- a/pyMR/chunk.py +++ b/pyMR/chunk.py @@ -1,7 +1,8 @@ # root/pyMR/chunk.py from collections import Iterable -import threading +import copy +from itertools import tee class NotIterableException(Exception): @@ -12,34 +13,9 @@ class SizeException(Exception): pass -class threadsafe_iter: - """Takes an iterator/generator and makes it thread-safe by - serializing call to the `next` method of given iterator/generator. - """ - def __init__(self, it): - self.it = it - self.lock = threading.Lock() - - def __iter__(self): - return self - - def next(self): - with self.lock: - return self.it.next() - - -def threadsafe_generator(f): - """A decorator that takes a generator function and makes it thread-safe. - """ - def g(*a, **kw): - return threadsafe_iter(f(*a, **kw)) - - return g - - class Chunks(): - def __init__(self, data, num_chunks=10, overflow='last'): - self.data = data + def __init__(self, data, num_chunks, overflow='last'): + self.data, self.len_data = tee(data) self.num_chunks = num_chunks self.overflow = overflow self.chunks = [] @@ -51,8 +27,9 @@ def __init__(self, data, num_chunks=10, overflow='last'): try: self.len_ = len(data) except: - self.len_ = sum([1 for _ in data]) + self.len_ = sum([1 for _ in self.len_data]) + print(self.len_) if self.len_ < self.num_chunks: raise SizeException("len(data) should be more than num_chunks") @@ -82,7 +59,6 @@ def get_chunks(self): return self.chunks - # @threadsafe_generator def get_chunks_gen(self): temp_chunk = [] @@ -99,50 +75,3 @@ def get_chunks_gen(self): if i == self.len_: assert len(temp_chunk) == self.overflow_len + self.each_chunk yield temp_chunk - - -class Chunks2: - def __init__(self, data, num_chunks=10): - self.data = data - self.num_chunks = num_chunks - self.chunk_count = 0 - self.lock = threading.Lock() - - if not isinstance(data, Iterable): - raise NotIterableException("'data' should be iterable") - - try: - self.len_ = len(data) - except: - self.len_ = sum([1 for _ in data]) - - if self.len_ < self.num_chunks: - raise SizeException("len(data) should be more than num_chunks") - - if not isinstance(self.num_chunks, int): - raise Exception("'num_chunks' must be of type 'int'") - - self.overflow_len = self.len_ % self.num_chunks - self.each_chunk = self.len_ // self.num_chunks - - def __iter__(self): - return self - - def __next__(self): - # with self.lock: - if True: - temp_chunk = [] - for i, elem in enumerate(self.data, start=1): - - temp_chunk.append(elem) - if not i % self.each_chunk and self.chunk_count < self.num_chunks: - # self.chunks.append(temp_chunk) - if self.chunk_count != self.num_chunks - 1: - self.chunk_count += 1 - return temp_chunk - temp_chunk = [] - - if i == self.len_: - assert len(temp_chunk) == self.overflow_len + \ - self.each_chunk - return temp_chunk diff --git a/pyMR/main.py b/pyMR/main.py index 619466a..5541819 100644 --- a/pyMR/main.py +++ b/pyMR/main.py @@ -1,84 +1,142 @@ # root/pyMR/main.py -from .chunk import Chunks2 from .chunk import Chunks import concurrent.futures -from .utils import Stack +from .utils import Queue + + +class ArgumentError(Exception): + pass class Master(object): """Master class for creating and running workers""" - def __init__(self, num_workers, split_ratio=0.8): + def __init__(self, + num_workers, + split_ratio=0.75, + granularity=3, + verbose=True): + """ + Constructor for Master, master orchestrates the execution of workers + + Args: + num_workers (int): Number of workers needed to be spawn + split_ratio (float): Ratio of map to reduce workers + granularity (int): # Chunks needed by each map worker + verbose (bool): set to True to print logss + Returns: + Nothing + Raises: + ArgumentError: if arguments are not of right type + """ super(Master, self).__init__() - self.data_gen = None - self.num_workers = num_workers - self.workers = [] + self.__workers = [] - self.map_results = [] - self.reduce_stack = Stack(self) + self.num_workers = num_workers + self.verbose = verbose + self.granularity = granularity + self.__map_num = int(split_ratio * num_workers) + self.__red_num = self.num_workers - self.__map_num + self.queue = Queue() - self.map_workers = int(split_ratio * num_workers) - self.reduce_workers = num_workers - self.map_workers + self.validate = None def create_job(self, data, map_fn, red_fn): + """ + Initializes chunks and map/reduce functions + + Args: + data (Iterable): Data to be processed(Generator is highly recommended) + map_fn (Function): Function (not lambda) to be applied on chunks + red_fn (Function): Function (not lambda) to be applied on results + Returns: + Raises: + """ self.map_fn = map_fn self.red_fn = red_fn - self.data_gen = Chunks(data=data, - num_chunks=self.num_workers).get_chunks_gen() - - # self.data_gen = iter(Chunks2(data=data, - # num_chunks=self.map_workers)) - - # self.workers = [Worker(id=_, data_gen=self.data_gen, - # job=map_fn, type='MAP', - # master=self) for _ in range(1, self.map_workers + 1)] - - # self.workers += [Worker(id=_, data_gen=self.data_gen, - # job=red_fn, type='REDUCE', - # master=self) for _ in range(self.map_workers + 1, - # self.num_workers + 1)] - - # Priming - # self.workers = [Worker(id=_, data=next(self.data_gen), - # job=map_fn, type='MAP') for _ in range(1, self.num_workers)] + chunks = Chunks(data=data, + num_chunks=self.__map_num * self.granularity) + self.data_gen = chunks.get_chunks_gen() def start(self): - print('Job has started!') - processes = [] + """ + Starts mapReduce execition loop, refer[link] - # while (generator is not empty): - self.workers = [ - Worker(id=_, data=next(self.data_gen), job=self.map_fn, type='MAP') - for _ in range(1, self.num_workers) - ] - - with concurrent.futures.ThreadPoolExecutor() as executor: - results = [ - executor.submit(worker.run) - for worker in self.workers[:self.map_workers] - ] + Args: + Returns: + Raises: + """ - # self.map_results = [f.result() - # for f in concurrent.futures.as_completed(results)] - # self.map_results = [f.add_done_callback(self.take_result) - # for f in concurrent.futures.as_completed(results)] + print('Job has started!' * self.verbose, end='\n' * self.verbose) - for f in concurrent.futures.as_completed(results): - f.add_done_callback(self.take_result) + data_empty = False + converted = False + verif = None - print() + # Initialize worker objects + self.__workers = [ + Worker(id=_, job=self.map_fn, type='MAP', verbose=self.verbose) + for _ in range(1, self.__map_num + 1) + ] + self.__workers += [ + Worker(id=_, job=self.red_fn, type='RED', verbose=self.verbose) + for _ in range(self.__map_num + 1, self.__red_num + 1) + ] - def take_result(self, future): + while not self.__verify(): + count = 0 + # self.queue.dequeue() + + if not data_empty: + # Setting up map workers + for worker in self.__workers[:self.__map_num]: + try: + worker.set_data(next(self.data_gen)) + count += 1 + except StopIteration: + # print('Exception') + data_empty = True + + if data_empty and not converted: + for worker in self.__workers[:self.__map_num]: + worker.type = 'RED' + worker.job = self.red_fn + converted = True + + # Setting up reduce workers + for worker in self.__workers: + if self.queue.size() >= 2 and worker.type == 'RED': + count += 1 + A, B = self.queue.dequeue(), self.queue.dequeue() + worker.set_data([A, B]) + + with concurrent.futures.ProcessPoolExecutor() as executor: + results = [ + executor.submit(worker.run) + for worker in self.__workers[:count] + ] + + for f in concurrent.futures.as_completed(results): + f.add_done_callback(self.__take_result) + + def __take_result(self, future): result = future.result() - self.map_results.append(result) + self.queue.enqueue(result) + + def __verify(self): + if self.queue.size() == 1: + if self.validate: + return id(self.validate) == id(self.queue.peek()) + else: + self.validate = self.queue.peek() - # print('\n', result) + return False def result(self): - return self.map_results + return self.validate def __str__(self): return f'' @@ -87,91 +145,40 @@ def __repr__(self): return str(self) -# class Master(object): -# """Master class for creating and running workers""" - -# def __init__(self, num_workers): -# super(Master, self).__init__() - -# self.data_gen = None -# self.num_workers = num_workers -# self.workers = [] - -# self.map_results = [] -# self.reduce_stack = Stack(self) - -# def create_job(self, data, map_fn, red_fn): - -# self.data = Chunks2(data) -# self.map_fn = map_fn -# self.red_fn = red_fn - -# def start(self): -# print('Job has started!') -# processes = [] - -# with concurrent.futures.ProcessPoolExecutor() as executor: -# results = executor.map(self.map_fn, self.data) - -# # self.map_results = [f.result() -# # for f in concurrent.futures.as_completed(results)] -# # self.map_results = [f.add_done_callback(self.take_result) -# # for f in concurrent.futures.as_completed(results)] - -# self.results = [result for result in results] -# # print(self.results) - -# def take_result(self, future): -# result = future.result() -# print('\n', result) - -# def result(self): -# return self.results - -# def __str__(self): -# return f'' - -# def __repr__(self): -# return str(self) - - class Worker(object): """Worker class for running the job""" - def __init__(self, id, data, job, type): + def __init__(self, id, job, type, verbose): + """ + Constructor for Worker class + Takes in: + ID -> Number + job -> a function + type -> 'MAP' or 'REDUCE' + """ super(Worker, self).__init__() self.job = job # Function - # self.data_gen = data_gen - self.data = data self.id = id self.type = type - self.result = None - self.working = False - - def run(self, A=None, B=None): + self.verbose = verbose - print(f'{self.type} Worker {self.id} is working!!', end='\r') - self.working = True - # data = None - - if self.type == 'MAP': - # try: - # while data is None: - # data = next(self.data_gen) - # print(self.id, data) + def set_data(self, data): + """ + To set data before parallel execution + """ + self.data = data - # except Exception as e: - # print(e) + def run(self): - # self.result = self.job(data) - self.result = self.job(self.data) + print(f'{self.type} Worker {self.id} is working' * self.verbose, + end='\r' * self.verbose) - # return self.result + if self.type == 'MAP': + result = self.job(self.data) else: - self.result = self.job(A, B) + result = self.job(self.data.pop(), self.data.pop()) - self.working = False - return self.result + return result def __str__(self): return f'' diff --git a/pyMR/utils.py b/pyMR/utils.py index 7d8838e..0e8a76e 100644 --- a/pyMR/utils.py +++ b/pyMR/utils.py @@ -5,6 +5,7 @@ class Stack(object): def __init__(self, master): super(Stack, self).__init__() self._ = [] + self.master = master def push(self, elem): self._.append(elem) @@ -13,3 +14,32 @@ def push(self, elem): def pop(self): return self._.pop() + + +class Queue(object): + """A simple queue to store results""" + def __init__(self): + super(Queue, self).__init__() + self._ = [] + + def enqueue(self, result): + self._.append(result) + + def is_empty(self): + return len(self._) == 0 + + def dequeue(self): + if self.is_empty(): + raise Exception('Cannot dequeue from an empty queue') + x = self._[0] + self._ = self._[1:] + return x + + def size(self): + return len(self._) + + def peek(self): + return self._[0] + + def __str__(self): + return str(self._) diff --git a/tests/queue_test.py b/tests/queue_test.py new file mode 100644 index 0000000..675ae64 --- /dev/null +++ b/tests/queue_test.py @@ -0,0 +1,29 @@ +from pyMR import Queue + + +def test_enq(): + q = Queue() + assert q.is_empty() == True + + for i in range(10): + q.enqueue(i) + + for i in range(10): + assert q.dequeue() == i + + +def test_deq(): + q = Queue() + try: + q.dequeue() + except: + pass + + +def test_size(): + q = Queue() + for i in range(1, 10): + q.enqueue(i) + q.dequeue() + + assert q.size() == 0 diff --git a/tests/simple_test.py b/tests/simple_test.py index b78da62..2b2b2e1 100644 --- a/tests/simple_test.py +++ b/tests/simple_test.py @@ -1,37 +1,78 @@ # root/test/simple_test.py from pyMR import Master +from collections import defaultdict +from math import sqrt -def map_(x): - print('Running map') - return sum(x) +def map_prime(x): + temp = [] + for val in x: + flag = False + for i in range(2, int(sqrt(val)) + 1): + if val % i == 0: + flag = True + break + if not flag: + temp.append(val) + return temp -def red_(x, y): + +def map_sum(list_): + return sum(list_) + + +def map_freq(lines): + freq = defaultdict(int) + for line in lines: + for c in line: + freq[c] += 1 + + return freq + + +def red_list(x, y): return x + y +def red_freq(f1, f2): + for letter, freq in f1.items(): + f2[letter] += freq + + return f2 + + # 1000000000 -def test_main(): - master = Master(num_workers=30) - # master.create_job(data=range(100), - # map_fn=lambda x: sum(x), - # red_fn=lambda x, y: x + y) - master.create_job(data=list(range(1000000000)), map_fn=map_, red_fn=red_) + + +def test_prime(): + master = Master(num_workers=5, granularity=3, verbose=False) + master.create_job(data=(range(1, 1000)), map_fn=map_prime, red_fn=red_list) + + master.start() + result = master.result() + + assert set(result) == set(map_prime(range(1, 1000))) + + +def test_sum(): + master = Master(num_workers=5, granularity=3, verbose=False) + master.create_job(data=(range(1, 1000)), map_fn=map_sum, red_fn=red_list) master.start() result = master.result() - print(result) + + assert result == map_sum(range(1, 1000)) -# def test_len(): -# master = Master(num_workers=5) -# master.create_job(data=range(101), -# map_fn=lambda x: sum(x), -# red_fn=lambda x, y: x + y) +def test_freq(): + master = Master(num_workers=5, granularity=1, verbose=True) + master.create_job(data=(open('./tests/test_files/test_small.txt', 'r')), + map_fn=map_freq, + red_fn=red_freq) -# assert len(master.workers) == 5 + master.start() + result = master.result() -if __name__ == '__main__': - test_main() + assert result == map_freq(open('./tests/test_files/test_small.txt', 'r')) diff --git a/tests/test.txt b/tests/test.txt deleted file mode 100644 index 16dbab5..0000000 --- a/tests/test.txt +++ /dev/null @@ -1,984 +0,0 @@ -This is line 0 -This is line 1 -This is line 2 -This is line 3 -This is line 4 -This is line 5 -This is line 6 -This is line 7 -This is line 8 -This is line 9 -This is line 10 -This is line 11 -This is line 12 -This is line 13 -This is line 14 -This is line 15 -This is line 16 -This is line 17 -This is line 18 -This is line 19 -This is line 20 -This is line 21 -This is line 22 -This is line 23 -This is line 24 -This is line 25 -This is line 26 -This is line 27 -This is line 28 -This is line 29 -This is line 30 -This is line 31 -This is line 32 -This is line 33 -This is line 34 -This is line 35 -This is line 36 -This is line 37 -This is line 38 -This is line 39 -This is line 40 -This is line 41 -This is line 42 -This is line 43 -This is line 44 -This is line 45 -This is line 46 -This is line 47 -This is line 48 -This is line 49 -This is line 50 -This is line 51 -This is line 52 -This is line 53 -This is line 54 -This is line 55 -This is line 56 -This is line 57 -This is line 58 -This is line 59 -This is line 60 -This is line 61 -This is line 62 -This is line 63 -This is line 64 -This is line 65 -This is line 66 -This is line 67 -This is line 68 -This is line 69 -This is line 70 -This is line 71 -This is line 72 -This is line 73 -This is line 74 -This is line 75 -This is line 76 -This is line 77 -This is line 78 -This is line 79 -This is line 80 -This is line 81 -This is line 82 -This is line 83 -This is line 84 -This is line 85 -This is line 86 -This is line 87 -This is line 88 -This is line 89 -This is line 90 -This is line 91 -This is line 92 -This is line 93 -This is line 94 -This is line 95 -This is line 96 -This is line 97 -This is line 98 -This is line 99 -This is line 100 -This is line 101 -This is line 102 -This is line 103 -This is line 104 -This is line 105 -This is line 106 -This is line 107 -This is line 108 -This is line 109 -This is line 110 -This is line 111 -This is line 112 -This is line 113 -This is line 114 -This is line 115 -This is line 116 -This is line 117 -This is line 118 -This is line 119 -This is line 120 -This is line 121 -This is line 122 -This is line 123 -This is line 124 -This is line 125 -This is line 126 -This is line 127 -This is line 128 -This is line 129 -This is line 130 -This is line 131 -This is line 132 -This is line 133 -This is line 134 -This is line 135 -This is line 136 -This is line 137 -This is line 138 -This is line 139 -This is line 140 -This is line 141 -This is line 142 -This is line 143 -This is line 144 -This is line 145 -This is line 146 -This is line 147 -This is line 148 -This is line 149 -This is line 150 -This is line 151 -This is line 152 -This is line 153 -This is line 154 -This is line 155 -This is line 156 -This is line 157 -This is line 158 -This is line 159 -This is line 160 -This is line 161 -This is line 162 -This is line 163 -This is line 164 -This is line 165 -This is line 166 -This is line 167 -This is line 168 -This is line 169 -This is line 170 -This is line 171 -This is line 172 -This is line 173 -This is line 174 -This is line 175 -This is line 176 -This is line 177 -This is line 178 -This is line 179 -This is line 180 -This is line 181 -This is line 182 -This is line 183 -This is line 184 -This is line 185 -This is line 186 -This is line 187 -This is line 188 -This is line 189 -This is line 190 -This is line 191 -This is line 192 -This is line 193 -This is line 194 -This is line 195 -This is line 196 -This is line 197 -This is line 198 -This is line 199 -This is line 200 -This is line 201 -This is line 202 -This is line 203 -This is line 204 -This is line 205 -This is line 206 -This is line 207 -This is line 208 -This is line 209 -This is line 210 -This is line 211 -This is line 212 -This is line 213 -This is line 214 -This is line 215 -This is line 216 -This is line 217 -This is line 218 -This is line 219 -This is line 220 -This is line 221 -This is line 222 -This is line 223 -This is line 224 -This is line 225 -This is line 226 -This is line 227 -This is line 228 -This is line 229 -This is line 230 -This is line 231 -This is line 232 -This is line 233 -This is line 234 -This is line 235 -This is line 236 -This is line 237 -This is line 238 -This is line 239 -This is line 240 -This is line 241 -This is line 242 -This is line 243 -This is line 244 -This is line 245 -This is line 246 -This is line 247 -This is line 248 -This is line 249 -This is line 250 -This is line 251 -This is line 252 -This is line 253 -This is line 254 -This is line 255 -This is line 256 -This is line 257 -This is line 258 -This is line 259 -This is line 260 -This is line 261 -This is line 262 -This is line 263 -This is line 264 -This is line 265 -This is line 266 -This is line 267 -This is line 268 -This is line 269 -This is line 270 -This is line 271 -This is line 272 -This is line 273 -This is line 274 -This is line 275 -This is line 276 -This is line 277 -This is line 278 -This is line 279 -This is line 280 -This is line 281 -This is line 282 -This is line 283 -This is line 284 -This is line 285 -This is line 286 -This is line 287 -This is line 288 -This is line 289 -This is line 290 -This is line 291 -This is line 292 -This is line 293 -This is line 294 -This is line 295 -This is line 296 -This is line 297 -This is line 298 -This is line 299 -This is line 300 -This is line 301 -This is line 302 -This is line 303 -This is line 304 -This is line 305 -This is line 306 -This is line 307 -This is line 308 -This is line 309 -This is line 310 -This is line 311 -This is line 312 -This is line 313 -This is line 314 -This is line 315 -This is line 316 -This is line 317 -This is line 318 -This is line 319 -This is line 320 -This is line 321 -This is line 322 -This is line 323 -This is line 324 -This is line 325 -This is line 326 -This is line 327 -This is line 328 -This is line 329 -This is line 330 -This is line 331 -This is line 332 -This is line 333 -This is line 334 -This is line 335 -This is line 336 -This is line 337 -This is line 338 -This is line 339 -This is line 340 -This is line 341 -This is line 342 -This is line 343 -This is line 344 -This is line 345 -This is line 346 -This is line 347 -This is line 348 -This is line 349 -This is line 350 -This is line 351 -This is line 352 -This is line 353 -This is line 354 -This is line 355 -This is line 356 -This is line 357 -This is line 358 -This is line 359 -This is line 360 -This is line 361 -This is line 362 -This is line 363 -This is line 364 -This is line 365 -This is line 366 -This is line 367 -This is line 368 -This is line 369 -This is line 370 -This is line 371 -This is line 372 -This is line 373 -This is line 374 -This is line 375 -This is line 376 -This is line 377 -This is line 378 -This is line 379 -This is line 380 -This is line 381 -This is line 382 -This is line 383 -This is line 384 -This is line 385 -This is line 386 -This is line 387 -This is line 388 -This is line 389 -This is line 390 -This is line 391 -This is line 392 -This is line 393 -This is line 394 -This is line 395 -This is line 396 -This is line 397 -This is line 398 -This is line 399 -This is line 400 -This is line 401 -This is line 402 -This is line 403 -This is line 404 -This is line 405 -This is line 406 -This is line 407 -This is line 408 -This is line 409 -This is line 410 -This is line 411 -This is line 412 -This is line 413 -This is line 414 -This is line 415 -This is line 416 -This is line 417 -This is line 418 -This is line 419 -This is line 420 -This is line 421 -This is line 422 -This is line 423 -This is line 424 -This is line 425 -This is line 426 -This is line 427 -This is line 428 -This is line 429 -This is line 430 -This is line 431 -This is line 432 -This is line 433 -This is line 434 -This is line 435 -This is line 436 -This is line 437 -This is line 438 -This is line 439 -This is line 440 -This is line 441 -This is line 442 -This is line 443 -This is line 444 -This is line 445 -This is line 446 -This is line 447 -This is line 448 -This is line 449 -This is line 450 -This is line 451 -This is line 452 -This is line 453 -This is line 454 -This is line 455 -This is line 456 -This is line 457 -This is line 458 -This is line 459 -This is line 460 -This is line 461 -This is line 462 -This is line 463 -This is line 464 -This is line 465 -This is line 466 -This is line 467 -This is line 468 -This is line 469 -This is line 470 -This is line 471 -This is line 472 -This is line 473 -This is line 474 -This is line 475 -This is line 476 -This is line 477 -This is line 478 -This is line 479 -This is line 480 -This is line 481 -This is line 482 -This is line 483 -This is line 484 -This is line 485 -This is line 486 -This is line 487 -This is line 488 -This is line 489 -This is line 490 -This is line 491 -This is line 492 -This is line 493 -This is line 494 -This is line 495 -This is line 496 -This is line 497 -This is line 498 -This is line 499 -This is line 500 -This is line 501 -This is line 502 -This is line 503 -This is line 504 -This is line 505 -This is line 506 -This is line 507 -This is line 508 -This is line 509 -This is line 510 -This is line 511 -This is line 512 -This is line 513 -This is line 514 -This is line 515 -This is line 516 -This is line 517 -This is line 518 -This is line 519 -This is line 520 -This is line 521 -This is line 522 -This is line 523 -This is line 524 -This is line 525 -This is line 526 -This is line 527 -This is line 528 -This is line 529 -This is line 530 -This is line 531 -This is line 532 -This is line 533 -This is line 534 -This is line 535 -This is line 536 -This is line 537 -This is line 538 -This is line 539 -This is line 540 -This is line 541 -This is line 542 -This is line 543 -This is line 544 -This is line 545 -This is line 546 -This is line 547 -This is line 548 -This is line 549 -This is line 550 -This is line 551 -This is line 552 -This is line 553 -This is line 554 -This is line 555 -This is line 556 -This is line 557 -This is line 558 -This is line 559 -This is line 560 -This is line 561 -This is line 562 -This is line 563 -This is line 564 -This is line 565 -This is line 566 -This is line 567 -This is line 568 -This is line 569 -This is line 570 -This is line 571 -This is line 572 -This is line 589 -This is line 590 -This is line 591 -This is line 592 -This is line 593 -This is line 594 -This is line 595 -This is line 596 -This is line 597 -This is line 598 -This is line 599 -This is line 600 -This is line 601 -This is line 602 -This is line 603 -This is line 604 -This is line 605 -This is line 606 -This is line 607 -This is line 608 -This is line 609 -This is line 610 -This is line 611 -This is line 612 -This is line 613 -This is line 614 -This is line 615 -This is line 616 -This is line 617 -This is line 618 -This is line 619 -This is line 620 -This is line 621 -This is line 622 -This is line 623 -This is line 624 -This is line 625 -This is line 626 -This is line 627 -This is line 628 -This is line 629 -This is line 630 -This is line 631 -This is line 632 -This is line 633 -This is line 634 -This is line 635 -This is line 636 -This is line 637 -This is line 638 -This is line 639 -This is line 640 -This is line 641 -This is line 642 -This is line 643 -This is line 644 -This is line 645 -This is line 646 -This is line 647 -This is line 648 -This is line 649 -This is line 650 -This is line 651 -This is line 652 -This is line 653 -This is line 654 -This is line 655 -This is line 656 -This is line 657 -This is line 658 -This is line 659 -This is line 660 -This is line 661 -This is line 662 -This is line 663 -This is line 664 -This is line 665 -This is line 666 -This is line 667 -This is line 668 -This is line 669 -This is line 670 -This is line 671 -This is line 672 -This is line 673 -This is line 674 -This is line 675 -This is line 676 -This is line 677 -This is line 678 -This is line 679 -This is line 680 -This is line 681 -This is line 682 -This is line 683 -This is line 684 -This is line 685 -This is line 686 -This is line 687 -This is line 688 -This is line 689 -This is line 690 -This is line 691 -This is line 692 -This is line 693 -This is line 694 -This is line 695 -This is line 696 -This is line 697 -This is line 698 -This is line 699 -This is line 700 -This is line 701 -This is line 702 -This is line 703 -This is line 704 -This is line 705 -This is line 706 -This is line 707 -This is line 708 -This is line 709 -This is line 710 -This is line 711 -This is line 712 -This is line 713 -This is line 714 -This is line 715 -This is line 716 -This is line 717 -This is line 718 -This is line 719 -This is line 720 -This is line 721 -This is line 722 -This is line 723 -This is line 724 -This is line 725 -This is line 726 -This is line 727 -This is line 728 -This is line 729 -This is line 730 -This is line 731 -This is line 732 -This is line 733 -This is line 734 -This is line 735 -This is line 736 -This is line 737 -This is line 738 -This is line 739 -This is line 740 -This is line 741 -This is line 742 -This is line 743 -This is line 744 -This is line 745 -This is line 746 -This is line 747 -This is line 748 -This is line 749 -This is line 750 -This is line 751 -This is line 752 -This is line 753 -This is line 754 -This is line 755 -This is line 756 -This is line 757 -This is line 758 -This is line 759 -This is line 760 -This is line 761 -This is line 762 -This is line 763 -This is line 764 -This is line 765 -This is line 766 -This is line 767 -This is line 768 -This is line 769 -This is line 770 -This is line 771 -This is line 772 -This is line 773 -This is line 774 -This is line 775 -This is line 776 -This is line 777 -This is line 778 -This is line 779 -This is line 780 -This is line 781 -This is line 782 -This is line 783 -This is line 784 -This is line 785 -This is line 786 -This is line 787 -This is line 788 -This is line 789 -This is line 790 -This is line 791 -This is line 792 -This is line 793 -This is line 794 -This is line 795 -This is line 796 -This is line 797 -This is line 798 -This is line 799 -This is line 800 -This is line 801 -This is line 802 -This is line 803 -This is line 804 -This is line 805 -This is line 806 -This is line 807 -This is line 808 -This is line 809 -This is line 810 -This is line 811 -This is line 812 -This is line 813 -This is line 814 -This is line 815 -This is line 816 -This is line 817 -This is line 818 -This is line 819 -This is line 820 -This is line 821 -This is line 822 -This is line 823 -This is line 824 -This is line 825 -This is line 826 -This is line 827 -This is line 828 -This is line 829 -This is line 830 -This is line 831 -This is line 832 -This is line 833 -This is line 834 -This is line 835 -This is line 836 -This is line 837 -This is line 838 -This is line 839 -This is line 840 -This is line 841 -This is line 842 -This is line 843 -This is line 844 -This is line 845 -This is line 846 -This is line 847 -This is line 848 -This is line 849 -This is line 850 -This is line 851 -This is line 852 -This is line 853 -This is line 854 -This is line 855 -This is line 856 -This is line 857 -This is line 858 -This is line 859 -This is line 860 -This is line 861 -This is line 862 -This is line 863 -This is line 864 -This is line 865 -This is line 866 -This is line 867 -This is line 868 -This is line 869 -This is line 870 -This is line 871 -This is line 872 -This is line 873 -This is line 874 -This is line 875 -This is line 876 -This is line 877 -This is line 878 -This is line 879 -This is line 880 -This is line 881 -This is line 882 -This is line 883 -This is line 884 -This is line 885 -This is line 886 -This is line 887 -This is line 888 -This is line 889 -This is line 890 -This is line 891 -This is line 892 -This is line 893 -This is line 894 -This is line 895 -This is line 896 -This is line 897 -This is line 898 -This is line 899 -This is line 900 -This is line 901 -This is line 902 -This is line 903 -This is line 904 -This is line 905 -This is line 906 -This is line 907 -This is line 908 -This is line 909 -This is line 910 -This is line 911 -This is line 912 -This is line 913 -This is line 914 -This is line 915 -This is line 916 -This is line 917 -This is line 918 -This is line 919 -This is line 920 -This is line 921 -This is line 922 -This is line 923 -This is line 924 -This is line 925 -This is line 926 -This is line 927 -This is line 928 -This is line 929 -This is line 930 -This is line 931 -This is line 932 -This is line 933 -This is line 934 -This is line 935 -This is line 936 -This is line 937 -This is line 938 -This is line 939 -This is line 940 -This is line 941 -This is line 942 -This is line 943 -This is line 944 -This is line 945 -This is line 946 -This is line 947 -This is line 948 -This is line 949 -This is line 950 -This is line 951 -This is line 952 -This is line 953 -This is line 954 -This is line 955 -This is line 956 -This is line 957 -This is line 958 -This is line 959 -This is line 960 -This is line 961 -This is line 962 -This is line 963 -This is line 964 -This is line 965 -This is line 966 -This is line 967 -This is line 968 -This is line 969 -This is line 970 -This is line 971 -This is line 972 -This is line 973 -This is line 974 -This is line 975 -This is line 976 -This is line 977 -This is line 978 -This is line 979 -This is line 980 -This is line 981 -This is line 982 -This is line 983 -This is line 984 -This is line 985 -This is line 986 -This is line 987 -This is line 988 -This is line 989 -This is line 990 -This is line 991 -This is line 992 -This is line 993 -This is line 994 -This is line 995 -This is line 996 -This is line 997 -This is line 998 -This is line 999 diff --git a/tests/test_files/test_small.txt b/tests/test_files/test_small.txt new file mode 100644 index 0000000..38d2bed --- /dev/null +++ b/tests/test_files/test_small.txt @@ -0,0 +1,47 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliquad +Sit amet justo donec enim diam vulputate utd +Leo integer malesuada nunc vel risus commodo viverra maecenasd +Quis varius quam quisque id diam vel quam elementum pulvinard +Sollicitudin nibh sit amet commodod +Pellentesque nec nam aliquam sem etd +Tortor at risus viverra adipiscing at in tellus integer feugiatd +At risus viverra adipiscing atd +Dis parturient montes nascetur ridiculus musd +Feugiat nisl pretium fusce id velit ut tortor pretium viverrad +Ut consequat semper viverra nam libero justo laoreet sitd +Ultrices mi tempus imperdiet nulla malesuada pellentesque elit egetd +Augue interdum velit euismod in pellentesque massa placeratd +Vel eros donec ac odio tempor orci dapibus ultrices ind +Dignissim cras tincidunt lobortis feugiat vivamus at augue egetd +Fringilla est ullamcorper eget nulla facilisi etiam dignissim diam quisd +Morbi tristique senectus et netus et malesuada fames ac turpisd +Neque gravida in fermentum et sollicitudin ac orci phasellusd +Quam vulputate dignissim suspendisse in est ante in nibhd +Massa ultricies mi quis hendreritd +Dignissim convallis aenean et tortor atd +Nisi est sit amet facilisis magna etiamd +Magna fringilla urna porttitor rhoncus dolor purus nond +Turpis egestas maecenas pharetra convallis posuere morbi leo urna molestied +Vitae auctor eu augue utd +Justo eget magna fermentum iaculis eu nond +Consectetur adipiscing elit duis tristique sollicitudind +Tellus id interdum velit laoreet id donec ultricesd +Euismod nisi porta lorem mollis aliquam ut porttitor leo ad +Sit amet mauris commodo quis imperdiet massa tincidunt nunc pulvinard +Nibh tortor id aliquet lectus proin nibh nisl condimentum id. +Integer feugiat scelerisque varius morbi. +A cras semper auctor neque vitae tempus quam pellentesque nec. +Lectus mauris ultrices eros in cursus turpis massa. +Quis risus sed vulputate odio. +Dictum varius duis at consectetur lorem donec massa sapien faucibus. +Lectus sit amet est placerat in. +Aliquet eget sit amet tellus cras adipiscing. +Lobortis elementum nibh tellus molestie nunc non blandit. +Aliquam vestibulum morbi blandit cursus risus at ultrices mi tempus. +Fermentum leo vel orci porta. +Amet risus nullam eget felis eget nunc. +Ornare suspendisse sed nisi lacus sed viverra tellus in hac. +Ultrices tincidunt arcu non sodales neque sodales ut etiam. +Faucibus scelerisque eleifend donec pretium vulputate sapien nec. +Urna neque viverra justo nec ultrices dui sapien eget mi. +Amet nulla facilisi morbi tempus iaculis urna id volutpat. From 0da7ccf6b1d7da341bcb4fbfb8b07d51f57a65f4 Mon Sep 17 00:00:00 2001 From: KARTHEEK Date: Wed, 10 Jun 2020 22:39:29 +0530 Subject: [PATCH 02/10] Queue, batch based execution --- devTools/__pycache__/format.cpython-36.pyc | Bin 326 -> 326 bytes devTools/push.py | 3 ++- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index 0ef04764bd670efaab53f3bc788670c0d00b2963..f3521791b9f8d61c274cf8ad7ef597300131c4b4 100644 GIT binary patch delta 16 XcmX@cbc~7Jn3tF9g3!Z_?DmWRDIo-^ delta 16 XcmX@cbc~7Jn3tD}#cSP0c6&wuBi;k! diff --git a/devTools/push.py b/devTools/push.py index cc44379..eb90ce0 100644 --- a/devTools/push.py +++ b/devTools/push.py @@ -10,4 +10,5 @@ os.system(f'git commit -m \"{cmt}\"') -os.system('git push origin master') +branch = input('Branch: ') +os.system('git push origin {branch}') From 2d32a8b10fdfa3a39c1a00ba12ca12d589381495 Mon Sep 17 00:00:00 2001 From: KARTHEEK Date: Wed, 10 Jun 2020 22:40:54 +0530 Subject: [PATCH 03/10] Queue, batch based execution --- devTools/__pycache__/format.cpython-36.pyc | Bin 326 -> 326 bytes devTools/push.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index f3521791b9f8d61c274cf8ad7ef597300131c4b4..4a74e7096d89fb6fdb7d90aa192278a4cfd9bc3a 100644 GIT binary patch delta 15 WcmX@cbc~73n3tDpr|?EL2SxxPS_85G delta 15 WcmX@cbc~73n3tF9g3v}b2SxxP-vh`1 diff --git a/devTools/push.py b/devTools/push.py index eb90ce0..30f8d1e 100644 --- a/devTools/push.py +++ b/devTools/push.py @@ -11,4 +11,4 @@ os.system(f'git commit -m \"{cmt}\"') branch = input('Branch: ') -os.system('git push origin {branch}') +os.system(f'git push origin {branch}') From fbf14faddc180dfe2f47cb707b12688031bef87a Mon Sep 17 00:00:00 2001 From: KARTHEEK Date: Wed, 10 Jun 2020 22:45:00 +0530 Subject: [PATCH 04/10] Removed Stack class --- devTools/__pycache__/format.cpython-36.pyc | Bin 326 -> 326 bytes pyMR/utils.py | 15 --------------- 2 files changed, 15 deletions(-) diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index 4a74e7096d89fb6fdb7d90aa192278a4cfd9bc3a..09472ba68b77d05d8881f6229225e8bf864fd07d 100644 GIT binary patch delta 15 WcmX@cbc~73n3tDJNMs|M10w((bps3l delta 15 WcmX@cbc~73n3tDpr|?EL2SxxPS_85G diff --git a/pyMR/utils.py b/pyMR/utils.py index 0e8a76e..e9331df 100644 --- a/pyMR/utils.py +++ b/pyMR/utils.py @@ -1,21 +1,6 @@ # root/pyMR/utils.py -class Stack(object): - def __init__(self, master): - super(Stack, self).__init__() - self._ = [] - self.master = master - - def push(self, elem): - self._.append(elem) - if len(self._) == 2: - master.reduce(self.pop(), self.pop()) - - def pop(self): - return self._.pop() - - class Queue(object): """A simple queue to store results""" def __init__(self): From 7351541c2e49b45c892615209f7123566aab22b6 Mon Sep 17 00:00:00 2001 From: KARTHEEK Date: Wed, 10 Jun 2020 22:53:54 +0530 Subject: [PATCH 05/10] Updated build --- devTools/__pycache__/format.cpython-36.pyc | Bin 326 -> 326 bytes devTools/push.py | 2 ++ 2 files changed, 2 insertions(+) diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index 09472ba68b77d05d8881f6229225e8bf864fd07d..f0cc8f2357e5eaab74ee8c830b8f45fa4b865117 100644 GIT binary patch delta 15 WcmX@cbc~73n3tD}TXZ9v10w((Py+@4 delta 15 WcmX@cbc~73n3tDJNMs|M10w((bps3l diff --git a/devTools/push.py b/devTools/push.py index 30f8d1e..6ccaf2e 100644 --- a/devTools/push.py +++ b/devTools/push.py @@ -4,6 +4,8 @@ import clear import format +os.system('pip install . --no-cache-dir') + cmt = input('Commit message: ') os.system('git add .') From 9e94d9ec359514bf5ce8c935c782232825d18131 Mon Sep 17 00:00:00 2001 From: KARTHEEK Date: Wed, 10 Jun 2020 22:56:57 +0530 Subject: [PATCH 06/10] Removed build dir --- build/lib/pyMR/__init__.py | 3 - build/lib/pyMR/chunk.py | 148 ---------------- build/lib/pyMR/main.py | 187 --------------------- build/lib/pyMR/utils.py | 16 -- devTools/__pycache__/format.cpython-36.pyc | Bin 326 -> 326 bytes 5 files changed, 354 deletions(-) delete mode 100644 build/lib/pyMR/__init__.py delete mode 100644 build/lib/pyMR/chunk.py delete mode 100644 build/lib/pyMR/main.py delete mode 100644 build/lib/pyMR/utils.py diff --git a/build/lib/pyMR/__init__.py b/build/lib/pyMR/__init__.py deleted file mode 100644 index fa6b0bd..0000000 --- a/build/lib/pyMR/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .main import Master -from .main import Worker -from .chunk import Chunks diff --git a/build/lib/pyMR/chunk.py b/build/lib/pyMR/chunk.py deleted file mode 100644 index 59c6c06..0000000 --- a/build/lib/pyMR/chunk.py +++ /dev/null @@ -1,148 +0,0 @@ -# root/pyMR/chunk.py - -from collections import Iterable -import threading - - -class NotIterableException(Exception): - pass - - -class SizeException(Exception): - pass - - -class threadsafe_iter: - """Takes an iterator/generator and makes it thread-safe by - serializing call to the `next` method of given iterator/generator. - """ - def __init__(self, it): - self.it = it - self.lock = threading.Lock() - - def __iter__(self): - return self - - def next(self): - with self.lock: - return self.it.next() - - -def threadsafe_generator(f): - """A decorator that takes a generator function and makes it thread-safe. - """ - def g(*a, **kw): - return threadsafe_iter(f(*a, **kw)) - - return g - - -class Chunks(): - def __init__(self, data, num_chunks=10, overflow='last'): - self.data = data - self.num_chunks = num_chunks - self.overflow = overflow - self.chunks = [] - self.chunk_count = 0 - - if not isinstance(data, Iterable): - raise NotIterableException("'data' should be iterable") - - try: - self.len_ = len(data) - except: - self.len_ = sum([1 for _ in data]) - - if self.len_ < self.num_chunks: - raise SizeException("len(data) should be more than num_chunks") - - if not isinstance(self.num_chunks, int): - raise Exception("'num_chunks' must be of type 'int'") - - self.overflow_len = self.len_ % self.num_chunks - self.each_chunk = self.len_ // self.num_chunks - - def get_chunks(self): - - temp_chunk = [] - for i, elem in enumerate(self.data, start=1): - temp_chunk.append(elem) - - if not i % self.each_chunk and len(self.chunks) < self.num_chunks: - self.chunks.append(temp_chunk) - temp_chunk = [] - - if i == self.len_: - assert len(temp_chunk) == self.overflow_len - - if self.overflow == 'first': - self.chunks[0] += temp_chunk - else: - self.chunks[-1] += temp_chunk - - return self.chunks - - # @threadsafe_generator - def get_chunks_gen(self): - - temp_chunk = [] - for i, elem in enumerate(self.data, start=1): - - temp_chunk.append(elem) - if not i % self.each_chunk and self.chunk_count < self.num_chunks: - # self.chunks.append(temp_chunk) - if self.chunk_count != self.num_chunks - 1: - self.chunk_count += 1 - yield temp_chunk - temp_chunk = [] - - if i == self.len_: - assert len(temp_chunk) == self.overflow_len + self.each_chunk - yield temp_chunk - - -class Chunks2: - def __init__(self, data, num_chunks=10): - self.data = data - self.num_chunks = num_chunks - self.chunk_count = 0 - self.lock = threading.Lock() - - if not isinstance(data, Iterable): - raise NotIterableException("'data' should be iterable") - - try: - self.len_ = len(data) - except: - self.len_ = sum([1 for _ in data]) - - if self.len_ < self.num_chunks: - raise SizeException("len(data) should be more than num_chunks") - - if not isinstance(self.num_chunks, int): - raise Exception("'num_chunks' must be of type 'int'") - - self.overflow_len = self.len_ % self.num_chunks - self.each_chunk = self.len_ // self.num_chunks - - def __iter__(self): - return self - - def __next__(self): - # with self.lock: - if True: - temp_chunk = [] - for i, elem in enumerate(self.data, start=1): - - temp_chunk.append(elem) - if not i % self.each_chunk and self.chunk_count < self.num_chunks: - # self.chunks.append(temp_chunk) - if self.chunk_count != self.num_chunks - 1: - self.chunk_count += 1 - return temp_chunk - temp_chunk = [] - - if i == self.len_: - assert len(temp_chunk) == self.overflow_len + \ - self.each_chunk - return temp_chunk diff --git a/build/lib/pyMR/main.py b/build/lib/pyMR/main.py deleted file mode 100644 index 782ef74..0000000 --- a/build/lib/pyMR/main.py +++ /dev/null @@ -1,187 +0,0 @@ -# root/pyMR/main.py - -from .chunk import Chunks2 -from .chunk import Chunks -import concurrent.futures -from .utils import Stack - - -class Master(object): - """Master class for creating and running workers""" - def __init__(self, num_workers, split_ratio=0.8, load_each=20): - super(Master, self).__init__() - - self.data_gen = None - self.num_workers = num_workers - self.workers = [] - self.load_each = load_each - - self.map_results = [] - self.reduce_stack = Stack(self) - - self.map_workers = int(split_ratio * num_workers) - self.reduce_workers = num_workers - self.map_workers - - def create_job(self, data, map_fn, red_fn): - - self.map_fn = map_fn - self.red_fn = red_fn - - # self.data_gen = Chunks(data=data, - # num_chunks=self.num_workers * self.load_each).get_chunks_gen() - - self.data_gen = iter( - Chunks2(data=data, num_chunks=self.num_workers * self.load_each)) - print(next(self.data_gen)) - print(next(self.data_gen)) - print(len(next(self.data_gen))) - - # self.workers = [Worker(id=_, data_gen=self.data_gen, - # job=map_fn, type='MAP', - # master=self) for _ in range(1, self.map_workers + 1)] - - # self.workers += [Worker(id=_, data_gen=self.data_gen, - # job=red_fn, type='REDUCE', - # master=self) for _ in range(self.map_workers + 1, - # self.num_workers + 1)] - - # Priming - # self.workers = [Worker(id=_, data=next(self.data_gen), - # job=map_fn, type='MAP') for _ in range(1, self.num_workers)] - - def start(self): - print('Job has started!') - processes = [] - - # while (generator is not empty): - for __ in range(self.load_each): - self.workers = [ - Worker(id=_, - data=next(self.data_gen), - job=self.map_fn, - type='MAP') for _ in range(1, self.num_workers) - ] - - with concurrent.futures.ThreadPoolExecutor() as executor: - results = [ - executor.submit(worker.run) - for worker in self.workers[:self.map_workers] - ] - - # self.map_results = [f.result() - # for f in concurrent.futures.as_completed(results)] - # self.map_results = [f.add_done_callback(self.take_result) - # for f in concurrent.futures.as_completed(results)] - - for f in concurrent.futures.as_completed(results): - f.add_done_callback(self.take_result) - - print() - - def take_result(self, future): - result = future.result() - self.map_results.append(result) - - # print('\n', result) - - def result(self): - return self.map_results - - def __str__(self): - return f'' - - def __repr__(self): - return str(self) - - -# class Master(object): -# """Master class for creating and running workers""" - -# def __init__(self, num_workers): -# super(Master, self).__init__() - -# self.data_gen = None -# self.num_workers = num_workers -# self.workers = [] - -# self.map_results = [] -# self.reduce_stack = Stack(self) - -# def create_job(self, data, map_fn, red_fn): - -# self.data = Chunks2(data) -# self.map_fn = map_fn -# self.red_fn = red_fn - -# def start(self): -# print('Job has started!') -# processes = [] - -# with concurrent.futures.ProcessPoolExecutor() as executor: -# results = executor.map(self.map_fn, self.data) - -# # self.map_results = [f.result() -# # for f in concurrent.futures.as_completed(results)] -# # self.map_results = [f.add_done_callback(self.take_result) -# # for f in concurrent.futures.as_completed(results)] - -# self.results = [result for result in results] -# # print(self.results) - -# def take_result(self, future): -# result = future.result() -# print('\n', result) - -# def result(self): -# return self.results - -# def __str__(self): -# return f'' - -# def __repr__(self): -# return str(self) - - -class Worker(object): - """Worker class for running the job""" - def __init__(self, id, data, job, type): - - super(Worker, self).__init__() - self.job = job # Function - # self.data_gen = data_gen - self.data = data - self.id = id - self.type = type - self.result = None - self.working = False - - def run(self, A=None, B=None): - - print(f'{self.type} Worker {self.id} is working!!', end='\r') - self.working = True - # data = None - - if self.type == 'MAP': - # try: - # while data is None: - # data = next(self.data_gen) - # print(self.id, data) - - # except Exception as e: - # print(e) - - # self.result = self.job(data) - self.result = self.job(self.data) - - # return self.result - else: - self.result = self.job(A, B) - - self.working = False - return self.result - - def __str__(self): - return f'' - - def __repr__(self): - return str(self) diff --git a/build/lib/pyMR/utils.py b/build/lib/pyMR/utils.py deleted file mode 100644 index 6ca1896..0000000 --- a/build/lib/pyMR/utils.py +++ /dev/null @@ -1,16 +0,0 @@ -# root/pyMR/utils.py - - -class Stack(object): - def __init__(self, master): - super(Stack, self).__init__() - self._ = [] - self.master = master - - def push(self, elem): - self._.append(elem) - if len(self._) == 2: - master.reduce(self.pop(), self.pop()) - - def pop(self): - return self._.pop() diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index f0cc8f2357e5eaab74ee8c830b8f45fa4b865117..bfa4d2b3920f36ec6884bb037b5892aba003f44b 100644 GIT binary patch delta 15 WcmX@cbc~73n3tDJT6`m$10w((x&s&h delta 15 WcmX@cbc~73n3tD}TXZ9v10w((Py+@4 From 1b001c188c49f0a28d2cce50a194bdbbd70dddf9 Mon Sep 17 00:00:00 2001 From: k4rth33k <39260345+k4rth33k@users.noreply.github.com> Date: Wed, 10 Jun 2020 23:14:38 +0530 Subject: [PATCH 07/10] Treat all errors as warnings --- .github/workflows/pythonpackage.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 5bc4b64..ae8c93b 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -34,7 +34,7 @@ jobs: run: | pip install flake8 # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics --exclude=devTools,tests + # flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics --exclude=devTools,tests # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics --exclude=devTools,tests - name: Test with pytest From 9d201f22be0a8798f6e0503bd8dc35fd738f8d49 Mon Sep 17 00:00:00 2001 From: KARTHEEK Date: Wed, 10 Jun 2020 23:22:40 +0530 Subject: [PATCH 08/10] Fixed f string (CI) --- devTools/__pycache__/format.cpython-36.pyc | Bin 326 -> 326 bytes pyMR/__init__.py | 1 - pyMR/chunk.py | 3 +-- pyMR/main.py | 5 +---- setup.py | 23 ++++++++++----------- 5 files changed, 13 insertions(+), 19 deletions(-) diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index bfa4d2b3920f36ec6884bb037b5892aba003f44b..67ca78bce71de18345db9be7cec5603016cd283b 100644 GIT binary patch delta 14 VcmX@cbc~7Bn3tF9(neNCMgShh1GfMG delta 14 VcmX@cbc~7Bn3tDJdLyeNBLEvS0{{R3 diff --git a/pyMR/__init__.py b/pyMR/__init__.py index f194699..fa5d4b9 100644 --- a/pyMR/__init__.py +++ b/pyMR/__init__.py @@ -1,4 +1,3 @@ from .main import Master -from .main import Worker from .chunk import Chunks from .utils import Queue diff --git a/pyMR/chunk.py b/pyMR/chunk.py index 9790b46..939c862 100644 --- a/pyMR/chunk.py +++ b/pyMR/chunk.py @@ -1,7 +1,6 @@ # root/pyMR/chunk.py from collections import Iterable -import copy from itertools import tee @@ -26,7 +25,7 @@ def __init__(self, data, num_chunks, overflow='last'): try: self.len_ = len(data) - except: + except TypeError: self.len_ = sum([1 for _ in self.len_data]) print(self.len_) diff --git a/pyMR/main.py b/pyMR/main.py index 5541819..8ee3fe1 100644 --- a/pyMR/main.py +++ b/pyMR/main.py @@ -74,7 +74,6 @@ def start(self): data_empty = False converted = False - verif = None # Initialize worker objects self.__workers = [ @@ -88,7 +87,6 @@ def start(self): while not self.__verify(): count = 0 - # self.queue.dequeue() if not data_empty: # Setting up map workers @@ -97,7 +95,6 @@ def start(self): worker.set_data(next(self.data_gen)) count += 1 except StopIteration: - # print('Exception') data_empty = True if data_empty and not converted: @@ -139,7 +136,7 @@ def result(self): return self.validate def __str__(self): - return f'' + return ''.format(id(self)) def __repr__(self): return str(self) diff --git a/setup.py b/setup.py index 2f646b5..941094a 100644 --- a/setup.py +++ b/setup.py @@ -1,15 +1,14 @@ # root/setup.py from setuptools import setup -setup( - name='pyMR', - version='0.1', - description= - 'A python package to convert CPU-bound tasks to parallel mapReduce jobs.', - url='https://github.com/k4rth33k/pyMR', - author='k4rth33k', - author_email='kartheek2000mike@gmail.com', - license='MIT', - python_requires='>=3.2S', - packages=['pyMR'], - zip_safe=False) +setup(name='pyMR', + version='0.1', + description='A python package to convert CPU-bound tasks ' + + 'to parallel mapReduce jobs.', + url='https://github.com/k4rth33k/pyMR', + author='k4rth33k', + author_email='kartheek2000mike@gmail.com', + license='MIT', + python_requires='>=3.2S', + packages=['pyMR'], + zip_safe=False) From caedb8e32d6267b4f0b84a4d1288a6705d5802e6 Mon Sep 17 00:00:00 2001 From: KARTHEEK Date: Wed, 10 Jun 2020 23:26:41 +0530 Subject: [PATCH 09/10] Removed f strings --- devTools/__pycache__/format.cpython-36.pyc | Bin 326 -> 326 bytes pyMR/chunk.py | 1 - pyMR/main.py | 10 ++-------- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index 67ca78bce71de18345db9be7cec5603016cd283b..847254ba4ad9c07cebe81d98406d63739061722a 100644 GIT binary patch delta 15 WcmX@cbc~73n3tF9s_aHK2SxxQGy~WG delta 15 WcmX@cbc~73n3tF9lK4h82SxxQ1Ov|i diff --git a/pyMR/chunk.py b/pyMR/chunk.py index 939c862..53cdcdb 100644 --- a/pyMR/chunk.py +++ b/pyMR/chunk.py @@ -28,7 +28,6 @@ def __init__(self, data, num_chunks, overflow='last'): except TypeError: self.len_ = sum([1 for _ in self.len_data]) - print(self.len_) if self.len_ < self.num_chunks: raise SizeException("len(data) should be more than num_chunks") diff --git a/pyMR/main.py b/pyMR/main.py index 8ee3fe1..7b4cd11 100644 --- a/pyMR/main.py +++ b/pyMR/main.py @@ -167,8 +167,8 @@ def set_data(self, data): def run(self): - print(f'{self.type} Worker {self.id} is working' * self.verbose, - end='\r' * self.verbose) + print('{} Worker {} is working' * self.verbose, + end='\r' * self.verbose).format(self.type, self.id) if self.type == 'MAP': result = self.job(self.data) @@ -176,9 +176,3 @@ def run(self): result = self.job(self.data.pop(), self.data.pop()) return result - - def __str__(self): - return f'' - - def __repr__(self): - return str(self) From 7413ecff6b1ca6c01b82e31bda1541446d9f3bbf Mon Sep 17 00:00:00 2001 From: KARTHEEK Date: Wed, 10 Jun 2020 23:29:07 +0530 Subject: [PATCH 10/10] Minor print change --- devTools/__pycache__/format.cpython-36.pyc | Bin 326 -> 326 bytes pyMR/main.py | 5 +++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/devTools/__pycache__/format.cpython-36.pyc b/devTools/__pycache__/format.cpython-36.pyc index 847254ba4ad9c07cebe81d98406d63739061722a..711c221d2a31292ecde29f2ca5e3ad6fc40bea0b 100644 GIT binary patch delta 15 WcmX@cbc~73n3tF9pxj0_2SxxP!2`tr delta 15 WcmX@cbc~73n3tF9s_aHK2SxxQGy~WG diff --git a/pyMR/main.py b/pyMR/main.py index 7b4cd11..bf9e6f0 100644 --- a/pyMR/main.py +++ b/pyMR/main.py @@ -167,8 +167,9 @@ def set_data(self, data): def run(self): - print('{} Worker {} is working' * self.verbose, - end='\r' * self.verbose).format(self.type, self.id) + print(('{} Worker {} is working'.format(self.type, self.id)) * + self.verbose, + end='\r' * self.verbose) if self.type == 'MAP': result = self.job(self.data)